diff --git a/src/networking.c b/src/networking.c index 8bc6389a4..9c100db19 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1561,7 +1561,7 @@ unsigned long getClientOutputBufferMemoryUsage(redisClient *c) { int getClientType(redisClient *c) { if ((c->flags & REDIS_SLAVE) && !(c->flags & REDIS_MONITOR)) return REDIS_CLIENT_TYPE_SLAVE; - if (dictSize(c->pubsub_channels) || listLength(c->pubsub_patterns)) + if (c->flags & REDIS_PUBSUB) return REDIS_CLIENT_TYPE_PUBSUB; return REDIS_CLIENT_TYPE_NORMAL; } diff --git a/src/pubsub.c b/src/pubsub.c index f83c3a7a1..720cd5185 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -47,6 +47,12 @@ int listMatchPubsubPattern(void *a, void *b) { (equalStringObjects(pa->pattern,pb->pattern)); } +/* Return the number of channels + patterns a client is subscribed to. */ +int clientSubscriptionsCount(redisClient *c) { + return dictSize(c->pubsub_channels)+ + listLength(c->pubsub_patterns); +} + /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or * 0 if the client was already subscribed to that channel. */ int pubsubSubscribeChannel(redisClient *c, robj *channel) { @@ -73,7 +79,7 @@ int pubsubSubscribeChannel(redisClient *c, robj *channel) { addReply(c,shared.mbulkhdr[3]); addReply(c,shared.subscribebulk); addReplyBulk(c,channel); - addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); + addReplyLongLong(c,clientSubscriptionsCount(c)); return retval; } @@ -135,7 +141,7 @@ int pubsubSubscribePattern(redisClient *c, robj *pattern) { addReply(c,shared.mbulkhdr[3]); addReply(c,shared.psubscribebulk); addReplyBulk(c,pattern); - addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); + addReplyLongLong(c,clientSubscriptionsCount(c)); return retval; } @@ -168,7 +174,7 @@ int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) { } /* Unsubscribe from all the channels. Return the number of channels the - * client was subscribed from. */ + * client was subscribed to. */ int pubsubUnsubscribeAllChannels(redisClient *c, int notify) { dictIterator *di = dictGetSafeIterator(c->pubsub_channels); dictEntry *de; @@ -273,6 +279,7 @@ void subscribeCommand(redisClient *c) { for (j = 1; j < c->argc; j++) pubsubSubscribeChannel(c,c->argv[j]); + c->flags |= REDIS_PUBSUB; } void unsubscribeCommand(redisClient *c) { @@ -284,6 +291,7 @@ void unsubscribeCommand(redisClient *c) { for (j = 1; j < c->argc; j++) pubsubUnsubscribeChannel(c,c->argv[j],1); } + if (clientSubscriptionsCount(c) == 0) c->flags &= ~REDIS_PUBSUB; } void psubscribeCommand(redisClient *c) { @@ -291,6 +299,7 @@ void psubscribeCommand(redisClient *c) { for (j = 1; j < c->argc; j++) pubsubSubscribePattern(c,c->argv[j]); + c->flags |= REDIS_PUBSUB; } void punsubscribeCommand(redisClient *c) { @@ -302,6 +311,7 @@ void punsubscribeCommand(redisClient *c) { for (j = 1; j < c->argc; j++) pubsubUnsubscribePattern(c,c->argv[j],1); } + if (clientSubscriptionsCount(c) == 0) c->flags &= ~REDIS_PUBSUB; } void publishCommand(redisClient *c) { diff --git a/src/redis.c b/src/redis.c index 1976cc4db..026e2f0e5 100644 --- a/src/redis.c +++ b/src/redis.c @@ -907,8 +907,7 @@ int clientsCronHandleTimeout(redisClient *c) { !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */ !(c->flags & REDIS_MASTER) && /* no timeout for masters */ !(c->flags & REDIS_BLOCKED) && /* no timeout for BLPOP */ - dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */ - listLength(c->pubsub_patterns) == 0 && + !(c->flags & REDIS_PUBSUB) && /* no timeout for Pub/Sub clients */ (now - c->lastinteraction > server.maxidletime)) { redisLog(REDIS_VERBOSE,"Closing idle client"); @@ -2211,8 +2210,8 @@ int processCommand(redisClient *c) { } /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */ - if ((dictSize(c->pubsub_channels) > 0 || listLength(c->pubsub_patterns) > 0) - && + if (c->flags & REDIS_PUBSUB && + c->cmd->proc != pingCommand && c->cmd->proc != subscribeCommand && c->cmd->proc != unsubscribeCommand && c->cmd->proc != psubscribeCommand && diff --git a/src/redis.h b/src/redis.h index 9c3596d9f..d2b00bff5 100644 --- a/src/redis.h +++ b/src/redis.h @@ -241,6 +241,7 @@ typedef long long mstime_t; /* millisecond time type. */ #define REDIS_FORCE_REPL (1<<15) /* Force replication of current cmd. */ #define REDIS_PRE_PSYNC (1<<16) /* Instance don't understand PSYNC. */ #define REDIS_READONLY (1<<17) /* Cluster client is in read-only state. */ +#define REDIS_PUBSUB (1<<18) /* Client is in Pub/Sub mode. */ /* Client block type (btype field in client structure) * if REDIS_BLOCKED flag is set. */