mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 08:08:53 -05:00
Add ZMPOP/BZMPOP commands. (#9484)
This is similar to the recent addition of LMPOP/BLMPOP (#9373), but zset. Syntax for the new ZMPOP command: `ZMPOP numkeys [<key> ...] MIN|MAX [COUNT count]` Syntax for the new BZMPOP command: `BZMPOP timeout numkeys [<key> ...] MIN|MAX [COUNT count]` Some background: - ZPOPMIN/ZPOPMAX take only one key, and can return multiple elements. - BZPOPMIN/BZPOPMAX take multiple keys, but return only one element from just one key. - ZMPOP/BZMPOP can take multiple keys, and can return multiple elements from just one key. Note that ZMPOP/BZMPOP can take multiple keys, it eventually operates on just on key. And it will propagate as ZPOPMIN or ZPOPMAX with the COUNT option. As new commands, if we can not pop any elements, the response like: - ZMPOP: Return a NIL in both RESP2 and RESP3, unlike ZPOPMIN/ZPOPMAX return emptyarray. - BZMPOP: Return a NIL in both RESP2 and RESP3 when timeout is reached, like BZPOPMIN/BZPOPMAX. For the normal response is nested arrays in RESP2 and RESP3: ``` ZMPOP/BZMPOP 1) keyname 2) 1) 1) member1 2) score1 2) 1) member2 2) score2 In RESP2: 1) "myzset" 2) 1) 1) "three" 2) "3" 2) 1) "two" 2) "2" In RESP3: 1) "myzset" 2) 1) 1) "three" 2) (double) 3 2) 1) "two" 2) (double) 2 ```
This commit is contained in:
parent
59c9716f96
commit
14d6abd8e9
@ -285,8 +285,8 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
|
||||
}
|
||||
|
||||
robj *dstkey = receiver->bpop.target;
|
||||
int wherefrom = receiver->bpop.listpos.wherefrom;
|
||||
int whereto = receiver->bpop.listpos.whereto;
|
||||
int wherefrom = receiver->bpop.blockpos.wherefrom;
|
||||
int whereto = receiver->bpop.blockpos.whereto;
|
||||
|
||||
/* Protect receiver->bpop.target, that will be
|
||||
* freed by the next unblockClient()
|
||||
@ -320,9 +320,9 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) {
|
||||
if (de) {
|
||||
list *clients = dictGetVal(de);
|
||||
int numclients = listLength(clients);
|
||||
unsigned long zcard = zsetLength(o);
|
||||
int deleted = 0;
|
||||
|
||||
while(numclients-- && zcard) {
|
||||
while (numclients--) {
|
||||
listNode *clientnode = listFirst(clients);
|
||||
client *receiver = clientnode->value;
|
||||
|
||||
@ -333,23 +333,38 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) {
|
||||
continue;
|
||||
}
|
||||
|
||||
int where = (receiver->lastcmd &&
|
||||
receiver->lastcmd->proc == bzpopminCommand)
|
||||
? ZSET_MIN : ZSET_MAX;
|
||||
long llen = zsetLength(o);
|
||||
long count = receiver->bpop.count;
|
||||
int where = receiver->bpop.blockpos.wherefrom;
|
||||
int use_nested_array = (receiver->lastcmd &&
|
||||
receiver->lastcmd->proc == bzmpopCommand)
|
||||
? 1 : 0;
|
||||
int reply_nil_when_empty = use_nested_array;
|
||||
|
||||
monotime replyTimer;
|
||||
elapsedStart(&replyTimer);
|
||||
genericZpopCommand(receiver,&rl->key,1,where,1,NULL);
|
||||
genericZpopCommand(receiver, &rl->key, 1, where, 1, count, use_nested_array, reply_nil_when_empty, &deleted);
|
||||
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
|
||||
unblockClient(receiver);
|
||||
zcard--;
|
||||
|
||||
/* Replicate the command. */
|
||||
robj *argv[2];
|
||||
int argc = 2;
|
||||
robj *argv[3];
|
||||
argv[0] = where == ZSET_MIN ? shared.zpopmin : shared.zpopmax;
|
||||
argv[1] = rl->key;
|
||||
incrRefCount(rl->key);
|
||||
propagate(receiver->db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL);
|
||||
if (count != 0) {
|
||||
/* Replicate it as command with COUNT. */
|
||||
robj *count_obj = createStringObjectFromLongLong((count > llen) ? llen : count);
|
||||
argv[2] = count_obj;
|
||||
argc++;
|
||||
}
|
||||
propagate(receiver->db->id, argv, argc, PROPAGATE_AOF|PROPAGATE_REPL);
|
||||
decrRefCount(argv[1]);
|
||||
if (count != 0) decrRefCount(argv[2]);
|
||||
|
||||
/* The zset is empty and has been deleted. */
|
||||
if (deleted) break;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -613,7 +628,7 @@ void handleClientsBlockedOnKeys(void) {
|
||||
*
|
||||
* 'count' for those commands that support the optional count argument.
|
||||
* Otherwise the value is 0. */
|
||||
void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct listPos *listpos, streamID *ids) {
|
||||
void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct blockPos *blockpos, streamID *ids) {
|
||||
dictEntry *de;
|
||||
list *l;
|
||||
int j;
|
||||
@ -622,7 +637,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, ms
|
||||
c->bpop.timeout = timeout;
|
||||
c->bpop.target = target;
|
||||
|
||||
if (listpos != NULL) c->bpop.listpos = *listpos;
|
||||
if (blockpos != NULL) c->bpop.blockpos = *blockpos;
|
||||
|
||||
if (target != NULL) incrRefCount(target);
|
||||
|
||||
|
10
src/db.c
10
src/db.c
@ -1711,6 +1711,16 @@ int blmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult
|
||||
return genericGetKeys(0, 2, 3, 1, argv, argc, result);
|
||||
}
|
||||
|
||||
int zmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
|
||||
UNUSED(cmd);
|
||||
return genericGetKeys(0, 1, 2, 1, argv, argc, result);
|
||||
}
|
||||
|
||||
int bzmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
|
||||
UNUSED(cmd);
|
||||
return genericGetKeys(0, 2, 3, 1, argv, argc, result);
|
||||
}
|
||||
|
||||
/* Helper function to extract keys from the SORT command.
|
||||
*
|
||||
* SORT <sort-key> ... STORE <store-key> ...
|
||||
|
14
src/server.c
14
src/server.c
@ -792,6 +792,13 @@ struct redisCommand redisCommandTable[] = {
|
||||
KSPEC_BS_INDEX,.bs.index={1},
|
||||
KSPEC_FK_RANGE,.fk.range={0,1,0}}}},
|
||||
|
||||
{"zmpop", zmpopCommand,-4,
|
||||
"write @sortedset",
|
||||
{{"write",
|
||||
KSPEC_BS_INDEX,.bs.index={1},
|
||||
KSPEC_FK_KEYNUM,.fk.keynum={0,1,1}}},
|
||||
zmpopGetKeys},
|
||||
|
||||
{"bzpopmin",bzpopminCommand,-3,
|
||||
"write no-script fast @sortedset @blocking",
|
||||
{{"write",
|
||||
@ -804,6 +811,13 @@ struct redisCommand redisCommandTable[] = {
|
||||
KSPEC_BS_INDEX,.bs.index={1},
|
||||
KSPEC_FK_RANGE,.fk.range={-2,1,0}}}},
|
||||
|
||||
{"bzmpop",bzmpopCommand,-5,
|
||||
"write @sortedset @blocking",
|
||||
{{"write",
|
||||
KSPEC_BS_INDEX,.bs.index={2},
|
||||
KSPEC_FK_KEYNUM,.fk.keynum={0,1,1}}},
|
||||
blmpopGetKeys},
|
||||
|
||||
{"zrandmember",zrandmemberCommand,-2,
|
||||
"read-only random @sortedset",
|
||||
{{"read",
|
||||
|
14
src/server.h
14
src/server.h
@ -812,12 +812,12 @@ typedef struct blockingState {
|
||||
* operation such as BLPOP or XREAD. Or NULL. */
|
||||
robj *target; /* The key that should receive the element,
|
||||
* for BLMOVE. */
|
||||
struct listPos {
|
||||
struct blockPos {
|
||||
int wherefrom; /* Where to pop from */
|
||||
int whereto; /* Where to push to */
|
||||
} listpos; /* The positions in the src/dst lists
|
||||
} blockpos; /* The positions in the src/dst lists/zsets
|
||||
* where we want to pop/push an element
|
||||
* for BLPOP, BRPOP and BLMOVE. */
|
||||
* for BLPOP, BRPOP, BLMOVE and BZMPOP. */
|
||||
|
||||
/* BLOCK_STREAM */
|
||||
size_t xread_count; /* XREAD COUNT option. */
|
||||
@ -2343,7 +2343,7 @@ int zsetAdd(robj *zobj, double score, sds ele, int in_flags, int *out_flags, dou
|
||||
long zsetRank(robj *zobj, sds ele, int reverse);
|
||||
int zsetDel(robj *zobj, sds ele);
|
||||
robj *zsetDup(robj *o);
|
||||
void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey, robj *countarg);
|
||||
void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey, long count, int use_nested_array, int reply_nil_when_empty, int *deleted);
|
||||
sds lpGetObject(unsigned char *sptr);
|
||||
int zslValueGteMin(double value, zrangespec *spec);
|
||||
int zslValueLteMax(double value, zrangespec *spec);
|
||||
@ -2557,6 +2557,8 @@ int memoryGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult
|
||||
int lcsGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
|
||||
int lmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
|
||||
int blmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
|
||||
int zmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
|
||||
int bzmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
|
||||
|
||||
unsigned short crc16(const char *buf, int len);
|
||||
|
||||
@ -2593,7 +2595,7 @@ int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int
|
||||
void disconnectAllBlockedClients(void);
|
||||
void handleClientsBlockedOnKeys(void);
|
||||
void signalKeyAsReady(redisDb *db, robj *key, int type);
|
||||
void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct listPos *listpos, streamID *ids);
|
||||
void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct blockPos *blockpos, streamID *ids);
|
||||
void updateStatsOnUnblock(client *c, long blocked_us, long reply_us);
|
||||
|
||||
/* timeout.c -- Blocked clients timeout and connections timeout. */
|
||||
@ -2751,8 +2753,10 @@ void zremrangebyscoreCommand(client *c);
|
||||
void zremrangebylexCommand(client *c);
|
||||
void zpopminCommand(client *c);
|
||||
void zpopmaxCommand(client *c);
|
||||
void zmpopCommand(client *c);
|
||||
void bzpopminCommand(client *c);
|
||||
void bzpopmaxCommand(client *c);
|
||||
void bzmpopCommand(client *c);
|
||||
void zrandmemberCommand(client *c);
|
||||
void multiCommand(client *c);
|
||||
void execCommand(client *c);
|
||||
|
@ -1104,7 +1104,7 @@ void blockingPopGenericCommand(client *c, robj **keys, int numkeys, int where, i
|
||||
}
|
||||
|
||||
/* If the keys do not exist we must block */
|
||||
struct listPos pos = {where};
|
||||
struct blockPos pos = {where};
|
||||
blockForKeys(c,BLOCKED_LIST,keys,numkeys,count,timeout,NULL,&pos,NULL);
|
||||
}
|
||||
|
||||
@ -1129,7 +1129,7 @@ void blmoveGenericCommand(client *c, int wherefrom, int whereto, mstime_t timeou
|
||||
addReplyNull(c);
|
||||
} else {
|
||||
/* The list is empty and the client blocks. */
|
||||
struct listPos pos = {wherefrom, whereto};
|
||||
struct blockPos pos = {wherefrom, whereto};
|
||||
blockForKeys(c,BLOCKED_LIST,c->argv + 1,1,0,timeout,c->argv[2],&pos,NULL);
|
||||
}
|
||||
} else {
|
||||
|
264
src/t_zset.c
264
src/t_zset.c
@ -3753,31 +3753,35 @@ void zscanCommand(client *c) {
|
||||
}
|
||||
|
||||
/* This command implements the generic zpop operation, used by:
|
||||
* ZPOPMIN, ZPOPMAX, BZPOPMIN and BZPOPMAX. This function is also used
|
||||
* inside blocked.c in the unblocking stage of BZPOPMIN and BZPOPMAX.
|
||||
* ZPOPMIN, ZPOPMAX, BZPOPMIN, BZPOPMAX and ZMPOP. This function is also used
|
||||
* inside blocked.c in the unblocking stage of BZPOPMIN, BZPOPMAX and BZMPOP.
|
||||
*
|
||||
* If 'emitkey' is true also the key name is emitted, useful for the blocking
|
||||
* behavior of BZPOP[MIN|MAX], since we can block into multiple keys.
|
||||
* Or in ZMPOP/BZMPOP, because we also can take multiple keys.
|
||||
*
|
||||
* The synchronous version instead does not need to emit the key, but may
|
||||
* use the 'count' argument to return multiple items if available. */
|
||||
void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey, robj *countarg) {
|
||||
* 'count' is the number of elements requested to pop, or 0 for plain single pop.
|
||||
*
|
||||
* 'use_nested_array' when false it generates a flat array (with or without key name).
|
||||
* When true, it generates a nested 2 level array of field + score pairs, or 3 level when emitkey is set.
|
||||
*
|
||||
* 'reply_nil_when_empty' when true we reply a NIL if we are not able to pop up any elements.
|
||||
* Like in ZMPOP/BZMPOP we reply with a structured nested array containing key name
|
||||
* and member + score pairs. In these commands, we reply with null when we have no result.
|
||||
* Otherwise in ZPOPMIN/ZPOPMAX we reply an empty array by default.
|
||||
*
|
||||
* 'deleted' is an optional output argument to get an indication
|
||||
* if the key got deleted by this function.
|
||||
* */
|
||||
void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey,
|
||||
long count, int use_nested_array, int reply_nil_when_empty, int *deleted) {
|
||||
int idx;
|
||||
robj *key = NULL;
|
||||
robj *zobj = NULL;
|
||||
sds ele;
|
||||
double score;
|
||||
long count = 1;
|
||||
|
||||
/* If a count argument as passed, parse it or return an error. */
|
||||
if (countarg) {
|
||||
if (getLongFromObjectOrReply(c,countarg,&count,NULL) != C_OK)
|
||||
return;
|
||||
if (count <= 0) {
|
||||
addReply(c,shared.emptyarray);
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (deleted) *deleted = 0;
|
||||
|
||||
/* Check type and break on the first error, otherwise identify candidate. */
|
||||
idx = 0;
|
||||
@ -3791,20 +3795,38 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey
|
||||
|
||||
/* No candidate for zpopping, return empty. */
|
||||
if (!zobj) {
|
||||
addReply(c,shared.emptyarray);
|
||||
if (reply_nil_when_empty) {
|
||||
addReplyNullArray(c);
|
||||
} else {
|
||||
addReply(c,shared.emptyarray);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
void *arraylen_ptr = addReplyDeferredLen(c);
|
||||
long result_count = 0;
|
||||
|
||||
/* We emit the key only for the blocking variant. */
|
||||
if (emitkey) addReplyBulk(c,key);
|
||||
/* When count is 0, we need to correct it to 1 for plain single pop. */
|
||||
if (count == 0) count = 1;
|
||||
|
||||
/* Respond with a single (flat) array in RESP2 or if countarg is not
|
||||
* provided (returning a single element). In RESP3, when countarg is
|
||||
* provided, use nested array. */
|
||||
int use_nested_array = c->resp > 2 && countarg != NULL;
|
||||
long llen = zsetLength(zobj);
|
||||
long rangelen = (count > llen) ? llen : count;
|
||||
|
||||
if (!use_nested_array && !emitkey) {
|
||||
/* ZPOPMIN/ZPOPMAX with or without COUNT option in RESP2. */
|
||||
addReplyArrayLen(c, rangelen * 2);
|
||||
} else if (use_nested_array && !emitkey) {
|
||||
/* ZPOPMIN/ZPOPMAX with COUNT option in RESP3. */
|
||||
addReplyArrayLen(c, rangelen);
|
||||
} else if (!use_nested_array && emitkey) {
|
||||
/* BZPOPMIN/BZPOPMAX in RESP2 and RESP3. */
|
||||
addReplyArrayLen(c, rangelen * 2 + 1);
|
||||
addReplyBulk(c, key);
|
||||
} else if (use_nested_array && emitkey) {
|
||||
/* ZMPOP/BZMPOP in RESP2 and RESP3. */
|
||||
addReplyArrayLen(c, 2);
|
||||
addReplyBulk(c, key);
|
||||
addReplyArrayLen(c, rangelen);
|
||||
}
|
||||
|
||||
/* Remove the element. */
|
||||
do {
|
||||
@ -3861,64 +3883,114 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey
|
||||
addReplyDouble(c,score);
|
||||
sdsfree(ele);
|
||||
++result_count;
|
||||
} while(--rangelen);
|
||||
|
||||
/* Remove the key, if indeed needed. */
|
||||
if (zsetLength(zobj) == 0) {
|
||||
dbDelete(c->db,key);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
|
||||
break;
|
||||
}
|
||||
} while(--count);
|
||||
/* Remove the key, if indeed needed. */
|
||||
if (zsetLength(zobj) == 0) {
|
||||
if (deleted) *deleted = 1;
|
||||
|
||||
if (!use_nested_array) {
|
||||
result_count *= 2;
|
||||
dbDelete(c->db,key);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
|
||||
}
|
||||
setDeferredArrayLen(c,arraylen_ptr,result_count + (emitkey != 0));
|
||||
|
||||
if (c->cmd->proc == zmpopCommand) {
|
||||
/* Always replicate it as ZPOP[MIN|MAX] with COUNT option instead of ZMPOP. */
|
||||
robj *count_obj = createStringObjectFromLongLong((count > llen) ? llen : count);
|
||||
rewriteClientCommandVector(c, 3,
|
||||
(where == ZSET_MAX) ? shared.zpopmax : shared.zpopmin,
|
||||
key, count_obj);
|
||||
decrRefCount(count_obj);
|
||||
}
|
||||
}
|
||||
|
||||
/* ZPOPMIN/ZPOPMAX key [<count>] */
|
||||
void zpopMinMaxCommand(client *c, int where) {
|
||||
if (c->argc > 3) {
|
||||
addReplyErrorObject(c,shared.syntaxerr);
|
||||
return;
|
||||
}
|
||||
|
||||
long count = 0; /* 0 for plain single pop. */
|
||||
if (c->argc == 3) {
|
||||
if (getLongFromObjectOrReply(c, c->argv[2], &count, NULL) != C_OK)
|
||||
return;
|
||||
|
||||
if (count <= 0) {
|
||||
addReply(c, shared.emptyarray);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/* Respond with a single (flat) array in RESP2 or if count is 0
|
||||
* (returning a single element). In RESP3, when count > 0 use nested array. */
|
||||
int use_nested_array = (c->resp > 2 && count != 0);
|
||||
|
||||
genericZpopCommand(c, &c->argv[1], 1, where, 0, count, use_nested_array, 0, NULL);
|
||||
}
|
||||
|
||||
/* ZPOPMIN key [<count>] */
|
||||
void zpopminCommand(client *c) {
|
||||
if (c->argc > 3) {
|
||||
addReplyErrorObject(c,shared.syntaxerr);
|
||||
return;
|
||||
}
|
||||
genericZpopCommand(c,&c->argv[1],1,ZSET_MIN,0,
|
||||
c->argc == 3 ? c->argv[2] : NULL);
|
||||
zpopMinMaxCommand(c, ZSET_MIN);
|
||||
}
|
||||
|
||||
/* ZMAXPOP key [<count>] */
|
||||
void zpopmaxCommand(client *c) {
|
||||
if (c->argc > 3) {
|
||||
addReplyErrorObject(c,shared.syntaxerr);
|
||||
return;
|
||||
}
|
||||
genericZpopCommand(c,&c->argv[1],1,ZSET_MAX,0,
|
||||
c->argc == 3 ? c->argv[2] : NULL);
|
||||
zpopMinMaxCommand(c, ZSET_MAX);
|
||||
}
|
||||
|
||||
/* BZPOPMIN / BZPOPMAX actual implementation. */
|
||||
void blockingGenericZpopCommand(client *c, int where) {
|
||||
/* BZPOPMIN, BZPOPMAX, BZMPOP actual implementation.
|
||||
*
|
||||
* 'numkeys' is the number of keys.
|
||||
*
|
||||
* 'timeout_idx' parameter position of block timeout.
|
||||
*
|
||||
* 'where' ZSET_MIN or ZSET_MAX.
|
||||
*
|
||||
* 'count' is the number of elements requested to pop, or 0 for plain single pop.
|
||||
*
|
||||
* 'use_nested_array' when false it generates a flat array (with or without key name).
|
||||
* When true, it generates a nested 3 level array of keyname, field + score pairs.
|
||||
* */
|
||||
void blockingGenericZpopCommand(client *c, robj **keys, int numkeys, int where,
|
||||
int timeout_idx, long count, int use_nested_array, int reply_nil_when_empty) {
|
||||
robj *o;
|
||||
robj *key;
|
||||
mstime_t timeout;
|
||||
int j;
|
||||
|
||||
if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS)
|
||||
if (getTimeoutFromObjectOrReply(c,c->argv[timeout_idx],&timeout,UNIT_SECONDS)
|
||||
!= C_OK) return;
|
||||
|
||||
for (j = 1; j < c->argc-1; j++) {
|
||||
o = lookupKeyWrite(c->db,c->argv[j]);
|
||||
for (j = 0; j < numkeys; j++) {
|
||||
key = keys[j];
|
||||
o = lookupKeyWrite(c->db,key);
|
||||
/* Non-existing key, move to next key. */
|
||||
if (o == NULL) continue;
|
||||
|
||||
if (checkType(c,o,OBJ_ZSET)) return;
|
||||
if (o != NULL) {
|
||||
if (zsetLength(o) != 0) {
|
||||
/* Non empty zset, this is like a normal ZPOP[MIN|MAX]. */
|
||||
genericZpopCommand(c,&c->argv[j],1,where,1,NULL);
|
||||
/* Replicate it as an ZPOP[MIN|MAX] instead of BZPOP[MIN|MAX]. */
|
||||
rewriteClientCommandVector(c,2,
|
||||
where == ZSET_MAX ? shared.zpopmax : shared.zpopmin,
|
||||
c->argv[j]);
|
||||
return;
|
||||
}
|
||||
|
||||
long llen = zsetLength(o);
|
||||
/* Empty zset, move to next key. */
|
||||
if (llen == 0) continue;
|
||||
|
||||
/* Non empty zset, this is like a normal ZPOP[MIN|MAX]. */
|
||||
genericZpopCommand(c, &key, 1, where, 1, count, use_nested_array, reply_nil_when_empty, NULL);
|
||||
|
||||
if (count == 0) {
|
||||
/* Replicate it as ZPOP[MIN|MAX] instead of BZPOP[MIN|MAX]. */
|
||||
rewriteClientCommandVector(c,2,
|
||||
(where == ZSET_MAX) ? shared.zpopmax : shared.zpopmin,
|
||||
key);
|
||||
} else {
|
||||
/* Replicate it as ZPOP[MIN|MAX] with COUNT option. */
|
||||
robj *count_obj = createStringObjectFromLongLong((count > llen) ? llen : count);
|
||||
rewriteClientCommandVector(c, 3,
|
||||
(where == ZSET_MAX) ? shared.zpopmax : shared.zpopmin,
|
||||
key, count_obj);
|
||||
decrRefCount(count_obj);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
/* If we are not allowed to block the client and the zset is empty the only thing
|
||||
@ -3929,17 +4001,18 @@ void blockingGenericZpopCommand(client *c, int where) {
|
||||
}
|
||||
|
||||
/* If the keys do not exist we must block */
|
||||
blockForKeys(c,BLOCKED_ZSET,c->argv + 1,c->argc - 2,0,timeout,NULL,NULL,NULL);
|
||||
struct blockPos pos = {where};
|
||||
blockForKeys(c,BLOCKED_ZSET,c->argv+1,c->argc-2,count,timeout,NULL,&pos,NULL);
|
||||
}
|
||||
|
||||
// BZPOPMIN key [key ...] timeout
|
||||
void bzpopminCommand(client *c) {
|
||||
blockingGenericZpopCommand(c,ZSET_MIN);
|
||||
blockingGenericZpopCommand(c, c->argv+1, c->argc-2, ZSET_MIN, c->argc-1, 0, 0, 0);
|
||||
}
|
||||
|
||||
// BZPOPMAX key [key ...] timeout
|
||||
void bzpopmaxCommand(client *c) {
|
||||
blockingGenericZpopCommand(c,ZSET_MAX);
|
||||
blockingGenericZpopCommand(c, c->argv+1, c->argc-2, ZSET_MAX, c->argc-1, 0, 0, 0);
|
||||
}
|
||||
|
||||
static void zarndmemberReplyWithListpack(client *c, unsigned int count, listpackEntry *keys, listpackEntry *vals) {
|
||||
@ -4189,3 +4262,68 @@ void zrandmemberCommand(client *c) {
|
||||
zsetTypeRandomElement(zset, zsetLength(zset), &ele,NULL);
|
||||
zsetReplyFromListpackEntry(c,&ele);
|
||||
}
|
||||
|
||||
/* ZMPOP/BZMPOP
|
||||
*
|
||||
* 'numkeys_idx' parameter position of key number.
|
||||
* 'is_block' this indicates whether it is a blocking variant. */
|
||||
void zmpopGenericCommand(client *c, int numkeys_idx, int is_block) {
|
||||
long j;
|
||||
long numkeys = 0; /* Number of keys. */
|
||||
int where = 0; /* ZSET_MIN or ZSET_MAX. */
|
||||
long count = 1; /* Reply will consist of up to count elements, depending on the zset's length. */
|
||||
|
||||
/* Parse the numkeys. */
|
||||
if (getRangeLongFromObjectOrReply(c, c->argv[numkeys_idx], 1, LONG_MAX,
|
||||
&numkeys, "numkeys should be greater than 0") != C_OK)
|
||||
return;
|
||||
|
||||
/* Parse the where. where_idx: the index of where in the c->argv. */
|
||||
long where_idx = numkeys_idx + numkeys + 1;
|
||||
if (where_idx >= c->argc) {
|
||||
addReplyErrorObject(c, shared.syntaxerr);
|
||||
return;
|
||||
}
|
||||
if (!strcasecmp(c->argv[where_idx]->ptr, "MIN")) {
|
||||
where = ZSET_MIN;
|
||||
} else if (!strcasecmp(c->argv[where_idx]->ptr, "MAX")) {
|
||||
where = ZSET_MAX;
|
||||
} else {
|
||||
addReplyErrorObject(c, shared.syntaxerr);
|
||||
return;
|
||||
}
|
||||
|
||||
/* Parse the optional arguments. */
|
||||
for (j = where_idx + 1; j < c->argc; j++) {
|
||||
char *opt = c->argv[j]->ptr;
|
||||
int moreargs = (c->argc - 1) - j;
|
||||
|
||||
if (!strcasecmp(opt, "COUNT") && moreargs) {
|
||||
j++;
|
||||
if (getRangeLongFromObjectOrReply(c, c->argv[j], 1, LONG_MAX,
|
||||
&count,"count should be greater than 0") != C_OK)
|
||||
return;
|
||||
} else {
|
||||
addReplyErrorObject(c, shared.syntaxerr);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (is_block) {
|
||||
/* BLOCK. We will handle CLIENT_DENY_BLOCKING flag in blockingGenericZpopCommand. */
|
||||
blockingGenericZpopCommand(c, c->argv+numkeys_idx+1, numkeys, where, 1, count, 1, 1);
|
||||
} else {
|
||||
/* NON-BLOCK */
|
||||
genericZpopCommand(c, c->argv+numkeys_idx+1, numkeys, where, 1, count, 1, 1, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
/* ZMPOP numkeys [<key> ...] MIN|MAX [COUNT count] */
|
||||
void zmpopCommand(client *c) {
|
||||
zmpopGenericCommand(c, 1, 0);
|
||||
}
|
||||
|
||||
/* BZMPOP timeout numkeys [<key> ...] MIN|MAX [COUNT count] */
|
||||
void bzmpopCommand(client *c) {
|
||||
zmpopGenericCommand(c, 2, 1);
|
||||
}
|
||||
|
@ -333,7 +333,6 @@ tags {"aof external:skip"} {
|
||||
set client [redis [dict get $srv host] [dict get $srv port] 0 $::tls]
|
||||
set client2 [redis [dict get $srv host] [dict get $srv port] 1 $::tls]
|
||||
wait_done_loading $client
|
||||
wait_done_loading $client2
|
||||
|
||||
# Pop all elements from mylist, should be blmpop delete mylist.
|
||||
$client lmpop 1 mylist left count 1
|
||||
@ -368,4 +367,51 @@ tags {"aof external:skip"} {
|
||||
assert_equal 2 [$client llen mylist3]
|
||||
}
|
||||
}
|
||||
|
||||
# Test that ZMPOP/BZMPOP work fine with AOF.
|
||||
create_aof {
|
||||
append_to_aof [formatCommand zadd myzset 1 one 2 two 3 three]
|
||||
append_to_aof [formatCommand zadd myzset2 4 four 5 five 6 six]
|
||||
append_to_aof [formatCommand zadd myzset3 1 one 2 two 3 three 4 four 5 five]
|
||||
}
|
||||
|
||||
start_server_aof [list dir $server_path aof-load-truncated no] {
|
||||
test "AOF+ZMPOP/BZMPOP: pop elements from the zset" {
|
||||
set client [redis [dict get $srv host] [dict get $srv port] 0 $::tls]
|
||||
set client2 [redis [dict get $srv host] [dict get $srv port] 1 $::tls]
|
||||
wait_done_loading $client
|
||||
|
||||
# Pop all elements from myzset, should be bzmpop delete myzset.
|
||||
$client zmpop 1 myzset min count 1
|
||||
$client bzmpop 0 1 myzset min count 10
|
||||
|
||||
# Pop all elements from myzset2, should be zmpop delete myzset2.
|
||||
$client bzmpop 0 2 myzset myzset2 max count 10
|
||||
$client zmpop 2 myzset myzset2 max count 2
|
||||
|
||||
# Blocking path, be blocked and then released.
|
||||
$client2 bzmpop 0 2 myzset myzset2 min count 2
|
||||
after 100
|
||||
$client zadd myzset2 1 one 2 two 3 three
|
||||
|
||||
# Pop up the last element in myzset2
|
||||
$client bzmpop 0 3 myzset myzset2 myzset3 min count 1
|
||||
|
||||
# Leave two elements in myzset3.
|
||||
$client bzmpop 0 3 myzset myzset2 myzset3 max count 3
|
||||
}
|
||||
}
|
||||
|
||||
start_server_aof [list dir $server_path aof-load-truncated no] {
|
||||
test "AOF+ZMPOP/BZMPOP: after pop elements from the zset" {
|
||||
set client [redis [dict get $srv host] [dict get $srv port] 0 $::tls]
|
||||
wait_done_loading $client
|
||||
|
||||
# myzset and myzset2 no longer exist.
|
||||
assert_equal 0 [$client exists myzset myzset2]
|
||||
|
||||
# Length of myzset3 is two.
|
||||
assert_equal 2 [$client zcard myzset3]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -6,7 +6,7 @@ start_server {
|
||||
} {
|
||||
source "tests/unit/type/list-common.tcl"
|
||||
|
||||
# A helper function for BPOP/BLMPOP with one input key.
|
||||
# A helper function to execute either B*POP or BLMPOP* with one input key.
|
||||
proc bpop_command {rd pop key timeout} {
|
||||
if {$pop == "BLMPOP_LEFT"} {
|
||||
$rd blmpop $timeout 1 $key left count 1
|
||||
@ -17,7 +17,7 @@ start_server {
|
||||
}
|
||||
}
|
||||
|
||||
# A helper function for BPOP/BLMPOP with two input keys.
|
||||
# A helper function to execute either B*POP or BLMPOP* with two input keys.
|
||||
proc bpop_command_two_key {rd pop key key2 timeout} {
|
||||
if {$pop == "BLMPOP_LEFT"} {
|
||||
$rd blmpop $timeout 2 $key $key2 left count 1
|
||||
@ -719,14 +719,14 @@ foreach {pop} {BLPOP BLMPOP_LEFT} {
|
||||
set rd [redis_deferring_client]
|
||||
set repl [attach_to_replication_stream]
|
||||
|
||||
# BLMPOP without block.
|
||||
# BLMPOP without being blocked.
|
||||
r lpush mylist{t} a b c
|
||||
r rpush mylist2{t} 1 2 3
|
||||
r blmpop 0 1 mylist{t} left count 1
|
||||
r blmpop 0 2 mylist{t} mylist2{t} right count 10
|
||||
r blmpop 0 2 mylist{t} mylist2{t} right count 10
|
||||
|
||||
# BLMPOP with block.
|
||||
# BLMPOP that gets blocked.
|
||||
$rd blmpop 0 1 mylist{t} left count 1
|
||||
wait_for_blocked_client
|
||||
r lpush mylist{t} a
|
||||
@ -737,6 +737,10 @@ foreach {pop} {BLPOP BLMPOP_LEFT} {
|
||||
wait_for_blocked_client
|
||||
r rpush mylist2{t} a b c
|
||||
|
||||
# Released on timeout.
|
||||
assert_equal {} [r blmpop 0.01 1 mylist{t} left count 10]
|
||||
r set foo{t} bar ;# something else to propagate after, so we can make sure the above pop didn't.
|
||||
|
||||
assert_replication_stream $repl {
|
||||
{select *}
|
||||
{lpush mylist{t} a b c}
|
||||
@ -750,6 +754,7 @@ foreach {pop} {BLPOP BLMPOP_LEFT} {
|
||||
{lpop mylist{t} 3}
|
||||
{rpush mylist2{t} a b c}
|
||||
{rpop mylist2{t} 3}
|
||||
{set foo{t} bar}
|
||||
}
|
||||
} {} {needs:repl}
|
||||
|
||||
|
@ -6,6 +6,96 @@ start_server {tags {"zset"}} {
|
||||
}
|
||||
}
|
||||
|
||||
# A helper function to verify either ZPOP* or ZMPOP* response.
|
||||
proc verify_pop_response {pop res zpop_expected_response zmpop_expected_response} {
|
||||
if {[string match "*ZM*" $pop]} {
|
||||
assert_equal $res $zmpop_expected_response
|
||||
} else {
|
||||
assert_equal $res $zpop_expected_response
|
||||
}
|
||||
}
|
||||
|
||||
# A helper function to verify either ZPOP* or ZMPOP* response when given one input key.
|
||||
proc verify_zpop_response {rd pop key count zpop_expected_response zmpop_expected_response} {
|
||||
if {[string match "ZM*" $pop]} {
|
||||
lassign [split $pop "_"] pop where
|
||||
|
||||
if {$count == 0} {
|
||||
set res [$rd $pop 1 $key $where]
|
||||
} else {
|
||||
set res [$rd $pop 1 $key $where COUNT $count]
|
||||
}
|
||||
} else {
|
||||
if {$count == 0} {
|
||||
set res [$rd $pop $key]
|
||||
} else {
|
||||
set res [$rd $pop $key $count]
|
||||
}
|
||||
}
|
||||
verify_pop_response $pop $res $zpop_expected_response $zmpop_expected_response
|
||||
}
|
||||
|
||||
# A helper function to verify either BZPOP* or BZMPOP* response when given one input key.
|
||||
proc verify_bzpop_response {rd pop key timeout count bzpop_expected_response bzmpop_expected_response} {
|
||||
if {[string match "BZM*" $pop]} {
|
||||
lassign [split $pop "_"] pop where
|
||||
|
||||
if {$count == 0} {
|
||||
$rd $pop $timeout 1 $key $where
|
||||
} else {
|
||||
$rd $pop $timeout 1 $key $where COUNT $count
|
||||
}
|
||||
} else {
|
||||
$rd $pop $key $timeout
|
||||
}
|
||||
verify_pop_response $pop [$rd read] $bzpop_expected_response $bzmpop_expected_response
|
||||
}
|
||||
|
||||
# A helper function to verify either ZPOP* or ZMPOP* response when given two input keys.
|
||||
proc verify_bzpop_two_key_response {rd pop key key2 timeout count bzpop_expected_response bzmpop_expected_response} {
|
||||
if {[string match "BZM*" $pop]} {
|
||||
lassign [split $pop "_"] pop where
|
||||
|
||||
if {$count == 0} {
|
||||
$rd $pop $timeout 2 $key $key2 $where
|
||||
} else {
|
||||
$rd $pop $timeout 2 $key $key2 $where COUNT $count
|
||||
}
|
||||
} else {
|
||||
$rd $pop $key $key2 $timeout
|
||||
}
|
||||
verify_pop_response $pop [$rd read] $bzpop_expected_response $bzmpop_expected_response
|
||||
}
|
||||
|
||||
# A helper function to execute either BZPOP* or BZMPOP* with one input key.
|
||||
proc bzpop_command {rd pop key timeout} {
|
||||
if {[string match "BZM*" $pop]} {
|
||||
lassign [split $pop "_"] pop where
|
||||
$rd $pop $timeout 1 $key $where COUNT 1
|
||||
} else {
|
||||
$rd $pop $key $timeout
|
||||
}
|
||||
}
|
||||
|
||||
# A helper function to verify nil response in readraw base on RESP version.
|
||||
proc verify_nil_response {resp nil_response} {
|
||||
if {$resp == 2} {
|
||||
assert_equal $nil_response {*-1}
|
||||
} elseif {$resp == 3} {
|
||||
assert_equal $nil_response {_}
|
||||
}
|
||||
}
|
||||
|
||||
# A helper function to verify zset score response in readraw base on RESP version.
|
||||
proc verify_score_response {rd resp score} {
|
||||
if {$resp == 2} {
|
||||
assert_equal [$rd read] {$1}
|
||||
assert_equal [$rd read] $score
|
||||
} elseif {$resp == 3} {
|
||||
assert_equal [$rd read] ",$score"
|
||||
}
|
||||
}
|
||||
|
||||
proc basics {encoding} {
|
||||
set original_max_entries [lindex [r config get zset-max-ziplist-entries] 1]
|
||||
set original_max_value [lindex [r config get zset-max-ziplist-value] 1]
|
||||
@ -509,10 +599,10 @@ start_server {tags {"zset"}} {
|
||||
assert_equal {} [r zrevrangebylex zset \[elez \[elex]
|
||||
assert_equal {} [r zrevrangebylex zset (hill (omega]
|
||||
}
|
||||
|
||||
|
||||
test "ZLEXCOUNT advanced - $encoding" {
|
||||
create_default_lex_zset
|
||||
|
||||
|
||||
assert_equal 9 [r zlexcount zset - +]
|
||||
assert_equal 0 [r zlexcount zset + -]
|
||||
assert_equal 0 [r zlexcount zset + \[c]
|
||||
@ -913,110 +1003,103 @@ start_server {tags {"zset"}} {
|
||||
}
|
||||
}
|
||||
|
||||
test "Basic ZPOP with a single key - $encoding" {
|
||||
foreach {popmin popmax} {ZPOPMIN ZPOPMAX ZMPOP_MIN ZMPOP_MAX} {
|
||||
test "Basic $popmin/$popmax with a single key - $encoding" {
|
||||
r del zset
|
||||
assert_equal {} [r zpopmin zset]
|
||||
verify_zpop_response r $popmin zset 0 {} {}
|
||||
|
||||
create_zset zset {-1 a 1 b 2 c 3 d 4 e}
|
||||
assert_equal {a -1} [r zpopmin zset]
|
||||
assert_equal {b 1} [r zpopmin zset]
|
||||
assert_equal {e 4} [r zpopmax zset]
|
||||
assert_equal {d 3} [r zpopmax zset]
|
||||
assert_equal {c 2} [r zpopmin zset]
|
||||
verify_zpop_response r $popmin zset 0 {a -1} {zset {{a -1}}}
|
||||
verify_zpop_response r $popmin zset 0 {b 1} {zset {{b 1}}}
|
||||
verify_zpop_response r $popmax zset 0 {e 4} {zset {{e 4}}}
|
||||
verify_zpop_response r $popmax zset 0 {d 3} {zset {{d 3}}}
|
||||
verify_zpop_response r $popmin zset 0 {c 2} {zset {{c 2}}}
|
||||
assert_equal 0 [r exists zset]
|
||||
r set foo bar
|
||||
assert_error "*WRONGTYPE*" {r zpopmin foo}
|
||||
}
|
||||
|
||||
test "ZPOP with count - $encoding" {
|
||||
test "$popmin/$popmax with count - $encoding" {
|
||||
r del z1
|
||||
r del z2
|
||||
r del z3
|
||||
r del foo
|
||||
r set foo bar
|
||||
assert_equal {} [r zpopmin z1 2]
|
||||
assert_error "*WRONGTYPE*" {r zpopmin foo 2}
|
||||
verify_zpop_response r $popmin z1 2 {} {}
|
||||
|
||||
create_zset z1 {0 a 1 b 2 c 3 d}
|
||||
assert_equal {a 0 b 1} [r zpopmin z1 2]
|
||||
assert_equal {d 3 c 2} [r zpopmax z1 2]
|
||||
verify_zpop_response r $popmin z1 2 {a 0 b 1} {z1 {{a 0} {b 1}}}
|
||||
verify_zpop_response r $popmax z1 2 {d 3 c 2} {z1 {{d 3} {c 2}}}
|
||||
}
|
||||
}
|
||||
|
||||
test "BZPOP with a single existing sorted set - $encoding" {
|
||||
foreach {popmin popmax} {BZPOPMIN BZPOPMAX BZMPOP_MIN BZMPOP_MAX} {
|
||||
test "$popmin/$popmax with a single existing sorted set - $encoding" {
|
||||
set rd [redis_deferring_client]
|
||||
create_zset zset {0 a 1 b 2 c}
|
||||
create_zset zset {0 a 1 b 2 c 3 d}
|
||||
|
||||
$rd bzpopmin zset 5
|
||||
assert_equal {zset a 0} [$rd read]
|
||||
$rd bzpopmin zset 5
|
||||
assert_equal {zset b 1} [$rd read]
|
||||
$rd bzpopmax zset 5
|
||||
assert_equal {zset c 2} [$rd read]
|
||||
verify_bzpop_response $rd $popmin zset 5 0 {zset a 0} {zset {{a 0}}}
|
||||
verify_bzpop_response $rd $popmax zset 5 0 {zset d 3} {zset {{d 3}}}
|
||||
verify_bzpop_response $rd $popmin zset 5 0 {zset b 1} {zset {{b 1}}}
|
||||
verify_bzpop_response $rd $popmax zset 5 0 {zset c 2} {zset {{c 2}}}
|
||||
assert_equal 0 [r exists zset]
|
||||
}
|
||||
|
||||
test "BZPOP with multiple existing sorted sets - $encoding" {
|
||||
test "$popmin/$popmax with multiple existing sorted sets - $encoding" {
|
||||
set rd [redis_deferring_client]
|
||||
create_zset z1{t} {0 a 1 b 2 c}
|
||||
create_zset z2{t} {3 d 4 e 5 f}
|
||||
|
||||
$rd bzpopmin z1{t} z2{t} 5
|
||||
assert_equal {z1{t} a 0} [$rd read]
|
||||
$rd bzpopmax z1{t} z2{t} 5
|
||||
assert_equal {z1{t} c 2} [$rd read]
|
||||
verify_bzpop_two_key_response $rd $popmin z1{t} z2{t} 5 0 {z1{t} a 0} {z1{t} {{a 0}}}
|
||||
verify_bzpop_two_key_response $rd $popmax z1{t} z2{t} 5 0 {z1{t} c 2} {z1{t} {{c 2}}}
|
||||
assert_equal 1 [r zcard z1{t}]
|
||||
assert_equal 3 [r zcard z2{t}]
|
||||
|
||||
$rd bzpopmax z2{t} z1{t} 5
|
||||
assert_equal {z2{t} f 5} [$rd read]
|
||||
$rd bzpopmin z2{t} z1{t} 5
|
||||
assert_equal {z2{t} d 3} [$rd read]
|
||||
verify_bzpop_two_key_response $rd $popmax z2{t} z1{t} 5 0 {z2{t} f 5} {z2{t} {{f 5}}}
|
||||
verify_bzpop_two_key_response $rd $popmin z2{t} z1{t} 5 0 {z2{t} d 3} {z2{t} {{d 3}}}
|
||||
assert_equal 1 [r zcard z1{t}]
|
||||
assert_equal 1 [r zcard z2{t}]
|
||||
}
|
||||
|
||||
test "BZPOP second sorted set has members - $encoding" {
|
||||
test "$popmin/$popmax second sorted set has members - $encoding" {
|
||||
set rd [redis_deferring_client]
|
||||
r del z1{t}
|
||||
create_zset z2{t} {3 d 4 e 5 f}
|
||||
$rd bzpopmax z1{t} z2{t} 5
|
||||
assert_equal {z2{t} f 5} [$rd read]
|
||||
$rd bzpopmin z2{t} z1{t} 5
|
||||
assert_equal {z2{t} d 3} [$rd read]
|
||||
|
||||
verify_bzpop_two_key_response $rd $popmax z1{t} z2{t} 5 0 {z2{t} f 5} {z2{t} {{f 5}}}
|
||||
verify_bzpop_two_key_response $rd $popmin z1{t} z2{t} 5 0 {z2{t} d 3} {z2{t} {{d 3}}}
|
||||
assert_equal 0 [r zcard z1{t}]
|
||||
assert_equal 1 [r zcard z2{t}]
|
||||
}
|
||||
}
|
||||
|
||||
test "Basic ZPOP - $encoding RESP3" {
|
||||
foreach {popmin popmax} {ZPOPMIN ZPOPMAX ZMPOP_MIN ZMPOP_MAX} {
|
||||
test "Basic $popmin/$popmax - $encoding RESP3" {
|
||||
r hello 3
|
||||
r del z1
|
||||
create_zset z1 {0 a 1 b 2 c 3 d}
|
||||
assert_equal {a 0.0} [r zpopmin z1]
|
||||
assert_equal {d 3.0} [r zpopmax z1]
|
||||
verify_zpop_response r $popmin z1 0 {a 0.0} {z1 {{a 0.0}}}
|
||||
verify_zpop_response r $popmax z1 0 {d 3.0} {z1 {{d 3.0}}}
|
||||
r hello 2
|
||||
}
|
||||
|
||||
test "ZPOP with count - $encoding RESP3" {
|
||||
test "$popmin/$popmax with count - $encoding RESP3" {
|
||||
r hello 3
|
||||
r del z1
|
||||
create_zset z1 {0 a 1 b 2 c 3 d}
|
||||
assert_equal {{a 0.0} {b 1.0}} [r zpopmin z1 2]
|
||||
assert_equal {{d 3.0} {c 2.0}} [r zpopmax z1 2]
|
||||
verify_zpop_response r $popmin z1 2 {{a 0.0} {b 1.0}} {z1 {{a 0.0} {b 1.0}}}
|
||||
verify_zpop_response r $popmax z1 2 {{d 3.0} {c 2.0}} {z1 {{d 3.0} {c 2.0}}}
|
||||
r hello 2
|
||||
}
|
||||
}
|
||||
|
||||
test "BZPOP - $encoding RESP3" {
|
||||
foreach {popmin popmax} {BZPOPMIN BZPOPMAX BZMPOP_MIN BZMPOP_MAX} {
|
||||
test "$popmin/$popmax - $encoding RESP3" {
|
||||
r hello 3
|
||||
set rd [redis_deferring_client]
|
||||
create_zset zset {0 a 1 b 2 c}
|
||||
create_zset zset {0 a 1 b 2 c 3 d}
|
||||
|
||||
verify_bzpop_response $rd $popmin zset 5 0 {zset a 0} {zset {{a 0}}}
|
||||
verify_bzpop_response $rd $popmax zset 5 0 {zset d 3} {zset {{d 3}}}
|
||||
verify_bzpop_response $rd $popmin zset 5 0 {zset b 1} {zset {{b 1}}}
|
||||
verify_bzpop_response $rd $popmax zset 5 0 {zset c 2} {zset {{c 2}}}
|
||||
|
||||
$rd bzpopmin zset 5
|
||||
assert_equal {zset a 0} [$rd read]
|
||||
$rd bzpopmin zset 5
|
||||
assert_equal {zset b 1} [$rd read]
|
||||
$rd bzpopmax zset 5
|
||||
assert_equal {zset c 2} [$rd read]
|
||||
assert_equal 0 [r exists zset]
|
||||
r hello 2
|
||||
}
|
||||
}
|
||||
|
||||
r config set zset-max-ziplist-entries $original_max_entries
|
||||
r config set zset-max-ziplist-value $original_max_value
|
||||
@ -1025,6 +1108,248 @@ start_server {tags {"zset"}} {
|
||||
basics listpack
|
||||
basics skiplist
|
||||
|
||||
test "ZPOP/ZMPOP against wrong type" {
|
||||
r set foo{t} bar
|
||||
assert_error "*WRONGTYPE*" {r zpopmin foo{t}}
|
||||
assert_error "*WRONGTYPE*" {r zpopmax foo{t}}
|
||||
assert_error "*WRONGTYPE*" {r zpopmin foo{t} 2}
|
||||
|
||||
assert_error "*WRONGTYPE*" {r zmpop 1 foo{t} min}
|
||||
assert_error "*WRONGTYPE*" {r zmpop 1 foo{t} max}
|
||||
assert_error "*WRONGTYPE*" {r zmpop 1 foo{t} max count 200}
|
||||
|
||||
r del foo{t}
|
||||
r set foo2{t} bar
|
||||
assert_error "*WRONGTYPE*" {r zmpop 2 foo{t} foo2{t} min}
|
||||
assert_error "*WRONGTYPE*" {r zmpop 2 foo2{t} foo1{t} max count 1}
|
||||
}
|
||||
|
||||
test "ZMPOP with illegal argument" {
|
||||
assert_error "ERR wrong number of arguments*" {r zmpop}
|
||||
assert_error "ERR wrong number of arguments*" {r zmpop 1}
|
||||
assert_error "ERR wrong number of arguments*" {r zmpop 1 myzset{t}}
|
||||
|
||||
assert_error "ERR numkeys*" {r zmpop 0 myzset{t} MIN}
|
||||
assert_error "ERR numkeys*" {r zmpop a myzset{t} MIN}
|
||||
assert_error "ERR numkeys*" {r zmpop -1 myzset{t} MAX}
|
||||
|
||||
assert_error "ERR syntax error*" {r zmpop 1 myzset{t} bad_where}
|
||||
assert_error "ERR syntax error*" {r zmpop 1 myzset{t} MIN bar_arg}
|
||||
assert_error "ERR syntax error*" {r zmpop 1 myzset{t} MAX MIN}
|
||||
assert_error "ERR syntax error*" {r zmpop 1 myzset{t} COUNT}
|
||||
assert_error "ERR syntax error*" {r zmpop 2 myzset{t} myzset2{t} bad_arg}
|
||||
|
||||
assert_error "ERR count*" {r zmpop 1 myzset{t} MIN COUNT 0}
|
||||
assert_error "ERR count*" {r zmpop 1 myzset{t} MAX COUNT a}
|
||||
assert_error "ERR count*" {r zmpop 1 myzset{t} MIN COUNT -1}
|
||||
assert_error "ERR count*" {r zmpop 2 myzset{t} myzset2{t} MAX COUNT -1}
|
||||
}
|
||||
|
||||
test "ZMPOP propagate as pop with count command to replica" {
|
||||
set repl [attach_to_replication_stream]
|
||||
|
||||
# ZMPOP min/max propagate as ZPOPMIN/ZPOPMAX with count
|
||||
r zadd myzset{t} 1 one 2 two 3 three
|
||||
|
||||
# Pop elements from one zset.
|
||||
r zmpop 1 myzset{t} min
|
||||
r zmpop 1 myzset{t} max count 1
|
||||
|
||||
# Now the zset have only one element
|
||||
r zmpop 2 myzset{t} myzset2{t} min count 10
|
||||
|
||||
# No elements so we don't propagate.
|
||||
r zmpop 2 myzset{t} myzset2{t} max count 10
|
||||
|
||||
# Pop elements from the second zset.
|
||||
r zadd myzset2{t} 1 one 2 two 3 three
|
||||
r zmpop 2 myzset{t} myzset2{t} min count 2
|
||||
r zmpop 2 myzset{t} myzset2{t} max count 1
|
||||
|
||||
# Pop all elements.
|
||||
r zadd myzset{t} 1 one 2 two 3 three
|
||||
r zadd myzset2{t} 4 four 5 five 6 six
|
||||
r zmpop 2 myzset{t} myzset2{t} min count 10
|
||||
r zmpop 2 myzset{t} myzset2{t} max count 10
|
||||
|
||||
assert_replication_stream $repl {
|
||||
{select *}
|
||||
{zadd myzset{t} 1 one 2 two 3 three}
|
||||
{zpopmin myzset{t} 1}
|
||||
{zpopmax myzset{t} 1}
|
||||
{zpopmin myzset{t} 1}
|
||||
{zadd myzset2{t} 1 one 2 two 3 three}
|
||||
{zpopmin myzset2{t} 2}
|
||||
{zpopmax myzset2{t} 1}
|
||||
{zadd myzset{t} 1 one 2 two 3 three}
|
||||
{zadd myzset2{t} 4 four 5 five 6 six}
|
||||
{zpopmin myzset{t} 3}
|
||||
{zpopmax myzset2{t} 3}
|
||||
}
|
||||
} {} {needs:repl}
|
||||
|
||||
foreach resp {3 2} {
|
||||
test "ZPOPMIN/ZPOPMAX readraw in RESP$resp" {
|
||||
r del zset{t}
|
||||
create_zset zset2{t} {1 a 2 b 3 c 4 d 5 e}
|
||||
|
||||
r hello $resp
|
||||
r readraw 1
|
||||
|
||||
# ZPOP against non existing key.
|
||||
assert_equal {*0} [r zpopmin zset{t}]
|
||||
assert_equal {*0} [r zpopmin zset{t} 1]
|
||||
|
||||
# ZPOP without COUNT option.
|
||||
assert_equal {*2} [r zpopmin zset2{t}]
|
||||
assert_equal [r read] {$1}
|
||||
assert_equal [r read] {a}
|
||||
verify_score_response r $resp 1
|
||||
|
||||
# ZPOP with COUNT option.
|
||||
if {$resp == 2} {
|
||||
assert_equal {*2} [r zpopmax zset2{t} 1]
|
||||
assert_equal [r read] {$1}
|
||||
assert_equal [r read] {e}
|
||||
} elseif {$resp == 3} {
|
||||
assert_equal {*1} [r zpopmax zset2{t} 1]
|
||||
assert_equal [r read] {*2}
|
||||
assert_equal [r read] {$1}
|
||||
assert_equal [r read] {e}
|
||||
}
|
||||
verify_score_response r $resp 5
|
||||
|
||||
r readraw 0
|
||||
}
|
||||
|
||||
test "BZPOPMIN/BZPOPMAX readraw in RESP$resp" {
|
||||
r del zset{t}
|
||||
create_zset zset2{t} {1 a 2 b 3 c 4 d 5 e}
|
||||
|
||||
set rd [redis_deferring_client]
|
||||
$rd hello $resp
|
||||
$rd read
|
||||
$rd readraw 1
|
||||
|
||||
# BZPOP released on timeout.
|
||||
$rd bzpopmin zset{t} 0.01
|
||||
verify_nil_response $resp [$rd read]
|
||||
$rd bzpopmax zset{t} 0.01
|
||||
verify_nil_response $resp [$rd read]
|
||||
|
||||
# BZPOP non-blocking path.
|
||||
$rd bzpopmin zset1{t} zset2{t} 0.1
|
||||
assert_equal [$rd read] {*3}
|
||||
assert_equal [$rd read] {$8}
|
||||
assert_equal [$rd read] {zset2{t}}
|
||||
assert_equal [$rd read] {$1}
|
||||
assert_equal [$rd read] {a}
|
||||
verify_score_response $rd $resp 1
|
||||
|
||||
# BZPOP blocking path.
|
||||
$rd bzpopmin zset{t} 5
|
||||
wait_for_blocked_client
|
||||
r zadd zset{t} 1 a
|
||||
assert_equal [$rd read] {*3}
|
||||
assert_equal [$rd read] {$7}
|
||||
assert_equal [$rd read] {zset{t}}
|
||||
assert_equal [$rd read] {$1}
|
||||
assert_equal [$rd read] {a}
|
||||
verify_score_response $rd $resp 1
|
||||
|
||||
$rd readraw 0
|
||||
}
|
||||
|
||||
test "ZMPOP readraw in RESP$resp" {
|
||||
r del zset{t} zset2{t}
|
||||
create_zset zset3{t} {1 a}
|
||||
create_zset zset4{t} {1 a 2 b 3 c 4 d 5 e}
|
||||
|
||||
r hello $resp
|
||||
r readraw 1
|
||||
|
||||
# ZMPOP against non existing key.
|
||||
verify_nil_response $resp [r zmpop 1 zset{t} min]
|
||||
verify_nil_response $resp [r zmpop 1 zset{t} max count 1]
|
||||
verify_nil_response $resp [r zmpop 2 zset{t} zset2{t} min]
|
||||
verify_nil_response $resp [r zmpop 2 zset{t} zset2{t} max count 1]
|
||||
|
||||
# ZMPOP with one input key.
|
||||
assert_equal {*2} [r zmpop 1 zset3{t} max]
|
||||
assert_equal [r read] {$8}
|
||||
assert_equal [r read] {zset3{t}}
|
||||
assert_equal [r read] {*1}
|
||||
assert_equal [r read] {*2}
|
||||
assert_equal [r read] {$1}
|
||||
assert_equal [r read] {a}
|
||||
verify_score_response r $resp 1
|
||||
|
||||
# ZMPOP with COUNT option.
|
||||
assert_equal {*2} [r zmpop 2 zset3{t} zset4{t} min count 2]
|
||||
assert_equal [r read] {$8}
|
||||
assert_equal [r read] {zset4{t}}
|
||||
assert_equal [r read] {*2}
|
||||
assert_equal [r read] {*2}
|
||||
assert_equal [r read] {$1}
|
||||
assert_equal [r read] {a}
|
||||
verify_score_response r $resp 1
|
||||
assert_equal [r read] {*2}
|
||||
assert_equal [r read] {$1}
|
||||
assert_equal [r read] {b}
|
||||
verify_score_response r $resp 2
|
||||
|
||||
r readraw 0
|
||||
}
|
||||
|
||||
test "BZMPOP readraw in RESP$resp" {
|
||||
r del zset{t} zset2{t}
|
||||
create_zset zset3{t} {1 a 2 b 3 c 4 d 5 e}
|
||||
|
||||
set rd [redis_deferring_client]
|
||||
$rd hello $resp
|
||||
$rd read
|
||||
$rd readraw 1
|
||||
|
||||
# BZMPOP released on timeout.
|
||||
$rd bzmpop 0.01 1 zset{t} min
|
||||
verify_nil_response $resp [$rd read]
|
||||
$rd bzmpop 0.01 2 zset{t} zset2{t} max
|
||||
verify_nil_response $resp [$rd read]
|
||||
|
||||
# BZMPOP non-blocking path.
|
||||
$rd bzmpop 0.1 2 zset3{t} zset4{t} min
|
||||
|
||||
assert_equal [$rd read] {*2}
|
||||
assert_equal [$rd read] {$8}
|
||||
assert_equal [$rd read] {zset3{t}}
|
||||
assert_equal [$rd read] {*1}
|
||||
assert_equal [$rd read] {*2}
|
||||
assert_equal [$rd read] {$1}
|
||||
assert_equal [$rd read] {a}
|
||||
verify_score_response $rd $resp 1
|
||||
|
||||
# BZMPOP blocking path with COUNT option.
|
||||
$rd bzmpop 5 2 zset{t} zset2{t} max count 2
|
||||
wait_for_blocked_client
|
||||
r zadd zset2{t} 1 a 2 b 3 c
|
||||
|
||||
assert_equal [$rd read] {*2}
|
||||
assert_equal [$rd read] {$8}
|
||||
assert_equal [$rd read] {zset2{t}}
|
||||
assert_equal [$rd read] {*2}
|
||||
assert_equal [$rd read] {*2}
|
||||
assert_equal [$rd read] {$1}
|
||||
assert_equal [$rd read] {c}
|
||||
verify_score_response $rd $resp 3
|
||||
assert_equal [$rd read] {*2}
|
||||
assert_equal [$rd read] {$1}
|
||||
assert_equal [$rd read] {b}
|
||||
verify_score_response $rd $resp 2
|
||||
|
||||
$rd readraw 0
|
||||
}
|
||||
}
|
||||
|
||||
test {ZINTERSTORE regression with two sets, intset+hashtable} {
|
||||
r del seta{t} setb{t} setc{t}
|
||||
r sadd set1{t} a
|
||||
@ -1073,25 +1398,25 @@ start_server {tags {"zset"}} {
|
||||
assert_error "*ERR*syntax*" {r zinterstore foo{t} 2 zsetd{t} zsetf{t} withscores}
|
||||
assert_error "*ERR*syntax*" {r zdiffstore foo{t} 2 zsetd{t} zsetf{t} withscores}
|
||||
}
|
||||
|
||||
|
||||
test {ZMSCORE retrieve} {
|
||||
r del zmscoretest
|
||||
r zadd zmscoretest 10 x
|
||||
r zadd zmscoretest 20 y
|
||||
|
||||
|
||||
r zmscore zmscoretest x y
|
||||
} {10 20}
|
||||
|
||||
test {ZMSCORE retrieve from empty set} {
|
||||
r del zmscoretest
|
||||
|
||||
|
||||
r zmscore zmscoretest x y
|
||||
} {{} {}}
|
||||
|
||||
|
||||
test {ZMSCORE retrieve with missing member} {
|
||||
r del zmscoretest
|
||||
r zadd zmscoretest 10 x
|
||||
|
||||
|
||||
r zmscore zmscoretest x y
|
||||
} {10 {}}
|
||||
|
||||
@ -1099,7 +1424,7 @@ start_server {tags {"zset"}} {
|
||||
r del zmscoretest
|
||||
r zadd zmscoretest 10 x
|
||||
r zadd zmscoretest 20 y
|
||||
|
||||
|
||||
r zmscore zmscoretest x
|
||||
} {10}
|
||||
|
||||
@ -1107,7 +1432,7 @@ start_server {tags {"zset"}} {
|
||||
r del zmscoretest
|
||||
r zadd zmscoretest 10 x
|
||||
r zadd zmscoretest 20 y
|
||||
|
||||
|
||||
catch {r zmscore zmscoretest} e
|
||||
assert_match {*ERR*wrong*number*arg*} $e
|
||||
}
|
||||
@ -1459,27 +1784,31 @@ start_server {tags {"zset"}} {
|
||||
assert_equal {} $err
|
||||
}
|
||||
|
||||
test "BZPOPMIN, ZADD + DEL should not awake blocked client" {
|
||||
foreach {pop} {BZPOPMIN BZMPOP_MIN} {
|
||||
test "$pop, ZADD + DEL should not awake blocked client" {
|
||||
set rd [redis_deferring_client]
|
||||
r del zset
|
||||
|
||||
$rd bzpopmin zset 0
|
||||
bzpop_command $rd $pop zset 0
|
||||
wait_for_blocked_client
|
||||
|
||||
r multi
|
||||
r zadd zset 0 foo
|
||||
r del zset
|
||||
r exec
|
||||
r del zset
|
||||
r zadd zset 1 bar
|
||||
$rd read
|
||||
} {zset bar 1}
|
||||
|
||||
test "BZPOPMIN, ZADD + DEL + SET should not awake blocked client" {
|
||||
verify_pop_response $pop [$rd read] {zset bar 1} {zset {{bar 1}}}
|
||||
}
|
||||
|
||||
test "$pop, ZADD + DEL + SET should not awake blocked client" {
|
||||
set rd [redis_deferring_client]
|
||||
r del list
|
||||
|
||||
r del zset
|
||||
|
||||
$rd bzpopmin zset 0
|
||||
bzpop_command $rd $pop zset 0
|
||||
wait_for_blocked_client
|
||||
|
||||
r multi
|
||||
r zadd zset 0 foo
|
||||
r del zset
|
||||
@ -1487,8 +1816,10 @@ start_server {tags {"zset"}} {
|
||||
r exec
|
||||
r del zset
|
||||
r zadd zset 1 bar
|
||||
$rd read
|
||||
} {zset bar 1}
|
||||
|
||||
verify_pop_response $pop [$rd read] {zset bar 1} {zset {{bar 1}}}
|
||||
}
|
||||
}
|
||||
|
||||
test "BZPOPMIN with same key multiple times should work" {
|
||||
set rd [redis_deferring_client]
|
||||
@ -1496,9 +1827,11 @@ start_server {tags {"zset"}} {
|
||||
|
||||
# Data arriving after the BZPOPMIN.
|
||||
$rd bzpopmin z1{t} z2{t} z2{t} z1{t} 0
|
||||
wait_for_blocked_client
|
||||
r zadd z1{t} 0 a
|
||||
assert_equal [$rd read] {z1{t} a 0}
|
||||
$rd bzpopmin z1{t} z2{t} z2{t} z1{t} 0
|
||||
wait_for_blocked_client
|
||||
r zadd z2{t} 1 b
|
||||
assert_equal [$rd read] {z2{t} b 1}
|
||||
|
||||
@ -1511,38 +1844,47 @@ start_server {tags {"zset"}} {
|
||||
assert_equal [$rd read] {z2{t} b 1}
|
||||
}
|
||||
|
||||
test "MULTI/EXEC is isolated from the point of view of BZPOPMIN" {
|
||||
foreach {pop} {BZPOPMIN BZMPOP_MIN} {
|
||||
test "MULTI/EXEC is isolated from the point of view of $pop" {
|
||||
set rd [redis_deferring_client]
|
||||
r del zset
|
||||
$rd bzpopmin zset 0
|
||||
|
||||
bzpop_command $rd $pop zset 0
|
||||
wait_for_blocked_client
|
||||
|
||||
r multi
|
||||
r zadd zset 0 a
|
||||
r zadd zset 1 b
|
||||
r zadd zset 2 c
|
||||
r exec
|
||||
$rd read
|
||||
} {zset a 0}
|
||||
|
||||
test "BZPOPMIN with variadic ZADD" {
|
||||
verify_pop_response $pop [$rd read] {zset a 0} {zset {{a 0}}}
|
||||
}
|
||||
|
||||
test "$pop with variadic ZADD" {
|
||||
set rd [redis_deferring_client]
|
||||
r del zset
|
||||
if {$::valgrind} {after 100}
|
||||
$rd bzpopmin zset 0
|
||||
bzpop_command $rd $pop zset 0
|
||||
wait_for_blocked_client
|
||||
if {$::valgrind} {after 100}
|
||||
assert_equal 2 [r zadd zset -1 foo 1 bar]
|
||||
if {$::valgrind} {after 100}
|
||||
assert_equal {zset foo -1} [$rd read]
|
||||
verify_pop_response $pop [$rd read] {zset foo -1} {zset {{foo -1}}}
|
||||
assert_equal {bar} [r zrange zset 0 -1]
|
||||
}
|
||||
|
||||
test "BZPOPMIN with zero timeout should block indefinitely" {
|
||||
test "$pop with zero timeout should block indefinitely" {
|
||||
set rd [redis_deferring_client]
|
||||
r del zset
|
||||
$rd bzpopmin zset 0
|
||||
bzpop_command $rd $pop zset 0
|
||||
wait_for_blocked_client
|
||||
after 1000
|
||||
r zadd zset 0 foo
|
||||
assert_equal {zset foo 0} [$rd read]
|
||||
verify_pop_response $pop [$rd read] {zset foo 0} {zset {{foo 0}}}
|
||||
}
|
||||
}
|
||||
|
||||
r config set zset-max-ziplist-entries $original_max_entries
|
||||
r config set zset-max-ziplist-value $original_max_value
|
||||
}
|
||||
@ -1552,6 +1894,113 @@ start_server {tags {"zset"}} {
|
||||
stressers skiplist
|
||||
}
|
||||
|
||||
test "BZPOP/BZMPOP against wrong type" {
|
||||
r set foo{t} bar
|
||||
assert_error "*WRONGTYPE*" {r bzpopmin foo{t} 1}
|
||||
assert_error "*WRONGTYPE*" {r bzpopmax foo{t} 1}
|
||||
|
||||
assert_error "*WRONGTYPE*" {r bzmpop 1 1 foo{t} min}
|
||||
assert_error "*WRONGTYPE*" {r bzmpop 1 1 foo{t} max}
|
||||
assert_error "*WRONGTYPE*" {r bzmpop 1 1 foo{t} min count 10}
|
||||
|
||||
r del foo{t}
|
||||
r set foo2{t} bar
|
||||
assert_error "*WRONGTYPE*" {r bzmpop 1 2 foo{t} foo2{t} min}
|
||||
assert_error "*WRONGTYPE*" {r bzmpop 1 2 foo2{t} foo{t} max count 1}
|
||||
}
|
||||
|
||||
test "BZMPOP with illegal argument" {
|
||||
assert_error "ERR wrong number of arguments*" {r bzmpop}
|
||||
assert_error "ERR wrong number of arguments*" {r bzmpop 0 1}
|
||||
assert_error "ERR wrong number of arguments*" {r bzmpop 0 1 myzset{t}}
|
||||
|
||||
assert_error "ERR numkeys*" {r bzmpop 1 0 myzset{t} MIN}
|
||||
assert_error "ERR numkeys*" {r bzmpop 1 a myzset{t} MIN}
|
||||
assert_error "ERR numkeys*" {r bzmpop 1 -1 myzset{t} MAX}
|
||||
|
||||
assert_error "ERR syntax error*" {r bzmpop 1 1 myzset{t} bad_where}
|
||||
assert_error "ERR syntax error*" {r bzmpop 1 1 myzset{t} MIN bar_arg}
|
||||
assert_error "ERR syntax error*" {r bzmpop 1 1 myzset{t} MAX MIN}
|
||||
assert_error "ERR syntax error*" {r bzmpop 1 1 myzset{t} COUNT}
|
||||
assert_error "ERR syntax error*" {r bzmpop 1 2 myzset{t} myzset2{t} bad_arg}
|
||||
|
||||
assert_error "ERR count*" {r bzmpop 1 1 myzset{t} MIN COUNT 0}
|
||||
assert_error "ERR count*" {r bzmpop 1 1 myzset{t} MAX COUNT a}
|
||||
assert_error "ERR count*" {r bzmpop 1 1 myzset{t} MIN COUNT -1}
|
||||
assert_error "ERR count*" {r bzmpop 1 2 myzset{t} myzset2{t} MAX COUNT -1}
|
||||
}
|
||||
|
||||
test "BZMPOP with multiple blocked clients" {
|
||||
set rd1 [redis_deferring_client]
|
||||
set rd2 [redis_deferring_client]
|
||||
set rd3 [redis_deferring_client]
|
||||
set rd4 [redis_deferring_client]
|
||||
r del myzset{t} myzset2{t}
|
||||
|
||||
$rd1 bzmpop 0 2 myzset{t} myzset2{t} min count 1
|
||||
$rd2 bzmpop 0 2 myzset{t} myzset2{t} max count 10
|
||||
$rd3 bzmpop 0 2 myzset{t} myzset2{t} min count 10
|
||||
$rd4 bzmpop 0 2 myzset{t} myzset2{t} max count 1
|
||||
wait_for_blocked_clients_count 4
|
||||
|
||||
r multi
|
||||
r zadd myzset{t} 1 a 2 b 3 c 4 d 5 e
|
||||
r zadd myzset2{t} 1 a 2 b 3 c 4 d 5 e
|
||||
r exec
|
||||
|
||||
assert_equal {myzset{t} {{a 1}}} [$rd1 read]
|
||||
assert_equal {myzset{t} {{e 5} {d 4} {c 3} {b 2}}} [$rd2 read]
|
||||
assert_equal {myzset2{t} {{a 1} {b 2} {c 3} {d 4} {e 5}}} [$rd3 read]
|
||||
|
||||
r zadd myzset2{t} 1 a 2 b 3 c
|
||||
assert_equal {myzset2{t} {{c 3}}} [$rd4 read]
|
||||
|
||||
r del myzset{t} myzset2{t}
|
||||
}
|
||||
|
||||
test "BZMPOP propagate as pop with count command to replica" {
|
||||
set rd [redis_deferring_client]
|
||||
set repl [attach_to_replication_stream]
|
||||
|
||||
# BZMPOP without being blocked.
|
||||
r zadd myzset{t} 1 one 2 two 3 three
|
||||
r zadd myzset2{t} 4 four 5 five 6 six
|
||||
r bzmpop 0 1 myzset{t} min
|
||||
r bzmpop 0 2 myzset{t} myzset2{t} max count 10
|
||||
r bzmpop 0 2 myzset{t} myzset2{t} max count 10
|
||||
|
||||
# BZMPOP that gets blocked.
|
||||
$rd bzmpop 0 1 myzset{t} min count 1
|
||||
wait_for_blocked_client
|
||||
r zadd myzset{t} 1 one
|
||||
$rd bzmpop 0 2 myzset{t} myzset2{t} min count 5
|
||||
wait_for_blocked_client
|
||||
r zadd myzset{t} 1 one 2 two 3 three
|
||||
$rd bzmpop 0 2 myzset{t} myzset2{t} max count 10
|
||||
wait_for_blocked_client
|
||||
r zadd myzset2{t} 4 four 5 five 6 six
|
||||
|
||||
# Released on timeout.
|
||||
assert_equal {} [r bzmpop 0.01 1 myzset{t} max count 10]
|
||||
r set foo{t} bar ;# something else to propagate after, so we can make sure the above pop didn't.
|
||||
|
||||
assert_replication_stream $repl {
|
||||
{select *}
|
||||
{zadd myzset{t} 1 one 2 two 3 three}
|
||||
{zadd myzset2{t} 4 four 5 five 6 six}
|
||||
{zpopmin myzset{t} 1}
|
||||
{zpopmax myzset{t} 2}
|
||||
{zpopmax myzset2{t} 3}
|
||||
{zadd myzset{t} 1 one}
|
||||
{zpopmin myzset{t} 1}
|
||||
{zadd myzset{t} 1 one 2 two 3 three}
|
||||
{zpopmin myzset{t} 3}
|
||||
{zadd myzset2{t} 4 four 5 five 6 six}
|
||||
{zpopmax myzset2{t} 3}
|
||||
{set foo{t} bar}
|
||||
}
|
||||
} {} {needs:repl}
|
||||
|
||||
test {ZSET skiplist order consistency when elements are moved} {
|
||||
set original_max [lindex [r config get zset-max-ziplist-entries] 1]
|
||||
r config set zset-max-ziplist-entries 0
|
||||
|
Loading…
Reference in New Issue
Block a user