From a3e8de043005f0e764f6506d1e38143d788cddc4 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 29 Oct 2015 13:50:04 +0100 Subject: [PATCH] Lua script selective replication WIP. --- src/scripting.c | 58 +++++++++++++++++++++++++++++++++++++++++++++++-- src/server.h | 3 ++- 2 files changed, 58 insertions(+), 3 deletions(-) diff --git a/src/scripting.c b/src/scripting.c index b24895c05..10e939f34 100644 --- a/src/scripting.c +++ b/src/scripting.c @@ -375,7 +375,8 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) { * a Lua script in the context of AOF and slaves. */ if (server.lua_replicate_commands && !server.lua_multi_emitted && - server.lua_write_dirty) + server.lua_write_dirty && + server.lua_repl != PROPAGATE_NONE) { execCommandPropagateMulti(server.lua_caller); server.lua_multi_emitted = 1; @@ -383,7 +384,13 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) { /* Run the command */ int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS; - if (server.lua_replicate_commands) call_flags |= CMD_CALL_PROPAGATE; + if (server.lua_replicate_commands) { + call_flags |= CMD_CALL_PROPAGATE; + /* Don't propagate AOf / replication stream if a redis.set_repl() + * call changed the default replication policy. */ + if (!(server.lua_repl & PROPAGATE_AOF)) preventCommandAOF(c); + if (!(server.lua_repl & PROPAGATE_REPL)) preventCommandReplication(c); + } call(c,call_flags); /* Convert the result of the Redis command into a suitable Lua type. @@ -540,6 +547,31 @@ int luaRedisReplicateCommandsCommand(lua_State *lua) { return 1; } +/* redis.set_repl() + * + * Set the propagation of write commands executed in the context of the + * script to on/off for AOF and slaves. */ +int luaRedisSetReplCommand(lua_State *lua) { + int argc = lua_gettop(lua); + int flags; + + if (server.lua_replicate_commands == 0) { + luaPushError(lua, "You can set the replication behavior only after turning on single commands replication with redis.replicate_commands()."); + return 1; + } else if (argc != 1) { + luaPushError(lua, "redis.set_repl() requires two arguments."); + return 1; + } + + flags = lua_tonumber(lua,-1); + if ((flags & ~(PROPAGATE_AOF|PROPAGATE_REPL)) != 0) { + luaPushError(lua, "Invalid replication flags. Use REPL_AOF, REPL_SLAVE, REPL_ALL or REPL_NONE."); + return 1; + } + server.lua_repl = flags; + return 0; +} + /* redis.log() */ int luaLogCommand(lua_State *lua) { int j, argc = lua_gettop(lua); @@ -741,6 +773,27 @@ void scriptingInit(void) { lua_pushcfunction(lua, luaRedisReplicateCommandsCommand); lua_settable(lua, -3); + /* redis.set_repl and associated flags. */ + lua_pushstring(lua,"set_repl"); + lua_pushcfunction(lua,luaRedisSetReplCommand); + lua_settable(lua,-3); + + lua_pushstring(lua,"REPL_NONE"); + lua_pushnumber(lua,PROPAGATE_NONE); + lua_settable(lua,-3); + + lua_pushstring(lua,"REPL_AOF"); + lua_pushnumber(lua,PROPAGATE_AOF); + lua_settable(lua,-3); + + lua_pushstring(lua,"REPL_SLAVE"); + lua_pushnumber(lua,PROPAGATE_REPL); + lua_settable(lua,-3); + + lua_pushstring(lua,"REPL_ALL"); + lua_pushnumber(lua,PROPAGATE_AOF|PROPAGATE_REPL); + lua_settable(lua,-3); + /* Finally set the table as 'redis' global var. */ lua_setglobal(lua,"redis"); @@ -987,6 +1040,7 @@ void evalGenericCommand(client *c, int evalsha) { server.lua_write_dirty = 0; server.lua_replicate_commands = 0; server.lua_multi_emitted = 0; + server.lua_repl = PROPAGATE_AOF|PROPAGATE_REPL; /* Get the number of arguments that are keys */ if (getLongLongFromObjectOrReply(c,c->argv[2],&numkeys,NULL) != C_OK) diff --git a/src/server.h b/src/server.h index 7a08315d2..fa63b3fa8 100644 --- a/src/server.h +++ b/src/server.h @@ -940,7 +940,7 @@ struct redisServer { int cluster_migration_barrier; /* Cluster replicas migration barrier. */ int cluster_slave_validity_factor; /* Slave max data age for failover. */ int cluster_require_full_coverage; /* If true, put the cluster down if - there is at least an uncovered slot. */ + there is at least an uncovered slot.*/ /* Scripting */ lua_State *lua; /* The Lua interpreter. We use just one for all clients */ client *lua_client; /* The "fake client" to query Redis from Lua */ @@ -954,6 +954,7 @@ struct redisServer { execution of the current script. */ int lua_replicate_commands; /* True if we are doing single commands repl. */ int lua_multi_emitted;/* True if we already proagated MULTI. */ + int lua_repl; /* Script replication flags for redis.set_repl(). */ int lua_timedout; /* True if we reached the time limit for script execution. */ int lua_kill; /* Kill the script if true. */