Redis Functions - Introduce script unit.

Script unit is a new unit located on script.c.
Its purpose is to provides an API for functions (and eval)
to interact with Redis. Interaction includes mostly
executing commands, but also functionalities like calling
Redis back on long scripts or check if the script was killed.

The interaction is done using a scriptRunCtx object that
need to be created by the user and initialized using scriptPrepareForRun.

Detailed list of functionalities expose by the unit:
1. Calling commands (including all the validation checks such as
   acl, cluster, read only run, ...)
2. Set Resp
3. Set Replication method (AOF/REPLICATION/NONE)
4. Call Redis back to on long running scripts to allow Redis reply
   to clients and perform script kill

The commit introduce the new unit and uses it on eval commands to
interact with Redis.
This commit is contained in:
meir@redislabs.com 2021-10-05 19:37:03 +03:00 committed by meir
parent e0cd580aef
commit fc731bc67f
16 changed files with 698 additions and 340 deletions

View File

@ -309,7 +309,7 @@ endif
REDIS_SERVER_NAME=redis-server$(PROG_SUFFIX) REDIS_SERVER_NAME=redis-server$(PROG_SUFFIX)
REDIS_SENTINEL_NAME=redis-sentinel$(PROG_SUFFIX) REDIS_SENTINEL_NAME=redis-sentinel$(PROG_SUFFIX)
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o connection.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o connection.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o
REDIS_CLI_NAME=redis-cli$(PROG_SUFFIX) REDIS_CLI_NAME=redis-cli$(PROG_SUFFIX)
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o redisassert.o crcspeed.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o redisassert.o crcspeed.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o
REDIS_BENCHMARK_NAME=redis-benchmark$(PROG_SUFFIX) REDIS_BENCHMARK_NAME=redis-benchmark$(PROG_SUFFIX)

View File

@ -1897,7 +1897,7 @@ void addACLLogEntry(client *c, int reason, int context, int argpos, sds username
} }
client *realclient = c; client *realclient = c;
if (realclient->flags & CLIENT_LUA) realclient = server.script_caller; if (realclient->flags & CLIENT_SCRIPT) realclient = server.script_caller;
le->cinfo = catClientInfoString(sdsempty(),realclient); le->cinfo = catClientInfoString(sdsempty(),realclient);
le->context = context; le->context = context;

View File

