ZPOP: change sync ZPOP to have a count argument instead of N keys.

Usually blocking operations make a lot of sense with multiple keys so
that we can listen to multiple queues (or whatever the app models) with
a single connection. However in the synchronous case it is more useful
to be able to ask for N elements. This is a change that I also wanted to
perform soon or later in the blocking list variant, but here it is more
natural since there is no reply type difference.
This commit is contained in:
antirez 2018-05-11 18:00:32 +02:00
parent 6efb6c1e06
commit 56bbab238a
3 changed files with 95 additions and 56 deletions

View File

@ -331,7 +331,7 @@ void handleClientsBlockedOnKeys(void) {
receiver->lastcmd->proc == bzpopminCommand) receiver->lastcmd->proc == bzpopminCommand)
? ZSET_MIN : ZSET_MAX; ? ZSET_MIN : ZSET_MAX;
unblockClient(receiver); unblockClient(receiver);
genericZpopCommand(receiver,&rl->key,1,where); genericZpopCommand(receiver,&rl->key,1,where,1,NULL);
propagate(where == ZSET_MIN ? propagate(where == ZSET_MIN ?
server.zpopminCommand : server.zpopmaxCommand, server.zpopminCommand : server.zpopmaxCommand,

View File

@ -1632,7 +1632,7 @@ unsigned long zslGetRank(zskiplist *zsl, double score, sds o);
int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore); int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore);
long zsetRank(robj *zobj, sds ele, int reverse); long zsetRank(robj *zobj, sds ele, int reverse);
int zsetDel(robj *zobj, sds ele); int zsetDel(robj *zobj, sds ele);
void genericZpopCommand(client *c, robj **keyv, int keyc, int reverse); void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey, robj *countarg);
sds ziplistGetObject(unsigned char *sptr); sds ziplistGetObject(unsigned char *sptr);
int zslValueGteMin(double value, zrangespec *spec); int zslValueGteMin(double value, zrangespec *spec);
int zslValueLteMax(double value, zrangespec *spec); int zslValueLteMax(double value, zrangespec *spec);

View File

