Fix propagation of consumer groups last ID.

Issue #5433.
This commit is contained in:
antirez 2018-10-10 12:51:02 +02:00
parent 4eedb0bf94
commit c9d9ae7baa
3 changed files with 56 additions and 9 deletions

View File

@ -1702,6 +1702,7 @@ void initServerConfig(void) {
server.expireCommand = lookupCommandByCString("expire");
server.pexpireCommand = lookupCommandByCString("pexpire");
server.xclaimCommand = lookupCommandByCString("xclaim");
server.xgroupCommand = lookupCommandByCString("xgroup");
/* Slow log */
server.slowlog_log_slower_than = CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN;

View File

@ -990,7 +990,8 @@ struct redisServer {
struct redisCommand *delCommand, *multiCommand, *lpushCommand,
*lpopCommand, *rpopCommand, *zpopminCommand,
*zpopmaxCommand, *sremCommand, *execCommand,
*expireCommand, *pexpireCommand, *xclaimCommand;
*expireCommand, *pexpireCommand, *xclaimCommand,
*xgroupCommand;
/* Fields used only for stats */
time_t stat_starttime; /* Server start time */
long long stat_numcommands; /* Number of processed commands */

View File

@ -791,18 +791,18 @@ robj *createObjectFromStreamID(streamID *id) {
/* As a result of an explicit XCLAIM or XREADGROUP command, new entries
* are created in the pending list of the stream and consumers. We need
* to propagate this changes in the form of XCLAIM commands. */
void streamPropagateXCLAIM(client *c, robj *key, robj *group, robj *id, streamNACK *nack) {
void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupname, robj *id, streamNACK *nack) {
/* We need to generate an XCLAIM that will work in a idempotent fashion:
*
* XCLAIM <key> <group> <consumer> 0 <id> TIME <milliseconds-unix-time>
* RETRYCOUNT <count> FORCE JUSTID.
* RETRYCOUNT <count> FORCE JUSTID LASTID <id>.
*
* Note that JUSTID is useful in order to avoid that XCLAIM will do
* useless work in the slave side, trying to fetch the stream item. */
robj *argv[12];
robj *argv[14];
argv[0] = createStringObject("XCLAIM",6);
argv[1] = key;
argv[2] = group;
argv[2] = groupname;
argv[3] = createStringObject(nack->consumer->name,sdslen(nack->consumer->name));
argv[4] = createStringObjectFromLongLong(0);
argv[5] = id;
@ -812,7 +812,9 @@ void streamPropagateXCLAIM(client *c, robj *key, robj *group, robj *id, streamNA
argv[9] = createStringObjectFromLongLong(nack->delivery_count);
argv[10] = createStringObject("FORCE",5);
argv[11] = createStringObject("JUSTID",6);
propagate(server.xclaimCommand,c->db->id,argv,12,PROPAGATE_AOF|PROPAGATE_REPL);
argv[12] = createStringObject("LASTID",6);
argv[13] = createObjectFromStreamID(&group->last_id);
propagate(server.xclaimCommand,c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(argv[0]);
decrRefCount(argv[3]);
decrRefCount(argv[4]);
@ -822,6 +824,27 @@ void streamPropagateXCLAIM(client *c, robj *key, robj *group, robj *id, streamNA
decrRefCount(argv[9]);
decrRefCount(argv[10]);
decrRefCount(argv[11]);
decrRefCount(argv[12]);
decrRefCount(argv[13]);
}
/* We need this when we want to propoagate the new last-id of a consumer group
* that was consumed by XREADGROUP with the NOACK option: in that case we can't
* propagate the last ID just using the XCLAIM LASTID option, so we emit
*
* XGROUP SETID <key> <groupname> <id>
*/
void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupname) {
robj *argv[5];
argv[0] = createStringObject("XGROUP",6);
argv[1] = createStringObject("SETID",5);
argv[2] = key;
argv[3] = groupname;
argv[4] = createObjectFromStreamID(&group->last_id);
propagate(server.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(argv[0]);
decrRefCount(argv[1]);
decrRefCount(argv[4]);
}
/* Send the specified range to the client 'c'. The range the client will
@ -873,6 +896,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
streamIterator si;
int64_t numfields;
streamID id;
int lastid_updated = 0;
/* If a group was passed, we check if the request is about messages
* never delivered so far (normally this happens when ">" ID is passed).
@ -892,8 +916,10 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
streamIteratorStart(&si,s,start,end,rev);
while(streamIteratorGetID(&si,&id,&numfields)) {
/* Update the group last_id if needed. */
if (group && streamCompareID(&id,&group->last_id) > 0)
if (group && streamCompareID(&id,&group->last_id) > 0) {
group->last_id = id;
lastid_updated = 1;
}
/* Emit a two elements array for each item. The first is
* the ID, the second is an array of field-value pairs. */
@ -953,9 +979,12 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
/* Propagate as XCLAIM. */
if (spi) {
robj *idarg = createObjectFromStreamID(&id);
streamPropagateXCLAIM(c,spi->keyname,spi->groupname,idarg,nack);
streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack);
decrRefCount(idarg);
}
} else {
if (lastid_updated)
streamPropagateGroupID(c,spi->keyname,group,spi->groupname);
}
arraylen++;
@ -1993,6 +2022,14 @@ void xpendingCommand(client *c) {
* Return just an array of IDs of messages successfully claimed,
* without returning the actual message.
*
* 6. LASTID <id>:
* Update the consumer group last ID with the specified ID if the
* current last ID is smaller than the provided one.
* This is used for replication / AOF, so that when we read from a
* consumer group, the XCLAIM that gets propagated to give ownership
* to the consumer, is also used in order to update the group current
* ID.
*
* The command returns an array of messages that the user
* successfully claimed, so that the caller is able to understand
* what messages it is now in charge of. */
@ -2061,6 +2098,14 @@ void xclaimCommand(client *c) {
if (getLongLongFromObjectOrReply(c,c->argv[j],&retrycount,
"Invalid RETRYCOUNT option argument for XCLAIM")
!= C_OK) return;
} else if (!strcasecmp(opt,"LASTID") && moreargs) {
j++;
streamID id;
if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return;
/* Technically it could be more correct to update that only after
* checking for syntax errors, but this option is only used by
* the replication command that outputs correct syntax. */
if (streamCompareID(&id,&group->last_id) > 0) group->last_id = id;
} else {
addReplyErrorFormat(c,"Unrecognized XCLAIM option '%s'",opt);
return;
@ -2147,7 +2192,7 @@ void xclaimCommand(client *c) {
arraylen++;
/* Propagate this change. */
streamPropagateXCLAIM(c,c->argv[1],c->argv[3],c->argv[j],nack);
streamPropagateXCLAIM(c,c->argv[1],group,c->argv[3],c->argv[j],nack);
server.dirty++;
}
}