@ -31,6 +31,7 @@
#include "cluster.h" #include "cluster.h"
#include "atomicvar.h" #include "atomicvar.h"
#include "latency.h" #include "latency.h"
#include "script.h"
#include <signal.h> #include <signal.h>
#include <ctype.h> #include <ctype.h>
@ -88,7 +89,7 @@ robj *lookupKey(redisDb *db, robj *key, int flags) {
* commands is to make writable replicas behave consistently. It * commands is to make writable replicas behave consistently. It
* shall not be used in readonly commands. Modules are accepted so * shall not be used in readonly commands. Modules are accepted so
* that we don't break old modules. */ * that we don't break old modules. */
client *c = server.in_eval ? server.lua_client : server.current_client; client *c = server.in_script ? scriptGetClient() : server.current_client;
serverAssert(!c || !c->cmd || (c->cmd->flags & (CMD_WRITE|CMD_MODULE))); serverAssert(!c || !c->cmd || (c->cmd->flags & (CMD_WRITE|CMD_MODULE)));
} }
if (expireIfNeeded(db, key, force_delete_expired)) { if (expireIfNeeded(db, key, force_delete_expired)) {

View File

@ -51,7 +51,14 @@ void ldbLogRedisReply(char *reply);
sds ldbCatStackValue(sds s, lua_State *lua, int idx); sds ldbCatStackValue(sds s, lua_State *lua, int idx);
/* Lua context */ /* Lua context */
luaCtx lctx; struct luaCtx {
lua_State *lua; /* The Lua interpreter. We use just one for all clients */
client *lua_client; /* The "fake client" to query Redis from Lua */
char *lua_cur_script; /* SHA1 of the script currently running, or NULL */
dict *lua_scripts; /* A dictionary of SHA1 -> Lua scripts */
unsigned long long lua_scripts_mem; /* Cached scripts' memory + oh */
int lua_replicate_commands; /* True if we are doing single commands repl. */
} lctx;
/* Debugger shared state is stored inside this global structure. */ /* Debugger shared state is stored inside this global structure. */
#define LDB_BREAKPOINTS_MAX 64 /* Max number of breakpoints. */ #define LDB_BREAKPOINTS_MAX 64 /* Max number of breakpoints. */
@ -141,10 +148,12 @@ int luaRedisDebugCommand(lua_State *lua) {
* already started to write, returns false and stick to whole scripts * already started to write, returns false and stick to whole scripts
* replication, which is our default. */ * replication, which is our default. */
int luaRedisReplicateCommandsCommand(lua_State *lua) { int luaRedisReplicateCommandsCommand(lua_State *lua) {
if (lctx.lua_write_dirty) { scriptRunCtx* rctx = luaGetFromRegistry(lua, REGISTRY_RUN_CTX_NAME);
if (rctx->flags & SCRIPT_WRITE_DIRTY) {
lua_pushboolean(lua,0); lua_pushboolean(lua,0);
} else { } else {
lctx.lua_replicate_commands = 1; lctx.lua_replicate_commands = 1;
rctx->flags &= ~SCRIPT_EVAL_REPLICATION;
/* When we switch to single commands replication, we can provide /* When we switch to single commands replication, we can provide
* different math.random() sequences at every call, which is what * different math.random() sequences at every call, which is what
* the user normally expects. */ * the user normally expects. */
@ -171,7 +180,6 @@ void scriptingInit(int setup) {
lctx.lua_client = NULL; lctx.lua_client = NULL;
server.script_caller = NULL; server.script_caller = NULL;
lctx.lua_cur_script = NULL; lctx.lua_cur_script = NULL;
server.script_timedout = 0;
server.script_disable_deny_script = 0; server.script_disable_deny_script = 0;
ldbInit(); ldbInit();
} }
@ -182,7 +190,7 @@ void scriptingInit(int setup) {
lctx.lua_scripts = dictCreate(&shaScriptObjectDictType); lctx.lua_scripts = dictCreate(&shaScriptObjectDictType);
lctx.lua_scripts_mem = 0; lctx.lua_scripts_mem = 0;
luaEngineRegisterRedisAPI(lua); luaRegisterRedisAPI(lua);
/* register debug commands */ /* register debug commands */
lua_getglobal(lua,"redis"); lua_getglobal(lua,"redis");
@ -243,7 +251,7 @@ void scriptingInit(int setup) {
* by scriptingReset(). */ * by scriptingReset(). */
if (lctx.lua_client == NULL) { if (lctx.lua_client == NULL) {
lctx.lua_client = createClient(NULL); lctx.lua_client = createClient(NULL);
lctx.lua_client->flags |= CLIENT_LUA; lctx.lua_client->flags |= CLIENT_SCRIPT;
/* We do not want to allow blocking commands inside Lua */ /* We do not want to allow blocking commands inside Lua */
lctx.lua_client->flags |= CLIENT_DENY_BLOCKING; lctx.lua_client->flags |= CLIENT_DENY_BLOCKING;
@ -252,7 +260,7 @@ void scriptingInit(int setup) {
/* Lua beginners often don't use "local", this is likely to introduce /* Lua beginners often don't use "local", this is likely to introduce
* subtle bugs in their code. To prevent problems we protect accesses * subtle bugs in their code. To prevent problems we protect accesses
* to global variables. */ * to global variables. */
scriptingEnableGlobalsProtection(lua); luaEnableGlobalsProtection(lua);
lctx.lua = lua; lctx.lua = lua;
} }
@ -375,19 +383,7 @@ void evalGenericCommand(client *c, int evalsha) {
* every call so that our PRNG is not affected by external state. */ * every call so that our PRNG is not affected by external state. */
redisSrand48(0); redisSrand48(0);
/* We set this flag to zero to remember that so far no random command
* was called. This way we can allow the user to call commands like
* SRANDMEMBER or RANDOMKEY from Lua scripts as far as no write command
* is called (otherwise the replication and AOF would end with non
* deterministic sequences).
*
* Thanks to this flag we'll raise an error every time a write command
* is called after a random command was used. */
lctx.lua_random_dirty = 0;
lctx.lua_write_dirty = 0;
lctx.lua_replicate_commands = server.lua_always_replicate_commands; lctx.lua_replicate_commands = server.lua_always_replicate_commands;
lctx.lua_multi_emitted = 0;
lctx.lua_repl = PROPAGATE_AOF|PROPAGATE_REPL;
/* Get the number of arguments that are keys */ /* Get the number of arguments that are keys */
if (getLongLongFromObjectOrReply(c,c->argv[2],&numkeys,NULL) != C_OK) if (getLongLongFromObjectOrReply(c,c->argv[2],&numkeys,NULL) != C_OK)
@ -452,19 +448,24 @@ void evalGenericCommand(client *c, int evalsha) {
luaSetGlobalArray(lua,"KEYS",c->argv+3,numkeys); luaSetGlobalArray(lua,"KEYS",c->argv+3,numkeys);
luaSetGlobalArray(lua,"ARGV",c->argv+3+numkeys,c->argc-3-numkeys); luaSetGlobalArray(lua,"ARGV",c->argv+3+numkeys,c->argc-3-numkeys);
/* Set a hook in order to be able to stop the script execution if it
* is running for too much time.
* We set the hook only if the time limit is enabled as the hook will
* make the Lua script execution slower.
*
* If we are debugging, we set instead a "line" hook so that the
* debugger is call-back at every line executed by the script. */
server.in_script = 1;
server.script_caller = c;
lctx.lua_cur_script = funcname + 2; lctx.lua_cur_script = funcname + 2;
lctx.lua_time_start = getMonotonicUs();
lctx.lua_time_snapshot = mstime(); scriptRunCtx rctx;
lctx.lua_kill = 0; scriptPrepareForRun(&rctx, lctx.lua_client, c, lctx.lua_cur_script);
/* We must set it before we set the Lua hook, theoretically the
* Lua hook might be called wheneven we run any Lua instruction
* such as 'luaSetGlobalArray' and we want the rctx to be available
* each time the Lua hook is invoked. */
luaSaveOnRegistry(lua, REGISTRY_RUN_CTX_NAME, &rctx);
if (!lctx.lua_replicate_commands) rctx.flags |= SCRIPT_EVAL_REPLICATION;
/* This check is for EVAL_RO, EVALSHA_RO. We want to allow only read only commands */
if ((server.script_caller->cmd->proc == evalRoCommand ||
server.script_caller->cmd->proc == evalShaRoCommand)) {
rctx.flags |= SCRIPT_READ_ONLY;
}
if (server.script_time_limit > 0 && ldb.active == 0) { if (server.script_time_limit > 0 && ldb.active == 0) {
lua_sethook(lua,luaMaskCountHook,LUA_MASKCOUNT,100000); lua_sethook(lua,luaMaskCountHook,LUA_MASKCOUNT,100000);
delhook = 1; delhook = 1;
@ -473,29 +474,17 @@ void evalGenericCommand(client *c, int evalsha) {
delhook = 1; delhook = 1;
} }
prepareLuaClient();
/* At this point whether this script was never seen before or if it was /* At this point whether this script was never seen before or if it was
* already defined, we can call it. We have zero arguments and expect * already defined, we can call it. We have zero arguments and expect
* a single return value. */ * a single return value. */
err = lua_pcall(lua,0,1,-2); err = lua_pcall(lua,0,1,-2);
resetLuaClient(); scriptResetRun(&rctx);
/* Perform some cleanup that we need to do both on error and success. */ /* Perform some cleanup that we need to do both on error and success. */
if (delhook) lua_sethook(lua,NULL,0,0); /* Disable hook */ if (delhook) lua_sethook(lua,NULL,0,0); /* Disable hook */
if (server.script_timedout) {
server.script_timedout = 0;
blockingOperationEnds();
/* Restore the client that was protected when the script timeout
* was detected. */
unprotectClient(c);
if (server.masterhost && server.master)
queueClientForReprocessing(server.master);
}
server.in_script = 0;
server.script_caller = NULL;
lctx.lua_cur_script = NULL; lctx.lua_cur_script = NULL;
luaSaveOnRegistry(lua, REGISTRY_RUN_CTX_NAME, NULL);
/* Call the Lua garbage collector from time to time to avoid a /* Call the Lua garbage collector from time to time to avoid a
* full cycle performed by Lua, which adds too latency. * full cycle performed by Lua, which adds too latency.
@ -521,19 +510,10 @@ void evalGenericCommand(client *c, int evalsha) {
} else { } else {
/* On success convert the Lua return value into Redis protocol, and /* On success convert the Lua return value into Redis protocol, and
* send it to * the client. */ * send it to * the client. */
luaReplyToRedisReply(c,lua); /* Convert and consume the reply. */ luaReplyToRedisReply(c,rctx.c,lua); /* Convert and consume the reply. */
lua_pop(lua,1); /* Remove the error handler. */ lua_pop(lua,1); /* Remove the error handler. */
} }
/* If we are using single commands replication, emit EXEC if there
* was at least a write. */
if (lctx.lua_replicate_commands) {
preventCommandPropagation(c);
if (lctx.lua_multi_emitted) {
execCommandPropagateExec(c->db->id);
}
}
/* EVALSHA should be propagated to Slave and AOF file as full EVAL, unless /* EVALSHA should be propagated to Slave and AOF file as full EVAL, unless
* we are sure that the script was already in the context of all the * we are sure that the script was already in the context of all the
* attached slaves *and* the current AOF file if enabled. * attached slaves *and* the current AOF file if enabled.
@ -662,16 +642,7 @@ NULL
addReplyBulkCBuffer(c,sha,40); addReplyBulkCBuffer(c,sha,40);
forceCommandPropagation(c,PROPAGATE_REPL|PROPAGATE_AOF); forceCommandPropagation(c,PROPAGATE_REPL|PROPAGATE_AOF);
} else if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"kill")) { } else if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"kill")) {
if (server.script_caller == NULL) { scriptKill(c);
addReplyError(c,"-NOTBUSY No scripts in execution right now.");
} else if (server.script_caller->flags & CLIENT_MASTER) {
addReplyError(c,"-UNKILLABLE The busy script was sent by a master instance in the context of replication and cannot be killed.");
} else if (lctx.lua_write_dirty) {
addReplyError(c,"-UNKILLABLE Sorry the script already executed write commands against the dataset. You can either wait the script termination or kill the server in a hard way using the SHUTDOWN NOSAVE command.");
} else {
lctx.lua_kill = 1;
addReply(c,shared.ok);
}
} else if (c->argc == 3 && !strcasecmp(c->argv[1]->ptr,"debug")) { } else if (c->argc == 3 && !strcasecmp(c->argv[1]->ptr,"debug")) {
if (clientHasPendingReplies(c)) { if (clientHasPendingReplies(c)) {
addReplyError(c,"SCRIPT DEBUG must be called outside a pipeline"); addReplyError(c,"SCRIPT DEBUG must be called outside a pipeline");
@ -712,7 +683,7 @@ unsigned long evalScriptsMemory() {
/* Returns the time when the script invocation started */ /* Returns the time when the script invocation started */
mstime_t evalTimeSnapshot() { mstime_t evalTimeSnapshot() {
return lctx.lua_time_snapshot; return scriptTimeSnapshot();
} }
@ -1672,6 +1643,7 @@ ldbLog(sdsnew(" next line of code."));
/* This is the core of our Lua debugger, called each time Lua is about /* This is the core of our Lua debugger, called each time Lua is about
* to start executing a new line. */ * to start executing a new line. */
void luaLdbLineHook(lua_State *lua, lua_Debug *ar) { void luaLdbLineHook(lua_State *lua, lua_Debug *ar) {
scriptRunCtx* rctx = luaGetFromRegistry(lua, REGISTRY_RUN_CTX_NAME);
lua_getstack(lua,0,ar); lua_getstack(lua,0,ar);
lua_getinfo(lua,"Sl",ar); lua_getinfo(lua,"Sl",ar);
ldb.currentline = ar->currentline; ldb.currentline = ar->currentline;
@ -1684,7 +1656,7 @@ void luaLdbLineHook(lua_State *lua, lua_Debug *ar) {
/* Check if a timeout occurred. */ /* Check if a timeout occurred. */
if (ar->event == LUA_HOOKCOUNT && ldb.step == 0 && bp == 0) { if (ar->event == LUA_HOOKCOUNT && ldb.step == 0 && bp == 0) {
mstime_t elapsed = elapsedMs(server.script_time_limit); mstime_t elapsed = elapsedMs(rctx->start_time);
mstime_t timelimit = server.script_time_limit ? mstime_t timelimit = server.script_time_limit ?
server.script_time_limit : 5000; server.script_time_limit : 5000;
if (elapsed >= timelimit) { if (elapsed >= timelimit) {
@ -1714,7 +1686,7 @@ void luaLdbLineHook(lua_State *lua, lua_Debug *ar) {
lua_pushstring(lua, "timeout during Lua debugging with client closing connection"); lua_pushstring(lua, "timeout during Lua debugging with client closing connection");
lua_error(lua); lua_error(lua);
} }
lctx.lua_time_start = getMonotonicUs(); rctx->start_time = getMonotonicUs();
lctx.lua_time_snapshot = mstime(); rctx->snapshot_time = mstime();
} }
} }

View File

@ -33,6 +33,7 @@
#include "server.h" #include "server.h"
#include "bio.h" #include "bio.h"
#include "atomicvar.h" #include "atomicvar.h"
#include "script.h"
#include <math.h> #include <math.h>
/* ---------------------------------------------------------------------------- /* ----------------------------------------------------------------------------
@ -472,7 +473,7 @@ static int evictionTimeProc(
static int isSafeToPerformEvictions(void) { static int isSafeToPerformEvictions(void) {
/* - There must be no script in timeout condition. /* - There must be no script in timeout condition.
* - Nor we are loading data right now. */ * - Nor we are loading data right now. */
if (server.script_timedout || server.loading) return 0; if (scriptIsTimedout() || server.loading) return 0;
/* By default replicas should ignore maxmemory /* By default replicas should ignore maxmemory
* and just be masters exact copies. */ * and just be masters exact copies. */

View File

@ -30,6 +30,7 @@
#include "server.h" #include "server.h"
#include "atomicvar.h" #include "atomicvar.h"
#include "cluster.h" #include "cluster.h"
#include "script.h"
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/uio.h> #include <sys/uio.h>
#include <math.h> #include <math.h>
@ -260,7 +261,7 @@ void clientInstallWriteHandler(client *c) {
int prepareClientToWrite(client *c) { int prepareClientToWrite(client *c) {
/* If it's the Lua client we always return ok without installing any /* If it's the Lua client we always return ok without installing any
* handler since there is no socket at all. */ * handler since there is no socket at all. */
if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK; if (c->flags & (CLIENT_SCRIPT|CLIENT_MODULE)) return C_OK;
/* If CLIENT_CLOSE_ASAP flag is set, we need not write anything. */ /* If CLIENT_CLOSE_ASAP flag is set, we need not write anything. */
if (c->flags & CLIENT_CLOSE_ASAP) return C_ERR; if (c->flags & CLIENT_CLOSE_ASAP) return C_ERR;
@ -1491,7 +1492,7 @@ void freeClientAsync(client *c) {
* may access the list while Redis uses I/O threads. All the other accesses * may access the list while Redis uses I/O threads. All the other accesses
* are in the context of the main thread while the other threads are * are in the context of the main thread while the other threads are
* idle. */ * idle. */
if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return; if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_SCRIPT) return;
c->flags |= CLIENT_CLOSE_ASAP; c->flags |= CLIENT_CLOSE_ASAP;
if (server.io_threads_num == 1) { if (server.io_threads_num == 1) {
/* no need to bother with locking if there's just one thread (the main thread) */ /* no need to bother with locking if there's just one thread (the main thread) */
@ -2199,7 +2200,7 @@ int processInputBuffer(client *c) {
* condition on the slave. We want just to accumulate the replication * condition on the slave. We want just to accumulate the replication
* stream (instead of replying -BUSY like we do with other clients) and * stream (instead of replying -BUSY like we do with other clients) and
* later resume the processing. */ * later resume the processing. */
if (server.script_timedout && c->flags & CLIENT_MASTER) break; if (scriptIsTimedout() && c->flags & CLIENT_MASTER) break;
/* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
* written to the client. Make sure to not let the reply grow after * written to the client. Make sure to not let the reply grow after

View File

@ -546,7 +546,7 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv,
gettimeofday(&tv,NULL); gettimeofday(&tv,NULL);
cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec); cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec);
if (c->flags & CLIENT_LUA) { if (c->flags & CLIENT_SCRIPT) {
cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ",dictid); cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ",dictid);
} else if (c->flags & CLIENT_UNIX_SOCKET) { } else if (c->flags & CLIENT_UNIX_SOCKET) {
cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket); cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket);

432
src/script.c Normal file
View File

@ -0,0 +1,432 @@
/*
* Copyright (c) 2009-2021, Redis Ltd.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "server.h"
#include "script.h"
#include "cluster.h"
/* On script invocation, holding the current run context */
static scriptRunCtx *curr_run_ctx = NULL;
static void exitScriptTimedoutMode(scriptRunCtx *run_ctx) {
serverAssert(run_ctx == curr_run_ctx);
serverAssert(scriptIsTimedout());
run_ctx->flags &= ~SCRIPT_TIMEDOUT;
blockingOperationEnds();
/* if we are a replica and we have an active master, set it for continue processing */
if (server.masterhost && server.master) queueClientForReprocessing(server.master);
}
static void enterScriptTimedoutMode(scriptRunCtx *run_ctx) {
serverAssert(run_ctx == curr_run_ctx);
serverAssert(!scriptIsTimedout());
/* Mark script as timedout */
run_ctx->flags |= SCRIPT_TIMEDOUT;
blockingOperationStarts();
}
int scriptIsTimedout() {
return scriptIsRunning() && (curr_run_ctx->flags & SCRIPT_TIMEDOUT);
}
client* scriptGetClient() {
serverAssert(scriptIsRunning());
return curr_run_ctx->c;
}
/* interrupt function for scripts, should be call
* from time to time to reply some special command (like ping)
* and also check if the run should be terminated. */
int scriptInterrupt(scriptRunCtx *run_ctx) {
if (run_ctx->flags & SCRIPT_TIMEDOUT) {
/* script already timedout
we just need to precess some events and return */
processEventsWhileBlocked();
return (run_ctx->flags & SCRIPT_KILLED) ? SCRIPT_KILL : SCRIPT_CONTINUE;
}
long long elapsed = elapsedMs(run_ctx->start_time);
if (elapsed < server.script_time_limit) {
return SCRIPT_CONTINUE;
}
serverLog(LL_WARNING,
"Slow script detected: still in execution after %lld milliseconds. "
"You can try killing the script using the SCRIPT KILL command.",
elapsed);
enterScriptTimedoutMode(run_ctx);
/* Once the script timeouts we reenter the event loop to permit others
* some commands execution. For this reason
* we need to mask the client executing the script from the event loop.
* If we don't do that the client may disconnect and could no longer be
* here when the EVAL command will return. */
protectClient(run_ctx->original_client);
processEventsWhileBlocked();
return (run_ctx->flags & SCRIPT_KILLED) ? SCRIPT_KILL : SCRIPT_CONTINUE;
}
/* Prepare the given run ctx for execution */
void scriptPrepareForRun(scriptRunCtx *run_ctx, client *engine_client, client *caller, const char *funcname) {
serverAssert(!curr_run_ctx);
/* set the curr_run_ctx so we can use it to kill the script if needed */
curr_run_ctx = run_ctx;
run_ctx->c = engine_client;
run_ctx->original_client = caller;
run_ctx->funcname = funcname;
client *script_client = run_ctx->c;
client *curr_client = run_ctx->original_client;
server.script_caller = curr_client;
/* Select the right DB in the context of the Lua client */
selectDb(script_client, curr_client->db->id);
script_client->resp = 2; /* Default is RESP2, scripts can change it. */
/* If we are in MULTI context, flag Lua client as CLIENT_MULTI. */
if (curr_client->flags & CLIENT_MULTI) {
script_client->flags |= CLIENT_MULTI;
}
server.in_script = 1;
run_ctx->start_time = getMonotonicUs();
run_ctx->snapshot_time = mstime();
run_ctx->flags = 0;
run_ctx->repl_flags = PROPAGATE_AOF | PROPAGATE_REPL;
}
/* Reset the given run ctx after execution */
void scriptResetRun(scriptRunCtx *run_ctx) {
serverAssert(curr_run_ctx);
/* After the script done, remove the MULTI state. */
run_ctx->c->flags &= ~CLIENT_MULTI;
server.in_script = 0;
server.script_caller = NULL;
if (scriptIsTimedout()) {
exitScriptTimedoutMode(run_ctx);
/* Restore the client that was protected when the script timeout
* was detected. */
unprotectClient(run_ctx->original_client);
}
if (!(run_ctx->flags & SCRIPT_EVAL_REPLICATION)) {
preventCommandPropagation(run_ctx->original_client);
if (run_ctx->flags & SCRIPT_MULTI_EMMITED) {
execCommandPropagateExec(run_ctx->original_client->db->id);
}
}
/* unset curr_run_ctx so we will know there is no running script */
curr_run_ctx = NULL;
}
/* return true if a script is currently running */
int scriptIsRunning() {
return curr_run_ctx != NULL;
}
/* Kill the current running script */
void scriptKill(client *c) {
if (!curr_run_ctx) {
addReplyError(c, "-NOTBUSY No scripts in execution right now.");
return;
}
if (curr_run_ctx->original_client->flags & CLIENT_MASTER) {
addReplyError(c,
"-UNKILLABLE The busy script was sent by a master instance in the context of replication and cannot be killed.");
}
if (curr_run_ctx->flags & SCRIPT_WRITE_DIRTY) {
addReplyError(c,
"-UNKILLABLE Sorry the script already executed write "
"commands against the dataset. You can either wait the "
"script termination or kill the server in a hard way "
"using the SHUTDOWN NOSAVE command.");
return;
}
curr_run_ctx->flags |= SCRIPT_KILLED;
addReply(c, shared.ok);
}
static int scriptVerifyCommandArity(struct redisCommand *cmd, int argc, sds *err) {
if (!cmd || ((cmd->arity > 0 && cmd->arity != argc) || (argc < cmd->arity))) {
if (cmd)
*err = sdsnew("Wrong number of args calling Redis command from script");
else
*err = sdsnew("Unknown Redis command called from script");
return C_ERR;
}
return C_OK;
}
static int scriptVerifyACL(client *c, sds *err) {
/* Check the ACLs. */
int acl_errpos;
int acl_retval = ACLCheckAllPerm(c, &acl_errpos);
if (acl_retval != ACL_OK) {
addACLLogEntry(c,acl_retval,ACL_LOG_CTX_LUA,acl_errpos,NULL,NULL);
switch (acl_retval) {
case ACL_DENIED_CMD:
*err = sdsnew("The user executing the script can't run this "
"command or subcommand");
break;
case ACL_DENIED_KEY:
*err = sdsnew("The user executing the script can't access "
"at least one of the keys mentioned in the "
"command arguments");
break;
case ACL_DENIED_CHANNEL:
*err = sdsnew("The user executing the script can't publish "
"to the channel mentioned in the command");
break;
default:
*err = sdsnew("The user executing the script is lacking the "
"permissions for the command");
break;
}
return C_ERR;
}
return C_OK;
}
static int scriptVerifyWriteCommandAllow(scriptRunCtx *run_ctx, char **err) {
if (!(run_ctx->c->cmd->flags & CMD_WRITE)) {
return C_OK;
}
if (run_ctx->flags & SCRIPT_READ_ONLY) {
/* We know its a write command, on a read only run we do not allow it. */
*err = sdsnew("Write commands are not allowed from read-only scripts.");
return C_ERR;
}
if ((run_ctx->flags & SCRIPT_RANDOM_DIRTY) && (run_ctx->flags & SCRIPT_EVAL_REPLICATION)) {
*err = sdsnew("Write commands not allowed after non deterministic commands. Call redis.replicate_commands() at the start of your script in order to switch to single commands replication mode.");
return C_ERR;
}
/* Write commands are forbidden against read-only slaves, or if a
* command marked as non-deterministic was already called in the context
* of this script. */
int deny_write_type = writeCommandsDeniedByDiskError();
if (server.masterhost && server.repl_slave_ro && run_ctx->original_client->flags != CLIENT_ID_AOF
&& !(run_ctx->original_client->flags & CLIENT_MASTER))
{
*err = sdsdup(shared.roslaveerr->ptr);
return C_ERR;
}
if (deny_write_type != DISK_ERROR_TYPE_NONE) {
if (deny_write_type == DISK_ERROR_TYPE_RDB) {
*err = sdsdup(shared.bgsaveerr->ptr);
} else {
*err = sdsempty();
*err = sdscatfmt(*err,
"MISCONF Errors writing to the AOF file: %s\r\n",
strerror(server.aof_last_write_errno));
}
return C_ERR;
}
return C_OK;
}
static int scriptVerifyOOM(scriptRunCtx *run_ctx, char **err) {
/* If we reached the memory limit configured via maxmemory, commands that
* could enlarge the memory usage are not allowed, but only if this is the
* first write in the context of this script, otherwise we can't stop
* in the middle. */
if (server.maxmemory && /* Maxmemory is actually enabled. */
run_ctx->original_client->id != CLIENT_ID_AOF && /* Don't care about mem if loading from AOF. */
!server.masterhost && /* Slave must execute the script. */
!(run_ctx->flags & SCRIPT_WRITE_DIRTY) && /* Script had no side effects so far. */
server.script_oom && /* Detected OOM when script start. */
(run_ctx->c->cmd->flags & CMD_DENYOOM))
{
*err = sdsdup(shared.oomerr->ptr);
return C_ERR;
}
return C_OK;
}
static int scriptVerifyClusterState(client *c, client *original_c, sds *err) {
if (!server.cluster_enabled || original_c->id == CLIENT_ID_AOF || (original_c->flags & CLIENT_MASTER)) {
return C_OK;
}
/* If this is a Redis Cluster node, we need to make sure the script is not
* trying to access non-local keys, with the exception of commands
* received from our master or when loading the AOF back in memory. */
int error_code;
/* Duplicate relevant flags in the script client. */
c->flags &= ~(CLIENT_READONLY | CLIENT_ASKING);
c->flags |= original_c->flags & (CLIENT_READONLY | CLIENT_ASKING);
if (getNodeByQuery(c, c->cmd, c->argv, c->argc, NULL, &error_code) != server.cluster->myself) {
if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) {
*err = sdsnew(
"Script attempted to execute a write command while the "
"cluster is down and readonly");
} else if (error_code == CLUSTER_REDIR_DOWN_STATE) {
*err = sdsnew("Script attempted to execute a command while the "
"cluster is down");
} else {
*err = sdsnew("Script attempted to access a non local key in a "
"cluster node");
}
return C_ERR;
}
return C_OK;
}
static void scriptEmitMultiIfNeeded(scriptRunCtx *run_ctx) {
/* If we are using single commands replication, we need to wrap what
* we propagate into a MULTI/EXEC block, so that it will be atomic like
* a Lua script in the context of AOF and slaves. */
client *c = run_ctx->c;
if (!(run_ctx->flags & SCRIPT_EVAL_REPLICATION)
&& !(run_ctx->flags & SCRIPT_MULTI_EMMITED)
&& !(run_ctx->original_client->flags & CLIENT_MULTI)
&& (run_ctx->flags & SCRIPT_WRITE_DIRTY)
&& ((run_ctx->repl_flags & PROPAGATE_AOF)
|| (run_ctx->repl_flags & PROPAGATE_REPL)))
{
execCommandPropagateMulti(run_ctx->original_client->db->id);
run_ctx->flags |= SCRIPT_MULTI_EMMITED;
/* Now we are in the MULTI context, the lua_client should be
* flag as CLIENT_MULTI. */
c->flags |= CLIENT_MULTI;
}
}
/* set RESP for a given run_ctx */
int scriptSetResp(scriptRunCtx *run_ctx, int resp) {
if (resp != 2 && resp != 3) {
return C_ERR;
}
run_ctx->c->resp = resp;
return C_OK;
}
/* set Repl for a given run_ctx
* either: PROPAGATE_AOF | PROPAGATE_REPL*/
int scriptSetRepl(scriptRunCtx *run_ctx, int repl) {
if ((repl & ~(PROPAGATE_AOF | PROPAGATE_REPL)) != 0) {
return C_ERR;
}
run_ctx->repl_flags = repl;
return C_OK;
}
/* Call a Redis command.
* The reply is written to the run_ctx client and it is
* up to the engine to take and parse.
* The err out variable is set only if error occurs and describe the error.
* If err is set on reply is written to the run_ctx client. */
void scriptCall(scriptRunCtx *run_ctx, robj* *argv, int argc, sds *err) {
client *c = run_ctx->c;
/* Setup our fake client for command execution */
c->argv = argv;
c->argc = argc;
c->user = run_ctx->original_client->user;
/* Process module hooks */
moduleCallCommandFilters(c);
argv = c->argv;
argc = c->argc;
struct redisCommand *cmd = lookupCommand(argv, argc);
if (scriptVerifyCommandArity(cmd, argc, err) != C_OK) {
return;
}
c->cmd = c->lastcmd = cmd;
/* There are commands that are not allowed inside scripts. */
if (!server.script_disable_deny_script && (cmd->flags & CMD_NOSCRIPT)) {
*err = sdsnew("This Redis command is not allowed from script");
return;
}
if (scriptVerifyACL(c, err) != C_OK) {
return;
}
if (scriptVerifyWriteCommandAllow(run_ctx, err) != C_OK) {
return;
}
if (scriptVerifyOOM(run_ctx, err) != C_OK) {
return;
}
if (cmd->flags & CMD_WRITE) {
/* signify that we already change the data in this execution */
run_ctx->flags |= SCRIPT_WRITE_DIRTY;
}
if (cmd->flags & CMD_RANDOM) {
/* signify that we already perform a random command in this execution */
run_ctx->flags |= SCRIPT_RANDOM_DIRTY;
}
if (scriptVerifyClusterState(c, run_ctx->original_client, err) != C_OK) {
return;
}
scriptEmitMultiIfNeeded(run_ctx);
int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS;
if (!(run_ctx->flags & SCRIPT_EVAL_REPLICATION)) {
if (run_ctx->repl_flags & PROPAGATE_AOF) {
call_flags |= CMD_CALL_PROPAGATE_AOF;
}
if (run_ctx->repl_flags & PROPAGATE_REPL) {
call_flags |= CMD_CALL_PROPAGATE_REPL;
}
}
call(c, call_flags);
serverAssert((c->flags & CLIENT_BLOCKED) == 0);
}
/* Returns the time when the script invocation started */
mstime_t scriptTimeSnapshot() {
serverAssert(!curr_run_ctx);
return curr_run_ctx->snapshot_time;
}

96
src/script.h Normal file
View File

@ -0,0 +1,96 @@
/*
* Copyright (c) 2009-2021, Redis Ltd.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef __SCRIPT_H_
#define __SCRIPT_H_
/*
* Script.c unit provides an API for functions and eval
* to interact with Redis. Interaction includes mostly
* executing commands, but also functionalities like calling
* Redis back on long scripts or check if the script was killed.
*
* The interaction is done using a scriptRunCtx object that
* need to be created by the user and initialized using scriptPrepareForRun.
*
* Detailed list of functionalities expose by the unit:
* 1. Calling commands (including all the validation checks such as
* acl, cluster, read only run, ...)
* 2. Set Resp
* 3. Set Replication method (AOF/REPLICATION/NONE)
* 4. Call Redis back to on long running scripts to allow Redis reply
* to clients and perform script kill
*/
/*
* scriptInterrupt function will return one of those value,
*
* - SCRIPT_KILL - kill the current running script.
* - SCRIPT_CONTINUE - keep running the current script.
*/
#define SCRIPT_KILL 1
#define SCRIPT_CONTINUE 2
/* runCtx flags */
#define SCRIPT_WRITE_DIRTY (1ULL<<0) /* indicate that the current script already performed a write command */
#define SCRIPT_RANDOM_DIRTY (1ULL<<1) /* indicate that the current script already performed a random reply command.
Thanks to this flag we'll raise an error every time a write command
is called after a random command and prevent none deterministic
replication or AOF. */
#define SCRIPT_MULTI_EMMITED (1ULL<<2) /* indicate that we already wrote a multi command to replication/aof */
#define SCRIPT_TIMEDOUT (1ULL<<3) /* indicate that the current script timedout */
#define SCRIPT_KILLED (1ULL<<4) /* indicate that the current script was marked to be killed */
#define SCRIPT_READ_ONLY (1ULL<<5) /* indicate that the current script should only perform read commands */
#define SCRIPT_EVAL_REPLICATION (1ULL<<6) /* mode for eval, indicate that we replicate the
script invocation and not the effects */
typedef struct scriptRunCtx scriptRunCtx;
struct scriptRunCtx {
const char *funcname;
client *c;
client *original_client;
int flags;
int repl_flags;
monotime start_time;
mstime_t snapshot_time;
};
void scriptPrepareForRun(scriptRunCtx *r_ctx, client *engine_client, client *caller, const char *funcname);
void scriptResetRun(scriptRunCtx *r_ctx);
int scriptSetResp(scriptRunCtx *r_ctx, int resp);
int scriptSetRepl(scriptRunCtx *r_ctx, int repl);
void scriptCall(scriptRunCtx *r_ctx, robj **argv, int argc, sds *err);
int scriptInterrupt(scriptRunCtx *r_ctx);
void scriptKill(client *c);
int scriptIsRunning();
int scriptIsTimedout();
client* scriptGetClient();
mstime_t scriptTimeSnapshot();
#endif /* __SCRIPT_H_ */

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2009-2021, Redis Labs Ltd. * Copyright (c) 2009-2021, Redis Ltd.
* All rights reserved. * All rights reserved.
* *
* Redistribution and use in source and binary forms, with or without * Redistribution and use in source and binary forms, with or without
@ -39,10 +39,9 @@
#include <lualib.h> #include <lualib.h>
#include <ctype.h> #include <ctype.h>
#include <math.h> #include <math.h>
#include "functions.h"
int redis_math_random (lua_State *L); static int redis_math_random (lua_State *L);
int redis_math_randomseed (lua_State *L); static int redis_math_randomseed (lua_State *L);
static void redisProtocolToLuaType_Int(void *ctx, long long val, const char *proto, size_t proto_len); static void redisProtocolToLuaType_Int(void *ctx, long long val, const char *proto, size_t proto_len);
static void redisProtocolToLuaType_BulkString(void *ctx, const char *str, size_t len, const char *proto, size_t proto_len); static void redisProtocolToLuaType_BulkString(void *ctx, const char *str, size_t len, const char *proto, size_t proto_len);
static void redisProtocolToLuaType_NullBulkString(void *ctx, const char *proto, size_t proto_len); static void redisProtocolToLuaType_NullBulkString(void *ctx, const char *proto, size_t proto_len);
@ -59,6 +58,39 @@ static void redisProtocolToLuaType_BigNumber(void *ctx, const char *str, size_t
static void redisProtocolToLuaType_VerbatimString(void *ctx, const char *format, const char *str, size_t len, const char *proto, size_t proto_len); static void redisProtocolToLuaType_VerbatimString(void *ctx, const char *format, const char *str, size_t len, const char *proto, size_t proto_len);
static void redisProtocolToLuaType_Attribute(struct ReplyParser *parser, void *ctx, size_t len, const char *proto); static void redisProtocolToLuaType_Attribute(struct ReplyParser *parser, void *ctx, size_t len, const char *proto);
/*
* Save the give pointer on Lua registry, used to save the Lua context and
* function context so we can retrieve them from lua_State.
*/
void luaSaveOnRegistry(lua_State* lua, const char* name, void* ptr) {
lua_pushstring(lua, name);
if (ptr) {
lua_pushlightuserdata(lua, ptr);
} else {
lua_pushnil(lua);
}
lua_settable(lua, LUA_REGISTRYINDEX);
}
/*
* Get a saved pointer from registry
*/
void* luaGetFromRegistry(lua_State* lua, const char* name) {
lua_pushstring(lua, name);
lua_gettable(lua, LUA_REGISTRYINDEX);
/* must be light user data */
serverAssert(lua_islightuserdata(lua, -1));
void* ptr = (void*) lua_topointer(lua, -1);
serverAssert(ptr);
/* pops the value */
lua_pop(lua, 1);
return ptr;
}
/* --------------------------------------------------------------------------- /* ---------------------------------------------------------------------------
* Redis reply to Lua type conversion functions. * Redis reply to Lua type conversion functions.
* ------------------------------------------------------------------------- */ * ------------------------------------------------------------------------- */
@ -99,7 +131,7 @@ static const ReplyParserCallbacks DefaultLuaTypeParserCallbacks = {
.error = NULL, .error = NULL,
}; };
void redisProtocolToLuaType(lua_State *lua, char* reply) { static void redisProtocolToLuaType(lua_State *lua, char* reply) {
ReplyParser parser = {.curr_location = reply, .callbacks = DefaultLuaTypeParserCallbacks}; ReplyParser parser = {.curr_location = reply, .callbacks = DefaultLuaTypeParserCallbacks};
parseReply(&parser, lua); parseReply(&parser, lua);
@ -394,7 +426,7 @@ static void redisProtocolToLuaType_Double(void *ctx, double d, const char *proto
* with a single "err" field set to the error string. Note that this * with a single "err" field set to the error string. Note that this
* table is never a valid reply by proper commands, since the returned * table is never a valid reply by proper commands, since the returned
* tables are otherwise always indexed by integers, never by strings. */ * tables are otherwise always indexed by integers, never by strings. */
void luaPushError(lua_State *lua, char *error) { static void luaPushError(lua_State *lua, char *error) {
lua_Debug dbg; lua_Debug dbg;
/* If debugging is active and in step mode, log errors resulting from /* If debugging is active and in step mode, log errors resulting from
@ -422,7 +454,7 @@ void luaPushError(lua_State *lua, char *error) {
* by the non-error-trapping version of redis.pcall(), which is redis.call(), * by the non-error-trapping version of redis.pcall(), which is redis.call(),
* this function will raise the Lua error so that the execution of the * this function will raise the Lua error so that the execution of the
* script will be halted. */ * script will be halted. */
int luaRaiseError(lua_State *lua) { static int luaRaiseError(lua_State *lua) {
lua_pushstring(lua,"err"); lua_pushstring(lua,"err");
lua_gettable(lua,-2); lua_gettable(lua,-2);
return lua_error(lua); return lua_error(lua);
@ -434,7 +466,7 @@ int luaRaiseError(lua_State *lua) {
* *
* The array is sorted using table.sort itself, and assuming all the * The array is sorted using table.sort itself, and assuming all the
* list elements are strings. */ * list elements are strings. */
void luaSortArray(lua_State *lua) { static void luaSortArray(lua_State *lua) {
/* Initial Stack: array */ /* Initial Stack: array */
lua_getglobal(lua,"table"); lua_getglobal(lua,"table");
lua_pushstring(lua,"sort"); lua_pushstring(lua,"sort");
@ -465,7 +497,7 @@ void luaSortArray(lua_State *lua) {
/* Reply to client 'c' converting the top element in the Lua stack to a /* Reply to client 'c' converting the top element in the Lua stack to a
* Redis reply. As a side effect the element is consumed from the stack. */ * Redis reply. As a side effect the element is consumed from the stack. */
void luaReplyToRedisReply(client *c, lua_State *lua) { void luaReplyToRedisReply(client *c, client* script_client, lua_State *lua) {
int t = lua_type(lua,-1); int t = lua_type(lua,-1);
if (!lua_checkstack(lua, 4)) { if (!lua_checkstack(lua, 4)) {
@ -483,7 +515,7 @@ void luaReplyToRedisReply(client *c, lua_State *lua) {
addReplyBulkCBuffer(c,(char*)lua_tostring(lua,-1),lua_strlen(lua,-1)); addReplyBulkCBuffer(c,(char*)lua_tostring(lua,-1),lua_strlen(lua,-1));
break; break;
case LUA_TBOOLEAN: case LUA_TBOOLEAN:
if (lctx.lua_client->resp == 2) if (script_client->resp == 2)
addReply(c,lua_toboolean(lua,-1) ? shared.cone : addReply(c,lua_toboolean(lua,-1) ? shared.cone :
shared.null[c->resp]); shared.null[c->resp]);
else else
@ -587,8 +619,8 @@ void luaReplyToRedisReply(client *c, lua_State *lua) {
while (lua_next(lua,-2)) { while (lua_next(lua,-2)) {
/* Stack now: table, key, value */ /* Stack now: table, key, value */
lua_pushvalue(lua,-2); /* Dup key before consuming. */ lua_pushvalue(lua,-2); /* Dup key before consuming. */
luaReplyToRedisReply(c, lua); /* Return key. */ luaReplyToRedisReply(c, script_client, lua); /* Return key. */
luaReplyToRedisReply(c, lua); /* Return value. */ luaReplyToRedisReply(c, script_client, lua); /* Return value. */
/* Stack now: table, key. */ /* Stack now: table, key. */
maplen++; maplen++;
} }
@ -611,7 +643,7 @@ void luaReplyToRedisReply(client *c, lua_State *lua) {
/* Stack now: table, key, true */ /* Stack now: table, key, true */
lua_pop(lua,1); /* Discard the boolean value. */ lua_pop(lua,1); /* Discard the boolean value. */
lua_pushvalue(lua,-1); /* Dup key before consuming. */ lua_pushvalue(lua,-1); /* Dup key before consuming. */
luaReplyToRedisReply(c, lua); /* Return key. */ luaReplyToRedisReply(c, script_client, lua); /* Return key. */
/* Stack now: table, key. */ /* Stack now: table, key. */
setlen++; setlen++;
} }
@ -633,7 +665,7 @@ void luaReplyToRedisReply(client *c, lua_State *lua) {
lua_pop(lua,1); lua_pop(lua,1);
break; break;
} }
luaReplyToRedisReply(c, lua); luaReplyToRedisReply(c, script_client, lua);
mbulklen++; mbulklen++;
} }
setDeferredArrayLen(c,replylen,mbulklen); setDeferredArrayLen(c,replylen,mbulklen);
@ -650,10 +682,11 @@ void luaReplyToRedisReply(client *c, lua_State *lua) {
#define LUA_CMD_OBJCACHE_SIZE 32 #define LUA_CMD_OBJCACHE_SIZE 32
#define LUA_CMD_OBJCACHE_MAX_LEN 64 #define LUA_CMD_OBJCACHE_MAX_LEN 64
int luaRedisGenericCommand(lua_State *lua, int raise_error) { static int luaRedisGenericCommand(lua_State *lua, int raise_error) {
int j, argc = lua_gettop(lua); int j, argc = lua_gettop(lua);
struct redisCommand *cmd; scriptRunCtx* rctx = luaGetFromRegistry(lua, REGISTRY_RUN_CTX_NAME);
client *c = lctx.lua_client; sds err = NULL;
client* c = rctx->c;
sds reply; sds reply;
/* Cached across calls. */ /* Cached across calls. */
@ -741,16 +774,6 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
* and this way we guaranty we will have room on the stack for the result. */ * and this way we guaranty we will have room on the stack for the result. */
lua_pop(lua, argc); lua_pop(lua, argc);
/* Setup our fake client for command execution */
c->argv = argv;
c->argc = argc;
c->user = server.script_caller->user;
/* Process module hooks */
moduleCallCommandFilters(c);
argv = c->argv;
argc = c->argc;
/* Log the command if debugging is active. */ /* Log the command if debugging is active. */
if (ldbIsEnabled()) { if (ldbIsEnabled()) {
sds cmdlog = sdsnew("<redis>"); sds cmdlog = sdsnew("<redis>");
@ -767,167 +790,13 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
ldbLog(cmdlog); ldbLog(cmdlog);
} }
/* Command lookup */
cmd = lookupCommand(argv[0]->ptr); scriptCall(rctx, argv, argc, &err);
if (!cmd || ((cmd->arity > 0 && cmd->arity != argc) || if (err) {
(argc < -cmd->arity))) luaPushError(lua, err);
{ sdsfree(err);
if (cmd)
luaPushError(lua,
"Wrong number of args calling Redis command From Lua script");
else
luaPushError(lua,"Unknown Redis command called from Lua script");
goto cleanup; goto cleanup;
} }
c->cmd = c->lastcmd = cmd;
/* There are commands that are not allowed inside scripts. */
if (!server.script_disable_deny_script && (cmd->flags & CMD_NOSCRIPT)) {
luaPushError(lua, "This Redis command is not allowed from scripts");
goto cleanup;
}
/* This check is for EVAL_RO, EVALSHA_RO. We want to allow only read only commands */
if ((server.script_caller->cmd->proc == evalRoCommand ||
server.script_caller->cmd->proc == evalShaRoCommand) &&
(cmd->flags & CMD_WRITE))
{
luaPushError(lua, "Write commands are not allowed from read-only scripts");
goto cleanup;
}
/* Check the ACLs. */
int acl_errpos;
int acl_retval = ACLCheckAllPerm(c,&acl_errpos);
if (acl_retval != ACL_OK) {
addACLLogEntry(c,acl_retval,ACL_LOG_CTX_LUA,acl_errpos,NULL,NULL);
switch (acl_retval) {
case ACL_DENIED_CMD:
luaPushError(lua, "The user executing the script can't run this "
"command or subcommand");
break;
case ACL_DENIED_KEY:
luaPushError(lua, "The user executing the script can't access "
"at least one of the keys mentioned in the "
"command arguments");
break;
case ACL_DENIED_CHANNEL:
luaPushError(lua, "The user executing the script can't publish "
"to the channel mentioned in the command");
break;
default:
luaPushError(lua, "The user executing the script is lacking the "
"permissions for the command");
break;
}
goto cleanup;
}
/* Write commands are forbidden against read-only slaves, or if a
* command marked as non-deterministic was already called in the context
* of this script. */
if (cmd->flags & CMD_WRITE) {
int deny_write_type = writeCommandsDeniedByDiskError();
if (lctx.lua_random_dirty && !lctx.lua_replicate_commands) {
luaPushError(lua,
"Write commands not allowed after non deterministic commands. Call redis.replicate_commands() at the start of your script in order to switch to single commands replication mode.");
goto cleanup;
} else if (server.masterhost && server.repl_slave_ro &&
server.script_caller->id != CLIENT_ID_AOF &&
!(server.script_caller->flags & CLIENT_MASTER))
{
luaPushError(lua, shared.roslaveerr->ptr);
goto cleanup;
} else if (deny_write_type != DISK_ERROR_TYPE_NONE) {
if (deny_write_type == DISK_ERROR_TYPE_RDB) {
luaPushError(lua, shared.bgsaveerr->ptr);
} else {
sds aof_write_err = sdscatfmt(sdsempty(),
"-MISCONF Errors writing to the AOF file: %s\r\n",
strerror(server.aof_last_write_errno));
luaPushError(lua, aof_write_err);
sdsfree(aof_write_err);
}
goto cleanup;
}
}
/* If we reached the memory limit configured via maxmemory, commands that
* could enlarge the memory usage are not allowed, but only if this is the
* first write in the context of this script, otherwise we can't stop
* in the middle. */
if (server.maxmemory && /* Maxmemory is actually enabled. */
server.script_caller->id != CLIENT_ID_AOF && /* Don't care about mem if loading from AOF. */
!server.masterhost && /* Slave must execute the script. */
lctx.lua_write_dirty == 0 && /* Script had no side effects so far. */
server.script_oom && /* Detected OOM when script start. */
(cmd->flags & CMD_DENYOOM))
{
luaPushError(lua, shared.oomerr->ptr);
goto cleanup;
}
if (cmd->flags & CMD_RANDOM) lctx.lua_random_dirty = 1;
if (cmd->flags & CMD_WRITE) lctx.lua_write_dirty = 1;
/* If this is a Redis Cluster node, we need to make sure Lua is not
* trying to access non-local keys, with the exception of commands
* received from our master or when loading the AOF back in memory. */
if (server.cluster_enabled && server.script_caller->id != CLIENT_ID_AOF &&
!(server.script_caller->flags & CLIENT_MASTER))
{
int error_code;
/* Duplicate relevant flags in the lua client. */
c->flags &= ~(CLIENT_READONLY|CLIENT_ASKING);
c->flags |= server.script_caller->flags & (CLIENT_READONLY|CLIENT_ASKING);
if (getNodeByQuery(c,c->cmd,c->argv,c->argc,NULL,&error_code) !=
server.cluster->myself)
{
if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) {
luaPushError(lua,
"Lua script attempted to execute a write command while the "
"cluster is down and readonly");
} else if (error_code == CLUSTER_REDIR_DOWN_STATE) {
luaPushError(lua,
"Lua script attempted to execute a command while the "
"cluster is down");
} else {
luaPushError(lua,
"Lua script attempted to access a non local key in a "
"cluster node");
}
goto cleanup;
}
}
/* If we are using single commands replication, we need to wrap what
* we propagate into a MULTI/EXEC block, so that it will be atomic like
* a Lua script in the context of AOF and slaves. */
if (lctx.lua_replicate_commands &&
!lctx.lua_multi_emitted &&
!(server.script_caller->flags & CLIENT_MULTI) &&
lctx.lua_write_dirty &&
lctx.lua_repl != PROPAGATE_NONE)
{
execCommandPropagateMulti(server.script_caller->db->id);
lctx.lua_multi_emitted = 1;
/* Now we are in the MULTI context, the lua_client should be
* flag as CLIENT_MULTI. */
c->flags |= CLIENT_MULTI;
}
/* Run the command */
int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS;
if (lctx.lua_replicate_commands) {
/* Set flags according to redis.set_repl() settings. */
if (lctx.lua_repl & PROPAGATE_AOF)
call_flags |= CMD_CALL_PROPAGATE_AOF;
if (lctx.lua_repl & PROPAGATE_REPL)
call_flags |= CMD_CALL_PROPAGATE_REPL;
}
call(c,call_flags);
serverAssert((c->flags & CLIENT_BLOCKED) == 0);
/* Convert the result of the Redis command into a suitable Lua type. /* Convert the result of the Redis command into a suitable Lua type.
* The first thing we need is to create a single string from the client * The first thing we need is to create a single string from the client
@ -958,8 +827,8 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
/* Sort the output array if needed, assuming it is a non-null multi bulk /* Sort the output array if needed, assuming it is a non-null multi bulk
* reply as expected. */ * reply as expected. */
if ((cmd->flags & CMD_SORT_FOR_SCRIPT) && if ((c->cmd->flags & CMD_SORT_FOR_SCRIPT) &&
(lctx.lua_replicate_commands == 0) && (rctx->flags & SCRIPT_EVAL_REPLICATION) &&
(reply[0] == '*' && reply[1] != '-')) { (reply[0] == '*' && reply[1] != '-')) {
luaSortArray(lua); luaSortArray(lua);
} }
@ -1010,18 +879,18 @@ cleanup:
} }
/* redis.call() */ /* redis.call() */
int luaRedisCallCommand(lua_State *lua) { static int luaRedisCallCommand(lua_State *lua) {
return luaRedisGenericCommand(lua,1); return luaRedisGenericCommand(lua,1);
} }
/* redis.pcall() */ /* redis.pcall() */
int luaRedisPCallCommand(lua_State *lua) { static int luaRedisPCallCommand(lua_State *lua) {
return luaRedisGenericCommand(lua,0); return luaRedisGenericCommand(lua,0);
} }
/* This adds redis.sha1hex(string) to Lua scripts using the same hashing /* This adds redis.sha1hex(string) to Lua scripts using the same hashing
* function used for sha1ing lua scripts. */ * function used for sha1ing lua scripts. */
int luaRedisSha1hexCommand(lua_State *lua) { static int luaRedisSha1hexCommand(lua_State *lua) {
int argc = lua_gettop(lua); int argc = lua_gettop(lua);
char digest[41]; char digest[41];
size_t len; size_t len;
@ -1045,7 +914,7 @@ int luaRedisSha1hexCommand(lua_State *lua) {
* return redis.error_reply("ERR Some Error") * return redis.error_reply("ERR Some Error")
* return redis.status_reply("ERR Some Error") * return redis.status_reply("ERR Some Error")
*/ */
int luaRedisReturnSingleFieldTable(lua_State *lua, char *field) { static int luaRedisReturnSingleFieldTable(lua_State *lua, char *field) {
if (lua_gettop(lua) != 1 || lua_type(lua,-1) != LUA_TSTRING) { if (lua_gettop(lua) != 1 || lua_type(lua,-1) != LUA_TSTRING) {
luaPushError(lua, "wrong number or type of arguments"); luaPushError(lua, "wrong number or type of arguments");
return 1; return 1;
@ -1059,12 +928,12 @@ int luaRedisReturnSingleFieldTable(lua_State *lua, char *field) {
} }
/* redis.error_reply() */ /* redis.error_reply() */
int luaRedisErrorReplyCommand(lua_State *lua) { static int luaRedisErrorReplyCommand(lua_State *lua) {
return luaRedisReturnSingleFieldTable(lua,"err"); return luaRedisReturnSingleFieldTable(lua,"err");
} }
/* redis.status_reply() */ /* redis.status_reply() */
int luaRedisStatusReplyCommand(lua_State *lua) { static int luaRedisStatusReplyCommand(lua_State *lua) {
return luaRedisReturnSingleFieldTable(lua,"ok"); return luaRedisReturnSingleFieldTable(lua,"ok");
} }
@ -1072,11 +941,13 @@ int luaRedisStatusReplyCommand(lua_State *lua) {
* *
* Set the propagation of write commands executed in the context of the * Set the propagation of write commands executed in the context of the
* script to on/off for AOF and slaves. */ * script to on/off for AOF and slaves. */
int luaRedisSetReplCommand(lua_State *lua) { static int luaRedisSetReplCommand(lua_State *lua) {
int argc = lua_gettop(lua); int argc = lua_gettop(lua);
int flags; int flags;
if (lctx.lua_replicate_commands == 0) { scriptRunCtx* rctx = luaGetFromRegistry(lua, REGISTRY_RUN_CTX_NAME);
if (rctx->flags & SCRIPT_EVAL_REPLICATION) {
lua_pushstring(lua, "You can set the replication behavior only after turning on single commands replication with redis.replicate_commands()."); lua_pushstring(lua, "You can set the replication behavior only after turning on single commands replication with redis.replicate_commands().");
return lua_error(lua); return lua_error(lua);
} else if (argc != 1) { } else if (argc != 1) {
@ -1089,12 +960,13 @@ int luaRedisSetReplCommand(lua_State *lua) {
lua_pushstring(lua, "Invalid replication flags. Use REPL_AOF, REPL_REPLICA, REPL_ALL or REPL_NONE."); lua_pushstring(lua, "Invalid replication flags. Use REPL_AOF, REPL_REPLICA, REPL_ALL or REPL_NONE.");
return lua_error(lua); return lua_error(lua);
} }
lctx.lua_repl = flags;
scriptSetRepl(rctx, flags);
return 0; return 0;
} }
/* redis.log() */ /* redis.log() */
int luaLogCommand(lua_State *lua) { static int luaLogCommand(lua_State *lua) {
int j, argc = lua_gettop(lua); int j, argc = lua_gettop(lua);
int level; int level;
sds log; sds log;
@ -1131,7 +1003,7 @@ int luaLogCommand(lua_State *lua) {
} }
/* redis.setresp() */ /* redis.setresp() */
int luaSetResp(lua_State *lua) { static int luaSetResp(lua_State *lua) {
int argc = lua_gettop(lua); int argc = lua_gettop(lua);
if (argc != 1) { if (argc != 1) {
@ -1144,8 +1016,8 @@ int luaSetResp(lua_State *lua) {
lua_pushstring(lua, "RESP version must be 2 or 3."); lua_pushstring(lua, "RESP version must be 2 or 3.");
return lua_error(lua); return lua_error(lua);
} }
scriptRunCtx* rctx = luaGetFromRegistry(lua, REGISTRY_RUN_CTX_NAME);
lctx.lua_client->resp = resp; scriptSetResp(rctx, resp);
return 0; return 0;
} }
@ -1153,7 +1025,7 @@ int luaSetResp(lua_State *lua) {
* Lua engine initialization and reset. * Lua engine initialization and reset.
* ------------------------------------------------------------------------- */ * ------------------------------------------------------------------------- */
void luaLoadLib(lua_State *lua, const char *libname, lua_CFunction luafunc) { static void luaLoadLib(lua_State *lua, const char *libname, lua_CFunction luafunc) {
lua_pushcfunction(lua, luafunc); lua_pushcfunction(lua, luafunc);
lua_pushstring(lua, libname); lua_pushstring(lua, libname);
lua_call(lua, 1, 0); lua_call(lua, 1, 0);
@ -1164,7 +1036,7 @@ LUALIB_API int (luaopen_struct) (lua_State *L);
LUALIB_API int (luaopen_cmsgpack) (lua_State *L); LUALIB_API int (luaopen_cmsgpack) (lua_State *L);
LUALIB_API int (luaopen_bit) (lua_State *L); LUALIB_API int (luaopen_bit) (lua_State *L);
void luaLoadLibraries(lua_State *lua) { static void luaLoadLibraries(lua_State *lua) {
luaLoadLib(lua, "", luaopen_base); luaLoadLib(lua, "", luaopen_base);
luaLoadLib(lua, LUA_TABLIBNAME, luaopen_table); luaLoadLib(lua, LUA_TABLIBNAME, luaopen_table);
luaLoadLib(lua, LUA_STRLIBNAME, luaopen_string); luaLoadLib(lua, LUA_STRLIBNAME, luaopen_string);
@ -1183,7 +1055,7 @@ void luaLoadLibraries(lua_State *lua) {
/* Remove a functions that we don't want to expose to the Redis scripting /* Remove a functions that we don't want to expose to the Redis scripting
* environment. */ * environment. */
void luaRemoveUnsupportedFunctions(lua_State *lua) { static void luaRemoveUnsupportedFunctions(lua_State *lua) {
lua_pushnil(lua); lua_pushnil(lua);
lua_setglobal(lua,"loadfile"); lua_setglobal(lua,"loadfile");
lua_pushnil(lua); lua_pushnil(lua);
@ -1195,7 +1067,7 @@ void luaRemoveUnsupportedFunctions(lua_State *lua) {
* *
* It should be the last to be called in the scripting engine initialization * It should be the last to be called in the scripting engine initialization
* sequence, because it may interact with creation of globals. */ * sequence, because it may interact with creation of globals. */
void scriptingEnableGlobalsProtection(lua_State *lua) { void luaEnableGlobalsProtection(lua_State *lua) {
char *s[32]; char *s[32];
sds code = sdsempty(); sds code = sdsempty();
int j = 0; int j = 0;
@ -1229,7 +1101,7 @@ void scriptingEnableGlobalsProtection(lua_State *lua) {
sdsfree(code); sdsfree(code);
} }
void luaEngineRegisterRedisAPI(lua_State* lua) { void luaRegisterRedisAPI(lua_State* lua) {
luaLoadLibraries(lua); luaLoadLibraries(lua);
luaRemoveUnsupportedFunctions(lua); luaRemoveUnsupportedFunctions(lua);
@ -1350,7 +1222,7 @@ void luaSetGlobalArray(lua_State *lua, char *var, robj **elev, int elec) {
/* The following implementation is the one shipped with Lua itself but with /* The following implementation is the one shipped with Lua itself but with
* rand() replaced by redisLrand48(). */ * rand() replaced by redisLrand48(). */
int redis_math_random (lua_State *L) { static int redis_math_random (lua_State *L) {
/* the `%' avoids the (rare) case of r==1, and is needed also because on /* the `%' avoids the (rare) case of r==1, and is needed also because on
some systems (SunOS!) `rand()' may return a value larger than RAND_MAX */ some systems (SunOS!) `rand()' may return a value larger than RAND_MAX */
lua_Number r = (lua_Number)(redisLrand48()%REDIS_LRAND48_MAX) / lua_Number r = (lua_Number)(redisLrand48()%REDIS_LRAND48_MAX) /
@ -1378,36 +1250,16 @@ int redis_math_random (lua_State *L) {
return 1; return 1;
} }
int redis_math_randomseed (lua_State *L) { static int redis_math_randomseed (lua_State *L) {
redisSrand48(luaL_checkint(L, 1)); redisSrand48(luaL_checkint(L, 1));
return 0; return 0;
} }
/* This is the Lua script "count" hook that we use to detect scripts timeout. */ /* This is the Lua script "count" hook that we use to detect scripts timeout. */
void luaMaskCountHook(lua_State *lua, lua_Debug *ar) { void luaMaskCountHook(lua_State *lua, lua_Debug *ar) {
long long elapsed = elapsedMs(lctx.lua_time_start);
UNUSED(ar); UNUSED(ar);
UNUSED(lua); scriptRunCtx* rctx = luaGetFromRegistry(lua, REGISTRY_RUN_CTX_NAME);
if (scriptInterrupt(rctx) == SCRIPT_KILL) {
/* Set the timeout condition if not already set and the maximum
* execution time was reached. */
if (elapsed >= server.script_time_limit && server.script_timedout == 0) {
serverLog(LL_WARNING,
"Lua slow script detected: still in execution after %lld milliseconds. "
"You can try killing the script using the SCRIPT KILL command. "
"Script SHA1 is: %s",
elapsed, lctx.lua_cur_script);
server.script_timedout = 1;
blockingOperationStarts();
/* Once the script timeouts we reenter the event loop to permit others
* to call SCRIPT KILL or SHUTDOWN NOSAVE if needed. For this reason
* we need to mask the client executing the script from the event loop.
* If we don't do that the client may disconnect and could no longer be
* here when the EVAL command will return. */
protectClient(server.script_caller);
}
if (server.script_timedout) processEventsWhileBlocked();
if (lctx.lua_kill) {
serverLog(LL_WARNING,"Lua script killed by user with SCRIPT KILL."); serverLog(LL_WARNING,"Lua script killed by user with SCRIPT KILL.");
/* /*

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2009-2021, Redis Labs Ltd. * Copyright (c) 2009-2021, Redis Ltd.
* All rights reserved. * All rights reserved.
* *
* Redistribution and use in source and binary forms, with or without * Redistribution and use in source and binary forms, with or without
@ -30,35 +30,39 @@
#ifndef __SCRIPT_LUA_H_ #ifndef __SCRIPT_LUA_H_
#define __SCRIPT_LUA_H_ #define __SCRIPT_LUA_H_
/*
* script_lua.c unit provides shared functionality between
* eval.c and function_lua.c. Functionality provided:
*
* * Execute Lua code, assuming that the code is located on
* the top of the Lua stack. In addition, parsing the execution
* result and convert it to the resp and reply ot the client.
*
* * Run Redis commands from within the Lua code (Including
* parsing the reply and create a Lua object out of it).
*
* * Register Redis API to the Lua interpreter. Only shared
* API are registered (API that is only relevant on eval.c
* (like debugging) are registered on eval.c).
*
* Uses script.c for interaction back with Redis.
*/
#include "server.h" #include "server.h"
#include "script.h" #include "script.h"
#include <lua.h> #include <lua.h>
#include <lauxlib.h> #include <lauxlib.h>
#include <lualib.h> #include <lualib.h>
typedef struct luaCtx { #define REGISTRY_RUN_CTX_NAME "__RUN_CTX__"
lua_State *lua; /* The Lua interpreter. We use just one for all clients */
client *lua_client; /* The "fake client" to query Redis from Lua */
char *lua_cur_script; /* SHA1 of the script currently running, or NULL */
dict *lua_scripts; /* A dictionary of SHA1 -> Lua scripts */
unsigned long long lua_scripts_mem; /* Cached scripts' memory + oh */
int lua_replicate_commands; /* True if we are doing single commands repl. */
int lua_write_dirty;
int lua_random_dirty;
int lua_multi_emitted;
int lua_repl;
int lua_kill;
monotime lua_time_start; /* monotonic timer to detect timed-out script */
mstime_t lua_time_snapshot; /* Snapshot of mstime when script is started */
} luaCtx;
extern luaCtx lctx; void luaRegisterRedisAPI(lua_State* lua);
void luaEnableGlobalsProtection(lua_State *lua);
void luaEngineRegisterRedisAPI(lua_State* lua);
void scriptingEnableGlobalsProtection(lua_State *lua);
void luaSetGlobalArray(lua_State *lua, char *var, robj **elev, int elec); void luaSetGlobalArray(lua_State *lua, char *var, robj **elev, int elec);
void luaMaskCountHook(lua_State *lua, lua_Debug *ar); void luaMaskCountHook(lua_State *lua, lua_Debug *ar);
void luaReplyToRedisReply(client *c, lua_State *lua); void luaReplyToRedisReply(client *c, client* script_client, lua_State *lua);
void luaSaveOnRegistry(lua_State* lua, const char* name, void* ptr);
void* luaGetFromRegistry(lua_State* lua, const char* name);

View File

@ -35,6 +35,7 @@
#include "latency.h" #include "latency.h"
#include "atomicvar.h" #include "atomicvar.h"
#include "mt19937-64.h" #include "mt19937-64.h"
#include "script.h"
#include <time.h> #include <time.h>
#include <signal.h> #include <signal.h>
@ -4931,13 +4932,13 @@ void call(client *c, int flags) {
/* When EVAL is called loading the AOF we don't want commands called /* When EVAL is called loading the AOF we don't want commands called
* from Lua to go into the slowlog or to populate statistics. */ * from Lua to go into the slowlog or to populate statistics. */
if (server.loading && c->flags & CLIENT_LUA) if (server.loading && c->flags & CLIENT_SCRIPT)
flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS); flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS);
/* If the caller is Lua, we want to force the EVAL caller to propagate /* If the caller is Lua, we want to force the EVAL caller to propagate
* the script if the command flag or client flag are forcing the * the script if the command flag or client flag are forcing the
* propagation. */ * propagation. */
if (c->flags & CLIENT_LUA && server.script_caller) { if (c->flags & CLIENT_SCRIPT && server.script_caller) {
if (c->flags & CLIENT_FORCE_REPL) if (c->flags & CLIENT_FORCE_REPL)
server.script_caller->flags |= CLIENT_FORCE_REPL; server.script_caller->flags |= CLIENT_FORCE_REPL;
if (c->flags & CLIENT_FORCE_AOF) if (c->flags & CLIENT_FORCE_AOF)
@ -5070,7 +5071,7 @@ void call(client *c, int flags) {
/* If the client has keys tracking enabled for client side caching, /* If the client has keys tracking enabled for client side caching,
* make sure to remember the keys it fetched via this command. */ * make sure to remember the keys it fetched via this command. */
if (c->cmd->flags & CMD_READONLY) { if (c->cmd->flags & CMD_READONLY) {
client *caller = (c->flags & CLIENT_LUA && server.script_caller) ? client *caller = (c->flags & CLIENT_SCRIPT && server.script_caller) ?
server.script_caller : c; server.script_caller : c;
if (caller->flags & CLIENT_TRACKING && if (caller->flags & CLIENT_TRACKING &&
!(caller->flags & CLIENT_TRACKING_BCAST)) !(caller->flags & CLIENT_TRACKING_BCAST))
@ -5172,7 +5173,7 @@ void populateCommandMovableKeys(struct redisCommand *cmd) {
* other operations can be performed by the caller. Otherwise * other operations can be performed by the caller. Otherwise
* if C_ERR is returned the client was destroyed (i.e. after QUIT). */ * if C_ERR is returned the client was destroyed (i.e. after QUIT). */
int processCommand(client *c) { int processCommand(client *c) {
if (!server.script_timedout) { if (!scriptIsTimedout()) {
/* Both EXEC and EVAL call call() directly so there should be /* Both EXEC and EVAL call call() directly so there should be
* no way in_exec or in_eval or propagate_in_transaction is 1. * no way in_exec or in_eval or propagate_in_transaction is 1.
* That is unless lua_timedout, in which case client may run * That is unless lua_timedout, in which case client may run
@ -5273,7 +5274,7 @@ int processCommand(client *c) {
* 2) The command has no key arguments. */ * 2) The command has no key arguments. */
if (server.cluster_enabled && if (server.cluster_enabled &&
!(c->flags & CLIENT_MASTER) && !(c->flags & CLIENT_MASTER) &&
!(c->flags & CLIENT_LUA && !(c->flags & CLIENT_SCRIPT &&
server.script_caller->flags & CLIENT_MASTER) && server.script_caller->flags & CLIENT_MASTER) &&
!(!c->cmd->movablekeys && c->cmd->key_specs_num == 0 && !(!c->cmd->movablekeys && c->cmd->key_specs_num == 0 &&
c->cmd->proc != execCommand)) c->cmd->proc != execCommand))
@ -5309,7 +5310,7 @@ int processCommand(client *c) {
* the event loop since there is a busy Lua script running in timeout * the event loop since there is a busy Lua script running in timeout
* condition, to avoid mixing the propagation of scripts with the * condition, to avoid mixing the propagation of scripts with the
* propagation of DELs due to eviction. */ * propagation of DELs due to eviction. */
if (server.maxmemory && !server.script_timedout) { if (server.maxmemory && !scriptIsTimedout()) {
int out_of_memory = (performEvictions() == EVICT_FAIL); int out_of_memory = (performEvictions() == EVICT_FAIL);
/* performEvictions may evict keys, so we need flush pending tracking /* performEvictions may evict keys, so we need flush pending tracking
@ -5432,7 +5433,7 @@ int processCommand(client *c) {
* the MULTI plus a few initial commands refused, then the timeout * the MULTI plus a few initial commands refused, then the timeout
* condition resolves, and the bottom-half of the transaction gets * condition resolves, and the bottom-half of the transaction gets
* executed, see Github PR #7022. */ * executed, see Github PR #7022. */
if (server.script_timedout && if (scriptIsTimedout() &&
c->cmd->proc != authCommand && c->cmd->proc != authCommand &&
c->cmd->proc != helloCommand && c->cmd->proc != helloCommand &&
c->cmd->proc != replconfCommand && c->cmd->proc != replconfCommand &&

View File

@ -260,7 +260,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define CLIENT_CLOSE_AFTER_REPLY (1<<6) /* Close after writing entire reply. */ #define CLIENT_CLOSE_AFTER_REPLY (1<<6) /* Close after writing entire reply. */
#define CLIENT_UNBLOCKED (1<<7) /* This client was unblocked and is stored in #define CLIENT_UNBLOCKED (1<<7) /* This client was unblocked and is stored in
server.unblocked_clients */ server.unblocked_clients */
#define CLIENT_LUA (1<<8) /* This is a non connected client used by Lua */ #define CLIENT_SCRIPT (1<<8) /* This is a non connected client used by Lua */
#define CLIENT_ASKING (1<<9) /* Client issued the ASKING command */ #define CLIENT_ASKING (1<<9) /* Client issued the ASKING command */
#define CLIENT_CLOSE_ASAP (1<<10)/* Close this client ASAP */ #define CLIENT_CLOSE_ASAP (1<<10)/* Close this client ASAP */
#define CLIENT_UNIX_SOCKET (1<<11) /* Client connected via Unix domain socket */ #define CLIENT_UNIX_SOCKET (1<<11) /* Client connected via Unix domain socket */
@ -1721,8 +1721,6 @@ struct redisServer {
/* Scripting */ /* Scripting */
client *script_caller; /* The client running script right now, or NULL */ client *script_caller; /* The client running script right now, or NULL */
mstime_t script_time_limit; /* Script timeout in milliseconds */ mstime_t script_time_limit; /* Script timeout in milliseconds */
int script_timedout; /* True if we reached the time limit for script
execution. */
int lua_always_replicate_commands; /* Default replication type. */ int lua_always_replicate_commands; /* Default replication type. */
int script_oom; /* OOM detected when script start */ int script_oom; /* OOM detected when script start */
int script_disable_deny_script; /* Allow running commands marked "no-script" inside a script. */ int script_disable_deny_script; /* Allow running commands marked "no-script" inside a script. */

View File

@ -294,7 +294,7 @@ void sortCommandGeneric(client *c, int readonly) {
* scripting and replication. */ * scripting and replication. */
if (dontsort && if (dontsort &&
sortval->type == OBJ_SET && sortval->type == OBJ_SET &&
(storekey || c->flags & CLIENT_LUA)) (storekey || c->flags & CLIENT_SCRIPT))
{ {
/* Force ALPHA sorting */ /* Force ALPHA sorting */
dontsort = 0; dontsort = 0;

View File

@ -1998,7 +1998,7 @@ void xreadCommand(client *c) {
int moreargs = c->argc-i-1; int moreargs = c->argc-i-1;
char *o = c->argv[i]->ptr; char *o = c->argv[i]->ptr;
if (!strcasecmp(o,"BLOCK") && moreargs) { if (!strcasecmp(o,"BLOCK") && moreargs) {
if (c->flags & CLIENT_LUA) { if (c->flags & CLIENT_SCRIPT) {
/* /*
* Although the CLIENT_DENY_BLOCKING flag should protect from blocking the client * Although the CLIENT_DENY_BLOCKING flag should protect from blocking the client
* on Lua/MULTI/RM_Call we want special treatment for Lua to keep backward compatibility. * on Lua/MULTI/RM_Call we want special treatment for Lua to keep backward compatibility.

View File

@ -984,7 +984,7 @@ start_server {tags {"scripting needs:debug external:skip"}} {
r write $cmd r write $cmd
r flush r flush
set ret [r read] set ret [r read]
assert_match {*Unknown Redis command called from Lua script*} $ret assert_match {*Unknown Redis command called from*} $ret
# make sure the server is still ok # make sure the server is still ok
reconnect reconnect
assert_equal [r ping] {PONG} assert_equal [r ping] {PONG}