@ -3070,13 +3070,28 @@ void zscanCommand(client *c) {
} }
/* This command implements the generic zpop operation, used by: /* This command implements the generic zpop operation, used by:
* ZPOPMIN, ZPOPMAX, BZPOPMIN and BZPOPMAX */ * ZPOPMIN, ZPOPMAX, BZPOPMIN and BZPOPMAX. This function is also used
void genericZpopCommand(client *c, robj **keyv, int keyc, int where) { * inside blocked.c in the unblocking stage of BZPOPMIN and BZPOPMAX.
*
* If 'emitkey' is true also the key name is emitted, useful for the blocking
* behavior of BZPOP[MIN|MAX], since we can block into multiple keys.
*
* The synchronous version instead does not need to emit the key, but may
* use the 'count' argument to return multiple items if available. */
void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey, robj *countarg) {
int idx; int idx;
robj *key; robj *key;
robj *zobj; robj *zobj;
sds ele; sds ele;
double score; double score;
long count = 1;
/* If a count argument as passed, parse it or return an error. */
if (countarg) {
if (getLongFromObjectOrReply(c,countarg,&count,NULL) != C_OK)
return;
if (count < 0) count = 1;
}
/* Check type and break on the first error, otherwise identify candidate. */ /* Check type and break on the first error, otherwise identify candidate. */
idx = 0; idx = 0;
@ -3094,70 +3109,94 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int where) {
return; return;
} }
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { void *arraylen_ptr = addDeferredMultiBulkLength(c);
unsigned char *zl = zobj->ptr; long arraylen = 0;
unsigned char *eptr, *sptr;
unsigned char *vstr;
unsigned int vlen;
long long vlong;
/* Get the first or last element in the sorted set. */ /* We emit the key only for the blocking variant. */
eptr = ziplistIndex(zl,where == ZSET_MAX ? -2 : 0); if (emitkey) addReplyBulk(c,key);
serverAssertWithInfo(c,zobj,eptr != NULL);
serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
if (vstr == NULL)
ele = sdsfromlonglong(vlong);
else
ele = sdsnewlen(vstr,vlen);
/* Get the score. */ /* Remove the element. */
sptr = ziplistNext(zl,eptr); do {
serverAssertWithInfo(c,zobj,sptr != NULL); if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
score = zzlGetScore(sptr); unsigned char *zl = zobj->ptr;
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { unsigned char *eptr, *sptr;
zset *zs = zobj->ptr; unsigned char *vstr;
zskiplist *zsl = zs->zsl; unsigned int vlen;
zskiplistNode *zln; long long vlong;
// Get the first or last element in the sorted set /* Get the first or last element in the sorted set. */
zln = (where == ZSET_MAX ? zsl->tail : zsl->header->level[0].forward); eptr = ziplistIndex(zl,where == ZSET_MAX ? -2 : 0);
serverAssertWithInfo(c,zobj,eptr != NULL);
serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
if (vstr == NULL)
ele = sdsfromlonglong(vlong);
else
ele = sdsnewlen(vstr,vlen);
// There must be an element in the sorted set /* Get the score. */
serverAssertWithInfo(c,zobj,zln != NULL); sptr = ziplistNext(zl,eptr);
ele = sdsdup(zln->ele); serverAssertWithInfo(c,zobj,sptr != NULL);
score = zln->score; score = zzlGetScore(sptr);
} else { } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
serverPanic("Unknown sorted set encoding"); zset *zs = zobj->ptr;
} zskiplist *zsl = zs->zsl;
zskiplistNode *zln;
// Remove the element /* Get the first or last element in the sorted set. */
serverAssertWithInfo(c,zobj,zsetDel(zobj,ele)); zln = (where == ZSET_MAX ? zsl->tail :
server.dirty++; zsl->header->level[0].forward);
signalModifiedKey(c->db,key);
char *events[2] = {"zpopmin","zpopmax"};
notifyKeyspaceEvent(NOTIFY_ZSET,events[where],key,c->db->id);
// Remove the key, if indeed needed /* There must be an element in the sorted set. */
if (zsetLength(zobj) == 0) { serverAssertWithInfo(c,zobj,zln != NULL);
dbDelete(c->db,key); ele = sdsdup(zln->ele);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id); score = zln->score;
} } else {
serverPanic("Unknown sorted set encoding");
}
addReplyMultiBulkLen(c,3); serverAssertWithInfo(c,zobj,zsetDel(zobj,ele));
addReplyBulk(c,key); server.dirty++;
addReplyDouble(c,score);
addReplyBulkCBuffer(c,ele,sdslen(ele)); if (arraylen == 0) { /* Do this only for the first iteration. */
sdsfree(ele); char *events[2] = {"zpopmin","zpopmax"};
notifyKeyspaceEvent(NOTIFY_ZSET,events[where],key,c->db->id);
signalModifiedKey(c->db,key);
}
addReplyDouble(c,score);
addReplyBulkCBuffer(c,ele,sdslen(ele));
sdsfree(ele);
arraylen += 2;
/* Remove the key, if indeed needed. */
if (zsetLength(zobj) == 0) {
dbDelete(c->db,key);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
break;
}
} while(--count);
setDeferredMultiBulkLength(c,arraylen_ptr,arraylen + (emitkey != 0));
} }
// ZPOPMIN key [key ...] /* ZPOPMIN key [<count>] */
void zpopminCommand(client *c) { void zpopminCommand(client *c) {
genericZpopCommand(c,&c->argv[1],c->argc-1,ZSET_MIN); if (c->argc > 3) {
addReply(c,shared.syntaxerr);
return;
}
genericZpopCommand(c,&c->argv[1],c->argc-1,ZSET_MIN,0,
c->argc == 3 ? c->argv[2] : NULL);
} }
// ZMAXPOP key [key ...] /* ZMAXPOP key [<count>] */
void zpopmaxCommand(client *c) { void zpopmaxCommand(client *c) {
genericZpopCommand(c,&c->argv[1],c->argc-1,ZSET_MAX); if (c->argc > 3) {
addReply(c,shared.syntaxerr);
return;
}
genericZpopCommand(c,&c->argv[1],c->argc-1,ZSET_MAX,0,
c->argc == 3 ? c->argv[2] : NULL);
} }
/* BZPOPMIN / BZPOPMAX actual implementation. */ /* BZPOPMIN / BZPOPMAX actual implementation. */
@ -3178,7 +3217,7 @@ void blockingGenericZpopCommand(client *c, int where) {
} else { } else {
if (zsetLength(o) != 0) { if (zsetLength(o) != 0) {
/* Non empty zset, this is like a normal Z[REV]POP. */ /* Non empty zset, this is like a normal Z[REV]POP. */
genericZpopCommand(c,&c->argv[j],1,where); genericZpopCommand(c,&c->argv[j],1,where,1,NULL);
/* Replicate it as an Z[REV]POP instead of BZ[REV]POP. */ /* Replicate it as an Z[REV]POP instead of BZ[REV]POP. */
rewriteClientCommandVector(c,2, rewriteClientCommandVector(c,2,
where == ZSET_MAX ? shared.zpopmax : shared.zpopmin, where == ZSET_MAX ? shared.zpopmax : shared.zpopmin,