PUBSUB command implemented.

Currently it implements three subcommands:

PUBSUB CHANNELS [<pattern>]    List channels with non-zero subscribers.
PUBSUB NUMSUB [channel_1 ...]  List number of subscribers for channels.
PUBSUB NUMPAT                  Return number of subscribed patterns.
This commit is contained in:
antirez 2013-06-20 15:32:00 +02:00
parent 4c0f8c4e5a
commit 455563faec
3 changed files with 49 additions and 0 deletions

View File

@ -309,3 +309,50 @@ void publishCommand(redisClient *c) {
if (server.cluster_enabled) clusterPropagatePublish(c->argv[1],c->argv[2]); if (server.cluster_enabled) clusterPropagatePublish(c->argv[1],c->argv[2]);
addReplyLongLong(c,receivers); addReplyLongLong(c,receivers);
} }
/* PUBSUB command for Pub/Sub introspection. */
void pubsubCommand(redisClient *c) {
if (!strcasecmp(c->argv[1]->ptr,"channels") &&
(c->argc == 2 || c->argc ==3))
{
/* PUBSUB CHANNELS [<pattern>] */
sds pat = (c->argc == 2) ? NULL : c->argv[2]->ptr;
dictIterator *di = dictGetIterator(server.pubsub_channels);
dictEntry *de;
long mblen = 0;
void *replylen;
replylen = addDeferredMultiBulkLength(c);
while((de = dictNext(di)) != NULL) {
robj *cobj = dictGetKey(de);
sds channel = cobj->ptr;
if (!pat || stringmatchlen(pat, sdslen(pat),
channel, sdslen(channel),0))
{
addReplyBulk(c,cobj);
mblen++;
}
}
dictReleaseIterator(di);
setDeferredMultiBulkLength(c,replylen,mblen);
} else if (!strcasecmp(c->argv[1]->ptr,"numsub") && c->argc > 2) {
/* PUBSUB NUMSUB Channel_1 [... Channel_N] */
int j;
addReplyMultiBulkLen(c,(c->argc-2)*2);
for (j = 2; j < c->argc; j++) {
list *l = dictFetchValue(server.pubsub_channels,c->argv[j]);
addReplyBulk(c,c->argv[j]);
addReplyBulkLongLong(c,l ? listLength(l) : 0);
}
} else if (!strcasecmp(c->argv[1]->ptr,"numpat") && c->argc == 2) {
/* PUBSUB NUMPAT */
addReplyLongLong(c,listLength(server.pubsub_patterns));
} else {
addReplyErrorFormat(c,
"Unknown PUBSUB subcommand or wrong number of arguments for '%s'",
(char*)c->argv[1]->ptr);
}
}

View File

@ -240,6 +240,7 @@ struct redisCommand redisCommandTable[] = {
{"psubscribe",psubscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0}, {"psubscribe",psubscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0},
{"punsubscribe",punsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0}, {"punsubscribe",punsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0},
{"publish",publishCommand,3,"pfltr",0,NULL,0,0,0,0,0}, {"publish",publishCommand,3,"pfltr",0,NULL,0,0,0,0,0},
{"pubsub",pubsubCommand,-2,"pltrR",0,NULL,0,0,0,0,0},
{"watch",watchCommand,-2,"rs",0,noPreloadGetKeys,1,-1,1,0,0}, {"watch",watchCommand,-2,"rs",0,noPreloadGetKeys,1,-1,1,0,0},
{"unwatch",unwatchCommand,1,"rs",0,NULL,0,0,0,0,0}, {"unwatch",unwatchCommand,1,"rs",0,NULL,0,0,0,0,0},
{"cluster",clusterCommand,-2,"ar",0,NULL,0,0,0,0,0}, {"cluster",clusterCommand,-2,"ar",0,NULL,0,0,0,0,0},

View File

@ -1458,6 +1458,7 @@ void unsubscribeCommand(redisClient *c);
void psubscribeCommand(redisClient *c); void psubscribeCommand(redisClient *c);
void punsubscribeCommand(redisClient *c); void punsubscribeCommand(redisClient *c);
void publishCommand(redisClient *c); void publishCommand(redisClient *c);
void pubsubCommand(redisClient *c);
void watchCommand(redisClient *c); void watchCommand(redisClient *c);
void unwatchCommand(redisClient *c); void unwatchCommand(redisClient *c);
void clusterCommand(redisClient *c); void clusterCommand(redisClient *c);