diff --git a/src/blocked.c b/src/blocked.c index f9fd94ea1..f4b47bb82 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -331,7 +331,7 @@ void handleClientsBlockedOnKeys(void) { receiver->lastcmd->proc == bzpopminCommand) ? ZSET_MIN : ZSET_MAX; unblockClient(receiver); - genericZpopCommand(receiver,&rl->key,1,where); + genericZpopCommand(receiver,&rl->key,1,where,1,NULL); propagate(where == ZSET_MIN ? server.zpopminCommand : server.zpopmaxCommand, diff --git a/src/server.h b/src/server.h index b5675b476..d9c512c5e 100644 --- a/src/server.h +++ b/src/server.h @@ -1632,7 +1632,7 @@ unsigned long zslGetRank(zskiplist *zsl, double score, sds o); int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore); long zsetRank(robj *zobj, sds ele, int reverse); int zsetDel(robj *zobj, sds ele); -void genericZpopCommand(client *c, robj **keyv, int keyc, int reverse); +void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey, robj *countarg); sds ziplistGetObject(unsigned char *sptr); int zslValueGteMin(double value, zrangespec *spec); int zslValueLteMax(double value, zrangespec *spec); diff --git a/src/t_zset.c b/src/t_zset.c index b58e58bc3..972412046 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -3070,13 +3070,28 @@ void zscanCommand(client *c) { } /* This command implements the generic zpop operation, used by: - * ZPOPMIN, ZPOPMAX, BZPOPMIN and BZPOPMAX */ -void genericZpopCommand(client *c, robj **keyv, int keyc, int where) { + * ZPOPMIN, ZPOPMAX, BZPOPMIN and BZPOPMAX. This function is also used + * inside blocked.c in the unblocking stage of BZPOPMIN and BZPOPMAX. + * + * If 'emitkey' is true also the key name is emitted, useful for the blocking + * behavior of BZPOP[MIN|MAX], since we can block into multiple keys. + * + * The synchronous version instead does not need to emit the key, but may + * use the 'count' argument to return multiple items if available. */ +void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey, robj *countarg) { int idx; robj *key; robj *zobj; sds ele; double score; + long count = 1; + + /* If a count argument as passed, parse it or return an error. */ + if (countarg) { + if (getLongFromObjectOrReply(c,countarg,&count,NULL) != C_OK) + return; + if (count < 0) count = 1; + } /* Check type and break on the first error, otherwise identify candidate. */ idx = 0; @@ -3094,70 +3109,94 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int where) { return; } - if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { - unsigned char *zl = zobj->ptr; - unsigned char *eptr, *sptr; - unsigned char *vstr; - unsigned int vlen; - long long vlong; + void *arraylen_ptr = addDeferredMultiBulkLength(c); + long arraylen = 0; - /* Get the first or last element in the sorted set. */ - eptr = ziplistIndex(zl,where == ZSET_MAX ? -2 : 0); - serverAssertWithInfo(c,zobj,eptr != NULL); - serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong)); - if (vstr == NULL) - ele = sdsfromlonglong(vlong); - else - ele = sdsnewlen(vstr,vlen); + /* We emit the key only for the blocking variant. */ + if (emitkey) addReplyBulk(c,key); - /* Get the score. */ - sptr = ziplistNext(zl,eptr); - serverAssertWithInfo(c,zobj,sptr != NULL); - score = zzlGetScore(sptr); - } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { - zset *zs = zobj->ptr; - zskiplist *zsl = zs->zsl; - zskiplistNode *zln; + /* Remove the element. */ + do { + if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { + unsigned char *zl = zobj->ptr; + unsigned char *eptr, *sptr; + unsigned char *vstr; + unsigned int vlen; + long long vlong; - // Get the first or last element in the sorted set - zln = (where == ZSET_MAX ? zsl->tail : zsl->header->level[0].forward); + /* Get the first or last element in the sorted set. */ + eptr = ziplistIndex(zl,where == ZSET_MAX ? -2 : 0); + serverAssertWithInfo(c,zobj,eptr != NULL); + serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong)); + if (vstr == NULL) + ele = sdsfromlonglong(vlong); + else + ele = sdsnewlen(vstr,vlen); - // There must be an element in the sorted set - serverAssertWithInfo(c,zobj,zln != NULL); - ele = sdsdup(zln->ele); - score = zln->score; - } else { - serverPanic("Unknown sorted set encoding"); - } + /* Get the score. */ + sptr = ziplistNext(zl,eptr); + serverAssertWithInfo(c,zobj,sptr != NULL); + score = zzlGetScore(sptr); + } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { + zset *zs = zobj->ptr; + zskiplist *zsl = zs->zsl; + zskiplistNode *zln; - // Remove the element - serverAssertWithInfo(c,zobj,zsetDel(zobj,ele)); - server.dirty++; - signalModifiedKey(c->db,key); - char *events[2] = {"zpopmin","zpopmax"}; - notifyKeyspaceEvent(NOTIFY_ZSET,events[where],key,c->db->id); + /* Get the first or last element in the sorted set. */ + zln = (where == ZSET_MAX ? zsl->tail : + zsl->header->level[0].forward); - // Remove the key, if indeed needed - if (zsetLength(zobj) == 0) { - dbDelete(c->db,key); - notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id); - } + /* There must be an element in the sorted set. */ + serverAssertWithInfo(c,zobj,zln != NULL); + ele = sdsdup(zln->ele); + score = zln->score; + } else { + serverPanic("Unknown sorted set encoding"); + } - addReplyMultiBulkLen(c,3); - addReplyBulk(c,key); - addReplyDouble(c,score); - addReplyBulkCBuffer(c,ele,sdslen(ele)); - sdsfree(ele); + serverAssertWithInfo(c,zobj,zsetDel(zobj,ele)); + server.dirty++; + + if (arraylen == 0) { /* Do this only for the first iteration. */ + char *events[2] = {"zpopmin","zpopmax"}; + notifyKeyspaceEvent(NOTIFY_ZSET,events[where],key,c->db->id); + signalModifiedKey(c->db,key); + } + + addReplyDouble(c,score); + addReplyBulkCBuffer(c,ele,sdslen(ele)); + sdsfree(ele); + arraylen += 2; + + /* Remove the key, if indeed needed. */ + if (zsetLength(zobj) == 0) { + dbDelete(c->db,key); + notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id); + break; + } + } while(--count); + + setDeferredMultiBulkLength(c,arraylen_ptr,arraylen + (emitkey != 0)); } -// ZPOPMIN key [key ...] +/* ZPOPMIN key [] */ void zpopminCommand(client *c) { - genericZpopCommand(c,&c->argv[1],c->argc-1,ZSET_MIN); + if (c->argc > 3) { + addReply(c,shared.syntaxerr); + return; + } + genericZpopCommand(c,&c->argv[1],c->argc-1,ZSET_MIN,0, + c->argc == 3 ? c->argv[2] : NULL); } -// ZMAXPOP key [key ...] +/* ZMAXPOP key [] */ void zpopmaxCommand(client *c) { - genericZpopCommand(c,&c->argv[1],c->argc-1,ZSET_MAX); + if (c->argc > 3) { + addReply(c,shared.syntaxerr); + return; + } + genericZpopCommand(c,&c->argv[1],c->argc-1,ZSET_MAX,0, + c->argc == 3 ? c->argv[2] : NULL); } /* BZPOPMIN / BZPOPMAX actual implementation. */ @@ -3178,7 +3217,7 @@ void blockingGenericZpopCommand(client *c, int where) { } else { if (zsetLength(o) != 0) { /* Non empty zset, this is like a normal Z[REV]POP. */ - genericZpopCommand(c,&c->argv[j],1,where); + genericZpopCommand(c,&c->argv[j],1,where,1,NULL); /* Replicate it as an Z[REV]POP instead of BZ[REV]POP. */ rewriteClientCommandVector(c,2, where == ZSET_MAX ? shared.zpopmax : shared.zpopmin,