From 455563faec4b904c8ee9a1a16eac2a5bae532dc7 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 20 Jun 2013 15:32:00 +0200 Subject: [PATCH] PUBSUB command implemented. Currently it implements three subcommands: PUBSUB CHANNELS [] List channels with non-zero subscribers. PUBSUB NUMSUB [channel_1 ...] List number of subscribers for channels. PUBSUB NUMPAT Return number of subscribed patterns. --- src/pubsub.c | 47 +++++++++++++++++++++++++++++++++++++++++++++++ src/redis.c | 1 + src/redis.h | 1 + 3 files changed, 49 insertions(+) diff --git a/src/pubsub.c b/src/pubsub.c index 524cb9c5a..307825679 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -309,3 +309,50 @@ void publishCommand(redisClient *c) { if (server.cluster_enabled) clusterPropagatePublish(c->argv[1],c->argv[2]); addReplyLongLong(c,receivers); } + +/* PUBSUB command for Pub/Sub introspection. */ +void pubsubCommand(redisClient *c) { + if (!strcasecmp(c->argv[1]->ptr,"channels") && + (c->argc == 2 || c->argc ==3)) + { + /* PUBSUB CHANNELS [] */ + sds pat = (c->argc == 2) ? NULL : c->argv[2]->ptr; + dictIterator *di = dictGetIterator(server.pubsub_channels); + dictEntry *de; + long mblen = 0; + void *replylen; + + replylen = addDeferredMultiBulkLength(c); + while((de = dictNext(di)) != NULL) { + robj *cobj = dictGetKey(de); + sds channel = cobj->ptr; + + if (!pat || stringmatchlen(pat, sdslen(pat), + channel, sdslen(channel),0)) + { + addReplyBulk(c,cobj); + mblen++; + } + } + dictReleaseIterator(di); + setDeferredMultiBulkLength(c,replylen,mblen); + } else if (!strcasecmp(c->argv[1]->ptr,"numsub") && c->argc > 2) { + /* PUBSUB NUMSUB Channel_1 [... Channel_N] */ + int j; + + addReplyMultiBulkLen(c,(c->argc-2)*2); + for (j = 2; j < c->argc; j++) { + list *l = dictFetchValue(server.pubsub_channels,c->argv[j]); + + addReplyBulk(c,c->argv[j]); + addReplyBulkLongLong(c,l ? listLength(l) : 0); + } + } else if (!strcasecmp(c->argv[1]->ptr,"numpat") && c->argc == 2) { + /* PUBSUB NUMPAT */ + addReplyLongLong(c,listLength(server.pubsub_patterns)); + } else { + addReplyErrorFormat(c, + "Unknown PUBSUB subcommand or wrong number of arguments for '%s'", + (char*)c->argv[1]->ptr); + } +} diff --git a/src/redis.c b/src/redis.c index b974d5ef7..eee6cc53a 100644 --- a/src/redis.c +++ b/src/redis.c @@ -240,6 +240,7 @@ struct redisCommand redisCommandTable[] = { {"psubscribe",psubscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0}, {"punsubscribe",punsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0}, {"publish",publishCommand,3,"pfltr",0,NULL,0,0,0,0,0}, + {"pubsub",pubsubCommand,-2,"pltrR",0,NULL,0,0,0,0,0}, {"watch",watchCommand,-2,"rs",0,noPreloadGetKeys,1,-1,1,0,0}, {"unwatch",unwatchCommand,1,"rs",0,NULL,0,0,0,0,0}, {"cluster",clusterCommand,-2,"ar",0,NULL,0,0,0,0,0}, diff --git a/src/redis.h b/src/redis.h index 8bc76783e..b97fb0739 100644 --- a/src/redis.h +++ b/src/redis.h @@ -1458,6 +1458,7 @@ void unsubscribeCommand(redisClient *c); void psubscribeCommand(redisClient *c); void punsubscribeCommand(redisClient *c); void publishCommand(redisClient *c); +void pubsubCommand(redisClient *c); void watchCommand(redisClient *c); void unwatchCommand(redisClient *c); void clusterCommand(redisClient *c);