diff --git a/src/blocked.c b/src/blocked.c index 1d59ee16a..61fd9fa87 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -135,7 +135,9 @@ void processUnblockedClients(void) { /* Unblock a client calling the right function depending on the kind * of operation the client is blocking for. */ void unblockClient(client *c) { - if (c->btype == BLOCKED_LIST || c->btype == BLOCKED_STREAM) { + if (c->btype == BLOCKED_LIST || + c->btype == BLOCKED_ZSET || + c->btype == BLOCKED_STREAM) { unblockClientWaitingData(c); } else if (c->btype == BLOCKED_WAIT) { unblockClientWaitingReplicas(c); @@ -162,7 +164,9 @@ void unblockClient(client *c) { * send it a reply of some kind. After this function is called, * unblockClient() will be called with the same client as argument. */ void replyToBlockedClientTimedOut(client *c) { - if (c->btype == BLOCKED_LIST || c->btype == BLOCKED_STREAM) { + if (c->btype == BLOCKED_LIST || + c->btype == BLOCKED_ZSET || + c->btype == BLOCKED_STREAM) { addReply(c,shared.nullmultibulk); } else if (c->btype == BLOCKED_WAIT) { addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset)); @@ -244,7 +248,7 @@ void handleClientsBlockedOnKeys(void) { client *receiver = clientnode->value; if (receiver->btype != BLOCKED_LIST) { - /* Put on the tail, so that at the next call + /* Put at the tail, so that at the next call * we'll not run into it again. */ listDelNode(clients,clientnode); listAddNodeTail(clients,receiver); @@ -289,6 +293,43 @@ void handleClientsBlockedOnKeys(void) { * when an element was pushed on the list. */ } + /* Serve clients blocked on sorted set key. */ + else if (o != NULL && o->type == OBJ_ZSET) { + dictEntry *de; + + /* We serve clients in the same order they blocked for + * this key, from the first blocked to the last. */ + de = dictFind(rl->db->blocking_keys,rl->key); + if (de) { + list *clients = dictGetVal(de); + int numclients = listLength(clients); + + while(numclients--) { + listNode *clientnode = listFirst(clients); + client *receiver = clientnode->value; + + if (receiver->btype != BLOCKED_ZSET) { + /* Put at the tail, so that at the next call + * we'll not run into it again. */ + listDelNode(clients,clientnode); + listAddNodeTail(clients,receiver); + continue; + } + + int reverse = (receiver->lastcmd && + receiver->lastcmd->proc == bzpopCommand) ? + 0 : 1; + unblockClient(receiver); + genericZpopCommand(receiver,&rl->key,1,reverse); + + propagate(reverse ? + server.zrevpopCommand : server.zpopCommand, + receiver->db->id,receiver->argv,receiver->argc, + PROPAGATE_AOF|PROPAGATE_REPL); + } + } + } + /* Serve clients blocked on stream key. */ else if (o != NULL && o->type == OBJ_STREAM) { dictEntry *de = dictFind(rl->db->blocking_keys,rl->key); @@ -371,8 +412,9 @@ void handleClientsBlockedOnKeys(void) { } } -/* This is how the current blocking lists/streams work, we use BLPOP as - * example, but the concept is the same for other list ops and XREAD. +/* This is how the current blocking lists/sorted sets/streams work, we use + * BLPOP as example, but the concept is the same for other list ops, sorted + * sets and XREAD. * - If the user calls BLPOP and the key exists and contains a non empty list * then LPOP is called instead. So BLPOP is semantically the same as LPOP * if blocking is not required. @@ -389,14 +431,14 @@ void handleClientsBlockedOnKeys(void) { * to the number of elements we have in the ready list. */ -/* Set a client in blocking mode for the specified key (list or stream), with - * the specified timeout. The 'type' argument is BLOCKED_LIST or BLOCKED_STREAM - * depending on the kind of operation we are waiting for an empty key in - * order to awake the client. The client is blocked for all the 'numkeys' - * keys as in the 'keys' argument. When we block for stream keys, we also - * provide an array of streamID structures: clients will be unblocked only - * when items with an ID greater or equal to the specified one is appended - * to the stream. */ +/* Set a client in blocking mode for the specified key (list, zset or stream), + * with the specified timeout. The 'type' argument is BLOCKED_LIST, + * BLOCKED_ZSET or BLOCKED_STREAM depending on the kind of operation we are + * waiting for an empty key in order to awake the client. The client is blocked + * for all the 'numkeys' keys as in the 'keys' argument. When we block for + * stream keys, we also provide an array of streamID structures: clients will + * be unblocked only when items with an ID greater or equal to the specified + * one is appended to the stream. */ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids) { dictEntry *de; list *l; @@ -409,7 +451,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo for (j = 0; j < numkeys; j++) { /* The value associated with the key name in the bpop.keys dictionary - * is NULL for lists, or the stream ID for streams. */ + * is NULL for lists and sorted sets, or the stream ID for streams. */ void *key_data = NULL; if (btype == BLOCKED_STREAM) { key_data = zmalloc(sizeof(streamID)); diff --git a/src/db.c b/src/db.c index 5f733e2d8..6c039e129 100644 --- a/src/db.c +++ b/src/db.c @@ -169,7 +169,9 @@ void dbAdd(redisDb *db, robj *key, robj *val) { int retval = dictAdd(db->dict, copy, val); serverAssertWithInfo(NULL,key,retval == DICT_OK); - if (val->type == OBJ_LIST) signalKeyAsReady(db, key); + if (val->type == OBJ_LIST || + val->type == OBJ_ZSET) + signalKeyAsReady(db, key); if (server.cluster_enabled) slotToKeyAdd(key); } diff --git a/src/server.c b/src/server.c index 404429beb..22317be71 100644 --- a/src/server.c +++ b/src/server.c @@ -198,6 +198,10 @@ struct redisCommand redisCommandTable[] = { {"zrank",zrankCommand,3,"rF",0,NULL,1,1,1,0,0}, {"zrevrank",zrevrankCommand,3,"rF",0,NULL,1,1,1,0,0}, {"zscan",zscanCommand,-3,"rR",0,NULL,1,1,1,0,0}, + {"zpop",zpopCommand,-2,"wF",0,NULL,1,-1,1,0,0}, + {"zrevpop",zrevpopCommand,-2,"wF",0,NULL,1,-1,1,0,0}, + {"bzpop",bzpopCommand,-2,"wsF",0,NULL,1,-2,1,0,0}, + {"bzrevpop",bzrevpopCommand,-2,"wsF",0,NULL,1,-2,1,0,0}, {"hset",hsetCommand,-4,"wmF",0,NULL,1,1,1,0,0}, {"hsetnx",hsetnxCommand,4,"wmF",0,NULL,1,1,1,0,0}, {"hget",hgetCommand,3,"rF",0,NULL,1,1,1,0,0}, @@ -1369,6 +1373,8 @@ void createSharedObjects(void) { shared.rpop = createStringObject("RPOP",4); shared.lpop = createStringObject("LPOP",4); shared.lpush = createStringObject("LPUSH",5); + shared.zpop = createStringObject("ZPOP",4); + shared.zrevpop = createStringObject("ZREVPOP",7); for (j = 0; j < OBJ_SHARED_INTEGERS; j++) { shared.integers[j] = makeObjectShared(createObject(OBJ_STRING,(void*)(long)j)); @@ -1562,6 +1568,8 @@ void initServerConfig(void) { server.lpushCommand = lookupCommandByCString("lpush"); server.lpopCommand = lookupCommandByCString("lpop"); server.rpopCommand = lookupCommandByCString("rpop"); + server.zpopCommand = lookupCommandByCString("zpop"); + server.zrevpopCommand = lookupCommandByCString("zrevpop"); server.sremCommand = lookupCommandByCString("srem"); server.execCommand = lookupCommandByCString("exec"); server.expireCommand = lookupCommandByCString("expire"); diff --git a/src/server.h b/src/server.h index 172e99c8a..a8039dde0 100644 --- a/src/server.h +++ b/src/server.h @@ -258,7 +258,8 @@ typedef long long mstime_t; /* millisecond time type. */ #define BLOCKED_WAIT 2 /* WAIT for synchronous replication. */ #define BLOCKED_MODULE 3 /* Blocked by a loadable module. */ #define BLOCKED_STREAM 4 /* XREAD. */ -#define BLOCKED_NUM 5 /* Number of blocked states. */ +#define BLOCKED_ZSET 5 /* BZPOP et al. */ +#define BLOCKED_NUM 6 /* Number of blocked states. */ /* Client request types */ #define PROTO_REQ_INLINE 1 @@ -646,7 +647,7 @@ typedef struct blockingState { mstime_t timeout; /* Blocking operation timeout. If UNIX current time * is > timeout then the operation timed out. */ - /* BLOCKED_LIST and BLOCKED_STREAM */ + /* BLOCKED_LIST, BLOCKED_ZSET and BLOCKED_STREAM */ dict *keys; /* The keys we are waiting to terminate a blocking * operation such as BLPOP or XREAD. Or NULL. */ robj *target; /* The key that should receive the element, @@ -762,7 +763,7 @@ struct sharedObjectsStruct { *masterdownerr, *roslaveerr, *execaborterr, *noautherr, *noreplicaserr, *busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk, *unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink, - *rpop, *lpop, *lpush, *emptyscan, + *rpop, *lpop, *lpush, *zpop, *zrevpop, *emptyscan, *select[PROTO_SHARED_SELECT_CMDS], *integers[OBJ_SHARED_INTEGERS], *mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*\r\n" */ @@ -960,8 +961,8 @@ struct redisServer { off_t loading_process_events_interval_bytes; /* Fast pointers to often looked up command */ struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand, - *rpopCommand, *sremCommand, *execCommand, - *expireCommand, *pexpireCommand, *xclaimCommand; + *rpopCommand, *zpopCommand, *zrevpopCommand, *sremCommand, + *execCommand, *expireCommand, *pexpireCommand, *xclaimCommand; /* Fields used only for stats */ time_t stat_starttime; /* Server start time */ long long stat_numcommands; /* Number of processed commands */ @@ -1628,6 +1629,7 @@ unsigned long zslGetRank(zskiplist *zsl, double score, sds o); int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore); long zsetRank(robj *zobj, sds ele, int reverse); int zsetDel(robj *zobj, sds ele); +void genericZpopCommand(client *c, robj **keyv, int keyc, int reverse); sds ziplistGetObject(unsigned char *sptr); int zslValueGteMin(double value, zrangespec *spec); int zslValueLteMax(double value, zrangespec *spec); @@ -1968,6 +1970,10 @@ void zremCommand(client *c); void zscoreCommand(client *c); void zremrangebyscoreCommand(client *c); void zremrangebylexCommand(client *c); +void zpopCommand(client *c); +void zrevpopCommand(client *c); +void bzpopCommand(client *c); +void bzrevpopCommand(client *c); void multiCommand(client *c); void execCommand(client *c); void discardCommand(client *c); diff --git a/src/t_zset.c b/src/t_zset.c index f7f4c6eb2..2b11c7d37 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -3068,3 +3068,147 @@ void zscanCommand(client *c) { checkType(c,o,OBJ_ZSET)) return; scanGenericCommand(c,o,cursor); } + +/* This command implements the generic zpop operation, used by: + * ZPOP, ZREVPOP, BZPOP and BZREVPOP */ +void genericZpopCommand(client *c, robj **keyv, int keyc, int reverse) { + int idx; + robj *key; + robj *zobj; + sds ele; + double score; + char *events[2] = {"zpop","zrevpop"}; + + // Check type and break on the first error, otherwise identify candidate + idx = 0; + while (idx < keyc) { + key = keyv[idx++]; + zobj = lookupKeyWrite(c->db,key); + if (!zobj) continue; + if (checkType(c,zobj,OBJ_ZSET)) return; + break; + } + + // No candidate for zpopping, return empty + if (!zobj) { + addReply(c,shared.emptymultibulk); + return; + } + + if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { + unsigned char *zl = zobj->ptr; + unsigned char *eptr, *sptr; + unsigned char *vstr; + unsigned int vlen; + long long vlong; + + // Get the first or last element in the sorted set + eptr = ziplistIndex(zl,reverse ? -2 : 0); + serverAssertWithInfo(c,zobj,eptr != NULL); + + // There must be an element in the sorted set + serverAssertWithInfo(c,zobj,eptr != NULL); + serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong)); + if (vstr == NULL) + ele = sdsfromlonglong(vlong); + else + ele = sdsnewlen(vstr,vlen); + + // Get the score + sptr = ziplistNext(zl,eptr); + serverAssertWithInfo(c,zobj,sptr != NULL); + score = zzlGetScore(sptr); + } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { + zset *zs = zobj->ptr; + zskiplist *zsl = zs->zsl; + zskiplistNode *ln; + + // Get the first or last element in the sorted set + ln = (reverse ? zsl->tail : zsl->header->level[0].forward); + + // There must be an element in the sorted set + serverAssertWithInfo(c,zobj,ln != NULL); + ele = sdsdup(ln->ele); + score = ln->score; + } else { + serverPanic("Unknown sorted set encoding"); + } + + // Remove the element + serverAssertWithInfo(c,zobj,zsetDel(zobj,ele)); + server.dirty++; + signalModifiedKey(c->db,key); + notifyKeyspaceEvent(NOTIFY_ZSET,events[reverse],key,c->db->id); + + // Remove the key, if indeed needed + if (zsetLength(zobj) == 0) { + dbDelete(c->db,key); + notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id); + } + + addReplyMultiBulkLen(c,3); + addReplyBulk(c,key); + addReplyDouble(c,score); + addReplyBulkCBuffer(c,ele,sdslen(ele)); + sdsfree(ele); +} + +// ZPOP key [key ...] +void zpopCommand(client *c) { + genericZpopCommand(c,&c->argv[1],c->argc-1,0); +} + +// ZREVPOP key [key ...] +void zrevpopCommand(client *c) { + genericZpopCommand(c,&c->argv[1],c->argc-1,1); +} + +/* Blocking Z[REV]POP */ +void blockingGenericZpopCommand(client *c, int reverse) { + robj *o; + mstime_t timeout; + int j; + + if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS) + != C_OK) return; + + for (j = 1; j < c->argc-1; j++) { + o = lookupKeyWrite(c->db,c->argv[j]); + if (o != NULL) { + if (o->type != OBJ_ZSET) { + addReply(c,shared.wrongtypeerr); + return; + } else { + if (zsetLength(o) != 0) { + /* Non empty zset, this is like a normal Z[REV]POP. */ + genericZpopCommand(c,&c->argv[j],1,reverse); + /* Replicate it as an Z[REV]POP instead of BZ[REV]POP. */ + rewriteClientCommandVector(c,2, + reverse ? shared.zrevpop : shared.zpop, + c->argv[j]); + return; + } + } + } + } + + /* If we are inside a MULTI/EXEC and the zset is empty the only thing + * we can do is treating it as a timeout (even with timeout 0). */ + if (c->flags & CLIENT_MULTI) { + addReply(c,shared.nullmultibulk); + return; + } + + /* If the keys do not exist we must block */ + blockForKeys(c,BLOCKED_ZSET,c->argv + 1,c->argc - 2,timeout,NULL,NULL); +} + +// BZPOP key [key ...] timeout +void bzpopCommand(client *c) { + blockingGenericZpopCommand(c,0); +} + +// BZREVPOP key [key ...] timeout +void bzrevpopCommand(client *c) { + blockingGenericZpopCommand(c,1); +} diff --git a/tests/unit/type/zset.tcl b/tests/unit/type/zset.tcl index 564825ae9..797045797 100644 --- a/tests/unit/type/zset.tcl +++ b/tests/unit/type/zset.tcl @@ -648,6 +648,79 @@ start_server {tags {"zset"}} { } } } + + test "Basic Z\[REV\]POP with a single key - $encoding" { + r del zset + assert_equal {} [r zpop zset] + create_zset zset {-1 a 1 b 2 c 3 d 4 e} + assert_equal {zset -1 a} [r zpop zset] + assert_equal {zset 1 b} [r zpop zset] + assert_equal {zset 4 e} [r zrevpop zset] + assert_equal {zset 3 d} [r zrevpop zset] + assert_equal {zset 2 c} [r zpop zset] + assert_equal 0 [r exists zset] + r set foo bar + assert_error "*WRONGTYPE*" {r zpop foo} + } + + test "Z\[REV\]POP with multiple keys - $encoding" { + r del z1 z2 z3 foo + r set foo bar + assert_equal {} [r zpop z1 z2 z3] + assert_error "*WRONGTYPE*" {r zpop z1 foo} + create_zset z1 {0 a 1 b 2 c} + assert_equal {z1 0 a} [r zpop z1 z2 z3] + assert_equal {z1 1 b} [r zpop z3 z2 z1] + create_zset z3 {0 a 1 b 2 c} + assert_equal {z3 2 c} [r zrevpop z3 z2 z1] + assert_equal 1 [r exists z1] + assert_equal 1 [r exists z3] + } + + test "BZ\[REV\]POP with a single existing sorted set - $encoding" { + set rd [redis_deferring_client] + create_zset zset {0 a 1 b 2 c} + + $rd bzpop zset 5 + assert_equal {zset 0 a} [$rd read] + $rd bzpop zset 5 + assert_equal {zset 1 b} [$rd read] + $rd bzrevpop zset 5 + assert_equal {zset 2 c} [$rd read] + assert_equal 0 [r exists zset] + } + + test "BZ\[REV\]POP with multiple existing sorted sets - $encoding" { + set rd [redis_deferring_client] + create_zset z1 {0 a 1 b 2 c} + create_zset z2 {3 d 4 e 5 f} + + $rd bzpop z1 z2 5 + assert_equal {z1 0 a} [$rd read] + $rd bzrevpop z1 z2 5 + assert_equal {z1 2 c} [$rd read] + assert_equal 1 [r zcard z1] + assert_equal 3 [r zcard z2] + + $rd bzrevpop z2 z1 5 + assert_equal {z2 5 f} [$rd read] + $rd bzpop z2 z1 5 + assert_equal {z2 3 d} [$rd read] + assert_equal 1 [r zcard z1] + assert_equal 1 [r zcard z2] + } + + test "BZ\[REV\]POP second sorted set has members - $encoding" { + set rd [redis_deferring_client] + r del z1 + create_zset z2 {3 d 4 e 5 f} + $rd bzrevpop z1 z2 5 + assert_equal {z2 5 f} [$rd read] + $rd bzpop z2 z1 5 + assert_equal {z2 3 d} [$rd read] + assert_equal 0 [r zcard z1] + assert_equal 1 [r zcard z2] + } } basics ziplist @@ -1025,6 +1098,91 @@ start_server {tags {"zset"}} { } assert_equal {} $err } + + test "BZPOP, ZADD + DEL should not awake blocked client" { + set rd [redis_deferring_client] + r del zset + + $rd bzpop zset 0 + r multi + r zadd zset 0 foo + r del zset + r exec + r del zset + r zadd zset 1 bar + $rd read + } {zset 1 bar} + + test "BZPOP, ZADD + DEL + SET should not awake blocked client" { + set rd [redis_deferring_client] + r del list + + r del zset + + $rd bzpop zset 0 + r multi + r zadd zset 0 foo + r del zset + r set zset foo + r exec + r del zset + r zadd zset 1 bar + $rd read + } {zset 1 bar} + + test "BZPOP with same key multiple times should work" { + set rd [redis_deferring_client] + r del z1 z2 + + # Data arriving after the BZPOP. + $rd bzpop z1 z2 z2 z1 0 + r zadd z1 0 a + assert_equal [$rd read] {z1 0 a} + $rd bzpop z1 z2 z2 z1 0 + r zadd z2 1 b + assert_equal [$rd read] {z2 1 b} + + # Data already there. + r zadd z1 0 a + r zadd z2 1 b + $rd bzpop z1 z2 z2 z1 0 + assert_equal [$rd read] {z1 0 a} + $rd bzpop z1 z2 z2 z1 0 + assert_equal [$rd read] {z2 1 b} + } + + test "MULTI/EXEC is isolated from the point of view of BZPOP" { + set rd [redis_deferring_client] + r del zset + $rd bzpop zset 0 + r multi + r zadd zset 0 a + r zadd zset 1 b + r zadd zset 2 c + r exec + $rd read + } {zset 0 a} + + test "BZPOP with variadic ZADD" { + set rd [redis_deferring_client] + r del zset + if {$::valgrind} {after 100} + $rd bzpop zset 0 + if {$::valgrind} {after 100} + assert_equal 2 [r zadd zset -1 foo 1 bar] + if {$::valgrind} {after 100} + assert_equal {zset -1 foo} [$rd read] + assert_equal {bar} [r zrange zset 0 -1] + } + + test "BZPOP with zero timeout should block indefinitely" { + set rd [redis_deferring_client] + r del zset + $rd bzpop zset 0 + after 1000 + r zadd zset 0 foo + assert_equal {zset 0 foo} [$rd read] + } } tags {"slow"} {