diff --git a/src/blocked.c b/src/blocked.c index 20c0e760a..06aa5850e 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -388,7 +388,7 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { if (streamCompareID(&s->last_id, gt) > 0) { streamID start = *gt; - start.seq++; /* Can't overflow, it's an uint64_t */ + streamIncrID(&start); /* Lookup the consumer for the group, if any. */ streamConsumer *consumer = NULL; diff --git a/src/stream.h b/src/stream.h index 7de769ba1..b69073994 100644 --- a/src/stream.h +++ b/src/stream.h @@ -111,5 +111,6 @@ streamNACK *streamCreateNACK(streamConsumer *consumer); void streamDecodeID(void *buf, streamID *id); int streamCompareID(streamID *a, streamID *b); void streamFreeNACK(streamNACK *na); +void streamIncrID(streamID *id); #endif diff --git a/src/t_stream.c b/src/t_stream.c index a499f7381..3d46ca0da 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -73,6 +73,21 @@ unsigned long streamLength(const robj *subject) { return s->length; } +/* Set 'id' to be its successor streamID */ +void streamIncrID(streamID *id) { + if (id->seq == UINT64_MAX) { + if (id->ms == UINT64_MAX) { + /* Special case where 'id' is the last possible streamID... */ + id->ms = id->seq = 0; + } else { + id->ms++; + id->seq = 0; + } + } else { + id->seq++; + } +} + /* Generate the next stream item ID given the previous one. If the current * milliseconds Unix time is greater than the previous one, just use this * as time part and start with sequence part of zero. Otherwise we use the @@ -83,8 +98,8 @@ void streamNextID(streamID *last_id, streamID *new_id) { new_id->ms = ms; new_id->seq = 0; } else { - new_id->ms = last_id->ms; - new_id->seq = last_id->seq+1; + *new_id = *last_id; + streamIncrID(new_id); } } @@ -1220,6 +1235,13 @@ void xaddCommand(client *c) { if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; s = o->ptr; + /* Return ASAP if the stream has reached the last possible ID */ + if (s->last_id.ms == UINT64_MAX && s->last_id.seq == UINT64_MAX) { + addReplyError(c,"The stream has exhausted the last possible ID, " + "unable to add more items"); + return; + } + /* Append using the low level function and return the ID. */ if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2, &id, id_given ? &id : NULL) @@ -1509,7 +1531,7 @@ void xreadCommand(client *c) { * so start from the next ID, since we want only messages with * IDs greater than start. */ streamID start = *gt; - start.seq++; /* uint64_t can't overflow in this context. */ + streamIncrID(&start); /* Emit the two elements sub-array consisting of the name * of the stream and the data we extracted from it. */ diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl index 656bac5de..9840e3b74 100644 --- a/tests/unit/type/stream.tcl +++ b/tests/unit/type/stream.tcl @@ -328,6 +328,33 @@ start_server { assert_equal [r xrevrange teststream2 1234567891245 -] {{1234567891240-0 {key1 value2}} {1234567891230-0 {key1 value1}}} } + + test {XREAD streamID edge (no-blocking)} { + r del x + r XADD x 1-1 f v + r XADD x 1-18446744073709551615 f v + r XADD x 2-1 f v + set res [r XREAD BLOCK 0 STREAMS x 1-18446744073709551615] + assert {[lindex $res 0 1 0] == {2-1 {f v}}} + } + + test {XREAD streamID edge (blocking)} { + r del x + set rd [redis_deferring_client] + $rd XREAD BLOCK 0 STREAMS x 1-18446744073709551615 + r XADD x 1-1 f v + r XADD x 1-18446744073709551615 f v + r XADD x 2-1 f v + set res [$rd read] + assert {[lindex $res 0 1 0] == {2-1 {f v}}} + } + + test {XADD streamID edge} { + r del x + r XADD x 2577343934890-18446744073709551615 f v ;# we need the timestamp to be in the future + r XADD x * f2 v2 + assert_equal [r XRANGE x - +] {{2577343934890-18446744073709551615 {f v}} {2577343934891-0 {f2 v2}}} + } } start_server {tags {"stream"} overrides {appendonly yes}} {