mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 16:18:28 -05:00
Propagation: wrap commands in also_propagate array with MULIT/EXEC
Random command like SPOP with count is replicated as some SREM operations, and store them in also_propagate array to propagate after the call, but this would break atomicity. To keep the command's atomicity, wrap also_propagate array with MULTI/EXEC.
This commit is contained in:
parent
b93945585a
commit
37a10cef02
@ -574,11 +574,8 @@ void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) {
|
||||
|
||||
/* Handle the replication of the final EXEC, since whatever a command
|
||||
* emits is always wrapped around MULTI/EXEC. */
|
||||
robj *propargv[1];
|
||||
propargv[0] = createStringObject("EXEC",4);
|
||||
alsoPropagate(server.execCommand,c->db->id,propargv,1,
|
||||
alsoPropagate(server.execCommand,c->db->id,&shared.exec,1,
|
||||
PROPAGATE_AOF|PROPAGATE_REPL);
|
||||
decrRefCount(propargv[0]);
|
||||
|
||||
/* If this is not a module command context (but is instead a simple
|
||||
* callback context), we have to handle directly the "also propagate"
|
||||
|
10
src/multi.c
10
src/multi.c
@ -106,11 +106,13 @@ void discardCommand(client *c) {
|
||||
/* Send a MULTI command to all the slaves and AOF file. Check the execCommand
|
||||
* implementation for more information. */
|
||||
void execCommandPropagateMulti(client *c) {
|
||||
robj *multistring = createStringObject("MULTI",5);
|
||||
|
||||
propagate(server.multiCommand,c->db->id,&multistring,1,
|
||||
propagate(server.multiCommand,c->db->id,&shared.multi,1,
|
||||
PROPAGATE_AOF|PROPAGATE_REPL);
|
||||
}
|
||||
|
||||
void execCommandPropagateExec(client *c) {
|
||||
propagate(server.execCommand,c->db->id,&shared.exec,1,
|
||||
PROPAGATE_AOF|PROPAGATE_REPL);
|
||||
decrRefCount(multistring);
|
||||
}
|
||||
|
||||
void execCommand(client *c) {
|
||||
|
@ -1591,11 +1591,8 @@ void evalGenericCommand(client *c, int evalsha) {
|
||||
if (server.lua_replicate_commands) {
|
||||
preventCommandPropagation(c);
|
||||
if (server.lua_multi_emitted) {
|
||||
robj *propargv[1];
|
||||
propargv[0] = createStringObject("EXEC",4);
|
||||
alsoPropagate(server.execCommand,c->db->id,propargv,1,
|
||||
alsoPropagate(server.execCommand,c->db->id,&shared.exec,1,
|
||||
PROPAGATE_AOF|PROPAGATE_REPL);
|
||||
decrRefCount(propargv[0]);
|
||||
}
|
||||
}
|
||||
|
||||
|
18
src/server.c
18
src/server.c
@ -2244,6 +2244,8 @@ void createSharedObjects(void) {
|
||||
shared.rpoplpush = createStringObject("RPOPLPUSH",9);
|
||||
shared.zpopmin = createStringObject("ZPOPMIN",7);
|
||||
shared.zpopmax = createStringObject("ZPOPMAX",7);
|
||||
shared.multi = createStringObject("MULTI",5);
|
||||
shared.exec = createStringObject("EXEC",4);
|
||||
for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
|
||||
shared.integers[j] =
|
||||
makeObjectShared(createObject(OBJ_STRING,(void*)(long)j));
|
||||
@ -3366,6 +3368,18 @@ void call(client *c, int flags) {
|
||||
redisOp *rop;
|
||||
|
||||
if (flags & CMD_CALL_PROPAGATE) {
|
||||
int multi_emitted = 0;
|
||||
/* Wrap the commands in server.also_propagate array,
|
||||
* but don't wrap it if we are already in MULIT context,
|
||||
* in case the nested MULIT/EXEC.
|
||||
*
|
||||
* And if the array contains only one command, no need to
|
||||
* wrap it, since the single command is atomic. */
|
||||
if (server.also_propagate.numops > 1 && !(c->flags & CLIENT_MULTI)) {
|
||||
execCommandPropagateMulti(c);
|
||||
multi_emitted = 1;
|
||||
}
|
||||
|
||||
for (j = 0; j < server.also_propagate.numops; j++) {
|
||||
rop = &server.also_propagate.ops[j];
|
||||
int target = rop->target;
|
||||
@ -3375,6 +3389,10 @@ void call(client *c, int flags) {
|
||||
if (target)
|
||||
propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
|
||||
}
|
||||
|
||||
if (multi_emitted) {
|
||||
execCommandPropagateExec(c);
|
||||
}
|
||||
}
|
||||
redisOpArrayFree(&server.also_propagate);
|
||||
}
|
||||
|
@ -923,6 +923,7 @@ struct sharedObjectsStruct {
|
||||
*busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk,
|
||||
*unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink,
|
||||
*rpop, *lpop, *lpush, *rpoplpush, *zpopmin, *zpopmax, *emptyscan,
|
||||
*multi, *exec,
|
||||
*select[PROTO_SHARED_SELECT_CMDS],
|
||||
*integers[OBJ_SHARED_INTEGERS],
|
||||
*mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
|
||||
@ -1751,6 +1752,7 @@ void touchWatchedKeysOnFlush(int dbid);
|
||||
void discardTransaction(client *c);
|
||||
void flagTransaction(client *c);
|
||||
void execCommandPropagateMulti(client *c);
|
||||
void execCommandPropagateExec(client *c);
|
||||
|
||||
/* Redis object implementation */
|
||||
void decrRefCount(robj *o);
|
||||
|
Loading…
Reference in New Issue
Block a user