diff --git a/src/t_stream.c b/src/t_stream.c index 0ca1cea03..c659a7017 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1447,7 +1447,7 @@ void xackCommand(client *c) { addReplyLongLong(c,acknowledged); } -/* XPENDING [ ] [] +/* XPENDING [ ] [] * * If start and stop are omitted, the command just outputs information about * the amount of pending messages for the key/group pair, together with @@ -1462,6 +1462,8 @@ void xpendingCommand(client *c) { robj *key = c->argv[1]; robj *groupname = c->argv[2]; robj *consumername = (c->argc == 7) ? c->argv[6] : NULL; + streamID startid, endid; + long long count; /* Start and stop, and the consumer, can be omitted. */ if (c->argc != 3 && c->argc != 6 && c->argc != 7) { @@ -1469,6 +1471,17 @@ void xpendingCommand(client *c) { return; } + /* Parse start/end/count arguments ASAP if needed, in order to report + * syntax errors before any other error. */ + if (c->argc >= 6) { + if (getLongLongFromObjectOrReply(c,c->argv[5],&count,NULL) == C_ERR) + return; + if (streamParseIDOrReply(c,c->argv[3],&startid,0) == C_ERR) + return; + if (streamParseIDOrReply(c,c->argv[4],&endid,UINT64_MAX) == C_ERR) + return; + } + /* Lookup the key and the group inside the stream. */ robj *o = lookupKeyRead(c->db,c->argv[1]); streamCG *group; @@ -1494,8 +1507,6 @@ void xpendingCommand(client *c) { addReply(c,shared.nullbulk); /* End. */ addReply(c,shared.nullmultibulk); /* Clients. */ } else { - streamID startid,endid; - /* Start. */ raxIterator ri; raxStart(&ri,group->pel); @@ -1530,6 +1541,47 @@ void xpendingCommand(client *c) { } /* XPENDING [] variant. */ else { + streamConsumer *consumer = consumername ? + streamLookupConsumer(group,consumername->ptr): + NULL; + rax *pel = consumer ? consumer->pel : group->pel; + unsigned char startkey[sizeof(streamID)]; + unsigned char endkey[sizeof(streamID)]; + raxIterator ri; + mstime_t now = mstime(); + + streamEncodeID(startkey,&startid); + streamEncodeID(endkey,&endid); + raxStart(&ri,pel); + raxSeek(&ri,">=",startkey,sizeof(startkey)); + void *arraylen_ptr = addDeferredMultiBulkLength(c); + size_t arraylen = 0; + + while(raxNext(&ri) && memcmp(ri.key,endkey,ri.key_len) <= 0) { + streamNACK *nack = ri.data; + + arraylen++; + addReplyMultiBulkLen(c,4); + + /* Entry ID. */ + streamID id; + streamDecodeID(ri.key,&id); + addReplyStreamID(c,&id); + + /* Consumer name. */ + addReplyBulkCBuffer(c,nack->consumer->name, + sdslen(nack->consumer->name)); + + /* Milliseconds elapsed since last delivery. */ + mstime_t elapsed = now - nack->delivery_time; + if (elapsed < 0) elapsed = 0; + addReplyLongLong(c,elapsed); + + /* Number of deliveries. */ + addReplyLongLong(c,nack->delivery_count); + } + raxStop(&ri); + setDeferredMultiBulkLength(c,arraylen_ptr,arraylen); } }