diff --git a/src/module.c b/src/module.c index be64af368..b6a87ab27 100644 --- a/src/module.c +++ b/src/module.c @@ -574,11 +574,8 @@ void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) { /* Handle the replication of the final EXEC, since whatever a command * emits is always wrapped around MULTI/EXEC. */ - robj *propargv[1]; - propargv[0] = createStringObject("EXEC",4); - alsoPropagate(server.execCommand,c->db->id,propargv,1, + alsoPropagate(server.execCommand,c->db->id,&shared.exec,1, PROPAGATE_AOF|PROPAGATE_REPL); - decrRefCount(propargv[0]); /* If this is not a module command context (but is instead a simple * callback context), we have to handle directly the "also propagate" diff --git a/src/multi.c b/src/multi.c index f885fa19c..df11225bd 100644 --- a/src/multi.c +++ b/src/multi.c @@ -106,11 +106,13 @@ void discardCommand(client *c) { /* Send a MULTI command to all the slaves and AOF file. Check the execCommand * implementation for more information. */ void execCommandPropagateMulti(client *c) { - robj *multistring = createStringObject("MULTI",5); - - propagate(server.multiCommand,c->db->id,&multistring,1, + propagate(server.multiCommand,c->db->id,&shared.multi,1, + PROPAGATE_AOF|PROPAGATE_REPL); +} + +void execCommandPropagateExec(client *c) { + propagate(server.execCommand,c->db->id,&shared.exec,1, PROPAGATE_AOF|PROPAGATE_REPL); - decrRefCount(multistring); } void execCommand(client *c) { diff --git a/src/scripting.c b/src/scripting.c index 7cf21f408..dc2510f98 100644 --- a/src/scripting.c +++ b/src/scripting.c @@ -1591,11 +1591,8 @@ void evalGenericCommand(client *c, int evalsha) { if (server.lua_replicate_commands) { preventCommandPropagation(c); if (server.lua_multi_emitted) { - robj *propargv[1]; - propargv[0] = createStringObject("EXEC",4); - alsoPropagate(server.execCommand,c->db->id,propargv,1, + alsoPropagate(server.execCommand,c->db->id,&shared.exec,1, PROPAGATE_AOF|PROPAGATE_REPL); - decrRefCount(propargv[0]); } } diff --git a/src/server.c b/src/server.c index 803cfc809..3ad8664fc 100644 --- a/src/server.c +++ b/src/server.c @@ -2244,6 +2244,8 @@ void createSharedObjects(void) { shared.rpoplpush = createStringObject("RPOPLPUSH",9); shared.zpopmin = createStringObject("ZPOPMIN",7); shared.zpopmax = createStringObject("ZPOPMAX",7); + shared.multi = createStringObject("MULTI",5); + shared.exec = createStringObject("EXEC",4); for (j = 0; j < OBJ_SHARED_INTEGERS; j++) { shared.integers[j] = makeObjectShared(createObject(OBJ_STRING,(void*)(long)j)); @@ -3366,6 +3368,18 @@ void call(client *c, int flags) { redisOp *rop; if (flags & CMD_CALL_PROPAGATE) { + int multi_emitted = 0; + /* Wrap the commands in server.also_propagate array, + * but don't wrap it if we are already in MULIT context, + * in case the nested MULIT/EXEC. + * + * And if the array contains only one command, no need to + * wrap it, since the single command is atomic. */ + if (server.also_propagate.numops > 1 && !(c->flags & CLIENT_MULTI)) { + execCommandPropagateMulti(c); + multi_emitted = 1; + } + for (j = 0; j < server.also_propagate.numops; j++) { rop = &server.also_propagate.ops[j]; int target = rop->target; @@ -3375,6 +3389,10 @@ void call(client *c, int flags) { if (target) propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target); } + + if (multi_emitted) { + execCommandPropagateExec(c); + } } redisOpArrayFree(&server.also_propagate); } diff --git a/src/server.h b/src/server.h index 5a95d4c14..22e484462 100644 --- a/src/server.h +++ b/src/server.h @@ -923,6 +923,7 @@ struct sharedObjectsStruct { *busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk, *unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink, *rpop, *lpop, *lpush, *rpoplpush, *zpopmin, *zpopmax, *emptyscan, + *multi, *exec, *select[PROTO_SHARED_SELECT_CMDS], *integers[OBJ_SHARED_INTEGERS], *mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*\r\n" */ @@ -1751,6 +1752,7 @@ void touchWatchedKeysOnFlush(int dbid); void discardTransaction(client *c); void flagTransaction(client *c); void execCommandPropagateMulti(client *c); +void execCommandPropagateExec(client *c); /* Redis object implementation */ void decrRefCount(robj *o);