From e938bbc543c7a7aa3d67ee75f87eb5412df0a3ef Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 3 Oct 2019 10:56:37 +0200 Subject: [PATCH] Modules: implement RM_Replicate() from async callbacks. --- src/module.c | 33 +++++++++++++++++++++++++++++++-- src/server.h | 2 ++ 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/src/module.c b/src/module.c index 9876a966d..e8fb5adb3 100644 --- a/src/module.c +++ b/src/module.c @@ -147,10 +147,14 @@ struct RedisModuleCtx { int keys_count; struct RedisModulePoolAllocBlock *pa_head; + redisOpArray saved_oparray; /* When propagating commands in a callback + we reallocate the "also propagate" op + array. Here we save the old one to + restore it later. */ }; typedef struct RedisModuleCtx RedisModuleCtx; -#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, 0, NULL} +#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, 0, NULL, {0}} #define REDISMODULE_CTX_MULTI_EMITTED (1<<0) #define REDISMODULE_CTX_AUTO_MEMORY (1<<1) #define REDISMODULE_CTX_KEYS_POS_REQUEST (1<<2) @@ -538,6 +542,24 @@ void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) { alsoPropagate(server.execCommand,c->db->id,propargv,1, PROPAGATE_AOF|PROPAGATE_REPL); decrRefCount(propargv[0]); + + /* If this is not a module command context (but is instead a simple + * callback context), we have to handle directly the "also propagate" + * array and emit it. In a module command call this will be handled + * directly by call(). */ + if (!(ctx->flags & REDISMODULE_CTX_MODULE_COMMAND_CALL) && + server.also_propagate.numops) + { + for (int j = 0; j < server.also_propagate.numops; j++) { + redisOp *rop = &server.also_propagate.ops[j]; + int target = rop->target; + if (target) + propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target); + } + redisOpArrayFree(&server.also_propagate); + } + /* Restore the previous oparray in case of nexted use of the API. */ + server.also_propagate = ctx->saved_oparray; } /* Free the context after the user function was called. */ @@ -1354,9 +1376,16 @@ void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx) { /* If we already emitted MULTI return ASAP. */ if (ctx->flags & REDISMODULE_CTX_MULTI_EMITTED) return; /* If this is a thread safe context, we do not want to wrap commands - * executed into MUTLI/EXEC, they are executed as single commands + * executed into MULTI/EXEC, they are executed as single commands * from an external client in essence. */ if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) return; + /* If this is a callback context, and not a module command execution + * context, we have to setup the op array for the "also propagate" API + * so that RM_Replicate() will work. */ + if (!(ctx->flags & REDISMODULE_CTX_MODULE_COMMAND_CALL)) { + ctx->saved_oparray = server.also_propagate; + redisOpArrayInit(&server.also_propagate); + } execCommandPropagateMulti(ctx->client); ctx->flags |= REDISMODULE_CTX_MULTI_EMITTED; } diff --git a/src/server.h b/src/server.h index 4d48fcd1a..c78010ecc 100644 --- a/src/server.h +++ b/src/server.h @@ -1922,6 +1922,8 @@ struct redisCommand *lookupCommandOrOriginal(sds name); void call(client *c, int flags); void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags); void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int target); +void redisOpArrayInit(redisOpArray *oa); +void redisOpArrayFree(redisOpArray *oa); void forceCommandPropagation(client *c, int flags); void preventCommandPropagation(client *c); void preventCommandAOF(client *c);