From bce3d08c66a1bf22ea852295a57b92a8024d4a1b Mon Sep 17 00:00:00 2001 From: Guy Benoish Date: Sun, 3 May 2020 16:49:45 +0300 Subject: [PATCH] XPENDING should not update consumer's seen-time Same goes for XGROUP DELCONSUMER (But in this case, it doesn't have any visible effect) --- src/blocked.c | 7 ++++--- src/rdb.c | 4 ++-- src/stream.h | 7 ++++++- src/t_stream.c | 33 ++++++++++++++++++++------------- 4 files changed, 32 insertions(+), 19 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index 045369e93..92f1cee65 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -371,9 +371,10 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { int noack = 0; if (group) { - consumer = streamLookupConsumer(group, - receiver->bpop.xread_consumer->ptr, - 1); + consumer = + streamLookupConsumer(group, + receiver->bpop.xread_consumer->ptr, + SLC_NONE); noack = receiver->bpop.xread_group_noack; } diff --git a/src/rdb.c b/src/rdb.c index 9f6bf13f1..def8585ac 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1823,8 +1823,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) { decrRefCount(o); return NULL; } - streamConsumer *consumer = streamLookupConsumer(cgroup,cname, - 1); + streamConsumer *consumer = + streamLookupConsumer(cgroup,cname,SLC_NONE); sdsfree(cname); consumer->seen_time = rdbLoadMillisecondTime(rdb,RDB_VERSION); if (rioGetReadError(rdb)) { diff --git a/src/stream.h b/src/stream.h index b69073994..0d3bf63fc 100644 --- a/src/stream.h +++ b/src/stream.h @@ -96,6 +96,11 @@ typedef struct streamPropInfo { /* Prototypes of exported APIs. */ struct client; +/* Flags for streamLookupConsumer */ +#define SLC_NONE 0 +#define SLC_NOCREAT (1<<0) /* Do not create the consumer if it doesn't exist */ +#define SLC_NOREFRESH (1<<1) /* Do not update consumer's seen-time */ + stream *streamNew(void); void freeStream(stream *s); unsigned long streamLength(const robj *subject); @@ -105,7 +110,7 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields); void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen); void streamIteratorStop(streamIterator *si); streamCG *streamLookupCG(stream *s, sds groupname); -streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create); +streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags); streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id); streamNACK *streamCreateNACK(streamConsumer *consumer); void streamDecodeID(void *buf, streamID *id); diff --git a/src/t_stream.c b/src/t_stream.c index 5c1b9a523..676ddd9bb 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1570,7 +1570,8 @@ void xreadCommand(client *c) { addReplyBulk(c,c->argv[streams_arg+i]); streamConsumer *consumer = NULL; if (groups) consumer = streamLookupConsumer(groups[i], - consumername->ptr,1); + consumername->ptr, + SLC_NONE); streamPropInfo spi = {c->argv[i+streams_arg],groupname}; int flags = 0; if (noack) flags |= STREAM_RWR_NOACK; @@ -1706,7 +1707,9 @@ streamCG *streamLookupCG(stream *s, sds groupname) { * consumer does not exist it is automatically created as a side effect * of calling this function, otherwise its last seen time is updated and * the existing consumer reference returned. */ -streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) { +streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags) { + int create = !(flags & SLC_NOCREAT); + int refresh = !(flags & SLC_NOREFRESH); streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name, sdslen(name)); if (consumer == raxNotFound) { @@ -1717,7 +1720,7 @@ streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) { raxInsert(cg->consumers,(unsigned char*)name,sdslen(name), consumer,NULL); } - consumer->seen_time = mstime(); + if (refresh) consumer->seen_time = mstime(); return consumer; } @@ -1725,7 +1728,8 @@ streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) { * may have pending messages: they are removed from the PEL, and the number * of pending messages "lost" is returned. */ uint64_t streamDelConsumer(streamCG *cg, sds name) { - streamConsumer *consumer = streamLookupConsumer(cg,name,0); + streamConsumer *consumer = + streamLookupConsumer(cg,name,SLC_NOCREAT|SLC_NOREFRESH); if (consumer == NULL) return 0; uint64_t retval = raxSize(consumer->pel); @@ -2068,15 +2072,18 @@ void xpendingCommand(client *c) { } /* XPENDING [] variant. */ else { - streamConsumer *consumer = consumername ? - streamLookupConsumer(group,consumername->ptr,0): - NULL; + streamConsumer *consumer = NULL; + if (consumername) { + consumer = streamLookupConsumer(group, + consumername->ptr, + SLC_NOCREAT|SLC_NOREFRESH); - /* If a consumer name was mentioned but it does not exist, we can - * just return an empty array. */ - if (consumername && consumer == NULL) { - addReplyArrayLen(c,0); - return; + /* If a consumer name was mentioned but it does not exist, we can + * just return an empty array. */ + if (consumer == NULL) { + addReplyArrayLen(c,0); + return; + } } rax *pel = consumer ? consumer->pel : group->pel; @@ -2338,7 +2345,7 @@ void xclaimCommand(client *c) { raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); /* Update the consumer and idle time. */ if (consumer == NULL) - consumer = streamLookupConsumer(group,c->argv[3]->ptr,1); + consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE); nack->consumer = consumer; nack->delivery_time = deliverytime; /* Set the delivery attempts counter if given, otherwise