mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 08:08:53 -05:00
Change FUNCTION CREATE, DELETE and FLUSH to be WRITE commands instead of MAY_REPLICATE. (#9953)
The issue with MAY_REPLICATE is that all automatic mechanisms to handle write commands will not work. This require have a special treatment for: * Not allow those commands to be executed on RO replica. * Allow those commands to be executed on RO replica from primary connection. * Allow those commands to be executed on the RO replica from AOF. By setting those commands as WRITE commands we are getting all those properties from Redis. Test was added to verify that those properties work as expected. In addition, rearrange when and where functions are flushed. Before this PR functions were flushed manually on `rdbLoadRio` and cleaned manually on failure. This contradicts the assumptions that functions are data and need to be created/deleted alongside with the data. A side effect of this, for example, `debug reload noflush` did not flush the data but did flush the functions, `debug loadaof` flush the data but not the functions. This PR move functions deletion into `emptyDb`. `emptyDb` (renamed to `emptyData`) will now accept an additional flag, `NOFUNCTIONS` which specifically indicate that we do not want to flush the functions (on all other cases, functions will be flushed). Used the new flag on FLUSHALL and FLUSHDB only! Tests were added to `debug reload` and `debug loadaof` to verify that functions behave the same as the data. Notice that because now functions will be deleted along side with the data we can not allow `CLUSTER RESET` to be called from within a function (it will cause the function to be released while running), this PR adds `NO_SCRIPT` flag to `CLUSTER RESET` so it will not be possible to be called from within a function. The other cluster commands are allowed from within a function (there are use-cases that uses `GETKEYSINSLOT` to iterate over all the keys on a given slot). Tests was added to verify `CLUSTER RESET` is denied from within a script. Another small change on this PR is that `RDBFLAGS_ALLOW_DUP` is also applicable on functions. When loading functions, if this flag is set, we will replace old functions with new ones on collisions.
This commit is contained in:
parent
23b6173486
commit
3bcf108416
@ -660,7 +660,7 @@ void clusterReset(int hard) {
|
||||
if (nodeIsSlave(myself)) {
|
||||
clusterSetNodeAsMaster(myself);
|
||||
replicationUnsetMaster();
|
||||
emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
|
||||
emptyData(-1,EMPTYDB_NO_FLAGS,NULL);
|
||||
}
|
||||
|
||||
/* Close slots, reset manual failover state. */
|
||||
|
@ -566,7 +566,7 @@ struct redisCommand CLUSTER_Subcommands[] = {
|
||||
{"nodes","Get Cluster config for the node","O(N) where N is the total number of Cluster nodes","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_NODES_History,CLUSTER_NODES_Hints,clusterCommand,2,CMD_RANDOM|CMD_STALE,0},
|
||||
{"replicas","List replica nodes of the specified master node","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_REPLICAS_History,CLUSTER_REPLICAS_Hints,clusterCommand,3,CMD_ADMIN|CMD_RANDOM|CMD_STALE,0,.args=CLUSTER_REPLICAS_Args},
|
||||
{"replicate","Reconfigure a node as a replica of the specified master node","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_REPLICATE_History,CLUSTER_REPLICATE_Hints,clusterCommand,3,CMD_ADMIN|CMD_RANDOM|CMD_STALE,0,.args=CLUSTER_REPLICATE_Args},
|
||||
{"reset","Reset a Redis Cluster node","O(N) where N is the number of known nodes. The command may execute a FLUSHALL as a side effect.","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_RESET_History,CLUSTER_RESET_Hints,clusterCommand,3,CMD_ADMIN|CMD_RANDOM|CMD_STALE,0,.args=CLUSTER_RESET_Args},
|
||||
{"reset","Reset a Redis Cluster node","O(N) where N is the number of known nodes. The command may execute a FLUSHALL as a side effect.","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_RESET_History,CLUSTER_RESET_Hints,clusterCommand,3,CMD_ADMIN|CMD_RANDOM|CMD_STALE|CMD_NOSCRIPT,0,.args=CLUSTER_RESET_Args},
|
||||
{"saveconfig","Forces the node to save cluster state on disk","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_SAVECONFIG_History,CLUSTER_SAVECONFIG_Hints,clusterCommand,2,CMD_ADMIN|CMD_RANDOM|CMD_STALE,0},
|
||||
{"set-config-epoch","Set the configuration epoch in a new node","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_SET_CONFIG_EPOCH_History,CLUSTER_SET_CONFIG_EPOCH_Hints,clusterCommand,3,CMD_ADMIN|CMD_RANDOM|CMD_STALE,0,.args=CLUSTER_SET_CONFIG_EPOCH_Args},
|
||||
{"setslot","Bind a hash slot to a specific node","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_SETSLOT_History,CLUSTER_SETSLOT_Hints,clusterCommand,-4,CMD_ADMIN|CMD_RANDOM|CMD_STALE,0,.args=CLUSTER_SETSLOT_Args},
|
||||
@ -3134,14 +3134,14 @@ struct redisCommandArg FUNCTION_INFO_Args[] = {
|
||||
|
||||
/* FUNCTION command table */
|
||||
struct redisCommand FUNCTION_Subcommands[] = {
|
||||
{"create","PATCH__TBD__15__","PATCH__TBD__14__","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_CREATE_History,FUNCTION_CREATE_Hints,functionCreateCommand,-5,CMD_NOSCRIPT|CMD_MAY_REPLICATE,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_CREATE_Args},
|
||||
{"delete","PATCH__TBD__23__","PATCH__TBD__22__","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_DELETE_History,FUNCTION_DELETE_Hints,functionDeleteCommand,3,CMD_NOSCRIPT|CMD_MAY_REPLICATE,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_DELETE_Args},
|
||||
{"flush","PATCH__TBD__29__","PATCH__TBD__28__","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_FLUSH_History,FUNCTION_FLUSH_Hints,functionFlushCommand,-2,CMD_NOSCRIPT|CMD_MAY_REPLICATE,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_FLUSH_Args},
|
||||
{"create","Create a function with the given arguments (name, code, description)","O(1) (considering compilation time is redundant)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_CREATE_History,FUNCTION_CREATE_Hints,functionCreateCommand,-5,CMD_NOSCRIPT|CMD_WRITE,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_CREATE_Args},
|
||||
{"delete","Delete a function by name","O(1)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_DELETE_History,FUNCTION_DELETE_Hints,functionDeleteCommand,3,CMD_NOSCRIPT|CMD_WRITE,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_DELETE_Args},
|
||||
{"flush","Deleting all functions","O(N) where N is the number of functions deleted","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_FLUSH_History,FUNCTION_FLUSH_Hints,functionFlushCommand,-2,CMD_NOSCRIPT|CMD_WRITE,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_FLUSH_Args},
|
||||
{"help","Show helpful text about the different subcommands","O(1)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_HELP_History,FUNCTION_HELP_Hints,functionHelpCommand,2,CMD_LOADING|CMD_STALE,ACL_CATEGORY_SCRIPTING},
|
||||
{"info","PATCH__TBD__11__","PATCH__TBD__10__","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_INFO_History,FUNCTION_INFO_Hints,functionInfoCommand,-3,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_INFO_Args},
|
||||
{"kill","PATCH__TBD__19__","PATCH__TBD__18__","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_KILL_History,FUNCTION_KILL_Hints,functionKillCommand,2,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING},
|
||||
{"list","PATCH__TBD__27__","PATCH__TBD__26__","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_LIST_History,FUNCTION_LIST_Hints,functionListCommand,2,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING},
|
||||
{"stats","PATCH__TBD__34__","PATCH__TBD__33__","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_STATS_History,FUNCTION_STATS_Hints,functionStatsCommand,2,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING},
|
||||
{"info","Return information about a function by function name","O(1)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_INFO_History,FUNCTION_INFO_Hints,functionInfoCommand,-3,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_INFO_Args},
|
||||
{"kill","Kill the function currently in execution.","O(1)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_KILL_History,FUNCTION_KILL_Hints,functionKillCommand,2,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING},
|
||||
{"list","List information about all the functions","O(N) where N is the number of functions","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_LIST_History,FUNCTION_LIST_Hints,functionListCommand,2,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING},
|
||||
{"stats","Return information about the function currently running (name, description, duration)","O(1)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_STATS_History,FUNCTION_STATS_Hints,functionStatsCommand,2,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING},
|
||||
{0}
|
||||
};
|
||||
|
||||
|
@ -10,7 +10,8 @@
|
||||
"command_flags": [
|
||||
"ADMIN",
|
||||
"RANDOM",
|
||||
"STALE"
|
||||
"STALE",
|
||||
"NOSCRIPT"
|
||||
],
|
||||
"arguments": [
|
||||
{
|
||||
|
@ -1,7 +1,7 @@
|
||||
{
|
||||
"CREATE": {
|
||||
"summary": "PATCH__TBD__15__",
|
||||
"complexity": "PATCH__TBD__14__",
|
||||
"summary": "Create a function with the given arguments (name, code, description)",
|
||||
"complexity": "O(1) (considering compilation time is redundant)",
|
||||
"group": "scripting",
|
||||
"since": "7.0.0",
|
||||
"arity": -5,
|
||||
@ -9,7 +9,7 @@
|
||||
"function": "functionCreateCommand",
|
||||
"command_flags": [
|
||||
"NOSCRIPT",
|
||||
"MAY_REPLICATE"
|
||||
"WRITE"
|
||||
],
|
||||
"acl_categories": [
|
||||
"SCRIPTING"
|
||||
|
@ -1,7 +1,7 @@
|
||||
{
|
||||
"DELETE": {
|
||||
"summary": "PATCH__TBD__23__",
|
||||
"complexity": "PATCH__TBD__22__",
|
||||
"summary": "Delete a function by name",
|
||||
"complexity": "O(1)",
|
||||
"group": "scripting",
|
||||
"since": "7.0.0",
|
||||
"arity": 3,
|
||||
@ -9,7 +9,7 @@
|
||||
"function": "functionDeleteCommand",
|
||||
"command_flags": [
|
||||
"NOSCRIPT",
|
||||
"MAY_REPLICATE"
|
||||
"WRITE"
|
||||
],
|
||||
"acl_categories": [
|
||||
"SCRIPTING"
|
||||
|
@ -1,7 +1,7 @@
|
||||
{
|
||||
"FLUSH": {
|
||||
"summary": "PATCH__TBD__29__",
|
||||
"complexity": "PATCH__TBD__28__",
|
||||
"summary": "Deleting all functions",
|
||||
"complexity": "O(N) where N is the number of functions deleted",
|
||||
"group": "scripting",
|
||||
"since": "7.0.0",
|
||||
"arity": -2,
|
||||
@ -9,7 +9,7 @@
|
||||
"function": "functionFlushCommand",
|
||||
"command_flags": [
|
||||
"NOSCRIPT",
|
||||
"MAY_REPLICATE"
|
||||
"WRITE"
|
||||
],
|
||||
"acl_categories": [
|
||||
"SCRIPTING"
|
||||
|
@ -1,7 +1,7 @@
|
||||
{
|
||||
"INFO": {
|
||||
"summary": "PATCH__TBD__11__",
|
||||
"complexity": "PATCH__TBD__10__",
|
||||
"summary": "Return information about a function by function name",
|
||||
"complexity": "O(1)",
|
||||
"group": "scripting",
|
||||
"since": "7.0.0",
|
||||
"arity": -3,
|
||||
|
@ -1,7 +1,7 @@
|
||||
{
|
||||
"KILL": {
|
||||
"summary": "PATCH__TBD__19__",
|
||||
"complexity": "PATCH__TBD__18__",
|
||||
"summary": "Kill the function currently in execution.",
|
||||
"complexity": "O(1)",
|
||||
"group": "scripting",
|
||||
"since": "7.0.0",
|
||||
"arity": 2,
|
||||
|
@ -1,7 +1,7 @@
|
||||
{
|
||||
"LIST": {
|
||||
"summary": "PATCH__TBD__27__",
|
||||
"complexity": "PATCH__TBD__26__",
|
||||
"summary": "List information about all the functions",
|
||||
"complexity": "O(N) where N is the number of functions",
|
||||
"group": "scripting",
|
||||
"since": "7.0.0",
|
||||
"arity": 2,
|
||||
|
@ -1,7 +1,7 @@
|
||||
{
|
||||
"STATS": {
|
||||
"summary": "PATCH__TBD__34__",
|
||||
"complexity": "PATCH__TBD__33__",
|
||||
"summary": "Return information about the function currently running (name, description, duration)",
|
||||
"complexity": "O(1)",
|
||||
"group": "scripting",
|
||||
"since": "7.0.0",
|
||||
"arity": 2,
|
||||
|
26
src/db.c
26
src/db.c
@ -32,6 +32,7 @@
|
||||
#include "atomicvar.h"
|
||||
#include "latency.h"
|
||||
#include "script.h"
|
||||
#include "functions.h"
|
||||
|
||||
#include <signal.h>
|
||||
#include <ctype.h>
|
||||
@ -412,22 +413,24 @@ long long emptyDbStructure(redisDb *dbarray, int dbnum, int async,
|
||||
return removed;
|
||||
}
|
||||
|
||||
/* Remove all keys from all the databases in a Redis server.
|
||||
* If callback is given the function is called from time to time to
|
||||
* signal that work is in progress.
|
||||
/* Remove all data (keys and functions) from all the databases in a
|
||||
* Redis server. If callback is given the function is called from
|
||||
* time to time to signal that work is in progress.
|
||||
*
|
||||
* The dbnum can be -1 if all the DBs should be flushed, or the specified
|
||||
* DB number if we want to flush only a single Redis database number.
|
||||
*
|
||||
* Flags are be EMPTYDB_NO_FLAGS if no special flags are specified or
|
||||
* EMPTYDB_ASYNC if we want the memory to be freed in a different thread
|
||||
* and the function to return ASAP.
|
||||
* and the function to return ASAP. EMPTYDB_NOFUNCTIONS can also be set
|
||||
* to specify that we do not want to delete the functions.
|
||||
*
|
||||
* On success the function returns the number of keys removed from the
|
||||
* database(s). Otherwise -1 is returned in the specific case the
|
||||
* DB number is out of range, and errno is set to EINVAL. */
|
||||
long long emptyDb(int dbnum, int flags, void(callback)(dict*)) {
|
||||
long long emptyData(int dbnum, int flags, void(callback)(dict*)) {
|
||||
int async = (flags & EMPTYDB_ASYNC);
|
||||
int with_functions = !(flags & EMPTYDB_NOFUNCTIONS);
|
||||
RedisModuleFlushInfoV1 fi = {REDISMODULE_FLUSHINFO_VERSION,!async,dbnum};
|
||||
long long removed = 0;
|
||||
|
||||
@ -456,6 +459,11 @@ long long emptyDb(int dbnum, int flags, void(callback)(dict*)) {
|
||||
|
||||
if (dbnum == -1) flushSlaveKeysWithExpireList();
|
||||
|
||||
if (with_functions) {
|
||||
serverAssert(dbnum == -1);
|
||||
functionsCtxClearCurrent(async);
|
||||
}
|
||||
|
||||
/* Also fire the end event. Note that this event will fire almost
|
||||
* immediately after the start event if the flush is asynchronous. */
|
||||
moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB,
|
||||
@ -583,7 +591,7 @@ int getFlushCommandFlags(client *c, int *flags) {
|
||||
|
||||
/* Flushes the whole server data set. */
|
||||
void flushAllDataAndResetRDB(int flags) {
|
||||
server.dirty += emptyDb(-1,flags,NULL);
|
||||
server.dirty += emptyData(-1,flags,NULL);
|
||||
if (server.child_type == CHILD_TYPE_RDB) killRDBChild();
|
||||
if (server.saveparamslen > 0) {
|
||||
/* Normally rdbSave() will reset dirty, but we don't want this here
|
||||
@ -614,7 +622,8 @@ void flushdbCommand(client *c) {
|
||||
int flags;
|
||||
|
||||
if (getFlushCommandFlags(c,&flags) == C_ERR) return;
|
||||
server.dirty += emptyDb(c->db->id,flags,NULL);
|
||||
/* flushdb should not flush the functions */
|
||||
server.dirty += emptyData(c->db->id,flags | EMPTYDB_NOFUNCTIONS,NULL);
|
||||
addReply(c,shared.ok);
|
||||
#if defined(USE_JEMALLOC)
|
||||
/* jemalloc 5 doesn't release pages back to the OS when there's no traffic.
|
||||
@ -631,7 +640,8 @@ void flushdbCommand(client *c) {
|
||||
void flushallCommand(client *c) {
|
||||
int flags;
|
||||
if (getFlushCommandFlags(c,&flags) == C_ERR) return;
|
||||
flushAllDataAndResetRDB(flags);
|
||||
/* flushall should not flush the functions */
|
||||
flushAllDataAndResetRDB(flags | EMPTYDB_NOFUNCTIONS);
|
||||
addReply(c,shared.ok);
|
||||
}
|
||||
|
||||
|
@ -553,7 +553,7 @@ NULL
|
||||
/* The default behavior is to remove the current dataset from
|
||||
* memory before loading the RDB file, however when MERGE is
|
||||
* used together with NOFLUSH, we are able to merge two datasets. */
|
||||
if (flush) emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
|
||||
if (flush) emptyData(-1,EMPTYDB_NO_FLAGS,NULL);
|
||||
|
||||
protectClient(c);
|
||||
int ret = rdbLoad(server.rdb_filename,NULL,flags);
|
||||
@ -566,7 +566,7 @@ NULL
|
||||
addReply(c,shared.ok);
|
||||
} else if (!strcasecmp(c->argv[1]->ptr,"loadaof")) {
|
||||
if (server.aof_state != AOF_OFF) flushAppendOnlyFile(1);
|
||||
emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
|
||||
emptyData(-1,EMPTYDB_NO_FLAGS,NULL);
|
||||
protectClient(c);
|
||||
int ret = loadAppendOnlyFile(server.aof_filename);
|
||||
if (ret != AOF_OK && ret != AOF_EMPTY)
|
||||
|
@ -105,6 +105,16 @@ void functionsCtxClear(functionsCtx *functions_ctx) {
|
||||
functions_ctx->cache_memory = 0;
|
||||
}
|
||||
|
||||
void functionsCtxClearCurrent(int async) {
|
||||
if (async) {
|
||||
functionsCtx *old_f_ctx = functions_ctx;
|
||||
functions_ctx = functionsCtxCreate();
|
||||
freeFunctionsAsync(old_f_ctx);
|
||||
} else {
|
||||
functionsCtxClear(functions_ctx);
|
||||
}
|
||||
}
|
||||
|
||||
/* Free the given functions ctx */
|
||||
void functionsCtxFree(functionsCtx *functions_ctx) {
|
||||
functionsCtxClear(functions_ctx);
|
||||
@ -301,11 +311,6 @@ void functionInfoCommand(client *c) {
|
||||
* FUNCTION DELETE <FUNCTION NAME>
|
||||
*/
|
||||
void functionDeleteCommand(client *c) {
|
||||
if (server.masterhost && server.repl_slave_ro && !(c->flags & CLIENT_MASTER)) {
|
||||
addReplyError(c, "Can not delete a function on a read only replica");
|
||||
return;
|
||||
}
|
||||
|
||||
robj *function_name = c->argv[2];
|
||||
functionInfo *fi = dictFetchValue(functions_ctx->functions, function_name->ptr);
|
||||
if (!fi) {
|
||||
@ -389,13 +394,8 @@ void functionFlushCommand(client *c) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (async) {
|
||||
functionsCtx *old_f_ctx = functions_ctx;
|
||||
functions_ctx = functionsCtxCreate();
|
||||
freeFunctionsAsync(old_f_ctx);
|
||||
} else {
|
||||
functionsCtxClear(functions_ctx);
|
||||
}
|
||||
functionsCtxClearCurrent(async);
|
||||
|
||||
/* Indicate that the command changed the data so it will be replicated and
|
||||
* counted as a data change (for persistence configuration) */
|
||||
server.dirty++;
|
||||
@ -481,12 +481,6 @@ int functionsCreateWithFunctionCtx(sds function_name,sds engine_name, sds desc,
|
||||
* FUNCTION CODE - function code to pass to the engine
|
||||
*/
|
||||
void functionCreateCommand(client *c) {
|
||||
|
||||
if (server.masterhost && server.repl_slave_ro && !(c->flags & CLIENT_MASTER)) {
|
||||
addReplyError(c, "Can not create a function on a read only replica");
|
||||
return;
|
||||
}
|
||||
|
||||
robj *engine_name = c->argv[2];
|
||||
robj *function_name = c->argv[3];
|
||||
|
||||
|
@ -108,6 +108,7 @@ dict* functionsGet();
|
||||
size_t functionsLen(functionsCtx *functions_ctx);
|
||||
functionsCtx* functionsCtxGetCurrent();
|
||||
functionsCtx* functionsCtxCreate();
|
||||
void functionsCtxClearCurrent(int async);
|
||||
void functionsCtxFree(functionsCtx *functions_ctx);
|
||||
void functionsCtxClear(functionsCtx *functions_ctx);
|
||||
void functionsCtxSwapWithCurrent(functionsCtx *functions_ctx);
|
||||
|
11
src/rdb.c
11
src/rdb.c
@ -2697,7 +2697,7 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
|
||||
}
|
||||
}
|
||||
|
||||
static int rdbFunctionLoad(rio *rdb, int ver, functionsCtx* functions_ctx) {
|
||||
static int rdbFunctionLoad(rio *rdb, int ver, functionsCtx* functions_ctx, int rdbflags) {
|
||||
UNUSED(ver);
|
||||
sds name = NULL;
|
||||
sds engine_name = NULL;
|
||||
@ -2731,7 +2731,7 @@ static int rdbFunctionLoad(rio *rdb, int ver, functionsCtx* functions_ctx) {
|
||||
goto error;
|
||||
}
|
||||
|
||||
if (functionsCreateWithFunctionCtx(name, engine_name, desc, blob, 0, &err, functions_ctx) != C_OK) {
|
||||
if (functionsCreateWithFunctionCtx(name, engine_name, desc, blob, rdbflags & RDBFLAGS_ALLOW_DUP, &err, functions_ctx) != C_OK) {
|
||||
serverLog(LL_WARNING, "Failed compiling and saving the function %s", err);
|
||||
goto error;
|
||||
}
|
||||
@ -2751,13 +2751,8 @@ error:
|
||||
* otherwise C_ERR is returned and 'errno' is set accordingly. */
|
||||
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
|
||||
functionsCtx* functions_ctx = functionsCtxGetCurrent();
|
||||
functionsCtxClear(functions_ctx);
|
||||
rdbLoadingCtx loading_ctx = { .dbarray = server.db, .functions_ctx = functions_ctx };
|
||||
int retval = rdbLoadRioWithLoadingCtx(rdb,rdbflags,rsi,&loading_ctx);
|
||||
if (retval != C_OK) {
|
||||
/* Loading failed, clear the function ctx */
|
||||
functionsCtxClear(functions_ctx);
|
||||
}
|
||||
return retval;
|
||||
}
|
||||
|
||||
@ -2969,7 +2964,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
|
||||
continue; /* Read next opcode. */
|
||||
}
|
||||
} else if (type == RDB_OPCODE_FUNCTION) {
|
||||
if (rdbFunctionLoad(rdb, rdbver, rdb_loading_ctx->functions_ctx) != C_OK) {
|
||||
if (rdbFunctionLoad(rdb, rdbver, rdb_loading_ctx->functions_ctx, rdbflags) != C_OK) {
|
||||
serverLog(LL_WARNING,"Failed loading function");
|
||||
goto eoferr;
|
||||
}
|
||||
|
@ -1921,7 +1921,7 @@ void readSyncBulkPayload(connection *conn) {
|
||||
replicationAttachToNewMaster();
|
||||
|
||||
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data");
|
||||
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
|
||||
emptyData(-1,empty_db_flags,replicationEmptyDbCallback);
|
||||
}
|
||||
|
||||
/* Before loading the DB into memory we need to delete the readable
|
||||
@ -1997,7 +1997,7 @@ void readSyncBulkPayload(connection *conn) {
|
||||
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Discarding temporary DB in background");
|
||||
} else {
|
||||
/* Remove the half-loaded data in case we started with an empty replica. */
|
||||
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
|
||||
emptyData(-1,empty_db_flags,replicationEmptyDbCallback);
|
||||
}
|
||||
|
||||
/* Note that there's no point in restarting the AOF on SYNC
|
||||
|
@ -2846,7 +2846,8 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o);
|
||||
|
||||
#define EMPTYDB_NO_FLAGS 0 /* No flags. */
|
||||
#define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */
|
||||
long long emptyDb(int dbnum, int flags, void(callback)(dict*));
|
||||
#define EMPTYDB_NOFUNCTIONS (1<<1) /* Indicate not to flush the functions. */
|
||||
long long emptyData(int dbnum, int flags, void(callback)(dict*));
|
||||
long long emptyDbStructure(redisDb *dbarray, int dbnum, int async, void(callback)(dict*));
|
||||
void flushAllDataAndResetRDB(int flags);
|
||||
long long dbTotalServerKeyCount();
|
||||
|
@ -117,6 +117,32 @@ start_server {tags {"scripting"}} {
|
||||
r fcall test 0
|
||||
} {hello} {needs:debug}
|
||||
|
||||
test {FUNCTION - test debug reload different options} {
|
||||
catch {r debug reload noflush} e
|
||||
assert_match "*Error trying to load the RDB*" $e
|
||||
r debug reload noflush merge
|
||||
r function list
|
||||
} {{name test engine LUA description {some description}}} {needs:debug}
|
||||
|
||||
test {FUNCTION - test debug reload with nosave and noflush} {
|
||||
r function delete test
|
||||
r set x 1
|
||||
r function create LUA test1 DESCRIPTION {some description} {return 'hello'}
|
||||
r debug reload
|
||||
r function create LUA test2 DESCRIPTION {some description} {return 'hello'}
|
||||
r debug reload nosave noflush merge
|
||||
assert_equal [r fcall test1 0] {hello}
|
||||
assert_equal [r fcall test2 0] {hello}
|
||||
} {} {needs:debug}
|
||||
|
||||
test {FUNCTION - test flushall and flushdb do not clean functions} {
|
||||
r function flush
|
||||
r function create lua test REPLACE {return redis.call('set', 'x', '1')}
|
||||
r flushall
|
||||
r flushdb
|
||||
r function list
|
||||
} {{name test engine LUA description {}}}
|
||||
|
||||
test {FUNCTION - test fcall_ro with write command} {
|
||||
r function create lua test REPLACE {return redis.call('set', 'x', '1')}
|
||||
catch { r fcall_ro test 0 } e
|
||||
@ -290,14 +316,14 @@ start_server {tags {"scripting repl external:skip"}} {
|
||||
r -1 function create LUA test DESCRIPTION {some description} {return 'hello'}
|
||||
} e
|
||||
set _ $e
|
||||
} {*Can not create a function on a read only replica*}
|
||||
} {*can't write against a read only replica*}
|
||||
|
||||
test "FUNCTION - delete on read only replica" {
|
||||
catch {
|
||||
r -1 function delete test
|
||||
} e
|
||||
set _ $e
|
||||
} {*Can not delete a function on a read only replica*}
|
||||
} {*can't write against a read only replica*}
|
||||
|
||||
test "FUNCTION - function effect is replicated to replica" {
|
||||
r function create LUA test REPLACE {return redis.call('set', 'x', '1')}
|
||||
@ -318,3 +344,31 @@ start_server {tags {"scripting repl external:skip"}} {
|
||||
} {*can't write against a read only replica*}
|
||||
}
|
||||
}
|
||||
|
||||
test {FUNCTION can processes create, delete and flush commands in AOF when doing "debug loadaof" in read-only slaves} {
|
||||
start_server {} {
|
||||
r config set appendonly yes
|
||||
waitForBgrewriteaof r
|
||||
r FUNCTION CREATE lua test "return 'hello'"
|
||||
r config set slave-read-only yes
|
||||
r slaveof 127.0.0.1 0
|
||||
r debug loadaof
|
||||
r slaveof no one
|
||||
assert_equal [r function list] {{name test engine LUA description {}}}
|
||||
|
||||
r FUNCTION DELETE test
|
||||
|
||||
r slaveof 127.0.0.1 0
|
||||
r debug loadaof
|
||||
r slaveof no one
|
||||
assert_equal [r function list] {}
|
||||
|
||||
r FUNCTION CREATE lua test "return 'hello'"
|
||||
r FUNCTION FLUSH
|
||||
|
||||
r slaveof 127.0.0.1 0
|
||||
r debug loadaof
|
||||
r slaveof no one
|
||||
assert_equal [r function list] {}
|
||||
}
|
||||
} {} {needs:debug external:skip}
|
||||
|
@ -618,6 +618,15 @@ start_server {tags {"scripting"}} {
|
||||
set e
|
||||
} {*wrong number*}
|
||||
|
||||
test {CLUSTER RESET can not be invoke from within a script} {
|
||||
catch {
|
||||
run_script {
|
||||
redis.call('cluster', 'reset', 'hard')
|
||||
} 0
|
||||
} e
|
||||
set _ $e
|
||||
} {*command is not allowed*}
|
||||
|
||||
test {Script with RESP3 map} {
|
||||
set expected_dict [dict create field value]
|
||||
set expected_list [list field value]
|
||||
@ -659,7 +668,7 @@ start_server {tags {"scripting"}} {
|
||||
}
|
||||
|
||||
test {Script check unpack with massive arguments} {
|
||||
r eval {
|
||||
run_script {
|
||||
local a = {}
|
||||
for i=1,7999 do
|
||||
a[i] = 1
|
||||
|
Loading…
Reference in New Issue
Block a user