Stream: Inconsistency between master and replica some XREADGROUP case (#7526)

XREADGROUP auto-creates the consumer inside the consumer group the
first time it saw it.
When XREADGROUP is being used with NOACK option, the message will not
be added into the client's PEL and XGROUP SETID would be propagated.
When the replica gets the XGROUP SETID it will only update the last delivered
id of the group, but will not create the consumer.

So, in this commit XGROUP CREATECONSUMER is being added.
Command pattern: XGROUP CREATECONSUMER <key> <group> <consumer>.

When NOACK option is being used, createconsumer command would be
propagated as well.

In case of AOFREWRITE, consumer with an empty PEL would be saved with
XGROUP CREATECONSUMER whereas consumer with pending entries would be
saved with XCLAIM
This commit is contained in:
valentinogeron 2020-09-24 12:02:40 +03:00 committed by GitHub
parent b7ce583a5e
commit 795c454db1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 193 additions and 21 deletions

View File

@ -1185,6 +1185,20 @@ int rioWriteStreamPendingEntry(rio *r, robj *key, const char *groupname, size_t
return 1; return 1;
} }
/* Helper for rewriteStreamObject(): emit the XGROUP CREATECONSUMER is
* needed in order to create consumers that do not have any pending entries.
* All this in the context of the specified key and group. */
int rioWriteStreamEmptyConsumer(rio *r, robj *key, const char *groupname, size_t groupname_len, streamConsumer *consumer) {
/* XGROUP CREATECONSUMER <key> <group> <consumer> */
if (rioWriteBulkCount(r,'*',5) == 0) return 0;
if (rioWriteBulkString(r,"XGROUP",6) == 0) return 0;
if (rioWriteBulkString(r,"CREATECONSUMER",14) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
if (rioWriteBulkString(r,groupname,groupname_len) == 0) return 0;
if (rioWriteBulkString(r,consumer->name,sdslen(consumer->name)) == 0) return 0;
return 1;
}
/* Emit the commands needed to rebuild a stream object. /* Emit the commands needed to rebuild a stream object.
* The function returns 0 on error, 1 on success. */ * The function returns 0 on error, 1 on success. */
int rewriteStreamObject(rio *r, robj *key, robj *o) { int rewriteStreamObject(rio *r, robj *key, robj *o) {
@ -1273,13 +1287,22 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
} }
/* Generate XCLAIMs for each consumer that happens to /* Generate XCLAIMs for each consumer that happens to
* have pending entries. Empty consumers have no semantical * have pending entries. Empty consumers would be generated with
* value so they are discarded. */ * XGROUP CREATECONSUMER. */
raxIterator ri_cons; raxIterator ri_cons;
raxStart(&ri_cons,group->consumers); raxStart(&ri_cons,group->consumers);
raxSeek(&ri_cons,"^",NULL,0); raxSeek(&ri_cons,"^",NULL,0);
while(raxNext(&ri_cons)) { while(raxNext(&ri_cons)) {
streamConsumer *consumer = ri_cons.data; streamConsumer *consumer = ri_cons.data;
/* If there are no pending entries, just emit XGROUP CREATECONSUMER */
if (raxSize(consumer->pel) == 0) {
if (rioWriteStreamEmptyConsumer(r,key,(char*)ri.key,
ri.key_len,consumer) == 0)
{
return 0;
}
continue;
}
/* For the current consumer, iterate all the PEL entries /* For the current consumer, iterate all the PEL entries
* to emit the XCLAIM protocol. */ * to emit the XCLAIM protocol. */
raxIterator ri_pel; raxIterator ri_pel;

View File

@ -371,11 +371,18 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
int noack = 0; int noack = 0;
if (group) { if (group) {
int created = 0;
consumer = consumer =
streamLookupConsumer(group, streamLookupConsumer(group,
receiver->bpop.xread_consumer->ptr, receiver->bpop.xread_consumer->ptr,
SLC_NONE); SLC_NONE,
&created);
noack = receiver->bpop.xread_group_noack; noack = receiver->bpop.xread_group_noack;
if (created && noack) {
streamPropagateConsumerCreation(receiver,rl->key,
receiver->bpop.xread_group,
consumer->name);
}
} }
/* Emit the two elements sub-array consisting of /* Emit the two elements sub-array consisting of

View File

@ -1912,7 +1912,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) {
return NULL; return NULL;
} }
streamConsumer *consumer = streamConsumer *consumer =
streamLookupConsumer(cgroup,cname,SLC_NONE); streamLookupConsumer(cgroup,cname,SLC_NONE,NULL);
sdsfree(cname); sdsfree(cname);
consumer->seen_time = rdbLoadMillisecondTime(rdb,RDB_VERSION); consumer->seen_time = rdbLoadMillisecondTime(rdb,RDB_VERSION);
if (rioGetReadError(rdb)) { if (rioGetReadError(rdb)) {

View File

@ -110,12 +110,13 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen); void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);
void streamIteratorStop(streamIterator *si); void streamIteratorStop(streamIterator *si);
streamCG *streamLookupCG(stream *s, sds groupname); streamCG *streamLookupCG(stream *s, sds groupname);
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags); streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags, int *created);
streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id); streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id);
streamNACK *streamCreateNACK(streamConsumer *consumer); streamNACK *streamCreateNACK(streamConsumer *consumer);
void streamDecodeID(void *buf, streamID *id); void streamDecodeID(void *buf, streamID *id);
int streamCompareID(streamID *a, streamID *b); int streamCompareID(streamID *a, streamID *b);
void streamFreeNACK(streamNACK *na); void streamFreeNACK(streamNACK *na);
void streamIncrID(streamID *id); void streamIncrID(streamID *id);
void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername);
#endif #endif

View File

@ -894,6 +894,30 @@ void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupna
decrRefCount(argv[4]); decrRefCount(argv[4]);
} }
/* We need this when we want to propagate creation of consumer that was created
* by XREADGROUP with the NOACK option. In that case, the only way to create
* the consumer at the replica is by using XGROUP CREATECONSUMER (see issue #7140)
*
* XGROUP CREATECONSUMER <key> <groupname> <consumername>
*/
void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername) {
robj *argv[5];
argv[0] = createStringObject("XGROUP",6);
argv[1] = createStringObject("CREATECONSUMER",14);
argv[2] = key;
argv[3] = groupname;
argv[4] = createObject(OBJ_STRING,sdsdup(consumername));
/* We use progagate() because this code path is not always called from
* the command execution context. Moreover this will just alter the
* consumer group state, and we don't need MULTI/EXEC wrapping because
* there is no message state cross-message atomicity required. */
propagate(server.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(argv[0]);
decrRefCount(argv[1]);
decrRefCount(argv[4]);
}
/* Send the stream items in the specified range to the client 'c'. The range /* Send the stream items in the specified range to the client 'c'. The range
* the client will receive is between start and end inclusive, if 'count' is * the client will receive is between start and end inclusive, if 'count' is
* non zero, no more than 'count' elements are sent. * non zero, no more than 'count' elements are sent.
@ -1565,11 +1589,17 @@ void xreadCommand(client *c) {
* of the stream and the data we extracted from it. */ * of the stream and the data we extracted from it. */
if (c->resp == 2) addReplyArrayLen(c,2); if (c->resp == 2) addReplyArrayLen(c,2);
addReplyBulk(c,c->argv[streams_arg+i]); addReplyBulk(c,c->argv[streams_arg+i]);
int created = 0;
streamConsumer *consumer = NULL; streamConsumer *consumer = NULL;
if (groups) consumer = streamLookupConsumer(groups[i], if (groups) consumer = streamLookupConsumer(groups[i],
consumername->ptr, consumername->ptr,
SLC_NONE); SLC_NONE,
&created);
streamPropInfo spi = {c->argv[i+streams_arg],groupname}; streamPropInfo spi = {c->argv[i+streams_arg],groupname};
if (created && noack)
streamPropagateConsumerCreation(c,spi.keyname,
spi.groupname,
consumer->name);
int flags = 0; int flags = 0;
if (noack) flags |= STREAM_RWR_NOACK; if (noack) flags |= STREAM_RWR_NOACK;
if (serve_history) flags |= STREAM_RWR_HISTORY; if (serve_history) flags |= STREAM_RWR_HISTORY;
@ -1701,10 +1731,10 @@ streamCG *streamLookupCG(stream *s, sds groupname) {
} }
/* Lookup the consumer with the specified name in the group 'cg': if the /* Lookup the consumer with the specified name in the group 'cg': if the
* consumer does not exist it is automatically created as a side effect * consumer does not exist it is created unless SLC_NOCREAT flag was specified.
* of calling this function, otherwise its last seen time is updated and * Its last seen time is updated unless SLC_NOREFRESH flag was specified. */
* the existing consumer reference returned. */ streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags, int *created) {
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags) { if (created) *created = 0;
int create = !(flags & SLC_NOCREAT); int create = !(flags & SLC_NOCREAT);
int refresh = !(flags & SLC_NOREFRESH); int refresh = !(flags & SLC_NOREFRESH);
streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name, streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name,
@ -1716,8 +1746,10 @@ streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags) {
consumer->pel = raxNew(); consumer->pel = raxNew();
raxInsert(cg->consumers,(unsigned char*)name,sdslen(name), raxInsert(cg->consumers,(unsigned char*)name,sdslen(name),
consumer,NULL); consumer,NULL);
} consumer->seen_time = mstime();
if (refresh) consumer->seen_time = mstime(); if (created) *created = 1;
} else if (refresh)
consumer->seen_time = mstime();
return consumer; return consumer;
} }
@ -1726,7 +1758,7 @@ streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags) {
* of pending messages "lost" is returned. */ * of pending messages "lost" is returned. */
uint64_t streamDelConsumer(streamCG *cg, sds name) { uint64_t streamDelConsumer(streamCG *cg, sds name) {
streamConsumer *consumer = streamConsumer *consumer =
streamLookupConsumer(cg,name,SLC_NOCREAT|SLC_NOREFRESH); streamLookupConsumer(cg,name,SLC_NOCREAT|SLC_NOREFRESH,NULL);
if (consumer == NULL) return 0; if (consumer == NULL) return 0;
uint64_t retval = raxSize(consumer->pel); uint64_t retval = raxSize(consumer->pel);
@ -1756,6 +1788,7 @@ uint64_t streamDelConsumer(streamCG *cg, sds name) {
/* XGROUP CREATE <key> <groupname> <id or $> [MKSTREAM] /* XGROUP CREATE <key> <groupname> <id or $> [MKSTREAM]
* XGROUP SETID <key> <groupname> <id or $> * XGROUP SETID <key> <groupname> <id or $>
* XGROUP DESTROY <key> <groupname> * XGROUP DESTROY <key> <groupname>
* CREATECONSUMER <key> <groupname> <consumer>
* XGROUP DELCONSUMER <key> <groupname> <consumername> */ * XGROUP DELCONSUMER <key> <groupname> <consumername> */
void xgroupCommand(client *c) { void xgroupCommand(client *c) {
const char *help[] = { const char *help[] = {
@ -1763,6 +1796,7 @@ void xgroupCommand(client *c) {
" option MKSTREAM: create the empty stream if it does not exist.", " option MKSTREAM: create the empty stream if it does not exist.",
"SETID <key> <groupname> <id or $> -- Set the current group ID.", "SETID <key> <groupname> <id or $> -- Set the current group ID.",
"DESTROY <key> <groupname> -- Remove the specified group.", "DESTROY <key> <groupname> -- Remove the specified group.",
"CREATECONSUMER <key> <groupname> <consumer> -- Create new consumer in the specified group.",
"DELCONSUMER <key> <groupname> <consumer> -- Remove the specified consumer.", "DELCONSUMER <key> <groupname> <consumer> -- Remove the specified consumer.",
"HELP -- Prints this help.", "HELP -- Prints this help.",
NULL NULL
@ -1809,6 +1843,7 @@ NULL
/* Certain subcommands require the group to exist. */ /* Certain subcommands require the group to exist. */
if ((cg = streamLookupCG(s,grpname)) == NULL && if ((cg = streamLookupCG(s,grpname)) == NULL &&
(!strcasecmp(opt,"SETID") || (!strcasecmp(opt,"SETID") ||
!strcasecmp(opt,"CREATECONSUMER") ||
!strcasecmp(opt,"DELCONSUMER"))) !strcasecmp(opt,"DELCONSUMER")))
{ {
addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' " addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' "
@ -1875,6 +1910,15 @@ NULL
} else { } else {
addReply(c,shared.czero); addReply(c,shared.czero);
} }
} else if (!strcasecmp(opt,"CREATECONSUMER") && c->argc == 5) {
int created = 0;
streamLookupConsumer(cg,c->argv[4]->ptr,SLC_NOREFRESH,&created);
if (created) {
server.dirty++;
notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-createconsumer",
c->argv[2],c->db->id);
}
addReplyLongLong(c,created);
} else if (!strcasecmp(opt,"DELCONSUMER") && c->argc == 5) { } else if (!strcasecmp(opt,"DELCONSUMER") && c->argc == 5) {
/* Delete the consumer and returns the number of pending messages /* Delete the consumer and returns the number of pending messages
* that were yet associated with such a consumer. */ * that were yet associated with such a consumer. */
@ -2077,7 +2121,8 @@ void xpendingCommand(client *c) {
if (consumername) { if (consumername) {
consumer = streamLookupConsumer(group, consumer = streamLookupConsumer(group,
consumername->ptr, consumername->ptr,
SLC_NOCREAT|SLC_NOREFRESH); SLC_NOCREAT|SLC_NOREFRESH,
NULL);
/* If a consumer name was mentioned but it does not exist, we can /* If a consumer name was mentioned but it does not exist, we can
* just return an empty array. */ * just return an empty array. */
@ -2348,7 +2393,7 @@ void xclaimCommand(client *c) {
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
/* Update the consumer and idle time. */ /* Update the consumer and idle time. */
if (consumer == NULL) if (consumer == NULL)
consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE); consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE,NULL);
nack->consumer = consumer; nack->consumer = consumer;
nack->delivery_time = deliverytime; nack->delivery_time = deliverytime;
/* Set the delivery attempts counter if given, otherwise /* Set the delivery attempts counter if given, otherwise

View File

@ -328,6 +328,102 @@ start_server {
assert_equal [lindex $reply 9] "{100-0 {a 1}}" assert_equal [lindex $reply 9] "{100-0 {a 1}}"
} }
test {XGROUP CREATECONSUMER: create consumer if does not exist} {
r del mystream
r XGROUP CREATE mystream mygroup $ MKSTREAM
r XADD mystream * f v
set reply [r xinfo groups mystream]
set group_info [lindex $reply 0]
set n_consumers [lindex $group_info 3]
assert_equal $n_consumers 0 ;# consumers number in cg
# create consumer using XREADGROUP
r XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
set reply [r xinfo groups mystream]
set group_info [lindex $reply 0]
set n_consumers [lindex $group_info 3]
assert_equal $n_consumers 1 ;# consumers number in cg
set reply [r xinfo consumers mystream mygroup]
set consumer_info [lindex $reply 0]
assert_equal [lindex $consumer_info 1] "Alice" ;# consumer name
# create group using XGROUP CREATECONSUMER when Alice already exists
set created [r XGROUP CREATECONSUMER mystream mygroup Alice]
assert_equal $created 0
# create group using XGROUP CREATECONSUMER when Bob does not exist
set created [r XGROUP CREATECONSUMER mystream mygroup Bob]
assert_equal $created 1
set reply [r xinfo groups mystream]
set group_info [lindex $reply 0]
set n_consumers [lindex $group_info 3]
assert_equal $n_consumers 2 ;# consumers number in cg
set reply [r xinfo consumers mystream mygroup]
set consumer_info [lindex $reply 0]
assert_equal [lindex $consumer_info 1] "Alice" ;# consumer name
set consumer_info [lindex $reply 1]
assert_equal [lindex $consumer_info 1] "Bob" ;# consumer name
}
test {XGROUP CREATECONSUMER: group must exist} {
r del mystream
r XADD mystream * f v
assert_error "*NOGROUP*" {r XGROUP CREATECONSUMER mystream mygroup consumer}
}
start_server {tags {"stream"} overrides {appendonly yes aof-use-rdb-preamble no appendfsync always}} {
test {XREADGROUP with NOACK creates consumer} {
r del mystream
r XGROUP CREATE mystream mygroup $ MKSTREAM
r XADD mystream * f1 v1
r XREADGROUP GROUP mygroup Alice NOACK STREAMS mystream ">"
set rd [redis_deferring_client]
$rd XREADGROUP GROUP mygroup Bob BLOCK 0 NOACK STREAMS mystream ">"
r XADD mystream * f2 v2
set grpinfo [r xinfo groups mystream]
r debug loadaof
assert {[r xinfo groups mystream] == $grpinfo}
set reply [r xinfo consumers mystream mygroup]
set consumer_info [lindex $reply 0]
assert_equal [lindex $consumer_info 1] "Alice" ;# consumer name
set consumer_info [lindex $reply 1]
assert_equal [lindex $consumer_info 1] "Bob" ;# consumer name
}
test {Consumer without PEL is present in AOF after AOFRW} {
r del mystream
r XGROUP CREATE mystream mygroup $ MKSTREAM
r XADD mystream * f v
r XREADGROUP GROUP mygroup Alice NOACK STREAMS mystream ">"
set rd [redis_deferring_client]
$rd XREADGROUP GROUP mygroup Bob BLOCK 0 NOACK STREAMS mystream ">"
r XGROUP CREATECONSUMER mystream mygroup Charlie
set grpinfo [lindex [r xinfo groups mystream] 0]
r bgrewriteaof
waitForBgrewriteaof r
r debug loadaof
set curr_grpinfo [lindex [r xinfo groups mystream] 0]
assert {$curr_grpinfo == $grpinfo}
set n_consumers [lindex $grpinfo 3]
# Bob should be created only when there will be new data for this client
assert_equal $n_consumers 2
set reply [r xinfo consumers mystream mygroup]
set consumer_info [lindex $reply 0]
assert_equal [lindex $consumer_info 1] "Alice"
set consumer_info [lindex $reply 1]
assert_equal [lindex $consumer_info 1] "Charlie"
}
}
start_server {} { start_server {} {
set master [srv -1 client] set master [srv -1 client]
set master_host [srv -1 host] set master_host [srv -1 host]