Add ZREVRANGEBYSCORE and refactor Z*RANGEBYSCORE

This commit is contained in:
Pieter Noordhuis 2010-09-16 14:35:25 +02:00
parent 192fc3376a
commit 25bb8a4452
4 changed files with 219 additions and 142 deletions

View File

@ -120,6 +120,7 @@ struct redisCommand readonlyCommandTable[] = {
{"zinterstore",zinterstoreCommand,-4,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,zunionInterBlockClientOnSwappedKeys,0,0,0}, {"zinterstore",zinterstoreCommand,-4,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,zunionInterBlockClientOnSwappedKeys,0,0,0},
{"zrange",zrangeCommand,-4,REDIS_CMD_INLINE,NULL,1,1,1}, {"zrange",zrangeCommand,-4,REDIS_CMD_INLINE,NULL,1,1,1},
{"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE,NULL,1,1,1}, {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE,NULL,1,1,1},
{"zrevrangebyscore",zrevrangebyscoreCommand,-4,REDIS_CMD_INLINE,NULL,1,1,1},
{"zcount",zcountCommand,4,REDIS_CMD_INLINE,NULL,1,1,1}, {"zcount",zcountCommand,4,REDIS_CMD_INLINE,NULL,1,1,1},
{"zrevrange",zrevrangeCommand,-4,REDIS_CMD_INLINE,NULL,1,1,1}, {"zrevrange",zrevrangeCommand,-4,REDIS_CMD_INLINE,NULL,1,1,1},
{"zcard",zcardCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, {"zcard",zcardCommand,2,REDIS_CMD_INLINE,NULL,1,1,1},

View File

@ -895,6 +895,7 @@ void zaddCommand(redisClient *c);
void zincrbyCommand(redisClient *c); void zincrbyCommand(redisClient *c);
void zrangeCommand(redisClient *c); void zrangeCommand(redisClient *c);
void zrangebyscoreCommand(redisClient *c); void zrangebyscoreCommand(redisClient *c);
void zrevrangebyscoreCommand(redisClient *c);
void zcountCommand(redisClient *c); void zcountCommand(redisClient *c);
void zrevrangeCommand(redisClient *c); void zrevrangeCommand(redisClient *c);
void zcardCommand(redisClient *c); void zcardCommand(redisClient *c);

View File

@ -296,6 +296,44 @@ zskiplistNode* zslistTypeGetElementByRank(zskiplist *zsl, unsigned long rank) {
return NULL; return NULL;
} }
typedef struct {
double min, max;
int minex, maxex; /* are min or max exclusive? */
} zrangespec;
/* Populate the rangespec according to the objects min and max. */
int zslParseRange(robj *min, robj *max, zrangespec *spec) {
spec->minex = spec->maxex = 0;
/* Parse the min-max interval. If one of the values is prefixed
* by the "(" character, it's considered "open". For instance
* ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
* ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */
if (min->encoding == REDIS_ENCODING_INT) {
spec->min = (long)min->ptr;
} else {
if (((char*)min->ptr)[0] == '(') {
spec->min = strtod((char*)min->ptr+1,NULL);
spec->minex = 1;
} else {
spec->min = strtod((char*)min->ptr,NULL);
}
}
if (max->encoding == REDIS_ENCODING_INT) {
spec->max = (long)max->ptr;
} else {
if (((char*)max->ptr)[0] == '(') {
spec->max = strtod((char*)max->ptr+1,NULL);
spec->maxex = 1;
} else {
spec->max = strtod((char*)max->ptr,NULL);
}
}
return REDIS_OK;
}
/*----------------------------------------------------------------------------- /*-----------------------------------------------------------------------------
* Sorted set commands * Sorted set commands
*----------------------------------------------------------------------------*/ *----------------------------------------------------------------------------*/
@ -781,125 +819,153 @@ void zrevrangeCommand(redisClient *c) {
zrangeGenericCommand(c,1); zrangeGenericCommand(c,1);
} }
/* This command implements both ZRANGEBYSCORE and ZCOUNT. /* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE and ZCOUNT.
* If justcount is non-zero, just the count is returned. */ * If "justcount", only the number of elements in the range is returned. */
void genericZrangebyscoreCommand(redisClient *c, int justcount) { void genericZrangebyscoreCommand(redisClient *c, int reverse, int justcount) {
robj *o; zrangespec range;
double min, max; robj *o, *emptyreply;
int minex = 0, maxex = 0; /* are min or max exclusive? */ zset *zsetobj;
zskiplist *zsl;
zskiplistNode *ln;
int offset = 0, limit = -1; int offset = 0, limit = -1;
int withscores = 0; int withscores = 0;
int badsyntax = 0; unsigned long rangelen = 0;
void *replylen = NULL;
/* Parse the min-max interval. If one of the values is prefixed /* Parse the range arguments. */
* by the "(" character, it's considered "open". For instance zslParseRange(c->argv[2],c->argv[3],&range);
* ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
* ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */
if (((char*)c->argv[2]->ptr)[0] == '(') {
min = strtod((char*)c->argv[2]->ptr+1,NULL);
minex = 1;
} else {
min = strtod(c->argv[2]->ptr,NULL);
}
if (((char*)c->argv[3]->ptr)[0] == '(') {
max = strtod((char*)c->argv[3]->ptr+1,NULL);
maxex = 1;
} else {
max = strtod(c->argv[3]->ptr,NULL);
}
/* Parse "WITHSCORES": note that if the command was called with /* Parse optional extra arguments. Note that ZCOUNT will exactly have
* the name ZCOUNT then we are sure that c->argc == 4, so we'll never * 4 arguments, so we'll never enter the following code path. */
* enter the following paths to parse WITHSCORES and LIMIT. */ if (c->argc > 4) {
if (c->argc == 5 || c->argc == 8) { int remaining = c->argc - 4;
if (strcasecmp(c->argv[c->argc-1]->ptr,"withscores") == 0) int pos = 4;
withscores = 1;
else
badsyntax = 1;
}
if (c->argc != (4 + withscores) && c->argc != (7 + withscores))
badsyntax = 1;
if (badsyntax) {
addReplyError(c,"wrong number of arguments for ZRANGEBYSCORE");
return;
}
/* Parse "LIMIT" */ while (remaining) {
if (c->argc == (7 + withscores) && strcasecmp(c->argv[4]->ptr,"limit")) { if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr,"withscores")) {
addReply(c,shared.syntaxerr); pos++; remaining--;
return; withscores = 1;
} else if (c->argc == (7 + withscores)) { } else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
offset = atoi(c->argv[5]->ptr); offset = atoi(c->argv[pos+1]->ptr);
limit = atoi(c->argv[6]->ptr); limit = atoi(c->argv[pos+2]->ptr);
if (offset < 0) offset = 0; pos += 3; remaining -= 3;
} else {
addReply(c,shared.syntaxerr);
return;
}
}
} }
/* Ok, lookup the key and get the range */ /* Ok, lookup the key and get the range */
o = lookupKeyRead(c->db,c->argv[1]); emptyreply = justcount ? shared.czero : shared.emptymultibulk;
if (o == NULL) { if ((o = lookupKeyReadOrReply(c,c->argv[1],emptyreply)) == NULL ||
addReply(c,justcount ? shared.czero : shared.emptymultibulk); checkType(c,o,REDIS_ZSET)) return;
} else { zsetobj = o->ptr;
if (o->type != REDIS_ZSET) { zsl = zsetobj->zsl;
addReply(c,shared.wrongtypeerr);
/* If reversed, assume the elements are sorted from high to low score. */
ln = zslFirstWithScore(zsl,range.min);
if (reverse) {
/* If range.min is out of range, ln will be NULL and we need to use
* the tail of the skiplist as first node of the range. */
if (ln == NULL) ln = zsl->tail;
/* zslFirstWithScore returns the first element with where with
* score >= range.min, so backtrack to make sure the element we use
* here has score <= range.min. */
while (ln && ln->score > range.min) ln = ln->backward;
/* Move to the right element according to the range spec. */
if (range.minex) {
/* Find last element with score < range.min */
while (ln && ln->score == range.min) ln = ln->backward;
} else { } else {
zset *zsetobj = o->ptr; /* Find last element with score <= range.min */
zskiplist *zsl = zsetobj->zsl; while (ln && ln->level[0].forward &&
zskiplistNode *ln; ln->level[0].forward->score == range.min)
robj *ele;
void *replylen = NULL;
unsigned long rangelen = 0;
/* Get the first node with the score >= min, or with
* score > min if 'minex' is true. */
ln = zslFirstWithScore(zsl,min);
while (minex && ln && ln->score == min) ln = ln->level[0].forward;
if (ln == NULL) {
/* No element matching the speciifed interval */
addReply(c,justcount ? shared.czero : shared.emptymultibulk);
return;
}
/* We don't know in advance how many matching elements there
* are in the list, so we push this object that will represent
* the multi-bulk length in the output buffer, and will "fix"
* it later */
if (!justcount)
replylen = addDeferredMultiBulkLength(c);
while(ln && (maxex ? (ln->score < max) : (ln->score <= max))) {
if (offset) {
offset--;
ln = ln->level[0].forward;
continue;
}
if (limit == 0) break;
if (!justcount) {
ele = ln->obj;
addReplyBulk(c,ele);
if (withscores)
addReplyDouble(c,ln->score);
}
ln = ln->level[0].forward; ln = ln->level[0].forward;
rangelen++; }
if (limit > 0) limit--; } else {
} if (range.minex) {
if (justcount) { /* Find first element with score > range.min */
addReplyLongLong(c,(long)rangelen); while (ln && ln->score == range.min) ln = ln->level[0].forward;
}
}
/* No "first" element in the specified interval. */
if (ln == NULL) {
addReply(c,emptyreply);
return;
}
/* We don't know in advance how many matching elements there
* are in the list, so we push this object that will represent
* the multi-bulk length in the output buffer, and will "fix"
* it later */
if (!justcount)
replylen = addDeferredMultiBulkLength(c);
/* If there is an offset, just traverse the number of elements without
* checking the score because that is done in the next loop. */
while(ln && offset--) {
if (reverse)
ln = ln->backward;
else
ln = ln->level[0].forward;
}
while (ln && limit--) {
/* Check if this this element is in range. */
if (reverse) {
if (range.maxex) {
/* Element should have score > range.max */
if (ln->score <= range.max) break;
} else { } else {
setDeferredMultiBulkLength(c,replylen, /* Element should have score >= range.max */
withscores ? (rangelen*2) : rangelen); if (ln->score < range.max) break;
}
} else {
if (range.maxex) {
/* Element should have score < range.max */
if (ln->score >= range.max) break;
} else {
/* Element should have score <= range.max */
if (ln->score > range.max) break;
} }
} }
/* Do our magic */
rangelen++;
if (!justcount) {
addReplyBulk(c,ln->obj);
if (withscores)
addReplyDouble(c,ln->score);
}
if (reverse)
ln = ln->backward;
else
ln = ln->level[0].forward;
}
if (justcount) {
addReplyLongLong(c,(long)rangelen);
} else {
setDeferredMultiBulkLength(c,replylen,
withscores ? (rangelen*2) : rangelen);
} }
} }
void zrangebyscoreCommand(redisClient *c) { void zrangebyscoreCommand(redisClient *c) {
genericZrangebyscoreCommand(c,0); genericZrangebyscoreCommand(c,0,0);
}
void zrevrangebyscoreCommand(redisClient *c) {
genericZrangebyscoreCommand(c,1,0);
} }
void zcountCommand(redisClient *c) { void zcountCommand(redisClient *c) {
genericZrangebyscoreCommand(c,1); genericZrangebyscoreCommand(c,0,1);
} }
void zcardCommand(redisClient *c) { void zcardCommand(redisClient *c) {

View File

@ -199,26 +199,59 @@ start_server {tags {"zset"}} {
list $v1 $v2 [r zscore zset foo] [r zscore zset bar] list $v1 $v2 [r zscore zset foo] [r zscore zset bar]
} {{bar foo} {foo bar} -2 6} } {{bar foo} {foo bar} -2 6}
test {ZRANGEBYSCORE and ZCOUNT basics} { proc create_default_zset {} {
r del zset create_zset zset {-inf a 1 b 2 c 3 d 4 e 5 f +inf g}
r zadd zset 1 a }
r zadd zset 2 b
r zadd zset 3 c
r zadd zset 4 d
r zadd zset 5 e
list [r zrangebyscore zset 2 4] [r zrangebyscore zset (2 (4] \
[r zcount zset 2 4] [r zcount zset (2 (4]
} {{b c d} c 3 1}
test {ZRANGEBYSCORE withscores} { test "ZRANGEBYSCORE/ZREVRANGEBYSCORE/ZCOUNT basics" {
r del zset create_default_zset
r zadd zset 1 a
r zadd zset 2 b # inclusive range
r zadd zset 3 c assert_equal {a b c} [r zrangebyscore zset -inf 2]
r zadd zset 4 d assert_equal {b c d} [r zrangebyscore zset 0 3]
r zadd zset 5 e assert_equal {d e f} [r zrangebyscore zset 3 6]
r zrangebyscore zset 2 4 withscores assert_equal {e f g} [r zrangebyscore zset 4 +inf]
} {b 2 c 3 d 4} assert_equal {c b a} [r zrevrangebyscore zset 2 -inf]
assert_equal {d c b} [r zrevrangebyscore zset 3 0]
assert_equal {f e d} [r zrevrangebyscore zset 6 3]
assert_equal {g f e} [r zrevrangebyscore zset +inf 4]
assert_equal 3 [r zcount zset 0 3]
# exclusive range
assert_equal {b} [r zrangebyscore zset (-inf (2]
assert_equal {b c} [r zrangebyscore zset (0 (3]
assert_equal {e f} [r zrangebyscore zset (3 (6]
assert_equal {f} [r zrangebyscore zset (4 (+inf]
assert_equal {b} [r zrevrangebyscore zset (2 (-inf]
assert_equal {c b} [r zrevrangebyscore zset (3 (0]
assert_equal {f e} [r zrevrangebyscore zset (6 (3]
assert_equal {f} [r zrevrangebyscore zset (+inf (4]
assert_equal 2 [r zcount zset (0 (3]
}
test "ZRANGEBYSCORE with WITHSCORES" {
create_default_zset
assert_equal {b 1 c 2 d 3} [r zrangebyscore zset 0 3 withscores]
assert_equal {d 3 c 2 b 1} [r zrevrangebyscore zset 3 0 withscores]
}
test "ZRANGEBYSCORE with LIMIT" {
create_default_zset
assert_equal {b c} [r zrangebyscore zset 0 10 LIMIT 0 2]
assert_equal {d e f} [r zrangebyscore zset 0 10 LIMIT 2 3]
assert_equal {d e f} [r zrangebyscore zset 0 10 LIMIT 2 10]
assert_equal {} [r zrangebyscore zset 0 10 LIMIT 20 10]
assert_equal {f e} [r zrevrangebyscore zset 10 0 LIMIT 0 2]
assert_equal {d c b} [r zrevrangebyscore zset 10 0 LIMIT 2 3]
assert_equal {d c b} [r zrevrangebyscore zset 10 0 LIMIT 2 10]
assert_equal {} [r zrevrangebyscore zset 10 0 LIMIT 20 10]
}
test "ZRANGEBYSCORE with LIMIT and WITHSCORES" {
create_default_zset
assert_equal {e 4 f 5} [r zrangebyscore zset 2 5 LIMIT 2 3 WITHSCORES]
assert_equal {d 3 c 2} [r zrevrangebyscore zset 5 2 LIMIT 2 3 WITHSCORES]
}
tags {"slow"} { tags {"slow"} {
test {ZRANGEBYSCORE fuzzy test, 100 ranges in 1000 elements sorted set} { test {ZRANGEBYSCORE fuzzy test, 100 ranges in 1000 elements sorted set} {
@ -302,30 +335,6 @@ start_server {tags {"zset"}} {
} {} } {}
} }
test {ZRANGEBYSCORE with LIMIT} {
r del zset
r zadd zset 1 a
r zadd zset 2 b
r zadd zset 3 c
r zadd zset 4 d
r zadd zset 5 e
list \
[r zrangebyscore zset 0 10 LIMIT 0 2] \
[r zrangebyscore zset 0 10 LIMIT 2 3] \
[r zrangebyscore zset 0 10 LIMIT 2 10] \
[r zrangebyscore zset 0 10 LIMIT 20 10]
} {{a b} {c d e} {c d e} {}}
test {ZRANGEBYSCORE with LIMIT and withscores} {
r del zset
r zadd zset 10 a
r zadd zset 20 b
r zadd zset 30 c
r zadd zset 40 d
r zadd zset 50 e
r zrangebyscore zset 20 50 LIMIT 2 3 withscores
} {d 40 e 50}
test {ZREMRANGEBYSCORE basics} { test {ZREMRANGEBYSCORE basics} {
r del zset r del zset
r zadd zset 1 a r zadd zset 1 a