diff --git a/src/t_stream.c b/src/t_stream.c index a2f203991..bcef4355a 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1379,7 +1379,7 @@ void xreadCommand(client *c) { /* Emit the two elements sub-array consisting of the name * of the stream and the data we extracted from it. */ addReplyMultiBulkLen(c,2); - addReplyBulk(c,c->argv[i+streams_arg]); + addReplyBulk(c,c->argv[streams_arg+i]); streamConsumer *consumer = NULL; if (groups) consumer = streamLookupConsumer(groups[i], consumername->ptr,1); @@ -1606,6 +1606,8 @@ NULL if (cg) { addReply(c,shared.ok); server.dirty++; + notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-create", + c->argv[2],c->db->id); } else { addReplySds(c, sdsnew("-BUSYGROUP Consumer Group name already exists\r\n")); @@ -1620,12 +1622,15 @@ NULL cg->last_id = id; addReply(c,shared.ok); server.dirty++; + notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-setid",c->argv[2],c->db->id); } else if (!strcasecmp(opt,"DESTROY") && c->argc == 4) { if (cg) { raxRemove(s->cgroups,(unsigned char*)grpname,sdslen(grpname),NULL); streamFreeCG(cg); addReply(c,shared.cone); server.dirty++; + notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-destroy", + c->argv[2],c->db->id); } else { addReply(c,shared.czero); } @@ -1635,6 +1640,8 @@ NULL long long pending = streamDelConsumer(cg,c->argv[4]->ptr); addReplyLongLong(c,pending); server.dirty++; + notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-delconsumer", + c->argv[2],c->db->id); } else if (!strcasecmp(opt,"HELP")) { addReplyHelp(c, help); } else {