From 59cf0824d9e424af0477a133ce700ee76f39f81a Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 16 Jul 2014 17:34:07 +0200 Subject: [PATCH] PubSub clients refactoring and new PUBSUB flag. The code tested many times if a client had active Pub/Sub subscriptions by checking the length of a list and dictionary where the patterns and channels are stored. This was substituted with a client flag called REDIS_PUBSUB that is simpler to test for. Moreover in order to manage this flag some code was refactored. This commit is believed to have no effects in the behavior of the server. --- src/networking.c | 2 +- src/pubsub.c | 16 +++++++++++++--- src/redis.c | 7 +++---- src/redis.h | 1 + 4 files changed, 18 insertions(+), 8 deletions(-) diff --git a/src/networking.c b/src/networking.c index 8bc6389a4..9c100db19 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1561,7 +1561,7 @@ unsigned long getClientOutputBufferMemoryUsage(redisClient *c) { int getClientType(redisClient *c) { if ((c->flags & REDIS_SLAVE) && !(c->flags & REDIS_MONITOR)) return REDIS_CLIENT_TYPE_SLAVE; - if (dictSize(c->pubsub_channels) || listLength(c->pubsub_patterns)) + if (c->flags & REDIS_PUBSUB) return REDIS_CLIENT_TYPE_PUBSUB; return REDIS_CLIENT_TYPE_NORMAL; } diff --git a/src/pubsub.c b/src/pubsub.c index f83c3a7a1..720cd5185 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -47,6 +47,12 @@ int listMatchPubsubPattern(void *a, void *b) { (equalStringObjects(pa->pattern,pb->pattern)); } +/* Return the number of channels + patterns a client is subscribed to. */ +int clientSubscriptionsCount(redisClient *c) { + return dictSize(c->pubsub_channels)+ + listLength(c->pubsub_patterns); +} + /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or * 0 if the client was already subscribed to that channel. */ int pubsubSubscribeChannel(redisClient *c, robj *channel) { @@ -73,7 +79,7 @@ int pubsubSubscribeChannel(redisClient *c, robj *channel) { addReply(c,shared.mbulkhdr[3]); addReply(c,shared.subscribebulk); addReplyBulk(c,channel); - addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); + addReplyLongLong(c,clientSubscriptionsCount(c)); return retval; } @@ -135,7 +141,7 @@ int pubsubSubscribePattern(redisClient *c, robj *pattern) { addReply(c,shared.mbulkhdr[3]); addReply(c,shared.psubscribebulk); addReplyBulk(c,pattern); - addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); + addReplyLongLong(c,clientSubscriptionsCount(c)); return retval; } @@ -168,7 +174,7 @@ int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) { } /* Unsubscribe from all the channels. Return the number of channels the - * client was subscribed from. */ + * client was subscribed to. */ int pubsubUnsubscribeAllChannels(redisClient *c, int notify) { dictIterator *di = dictGetSafeIterator(c->pubsub_channels); dictEntry *de; @@ -273,6 +279,7 @@ void subscribeCommand(redisClient *c) { for (j = 1; j < c->argc; j++) pubsubSubscribeChannel(c,c->argv[j]); + c->flags |= REDIS_PUBSUB; } void unsubscribeCommand(redisClient *c) { @@ -284,6 +291,7 @@ void unsubscribeCommand(redisClient *c) { for (j = 1; j < c->argc; j++) pubsubUnsubscribeChannel(c,c->argv[j],1); } + if (clientSubscriptionsCount(c) == 0) c->flags &= ~REDIS_PUBSUB; } void psubscribeCommand(redisClient *c) { @@ -291,6 +299,7 @@ void psubscribeCommand(redisClient *c) { for (j = 1; j < c->argc; j++) pubsubSubscribePattern(c,c->argv[j]); + c->flags |= REDIS_PUBSUB; } void punsubscribeCommand(redisClient *c) { @@ -302,6 +311,7 @@ void punsubscribeCommand(redisClient *c) { for (j = 1; j < c->argc; j++) pubsubUnsubscribePattern(c,c->argv[j],1); } + if (clientSubscriptionsCount(c) == 0) c->flags &= ~REDIS_PUBSUB; } void publishCommand(redisClient *c) { diff --git a/src/redis.c b/src/redis.c index 1976cc4db..026e2f0e5 100644 --- a/src/redis.c +++ b/src/redis.c @@ -907,8 +907,7 @@ int clientsCronHandleTimeout(redisClient *c) { !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */ !(c->flags & REDIS_MASTER) && /* no timeout for masters */ !(c->flags & REDIS_BLOCKED) && /* no timeout for BLPOP */ - dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */ - listLength(c->pubsub_patterns) == 0 && + !(c->flags & REDIS_PUBSUB) && /* no timeout for Pub/Sub clients */ (now - c->lastinteraction > server.maxidletime)) { redisLog(REDIS_VERBOSE,"Closing idle client"); @@ -2211,8 +2210,8 @@ int processCommand(redisClient *c) { } /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */ - if ((dictSize(c->pubsub_channels) > 0 || listLength(c->pubsub_patterns) > 0) - && + if (c->flags & REDIS_PUBSUB && + c->cmd->proc != pingCommand && c->cmd->proc != subscribeCommand && c->cmd->proc != unsubscribeCommand && c->cmd->proc != psubscribeCommand && diff --git a/src/redis.h b/src/redis.h index 9c3596d9f..d2b00bff5 100644 --- a/src/redis.h +++ b/src/redis.h @@ -241,6 +241,7 @@ typedef long long mstime_t; /* millisecond time type. */ #define REDIS_FORCE_REPL (1<<15) /* Force replication of current cmd. */ #define REDIS_PRE_PSYNC (1<<16) /* Instance don't understand PSYNC. */ #define REDIS_READONLY (1<<17) /* Cluster client is in read-only state. */ +#define REDIS_PUBSUB (1<<18) /* Client is in Pub/Sub mode. */ /* Client block type (btype field in client structure) * if REDIS_BLOCKED flag is set. */