mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-21 23:58:51 -05:00
Add RM_RdbLoad and RM_RdbSave module API functions (#11852)
Add `RM_RdbLoad()` and `RM_RdbSave()` to load/save RDB files from the module API. In our use case, we have our clustering implementation as a module. As part of this implementation, the module needs to trigger RDB save operation at specific points. Also, this module delivers RDB files to other nodes (not using Redis' replication). When a node receives an RDB file, it should be able to load the RDB. Currently, there is no module API to save/load RDB files. This PR adds four new APIs: ```c RedisModuleRdbStream *RM_RdbStreamCreateFromFile(const char *filename); void RM_RdbStreamFree(RedisModuleRdbStream *stream); int RM_RdbLoad(RedisModuleCtx *ctx, RedisModuleRdbStream *stream, int flags); int RM_RdbSave(RedisModuleCtx *ctx, RedisModuleRdbStream *stream, int flags); ``` The first step is to create a `RedisModuleRdbStream` object. This PR provides a function to create RedisModuleRdbStream from the filename. (You can load/save RDB with the filename). In the future, this API can be extended if needed: e.g., `RM_RdbStreamCreateFromFd()`, `RM_RdbStreamCreateFromSocket()` to save/load RDB from an `fd` or a `socket`. Usage: ```c /* Save RDB */ RedisModuleRdbStream *stream = RedisModule_RdbStreamCreateFromFile("example.rdb"); RedisModule_RdbSave(ctx, stream, 0); RedisModule_RdbStreamFree(stream); /* Load RDB */ RedisModuleRdbStream *stream = RedisModule_RdbStreamCreateFromFile("example.rdb"); RedisModule_RdbLoad(ctx, stream, 0); RedisModule_RdbStreamFree(stream); ```
This commit is contained in:
parent
f263b6daf3
commit
e55568edb5
@ -54,4 +54,5 @@ $TCLSH tests/test_helper.tcl \
|
||||
--single unit/moduleapi/postnotifications \
|
||||
--single unit/moduleapi/async_rm_call \
|
||||
--single unit/moduleapi/moduleauth \
|
||||
--single unit/moduleapi/rdbloadsave \
|
||||
"${@}"
|
||||
|
135
src/module.c
135
src/module.c
@ -12751,6 +12751,137 @@ int RM_LoadConfigs(RedisModuleCtx *ctx) {
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
/* --------------------------------------------------------------------------
|
||||
* ## RDB load/save API
|
||||
* -------------------------------------------------------------------------- */
|
||||
|
||||
#define REDISMODULE_RDB_STREAM_FILE 1
|
||||
|
||||
typedef struct RedisModuleRdbStream {
|
||||
int type;
|
||||
|
||||
union {
|
||||
char *filename;
|
||||
} data;
|
||||
} RedisModuleRdbStream;
|
||||
|
||||
/* Create a stream object to save/load RDB to/from a file.
|
||||
*
|
||||
* This function returns a pointer to RedisModuleRdbStream which is owned
|
||||
* by the caller. It requires a call to RM_RdbStreamFree() to free
|
||||
* the object. */
|
||||
RedisModuleRdbStream *RM_RdbStreamCreateFromFile(const char *filename) {
|
||||
RedisModuleRdbStream *stream = zmalloc(sizeof(*stream));
|
||||
stream->type = REDISMODULE_RDB_STREAM_FILE;
|
||||
stream->data.filename = zstrdup(filename);
|
||||
return stream;
|
||||
}
|
||||
|
||||
/* Release an RDB stream object. */
|
||||
void RM_RdbStreamFree(RedisModuleRdbStream *stream) {
|
||||
switch (stream->type) {
|
||||
case REDISMODULE_RDB_STREAM_FILE:
|
||||
zfree(stream->data.filename);
|
||||
break;
|
||||
default:
|
||||
serverAssert(0);
|
||||
break;
|
||||
}
|
||||
zfree(stream);
|
||||
}
|
||||
|
||||
/* Load RDB file from the `stream`. Dataset will be cleared first and then RDB
|
||||
* file will be loaded.
|
||||
*
|
||||
* `flags` must be zero. This parameter is for future use.
|
||||
*
|
||||
* On success REDISMODULE_OK is returned, otherwise REDISMODULE_ERR is returned
|
||||
* and errno is set accordingly.
|
||||
*
|
||||
* Example:
|
||||
*
|
||||
* RedisModuleRdbStream *s = RedisModule_RdbStreamCreateFromFile("exp.rdb");
|
||||
* RedisModule_RdbLoad(ctx, s, 0);
|
||||
* RedisModule_RdbStreamFree(s);
|
||||
*/
|
||||
int RM_RdbLoad(RedisModuleCtx *ctx, RedisModuleRdbStream *stream, int flags) {
|
||||
UNUSED(ctx);
|
||||
|
||||
if (!stream || flags != 0) {
|
||||
errno = EINVAL;
|
||||
return REDISMODULE_ERR;
|
||||
}
|
||||
|
||||
/* Not allowed on replicas. */
|
||||
if (server.masterhost != NULL) {
|
||||
errno = ENOTSUP;
|
||||
return REDISMODULE_ERR;
|
||||
}
|
||||
|
||||
/* Drop replicas if exist. */
|
||||
disconnectSlaves();
|
||||
freeReplicationBacklog();
|
||||
|
||||
if (server.aof_state != AOF_OFF) stopAppendOnly();
|
||||
|
||||
/* Kill existing RDB fork as it is saving outdated data. Also killing it
|
||||
* will prevent COW memory issue. */
|
||||
if (server.child_type == CHILD_TYPE_RDB) killRDBChild();
|
||||
|
||||
emptyData(-1,EMPTYDB_NO_FLAGS,NULL);
|
||||
|
||||
/* rdbLoad() can go back to the networking and process network events. If
|
||||
* RM_RdbLoad() is called inside a command callback, we don't want to
|
||||
* process the current client. Otherwise, we may free the client or try to
|
||||
* process next message while we are already in the command callback. */
|
||||
if (server.current_client) protectClient(server.current_client);
|
||||
|
||||
serverAssert(stream->type == REDISMODULE_RDB_STREAM_FILE);
|
||||
int ret = rdbLoad(stream->data.filename,NULL,RDBFLAGS_NONE);
|
||||
|
||||
if (server.current_client) unprotectClient(server.current_client);
|
||||
if (server.aof_state != AOF_OFF) startAppendOnly();
|
||||
|
||||
if (ret != RDB_OK) {
|
||||
errno = (ret == RDB_NOT_EXIST) ? ENOENT : EIO;
|
||||
return REDISMODULE_ERR;
|
||||
}
|
||||
|
||||
errno = 0;
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
/* Save dataset to the RDB stream.
|
||||
*
|
||||
* `flags` must be zero. This parameter is for future use.
|
||||
*
|
||||
* On success REDISMODULE_OK is returned, otherwise REDISMODULE_ERR is returned
|
||||
* and errno is set accordingly.
|
||||
*
|
||||
* Example:
|
||||
*
|
||||
* RedisModuleRdbStream *s = RedisModule_RdbStreamCreateFromFile("exp.rdb");
|
||||
* RedisModule_RdbSave(ctx, s, 0);
|
||||
* RedisModule_RdbStreamFree(s);
|
||||
*/
|
||||
int RM_RdbSave(RedisModuleCtx *ctx, RedisModuleRdbStream *stream, int flags) {
|
||||
UNUSED(ctx);
|
||||
|
||||
if (!stream || flags != 0) {
|
||||
errno = EINVAL;
|
||||
return REDISMODULE_ERR;
|
||||
}
|
||||
|
||||
serverAssert(stream->type == REDISMODULE_RDB_STREAM_FILE);
|
||||
|
||||
if (rdbSaveToFile(stream->data.filename) != C_OK) {
|
||||
return REDISMODULE_ERR;
|
||||
}
|
||||
|
||||
errno = 0;
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
/* Redis MODULE command.
|
||||
*
|
||||
* MODULE LIST
|
||||
@ -13627,4 +13758,8 @@ void moduleRegisterCoreAPI(void) {
|
||||
REGISTER_API(RegisterEnumConfig);
|
||||
REGISTER_API(LoadConfigs);
|
||||
REGISTER_API(RegisterAuthCallback);
|
||||
REGISTER_API(RdbStreamCreateFromFile);
|
||||
REGISTER_API(RdbStreamFree);
|
||||
REGISTER_API(RdbLoad);
|
||||
REGISTER_API(RdbSave);
|
||||
}
|
||||
|
71
src/rdb.c
71
src/rdb.c
@ -1437,31 +1437,29 @@ werr: /* Write error. */
|
||||
return C_ERR;
|
||||
}
|
||||
|
||||
/* Save the DB on disk. Return C_ERR on error, C_OK on success. */
|
||||
int rdbSave(int req, char *filename, rdbSaveInfo *rsi, int rdbflags) {
|
||||
char tmpfile[256];
|
||||
static int rdbSaveInternal(int req, const char *filename, rdbSaveInfo *rsi, int rdbflags) {
|
||||
char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
|
||||
FILE *fp = NULL;
|
||||
rio rdb;
|
||||
int error = 0;
|
||||
int saved_errno;
|
||||
char *err_op; /* For a detailed log */
|
||||
|
||||
snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
|
||||
fp = fopen(tmpfile,"w");
|
||||
FILE *fp = fopen(filename,"w");
|
||||
if (!fp) {
|
||||
saved_errno = errno;
|
||||
char *str_err = strerror(errno);
|
||||
char *cwdp = getcwd(cwd,MAXPATHLEN);
|
||||
serverLog(LL_WARNING,
|
||||
"Failed opening the temp RDB file %s (in server root dir %s) "
|
||||
"for saving: %s",
|
||||
tmpfile,
|
||||
filename,
|
||||
cwdp ? cwdp : "unknown",
|
||||
str_err);
|
||||
errno = saved_errno;
|
||||
return C_ERR;
|
||||
}
|
||||
|
||||
rioInitWithFile(&rdb,fp);
|
||||
startSaving(RDBFLAGS_NONE);
|
||||
|
||||
if (server.rdb_save_incremental_fsync) {
|
||||
rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);
|
||||
@ -1481,7 +1479,46 @@ int rdbSave(int req, char *filename, rdbSaveInfo *rsi, int rdbflags) {
|
||||
serverLog(LL_NOTICE,"Unable to reclaim cache after saving RDB: %s", strerror(errno));
|
||||
}
|
||||
if (fclose(fp)) { fp = NULL; err_op = "fclose"; goto werr; }
|
||||
fp = NULL;
|
||||
|
||||
return C_OK;
|
||||
|
||||
werr:
|
||||
saved_errno = errno;
|
||||
serverLog(LL_WARNING,"Write error while saving DB to the disk(%s): %s", err_op, strerror(errno));
|
||||
if (fp) fclose(fp);
|
||||
unlink(filename);
|
||||
errno = saved_errno;
|
||||
return C_ERR;
|
||||
}
|
||||
|
||||
/* Save DB to the file. Similar to rdbSave() but this function won't use a
|
||||
* temporary file and won't update the metrics. */
|
||||
int rdbSaveToFile(const char *filename) {
|
||||
startSaving(RDBFLAGS_NONE);
|
||||
|
||||
if (rdbSaveInternal(SLAVE_REQ_NONE,filename,NULL,RDBFLAGS_NONE) != C_OK) {
|
||||
int saved_errno = errno;
|
||||
stopSaving(0);
|
||||
errno = saved_errno;
|
||||
return C_ERR;
|
||||
}
|
||||
|
||||
stopSaving(1);
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
/* Save the DB on disk. Return C_ERR on error, C_OK on success. */
|
||||
int rdbSave(int req, char *filename, rdbSaveInfo *rsi, int rdbflags) {
|
||||
char tmpfile[256];
|
||||
char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
|
||||
|
||||
startSaving(RDBFLAGS_NONE);
|
||||
snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
|
||||
|
||||
if (rdbSaveInternal(req,tmpfile,rsi,rdbflags) != C_OK) {
|
||||
stopSaving(0);
|
||||
return C_ERR;
|
||||
}
|
||||
|
||||
/* Use RENAME to make sure the DB file is changed atomically only
|
||||
* if the generate DB file is ok. */
|
||||
@ -1499,7 +1536,12 @@ int rdbSave(int req, char *filename, rdbSaveInfo *rsi, int rdbflags) {
|
||||
stopSaving(0);
|
||||
return C_ERR;
|
||||
}
|
||||
if (fsyncFileDir(filename) == -1) { err_op = "fsyncFileDir"; goto werr; }
|
||||
if (fsyncFileDir(filename) != 0) {
|
||||
serverLog(LL_WARNING,
|
||||
"Failed to fsync directory while saving DB: %s", strerror(errno));
|
||||
stopSaving(0);
|
||||
return C_ERR;
|
||||
}
|
||||
|
||||
serverLog(LL_NOTICE,"DB saved on disk");
|
||||
server.dirty = 0;
|
||||
@ -1507,13 +1549,6 @@ int rdbSave(int req, char *filename, rdbSaveInfo *rsi, int rdbflags) {
|
||||
server.lastbgsave_status = C_OK;
|
||||
stopSaving(1);
|
||||
return C_OK;
|
||||
|
||||
werr:
|
||||
serverLog(LL_WARNING,"Write error saving DB on disk(%s): %s", err_op, strerror(errno));
|
||||
if (fp) fclose(fp);
|
||||
unlink(tmpfile);
|
||||
stopSaving(0);
|
||||
return C_ERR;
|
||||
}
|
||||
|
||||
int rdbSaveBackground(int req, char *filename, rdbSaveInfo *rsi, int rdbflags) {
|
||||
@ -3361,7 +3396,7 @@ int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) {
|
||||
/* Reclaim the cache backed by rdb */
|
||||
if (retval == C_OK && !(rdbflags & RDBFLAGS_KEEP_CACHE)) {
|
||||
/* TODO: maybe we could combine the fopen and open into one in the future */
|
||||
rdb_fd = open(server.rdb_filename, O_RDONLY);
|
||||
rdb_fd = open(filename, O_RDONLY);
|
||||
if (rdb_fd > 0) bioCreateCloseJob(rdb_fd, 0, 1);
|
||||
}
|
||||
return (retval==C_OK) ? RDB_OK : RDB_FAILED;
|
||||
|
@ -157,6 +157,7 @@ int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags);
|
||||
int rdbSaveBackground(int req, char *filename, rdbSaveInfo *rsi, int rdbflags);
|
||||
int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi);
|
||||
void rdbRemoveTempFile(pid_t childpid, int from_signal);
|
||||
int rdbSaveToFile(const char *filename);
|
||||
int rdbSave(int req, char *filename, rdbSaveInfo *rsi, int rdbflags);
|
||||
ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid);
|
||||
size_t rdbSavedObjectLen(robj *o, robj *key, int dbid);
|
||||
|
@ -881,6 +881,7 @@ typedef struct RedisModuleServerInfoData RedisModuleServerInfoData;
|
||||
typedef struct RedisModuleScanCursor RedisModuleScanCursor;
|
||||
typedef struct RedisModuleUser RedisModuleUser;
|
||||
typedef struct RedisModuleKeyOptCtx RedisModuleKeyOptCtx;
|
||||
typedef struct RedisModuleRdbStream RedisModuleRdbStream;
|
||||
|
||||
typedef int (*RedisModuleCmdFunc)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
|
||||
typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc);
|
||||
@ -1303,6 +1304,10 @@ REDISMODULE_API int (*RedisModule_RegisterNumericConfig)(RedisModuleCtx *ctx, co
|
||||
REDISMODULE_API int (*RedisModule_RegisterStringConfig)(RedisModuleCtx *ctx, const char *name, const char *default_val, unsigned int flags, RedisModuleConfigGetStringFunc getfn, RedisModuleConfigSetStringFunc setfn, RedisModuleConfigApplyFunc applyfn, void *privdata) REDISMODULE_ATTR;
|
||||
REDISMODULE_API int (*RedisModule_RegisterEnumConfig)(RedisModuleCtx *ctx, const char *name, int default_val, unsigned int flags, const char **enum_values, const int *int_values, int num_enum_vals, RedisModuleConfigGetEnumFunc getfn, RedisModuleConfigSetEnumFunc setfn, RedisModuleConfigApplyFunc applyfn, void *privdata) REDISMODULE_ATTR;
|
||||
REDISMODULE_API int (*RedisModule_LoadConfigs)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
|
||||
REDISMODULE_API RedisModuleRdbStream *(*RedisModule_RdbStreamCreateFromFile)(const char *filename) REDISMODULE_ATTR;
|
||||
REDISMODULE_API void (*RedisModule_RdbStreamFree)(RedisModuleRdbStream *stream) REDISMODULE_ATTR;
|
||||
REDISMODULE_API int (*RedisModule_RdbLoad)(RedisModuleCtx *ctx, RedisModuleRdbStream *stream, int flags) REDISMODULE_ATTR;
|
||||
REDISMODULE_API int (*RedisModule_RdbSave)(RedisModuleCtx *ctx, RedisModuleRdbStream *stream, int flags) REDISMODULE_ATTR;
|
||||
|
||||
#define RedisModule_IsAOFClient(id) ((id) == UINT64_MAX)
|
||||
|
||||
@ -1658,6 +1663,10 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
|
||||
REDISMODULE_GET_API(RegisterStringConfig);
|
||||
REDISMODULE_GET_API(RegisterEnumConfig);
|
||||
REDISMODULE_GET_API(LoadConfigs);
|
||||
REDISMODULE_GET_API(RdbStreamCreateFromFile);
|
||||
REDISMODULE_GET_API(RdbStreamFree);
|
||||
REDISMODULE_GET_API(RdbLoad);
|
||||
REDISMODULE_GET_API(RdbSave);
|
||||
|
||||
if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR;
|
||||
RedisModule_SetModuleAttribs(ctx,name,ver,apiver);
|
||||
|
@ -61,7 +61,8 @@ TEST_MODULES = \
|
||||
publish.so \
|
||||
usercall.so \
|
||||
postnotifications.so \
|
||||
moduleauthtwo.so
|
||||
moduleauthtwo.so \
|
||||
rdbloadsave.so
|
||||
|
||||
.PHONY: all
|
||||
|
||||
|
162
tests/modules/rdbloadsave.c
Normal file
162
tests/modules/rdbloadsave.c
Normal file
@ -0,0 +1,162 @@
|
||||
#include "redismodule.h"
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <unistd.h>
|
||||
#include <fcntl.h>
|
||||
#include <memory.h>
|
||||
#include <errno.h>
|
||||
|
||||
/* Sanity tests to verify inputs and return values. */
|
||||
int sanity(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
REDISMODULE_NOT_USED(argv);
|
||||
REDISMODULE_NOT_USED(argc);
|
||||
|
||||
RedisModuleRdbStream *s = RedisModule_RdbStreamCreateFromFile("dbnew.rdb");
|
||||
|
||||
/* NULL stream should fail. */
|
||||
if (RedisModule_RdbLoad(ctx, NULL, 0) == REDISMODULE_OK || errno != EINVAL) {
|
||||
RedisModule_ReplyWithError(ctx, strerror(errno));
|
||||
goto out;
|
||||
}
|
||||
|
||||
/* Invalid flags should fail. */
|
||||
if (RedisModule_RdbLoad(ctx, s, 188) == REDISMODULE_OK || errno != EINVAL) {
|
||||
RedisModule_ReplyWithError(ctx, strerror(errno));
|
||||
goto out;
|
||||
}
|
||||
|
||||
/* Missing file should fail. */
|
||||
if (RedisModule_RdbLoad(ctx, s, 0) == REDISMODULE_OK || errno != ENOENT) {
|
||||
RedisModule_ReplyWithError(ctx, strerror(errno));
|
||||
goto out;
|
||||
}
|
||||
|
||||
/* Save RDB file. */
|
||||
if (RedisModule_RdbSave(ctx, s, 0) != REDISMODULE_OK || errno != 0) {
|
||||
RedisModule_ReplyWithError(ctx, strerror(errno));
|
||||
goto out;
|
||||
}
|
||||
|
||||
/* Load the saved RDB file. */
|
||||
if (RedisModule_RdbLoad(ctx, s, 0) != REDISMODULE_OK || errno != 0) {
|
||||
RedisModule_ReplyWithError(ctx, strerror(errno));
|
||||
goto out;
|
||||
}
|
||||
|
||||
RedisModule_ReplyWithSimpleString(ctx, "OK");
|
||||
|
||||
out:
|
||||
RedisModule_RdbStreamFree(s);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
int cmd_rdbsave(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
if (argc != 2) {
|
||||
RedisModule_WrongArity(ctx);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
size_t len;
|
||||
const char *filename = RedisModule_StringPtrLen(argv[1], &len);
|
||||
|
||||
char tmp[len + 1];
|
||||
memcpy(tmp, filename, len);
|
||||
tmp[len] = '\0';
|
||||
|
||||
RedisModuleRdbStream *stream = RedisModule_RdbStreamCreateFromFile(tmp);
|
||||
|
||||
if (RedisModule_RdbSave(ctx, stream, 0) != REDISMODULE_OK || errno != 0) {
|
||||
RedisModule_ReplyWithError(ctx, strerror(errno));
|
||||
goto out;
|
||||
}
|
||||
|
||||
RedisModule_ReplyWithSimpleString(ctx, "OK");
|
||||
|
||||
out:
|
||||
RedisModule_RdbStreamFree(stream);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
/* Fork before calling RM_RdbSave(). */
|
||||
int cmd_rdbsave_fork(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
if (argc != 2) {
|
||||
RedisModule_WrongArity(ctx);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
size_t len;
|
||||
const char *filename = RedisModule_StringPtrLen(argv[1], &len);
|
||||
|
||||
char tmp[len + 1];
|
||||
memcpy(tmp, filename, len);
|
||||
tmp[len] = '\0';
|
||||
|
||||
int fork_child_pid = RedisModule_Fork(NULL, NULL);
|
||||
if (fork_child_pid < 0) {
|
||||
RedisModule_ReplyWithError(ctx, strerror(errno));
|
||||
return REDISMODULE_OK;
|
||||
} else if (fork_child_pid > 0) {
|
||||
/* parent */
|
||||
RedisModule_ReplyWithSimpleString(ctx, "OK");
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
RedisModuleRdbStream *stream = RedisModule_RdbStreamCreateFromFile(tmp);
|
||||
|
||||
int ret = 0;
|
||||
if (RedisModule_RdbSave(ctx, stream, 0) != REDISMODULE_OK) {
|
||||
ret = errno;
|
||||
}
|
||||
RedisModule_RdbStreamFree(stream);
|
||||
|
||||
RedisModule_ExitFromChild(ret);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
int cmd_rdbload(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
if (argc != 2) {
|
||||
RedisModule_WrongArity(ctx);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
size_t len;
|
||||
const char *filename = RedisModule_StringPtrLen(argv[1], &len);
|
||||
|
||||
char tmp[len + 1];
|
||||
memcpy(tmp, filename, len);
|
||||
tmp[len] = '\0';
|
||||
|
||||
RedisModuleRdbStream *stream = RedisModule_RdbStreamCreateFromFile(tmp);
|
||||
|
||||
if (RedisModule_RdbLoad(ctx, stream, 0) != REDISMODULE_OK || errno != 0) {
|
||||
RedisModule_RdbStreamFree(stream);
|
||||
RedisModule_ReplyWithError(ctx, strerror(errno));
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
RedisModule_RdbStreamFree(stream);
|
||||
RedisModule_ReplyWithSimpleString(ctx, "OK");
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
REDISMODULE_NOT_USED(argv);
|
||||
REDISMODULE_NOT_USED(argc);
|
||||
|
||||
if (RedisModule_Init(ctx, "rdbloadsave", 1, REDISMODULE_APIVER_1) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx, "test.sanity", sanity, "", 0, 0, 0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx, "test.rdbsave", cmd_rdbsave, "", 0, 0, 0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx, "test.rdbsave_fork", cmd_rdbsave_fork, "", 0, 0, 0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx, "test.rdbload", cmd_rdbload, "", 0, 0, 0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
return REDISMODULE_OK;
|
||||
}
|
200
tests/unit/moduleapi/rdbloadsave.tcl
Normal file
200
tests/unit/moduleapi/rdbloadsave.tcl
Normal file
@ -0,0 +1,200 @@
|
||||
set testmodule [file normalize tests/modules/rdbloadsave.so]
|
||||
|
||||
start_server {tags {"modules"}} {
|
||||
r module load $testmodule
|
||||
|
||||
test "Module rdbloadsave sanity" {
|
||||
r test.sanity
|
||||
|
||||
# Try to load non-existing file
|
||||
assert_error {*No such file or directory*} {r test.rdbload sanity.rdb}
|
||||
|
||||
r set x 1
|
||||
assert_equal OK [r test.rdbsave sanity.rdb]
|
||||
|
||||
r flushdb
|
||||
assert_equal OK [r test.rdbload sanity.rdb]
|
||||
assert_equal 1 [r get x]
|
||||
}
|
||||
|
||||
test "Module rdbloadsave test with pipelining" {
|
||||
r config set save ""
|
||||
r config set loading-process-events-interval-bytes 1024
|
||||
r config set key-load-delay 50
|
||||
r flushdb
|
||||
|
||||
populate 3000 a 1024
|
||||
r set x 111
|
||||
assert_equal [r dbsize] 3001
|
||||
|
||||
assert_equal OK [r test.rdbsave blabla.rdb]
|
||||
r flushdb
|
||||
assert_equal [r dbsize] 0
|
||||
|
||||
# Send commands with pipeline. First command will call RM_RdbLoad() in
|
||||
# the command callback. While loading RDB, Redis can go to networking to
|
||||
# reply -LOADING. By sending commands in pipeline, we verify it doesn't
|
||||
# cause a problem.
|
||||
# e.g. Redis won't try to process next message of the current client
|
||||
# while it is in the command callback for that client .
|
||||
set rd1 [redis_deferring_client]
|
||||
$rd1 test.rdbload blabla.rdb
|
||||
|
||||
wait_for_condition 50 100 {
|
||||
[s loading] eq 1
|
||||
} else {
|
||||
fail "Redis did not start loading or loaded RDB too fast"
|
||||
}
|
||||
|
||||
$rd1 get x
|
||||
$rd1 dbsize
|
||||
|
||||
assert_equal OK [$rd1 read]
|
||||
assert_equal 111 [$rd1 read]
|
||||
assert_equal 3001 [$rd1 read]
|
||||
r flushdb
|
||||
r config set key-load-delay 0
|
||||
}
|
||||
|
||||
test "Module rdbloadsave with aof" {
|
||||
r config set save ""
|
||||
|
||||
# Enable the AOF
|
||||
r config set appendonly yes
|
||||
r config set auto-aof-rewrite-percentage 0 ; # Disable auto-rewrite.
|
||||
waitForBgrewriteaof r
|
||||
|
||||
r set k v1
|
||||
assert_equal OK [r test.rdbsave aoftest.rdb]
|
||||
|
||||
r set k v2
|
||||
r config set rdb-key-save-delay 10000000
|
||||
r bgrewriteaof
|
||||
|
||||
# RM_RdbLoad() should kill aof fork
|
||||
assert_equal OK [r test.rdbload aoftest.rdb]
|
||||
|
||||
wait_for_condition 50 100 {
|
||||
[string match {*Killing*AOF*child*} [exec tail -20 < [srv 0 stdout]]]
|
||||
} else {
|
||||
fail "Can't find 'Killing AOF child' in recent log lines"
|
||||
}
|
||||
|
||||
# Verify the value in the loaded rdb
|
||||
assert_equal v1 [r get k]
|
||||
|
||||
r flushdb
|
||||
r config set rdb-key-save-delay 0
|
||||
r config set appendonly no
|
||||
}
|
||||
|
||||
test "Module rdbloadsave with bgsave" {
|
||||
r flushdb
|
||||
r config set save ""
|
||||
|
||||
r set k v1
|
||||
assert_equal OK [r test.rdbsave bgsave.rdb]
|
||||
|
||||
r set k v2
|
||||
r config set rdb-key-save-delay 500000
|
||||
r bgsave
|
||||
|
||||
# RM_RdbLoad() should kill RDB fork
|
||||
assert_equal OK [r test.rdbload bgsave.rdb]
|
||||
|
||||
wait_for_condition 10 1000 {
|
||||
[string match {*Background*saving*terminated*} [exec tail -20 < [srv 0 stdout]]]
|
||||
} else {
|
||||
fail "Can't find 'Background saving terminated' in recent log lines"
|
||||
}
|
||||
|
||||
assert_equal v1 [r get k]
|
||||
r flushall
|
||||
waitForBgsave r
|
||||
r config set rdb-key-save-delay 0
|
||||
}
|
||||
|
||||
test "Module rdbloadsave calls rdbsave in a module fork" {
|
||||
r flushdb
|
||||
r config set save ""
|
||||
r config set rdb-key-save-delay 500000
|
||||
|
||||
r set k v1
|
||||
|
||||
# Module will call RM_Fork() before calling RM_RdbSave()
|
||||
assert_equal OK [r test.rdbsave_fork rdbfork.rdb]
|
||||
assert_equal [s module_fork_in_progress] 1
|
||||
|
||||
wait_for_condition 10 1000 {
|
||||
[status r module_fork_in_progress] == "0"
|
||||
} else {
|
||||
fail "Module fork didn't finish"
|
||||
}
|
||||
|
||||
r set k v2
|
||||
assert_equal OK [r test.rdbload rdbfork.rdb]
|
||||
assert_equal v1 [r get k]
|
||||
|
||||
r config set rdb-key-save-delay 0
|
||||
}
|
||||
|
||||
test "Unload the module - rdbloadsave" {
|
||||
assert_equal {OK} [r module unload rdbloadsave]
|
||||
}
|
||||
|
||||
tags {repl} {
|
||||
test {Module rdbloadsave on master and replica} {
|
||||
start_server [list overrides [list loadmodule "$testmodule"]] {
|
||||
set replica [srv 0 client]
|
||||
set replica_host [srv 0 host]
|
||||
set replica_port [srv 0 port]
|
||||
start_server [list overrides [list loadmodule "$testmodule"]] {
|
||||
set master [srv 0 client]
|
||||
set master_host [srv 0 host]
|
||||
set master_port [srv 0 port]
|
||||
|
||||
$master set x 10000
|
||||
|
||||
# Start the replication process...
|
||||
$replica replicaof $master_host $master_port
|
||||
|
||||
wait_for_condition 100 100 {
|
||||
[status $master sync_full] == 1
|
||||
} else {
|
||||
fail "Master <-> Replica didn't start the full sync"
|
||||
}
|
||||
|
||||
# RM_RdbSave() is allowed on replicas
|
||||
assert_equal OK [$replica test.rdbsave rep.rdb]
|
||||
|
||||
# RM_RdbLoad() is not allowed on replicas
|
||||
assert_error {*supported*} {$replica test.rdbload rep.rdb}
|
||||
|
||||
assert_equal OK [$master test.rdbsave master.rdb]
|
||||
$master set x 20000
|
||||
|
||||
wait_for_condition 100 100 {
|
||||
[$replica get x] == 20000
|
||||
} else {
|
||||
fail "Replica didn't get the update"
|
||||
}
|
||||
|
||||
# Loading RDB on master will drop replicas
|
||||
assert_equal OK [$master test.rdbload master.rdb]
|
||||
|
||||
wait_for_condition 100 100 {
|
||||
[status $master sync_full] == 2
|
||||
} else {
|
||||
fail "Master <-> Replica didn't start the full sync"
|
||||
}
|
||||
|
||||
wait_for_condition 100 100 {
|
||||
[$replica get x] == 10000
|
||||
} else {
|
||||
fail "Replica didn't get the update"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user