mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 08:08:53 -05:00
Fix propagation of entries_read by calling streamPropagateGroupID unconditionally (#12898)
In XREADGROUP ACK, because streamPropagateXCLAIM does not propagate entries-read, entries-read will be inconsistent between master and replicas. I.e. if no entries were claimed, it would have propagated correctly, but if some were claimed, then the entries-read field would be inconsistent on the replica. The fix was suggested by guybe7, call streamPropagateGroupID unconditionally, so that we will normalize entries_read on the replicas. In the past, we would only set propagate_last_id when NOACK was specified. And in #9127, XCLAIM did not propagate entries_read in ACK, which would cause entries_read to be inconsistent between master and replicas. Another approach is add another arg to XCLAIM and let it propagate entries_read, but we decided not to use it. Because we want minimal damage in case there's an old target and new source (in the worst case scenario, the new source doesn't recognize XGROUP SETID ... ENTRIES READ and the lag is lost. If we change XCLAIM, the damage is much more severe). In this patch, now if the user uses XREADGROUP .. COUNT 1 there will be an additional overhead of MULTI, EXEC and XGROUPSETID. We assume the extra command in case of COUNT 1 (4x factor, changing from one XCLAIM to MULTI+XCLAIM+XSETID+EXEC), is probably ok since reading just one entry is in any case very inefficient (a client round trip per record), so we're hoping it's not a common case. Issue was introduced in #9127.
This commit is contained in:
parent
cc9fbd270e
commit
f17381a38d
@ -9386,7 +9386,7 @@ struct COMMAND_ARG XGROUP_CREATE_Args[] = {
|
||||
{MAKE_ARG("group",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
|
||||
{MAKE_ARG("id-selector",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,2,NULL),.subargs=XGROUP_CREATE_id_selector_Subargs},
|
||||
{MAKE_ARG("mkstream",ARG_TYPE_PURE_TOKEN,-1,"MKSTREAM",NULL,NULL,CMD_ARG_OPTIONAL,0,NULL)},
|
||||
{MAKE_ARG("entries-read",ARG_TYPE_INTEGER,-1,"ENTRIESREAD",NULL,NULL,CMD_ARG_OPTIONAL,0,NULL)},
|
||||
{MAKE_ARG("entriesread",ARG_TYPE_INTEGER,-1,"ENTRIESREAD",NULL,NULL,CMD_ARG_OPTIONAL,0,NULL),.display_text="entries-read"},
|
||||
};
|
||||
|
||||
/********** XGROUP CREATECONSUMER ********************/
|
||||
|
@ -72,8 +72,9 @@
|
||||
"optional": true
|
||||
},
|
||||
{
|
||||
"name": "entriesread",
|
||||
"display": "entries-read",
|
||||
"token": "ENTRIESREAD",
|
||||
"name": "entries-read",
|
||||
"type": "integer",
|
||||
"optional": true
|
||||
}
|
||||
|
@ -1713,10 +1713,11 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
|
||||
group->entries_read = streamEstimateDistanceFromFirstEverEntry(s,&id);
|
||||
}
|
||||
group->last_id = id;
|
||||
/* Group last ID should be propagated only if NOACK was
|
||||
* specified, otherwise the last id will be included
|
||||
* in the propagation of XCLAIM itself. */
|
||||
if (noack) propagate_last_id = 1;
|
||||
/* In the past, we would only set it when NOACK was specified. And in
|
||||
* #9127, XCLAIM did not propagate entries_read in ACK, which would
|
||||
* cause entries_read to be inconsistent between master and replicas,
|
||||
* so here we call streamPropagateGroupID unconditionally. */
|
||||
propagate_last_id = 1;
|
||||
}
|
||||
|
||||
/* Emit a two elements array for each item. The first is
|
||||
|
@ -825,7 +825,9 @@ start_server {tags {"multi"}} {
|
||||
{multi}
|
||||
{xclaim *}
|
||||
{xclaim *}
|
||||
{xgroup SETID * ENTRIESREAD *}
|
||||
{xclaim *}
|
||||
{xgroup SETID * ENTRIESREAD *}
|
||||
{exec}
|
||||
}
|
||||
close_replication_stream $repl
|
||||
|
@ -1277,7 +1277,7 @@ start_server {
|
||||
set replica [srv 0 client]
|
||||
|
||||
foreach autoclaim {0 1} {
|
||||
test "Replication tests of XCLAIM with deleted entries (autclaim=$autoclaim)" {
|
||||
test "Replication tests of XCLAIM with deleted entries (autoclaim=$autoclaim)" {
|
||||
$replica replicaof $master_host $master_port
|
||||
wait_for_condition 50 100 {
|
||||
[s 0 master_link_status] eq {up}
|
||||
@ -1308,6 +1308,27 @@ start_server {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test {XREADGROUP ACK would propagate entries-read} {
|
||||
$master del mystream
|
||||
$master xadd mystream * a b c d e f
|
||||
$master xgroup create mystream mygroup $
|
||||
$master xreadgroup group mygroup ryan count 1 streams mystream >
|
||||
$master xadd mystream * a1 b1 a1 b2
|
||||
$master xadd mystream * name v1 name v1
|
||||
$master xreadgroup group mygroup ryan count 1 streams mystream >
|
||||
$master xreadgroup group mygroup ryan count 1 streams mystream >
|
||||
|
||||
set reply [$master XINFO STREAM mystream FULL]
|
||||
set group [lindex [dict get $reply groups] 0]
|
||||
assert_equal [dict get $group entries-read] 3
|
||||
assert_equal [dict get $group lag] 0
|
||||
|
||||
set reply [$replica XINFO STREAM mystream FULL]
|
||||
set group [lindex [dict get $reply groups] 0]
|
||||
assert_equal [dict get $group entries-read] 3
|
||||
assert_equal [dict get $group lag] 0
|
||||
}
|
||||
}
|
||||
|
||||
start_server {tags {"stream needs:debug"} overrides {appendonly yes aof-use-rdb-preamble no}} {
|
||||
|
Loading…
Reference in New Issue
Block a user