Xread last entry in stream (#7388) (#13117)

Allow using `+` as a special ID for last item in stream on XREAD
command.

This would allow to iterate on a stream with XREAD starting with the
last available message instead of the next one which `$` is used for.
I.e. the caller can use `BLOCK` and `+` on the first call, and change to
`$` on the next call.

Closes #7388

---------

Co-authored-by: Felipe Machado <462154+felipou@users.noreply.github.com>
This commit is contained in:
Ronen Kalish 2024-03-13 08:23:32 +02:00 committed by GitHub
parent 9efc6ad6a6
commit a8e745117f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 138 additions and 0 deletions

View File

@ -2296,6 +2296,28 @@ void xreadCommand(client *c) {
ids[id_idx].seq = 0;
}
continue;
} else if (strcmp(c->argv[i]->ptr,"+") == 0) {
if (xreadgroup) {
addReplyError(c,"The + ID is meaningless in the context of "
"XREADGROUP: you want to read the history of "
"this consumer by specifying a proper ID, or "
"use the > ID to get new messages. The + ID would "
"just return an empty result set.");
goto cleanup;
}
if (o) {
stream *s = o->ptr;
ids[id_idx] = s->last_id;
if (streamDecrID(&ids[id_idx]) != C_OK) {
/* shouldn't happen */
addReplyError(c,"the stream last element ID is 0-0");
goto cleanup;
}
} else {
ids[id_idx].ms = 0;
ids[id_idx].seq = 0;
}
continue;
} else if (strcmp(c->argv[i]->ptr,">") == 0) {
if (!xreadgroup) {
addReplyError(c,"The > ID can be specified only when calling "

View File

@ -394,6 +394,122 @@ start_server {
$rd close
}
test {XREAD last element from non-empty stream} {
# should return last entry
# add 3 entries to a stream
r DEL lestream
r XADD lestream 1-0 k1 v1
r XADD lestream 2-0 k2 v2
r XADD lestream 3-0 k3 v3
# read the last entry
set res [r XREAD STREAMS lestream +]
# verify it's the last entry
assert_equal $res {{lestream {{3-0 {k3 v3}}}}}
# two more entries, with MAX_UINT64 for sequence number for the last one
r XADD lestream 3-18446744073709551614 k4 v4
r XADD lestream 3-18446744073709551615 k5 v5
# read the new last entry
set res [r XREAD STREAMS lestream +]
# verify it's the last entry
assert_equal $res {{lestream {{3-18446744073709551615 {k5 v5}}}}}
}
test {XREAD last element from empty stream} {
# should return nil
# make sure the stream is empty
r DEL lestream
# read last entry and verify nil is received
assert_equal [r XREAD STREAMS lestream +] {}
# add an element to the stream, than delete it
r XADD lestream 1-0 k1 v1
r XDEL lestream 1-0
# verify nil is still received when reading last entry
assert_equal [r XREAD STREAMS lestream +] {}
}
test {XREAD last element blocking from empty stream} {
# should block until a new entry is available
# make sure there is no stream
r DEL lestream
# read last entry from stream, blocking
set rd [redis_deferring_client]
$rd XREAD BLOCK 20000 STREAMS lestream +
wait_for_blocked_client
# add an entry to the stream
r XADD lestream 1-0 k1 v1
# read and verify result
set res [$rd read]
assert_equal $res {{lestream {{1-0 {k1 v1}}}}}
$rd close
}
test {XREAD last element blocking from non-empty stream} {
# should return last element immediately, w/o blocking
# add 3 entries to a stream
r DEL lestream
r XADD lestream 1-0 k1 v1
r XADD lestream 2-0 k2 v2
r XADD lestream 3-0 k3 v3
# read the last entry
set res [r XREAD BLOCK 1000000 STREAMS lestream +]
# verify it's the last entry
assert_equal $res {{lestream {{3-0 {k3 v3}}}}}
}
test {XREAD last element from multiple streams} {
# should return last element only from non-empty streams
# add 3 entries to one stream
r DEL "\{lestream\}1"
r XADD "\{lestream\}1" 1-0 k1 v1
r XADD "\{lestream\}1" 2-0 k2 v2
r XADD "\{lestream\}1" 3-0 k3 v3
# add 3 entries to another stream
r DEL "\{lestream\}2"
r XADD "\{lestream\}2" 1-0 k1 v4
r XADD "\{lestream\}2" 2-0 k2 v5
r XADD "\{lestream\}2" 3-0 k3 v6
# read last element from 3 streams (2 with enetries, 1 non-existent)
# verify the last element from the two existing streams were returned
set res [r XREAD STREAMS "\{lestream\}1" "\{lestream\}2" "\{lestream\}3" + + +]
assert_equal $res {{{{lestream}1} {{3-0 {k3 v3}}}} {{{lestream}2} {{3-0 {k3 v6}}}}}
}
test {XREAD last element with count > 1} {
# Should return only the last element - count has no affect here
# add 3 entries to a stream
r DEL lestream
r XADD lestream 1-0 k1 v1
r XADD lestream 2-0 k2 v2
r XADD lestream 3-0 k3 v3
# read the last entry
set res [r XREAD COUNT 3 STREAMS lestream +]
# verify only last entry was read, even though COUNT > 1
assert_equal $res {{lestream {{3-0 {k3 v3}}}}}
}
test "XREAD: XADD + DEL should not awake client" {
set rd [redis_deferring_client]
r del s1