mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 16:18:28 -05:00
Merge pull request #6247 from oranagra/modules_fork
Module API for Forking
This commit is contained in:
commit
b7c33af4a5
@ -13,4 +13,4 @@ then
|
|||||||
fi
|
fi
|
||||||
|
|
||||||
make -C tests/modules && \
|
make -C tests/modules && \
|
||||||
$TCLSH tests/test_helper.tcl --single unit/moduleapi/commandfilter --single unit/moduleapi/testrdb "${@}"
|
$TCLSH tests/test_helper.tcl --single unit/moduleapi/commandfilter --single unit/moduleapi/fork --single unit/moduleapi/testrdb "${@}"
|
||||||
|
35
src/aof.c
35
src/aof.c
@ -264,9 +264,9 @@ int startAppendOnly(void) {
|
|||||||
strerror(errno));
|
strerror(errno));
|
||||||
return C_ERR;
|
return C_ERR;
|
||||||
}
|
}
|
||||||
if (server.rdb_child_pid != -1) {
|
if (hasForkChild() && server.aof_child_pid == -1) {
|
||||||
server.aof_rewrite_scheduled = 1;
|
server.aof_rewrite_scheduled = 1;
|
||||||
serverLog(LL_WARNING,"AOF was enabled but there is already a child process saving an RDB file on disk. An AOF background was scheduled to start when possible.");
|
serverLog(LL_WARNING,"AOF was enabled but there is already another background operation. An AOF background was scheduled to start when possible.");
|
||||||
} else {
|
} else {
|
||||||
/* If there is a pending AOF rewrite, we need to switch it off and
|
/* If there is a pending AOF rewrite, we need to switch it off and
|
||||||
* start a new one: the old one cannot be reused because it is not
|
* start a new one: the old one cannot be reused because it is not
|
||||||
@ -395,7 +395,7 @@ void flushAppendOnlyFile(int force) {
|
|||||||
* useful for graphing / monitoring purposes. */
|
* useful for graphing / monitoring purposes. */
|
||||||
if (sync_in_progress) {
|
if (sync_in_progress) {
|
||||||
latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);
|
latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);
|
||||||
} else if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) {
|
} else if (hasForkChild()) {
|
||||||
latencyAddSampleIfNeeded("aof-write-active-child",latency);
|
latencyAddSampleIfNeeded("aof-write-active-child",latency);
|
||||||
} else {
|
} else {
|
||||||
latencyAddSampleIfNeeded("aof-write-alone",latency);
|
latencyAddSampleIfNeeded("aof-write-alone",latency);
|
||||||
@ -491,9 +491,8 @@ void flushAppendOnlyFile(int force) {
|
|||||||
try_fsync:
|
try_fsync:
|
||||||
/* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
|
/* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
|
||||||
* children doing I/O in the background. */
|
* children doing I/O in the background. */
|
||||||
if (server.aof_no_fsync_on_rewrite &&
|
if (server.aof_no_fsync_on_rewrite && hasForkChild())
|
||||||
(server.aof_child_pid != -1 || server.rdb_child_pid != -1))
|
return;
|
||||||
return;
|
|
||||||
|
|
||||||
/* Perform the fsync if needed. */
|
/* Perform the fsync if needed. */
|
||||||
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
|
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
|
||||||
@ -1563,39 +1562,24 @@ void aofClosePipes(void) {
|
|||||||
*/
|
*/
|
||||||
int rewriteAppendOnlyFileBackground(void) {
|
int rewriteAppendOnlyFileBackground(void) {
|
||||||
pid_t childpid;
|
pid_t childpid;
|
||||||
long long start;
|
|
||||||
|
|
||||||
if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
|
if (hasForkChild()) return C_ERR;
|
||||||
if (aofCreatePipes() != C_OK) return C_ERR;
|
if (aofCreatePipes() != C_OK) return C_ERR;
|
||||||
openChildInfoPipe();
|
openChildInfoPipe();
|
||||||
start = ustime();
|
if ((childpid = redisFork()) == 0) {
|
||||||
if ((childpid = fork()) == 0) {
|
|
||||||
char tmpfile[256];
|
char tmpfile[256];
|
||||||
|
|
||||||
/* Child */
|
/* Child */
|
||||||
closeListeningSockets(0);
|
|
||||||
redisSetProcTitle("redis-aof-rewrite");
|
redisSetProcTitle("redis-aof-rewrite");
|
||||||
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
|
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
|
||||||
if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
|
if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
|
||||||
size_t private_dirty = zmalloc_get_private_dirty(-1);
|
sendChildCOWInfo(CHILD_INFO_TYPE_AOF, "AOF rewrite");
|
||||||
|
|
||||||
if (private_dirty) {
|
|
||||||
serverLog(LL_NOTICE,
|
|
||||||
"AOF rewrite: %zu MB of memory used by copy-on-write",
|
|
||||||
private_dirty/(1024*1024));
|
|
||||||
}
|
|
||||||
|
|
||||||
server.child_info_data.cow_size = private_dirty;
|
|
||||||
sendChildInfo(CHILD_INFO_TYPE_AOF);
|
|
||||||
exitFromChild(0);
|
exitFromChild(0);
|
||||||
} else {
|
} else {
|
||||||
exitFromChild(1);
|
exitFromChild(1);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
/* Parent */
|
/* Parent */
|
||||||
server.stat_fork_time = ustime()-start;
|
|
||||||
server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
|
|
||||||
latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
|
|
||||||
if (childpid == -1) {
|
if (childpid == -1) {
|
||||||
closeChildInfoPipe();
|
closeChildInfoPipe();
|
||||||
serverLog(LL_WARNING,
|
serverLog(LL_WARNING,
|
||||||
@ -1609,7 +1593,6 @@ int rewriteAppendOnlyFileBackground(void) {
|
|||||||
server.aof_rewrite_scheduled = 0;
|
server.aof_rewrite_scheduled = 0;
|
||||||
server.aof_rewrite_time_start = time(NULL);
|
server.aof_rewrite_time_start = time(NULL);
|
||||||
server.aof_child_pid = childpid;
|
server.aof_child_pid = childpid;
|
||||||
updateDictResizePolicy();
|
|
||||||
/* We set appendseldb to -1 in order to force the next call to the
|
/* We set appendseldb to -1 in order to force the next call to the
|
||||||
* feedAppendOnlyFile() to issue a SELECT command, so the differences
|
* feedAppendOnlyFile() to issue a SELECT command, so the differences
|
||||||
* accumulated by the parent into server.aof_rewrite_buf will start
|
* accumulated by the parent into server.aof_rewrite_buf will start
|
||||||
@ -1624,7 +1607,7 @@ int rewriteAppendOnlyFileBackground(void) {
|
|||||||
void bgrewriteaofCommand(client *c) {
|
void bgrewriteaofCommand(client *c) {
|
||||||
if (server.aof_child_pid != -1) {
|
if (server.aof_child_pid != -1) {
|
||||||
addReplyError(c,"Background append only file rewriting already in progress");
|
addReplyError(c,"Background append only file rewriting already in progress");
|
||||||
} else if (server.rdb_child_pid != -1) {
|
} else if (hasForkChild()) {
|
||||||
server.aof_rewrite_scheduled = 1;
|
server.aof_rewrite_scheduled = 1;
|
||||||
addReplyStatus(c,"Background append only file rewriting scheduled");
|
addReplyStatus(c,"Background append only file rewriting scheduled");
|
||||||
} else if (rewriteAppendOnlyFileBackground() == C_OK) {
|
} else if (rewriteAppendOnlyFileBackground() == C_OK) {
|
||||||
|
@ -80,6 +80,8 @@ void receiveChildInfo(void) {
|
|||||||
server.stat_rdb_cow_bytes = server.child_info_data.cow_size;
|
server.stat_rdb_cow_bytes = server.child_info_data.cow_size;
|
||||||
} else if (server.child_info_data.process_type == CHILD_INFO_TYPE_AOF) {
|
} else if (server.child_info_data.process_type == CHILD_INFO_TYPE_AOF) {
|
||||||
server.stat_aof_cow_bytes = server.child_info_data.cow_size;
|
server.stat_aof_cow_bytes = server.child_info_data.cow_size;
|
||||||
|
} else if (server.child_info_data.process_type == CHILD_INFO_TYPE_MODULE) {
|
||||||
|
server.stat_module_cow_bytes = server.child_info_data.cow_size;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
5
src/db.c
5
src/db.c
@ -60,10 +60,7 @@ robj *lookupKey(redisDb *db, robj *key, int flags) {
|
|||||||
/* Update the access time for the ageing algorithm.
|
/* Update the access time for the ageing algorithm.
|
||||||
* Don't do it if we have a saving child, as this will trigger
|
* Don't do it if we have a saving child, as this will trigger
|
||||||
* a copy on write madness. */
|
* a copy on write madness. */
|
||||||
if (server.rdb_child_pid == -1 &&
|
if (!hasForkChild() && !(flags & LOOKUP_NOTOUCH)){
|
||||||
server.aof_child_pid == -1 &&
|
|
||||||
!(flags & LOOKUP_NOTOUCH))
|
|
||||||
{
|
|
||||||
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
|
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
|
||||||
updateLFU(val);
|
updateLFU(val);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1039,7 +1039,7 @@ void activeDefragCycle(void) {
|
|||||||
mstime_t latency;
|
mstime_t latency;
|
||||||
int quit = 0;
|
int quit = 0;
|
||||||
|
|
||||||
if (server.aof_child_pid!=-1 || server.rdb_child_pid!=-1)
|
if (hasForkChild())
|
||||||
return; /* Defragging memory while there's a fork will just do damage. */
|
return; /* Defragging memory while there's a fork will just do damage. */
|
||||||
|
|
||||||
/* Once a second, check if we the fragmentation justfies starting a scan
|
/* Once a second, check if we the fragmentation justfies starting a scan
|
||||||
|
104
src/module.c
104
src/module.c
@ -31,6 +31,7 @@
|
|||||||
#include "cluster.h"
|
#include "cluster.h"
|
||||||
#include "rdb.h"
|
#include "rdb.h"
|
||||||
#include <dlfcn.h>
|
#include <dlfcn.h>
|
||||||
|
#include <wait.h>
|
||||||
|
|
||||||
#define REDISMODULE_CORE 1
|
#define REDISMODULE_CORE 1
|
||||||
#include "redismodule.h"
|
#include "redismodule.h"
|
||||||
@ -293,6 +294,14 @@ typedef struct RedisModuleCommandFilter {
|
|||||||
/* Registered filters */
|
/* Registered filters */
|
||||||
static list *moduleCommandFilters;
|
static list *moduleCommandFilters;
|
||||||
|
|
||||||
|
typedef void (*RedisModuleForkDoneHandler) (int exitcode, int bysignal, void *user_data);
|
||||||
|
|
||||||
|
static struct RedisModuleForkInfo {
|
||||||
|
RedisModuleForkDoneHandler done_handler;
|
||||||
|
void* done_handler_user_data;
|
||||||
|
} moduleForkInfo = {0};
|
||||||
|
|
||||||
|
|
||||||
/* --------------------------------------------------------------------------
|
/* --------------------------------------------------------------------------
|
||||||
* Prototypes
|
* Prototypes
|
||||||
* -------------------------------------------------------------------------- */
|
* -------------------------------------------------------------------------- */
|
||||||
@ -5130,6 +5139,98 @@ int RM_CommandFilterArgDelete(RedisModuleCommandFilterCtx *fctx, int pos)
|
|||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* --------------------------------------------------------------------------
|
||||||
|
* Module fork API
|
||||||
|
* -------------------------------------------------------------------------- */
|
||||||
|
|
||||||
|
/* Create a background child process with the current frozen snaphost of the
|
||||||
|
* main process where you can do some processing in the background without
|
||||||
|
* affecting / freezing the traffic and no need for threads and GIL locking.
|
||||||
|
* Note that Redis allows for only one concurrent fork.
|
||||||
|
* When the child wants to exit, it should call RedisModule_ExitFromChild.
|
||||||
|
* If the parent wants to kill the child it should call RedisModule_KillForkChild
|
||||||
|
* The done handler callback will be executed on the parent process when the
|
||||||
|
* child existed (but not when killed)
|
||||||
|
* Return: -1 on failure, on success the parent process will get a positive PID
|
||||||
|
* of the child, and the child process will get 0.
|
||||||
|
*/
|
||||||
|
int RM_Fork(RedisModuleForkDoneHandler cb, void *user_data)
|
||||||
|
{
|
||||||
|
pid_t childpid;
|
||||||
|
if (hasForkChild()) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
openChildInfoPipe();
|
||||||
|
if ((childpid = redisFork()) == 0) {
|
||||||
|
/* Child */
|
||||||
|
redisSetProcTitle("redis-module-fork");
|
||||||
|
} else if (childpid == -1) {
|
||||||
|
closeChildInfoPipe();
|
||||||
|
serverLog(LL_WARNING,"Can't fork for module: %s", strerror(errno));
|
||||||
|
} else {
|
||||||
|
/* Parent */
|
||||||
|
server.module_child_pid = childpid;
|
||||||
|
moduleForkInfo.done_handler = cb;
|
||||||
|
moduleForkInfo.done_handler_user_data = user_data;
|
||||||
|
serverLog(LL_NOTICE, "Module fork started pid: %d ", childpid);
|
||||||
|
}
|
||||||
|
return childpid;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Call from the child process when you want to terminate it.
|
||||||
|
* retcode will be provided to the done handler executed on the parent process.
|
||||||
|
*/
|
||||||
|
int RM_ExitFromChild(int retcode)
|
||||||
|
{
|
||||||
|
sendChildCOWInfo(CHILD_INFO_TYPE_MODULE, "Module fork");
|
||||||
|
exitFromChild(retcode);
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
void TerminateModuleForkChild(int wait) {
|
||||||
|
int statloc;
|
||||||
|
serverLog(LL_NOTICE,"Killing running module fork child: %ld",
|
||||||
|
(long) server.module_child_pid);
|
||||||
|
if (kill(server.module_child_pid,SIGUSR1) != -1 && wait) {
|
||||||
|
while(wait3(&statloc,0,NULL) != server.module_child_pid);
|
||||||
|
}
|
||||||
|
/* Reset the buffer accumulating changes while the child saves. */
|
||||||
|
server.module_child_pid = -1;
|
||||||
|
moduleForkInfo.done_handler = NULL;
|
||||||
|
moduleForkInfo.done_handler_user_data = NULL;
|
||||||
|
closeChildInfoPipe();
|
||||||
|
updateDictResizePolicy();
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Can be used to kill the forked child process from the parent process.
|
||||||
|
* child_pid whould be the return value of RedisModule_Fork. */
|
||||||
|
int RM_KillForkChild(int child_pid)
|
||||||
|
{
|
||||||
|
/* No module child? return. */
|
||||||
|
if (server.module_child_pid == -1) return REDISMODULE_ERR;
|
||||||
|
/* Make sure the module knows the pid it wants to kill (not trying to
|
||||||
|
* randomly kill other module's forks) */
|
||||||
|
if (server.module_child_pid != child_pid) return REDISMODULE_ERR;
|
||||||
|
/* Kill module child, wait for child exit. */
|
||||||
|
TerminateModuleForkChild(1);
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ModuleForkDoneHandler(int exitcode, int bysignal)
|
||||||
|
{
|
||||||
|
serverLog(LL_NOTICE,
|
||||||
|
"Module fork exited pid: %d, retcode: %d, bysignal: %d",
|
||||||
|
server.module_child_pid, exitcode, bysignal);
|
||||||
|
if (moduleForkInfo.done_handler) {
|
||||||
|
moduleForkInfo.done_handler(exitcode, bysignal,
|
||||||
|
moduleForkInfo.done_handler_user_data);
|
||||||
|
}
|
||||||
|
server.module_child_pid = -1;
|
||||||
|
moduleForkInfo.done_handler = NULL;
|
||||||
|
moduleForkInfo.done_handler_user_data = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
/* --------------------------------------------------------------------------
|
/* --------------------------------------------------------------------------
|
||||||
* Modules API internals
|
* Modules API internals
|
||||||
* -------------------------------------------------------------------------- */
|
* -------------------------------------------------------------------------- */
|
||||||
@ -5652,4 +5753,7 @@ void moduleRegisterCoreAPI(void) {
|
|||||||
REGISTER_API(CommandFilterArgInsert);
|
REGISTER_API(CommandFilterArgInsert);
|
||||||
REGISTER_API(CommandFilterArgReplace);
|
REGISTER_API(CommandFilterArgReplace);
|
||||||
REGISTER_API(CommandFilterArgDelete);
|
REGISTER_API(CommandFilterArgDelete);
|
||||||
|
REGISTER_API(Fork);
|
||||||
|
REGISTER_API(ExitFromChild);
|
||||||
|
REGISTER_API(KillForkChild);
|
||||||
}
|
}
|
||||||
|
49
src/rdb.c
49
src/rdb.c
@ -1335,40 +1335,25 @@ werr:
|
|||||||
|
|
||||||
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
|
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
|
||||||
pid_t childpid;
|
pid_t childpid;
|
||||||
long long start;
|
|
||||||
|
|
||||||
if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
|
if (hasForkChild()) return C_ERR;
|
||||||
|
|
||||||
server.dirty_before_bgsave = server.dirty;
|
server.dirty_before_bgsave = server.dirty;
|
||||||
server.lastbgsave_try = time(NULL);
|
server.lastbgsave_try = time(NULL);
|
||||||
openChildInfoPipe();
|
openChildInfoPipe();
|
||||||
|
|
||||||
start = ustime();
|
if ((childpid = redisFork()) == 0) {
|
||||||
if ((childpid = fork()) == 0) {
|
|
||||||
int retval;
|
int retval;
|
||||||
|
|
||||||
/* Child */
|
/* Child */
|
||||||
closeListeningSockets(0);
|
|
||||||
redisSetProcTitle("redis-rdb-bgsave");
|
redisSetProcTitle("redis-rdb-bgsave");
|
||||||
retval = rdbSave(filename,rsi);
|
retval = rdbSave(filename,rsi);
|
||||||
if (retval == C_OK) {
|
if (retval == C_OK) {
|
||||||
size_t private_dirty = zmalloc_get_private_dirty(-1);
|
sendChildCOWInfo(CHILD_INFO_TYPE_RDB, "RDB");
|
||||||
|
|
||||||
if (private_dirty) {
|
|
||||||
serverLog(LL_NOTICE,
|
|
||||||
"RDB: %zu MB of memory used by copy-on-write",
|
|
||||||
private_dirty/(1024*1024));
|
|
||||||
}
|
|
||||||
|
|
||||||
server.child_info_data.cow_size = private_dirty;
|
|
||||||
sendChildInfo(CHILD_INFO_TYPE_RDB);
|
|
||||||
}
|
}
|
||||||
exitFromChild((retval == C_OK) ? 0 : 1);
|
exitFromChild((retval == C_OK) ? 0 : 1);
|
||||||
} else {
|
} else {
|
||||||
/* Parent */
|
/* Parent */
|
||||||
server.stat_fork_time = ustime()-start;
|
|
||||||
server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
|
|
||||||
latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
|
|
||||||
if (childpid == -1) {
|
if (childpid == -1) {
|
||||||
closeChildInfoPipe();
|
closeChildInfoPipe();
|
||||||
server.lastbgsave_status = C_ERR;
|
server.lastbgsave_status = C_ERR;
|
||||||
@ -1380,7 +1365,6 @@ int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
|
|||||||
server.rdb_save_time_start = time(NULL);
|
server.rdb_save_time_start = time(NULL);
|
||||||
server.rdb_child_pid = childpid;
|
server.rdb_child_pid = childpid;
|
||||||
server.rdb_child_type = RDB_CHILD_TYPE_DISK;
|
server.rdb_child_type = RDB_CHILD_TYPE_DISK;
|
||||||
updateDictResizePolicy();
|
|
||||||
return C_OK;
|
return C_OK;
|
||||||
}
|
}
|
||||||
return C_OK; /* unreached */
|
return C_OK; /* unreached */
|
||||||
@ -2431,10 +2415,9 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
|
|||||||
listNode *ln;
|
listNode *ln;
|
||||||
listIter li;
|
listIter li;
|
||||||
pid_t childpid;
|
pid_t childpid;
|
||||||
long long start;
|
|
||||||
int pipefds[2];
|
int pipefds[2];
|
||||||
|
|
||||||
if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
|
if (hasForkChild()) return C_ERR;
|
||||||
|
|
||||||
/* Before to fork, create a pipe that will be used in order to
|
/* Before to fork, create a pipe that will be used in order to
|
||||||
* send back to the parent the IDs of the slaves that successfully
|
* send back to the parent the IDs of the slaves that successfully
|
||||||
@ -2470,8 +2453,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
|
|||||||
|
|
||||||
/* Create the child process. */
|
/* Create the child process. */
|
||||||
openChildInfoPipe();
|
openChildInfoPipe();
|
||||||
start = ustime();
|
if ((childpid = redisFork()) == 0) {
|
||||||
if ((childpid = fork()) == 0) {
|
|
||||||
/* Child */
|
/* Child */
|
||||||
int retval;
|
int retval;
|
||||||
rio slave_sockets;
|
rio slave_sockets;
|
||||||
@ -2479,7 +2461,6 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
|
|||||||
rioInitWithFdset(&slave_sockets,fds,numfds);
|
rioInitWithFdset(&slave_sockets,fds,numfds);
|
||||||
zfree(fds);
|
zfree(fds);
|
||||||
|
|
||||||
closeListeningSockets(0);
|
|
||||||
redisSetProcTitle("redis-rdb-to-slaves");
|
redisSetProcTitle("redis-rdb-to-slaves");
|
||||||
|
|
||||||
retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL,rsi);
|
retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL,rsi);
|
||||||
@ -2487,16 +2468,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
|
|||||||
retval = C_ERR;
|
retval = C_ERR;
|
||||||
|
|
||||||
if (retval == C_OK) {
|
if (retval == C_OK) {
|
||||||
size_t private_dirty = zmalloc_get_private_dirty(-1);
|
sendChildCOWInfo(CHILD_INFO_TYPE_RDB, "RDB");
|
||||||
|
|
||||||
if (private_dirty) {
|
|
||||||
serverLog(LL_NOTICE,
|
|
||||||
"RDB: %zu MB of memory used by copy-on-write",
|
|
||||||
private_dirty/(1024*1024));
|
|
||||||
}
|
|
||||||
|
|
||||||
server.child_info_data.cow_size = private_dirty;
|
|
||||||
sendChildInfo(CHILD_INFO_TYPE_RDB);
|
|
||||||
|
|
||||||
/* If we are returning OK, at least one slave was served
|
/* If we are returning OK, at least one slave was served
|
||||||
* with the RDB file as expected, so we need to send a report
|
* with the RDB file as expected, so we need to send a report
|
||||||
@ -2565,16 +2537,11 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
|
|||||||
close(pipefds[1]);
|
close(pipefds[1]);
|
||||||
closeChildInfoPipe();
|
closeChildInfoPipe();
|
||||||
} else {
|
} else {
|
||||||
server.stat_fork_time = ustime()-start;
|
|
||||||
server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
|
|
||||||
latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
|
|
||||||
|
|
||||||
serverLog(LL_NOTICE,"Background RDB transfer started by pid %d",
|
serverLog(LL_NOTICE,"Background RDB transfer started by pid %d",
|
||||||
childpid);
|
childpid);
|
||||||
server.rdb_save_time_start = time(NULL);
|
server.rdb_save_time_start = time(NULL);
|
||||||
server.rdb_child_pid = childpid;
|
server.rdb_child_pid = childpid;
|
||||||
server.rdb_child_type = RDB_CHILD_TYPE_SOCKET;
|
server.rdb_child_type = RDB_CHILD_TYPE_SOCKET;
|
||||||
updateDictResizePolicy();
|
|
||||||
}
|
}
|
||||||
zfree(clientids);
|
zfree(clientids);
|
||||||
zfree(fds);
|
zfree(fds);
|
||||||
@ -2617,13 +2584,13 @@ void bgsaveCommand(client *c) {
|
|||||||
|
|
||||||
if (server.rdb_child_pid != -1) {
|
if (server.rdb_child_pid != -1) {
|
||||||
addReplyError(c,"Background save already in progress");
|
addReplyError(c,"Background save already in progress");
|
||||||
} else if (server.aof_child_pid != -1) {
|
} else if (hasForkChild()) {
|
||||||
if (schedule) {
|
if (schedule) {
|
||||||
server.rdb_bgsave_scheduled = 1;
|
server.rdb_bgsave_scheduled = 1;
|
||||||
addReplyStatus(c,"Background saving scheduled");
|
addReplyStatus(c,"Background saving scheduled");
|
||||||
} else {
|
} else {
|
||||||
addReplyError(c,
|
addReplyError(c,
|
||||||
"An AOF log rewriting in progress: can't BGSAVE right now. "
|
"Another BG operation is in progress: can't BGSAVE right now. "
|
||||||
"Use BGSAVE SCHEDULE in order to schedule a BGSAVE whenever "
|
"Use BGSAVE SCHEDULE in order to schedule a BGSAVE whenever "
|
||||||
"possible.");
|
"possible.");
|
||||||
}
|
}
|
||||||
|
@ -182,6 +182,7 @@ typedef void (*RedisModuleTypeFreeFunc)(void *value);
|
|||||||
typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, const char *sender_id, uint8_t type, const unsigned char *payload, uint32_t len);
|
typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, const char *sender_id, uint8_t type, const unsigned char *payload, uint32_t len);
|
||||||
typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data);
|
typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data);
|
||||||
typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter);
|
typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter);
|
||||||
|
typedef void (*RedisModuleForkDoneHandler) (int exitcode, int bysignal, void *user_data);
|
||||||
|
|
||||||
#define REDISMODULE_TYPE_METHOD_VERSION 2
|
#define REDISMODULE_TYPE_METHOD_VERSION 2
|
||||||
typedef struct RedisModuleTypeMethods {
|
typedef struct RedisModuleTypeMethods {
|
||||||
@ -372,6 +373,9 @@ const RedisModuleString *REDISMODULE_API_FUNC(RedisModule_CommandFilterArgGet)(R
|
|||||||
int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgInsert)(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg);
|
int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgInsert)(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg);
|
||||||
int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgReplace)(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg);
|
int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgReplace)(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg);
|
||||||
int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgDelete)(RedisModuleCommandFilterCtx *fctx, int pos);
|
int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgDelete)(RedisModuleCommandFilterCtx *fctx, int pos);
|
||||||
|
int REDISMODULE_API_FUNC(RedisModule_Fork)(RedisModuleForkDoneHandler cb, void *user_data);
|
||||||
|
int REDISMODULE_API_FUNC(RedisModule_ExitFromChild)(int retcode);
|
||||||
|
int REDISMODULE_API_FUNC(RedisModule_KillForkChild)(int child_pid);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
/* This is included inline inside each Redis module. */
|
/* This is included inline inside each Redis module. */
|
||||||
@ -546,6 +550,9 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
|
|||||||
REDISMODULE_GET_API(CommandFilterArgInsert);
|
REDISMODULE_GET_API(CommandFilterArgInsert);
|
||||||
REDISMODULE_GET_API(CommandFilterArgReplace);
|
REDISMODULE_GET_API(CommandFilterArgReplace);
|
||||||
REDISMODULE_GET_API(CommandFilterArgDelete);
|
REDISMODULE_GET_API(CommandFilterArgDelete);
|
||||||
|
REDISMODULE_GET_API(Fork);
|
||||||
|
REDISMODULE_GET_API(ExitFromChild);
|
||||||
|
REDISMODULE_GET_API(KillForkChild);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR;
|
if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR;
|
||||||
|
@ -751,11 +751,11 @@ void syncCommand(client *c) {
|
|||||||
/* Target is disk (or the slave is not capable of supporting
|
/* Target is disk (or the slave is not capable of supporting
|
||||||
* diskless replication) and we don't have a BGSAVE in progress,
|
* diskless replication) and we don't have a BGSAVE in progress,
|
||||||
* let's start one. */
|
* let's start one. */
|
||||||
if (server.aof_child_pid == -1) {
|
if (!hasForkChild()) {
|
||||||
startBgsaveForReplication(c->slave_capa);
|
startBgsaveForReplication(c->slave_capa);
|
||||||
} else {
|
} else {
|
||||||
serverLog(LL_NOTICE,
|
serverLog(LL_NOTICE,
|
||||||
"No BGSAVE in progress, but an AOF rewrite is active. "
|
"No BGSAVE in progress, but another BG operation is active. "
|
||||||
"BGSAVE for replication delayed");
|
"BGSAVE for replication delayed");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -2930,7 +2930,7 @@ void replicationCron(void) {
|
|||||||
* In case of diskless replication, we make sure to wait the specified
|
* In case of diskless replication, we make sure to wait the specified
|
||||||
* number of seconds (according to configuration) so that other slaves
|
* number of seconds (according to configuration) so that other slaves
|
||||||
* have the time to arrive before we start streaming. */
|
* have the time to arrive before we start streaming. */
|
||||||
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
|
if (!hasForkChild()) {
|
||||||
time_t idle, max_idle = 0;
|
time_t idle, max_idle = 0;
|
||||||
int slaves_waiting = 0;
|
int slaves_waiting = 0;
|
||||||
int mincapa = -1;
|
int mincapa = -1;
|
||||||
|
@ -1827,7 +1827,7 @@ void ldbSendLogs(void) {
|
|||||||
int ldbStartSession(client *c) {
|
int ldbStartSession(client *c) {
|
||||||
ldb.forked = (c->flags & CLIENT_LUA_DEBUG_SYNC) == 0;
|
ldb.forked = (c->flags & CLIENT_LUA_DEBUG_SYNC) == 0;
|
||||||
if (ldb.forked) {
|
if (ldb.forked) {
|
||||||
pid_t cp = fork();
|
pid_t cp = redisFork();
|
||||||
if (cp == -1) {
|
if (cp == -1) {
|
||||||
addReplyError(c,"Fork() failed: can't run EVAL in debugging mode.");
|
addReplyError(c,"Fork() failed: can't run EVAL in debugging mode.");
|
||||||
return 0;
|
return 0;
|
||||||
@ -1844,7 +1844,6 @@ int ldbStartSession(client *c) {
|
|||||||
* socket to make sure if the parent crashes a reset is sent
|
* socket to make sure if the parent crashes a reset is sent
|
||||||
* to the clients. */
|
* to the clients. */
|
||||||
serverLog(LL_WARNING,"Redis forked for debugging eval");
|
serverLog(LL_WARNING,"Redis forked for debugging eval");
|
||||||
closeListeningSockets(0);
|
|
||||||
} else {
|
} else {
|
||||||
/* Parent */
|
/* Parent */
|
||||||
listAddNodeTail(ldb.children,(void*)(unsigned long)cp);
|
listAddNodeTail(ldb.children,(void*)(unsigned long)cp);
|
||||||
|
103
src/server.c
103
src/server.c
@ -1449,12 +1449,18 @@ int incrementallyRehash(int dbid) {
|
|||||||
* for dict.c to resize the hash tables accordingly to the fact we have o not
|
* for dict.c to resize the hash tables accordingly to the fact we have o not
|
||||||
* running childs. */
|
* running childs. */
|
||||||
void updateDictResizePolicy(void) {
|
void updateDictResizePolicy(void) {
|
||||||
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
|
if (!hasForkChild())
|
||||||
dictEnableResize();
|
dictEnableResize();
|
||||||
else
|
else
|
||||||
dictDisableResize();
|
dictDisableResize();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int hasForkChild() {
|
||||||
|
return server.rdb_child_pid != -1 ||
|
||||||
|
server.aof_child_pid != -1 ||
|
||||||
|
server.module_child_pid != -1;
|
||||||
|
}
|
||||||
|
|
||||||
/* ======================= Cron: called every 100 ms ======================== */
|
/* ======================= Cron: called every 100 ms ======================== */
|
||||||
|
|
||||||
/* Add a sample to the operations per second array of samples. */
|
/* Add a sample to the operations per second array of samples. */
|
||||||
@ -1691,7 +1697,7 @@ void databasesCron(void) {
|
|||||||
/* Perform hash tables rehashing if needed, but only if there are no
|
/* Perform hash tables rehashing if needed, but only if there are no
|
||||||
* other processes saving the DB on disk. Otherwise rehashing is bad
|
* other processes saving the DB on disk. Otherwise rehashing is bad
|
||||||
* as will cause a lot of copy-on-write of memory pages. */
|
* as will cause a lot of copy-on-write of memory pages. */
|
||||||
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
|
if (!hasForkChild()) {
|
||||||
/* We use global counters so if we stop the computation at a given
|
/* We use global counters so if we stop the computation at a given
|
||||||
* DB we'll be able to start from the successive in the next
|
* DB we'll be able to start from the successive in the next
|
||||||
* cron loop iteration. */
|
* cron loop iteration. */
|
||||||
@ -1888,15 +1894,14 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
|||||||
|
|
||||||
/* Start a scheduled AOF rewrite if this was requested by the user while
|
/* Start a scheduled AOF rewrite if this was requested by the user while
|
||||||
* a BGSAVE was in progress. */
|
* a BGSAVE was in progress. */
|
||||||
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
|
if (!hasForkChild() &&
|
||||||
server.aof_rewrite_scheduled)
|
server.aof_rewrite_scheduled)
|
||||||
{
|
{
|
||||||
rewriteAppendOnlyFileBackground();
|
rewriteAppendOnlyFileBackground();
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Check if a background saving or AOF rewrite in progress terminated. */
|
/* Check if a background saving or AOF rewrite in progress terminated. */
|
||||||
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
|
if (hasForkChild() || ldbPendingChildren())
|
||||||
ldbPendingChildren())
|
|
||||||
{
|
{
|
||||||
int statloc;
|
int statloc;
|
||||||
pid_t pid;
|
pid_t pid;
|
||||||
@ -1907,18 +1912,29 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
|||||||
|
|
||||||
if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
|
if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
|
||||||
|
|
||||||
|
/* sigKillChildHandler catches the signal and calls exit(), but we
|
||||||
|
* must make sure not to flag lastbgsave_status, etc incorrectly. */
|
||||||
|
if (exitcode == SIGUSR1) {
|
||||||
|
bysignal = SIGUSR1;
|
||||||
|
exitcode = 1;
|
||||||
|
}
|
||||||
|
|
||||||
if (pid == -1) {
|
if (pid == -1) {
|
||||||
serverLog(LL_WARNING,"wait3() returned an error: %s. "
|
serverLog(LL_WARNING,"wait3() returned an error: %s. "
|
||||||
"rdb_child_pid = %d, aof_child_pid = %d",
|
"rdb_child_pid = %d, aof_child_pid = %d, module_child_pid = %d",
|
||||||
strerror(errno),
|
strerror(errno),
|
||||||
(int) server.rdb_child_pid,
|
(int) server.rdb_child_pid,
|
||||||
(int) server.aof_child_pid);
|
(int) server.aof_child_pid,
|
||||||
|
(int) server.module_child_pid);
|
||||||
} else if (pid == server.rdb_child_pid) {
|
} else if (pid == server.rdb_child_pid) {
|
||||||
backgroundSaveDoneHandler(exitcode,bysignal);
|
backgroundSaveDoneHandler(exitcode,bysignal);
|
||||||
if (!bysignal && exitcode == 0) receiveChildInfo();
|
if (!bysignal && exitcode == 0) receiveChildInfo();
|
||||||
} else if (pid == server.aof_child_pid) {
|
} else if (pid == server.aof_child_pid) {
|
||||||
backgroundRewriteDoneHandler(exitcode,bysignal);
|
backgroundRewriteDoneHandler(exitcode,bysignal);
|
||||||
if (!bysignal && exitcode == 0) receiveChildInfo();
|
if (!bysignal && exitcode == 0) receiveChildInfo();
|
||||||
|
} else if (pid == server.module_child_pid) {
|
||||||
|
ModuleForkDoneHandler(exitcode,bysignal);
|
||||||
|
if (!bysignal && exitcode == 0) receiveChildInfo();
|
||||||
} else {
|
} else {
|
||||||
if (!ldbRemoveChild(pid)) {
|
if (!ldbRemoveChild(pid)) {
|
||||||
serverLog(LL_WARNING,
|
serverLog(LL_WARNING,
|
||||||
@ -1956,8 +1972,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
|||||||
|
|
||||||
/* Trigger an AOF rewrite if needed. */
|
/* Trigger an AOF rewrite if needed. */
|
||||||
if (server.aof_state == AOF_ON &&
|
if (server.aof_state == AOF_ON &&
|
||||||
server.rdb_child_pid == -1 &&
|
!hasForkChild() &&
|
||||||
server.aof_child_pid == -1 &&
|
|
||||||
server.aof_rewrite_perc &&
|
server.aof_rewrite_perc &&
|
||||||
server.aof_current_size > server.aof_rewrite_min_size)
|
server.aof_current_size > server.aof_rewrite_min_size)
|
||||||
{
|
{
|
||||||
@ -2015,7 +2030,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
|||||||
* Note: this code must be after the replicationCron() call above so
|
* Note: this code must be after the replicationCron() call above so
|
||||||
* make sure when refactoring this file to keep this order. This is useful
|
* make sure when refactoring this file to keep this order. This is useful
|
||||||
* because we want to give priority to RDB savings for replication. */
|
* because we want to give priority to RDB savings for replication. */
|
||||||
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
|
if (!hasForkChild() &&
|
||||||
server.rdb_bgsave_scheduled &&
|
server.rdb_bgsave_scheduled &&
|
||||||
(server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY ||
|
(server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY ||
|
||||||
server.lastbgsave_status == C_OK))
|
server.lastbgsave_status == C_OK))
|
||||||
@ -2799,6 +2814,7 @@ void initServer(void) {
|
|||||||
server.cronloops = 0;
|
server.cronloops = 0;
|
||||||
server.rdb_child_pid = -1;
|
server.rdb_child_pid = -1;
|
||||||
server.aof_child_pid = -1;
|
server.aof_child_pid = -1;
|
||||||
|
server.module_child_pid = -1;
|
||||||
server.rdb_child_type = RDB_CHILD_TYPE_NONE;
|
server.rdb_child_type = RDB_CHILD_TYPE_NONE;
|
||||||
server.rdb_bgsave_scheduled = 0;
|
server.rdb_bgsave_scheduled = 0;
|
||||||
server.child_info_pipe[0] = -1;
|
server.child_info_pipe[0] = -1;
|
||||||
@ -2817,6 +2833,7 @@ void initServer(void) {
|
|||||||
server.stat_peak_memory = 0;
|
server.stat_peak_memory = 0;
|
||||||
server.stat_rdb_cow_bytes = 0;
|
server.stat_rdb_cow_bytes = 0;
|
||||||
server.stat_aof_cow_bytes = 0;
|
server.stat_aof_cow_bytes = 0;
|
||||||
|
server.stat_module_cow_bytes = 0;
|
||||||
server.cron_malloc_stats.zmalloc_used = 0;
|
server.cron_malloc_stats.zmalloc_used = 0;
|
||||||
server.cron_malloc_stats.process_rss = 0;
|
server.cron_malloc_stats.process_rss = 0;
|
||||||
server.cron_malloc_stats.allocator_allocated = 0;
|
server.cron_malloc_stats.allocator_allocated = 0;
|
||||||
@ -3566,6 +3583,12 @@ int prepareForShutdown(int flags) {
|
|||||||
killRDBChild();
|
killRDBChild();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Kill module child if there is one. */
|
||||||
|
if (server.module_child_pid != -1) {
|
||||||
|
serverLog(LL_WARNING,"There is a module fork child. Killing it!");
|
||||||
|
TerminateModuleForkChild(0);
|
||||||
|
}
|
||||||
|
|
||||||
if (server.aof_state != AOF_OFF) {
|
if (server.aof_state != AOF_OFF) {
|
||||||
/* Kill the AOF saving child as the AOF we already have may be longer
|
/* Kill the AOF saving child as the AOF we already have may be longer
|
||||||
* but contains the full dataset anyway. */
|
* but contains the full dataset anyway. */
|
||||||
@ -4066,7 +4089,9 @@ sds genRedisInfoString(char *section) {
|
|||||||
"aof_current_rewrite_time_sec:%jd\r\n"
|
"aof_current_rewrite_time_sec:%jd\r\n"
|
||||||
"aof_last_bgrewrite_status:%s\r\n"
|
"aof_last_bgrewrite_status:%s\r\n"
|
||||||
"aof_last_write_status:%s\r\n"
|
"aof_last_write_status:%s\r\n"
|
||||||
"aof_last_cow_size:%zu\r\n",
|
"aof_last_cow_size:%zu\r\n"
|
||||||
|
"module_fork_in_progress:%d\r\n"
|
||||||
|
"module_fork_last_cow_size:%zu\r\n",
|
||||||
server.loading,
|
server.loading,
|
||||||
server.dirty,
|
server.dirty,
|
||||||
server.rdb_child_pid != -1,
|
server.rdb_child_pid != -1,
|
||||||
@ -4084,7 +4109,9 @@ sds genRedisInfoString(char *section) {
|
|||||||
-1 : time(NULL)-server.aof_rewrite_time_start),
|
-1 : time(NULL)-server.aof_rewrite_time_start),
|
||||||
(server.aof_lastbgrewrite_status == C_OK) ? "ok" : "err",
|
(server.aof_lastbgrewrite_status == C_OK) ? "ok" : "err",
|
||||||
(server.aof_last_write_status == C_OK) ? "ok" : "err",
|
(server.aof_last_write_status == C_OK) ? "ok" : "err",
|
||||||
server.stat_aof_cow_bytes);
|
server.stat_aof_cow_bytes,
|
||||||
|
server.module_child_pid != -1,
|
||||||
|
server.stat_module_cow_bytes);
|
||||||
|
|
||||||
if (server.aof_enabled) {
|
if (server.aof_enabled) {
|
||||||
info = sdscatprintf(info,
|
info = sdscatprintf(info,
|
||||||
@ -4591,6 +4618,58 @@ void setupSignalHandlers(void) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void sigKillChildHandler(int sig) {
|
||||||
|
UNUSED(sig);
|
||||||
|
/* this handler is needed to resolve a valgrind warning */
|
||||||
|
serverLogFromHandler(LL_WARNING, "Received SIGUSR1 in child, exiting now.");
|
||||||
|
exitFromChild(SIGUSR1);
|
||||||
|
}
|
||||||
|
|
||||||
|
void setupChildSignalHandlers(void) {
|
||||||
|
struct sigaction act;
|
||||||
|
|
||||||
|
/* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used.
|
||||||
|
* Otherwise, sa_handler is used. */
|
||||||
|
sigemptyset(&act.sa_mask);
|
||||||
|
act.sa_flags = 0;
|
||||||
|
act.sa_handler = sigKillChildHandler;
|
||||||
|
sigaction(SIGUSR1, &act, NULL);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
int redisFork() {
|
||||||
|
int childpid;
|
||||||
|
long long start = ustime();
|
||||||
|
if ((childpid = fork()) == 0) {
|
||||||
|
/* Child */
|
||||||
|
closeListeningSockets(0);
|
||||||
|
setupChildSignalHandlers();
|
||||||
|
} else {
|
||||||
|
/* Parent */
|
||||||
|
server.stat_fork_time = ustime()-start;
|
||||||
|
server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
|
||||||
|
latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
|
||||||
|
if (childpid == -1) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
updateDictResizePolicy();
|
||||||
|
}
|
||||||
|
return childpid;
|
||||||
|
}
|
||||||
|
|
||||||
|
void sendChildCOWInfo(int ptype, char *pname) {
|
||||||
|
size_t private_dirty = zmalloc_get_private_dirty(-1);
|
||||||
|
|
||||||
|
if (private_dirty) {
|
||||||
|
serverLog(LL_NOTICE,
|
||||||
|
"%s: %zu MB of memory used by copy-on-write",
|
||||||
|
pname, private_dirty/(1024*1024));
|
||||||
|
}
|
||||||
|
|
||||||
|
server.child_info_data.cow_size = private_dirty;
|
||||||
|
sendChildInfo(ptype);
|
||||||
|
}
|
||||||
|
|
||||||
void memtest(size_t megabytes, int passes);
|
void memtest(size_t megabytes, int passes);
|
||||||
|
|
||||||
/* Returns 1 if there is --sentinel among the arguments or if
|
/* Returns 1 if there is --sentinel among the arguments or if
|
||||||
|
10
src/server.h
10
src/server.h
@ -1041,6 +1041,7 @@ struct clusterState;
|
|||||||
#define CHILD_INFO_MAGIC 0xC17DDA7A12345678LL
|
#define CHILD_INFO_MAGIC 0xC17DDA7A12345678LL
|
||||||
#define CHILD_INFO_TYPE_RDB 0
|
#define CHILD_INFO_TYPE_RDB 0
|
||||||
#define CHILD_INFO_TYPE_AOF 1
|
#define CHILD_INFO_TYPE_AOF 1
|
||||||
|
#define CHILD_INFO_TYPE_MODULE 3
|
||||||
|
|
||||||
struct redisServer {
|
struct redisServer {
|
||||||
/* General */
|
/* General */
|
||||||
@ -1076,6 +1077,7 @@ struct redisServer {
|
|||||||
int module_blocked_pipe[2]; /* Pipe used to awake the event loop if a
|
int module_blocked_pipe[2]; /* Pipe used to awake the event loop if a
|
||||||
client blocked on a module command needs
|
client blocked on a module command needs
|
||||||
to be processed. */
|
to be processed. */
|
||||||
|
pid_t module_child_pid; /* PID of module child */
|
||||||
/* Networking */
|
/* Networking */
|
||||||
int port; /* TCP listening port */
|
int port; /* TCP listening port */
|
||||||
int tcp_backlog; /* TCP listen() backlog */
|
int tcp_backlog; /* TCP listen() backlog */
|
||||||
@ -1149,6 +1151,7 @@ struct redisServer {
|
|||||||
_Atomic long long stat_net_output_bytes; /* Bytes written to network. */
|
_Atomic long long stat_net_output_bytes; /* Bytes written to network. */
|
||||||
size_t stat_rdb_cow_bytes; /* Copy on write bytes during RDB saving. */
|
size_t stat_rdb_cow_bytes; /* Copy on write bytes during RDB saving. */
|
||||||
size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. */
|
size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. */
|
||||||
|
size_t stat_module_cow_bytes; /* Copy on write bytes during module fork. */
|
||||||
/* The following two are used to track instantaneous metrics, like
|
/* The following two are used to track instantaneous metrics, like
|
||||||
* number of operations per second, network traffic. */
|
* number of operations per second, network traffic. */
|
||||||
struct {
|
struct {
|
||||||
@ -1540,6 +1543,8 @@ void moduleAcquireGIL(void);
|
|||||||
void moduleReleaseGIL(void);
|
void moduleReleaseGIL(void);
|
||||||
void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid);
|
void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid);
|
||||||
void moduleCallCommandFilters(client *c);
|
void moduleCallCommandFilters(client *c);
|
||||||
|
void ModuleForkDoneHandler(int exitcode, int bysignal);
|
||||||
|
void TerminateModuleForkChild(int wait);
|
||||||
ssize_t rdbSaveModulesAux(rio *rdb, int when);
|
ssize_t rdbSaveModulesAux(rio *rdb, int when);
|
||||||
int moduleAllDatatypesHandleErrors();
|
int moduleAllDatatypesHandleErrors();
|
||||||
|
|
||||||
@ -1803,6 +1808,11 @@ void closeChildInfoPipe(void);
|
|||||||
void sendChildInfo(int process_type);
|
void sendChildInfo(int process_type);
|
||||||
void receiveChildInfo(void);
|
void receiveChildInfo(void);
|
||||||
|
|
||||||
|
/* Fork helpers */
|
||||||
|
int redisFork();
|
||||||
|
int hasForkChild();
|
||||||
|
void sendChildCOWInfo(int ptype, char *pname);
|
||||||
|
|
||||||
/* acl.c -- Authentication related prototypes. */
|
/* acl.c -- Authentication related prototypes. */
|
||||||
extern rax *Users;
|
extern rax *Users;
|
||||||
extern user *DefaultUser;
|
extern user *DefaultUser;
|
||||||
|
@ -115,3 +115,17 @@ start_server_and_kill_it [list "dir" $server_path] {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
start_server {} {
|
||||||
|
test {Test FLUSHALL aborts bgsave} {
|
||||||
|
r config set rdb-key-save-delay 1000
|
||||||
|
r debug populate 1000
|
||||||
|
r bgsave
|
||||||
|
assert_equal [s rdb_bgsave_in_progress] 1
|
||||||
|
r flushall
|
||||||
|
after 200
|
||||||
|
assert_equal [s rdb_bgsave_in_progress] 0
|
||||||
|
# make sure the server is still writable
|
||||||
|
r set x xx
|
||||||
|
}
|
||||||
|
}
|
@ -13,16 +13,20 @@ endif
|
|||||||
|
|
||||||
.SUFFIXES: .c .so .xo .o
|
.SUFFIXES: .c .so .xo .o
|
||||||
|
|
||||||
all: commandfilter.so testrdb.so
|
all: commandfilter.so testrdb.so fork.so
|
||||||
|
|
||||||
.c.xo:
|
.c.xo:
|
||||||
$(CC) -I../../src $(CFLAGS) $(SHOBJ_CFLAGS) -fPIC -c $< -o $@
|
$(CC) -I../../src $(CFLAGS) $(SHOBJ_CFLAGS) -fPIC -c $< -o $@
|
||||||
|
|
||||||
commandfilter.xo: ../../src/redismodule.h
|
commandfilter.xo: ../../src/redismodule.h
|
||||||
|
fork.xo: ../../src/redismodule.h
|
||||||
testrdb.xo: ../../src/redismodule.h
|
testrdb.xo: ../../src/redismodule.h
|
||||||
|
|
||||||
commandfilter.so: commandfilter.xo
|
commandfilter.so: commandfilter.xo
|
||||||
$(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc
|
$(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc
|
||||||
|
|
||||||
|
fork.so: fork.xo
|
||||||
|
$(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc
|
||||||
|
|
||||||
testrdb.so: testrdb.xo
|
testrdb.so: testrdb.xo
|
||||||
$(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc
|
$(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc
|
||||||
|
84
tests/modules/fork.c
Normal file
84
tests/modules/fork.c
Normal file
@ -0,0 +1,84 @@
|
|||||||
|
#define REDISMODULE_EXPERIMENTAL_API
|
||||||
|
#include "redismodule.h"
|
||||||
|
|
||||||
|
#include <string.h>
|
||||||
|
#include <assert.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
#define UNUSED(V) ((void) V)
|
||||||
|
|
||||||
|
int child_pid = -1;
|
||||||
|
int exitted_with_code = -1;
|
||||||
|
|
||||||
|
void done_handler(int exitcode, int bysignal, void *user_data) {
|
||||||
|
child_pid = -1;
|
||||||
|
exitted_with_code = exitcode;
|
||||||
|
assert(user_data==(void*)0xdeadbeef);
|
||||||
|
UNUSED(bysignal);
|
||||||
|
}
|
||||||
|
|
||||||
|
int fork_create(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||||
|
{
|
||||||
|
long long code_to_exit_with;
|
||||||
|
if (argc != 2) {
|
||||||
|
RedisModule_WrongArity(ctx);
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
}
|
||||||
|
RedisModule_StringToLongLong(argv[1], &code_to_exit_with);
|
||||||
|
exitted_with_code = -1;
|
||||||
|
child_pid = RedisModule_Fork(done_handler, (void*)0xdeadbeef);
|
||||||
|
if (child_pid < 0) {
|
||||||
|
RedisModule_ReplyWithError(ctx, "Fork failed");
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
} else if (child_pid > 0) {
|
||||||
|
/* parent */
|
||||||
|
RedisModule_ReplyWithLongLong(ctx, child_pid);
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* child */
|
||||||
|
RedisModule_Log(ctx, "notice", "fork child started");
|
||||||
|
usleep(200000);
|
||||||
|
RedisModule_Log(ctx, "notice", "fork child exiting");
|
||||||
|
RedisModule_ExitFromChild(code_to_exit_with);
|
||||||
|
/* unreachable */
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int fork_exitcode(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||||
|
{
|
||||||
|
UNUSED(argv);
|
||||||
|
UNUSED(argc);
|
||||||
|
RedisModule_ReplyWithLongLong(ctx, exitted_with_code);
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
int fork_kill(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||||
|
{
|
||||||
|
UNUSED(argv);
|
||||||
|
UNUSED(argc);
|
||||||
|
if (RedisModule_KillForkChild(child_pid) != REDISMODULE_OK)
|
||||||
|
RedisModule_ReplyWithError(ctx, "KillForkChild failed");
|
||||||
|
else
|
||||||
|
RedisModule_ReplyWithLongLong(ctx, 1);
|
||||||
|
child_pid = -1;
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||||
|
UNUSED(argv);
|
||||||
|
UNUSED(argc);
|
||||||
|
if (RedisModule_Init(ctx,"fork",1,REDISMODULE_APIVER_1)== REDISMODULE_ERR)
|
||||||
|
return REDISMODULE_ERR;
|
||||||
|
|
||||||
|
if (RedisModule_CreateCommand(ctx,"fork.create", fork_create,"",0,0,0) == REDISMODULE_ERR)
|
||||||
|
return REDISMODULE_ERR;
|
||||||
|
|
||||||
|
if (RedisModule_CreateCommand(ctx,"fork.exitcode", fork_exitcode,"",0,0,0) == REDISMODULE_ERR)
|
||||||
|
return REDISMODULE_ERR;
|
||||||
|
|
||||||
|
if (RedisModule_CreateCommand(ctx,"fork.kill", fork_kill,"",0,0,0) == REDISMODULE_ERR)
|
||||||
|
return REDISMODULE_ERR;
|
||||||
|
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
}
|
32
tests/unit/moduleapi/fork.tcl
Normal file
32
tests/unit/moduleapi/fork.tcl
Normal file
@ -0,0 +1,32 @@
|
|||||||
|
set testmodule [file normalize tests/modules/fork.so]
|
||||||
|
|
||||||
|
proc count_log_message {pattern} {
|
||||||
|
set result [exec grep -c $pattern < [srv 0 stdout]]
|
||||||
|
}
|
||||||
|
|
||||||
|
start_server {tags {"modules"}} {
|
||||||
|
r module load $testmodule
|
||||||
|
|
||||||
|
test {Module fork} {
|
||||||
|
# the argument to fork.create is the exitcode on termination
|
||||||
|
r fork.create 3
|
||||||
|
wait_for_condition 20 100 {
|
||||||
|
[r fork.exitcode] != -1
|
||||||
|
} else {
|
||||||
|
fail "fork didn't terminate"
|
||||||
|
}
|
||||||
|
r fork.exitcode
|
||||||
|
} {3}
|
||||||
|
|
||||||
|
test {Module fork kill} {
|
||||||
|
r fork.create 3
|
||||||
|
after 20
|
||||||
|
r fork.kill
|
||||||
|
after 100
|
||||||
|
|
||||||
|
assert {[count_log_message "fork child started"] eq "2"}
|
||||||
|
assert {[count_log_message "Received SIGUSR1 in child"] eq "1"}
|
||||||
|
assert {[count_log_message "fork child exiting"] eq "1"}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user