From c9d9ae7baaf717607db3d4ad81597dc99d0c002c Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 10 Oct 2018 12:51:02 +0200 Subject: [PATCH] Fix propagation of consumer groups last ID. Issue #5433. --- src/server.c | 1 + src/server.h | 3 ++- src/t_stream.c | 61 +++++++++++++++++++++++++++++++++++++++++++------- 3 files changed, 56 insertions(+), 9 deletions(-) diff --git a/src/server.c b/src/server.c index 78aee5dbb..a217a93c8 100644 --- a/src/server.c +++ b/src/server.c @@ -1702,6 +1702,7 @@ void initServerConfig(void) { server.expireCommand = lookupCommandByCString("expire"); server.pexpireCommand = lookupCommandByCString("pexpire"); server.xclaimCommand = lookupCommandByCString("xclaim"); + server.xgroupCommand = lookupCommandByCString("xgroup"); /* Slow log */ server.slowlog_log_slower_than = CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN; diff --git a/src/server.h b/src/server.h index 73630b897..7c18a5f44 100644 --- a/src/server.h +++ b/src/server.h @@ -990,7 +990,8 @@ struct redisServer { struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand, *rpopCommand, *zpopminCommand, *zpopmaxCommand, *sremCommand, *execCommand, - *expireCommand, *pexpireCommand, *xclaimCommand; + *expireCommand, *pexpireCommand, *xclaimCommand, + *xgroupCommand; /* Fields used only for stats */ time_t stat_starttime; /* Server start time */ long long stat_numcommands; /* Number of processed commands */ diff --git a/src/t_stream.c b/src/t_stream.c index 4387e08a5..a9474aff6 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -791,18 +791,18 @@ robj *createObjectFromStreamID(streamID *id) { /* As a result of an explicit XCLAIM or XREADGROUP command, new entries * are created in the pending list of the stream and consumers. We need * to propagate this changes in the form of XCLAIM commands. */ -void streamPropagateXCLAIM(client *c, robj *key, robj *group, robj *id, streamNACK *nack) { +void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupname, robj *id, streamNACK *nack) { /* We need to generate an XCLAIM that will work in a idempotent fashion: * * XCLAIM 0 TIME - * RETRYCOUNT FORCE JUSTID. + * RETRYCOUNT FORCE JUSTID LASTID . * * Note that JUSTID is useful in order to avoid that XCLAIM will do * useless work in the slave side, trying to fetch the stream item. */ - robj *argv[12]; + robj *argv[14]; argv[0] = createStringObject("XCLAIM",6); argv[1] = key; - argv[2] = group; + argv[2] = groupname; argv[3] = createStringObject(nack->consumer->name,sdslen(nack->consumer->name)); argv[4] = createStringObjectFromLongLong(0); argv[5] = id; @@ -812,7 +812,9 @@ void streamPropagateXCLAIM(client *c, robj *key, robj *group, robj *id, streamNA argv[9] = createStringObjectFromLongLong(nack->delivery_count); argv[10] = createStringObject("FORCE",5); argv[11] = createStringObject("JUSTID",6); - propagate(server.xclaimCommand,c->db->id,argv,12,PROPAGATE_AOF|PROPAGATE_REPL); + argv[12] = createStringObject("LASTID",6); + argv[13] = createObjectFromStreamID(&group->last_id); + propagate(server.xclaimCommand,c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL); decrRefCount(argv[0]); decrRefCount(argv[3]); decrRefCount(argv[4]); @@ -822,6 +824,27 @@ void streamPropagateXCLAIM(client *c, robj *key, robj *group, robj *id, streamNA decrRefCount(argv[9]); decrRefCount(argv[10]); decrRefCount(argv[11]); + decrRefCount(argv[12]); + decrRefCount(argv[13]); +} + +/* We need this when we want to propoagate the new last-id of a consumer group + * that was consumed by XREADGROUP with the NOACK option: in that case we can't + * propagate the last ID just using the XCLAIM LASTID option, so we emit + * + * XGROUP SETID + */ +void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupname) { + robj *argv[5]; + argv[0] = createStringObject("XGROUP",6); + argv[1] = createStringObject("SETID",5); + argv[2] = key; + argv[3] = groupname; + argv[4] = createObjectFromStreamID(&group->last_id); + propagate(server.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL); + decrRefCount(argv[0]); + decrRefCount(argv[1]); + decrRefCount(argv[4]); } /* Send the specified range to the client 'c'. The range the client will @@ -873,6 +896,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end streamIterator si; int64_t numfields; streamID id; + int lastid_updated = 0; /* If a group was passed, we check if the request is about messages * never delivered so far (normally this happens when ">" ID is passed). @@ -892,8 +916,10 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end streamIteratorStart(&si,s,start,end,rev); while(streamIteratorGetID(&si,&id,&numfields)) { /* Update the group last_id if needed. */ - if (group && streamCompareID(&id,&group->last_id) > 0) + if (group && streamCompareID(&id,&group->last_id) > 0) { group->last_id = id; + lastid_updated = 1; + } /* Emit a two elements array for each item. The first is * the ID, the second is an array of field-value pairs. */ @@ -953,9 +979,12 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end /* Propagate as XCLAIM. */ if (spi) { robj *idarg = createObjectFromStreamID(&id); - streamPropagateXCLAIM(c,spi->keyname,spi->groupname,idarg,nack); + streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack); decrRefCount(idarg); } + } else { + if (lastid_updated) + streamPropagateGroupID(c,spi->keyname,group,spi->groupname); } arraylen++; @@ -1993,6 +2022,14 @@ void xpendingCommand(client *c) { * Return just an array of IDs of messages successfully claimed, * without returning the actual message. * + * 6. LASTID : + * Update the consumer group last ID with the specified ID if the + * current last ID is smaller than the provided one. + * This is used for replication / AOF, so that when we read from a + * consumer group, the XCLAIM that gets propagated to give ownership + * to the consumer, is also used in order to update the group current + * ID. + * * The command returns an array of messages that the user * successfully claimed, so that the caller is able to understand * what messages it is now in charge of. */ @@ -2061,6 +2098,14 @@ void xclaimCommand(client *c) { if (getLongLongFromObjectOrReply(c,c->argv[j],&retrycount, "Invalid RETRYCOUNT option argument for XCLAIM") != C_OK) return; + } else if (!strcasecmp(opt,"LASTID") && moreargs) { + j++; + streamID id; + if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return; + /* Technically it could be more correct to update that only after + * checking for syntax errors, but this option is only used by + * the replication command that outputs correct syntax. */ + if (streamCompareID(&id,&group->last_id) > 0) group->last_id = id; } else { addReplyErrorFormat(c,"Unrecognized XCLAIM option '%s'",opt); return; @@ -2147,7 +2192,7 @@ void xclaimCommand(client *c) { arraylen++; /* Propagate this change. */ - streamPropagateXCLAIM(c,c->argv[1],c->argv[3],c->argv[j],nack); + streamPropagateXCLAIM(c,c->argv[1],group,c->argv[3],c->argv[j],nack); server.dirty++; } }