diff --git a/redis-cli.c b/redis-cli.c index 807f676d1..04ff947e3 100644 --- a/redis-cli.c +++ b/redis-cli.c @@ -101,6 +101,8 @@ static struct redisCommand cmdTable[] = { {"zincrby",4,REDIS_CMD_BULK}, {"zrem",3,REDIS_CMD_BULK}, {"zremrangebyscore",4,REDIS_CMD_INLINE}, + {"zmerge",-3,REDIS_CMD_INLINE}, + {"zmergeweighed",-4,REDIS_CMD_INLINE}, {"zrange",-4,REDIS_CMD_INLINE}, {"zrank",3,REDIS_CMD_BULK}, {"zrangebyscore",-4,REDIS_CMD_INLINE}, diff --git a/redis.c b/redis.c index f451f9169..e4db385d6 100644 --- a/redis.c +++ b/redis.c @@ -675,6 +675,8 @@ static void substrCommand(redisClient *c); static void zrankCommand(redisClient *c); static void hsetCommand(redisClient *c); static void hgetCommand(redisClient *c); +static void zmergeCommand(redisClient *c); +static void zmergeweighedCommand(redisClient *c); /*================================= Globals ================================= */ @@ -722,6 +724,8 @@ static struct redisCommand cmdTable[] = { {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1}, {"zrem",zremCommand,3,REDIS_CMD_BULK,1,1,1}, {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE,1,1,1}, + {"zmerge",zmergeCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,2,-1,1}, + {"zmergeweighed",zmergeweighedCommand,-4,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,2,-2,2}, {"zrange",zrangeCommand,-4,REDIS_CMD_INLINE,1,1,1}, {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE,1,1,1}, {"zcount",zcountCommand,4,REDIS_CMD_INLINE,1,1,1}, @@ -5325,6 +5329,105 @@ static void zremrangebyscoreCommand(redisClient *c) { } } +/* This command merges 2 or more zsets to a destination. When an element + * does not exist in a certain set, score 0 is assumed. The score for an + * element across sets is summed. */ +static void zmergeGenericCommand(redisClient *c, int readweights) { + int i, j, zsetnum; + dict **srcdict; + double *weights; + robj *dstkey = c->argv[1], *dstobj; + zset *dst; + dictIterator *di; + dictEntry *de; + + zsetnum = c->argc-2; + if (readweights) { + /* force number of arguments to be even */ + if (zsetnum % 2 > 0) { + addReplySds(c,sdsnew("-ERR wrong number of arguments for ZMERGEWEIGHED\r\n")); + return; + } + zsetnum /= 2; + } + if (!zsetnum) { + addReply(c,shared.syntaxerr); + return; + } + + srcdict = zmalloc(sizeof(dict*) * zsetnum); + weights = zmalloc(sizeof(double) * zsetnum); + for (i = 0; i < zsetnum; i++) { + if (readweights) { + j = 2 + 2*i; + weights[i] = strtod(c->argv[j+1]->ptr, NULL); + } else { + j = 2 + i; + weights[i] = 1.0; + } + + robj *zsetobj = lookupKeyWrite(c->db,c->argv[j]); + if (!zsetobj) { + srcdict[i] = NULL; + } else { + if (zsetobj->type != REDIS_ZSET) { + zfree(srcdict); + zfree(weights); + addReply(c,shared.wrongtypeerr); + return; + } + srcdict[i] = ((zset*)zsetobj->ptr)->dict; + } + } + + dstobj = createZsetObject(); + dst = dstobj->ptr; + for (i = 0; i < zsetnum; i++) { + if (!srcdict[i]) continue; + + di = dictGetIterator(srcdict[i]); + while((de = dictNext(di)) != NULL) { + /* skip key when already processed */ + if (dictFind(dst->dict,dictGetEntryKey(de)) != NULL) continue; + + double *score = zmalloc(sizeof(double)); + *score = 0.0; + for (j = 0; j < zsetnum; j++) { + if (!srcdict[j]) continue; + + dictEntry *other = dictFind(srcdict[j],dictGetEntryKey(de)); + if (other) { + *score = *score + weights[j] * (*(double*)dictGetEntryVal(other)); + } + } + + robj *o = dictGetEntryKey(de); + dictAdd(dst->dict,o,score); + incrRefCount(o); /* added to dictionary */ + zslInsert(dst->zsl,*score,o); + incrRefCount(o); /* added to skiplist */ + } + dictReleaseIterator(di); + } + + deleteKey(c->db,dstkey); + dictAdd(c->db->dict,dstkey,dstobj); + incrRefCount(dstkey); + + addReplyLong(c, dst->zsl->length); + server.dirty++; + zfree(srcdict); + zfree(weights); +} + +static void zmergeCommand(redisClient *c) { + zmergeGenericCommand(c,0); +} + +static void zmergeweighedCommand(redisClient *c) { + zmergeGenericCommand(c,1); +} + static void zrangeGenericCommand(redisClient *c, int reverse) { robj *o; int start = atoi(c->argv[2]->ptr); diff --git a/test-redis.tcl b/test-redis.tcl index a6acea18a..9d458ed7b 100644 --- a/test-redis.tcl +++ b/test-redis.tcl @@ -1462,6 +1462,21 @@ proc main {server port} { list [$r zremrangebyscore zset -inf +inf] [$r zrange zset 0 -1] } {5 {}} + test {ZMERGE basics} { + $r del zseta zsetb zsetc + $r zadd zseta 1 a + $r zadd zseta 2 b + $r zadd zseta 3 c + $r zadd zsetb 1 b + $r zadd zsetb 2 c + $r zadd zsetb 3 d + list [$r zmerge zsetc zseta zsetb] [$r zrange zsetc 0 -1 withscores] + } {4 {a 1 b 3 d 3 c 5}} + + test {ZMERGEWEIGHED basics} { + list [$r zmergeweighed zsetc zseta 2 zsetb 3] [$r zrange zsetc 0 -1 withscores] + } {4 {a 2 b 7 d 9 c 12}} + test {SORT against sorted sets} { $r del zset $r zadd zset 1 a