From 6f926c3e83dd4f0997e5b73bb25b4aa6bb658df2 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 15 Apr 2016 12:46:18 +0200 Subject: [PATCH] ZRANK refactored into proper API. --- src/server.h | 1 + src/t_zset.c | 121 ++++++++++++++++++++++++++++++--------------------- 2 files changed, 72 insertions(+), 50 deletions(-) diff --git a/src/server.h b/src/server.h index 4ac7ce93b..f91f77461 100644 --- a/src/server.h +++ b/src/server.h @@ -1331,6 +1331,7 @@ void zsetConvertToZiplistIfNeeded(robj *zobj, size_t maxelelen); int zsetScore(robj *zobj, sds member, double *score); unsigned long zslGetRank(zskiplist *zsl, double score, sds o); int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore); +long zsetRank(robj *zobj, sds ele, int reverse); /* Core functions */ int freeMemoryIfNeeded(void); diff --git a/src/t_zset.c b/src/t_zset.c index aab586516..93a6df9ee 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -1370,6 +1370,72 @@ int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) { return 0; /* Never reached. */ } +/* Given a sorted set object returns the 0-based rank of the object or + * -1 if the object does not exist. + * + * For rank we mean the position of the element in the sorted collection + * of elements. So the first element has rank 0, the second rank 1, and so + * forth up to length-1 elements. + * + * If 'reverse' is false, the rank is returned considering as first element + * the one with the lowest score. Otherwise if 'reverse' is non-zero + * the rank is computed considering as element with rank 0 the one with + * the highest score. */ +long zsetRank(robj *zobj, sds ele, int reverse) { + unsigned long llen; + unsigned long rank; + + llen = zsetLength(zobj); + + if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { + unsigned char *zl = zobj->ptr; + unsigned char *eptr, *sptr; + + eptr = ziplistIndex(zl,0); + serverAssert(eptr != NULL); + sptr = ziplistNext(zl,eptr); + serverAssert(sptr != NULL); + + rank = 1; + while(eptr != NULL) { + if (ziplistCompare(eptr,(unsigned char*)ele,sdslen(ele))) + break; + rank++; + zzlNext(zl,&eptr,&sptr); + } + + if (eptr != NULL) { + if (reverse) + return llen-rank; + else + return rank-1; + } else { + return -1; + } + } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { + zset *zs = zobj->ptr; + zskiplist *zsl = zs->zsl; + dictEntry *de; + double score; + + de = dictFind(zs->dict,ele); + if (de != NULL) { + score = *(double*)dictGetVal(de); + rank = zslGetRank(zsl,score,ele); + /* Existing elements always have a rank. */ + serverAssert(rank != 0); + if (reverse) + return llen-rank; + else + return rank-1; + } else { + return -1; + } + } else { + serverPanic("Unknown sorted set encoding"); + } +} + /*----------------------------------------------------------------------------- * Sorted set commands *----------------------------------------------------------------------------*/ @@ -2972,62 +3038,17 @@ void zrankGenericCommand(client *c, int reverse) { robj *key = c->argv[1]; robj *ele = c->argv[2]; robj *zobj; - unsigned long llen; - unsigned long rank; + long rank; if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL || checkType(c,zobj,OBJ_ZSET)) return; - llen = zsetLength(zobj); serverAssertWithInfo(c,ele,sdsEncodedObject(ele)); - - if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { - unsigned char *zl = zobj->ptr; - unsigned char *eptr, *sptr; - - eptr = ziplistIndex(zl,0); - serverAssertWithInfo(c,zobj,eptr != NULL); - sptr = ziplistNext(zl,eptr); - serverAssertWithInfo(c,zobj,sptr != NULL); - - rank = 1; - while(eptr != NULL) { - if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr))) - break; - rank++; - zzlNext(zl,&eptr,&sptr); - } - - if (eptr != NULL) { - if (reverse) - addReplyLongLong(c,llen-rank); - else - addReplyLongLong(c,rank-1); - } else { - addReply(c,shared.nullbulk); - } - } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { - zset *zs = zobj->ptr; - zskiplist *zsl = zs->zsl; - dictEntry *de; - double score; - - ele = c->argv[2]; - de = dictFind(zs->dict,ele->ptr); - if (de != NULL) { - score = *(double*)dictGetVal(de); - rank = zslGetRank(zsl,score,ele->ptr); - /* Existing elements always have a rank. */ - serverAssertWithInfo(c,ele,rank); - if (reverse) - addReplyLongLong(c,llen-rank); - else - addReplyLongLong(c,rank-1); - } else { - addReply(c,shared.nullbulk); - } + rank = zsetRank(zobj,ele->ptr,reverse); + if (rank >= 0) { + addReplyLongLong(c,rank); } else { - serverPanic("Unknown sorted set encoding"); + addReply(c,shared.nullbulk); } }