block: propagate BRPOPLPUSH as RPOPLPUSH when unblock

This commit is contained in:
zhaozhao.zz 2019-11-22 16:38:49 +08:00
parent 2c970532dc
commit 6b056d29f3
3 changed files with 7 additions and 13 deletions

View File

@ -2464,6 +2464,7 @@ void initServerConfig(void) {
server.pexpireCommand = lookupCommandByCString("pexpire");
server.xclaimCommand = lookupCommandByCString("xclaim");
server.xgroupCommand = lookupCommandByCString("xgroup");
server.rpoplpushCommand = lookupCommandByCString("rpoplpush");
/* Slow log */
server.slowlog_log_slower_than = CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN;

View File

@ -1159,7 +1159,7 @@ struct redisServer {
*lpopCommand, *rpopCommand, *zpopminCommand,
*zpopmaxCommand, *sremCommand, *execCommand,
*expireCommand, *pexpireCommand, *xclaimCommand,
*xgroupCommand;
*xgroupCommand, *rpoplpushCommand;
/* Fields used only for stats */
time_t stat_starttime; /* Server start time */
long long stat_numcommands; /* Number of processed commands */

View File

@ -653,20 +653,13 @@ int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb
if (!(dstobj &&
checkType(receiver,dstobj,OBJ_LIST)))
{
/* Propagate the RPOP operation. */
argv[0] = shared.rpop;
argv[1] = key;
propagate(server.rpopCommand,
db->id,argv,2,
PROPAGATE_AOF|
PROPAGATE_REPL);
rpoplpushHandlePush(receiver,dstkey,dstobj,
value);
/* Propagate the LPUSH operation. */
argv[0] = shared.lpush;
argv[1] = dstkey;
argv[2] = value;
propagate(server.lpushCommand,
/* Propagate the RPOPLPUSH operation. */
argv[0] = shared.rpoplpush;
argv[1] = key;
argv[2] = dstkey;
propagate(server.rpoplpushCommand,
db->id,argv,3,
PROPAGATE_AOF|
PROPAGATE_REPL);