mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 08:08:53 -05:00
Ensure that the function load timeout is disabled during loading from RDB/AOF and on replicas. (#12451)
When loading a function from either RDB/AOF or a replica, it is essential not to fail on timeout errors. The loading time may vary due to various factors, such as hardware specifications or the system's workload during the loading process. Once a function has been successfully loaded, it should be allowed to load from persistence or on replicas without encountering a timeout failure. To maintain a clear separation between the engine and Redis internals, the implementation refrains from directly checking the state of Redis within the engine itself. Instead, the engine receives the desired timeout as part of the library creation and duly respects this timeout value. If Redis wishes to disable any timeout, it can simply send a value of 0.
This commit is contained in:
parent
90ab91f00b
commit
2ee1bbb53b
@ -51,7 +51,6 @@
|
||||
#define REGISTRY_LOAD_CTX_NAME "__LIBRARY_CTX__"
|
||||
#define LIBRARY_API_NAME "__LIBRARY_API__"
|
||||
#define GLOBALS_API_NAME "__GLOBALS_API__"
|
||||
#define LOAD_TIMEOUT_MS 500
|
||||
|
||||
/* Lua engine ctx */
|
||||
typedef struct luaEngineCtx {
|
||||
@ -67,6 +66,7 @@ typedef struct luaFunctionCtx {
|
||||
typedef struct loadCtx {
|
||||
functionLibInfo *li;
|
||||
monotime start_time;
|
||||
size_t timeout;
|
||||
} loadCtx;
|
||||
|
||||
typedef struct registerFunctionArgs {
|
||||
@ -85,7 +85,7 @@ static void luaEngineLoadHook(lua_State *lua, lua_Debug *ar) {
|
||||
loadCtx *load_ctx = luaGetFromRegistry(lua, REGISTRY_LOAD_CTX_NAME);
|
||||
serverAssert(load_ctx); /* Only supported inside script invocation */
|
||||
uint64_t duration = elapsedMs(load_ctx->start_time);
|
||||
if (duration > LOAD_TIMEOUT_MS) {
|
||||
if (load_ctx->timeout > 0 && duration > load_ctx->timeout) {
|
||||
lua_sethook(lua, luaEngineLoadHook, LUA_MASKLINE, 0);
|
||||
|
||||
luaPushError(lua,"FUNCTION LOAD timeout");
|
||||
@ -100,7 +100,7 @@ static void luaEngineLoadHook(lua_State *lua, lua_Debug *ar) {
|
||||
*
|
||||
* Return NULL on compilation error and set the error to the err variable
|
||||
*/
|
||||
static int luaEngineCreate(void *engine_ctx, functionLibInfo *li, sds blob, sds *err) {
|
||||
static int luaEngineCreate(void *engine_ctx, functionLibInfo *li, sds blob, size_t timeout, sds *err) {
|
||||
int ret = C_ERR;
|
||||
luaEngineCtx *lua_engine_ctx = engine_ctx;
|
||||
lua_State *lua = lua_engine_ctx->lua;
|
||||
@ -124,6 +124,7 @@ static int luaEngineCreate(void *engine_ctx, functionLibInfo *li, sds blob, sds
|
||||
loadCtx load_ctx = {
|
||||
.li = li,
|
||||
.start_time = getMonotonicUs(),
|
||||
.timeout = timeout,
|
||||
};
|
||||
luaSaveOnRegistry(lua, REGISTRY_LOAD_CTX_NAME, &load_ctx);
|
||||
|
||||
|
@ -33,6 +33,8 @@
|
||||
#include "adlist.h"
|
||||
#include "atomicvar.h"
|
||||
|
||||
#define LOAD_TIMEOUT_MS 500
|
||||
|
||||
typedef enum {
|
||||
restorePolicy_Flush, restorePolicy_Append, restorePolicy_Replace
|
||||
} restorePolicy;
|
||||
@ -961,7 +963,7 @@ void functionFreeLibMetaData(functionsLibMataData *md) {
|
||||
|
||||
/* Compile and save the given library, return the loaded library name on success
|
||||
* and NULL on failure. In case on failure the err out param is set with relevant error message */
|
||||
sds functionsCreateWithLibraryCtx(sds code, int replace, sds* err, functionsLibCtx *lib_ctx) {
|
||||
sds functionsCreateWithLibraryCtx(sds code, int replace, sds* err, functionsLibCtx *lib_ctx, size_t timeout) {
|
||||
dictIterator *iter = NULL;
|
||||
dictEntry *entry = NULL;
|
||||
functionLibInfo *new_li = NULL;
|
||||
@ -995,7 +997,7 @@ sds functionsCreateWithLibraryCtx(sds code, int replace, sds* err, functionsLibC
|
||||
}
|
||||
|
||||
new_li = engineLibraryCreate(md.name, ei, code);
|
||||
if (engine->create(engine->engine_ctx, new_li, md.code, err) != C_OK) {
|
||||
if (engine->create(engine->engine_ctx, new_li, md.code, timeout, err) != C_OK) {
|
||||
goto error;
|
||||
}
|
||||
|
||||
@ -1063,7 +1065,11 @@ void functionLoadCommand(client *c) {
|
||||
robj *code = c->argv[argc_pos];
|
||||
sds err = NULL;
|
||||
sds library_name = NULL;
|
||||
if (!(library_name = functionsCreateWithLibraryCtx(code->ptr, replace, &err, curr_functions_lib_ctx)))
|
||||
size_t timeout = LOAD_TIMEOUT_MS;
|
||||
if (mustObeyClient(c)) {
|
||||
timeout = 0;
|
||||
}
|
||||
if (!(library_name = functionsCreateWithLibraryCtx(code->ptr, replace, &err, curr_functions_lib_ctx, timeout)))
|
||||
{
|
||||
addReplyErrorSds(c, err);
|
||||
return;
|
||||
|
@ -53,9 +53,14 @@ typedef struct engine {
|
||||
/* engine specific context */
|
||||
void *engine_ctx;
|
||||
|
||||
/* Create function callback, get the engine_ctx, and function code.
|
||||
/* Create function callback, get the engine_ctx, and function code
|
||||
* engine_ctx - opaque struct that was created on engine initialization
|
||||
* li - library information that need to be provided and when add functions
|
||||
* code - the library code
|
||||
* timeout - timeout for the library creation (0 for no timeout)
|
||||
* err - description of error (if occurred)
|
||||
* returns NULL on error and set sds to be the error message */
|
||||
int (*create)(void *engine_ctx, functionLibInfo *li, sds code, sds *err);
|
||||
int (*create)(void *engine_ctx, functionLibInfo *li, sds code, size_t timeout, sds *err);
|
||||
|
||||
/* Invoking a function, r_ctx is an opaque object (from engine POV).
|
||||
* The r_ctx should be used by the engine to interaction with Redis,
|
||||
@ -109,7 +114,7 @@ struct functionLibInfo {
|
||||
};
|
||||
|
||||
int functionsRegisterEngine(const char *engine_name, engine *engine_ctx);
|
||||
sds functionsCreateWithLibraryCtx(sds code, int replace, sds* err, functionsLibCtx *lib_ctx);
|
||||
sds functionsCreateWithLibraryCtx(sds code, int replace, sds* err, functionsLibCtx *lib_ctx, size_t timeout);
|
||||
unsigned long functionsMemory(void);
|
||||
unsigned long functionsMemoryOverhead(void);
|
||||
unsigned long functionsNum(void);
|
||||
|
@ -2981,7 +2981,7 @@ int rdbFunctionLoad(rio *rdb, int ver, functionsLibCtx* lib_ctx, int rdbflags, s
|
||||
|
||||
if (lib_ctx) {
|
||||
sds library_name = NULL;
|
||||
if (!(library_name = functionsCreateWithLibraryCtx(final_payload, rdbflags & RDBFLAGS_ALLOW_DUP, &error, lib_ctx))) {
|
||||
if (!(library_name = functionsCreateWithLibraryCtx(final_payload, rdbflags & RDBFLAGS_ALLOW_DUP, &error, lib_ctx, 0))) {
|
||||
if (!error) {
|
||||
error = sdsnew("Failed creating the library");
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user