mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-23 00:28:26 -05:00
Streams: fix XREADGROUP history reading when CG last_id is low.
This fixes the issue reported in #5570. This was fixed the hard way, that is, propagating more information to the lower level API about this being a request to read just the history, so that the code is simpler and less likely to regress.
This commit is contained in:
parent
3830ef2483
commit
29251f58e2
@ -894,6 +894,7 @@ void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupna
|
|||||||
#define STREAM_RWR_NOACK (1<<0) /* Do not create entries in the PEL. */
|
#define STREAM_RWR_NOACK (1<<0) /* Do not create entries in the PEL. */
|
||||||
#define STREAM_RWR_RAWENTRIES (1<<1) /* Do not emit protocol for array
|
#define STREAM_RWR_RAWENTRIES (1<<1) /* Do not emit protocol for array
|
||||||
boundaries, just the entries. */
|
boundaries, just the entries. */
|
||||||
|
#define STREAM_RWR_HISTORY (1<<2) /* Only serve consumer local PEL. */
|
||||||
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi) {
|
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi) {
|
||||||
void *arraylen_ptr = NULL;
|
void *arraylen_ptr = NULL;
|
||||||
size_t arraylen = 0;
|
size_t arraylen = 0;
|
||||||
@ -902,15 +903,12 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
|
|||||||
streamID id;
|
streamID id;
|
||||||
int propagate_last_id = 0;
|
int propagate_last_id = 0;
|
||||||
|
|
||||||
/* If a group was passed, we check if the request is about messages
|
/* If the client is asking for some history, we serve it using a
|
||||||
* never delivered so far (normally this happens when ">" ID is passed).
|
* different function, so that we return entries *solely* from its
|
||||||
*
|
* own PEL. This ensures each consumer will always and only see
|
||||||
* If instead the client is asking for some history, we serve it
|
* the history of messages delivered to it and not yet confirmed
|
||||||
* using a different function, so that we return entries *solely*
|
|
||||||
* from its own PEL. This ensures each consumer will always and only
|
|
||||||
* see the history of messages delivered to it and not yet confirmed
|
|
||||||
* as delivered. */
|
* as delivered. */
|
||||||
if (group && streamCompareID(start,&group->last_id) <= 0) {
|
if (group && (flags & STREAM_RWR_HISTORY)) {
|
||||||
return streamReplyWithRangeFromConsumerPEL(c,s,start,end,count,
|
return streamReplyWithRangeFromConsumerPEL(c,s,start,end,count,
|
||||||
consumer);
|
consumer);
|
||||||
}
|
}
|
||||||
@ -1474,6 +1472,7 @@ void xreadCommand(client *c) {
|
|||||||
stream *s = o->ptr;
|
stream *s = o->ptr;
|
||||||
streamID *gt = ids+i; /* ID must be greater than this. */
|
streamID *gt = ids+i; /* ID must be greater than this. */
|
||||||
int serve_synchronously = 0;
|
int serve_synchronously = 0;
|
||||||
|
int serve_history = 0; /* True for XREADGROUP with ID != ">". */
|
||||||
|
|
||||||
/* Check if there are the conditions to serve the client
|
/* Check if there are the conditions to serve the client
|
||||||
* synchronously. */
|
* synchronously. */
|
||||||
@ -1485,6 +1484,7 @@ void xreadCommand(client *c) {
|
|||||||
gt->seq != UINT64_MAX)
|
gt->seq != UINT64_MAX)
|
||||||
{
|
{
|
||||||
serve_synchronously = 1;
|
serve_synchronously = 1;
|
||||||
|
serve_history = 1;
|
||||||
} else {
|
} else {
|
||||||
/* We also want to serve a consumer in a consumer group
|
/* We also want to serve a consumer in a consumer group
|
||||||
* synchronously in case the group top item delivered is smaller
|
* synchronously in case the group top item delivered is smaller
|
||||||
@ -1520,9 +1520,12 @@ void xreadCommand(client *c) {
|
|||||||
if (groups) consumer = streamLookupConsumer(groups[i],
|
if (groups) consumer = streamLookupConsumer(groups[i],
|
||||||
consumername->ptr,1);
|
consumername->ptr,1);
|
||||||
streamPropInfo spi = {c->argv[i+streams_arg],groupname};
|
streamPropInfo spi = {c->argv[i+streams_arg],groupname};
|
||||||
|
int flags = 0;
|
||||||
|
if (noack) flags |= STREAM_RWR_NOACK;
|
||||||
|
if (serve_history) flags |= STREAM_RWR_HISTORY;
|
||||||
streamReplyWithRange(c,s,&start,NULL,count,0,
|
streamReplyWithRange(c,s,&start,NULL,count,0,
|
||||||
groups ? groups[i] : NULL,
|
groups ? groups[i] : NULL,
|
||||||
consumer, noack, &spi);
|
consumer, flags, &spi);
|
||||||
if (groups) server.dirty++;
|
if (groups) server.dirty++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user