mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-23 00:28:26 -05:00
Stream: Handle streamID-related edge cases
This commit solves several edge cases that are related to exhausting the streamID limits: We should correctly calculate the succeeding streamID instead of blindly incrementing 'seq' This affects both XREAD and XADD. Other (unrelated) changes: Reply with a better error message when trying to add an entry to a stream that has exhausted last_id
This commit is contained in:
parent
324e22accf
commit
1f75ce30df
@ -388,7 +388,7 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
|
|||||||
|
|
||||||
if (streamCompareID(&s->last_id, gt) > 0) {
|
if (streamCompareID(&s->last_id, gt) > 0) {
|
||||||
streamID start = *gt;
|
streamID start = *gt;
|
||||||
start.seq++; /* Can't overflow, it's an uint64_t */
|
streamIncrID(&start);
|
||||||
|
|
||||||
/* Lookup the consumer for the group, if any. */
|
/* Lookup the consumer for the group, if any. */
|
||||||
streamConsumer *consumer = NULL;
|
streamConsumer *consumer = NULL;
|
||||||
|
@ -111,5 +111,6 @@ streamNACK *streamCreateNACK(streamConsumer *consumer);
|
|||||||
void streamDecodeID(void *buf, streamID *id);
|
void streamDecodeID(void *buf, streamID *id);
|
||||||
int streamCompareID(streamID *a, streamID *b);
|
int streamCompareID(streamID *a, streamID *b);
|
||||||
void streamFreeNACK(streamNACK *na);
|
void streamFreeNACK(streamNACK *na);
|
||||||
|
void streamIncrID(streamID *id);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
@ -73,6 +73,21 @@ unsigned long streamLength(const robj *subject) {
|
|||||||
return s->length;
|
return s->length;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Set 'id' to be its successor streamID */
|
||||||
|
void streamIncrID(streamID *id) {
|
||||||
|
if (id->seq == UINT64_MAX) {
|
||||||
|
if (id->ms == UINT64_MAX) {
|
||||||
|
/* Special case where 'id' is the last possible streamID... */
|
||||||
|
id->ms = id->seq = 0;
|
||||||
|
} else {
|
||||||
|
id->ms++;
|
||||||
|
id->seq = 0;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
id->seq++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Generate the next stream item ID given the previous one. If the current
|
/* Generate the next stream item ID given the previous one. If the current
|
||||||
* milliseconds Unix time is greater than the previous one, just use this
|
* milliseconds Unix time is greater than the previous one, just use this
|
||||||
* as time part and start with sequence part of zero. Otherwise we use the
|
* as time part and start with sequence part of zero. Otherwise we use the
|
||||||
@ -83,8 +98,8 @@ void streamNextID(streamID *last_id, streamID *new_id) {
|
|||||||
new_id->ms = ms;
|
new_id->ms = ms;
|
||||||
new_id->seq = 0;
|
new_id->seq = 0;
|
||||||
} else {
|
} else {
|
||||||
new_id->ms = last_id->ms;
|
*new_id = *last_id;
|
||||||
new_id->seq = last_id->seq+1;
|
streamIncrID(new_id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1220,6 +1235,13 @@ void xaddCommand(client *c) {
|
|||||||
if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
|
if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
|
||||||
s = o->ptr;
|
s = o->ptr;
|
||||||
|
|
||||||
|
/* Return ASAP if the stream has reached the last possible ID */
|
||||||
|
if (s->last_id.ms == UINT64_MAX && s->last_id.seq == UINT64_MAX) {
|
||||||
|
addReplyError(c,"The stream has exhausted the last possible ID, "
|
||||||
|
"unable to add more items");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
/* Append using the low level function and return the ID. */
|
/* Append using the low level function and return the ID. */
|
||||||
if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2,
|
if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2,
|
||||||
&id, id_given ? &id : NULL)
|
&id, id_given ? &id : NULL)
|
||||||
@ -1509,7 +1531,7 @@ void xreadCommand(client *c) {
|
|||||||
* so start from the next ID, since we want only messages with
|
* so start from the next ID, since we want only messages with
|
||||||
* IDs greater than start. */
|
* IDs greater than start. */
|
||||||
streamID start = *gt;
|
streamID start = *gt;
|
||||||
start.seq++; /* uint64_t can't overflow in this context. */
|
streamIncrID(&start);
|
||||||
|
|
||||||
/* Emit the two elements sub-array consisting of the name
|
/* Emit the two elements sub-array consisting of the name
|
||||||
* of the stream and the data we extracted from it. */
|
* of the stream and the data we extracted from it. */
|
||||||
|
@ -328,6 +328,33 @@ start_server {
|
|||||||
|
|
||||||
assert_equal [r xrevrange teststream2 1234567891245 -] {{1234567891240-0 {key1 value2}} {1234567891230-0 {key1 value1}}}
|
assert_equal [r xrevrange teststream2 1234567891245 -] {{1234567891240-0 {key1 value2}} {1234567891230-0 {key1 value1}}}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test {XREAD streamID edge (no-blocking)} {
|
||||||
|
r del x
|
||||||
|
r XADD x 1-1 f v
|
||||||
|
r XADD x 1-18446744073709551615 f v
|
||||||
|
r XADD x 2-1 f v
|
||||||
|
set res [r XREAD BLOCK 0 STREAMS x 1-18446744073709551615]
|
||||||
|
assert {[lindex $res 0 1 0] == {2-1 {f v}}}
|
||||||
|
}
|
||||||
|
|
||||||
|
test {XREAD streamID edge (blocking)} {
|
||||||
|
r del x
|
||||||
|
set rd [redis_deferring_client]
|
||||||
|
$rd XREAD BLOCK 0 STREAMS x 1-18446744073709551615
|
||||||
|
r XADD x 1-1 f v
|
||||||
|
r XADD x 1-18446744073709551615 f v
|
||||||
|
r XADD x 2-1 f v
|
||||||
|
set res [$rd read]
|
||||||
|
assert {[lindex $res 0 1 0] == {2-1 {f v}}}
|
||||||
|
}
|
||||||
|
|
||||||
|
test {XADD streamID edge} {
|
||||||
|
r del x
|
||||||
|
r XADD x 2577343934890-18446744073709551615 f v ;# we need the timestamp to be in the future
|
||||||
|
r XADD x * f2 v2
|
||||||
|
assert_equal [r XRANGE x - +] {{2577343934890-18446744073709551615 {f v}} {2577343934891-0 {f2 v2}}}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
start_server {tags {"stream"} overrides {appendonly yes}} {
|
start_server {tags {"stream"} overrides {appendonly yes}} {
|
||||||
|
Loading…
Reference in New Issue
Block a user