mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 16:18:28 -05:00
Scripting: single commands replication mode implemented.
By calling redis.replicate_commands(), the scripting engine of Redis switches to commands replication instead of replicating whole scripts. This is useful when the script execution is costly but only results in a few writes performed to the dataset. Morover, in this mode, it is possible to call functions with side effects freely, since the script execution does not need to be deterministic: anyway we'll capture the outcome from the point of view of changes to the dataset. In this mode math.random() returns different sequences at every call. If redis.replicate_commnads() is not called before any other write, the command returns false and sticks to whole scripts replication instead.
This commit is contained in:
parent
cdda6748c2
commit
fc38235664
@ -318,9 +318,9 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
|
|||||||
* command marked as non-deterministic was already called in the context
|
* command marked as non-deterministic was already called in the context
|
||||||
* of this script. */
|
* of this script. */
|
||||||
if (cmd->flags & CMD_WRITE) {
|
if (cmd->flags & CMD_WRITE) {
|
||||||
if (server.lua_random_dirty) {
|
if (server.lua_random_dirty && !server.lua_replicate_commands) {
|
||||||
luaPushError(lua,
|
luaPushError(lua,
|
||||||
"Write commands not allowed after non deterministic commands");
|
"Write commands not allowed after non deterministic commands. Call redis.replicate_commands() at the start of your script in order to switch to single commands replication mode.");
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
} else if (server.masterhost && server.repl_slave_ro &&
|
} else if (server.masterhost && server.repl_slave_ro &&
|
||||||
!server.loading &&
|
!server.loading &&
|
||||||
@ -370,8 +370,21 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* If we are using single commands replication, we need to wrap what
|
||||||
|
* we propagate into a MULTI/EXEC block, so that it will be atomic like
|
||||||
|
* a Lua script in the context of AOF and slaves. */
|
||||||
|
if (server.lua_replicate_commands &&
|
||||||
|
!server.lua_multi_emitted &&
|
||||||
|
server.lua_write_dirty)
|
||||||
|
{
|
||||||
|
execCommandPropagateMulti(server.lua_caller);
|
||||||
|
server.lua_multi_emitted = 1;
|
||||||
|
}
|
||||||
|
|
||||||
/* Run the command */
|
/* Run the command */
|
||||||
call(c,CMD_CALL_SLOWLOG | CMD_CALL_STATS);
|
int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS;
|
||||||
|
if (server.lua_replicate_commands) call_flags |= CMD_CALL_PROPAGATE;
|
||||||
|
call(c,call_flags);
|
||||||
|
|
||||||
/* Convert the result of the Redis command into a suitable Lua type.
|
/* Convert the result of the Redis command into a suitable Lua type.
|
||||||
* The first thing we need is to create a single string from the client
|
* The first thing we need is to create a single string from the client
|
||||||
@ -398,6 +411,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
|
|||||||
/* Sort the output array if needed, assuming it is a non-null multi bulk
|
/* Sort the output array if needed, assuming it is a non-null multi bulk
|
||||||
* reply as expected. */
|
* reply as expected. */
|
||||||
if ((cmd->flags & CMD_SORT_FOR_SCRIPT) &&
|
if ((cmd->flags & CMD_SORT_FOR_SCRIPT) &&
|
||||||
|
(server.lua_replicate_commands == 0) &&
|
||||||
(reply[0] == '*' && reply[1] != '-')) {
|
(reply[0] == '*' && reply[1] != '-')) {
|
||||||
luaSortArray(lua);
|
luaSortArray(lua);
|
||||||
}
|
}
|
||||||
@ -447,10 +461,12 @@ cleanup:
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* redis.call() */
|
||||||
int luaRedisCallCommand(lua_State *lua) {
|
int luaRedisCallCommand(lua_State *lua) {
|
||||||
return luaRedisGenericCommand(lua,1);
|
return luaRedisGenericCommand(lua,1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* redis.pcall() */
|
||||||
int luaRedisPCallCommand(lua_State *lua) {
|
int luaRedisPCallCommand(lua_State *lua) {
|
||||||
return luaRedisGenericCommand(lua,0);
|
return luaRedisGenericCommand(lua,0);
|
||||||
}
|
}
|
||||||
@ -494,14 +510,37 @@ int luaRedisReturnSingleFieldTable(lua_State *lua, char *field) {
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* redis.error_reply() */
|
||||||
int luaRedisErrorReplyCommand(lua_State *lua) {
|
int luaRedisErrorReplyCommand(lua_State *lua) {
|
||||||
return luaRedisReturnSingleFieldTable(lua,"err");
|
return luaRedisReturnSingleFieldTable(lua,"err");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* redis.status_reply() */
|
||||||
int luaRedisStatusReplyCommand(lua_State *lua) {
|
int luaRedisStatusReplyCommand(lua_State *lua) {
|
||||||
return luaRedisReturnSingleFieldTable(lua,"ok");
|
return luaRedisReturnSingleFieldTable(lua,"ok");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* redis.replicate_commands()
|
||||||
|
*
|
||||||
|
* Turn on single commands replication if the script never called
|
||||||
|
* a write command so far, and returns true. Otherwise if the script
|
||||||
|
* already started to write, returns false and stick to whole scripts
|
||||||
|
* replication, which is our default. */
|
||||||
|
int luaRedisReplicateCommandsCommand(lua_State *lua) {
|
||||||
|
if (server.lua_write_dirty) {
|
||||||
|
lua_pushboolean(lua,0);
|
||||||
|
} else {
|
||||||
|
server.lua_replicate_commands = 1;
|
||||||
|
/* When we switch to single commands replication, we can provide
|
||||||
|
* different math.random() sequences at every call, which is what
|
||||||
|
* the user normally expects. */
|
||||||
|
redisSrand48(rand());
|
||||||
|
lua_pushboolean(lua,1);
|
||||||
|
}
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* redis.log() */
|
||||||
int luaLogCommand(lua_State *lua) {
|
int luaLogCommand(lua_State *lua) {
|
||||||
int j, argc = lua_gettop(lua);
|
int j, argc = lua_gettop(lua);
|
||||||
int level;
|
int level;
|
||||||
@ -697,6 +736,11 @@ void scriptingInit(void) {
|
|||||||
lua_pushcfunction(lua, luaRedisStatusReplyCommand);
|
lua_pushcfunction(lua, luaRedisStatusReplyCommand);
|
||||||
lua_settable(lua, -3);
|
lua_settable(lua, -3);
|
||||||
|
|
||||||
|
/* redis.replicate_commands */
|
||||||
|
lua_pushstring(lua, "replicate_commands");
|
||||||
|
lua_pushcfunction(lua, luaRedisReplicateCommandsCommand);
|
||||||
|
lua_settable(lua, -3);
|
||||||
|
|
||||||
/* Finally set the table as 'redis' global var. */
|
/* Finally set the table as 'redis' global var. */
|
||||||
lua_setglobal(lua,"redis");
|
lua_setglobal(lua,"redis");
|
||||||
|
|
||||||
@ -814,7 +858,8 @@ void luaReplyToRedisReply(client *c, lua_State *lua) {
|
|||||||
case LUA_TTABLE:
|
case LUA_TTABLE:
|
||||||
/* We need to check if it is an array, an error, or a status reply.
|
/* We need to check if it is an array, an error, or a status reply.
|
||||||
* Error are returned as a single element table with 'err' field.
|
* Error are returned as a single element table with 'err' field.
|
||||||
* Status replies are returned as single element table with 'ok' field */
|
* Status replies are returned as single element table with 'ok'
|
||||||
|
* field. */
|
||||||
lua_pushstring(lua,"err");
|
lua_pushstring(lua,"err");
|
||||||
lua_gettable(lua,-2);
|
lua_gettable(lua,-2);
|
||||||
t = lua_type(lua,-1);
|
t = lua_type(lua,-1);
|
||||||
@ -926,8 +971,8 @@ void evalGenericCommand(client *c, int evalsha) {
|
|||||||
long long numkeys;
|
long long numkeys;
|
||||||
int delhook = 0, err;
|
int delhook = 0, err;
|
||||||
|
|
||||||
/* We want the same PRNG sequence at every call so that our PRNG is
|
/* When we replicate whole scripts, we want the same PRNG sequence at
|
||||||
* not affected by external state. */
|
* every call so that our PRNG is not affected by external state. */
|
||||||
redisSrand48(0);
|
redisSrand48(0);
|
||||||
|
|
||||||
/* We set this flag to zero to remember that so far no random command
|
/* We set this flag to zero to remember that so far no random command
|
||||||
@ -940,6 +985,8 @@ void evalGenericCommand(client *c, int evalsha) {
|
|||||||
* is called after a random command was used. */
|
* is called after a random command was used. */
|
||||||
server.lua_random_dirty = 0;
|
server.lua_random_dirty = 0;
|
||||||
server.lua_write_dirty = 0;
|
server.lua_write_dirty = 0;
|
||||||
|
server.lua_replicate_commands = 0;
|
||||||
|
server.lua_multi_emitted = 0;
|
||||||
|
|
||||||
/* Get the number of arguments that are keys */
|
/* Get the number of arguments that are keys */
|
||||||
if (getLongLongFromObjectOrReply(c,c->argv[2],&numkeys,NULL) != C_OK)
|
if (getLongLongFromObjectOrReply(c,c->argv[2],&numkeys,NULL) != C_OK)
|
||||||
@ -1073,7 +1120,7 @@ void evalGenericCommand(client *c, int evalsha) {
|
|||||||
* For repliation, everytime a new slave attaches to the master, we need to
|
* For repliation, everytime a new slave attaches to the master, we need to
|
||||||
* flush our cache of scripts that can be replicated as EVALSHA, while
|
* flush our cache of scripts that can be replicated as EVALSHA, while
|
||||||
* for AOF we need to do so every time we rewrite the AOF file. */
|
* for AOF we need to do so every time we rewrite the AOF file. */
|
||||||
if (evalsha) {
|
if (evalsha && !server.lua_replicate_commands) {
|
||||||
if (!replicationScriptCacheExists(c->argv[1]->ptr)) {
|
if (!replicationScriptCacheExists(c->argv[1]->ptr)) {
|
||||||
/* This script is not in our script cache, replicate it as
|
/* This script is not in our script cache, replicate it as
|
||||||
* EVAL, then add it into the script cache, as from now on
|
* EVAL, then add it into the script cache, as from now on
|
||||||
@ -1088,6 +1135,19 @@ void evalGenericCommand(client *c, int evalsha) {
|
|||||||
forceCommandPropagation(c,PROPAGATE_REPL|PROPAGATE_AOF);
|
forceCommandPropagation(c,PROPAGATE_REPL|PROPAGATE_AOF);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* If we are using single commands replication, emit EXEC if there
|
||||||
|
* was at least a write. */
|
||||||
|
if (server.lua_replicate_commands) {
|
||||||
|
preventCommandPropagation(c);
|
||||||
|
if (server.lua_multi_emitted) {
|
||||||
|
robj *propargv[1];
|
||||||
|
propargv[0] = createStringObject("EXEC",4);
|
||||||
|
alsoPropagate(server.execCommand,c->db->id,propargv,1,
|
||||||
|
PROPAGATE_AOF|PROPAGATE_REPL);
|
||||||
|
decrRefCount(propargv[0]);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void evalCommand(client *c) {
|
void evalCommand(client *c) {
|
||||||
|
@ -1580,6 +1580,7 @@ void initServerConfig(void) {
|
|||||||
server.lpopCommand = lookupCommandByCString("lpop");
|
server.lpopCommand = lookupCommandByCString("lpop");
|
||||||
server.rpopCommand = lookupCommandByCString("rpop");
|
server.rpopCommand = lookupCommandByCString("rpop");
|
||||||
server.sremCommand = lookupCommandByCString("srem");
|
server.sremCommand = lookupCommandByCString("srem");
|
||||||
|
server.execCommand = lookupCommandByCString("exec");
|
||||||
|
|
||||||
/* Slow log */
|
/* Slow log */
|
||||||
server.slowlog_log_slower_than = CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN;
|
server.slowlog_log_slower_than = CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN;
|
||||||
|
@ -747,7 +747,7 @@ struct redisServer {
|
|||||||
off_t loading_process_events_interval_bytes;
|
off_t loading_process_events_interval_bytes;
|
||||||
/* Fast pointers to often looked up command */
|
/* Fast pointers to often looked up command */
|
||||||
struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand,
|
struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand,
|
||||||
*rpopCommand, *sremCommand;
|
*rpopCommand, *sremCommand, *execCommand;
|
||||||
/* Fields used only for stats */
|
/* Fields used only for stats */
|
||||||
time_t stat_starttime; /* Server start time */
|
time_t stat_starttime; /* Server start time */
|
||||||
long long stat_numcommands; /* Number of processed commands */
|
long long stat_numcommands; /* Number of processed commands */
|
||||||
@ -952,6 +952,8 @@ struct redisServer {
|
|||||||
execution of the current script. */
|
execution of the current script. */
|
||||||
int lua_random_dirty; /* True if a random command was called during the
|
int lua_random_dirty; /* True if a random command was called during the
|
||||||
execution of the current script. */
|
execution of the current script. */
|
||||||
|
int lua_replicate_commands; /* True if we are doing single commands repl. */
|
||||||
|
int lua_multi_emitted;/* True if we already proagated MULTI. */
|
||||||
int lua_timedout; /* True if we reached the time limit for script
|
int lua_timedout; /* True if we reached the time limit for script
|
||||||
execution. */
|
execution. */
|
||||||
int lua_kill; /* Kill the script if true. */
|
int lua_kill; /* Kill the script if true. */
|
||||||
@ -1173,6 +1175,7 @@ void touchWatchedKey(redisDb *db, robj *key);
|
|||||||
void touchWatchedKeysOnFlush(int dbid);
|
void touchWatchedKeysOnFlush(int dbid);
|
||||||
void discardTransaction(client *c);
|
void discardTransaction(client *c);
|
||||||
void flagTransaction(client *c);
|
void flagTransaction(client *c);
|
||||||
|
void execCommandPropagateMulti(client *c);
|
||||||
|
|
||||||
/* Redis object implementation */
|
/* Redis object implementation */
|
||||||
void decrRefCount(robj *o);
|
void decrRefCount(robj *o);
|
||||||
|
Loading…
Reference in New Issue
Block a user