redict/src/t_zset.c
antirez ebcb6251e6 SCAN code refactored to parse cursor first.
The previous implementation of SCAN parsed the cursor in the generic
function implementing SCAN, SSCAN, HSCAN and ZSCAN.

The actual higher-level command implementation only checked for empty
keys and return ASAP in that case. The result was that inverting the
arguments of, for instance, SSCAN for example and write:

    SSCAN 0 key

Instead of

    SSCAN key 0

Resulted into no error, since 0 is a non-existing key name very likely.
Just the iterator returned no elements at all.

In order to fix this issue the code was refactored to extract the
function to parse the cursor and return the error. Every higher level
command implementation now parses the cursor and later checks if the key
exist or not.
2013-11-05 15:47:50 +01:00

2220 lines
71 KiB
C

/*
* Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
* Copyright (c) 2009-2012, Pieter Noordhuis <pcnoordhuis at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/*-----------------------------------------------------------------------------
* Sorted set API
*----------------------------------------------------------------------------*/
/* ZSETs are ordered sets using two data structures to hold the same elements
* in order to get O(log(N)) INSERT and REMOVE operations into a sorted
* data structure.
*
* The elements are added to an hash table mapping Redis objects to scores.
* At the same time the elements are added to a skip list mapping scores
* to Redis objects (so objects are sorted by scores in this "view"). */
/* This skiplist implementation is almost a C translation of the original
* algorithm described by William Pugh in "Skip Lists: A Probabilistic
* Alternative to Balanced Trees", modified in three ways:
* a) this implementation allows for repeated scores.
* b) the comparison is not just by key (our 'score') but by satellite data.
* c) there is a back pointer, so it's a doubly linked list with the back
* pointers being only at "level 1". This allows to traverse the list
* from tail to head, useful for ZREVRANGE. */
#include "redis.h"
#include <math.h>
zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
zn->score = score;
zn->obj = obj;
return zn;
}
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
zsl = zmalloc(sizeof(*zsl));
zsl->level = 1;
zsl->length = 0;
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;
zsl->tail = NULL;
return zsl;
}
void zslFreeNode(zskiplistNode *node) {
decrRefCount(node->obj);
zfree(node);
}
void zslFree(zskiplist *zsl) {
zskiplistNode *node = zsl->header->level[0].forward, *next;
zfree(zsl->header);
while(node) {
next = node->level[0].forward;
zslFreeNode(node);
node = next;
}
zfree(zsl);
}
/* Returns a random level for the new skiplist node we are going to create.
* The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
* (both inclusive), with a powerlaw-alike distribution where higher
* levels are less likely to be returned. */
int zslRandomLevel(void) {
int level = 1;
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
redisAssert(!isnan(score));
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert position */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
/* we assume the key is not already inside, since we allow duplicated
* scores, and the re-insertion of score and redis object should never
* happen since the caller of zslInsert() should test in the hash table
* if the element is already inside or not. */
level = zslRandomLevel();
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
x = zslCreateNode(level,score,obj);
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
/* update span covered by update[i] as x is inserted here */
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
/* increment span for untouched levels */
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
}
/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
for (i = 0; i < zsl->level; i++) {
if (update[i]->level[i].forward == x) {
update[i]->level[i].span += x->level[i].span - 1;
update[i]->level[i].forward = x->level[i].forward;
} else {
update[i]->level[i].span -= 1;
}
}
if (x->level[0].forward) {
x->level[0].forward->backward = x->backward;
} else {
zsl->tail = x->backward;
}
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
zsl->level--;
zsl->length--;
}
/* Delete an element with matching score/object from the skiplist. */
int zslDelete(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,obj) < 0)))
x = x->level[i].forward;
update[i] = x;
}
/* We may have multiple elements with the same score, what we need
* is to find the element with both the right score and object. */
x = x->level[0].forward;
if (x && score == x->score && equalStringObjects(x->obj,obj)) {
zslDeleteNode(zsl, x, update);
zslFreeNode(x);
return 1;
} else {
return 0; /* not found */
}
return 0; /* not found */
}
static int zslValueGteMin(double value, zrangespec *spec) {
return spec->minex ? (value > spec->min) : (value >= spec->min);
}
static int zslValueLteMax(double value, zrangespec *spec) {
return spec->maxex ? (value < spec->max) : (value <= spec->max);
}
/* Returns if there is a part of the zset is in range. */
int zslIsInRange(zskiplist *zsl, zrangespec *range) {
zskiplistNode *x;
/* Test for ranges that will always be empty. */
if (range->min > range->max ||
(range->min == range->max && (range->minex || range->maxex)))
return 0;
x = zsl->tail;
if (x == NULL || !zslValueGteMin(x->score,range))
return 0;
x = zsl->header->level[0].forward;
if (x == NULL || !zslValueLteMax(x->score,range))
return 0;
return 1;
}
/* Find the first node that is contained in the specified range.
* Returns NULL when no element is contained in the range. */
zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec range) {
zskiplistNode *x;
int i;
/* If everything is out of range, return early. */
if (!zslIsInRange(zsl,&range)) return NULL;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
/* Go forward while *OUT* of range. */
while (x->level[i].forward &&
!zslValueGteMin(x->level[i].forward->score,&range))
x = x->level[i].forward;
}
/* This is an inner range, so the next node cannot be NULL. */
x = x->level[0].forward;
redisAssert(x != NULL);
/* Check if score <= max. */
if (!zslValueLteMax(x->score,&range)) return NULL;
return x;
}
/* Find the last node that is contained in the specified range.
* Returns NULL when no element is contained in the range. */
zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec range) {
zskiplistNode *x;
int i;
/* If everything is out of range, return early. */
if (!zslIsInRange(zsl,&range)) return NULL;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
/* Go forward while *IN* range. */
while (x->level[i].forward &&
zslValueLteMax(x->level[i].forward->score,&range))
x = x->level[i].forward;
}
/* This is an inner range, so this node cannot be NULL. */
redisAssert(x != NULL);
/* Check if score >= min. */
if (!zslValueGteMin(x->score,&range)) return NULL;
return x;
}
/* Delete all the elements with score between min and max from the skiplist.
* Min and max are inclusive, so a score >= min || score <= max is deleted.
* Note that this function takes the reference to the hash table view of the
* sorted set, in order to remove the elements from the hash table too. */
unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec range, dict *dict) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned long removed = 0;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward && (range.minex ?
x->level[i].forward->score <= range.min :
x->level[i].forward->score < range.min))
x = x->level[i].forward;
update[i] = x;
}
/* Current node is the last with score < or <= min. */
x = x->level[0].forward;
/* Delete nodes while in range. */
while (x && (range.maxex ? x->score < range.max : x->score <= range.max)) {
zskiplistNode *next = x->level[0].forward;
zslDeleteNode(zsl,x,update);
dictDelete(dict,x->obj);
zslFreeNode(x);
removed++;
x = next;
}
return removed;
}
/* Delete all the elements with rank between start and end from the skiplist.
* Start and end are inclusive. Note that start and end need to be 1-based */
unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned long traversed = 0, removed = 0;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward && (traversed + x->level[i].span) < start) {
traversed += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
traversed++;
x = x->level[0].forward;
while (x && traversed <= end) {
zskiplistNode *next = x->level[0].forward;
zslDeleteNode(zsl,x,update);
dictDelete(dict,x->obj);
zslFreeNode(x);
removed++;
traversed++;
x = next;
}
return removed;
}
/* Find the rank for an element by both score and key.
* Returns 0 when the element cannot be found, rank otherwise.
* Note that the rank is 1-based due to the span of zsl->header to the
* first element. */
unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
zskiplistNode *x;
unsigned long rank = 0;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
rank += x->level[i].span;
x = x->level[i].forward;
}
/* x might be equal to zsl->header, so test if obj is non-NULL */
if (x->obj && equalStringObjects(x->obj,o)) {
return rank;
}
}
return 0;
}
/* Finds an element by its rank. The rank argument needs to be 1-based. */
zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
zskiplistNode *x;
unsigned long traversed = 0;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
{
traversed += x->level[i].span;
x = x->level[i].forward;
}
if (traversed == rank) {
return x;
}
}
return NULL;
}
/* Populate the rangespec according to the objects min and max. */
static int zslParseRange(robj *min, robj *max, zrangespec *spec) {
char *eptr;
spec->minex = spec->maxex = 0;
/* Parse the min-max interval. If one of the values is prefixed
* by the "(" character, it's considered "open". For instance
* ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
* ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */
if (min->encoding == REDIS_ENCODING_INT) {
spec->min = (long)min->ptr;
} else {
if (((char*)min->ptr)[0] == '(') {
spec->min = strtod((char*)min->ptr+1,&eptr);
if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
spec->minex = 1;
} else {
spec->min = strtod((char*)min->ptr,&eptr);
if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
}
}
if (max->encoding == REDIS_ENCODING_INT) {
spec->max = (long)max->ptr;
} else {
if (((char*)max->ptr)[0] == '(') {
spec->max = strtod((char*)max->ptr+1,&eptr);
if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
spec->maxex = 1;
} else {
spec->max = strtod((char*)max->ptr,&eptr);
if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
}
}
return REDIS_OK;
}
/*-----------------------------------------------------------------------------
* Ziplist-backed sorted set API
*----------------------------------------------------------------------------*/
double zzlGetScore(unsigned char *sptr) {
unsigned char *vstr;
unsigned int vlen;
long long vlong;
char buf[128];
double score;
redisAssert(sptr != NULL);
redisAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));
if (vstr) {
memcpy(buf,vstr,vlen);
buf[vlen] = '\0';
score = strtod(buf,NULL);
} else {
score = vlong;
}
return score;
}
/* Compare element in sorted set with given element. */
int zzlCompareElements(unsigned char *eptr, unsigned char *cstr, unsigned int clen) {
unsigned char *vstr;
unsigned int vlen;
long long vlong;
unsigned char vbuf[32];
int minlen, cmp;
redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
if (vstr == NULL) {
/* Store string representation of long long in buf. */
vlen = ll2string((char*)vbuf,sizeof(vbuf),vlong);
vstr = vbuf;
}
minlen = (vlen < clen) ? vlen : clen;
cmp = memcmp(vstr,cstr,minlen);
if (cmp == 0) return vlen-clen;
return cmp;
}
unsigned int zzlLength(unsigned char *zl) {
return ziplistLen(zl)/2;
}
/* Move to next entry based on the values in eptr and sptr. Both are set to
* NULL when there is no next entry. */
void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
unsigned char *_eptr, *_sptr;
redisAssert(*eptr != NULL && *sptr != NULL);
_eptr = ziplistNext(zl,*sptr);
if (_eptr != NULL) {
_sptr = ziplistNext(zl,_eptr);
redisAssert(_sptr != NULL);
} else {
/* No next entry. */
_sptr = NULL;
}
*eptr = _eptr;
*sptr = _sptr;
}
/* Move to the previous entry based on the values in eptr and sptr. Both are
* set to NULL when there is no next entry. */
void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
unsigned char *_eptr, *_sptr;
redisAssert(*eptr != NULL && *sptr != NULL);
_sptr = ziplistPrev(zl,*eptr);
if (_sptr != NULL) {
_eptr = ziplistPrev(zl,_sptr);
redisAssert(_eptr != NULL);
} else {
/* No previous entry. */
_eptr = NULL;
}
*eptr = _eptr;
*sptr = _sptr;
}
/* Returns if there is a part of the zset is in range. Should only be used
* internally by zzlFirstInRange and zzlLastInRange. */
int zzlIsInRange(unsigned char *zl, zrangespec *range) {
unsigned char *p;
double score;
/* Test for ranges that will always be empty. */
if (range->min > range->max ||
(range->min == range->max && (range->minex || range->maxex)))
return 0;
p = ziplistIndex(zl,-1); /* Last score. */
if (p == NULL) return 0; /* Empty sorted set */
score = zzlGetScore(p);
if (!zslValueGteMin(score,range))
return 0;
p = ziplistIndex(zl,1); /* First score. */
redisAssert(p != NULL);
score = zzlGetScore(p);
if (!zslValueLteMax(score,range))
return 0;
return 1;
}
/* Find pointer to the first element contained in the specified range.
* Returns NULL when no element is contained in the range. */
unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec range) {
unsigned char *eptr = ziplistIndex(zl,0), *sptr;
double score;
/* If everything is out of range, return early. */
if (!zzlIsInRange(zl,&range)) return NULL;
while (eptr != NULL) {
sptr = ziplistNext(zl,eptr);
redisAssert(sptr != NULL);
score = zzlGetScore(sptr);
if (zslValueGteMin(score,&range)) {
/* Check if score <= max. */
if (zslValueLteMax(score,&range))
return eptr;
return NULL;
}
/* Move to next element. */
eptr = ziplistNext(zl,sptr);
}
return NULL;
}
/* Find pointer to the last element contained in the specified range.
* Returns NULL when no element is contained in the range. */
unsigned char *zzlLastInRange(unsigned char *zl, zrangespec range) {
unsigned char *eptr = ziplistIndex(zl,-2), *sptr;
double score;
/* If everything is out of range, return early. */
if (!zzlIsInRange(zl,&range)) return NULL;
while (eptr != NULL) {
sptr = ziplistNext(zl,eptr);
redisAssert(sptr != NULL);
score = zzlGetScore(sptr);
if (zslValueLteMax(score,&range)) {
/* Check if score >= min. */
if (zslValueGteMin(score,&range))
return eptr;
return NULL;
}
/* Move to previous element by moving to the score of previous element.
* When this returns NULL, we know there also is no element. */
sptr = ziplistPrev(zl,eptr);
if (sptr != NULL)
redisAssert((eptr = ziplistPrev(zl,sptr)) != NULL);
else
eptr = NULL;
}
return NULL;
}
unsigned char *zzlFind(unsigned char *zl, robj *ele, double *score) {
unsigned char *eptr = ziplistIndex(zl,0), *sptr;
ele = getDecodedObject(ele);
while (eptr != NULL) {
sptr = ziplistNext(zl,eptr);
redisAssertWithInfo(NULL,ele,sptr != NULL);
if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr))) {
/* Matching element, pull out score. */
if (score != NULL) *score = zzlGetScore(sptr);
decrRefCount(ele);
return eptr;
}
/* Move to next element. */
eptr = ziplistNext(zl,sptr);
}
decrRefCount(ele);
return NULL;
}
/* Delete (element,score) pair from ziplist. Use local copy of eptr because we
* don't want to modify the one given as argument. */
unsigned char *zzlDelete(unsigned char *zl, unsigned char *eptr) {
unsigned char *p = eptr;
/* TODO: add function to ziplist API to delete N elements from offset. */
zl = ziplistDelete(zl,&p);
zl = ziplistDelete(zl,&p);
return zl;
}
unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, robj *ele, double score) {
unsigned char *sptr;
char scorebuf[128];
int scorelen;
size_t offset;
redisAssertWithInfo(NULL,ele,sdsEncodedObject(ele));
scorelen = d2string(scorebuf,sizeof(scorebuf),score);
if (eptr == NULL) {
zl = ziplistPush(zl,ele->ptr,sdslen(ele->ptr),ZIPLIST_TAIL);
zl = ziplistPush(zl,(unsigned char*)scorebuf,scorelen,ZIPLIST_TAIL);
} else {
/* Keep offset relative to zl, as it might be re-allocated. */
offset = eptr-zl;
zl = ziplistInsert(zl,eptr,ele->ptr,sdslen(ele->ptr));
eptr = zl+offset;
/* Insert score after the element. */
redisAssertWithInfo(NULL,ele,(sptr = ziplistNext(zl,eptr)) != NULL);
zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen);
}
return zl;
}
/* Insert (element,score) pair in ziplist. This function assumes the element is
* not yet present in the list. */
unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score) {
unsigned char *eptr = ziplistIndex(zl,0), *sptr;
double s;
ele = getDecodedObject(ele);
while (eptr != NULL) {
sptr = ziplistNext(zl,eptr);
redisAssertWithInfo(NULL,ele,sptr != NULL);
s = zzlGetScore(sptr);
if (s > score) {
/* First element with score larger than score for element to be
* inserted. This means we should take its spot in the list to
* maintain ordering. */
zl = zzlInsertAt(zl,eptr,ele,score);
break;
} else if (s == score) {
/* Ensure lexicographical ordering for elements. */
if (zzlCompareElements(eptr,ele->ptr,sdslen(ele->ptr)) > 0) {
zl = zzlInsertAt(zl,eptr,ele,score);
break;
}
}
/* Move to next element. */
eptr = ziplistNext(zl,sptr);
}
/* Push on tail of list when it was not yet inserted. */
if (eptr == NULL)
zl = zzlInsertAt(zl,NULL,ele,score);
decrRefCount(ele);
return zl;
}
unsigned char *zzlDeleteRangeByScore(unsigned char *zl, zrangespec range, unsigned long *deleted) {
unsigned char *eptr, *sptr;
double score;
unsigned long num = 0;
if (deleted != NULL) *deleted = 0;
eptr = zzlFirstInRange(zl,range);
if (eptr == NULL) return zl;
/* When the tail of the ziplist is deleted, eptr will point to the sentinel
* byte and ziplistNext will return NULL. */
while ((sptr = ziplistNext(zl,eptr)) != NULL) {
score = zzlGetScore(sptr);
if (zslValueLteMax(score,&range)) {
/* Delete both the element and the score. */
zl = ziplistDelete(zl,&eptr);
zl = ziplistDelete(zl,&eptr);
num++;
} else {
/* No longer in range. */
break;
}
}
if (deleted != NULL) *deleted = num;
return zl;
}
/* Delete all the elements with rank between start and end from the skiplist.
* Start and end are inclusive. Note that start and end need to be 1-based */
unsigned char *zzlDeleteRangeByRank(unsigned char *zl, unsigned int start, unsigned int end, unsigned long *deleted) {
unsigned int num = (end-start)+1;
if (deleted) *deleted = num;
zl = ziplistDeleteRange(zl,2*(start-1),2*num);
return zl;
}
/*-----------------------------------------------------------------------------
* Common sorted set API
*----------------------------------------------------------------------------*/
unsigned int zsetLength(robj *zobj) {
int length = -1;
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
length = zzlLength(zobj->ptr);
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
length = ((zset*)zobj->ptr)->zsl->length;
} else {
redisPanic("Unknown sorted set encoding");
}
return length;
}
void zsetConvert(robj *zobj, int encoding) {
zset *zs;
zskiplistNode *node, *next;
robj *ele;
double score;
if (zobj->encoding == encoding) return;
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
unsigned char *zl = zobj->ptr;
unsigned char *eptr, *sptr;
unsigned char *vstr;
unsigned int vlen;
long long vlong;
if (encoding != REDIS_ENCODING_SKIPLIST)
redisPanic("Unknown target encoding");
zs = zmalloc(sizeof(*zs));
zs->dict = dictCreate(&zsetDictType,NULL);
zs->zsl = zslCreate();
eptr = ziplistIndex(zl,0);
redisAssertWithInfo(NULL,zobj,eptr != NULL);
sptr = ziplistNext(zl,eptr);
redisAssertWithInfo(NULL,zobj,sptr != NULL);
while (eptr != NULL) {
score = zzlGetScore(sptr);
redisAssertWithInfo(NULL,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
if (vstr == NULL)
ele = createStringObjectFromLongLong(vlong);
else
ele = createStringObject((char*)vstr,vlen);
/* Has incremented refcount since it was just created. */
node = zslInsert(zs->zsl,score,ele);
redisAssertWithInfo(NULL,zobj,dictAdd(zs->dict,ele,&node->score) == DICT_OK);
incrRefCount(ele); /* Added to dictionary. */
zzlNext(zl,&eptr,&sptr);
}
zfree(zobj->ptr);
zobj->ptr = zs;
zobj->encoding = REDIS_ENCODING_SKIPLIST;
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
unsigned char *zl = ziplistNew();
if (encoding != REDIS_ENCODING_ZIPLIST)
redisPanic("Unknown target encoding");
/* Approach similar to zslFree(), since we want to free the skiplist at
* the same time as creating the ziplist. */
zs = zobj->ptr;
dictRelease(zs->dict);
node = zs->zsl->header->level[0].forward;
zfree(zs->zsl->header);
zfree(zs->zsl);
while (node) {
ele = getDecodedObject(node->obj);
zl = zzlInsertAt(zl,NULL,ele,node->score);
decrRefCount(ele);
next = node->level[0].forward;
zslFreeNode(node);
node = next;
}
zfree(zs);
zobj->ptr = zl;
zobj->encoding = REDIS_ENCODING_ZIPLIST;
} else {
redisPanic("Unknown sorted set encoding");
}
}
/*-----------------------------------------------------------------------------
* Sorted set commands
*----------------------------------------------------------------------------*/
/* This generic command implements both ZADD and ZINCRBY. */
void zaddGenericCommand(redisClient *c, int incr) {
static char *nanerr = "resulting score is not a number (NaN)";
robj *key = c->argv[1];
robj *ele;
robj *zobj;
robj *curobj;
double score = 0, *scores = NULL, curscore = 0.0;
int j, elements = (c->argc-2)/2;
int added = 0, updated = 0;
if (c->argc % 2) {
addReply(c,shared.syntaxerr);
return;
}
/* Start parsing all the scores, we need to emit any syntax error
* before executing additions to the sorted set, as the command should
* either execute fully or nothing at all. */
scores = zmalloc(sizeof(double)*elements);
for (j = 0; j < elements; j++) {
if (getDoubleFromObjectOrReply(c,c->argv[2+j*2],&scores[j],NULL)
!= REDIS_OK) goto cleanup;
}
/* Lookup the key and create the sorted set if does not exist. */
zobj = lookupKeyWrite(c->db,key);
if (zobj == NULL) {
if (server.zset_max_ziplist_entries == 0 ||
server.zset_max_ziplist_value < sdslen(c->argv[3]->ptr))
{
zobj = createZsetObject();
} else {
zobj = createZsetZiplistObject();
}
dbAdd(c->db,key,zobj);
} else {
if (zobj->type != REDIS_ZSET) {
addReply(c,shared.wrongtypeerr);
goto cleanup;
}
}
for (j = 0; j < elements; j++) {
score = scores[j];
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
unsigned char *eptr;
/* Prefer non-encoded element when dealing with ziplists. */
ele = c->argv[3+j*2];
if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
if (incr) {
score += curscore;
if (isnan(score)) {
addReplyError(c,nanerr);
goto cleanup;
}
}
/* Remove and re-insert when score changed. */
if (score != curscore) {
zobj->ptr = zzlDelete(zobj->ptr,eptr);
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
server.dirty++;
updated++;
}
} else {
/* Optimize: check if the element is too large or the list
* becomes too long *before* executing zzlInsert. */
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
server.dirty++;
added++;
}
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplistNode *znode;
dictEntry *de;
ele = c->argv[3+j*2] = tryObjectEncoding(c->argv[3+j*2]);
de = dictFind(zs->dict,ele);
if (de != NULL) {
curobj = dictGetKey(de);
curscore = *(double*)dictGetVal(de);
if (incr) {
score += curscore;
if (isnan(score)) {
addReplyError(c,nanerr);
/* Don't need to check if the sorted set is empty
* because we know it has at least one element. */
goto cleanup;
}
}
/* Remove and re-insert when score changed. We can safely
* delete the key object from the skiplist, since the
* dictionary still has a reference to it. */
if (score != curscore) {
redisAssertWithInfo(c,curobj,zslDelete(zs->zsl,curscore,curobj));
znode = zslInsert(zs->zsl,score,curobj);
incrRefCount(curobj); /* Re-inserted in skiplist. */
dictGetVal(de) = &znode->score; /* Update score ptr. */
server.dirty++;
updated++;
}
} else {
znode = zslInsert(zs->zsl,score,ele);
incrRefCount(ele); /* Inserted in skiplist. */
redisAssertWithInfo(c,NULL,dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
incrRefCount(ele); /* Added to dictionary. */
server.dirty++;
added++;
}
} else {
redisPanic("Unknown sorted set encoding");
}
}
if (incr) /* ZINCRBY */
addReplyDouble(c,score);
else /* ZADD */
addReplyLongLong(c,added);
cleanup:
zfree(scores);
if (added || updated) {
signalModifiedKey(c->db,key);
notifyKeyspaceEvent(REDIS_NOTIFY_ZSET,
incr ? "zincr" : "zadd", key, c->db->id);
}
}
void zaddCommand(redisClient *c) {
zaddGenericCommand(c,0);
}
void zincrbyCommand(redisClient *c) {
zaddGenericCommand(c,1);
}
void zremCommand(redisClient *c) {
robj *key = c->argv[1];
robj *zobj;
int deleted = 0, keyremoved = 0, j;
if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
checkType(c,zobj,REDIS_ZSET)) return;
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
unsigned char *eptr;
for (j = 2; j < c->argc; j++) {
if ((eptr = zzlFind(zobj->ptr,c->argv[j],NULL)) != NULL) {
deleted++;
zobj->ptr = zzlDelete(zobj->ptr,eptr);
if (zzlLength(zobj->ptr) == 0) {
dbDelete(c->db,key);
break;
}
}
}
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
dictEntry *de;
double score;
for (j = 2; j < c->argc; j++) {
de = dictFind(zs->dict,c->argv[j]);
if (de != NULL) {
deleted++;
/* Delete from the skiplist */
score = *(double*)dictGetVal(de);
redisAssertWithInfo(c,c->argv[j],zslDelete(zs->zsl,score,c->argv[j]));
/* Delete from the hash table */
dictDelete(zs->dict,c->argv[j]);
if (htNeedsResize(zs->dict)) dictResize(zs->dict);
if (dictSize(zs->dict) == 0) {
dbDelete(c->db,key);
break;
}
}
}
} else {
redisPanic("Unknown sorted set encoding");
}
if (deleted) {
notifyKeyspaceEvent(REDIS_NOTIFY_ZSET,"zrem",key,c->db->id);
if (keyremoved)
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",key,c->db->id);
signalModifiedKey(c->db,key);
server.dirty += deleted;
}
addReplyLongLong(c,deleted);
}
void zremrangebyscoreCommand(redisClient *c) {
robj *key = c->argv[1];
robj *zobj;
zrangespec range;
int keyremoved = 0;
unsigned long deleted;
/* Parse the range arguments. */
if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) {
addReplyError(c,"min or max is not a float");
return;
}
if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
checkType(c,zobj,REDIS_ZSET)) return;
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
zobj->ptr = zzlDeleteRangeByScore(zobj->ptr,range,&deleted);
if (zzlLength(zobj->ptr) == 0) {
dbDelete(c->db,key);
keyremoved = 1;
}
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
deleted = zslDeleteRangeByScore(zs->zsl,range,zs->dict);
if (htNeedsResize(zs->dict)) dictResize(zs->dict);
if (dictSize(zs->dict) == 0) {
dbDelete(c->db,key);
keyremoved = 1;
}
} else {
redisPanic("Unknown sorted set encoding");
}
if (deleted) {
signalModifiedKey(c->db,key);
notifyKeyspaceEvent(REDIS_NOTIFY_ZSET,"zrembyscore",key,c->db->id);
if (keyremoved)
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",key,c->db->id);
}
server.dirty += deleted;
addReplyLongLong(c,deleted);
}
void zremrangebyrankCommand(redisClient *c) {
robj *key = c->argv[1];
robj *zobj;
long start;
long end;
int llen;
unsigned long deleted;
int keyremoved = 0;
if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
(getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
checkType(c,zobj,REDIS_ZSET)) return;
/* Sanitize indexes. */
llen = zsetLength(zobj);
if (start < 0) start = llen+start;
if (end < 0) end = llen+end;
if (start < 0) start = 0;
/* Invariant: start >= 0, so this test will be true when end < 0.
* The range is empty when start > end or start >= length. */
if (start > end || start >= llen) {
addReply(c,shared.czero);
return;
}
if (end >= llen) end = llen-1;
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
/* Correct for 1-based rank. */
zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted);
if (zzlLength(zobj->ptr) == 0) {
dbDelete(c->db,key);
keyremoved = 1;
}
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
/* Correct for 1-based rank. */
deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
if (htNeedsResize(zs->dict)) dictResize(zs->dict);
if (dictSize(zs->dict) == 0) {
dbDelete(c->db,key);
keyremoved = 1;
}
} else {
redisPanic("Unknown sorted set encoding");
}
if (deleted) {
signalModifiedKey(c->db,key);
notifyKeyspaceEvent(REDIS_NOTIFY_ZSET,"zrembyrank",key,c->db->id);
if (keyremoved)
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",key,c->db->id);
}
server.dirty += deleted;
addReplyLongLong(c,deleted);
}
typedef struct {
robj *subject;
int type; /* Set, sorted set */
int encoding;
double weight;
union {
/* Set iterators. */
union _iterset {
struct {
intset *is;
int ii;
} is;
struct {
dict *dict;
dictIterator *di;
dictEntry *de;
} ht;
} set;
/* Sorted set iterators. */
union _iterzset {
struct {
unsigned char *zl;
unsigned char *eptr, *sptr;
} zl;
struct {
zset *zs;
zskiplistNode *node;
} sl;
} zset;
} iter;
} zsetopsrc;
/* Use dirty flags for pointers that need to be cleaned up in the next
* iteration over the zsetopval. The dirty flag for the long long value is
* special, since long long values don't need cleanup. Instead, it means that
* we already checked that "ell" holds a long long, or tried to convert another
* representation into a long long value. When this was successful,
* OPVAL_VALID_LL is set as well. */
#define OPVAL_DIRTY_ROBJ 1
#define OPVAL_DIRTY_LL 2
#define OPVAL_VALID_LL 4
/* Store value retrieved from the iterator. */
typedef struct {
int flags;
unsigned char _buf[32]; /* Private buffer. */
robj *ele;
unsigned char *estr;
unsigned int elen;
long long ell;
double score;
} zsetopval;
typedef union _iterset iterset;
typedef union _iterzset iterzset;
void zuiInitIterator(zsetopsrc *op) {
if (op->subject == NULL)
return;
if (op->type == REDIS_SET) {
iterset *it = &op->iter.set;
if (op->encoding == REDIS_ENCODING_INTSET) {
it->is.is = op->subject->ptr;
it->is.ii = 0;
} else if (op->encoding == REDIS_ENCODING_HT) {
it->ht.dict = op->subject->ptr;
it->ht.di = dictGetIterator(op->subject->ptr);
it->ht.de = dictNext(it->ht.di);
} else {
redisPanic("Unknown set encoding");
}
} else if (op->type == REDIS_ZSET) {
iterzset *it = &op->iter.zset;
if (op->encoding == REDIS_ENCODING_ZIPLIST) {
it->zl.zl = op->subject->ptr;
it->zl.eptr = ziplistIndex(it->zl.zl,0);
if (it->zl.eptr != NULL) {
it->zl.sptr = ziplistNext(it->zl.zl,it->zl.eptr);
redisAssert(it->zl.sptr != NULL);
}
} else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
it->sl.zs = op->subject->ptr;
it->sl.node = it->sl.zs->zsl->header->level[0].forward;
} else {
redisPanic("Unknown sorted set encoding");
}
} else {
redisPanic("Unsupported type");
}
}
void zuiClearIterator(zsetopsrc *op) {
if (op->subject == NULL)
return;
if (op->type == REDIS_SET) {
iterset *it = &op->iter.set;
if (op->encoding == REDIS_ENCODING_INTSET) {
REDIS_NOTUSED(it); /* skip */
} else if (op->encoding == REDIS_ENCODING_HT) {
dictReleaseIterator(it->ht.di);
} else {
redisPanic("Unknown set encoding");
}
} else if (op->type == REDIS_ZSET) {
iterzset *it = &op->iter.zset;
if (op->encoding == REDIS_ENCODING_ZIPLIST) {
REDIS_NOTUSED(it); /* skip */
} else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
REDIS_NOTUSED(it); /* skip */
} else {
redisPanic("Unknown sorted set encoding");
}
} else {
redisPanic("Unsupported type");
}
}
int zuiLength(zsetopsrc *op) {
if (op->subject == NULL)
return 0;
if (op->type == REDIS_SET) {
if (op->encoding == REDIS_ENCODING_INTSET) {
return intsetLen(op->subject->ptr);
} else if (op->encoding == REDIS_ENCODING_HT) {
dict *ht = op->subject->ptr;
return dictSize(ht);
} else {
redisPanic("Unknown set encoding");
}
} else if (op->type == REDIS_ZSET) {
if (op->encoding == REDIS_ENCODING_ZIPLIST) {
return zzlLength(op->subject->ptr);
} else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
zset *zs = op->subject->ptr;
return zs->zsl->length;
} else {
redisPanic("Unknown sorted set encoding");
}
} else {
redisPanic("Unsupported type");
}
}
/* Check if the current value is valid. If so, store it in the passed structure
* and move to the next element. If not valid, this means we have reached the
* end of the structure and can abort. */
int zuiNext(zsetopsrc *op, zsetopval *val) {
if (op->subject == NULL)
return 0;
if (val->flags & OPVAL_DIRTY_ROBJ)
decrRefCount(val->ele);
memset(val,0,sizeof(zsetopval));
if (op->type == REDIS_SET) {
iterset *it = &op->iter.set;
if (op->encoding == REDIS_ENCODING_INTSET) {
int64_t ell;
if (!intsetGet(it->is.is,it->is.ii,&ell))
return 0;
val->ell = ell;
val->score = 1.0;
/* Move to next element. */
it->is.ii++;
} else if (op->encoding == REDIS_ENCODING_HT) {
if (it->ht.de == NULL)
return 0;
val->ele = dictGetKey(it->ht.de);
val->score = 1.0;
/* Move to next element. */
it->ht.de = dictNext(it->ht.di);
} else {
redisPanic("Unknown set encoding");
}
} else if (op->type == REDIS_ZSET) {
iterzset *it = &op->iter.zset;
if (op->encoding == REDIS_ENCODING_ZIPLIST) {
/* No need to check both, but better be explicit. */
if (it->zl.eptr == NULL || it->zl.sptr == NULL)
return 0;
redisAssert(ziplistGet(it->zl.eptr,&val->estr,&val->elen,&val->ell));
val->score = zzlGetScore(it->zl.sptr);
/* Move to next element. */
zzlNext(it->zl.zl,&it->zl.eptr,&it->zl.sptr);
} else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
if (it->sl.node == NULL)
return 0;
val->ele = it->sl.node->obj;
val->score = it->sl.node->score;
/* Move to next element. */
it->sl.node = it->sl.node->level[0].forward;
} else {
redisPanic("Unknown sorted set encoding");
}
} else {
redisPanic("Unsupported type");
}
return 1;
}
int zuiLongLongFromValue(zsetopval *val) {
if (!(val->flags & OPVAL_DIRTY_LL)) {
val->flags |= OPVAL_DIRTY_LL;
if (val->ele != NULL) {
if (val->ele->encoding == REDIS_ENCODING_INT) {
val->ell = (long)val->ele->ptr;
val->flags |= OPVAL_VALID_LL;
} else if (sdsEncodedObject(val->ele)) {
if (string2ll(val->ele->ptr,sdslen(val->ele->ptr),&val->ell))
val->flags |= OPVAL_VALID_LL;
} else {
redisPanic("Unsupported element encoding");
}
} else if (val->estr != NULL) {
if (string2ll((char*)val->estr,val->elen,&val->ell))
val->flags |= OPVAL_VALID_LL;
} else {
/* The long long was already set, flag as valid. */
val->flags |= OPVAL_VALID_LL;
}
}
return val->flags & OPVAL_VALID_LL;
}
robj *zuiObjectFromValue(zsetopval *val) {
if (val->ele == NULL) {
if (val->estr != NULL) {
val->ele = createStringObject((char*)val->estr,val->elen);
} else {
val->ele = createStringObjectFromLongLong(val->ell);
}
val->flags |= OPVAL_DIRTY_ROBJ;
}
return val->ele;
}
int zuiBufferFromValue(zsetopval *val) {
if (val->estr == NULL) {
if (val->ele != NULL) {
if (val->ele->encoding == REDIS_ENCODING_INT) {
val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),(long)val->ele->ptr);
val->estr = val->_buf;
} else if (sdsEncodedObject(val->ele)) {
val->elen = sdslen(val->ele->ptr);
val->estr = val->ele->ptr;
} else {
redisPanic("Unsupported element encoding");
}
} else {
val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),val->ell);
val->estr = val->_buf;
}
}
return 1;
}
/* Find value pointed to by val in the source pointer to by op. When found,
* return 1 and store its score in target. Return 0 otherwise. */
int zuiFind(zsetopsrc *op, zsetopval *val, double *score) {
if (op->subject == NULL)
return 0;
if (op->type == REDIS_SET) {
if (op->encoding == REDIS_ENCODING_INTSET) {
if (zuiLongLongFromValue(val) &&
intsetFind(op->subject->ptr,val->ell))
{
*score = 1.0;
return 1;
} else {
return 0;
}
} else if (op->encoding == REDIS_ENCODING_HT) {
dict *ht = op->subject->ptr;
zuiObjectFromValue(val);
if (dictFind(ht,val->ele) != NULL) {
*score = 1.0;
return 1;
} else {
return 0;
}
} else {
redisPanic("Unknown set encoding");
}
} else if (op->type == REDIS_ZSET) {
zuiObjectFromValue(val);
if (op->encoding == REDIS_ENCODING_ZIPLIST) {
if (zzlFind(op->subject->ptr,val->ele,score) != NULL) {
/* Score is already set by zzlFind. */
return 1;
} else {
return 0;
}
} else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
zset *zs = op->subject->ptr;
dictEntry *de;
if ((de = dictFind(zs->dict,val->ele)) != NULL) {
*score = *(double*)dictGetVal(de);
return 1;
} else {
return 0;
}
} else {
redisPanic("Unknown sorted set encoding");
}
} else {
redisPanic("Unsupported type");
}
}
int zuiCompareByCardinality(const void *s1, const void *s2) {
return zuiLength((zsetopsrc*)s1) - zuiLength((zsetopsrc*)s2);
}
#define REDIS_AGGR_SUM 1
#define REDIS_AGGR_MIN 2
#define REDIS_AGGR_MAX 3
#define zunionInterDictValue(_e) (dictGetVal(_e) == NULL ? 1.0 : *(double*)dictGetVal(_e))
inline static void zunionInterAggregate(double *target, double val, int aggregate) {
if (aggregate == REDIS_AGGR_SUM) {
*target = *target + val;
/* The result of adding two doubles is NaN when one variable
* is +inf and the other is -inf. When these numbers are added,
* we maintain the convention of the result being 0.0. */
if (isnan(*target)) *target = 0.0;
} else if (aggregate == REDIS_AGGR_MIN) {
*target = val < *target ? val : *target;
} else if (aggregate == REDIS_AGGR_MAX) {
*target = val > *target ? val : *target;
} else {
/* safety net */
redisPanic("Unknown ZUNION/INTER aggregate type");
}
}
void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
int i, j;
long setnum;
int aggregate = REDIS_AGGR_SUM;
zsetopsrc *src;
zsetopval zval;
robj *tmp;
unsigned int maxelelen = 0;
robj *dstobj;
zset *dstzset;
zskiplistNode *znode;
int touched = 0;
/* expect setnum input keys to be given */
if ((getLongFromObjectOrReply(c, c->argv[2], &setnum, NULL) != REDIS_OK))
return;
if (setnum < 1) {
addReplyError(c,
"at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
return;
}
/* test if the expected number of keys would overflow */
if (setnum > c->argc-3) {
addReply(c,shared.syntaxerr);
return;
}
/* read keys to be used for input */
src = zcalloc(sizeof(zsetopsrc) * setnum);
for (i = 0, j = 3; i < setnum; i++, j++) {
robj *obj = lookupKeyWrite(c->db,c->argv[j]);
if (obj != NULL) {
if (obj->type != REDIS_ZSET && obj->type != REDIS_SET) {
zfree(src);
addReply(c,shared.wrongtypeerr);
return;
}
src[i].subject = obj;
src[i].type = obj->type;
src[i].encoding = obj->encoding;
} else {
src[i].subject = NULL;
}
/* Default all weights to 1. */
src[i].weight = 1.0;
}
/* parse optional extra arguments */
if (j < c->argc) {
int remaining = c->argc - j;
while (remaining) {
if (remaining >= (setnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) {
j++; remaining--;
for (i = 0; i < setnum; i++, j++, remaining--) {
if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight,
"weight value is not a float") != REDIS_OK)
{
zfree(src);
return;
}
}
} else if (remaining >= 2 && !strcasecmp(c->argv[j]->ptr,"aggregate")) {
j++; remaining--;
if (!strcasecmp(c->argv[j]->ptr,"sum")) {
aggregate = REDIS_AGGR_SUM;
} else if (!strcasecmp(c->argv[j]->ptr,"min")) {
aggregate = REDIS_AGGR_MIN;
} else if (!strcasecmp(c->argv[j]->ptr,"max")) {
aggregate = REDIS_AGGR_MAX;
} else {
zfree(src);
addReply(c,shared.syntaxerr);
return;
}
j++; remaining--;
} else {
zfree(src);
addReply(c,shared.syntaxerr);
return;
}
}
}
/* sort sets from the smallest to largest, this will improve our
* algorithm's performance */
qsort(src,setnum,sizeof(zsetopsrc),zuiCompareByCardinality);
dstobj = createZsetObject();
dstzset = dstobj->ptr;
memset(&zval, 0, sizeof(zval));
if (op == REDIS_OP_INTER) {
/* Skip everything if the smallest input is empty. */
if (zuiLength(&src[0]) > 0) {
/* Precondition: as src[0] is non-empty and the inputs are ordered
* by size, all src[i > 0] are non-empty too. */
zuiInitIterator(&src[0]);
while (zuiNext(&src[0],&zval)) {
double score, value;
score = src[0].weight * zval.score;
if (isnan(score)) score = 0;
for (j = 1; j < setnum; j++) {
/* It is not safe to access the zset we are
* iterating, so explicitly check for equal object. */
if (src[j].subject == src[0].subject) {
value = zval.score*src[j].weight;
zunionInterAggregate(&score,value,aggregate);
} else if (zuiFind(&src[j],&zval,&value)) {
value *= src[j].weight;
zunionInterAggregate(&score,value,aggregate);
} else {
break;
}
}
/* Only continue when present in every input. */
if (j == setnum) {
tmp = zuiObjectFromValue(&zval);
znode = zslInsert(dstzset->zsl,score,tmp);
incrRefCount(tmp); /* added to skiplist */
dictAdd(dstzset->dict,tmp,&znode->score);
incrRefCount(tmp); /* added to dictionary */
if (sdsEncodedObject(tmp)) {
if (sdslen(tmp->ptr) > maxelelen)
maxelelen = sdslen(tmp->ptr);
}
}
}
zuiClearIterator(&src[0]);
}
} else if (op == REDIS_OP_UNION) {
for (i = 0; i < setnum; i++) {
if (zuiLength(&src[i]) == 0)
continue;
zuiInitIterator(&src[i]);
while (zuiNext(&src[i],&zval)) {
double score, value;
/* Skip an element that when already processed */
if (dictFind(dstzset->dict,zuiObjectFromValue(&zval)) != NULL)
continue;
/* Initialize score */
score = src[i].weight * zval.score;
if (isnan(score)) score = 0;
/* We need to check only next sets to see if this element
* exists, since we process every element just one time so
* it can't exist in a previous set (otherwise it would be
* already processed). */
for (j = (i+1); j < setnum; j++) {
/* It is not safe to access the zset we are
* iterating, so explicitly check for equal object. */
if(src[j].subject == src[i].subject) {
value = zval.score*src[j].weight;
zunionInterAggregate(&score,value,aggregate);
} else if (zuiFind(&src[j],&zval,&value)) {
value *= src[j].weight;
zunionInterAggregate(&score,value,aggregate);
}
}
tmp = zuiObjectFromValue(&zval);
znode = zslInsert(dstzset->zsl,score,tmp);
incrRefCount(zval.ele); /* added to skiplist */
dictAdd(dstzset->dict,tmp,&znode->score);
incrRefCount(zval.ele); /* added to dictionary */
if (sdsEncodedObject(tmp)) {
if (sdslen(tmp->ptr) > maxelelen)
maxelelen = sdslen(tmp->ptr);
}
}
zuiClearIterator(&src[i]);
}
} else {
redisPanic("Unknown operator");
}
if (dbDelete(c->db,dstkey)) {
signalModifiedKey(c->db,dstkey);
touched = 1;
server.dirty++;
}
if (dstzset->zsl->length) {
/* Convert to ziplist when in limits. */
if (dstzset->zsl->length <= server.zset_max_ziplist_entries &&
maxelelen <= server.zset_max_ziplist_value)
zsetConvert(dstobj,REDIS_ENCODING_ZIPLIST);
dbAdd(c->db,dstkey,dstobj);
addReplyLongLong(c,zsetLength(dstobj));
if (!touched) signalModifiedKey(c->db,dstkey);
notifyKeyspaceEvent(REDIS_NOTIFY_ZSET,
(op == REDIS_OP_UNION) ? "zunionstore" : "zinterstore",
dstkey,c->db->id);
server.dirty++;
} else {
decrRefCount(dstobj);
addReply(c,shared.czero);
if (touched)
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",dstkey,c->db->id);
}
zfree(src);
}
void zunionstoreCommand(redisClient *c) {
zunionInterGenericCommand(c,c->argv[1], REDIS_OP_UNION);
}
void zinterstoreCommand(redisClient *c) {
zunionInterGenericCommand(c,c->argv[1], REDIS_OP_INTER);
}
void zrangeGenericCommand(redisClient *c, int reverse) {
robj *key = c->argv[1];
robj *zobj;
int withscores = 0;
long start;
long end;
int llen;
int rangelen;
if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
(getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
withscores = 1;
} else if (c->argc >= 5) {
addReply(c,shared.syntaxerr);
return;
}
if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL
|| checkType(c,zobj,REDIS_ZSET)) return;
/* Sanitize indexes. */
llen = zsetLength(zobj);
if (start < 0) start = llen+start;
if (end < 0) end = llen+end;
if (start < 0) start = 0;
/* Invariant: start >= 0, so this test will be true when end < 0.
* The range is empty when start > end or start >= length. */
if (start > end || start >= llen) {
addReply(c,shared.emptymultibulk);
return;
}
if (end >= llen) end = llen-1;
rangelen = (end-start)+1;
/* Return the result in form of a multi-bulk reply */
addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen);
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
unsigned char *zl = zobj->ptr;
unsigned char *eptr, *sptr;
unsigned char *vstr;
unsigned int vlen;
long long vlong;
if (reverse)
eptr = ziplistIndex(zl,-2-(2*start));
else
eptr = ziplistIndex(zl,2*start);
redisAssertWithInfo(c,zobj,eptr != NULL);
sptr = ziplistNext(zl,eptr);
while (rangelen--) {
redisAssertWithInfo(c,zobj,eptr != NULL && sptr != NULL);
redisAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
if (vstr == NULL)
addReplyBulkLongLong(c,vlong);
else
addReplyBulkCBuffer(c,vstr,vlen);
if (withscores)
addReplyDouble(c,zzlGetScore(sptr));
if (reverse)
zzlPrev(zl,&eptr,&sptr);
else
zzlNext(zl,&eptr,&sptr);
}
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplist *zsl = zs->zsl;
zskiplistNode *ln;
robj *ele;
/* Check if starting point is trivial, before doing log(N) lookup. */
if (reverse) {
ln = zsl->tail;
if (start > 0)
ln = zslGetElementByRank(zsl,llen-start);
} else {
ln = zsl->header->level[0].forward;
if (start > 0)
ln = zslGetElementByRank(zsl,start+1);
}
while(rangelen--) {
redisAssertWithInfo(c,zobj,ln != NULL);
ele = ln->obj;
addReplyBulk(c,ele);
if (withscores)
addReplyDouble(c,ln->score);
ln = reverse ? ln->backward : ln->level[0].forward;
}
} else {
redisPanic("Unknown sorted set encoding");
}
}
void zrangeCommand(redisClient *c) {
zrangeGenericCommand(c,0);
}
void zrevrangeCommand(redisClient *c) {
zrangeGenericCommand(c,1);
}
/* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE. */
void genericZrangebyscoreCommand(redisClient *c, int reverse) {
zrangespec range;
robj *key = c->argv[1];
robj *zobj;
long offset = 0, limit = -1;
int withscores = 0;
unsigned long rangelen = 0;
void *replylen = NULL;
int minidx, maxidx;
/* Parse the range arguments. */
if (reverse) {
/* Range is given as [max,min] */
maxidx = 2; minidx = 3;
} else {
/* Range is given as [min,max] */
minidx = 2; maxidx = 3;
}
if (zslParseRange(c->argv[minidx],c->argv[maxidx],&range) != REDIS_OK) {
addReplyError(c,"min or max is not a float");
return;
}
/* Parse optional extra arguments. Note that ZCOUNT will exactly have
* 4 arguments, so we'll never enter the following code path. */
if (c->argc > 4) {
int remaining = c->argc - 4;
int pos = 4;
while (remaining) {
if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr,"withscores")) {
pos++; remaining--;
withscores = 1;
} else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL) != REDIS_OK) ||
(getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL) != REDIS_OK)) return;
pos += 3; remaining -= 3;
} else {
addReply(c,shared.syntaxerr);
return;
}
}
}
/* Ok, lookup the key and get the range */
if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL ||
checkType(c,zobj,REDIS_ZSET)) return;
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
unsigned char *zl = zobj->ptr;
unsigned char *eptr, *sptr;
unsigned char *vstr;
unsigned int vlen;
long long vlong;
double score;
/* If reversed, get the last node in range as starting point. */
if (reverse) {
eptr = zzlLastInRange(zl,range);
} else {
eptr = zzlFirstInRange(zl,range);
}
/* No "first" element in the specified interval. */
if (eptr == NULL) {
addReply(c, shared.emptymultibulk);
return;
}
/* Get score pointer for the first element. */
redisAssertWithInfo(c,zobj,eptr != NULL);
sptr = ziplistNext(zl,eptr);
/* We don't know in advance how many matching elements there are in the
* list, so we push this object that will represent the multi-bulk
* length in the output buffer, and will "fix" it later */
replylen = addDeferredMultiBulkLength(c);
/* If there is an offset, just traverse the number of elements without
* checking the score because that is done in the next loop. */
while (eptr && offset--) {
if (reverse) {
zzlPrev(zl,&eptr,&sptr);
} else {
zzlNext(zl,&eptr,&sptr);
}
}
while (eptr && limit--) {
score = zzlGetScore(sptr);
/* Abort when the node is no longer in range. */
if (reverse) {
if (!zslValueGteMin(score,&range)) break;
} else {
if (!zslValueLteMax(score,&range)) break;
}
/* We know the element exists, so ziplistGet should always succeed */
redisAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
rangelen++;
if (vstr == NULL) {
addReplyBulkLongLong(c,vlong);
} else {
addReplyBulkCBuffer(c,vstr,vlen);
}
if (withscores) {
addReplyDouble(c,score);
}
/* Move to next node */
if (reverse) {
zzlPrev(zl,&eptr,&sptr);
} else {
zzlNext(zl,&eptr,&sptr);
}
}
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplist *zsl = zs->zsl;
zskiplistNode *ln;
/* If reversed, get the last node in range as starting point. */
if (reverse) {
ln = zslLastInRange(zsl,range);
} else {
ln = zslFirstInRange(zsl,range);
}
/* No "first" element in the specified interval. */
if (ln == NULL) {
addReply(c, shared.emptymultibulk);
return;
}
/* We don't know in advance how many matching elements there are in the
* list, so we push this object that will represent the multi-bulk
* length in the output buffer, and will "fix" it later */
replylen = addDeferredMultiBulkLength(c);
/* If there is an offset, just traverse the number of elements without
* checking the score because that is done in the next loop. */
while (ln && offset--) {
if (reverse) {
ln = ln->backward;
} else {
ln = ln->level[0].forward;
}
}
while (ln && limit--) {
/* Abort when the node is no longer in range. */
if (reverse) {
if (!zslValueGteMin(ln->score,&range)) break;
} else {
if (!zslValueLteMax(ln->score,&range)) break;
}
rangelen++;
addReplyBulk(c,ln->obj);
if (withscores) {
addReplyDouble(c,ln->score);
}
/* Move to next node */
if (reverse) {
ln = ln->backward;
} else {
ln = ln->level[0].forward;
}
}
} else {
redisPanic("Unknown sorted set encoding");
}
if (withscores) {
rangelen *= 2;
}
setDeferredMultiBulkLength(c, replylen, rangelen);
}
void zrangebyscoreCommand(redisClient *c) {
genericZrangebyscoreCommand(c,0);
}
void zrevrangebyscoreCommand(redisClient *c) {
genericZrangebyscoreCommand(c,1);
}
void zcountCommand(redisClient *c) {
robj *key = c->argv[1];
robj *zobj;
zrangespec range;
int count = 0;
/* Parse the range arguments */
if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) {
addReplyError(c,"min or max is not a float");
return;
}
/* Lookup the sorted set */
if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL ||
checkType(c, zobj, REDIS_ZSET)) return;
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
unsigned char *zl = zobj->ptr;
unsigned char *eptr, *sptr;
double score;
/* Use the first element in range as the starting point */
eptr = zzlFirstInRange(zl,range);
/* No "first" element */
if (eptr == NULL) {
addReply(c, shared.czero);
return;
}
/* First element is in range */
sptr = ziplistNext(zl,eptr);
score = zzlGetScore(sptr);
redisAssertWithInfo(c,zobj,zslValueLteMax(score,&range));
/* Iterate over elements in range */
while (eptr) {
score = zzlGetScore(sptr);
/* Abort when the node is no longer in range. */
if (!zslValueLteMax(score,&range)) {
break;
} else {
count++;
zzlNext(zl,&eptr,&sptr);
}
}
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplist *zsl = zs->zsl;
zskiplistNode *zn;
unsigned long rank;
/* Find first element in range */
zn = zslFirstInRange(zsl, range);
/* Use rank of first element, if any, to determine preliminary count */
if (zn != NULL) {
rank = zslGetRank(zsl, zn->score, zn->obj);
count = (zsl->length - (rank - 1));
/* Find last element in range */
zn = zslLastInRange(zsl, range);
/* Use rank of last element, if any, to determine the actual count */
if (zn != NULL) {
rank = zslGetRank(zsl, zn->score, zn->obj);
count -= (zsl->length - rank);
}
}
} else {
redisPanic("Unknown sorted set encoding");
}
addReplyLongLong(c, count);
}
void zcardCommand(redisClient *c) {
robj *key = c->argv[1];
robj *zobj;
if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL ||
checkType(c,zobj,REDIS_ZSET)) return;
addReplyLongLong(c,zsetLength(zobj));
}
void zscoreCommand(redisClient *c) {
robj *key = c->argv[1];
robj *zobj;
double score;
if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
checkType(c,zobj,REDIS_ZSET)) return;
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
if (zzlFind(zobj->ptr,c->argv[2],&score) != NULL)
addReplyDouble(c,score);
else
addReply(c,shared.nullbulk);
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
dictEntry *de;
c->argv[2] = tryObjectEncoding(c->argv[2]);
de = dictFind(zs->dict,c->argv[2]);
if (de != NULL) {
score = *(double*)dictGetVal(de);
addReplyDouble(c,score);
} else {
addReply(c,shared.nullbulk);
}
} else {
redisPanic("Unknown sorted set encoding");
}
}
void zrankGenericCommand(redisClient *c, int reverse) {
robj *key = c->argv[1];
robj *ele = c->argv[2];
robj *zobj;
unsigned long llen;
unsigned long rank;
if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
checkType(c,zobj,REDIS_ZSET)) return;
llen = zsetLength(zobj);
redisAssertWithInfo(c,ele,sdsEncodedObject(ele));
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
unsigned char *zl = zobj->ptr;
unsigned char *eptr, *sptr;
eptr = ziplistIndex(zl,0);
redisAssertWithInfo(c,zobj,eptr != NULL);
sptr = ziplistNext(zl,eptr);
redisAssertWithInfo(c,zobj,sptr != NULL);
rank = 1;
while(eptr != NULL) {
if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr)))
break;
rank++;
zzlNext(zl,&eptr,&sptr);
}
if (eptr != NULL) {
if (reverse)
addReplyLongLong(c,llen-rank);
else
addReplyLongLong(c,rank-1);
} else {
addReply(c,shared.nullbulk);
}
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplist *zsl = zs->zsl;
dictEntry *de;
double score;
ele = c->argv[2] = tryObjectEncoding(c->argv[2]);
de = dictFind(zs->dict,ele);
if (de != NULL) {
score = *(double*)dictGetVal(de);
rank = zslGetRank(zsl,score,ele);
redisAssertWithInfo(c,ele,rank); /* Existing elements always have a rank. */
if (reverse)
addReplyLongLong(c,llen-rank);
else
addReplyLongLong(c,rank-1);
} else {
addReply(c,shared.nullbulk);
}
} else {
redisPanic("Unknown sorted set encoding");
}
}
void zrankCommand(redisClient *c) {
zrankGenericCommand(c, 0);
}
void zrevrankCommand(redisClient *c) {
zrankGenericCommand(c, 1);
}
void zscanCommand(redisClient *c) {
robj *o;
unsigned long cursor;
if (parseScanCursorOrReply(c,c->argv[2],&cursor) == REDIS_ERR) return;
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan)) == NULL ||
checkType(c,o,REDIS_ZSET)) return;
scanGenericCommand(c,o,cursor);
}