Adds exclusive ranges to X[REV]RANGE (#8072)

Adds the ability to use exclusive (open) start and end query intervals in XRANGE and XREVRANGE queries.

Fixes #6562
This commit is contained in:
Itamar Haber 2020-12-03 14:36:48 +02:00 committed by GitHub
parent 4cd1fb1f40
commit 441c490024
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 100 additions and 7 deletions

View File

@ -116,7 +116,8 @@ streamNACK *streamCreateNACK(streamConsumer *consumer);
void streamDecodeID(void *buf, streamID *id);
int streamCompareID(streamID *a, streamID *b);
void streamFreeNACK(streamNACK *na);
void streamIncrID(streamID *id);
int streamIncrID(streamID *id);
int streamDecrID(streamID *id);
void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername);
robj *streamDup(robj *o);

View File

@ -76,12 +76,16 @@ unsigned long streamLength(const robj *subject) {
return s->length;
}
/* Set 'id' to be its successor streamID */
void streamIncrID(streamID *id) {
/* Set 'id' to be its successor stream ID.
* If 'id' is the maximal possible id, it is wrapped around to 0-0 and a
* C_ERR is returned. */
int streamIncrID(streamID *id) {
int ret = C_OK;
if (id->seq == UINT64_MAX) {
if (id->ms == UINT64_MAX) {
/* Special case where 'id' is the last possible streamID... */
id->ms = id->seq = 0;
ret = C_ERR;
} else {
id->ms++;
id->seq = 0;
@ -89,6 +93,27 @@ void streamIncrID(streamID *id) {
} else {
id->seq++;
}
return ret;
}
/* Set 'id' to be its predecessor stream ID.
* If 'id' is the minimal possible id, it remains 0-0 and a C_ERR is
* returned. */
int streamDecrID(streamID *id) {
int ret = C_OK;
if (id->seq == 0) {
if (id->ms == 0) {
/* Special case where 'id' is the first possible streamID... */
id->ms = id->seq = UINT64_MAX;
ret = C_ERR;
} else {
id->ms--;
id->seq = UINT64_MAX;
}
} else {
id->seq--;
}
return ret;
}
/* Generate the next stream item ID given the previous one. If the current
@ -1309,6 +1334,29 @@ int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missin
return streamGenericParseIDOrReply(c,o,id,missing_seq,1);
}
/* Helper for parsing a stream ID that is a range query interval. When the
* exclude argument is NULL, streamParseIDOrReply() is called and the interval
* is treated as close (inclusive). Otherwise, the exclude argument is set if
* the interval is open (the "(" prefix) and streamParseStrictIDOrReply() is
* called in that case.
*/
int streamParseIntervalIDOrReply(client *c, robj *o, streamID *id, int *exclude, uint64_t missing_seq) {
char *p = o->ptr;
size_t len = sdslen(p);
int invalid = 0;
if (exclude != NULL) *exclude = (len > 1 && p[0] == '(');
if (exclude != NULL && *exclude) {
robj *t = createStringObject(p+1,len-1);
invalid = (streamParseStrictIDOrReply(c,t,id,missing_seq) == C_ERR);
decrRefCount(t);
} else
invalid = (streamParseIDOrReply(c,o,id,missing_seq) == C_ERR);
if (invalid)
return C_ERR;
return C_OK;
}
/* We propagate MAXLEN ~ <count> as MAXLEN = <resulting-len-of-stream>
* otherwise trimming is no longer determinsitic on replicas / AOF. */
void streamRewriteApproxMaxlen(client *c, stream *s, int maxlen_arg_idx) {
@ -1433,7 +1481,13 @@ void xaddCommand(client *c) {
signalKeyAsReady(c->db, c->argv[1], OBJ_STREAM);
}
/* XRANGE/XREVRANGE actual implementation. */
/* XRANGE/XREVRANGE actual implementation.
* The 'start' and 'end' IDs are parsed as follows:
* Incomplete 'start' has its sequence set to 0, and 'end' to UINT64_MAX.
* "-" and "+"" mean the minimal and maximal ID values, respectively.
* The "(" prefix means an open (exclusive) range, so XRANGE stream (1-0 (2-0
* will match anything from 1-1 and 1-UINT64_MAX.
*/
void xrangeGenericCommand(client *c, int rev) {
robj *o;
stream *s;
@ -1441,9 +1495,21 @@ void xrangeGenericCommand(client *c, int rev) {
long long count = -1;
robj *startarg = rev ? c->argv[3] : c->argv[2];
robj *endarg = rev ? c->argv[2] : c->argv[3];
int startex = 0, endex = 0;
if (streamParseIDOrReply(c,startarg,&startid,0) == C_ERR) return;
if (streamParseIDOrReply(c,endarg,&endid,UINT64_MAX) == C_ERR) return;
/* Parse start and end IDs. */
if (streamParseIntervalIDOrReply(c,startarg,&startid,&startex,0) != C_OK)
return;
if (startex && streamIncrID(&startid) != C_OK) {
addReplyError(c,"invalid start ID for the interval");
return;
}
if (streamParseIntervalIDOrReply(c,endarg,&endid,&endex,UINT64_MAX) != C_OK)
return;
if (endex && streamDecrID(&endid) != C_OK) {
addReplyError(c,"invalid end ID for the interval");
return;
}
/* Parse the COUNT option if any. */
if (c->argc > 4) {

View File

@ -192,6 +192,32 @@ start_server {
assert {[r xrange mystream - +] == [lreverse [r xrevrange mystream + -]]}
}
test {XRANGE exclusive ranges} {
set ids {0-1 0-18446744073709551615 1-0 42-0 42-42
18446744073709551615-18446744073709551614
18446744073709551615-18446744073709551615}
set total [llength $ids]
r multi
r DEL vipstream
foreach id $ids {
r XADD vipstream $id foo bar
}
r exec
assert {[llength [r xrange vipstream - +]] == $total}
assert {[llength [r xrange vipstream ([lindex $ids 0] +]] == $total-1}
assert {[llength [r xrange vipstream - ([lindex $ids $total-1]]] == $total-1}
assert {[llength [r xrange vipstream (0-1 (1-0]] == 1}
assert {[llength [r xrange vipstream (1-0 (42-42]] == 1}
catch {r xrange vipstream (- +} e
assert_match {ERR*} $e
catch {r xrange vipstream - (+} e
assert_match {ERR*} $e
catch {r xrange vipstream (18446744073709551615-18446744073709551615 +} e
assert_match {ERR*} $e
catch {r xrange vipstream - (0-0} e
assert_match {ERR*} $e
}
test {XREAD with non empty stream} {
set res [r XREAD COUNT 1 STREAMS mystream 0-0]
assert {[lrange [lindex $res 0 1 0 1] 0 1] eq {item 0}}