mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 16:18:28 -05:00
Streams: generate a few additional events.
Currently it does not look it's sensible to generate events for streams consumer groups modification, being them metadata, however at least for key-level events, like the creation or removal of a consumer group, I added a few events here and there. Later we can evaluate if it makes sense to add more. From the POV instead of WAIT (in Redis transaciton) and signaling the key as modified, it looks like that the transaction should not fail when a stream is modified, so no calls are made in consumer groups related functions to signalModifiedKey().
This commit is contained in:
parent
b38682199b
commit
6b8a24a665
@ -1379,7 +1379,7 @@ void xreadCommand(client *c) {
|
||||
/* Emit the two elements sub-array consisting of the name
|
||||
* of the stream and the data we extracted from it. */
|
||||
addReplyMultiBulkLen(c,2);
|
||||
addReplyBulk(c,c->argv[i+streams_arg]);
|
||||
addReplyBulk(c,c->argv[streams_arg+i]);
|
||||
streamConsumer *consumer = NULL;
|
||||
if (groups) consumer = streamLookupConsumer(groups[i],
|
||||
consumername->ptr,1);
|
||||
@ -1606,6 +1606,8 @@ NULL
|
||||
if (cg) {
|
||||
addReply(c,shared.ok);
|
||||
server.dirty++;
|
||||
notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-create",
|
||||
c->argv[2],c->db->id);
|
||||
} else {
|
||||
addReplySds(c,
|
||||
sdsnew("-BUSYGROUP Consumer Group name already exists\r\n"));
|
||||
@ -1620,12 +1622,15 @@ NULL
|
||||
cg->last_id = id;
|
||||
addReply(c,shared.ok);
|
||||
server.dirty++;
|
||||
notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-setid",c->argv[2],c->db->id);
|
||||
} else if (!strcasecmp(opt,"DESTROY") && c->argc == 4) {
|
||||
if (cg) {
|
||||
raxRemove(s->cgroups,(unsigned char*)grpname,sdslen(grpname),NULL);
|
||||
streamFreeCG(cg);
|
||||
addReply(c,shared.cone);
|
||||
server.dirty++;
|
||||
notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-destroy",
|
||||
c->argv[2],c->db->id);
|
||||
} else {
|
||||
addReply(c,shared.czero);
|
||||
}
|
||||
@ -1635,6 +1640,8 @@ NULL
|
||||
long long pending = streamDelConsumer(cg,c->argv[4]->ptr);
|
||||
addReplyLongLong(c,pending);
|
||||
server.dirty++;
|
||||
notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-delconsumer",
|
||||
c->argv[2],c->db->id);
|
||||
} else if (!strcasecmp(opt,"HELP")) {
|
||||
addReplyHelp(c, help);
|
||||
} else {
|
||||
|
Loading…
Reference in New Issue
Block a user