diff --git a/src/t_stream.c b/src/t_stream.c index 66f3295e0..dd83f2b1d 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -894,6 +894,7 @@ void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupna #define STREAM_RWR_NOACK (1<<0) /* Do not create entries in the PEL. */ #define STREAM_RWR_RAWENTRIES (1<<1) /* Do not emit protocol for array boundaries, just the entries. */ +#define STREAM_RWR_HISTORY (1<<2) /* Only serve consumer local PEL. */ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi) { void *arraylen_ptr = NULL; size_t arraylen = 0; @@ -902,15 +903,12 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end streamID id; int propagate_last_id = 0; - /* If a group was passed, we check if the request is about messages - * never delivered so far (normally this happens when ">" ID is passed). - * - * If instead the client is asking for some history, we serve it - * using a different function, so that we return entries *solely* - * from its own PEL. This ensures each consumer will always and only - * see the history of messages delivered to it and not yet confirmed + /* If the client is asking for some history, we serve it using a + * different function, so that we return entries *solely* from its + * own PEL. This ensures each consumer will always and only see + * the history of messages delivered to it and not yet confirmed * as delivered. */ - if (group && streamCompareID(start,&group->last_id) <= 0) { + if (group && (flags & STREAM_RWR_HISTORY)) { return streamReplyWithRangeFromConsumerPEL(c,s,start,end,count, consumer); } @@ -1474,6 +1472,7 @@ void xreadCommand(client *c) { stream *s = o->ptr; streamID *gt = ids+i; /* ID must be greater than this. */ int serve_synchronously = 0; + int serve_history = 0; /* True for XREADGROUP with ID != ">". */ /* Check if there are the conditions to serve the client * synchronously. */ @@ -1485,6 +1484,7 @@ void xreadCommand(client *c) { gt->seq != UINT64_MAX) { serve_synchronously = 1; + serve_history = 1; } else { /* We also want to serve a consumer in a consumer group * synchronously in case the group top item delivered is smaller @@ -1520,9 +1520,12 @@ void xreadCommand(client *c) { if (groups) consumer = streamLookupConsumer(groups[i], consumername->ptr,1); streamPropInfo spi = {c->argv[i+streams_arg],groupname}; + int flags = 0; + if (noack) flags |= STREAM_RWR_NOACK; + if (serve_history) flags |= STREAM_RWR_HISTORY; streamReplyWithRange(c,s,&start,NULL,count,0, groups ? groups[i] : NULL, - consumer, noack, &spi); + consumer, flags, &spi); if (groups) server.dirty++; } }