Adds INFO fields to track fork child progress (#8414)

* Adding current_save_keys_total and current_save_keys_processed info fields.
  Present in replication, BGSAVE and AOFRW.
* Changing RM_SendChildCOWInfo() to RM_SendChildHeartbeat(double progress)
* Adding new info field current_fork_perc. Present in Replication, BGSAVE, AOFRW,
  and module forks.
This commit is contained in:
uriyage 2021-02-16 16:06:51 +02:00 committed by GitHub
parent acb32d472d
commit fd052d2a86
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 134 additions and 80 deletions

View File

@ -1440,7 +1440,7 @@ int rewriteAppendOnlyFileRio(rio *aof) {
size_t processed = 0;
int j;
long key_count = 0;
long long cow_updated_time = 0;
long long updated_time = 0;
for (j = 0; j < server.dbnum; j++) {
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
@ -1501,18 +1501,16 @@ int rewriteAppendOnlyFileRio(rio *aof) {
aofReadDiffFromParent();
}
/* Update COW info every 1 second (approximately).
/* Update info every 1 second (approximately).
* in order to avoid calling mstime() on each iteration, we will
* check the diff every 1024 keys */
if ((key_count & 1023) == 0) {
key_count = 0;
if ((key_count++ & 1023) == 0) {
long long now = mstime();
if (now - cow_updated_time >= 1000) {
sendChildCOWInfo(CHILD_TYPE_AOF, 0, "AOF rewrite");
cow_updated_time = now;
if (now - updated_time >= 1000) {
sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, key_count, "AOF rewrite");
updated_time = now;
}
}
key_count++;
}
dictReleaseIterator(di);
di = NULL;
@ -1613,7 +1611,7 @@ int rewriteAppendOnlyFile(char *filename) {
size_t bytes_to_write = sdslen(server.aof_child_diff);
const char *buf = server.aof_child_diff;
long long cow_updated_time = mstime();
long long key_count = dbTotalServerKeyCount();
while (bytes_to_write) {
/* We write the AOF buffer in chunk of 8MB so that we can check the time in between them */
size_t chunk_size = bytes_to_write < (8<<20) ? bytes_to_write : (8<<20);
@ -1627,7 +1625,7 @@ int rewriteAppendOnlyFile(char *filename) {
/* Update COW info */
long long now = mstime();
if (now - cow_updated_time >= 1000) {
sendChildCOWInfo(CHILD_TYPE_AOF, 0, "AOF rewrite");
sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, key_count, "AOF rewrite");
cow_updated_time = now;
}
}
@ -1761,7 +1759,7 @@ int rewriteAppendOnlyFileBackground(void) {
redisSetCpuAffinity(server.aof_rewrite_cpulist);
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
sendChildCOWInfo(CHILD_TYPE_AOF, 1, "AOF rewrite");
sendChildCowInfo(CHILD_INFO_TYPE_AOF_COW_SIZE, "AOF rewrite");
exitFromChild(0);
} else {
exitFromChild(1);

View File

@ -31,9 +31,10 @@
#include <unistd.h>
typedef struct {
int process_type; /* AOF or RDB child? */
int on_exit; /* COW size of active or exited child */
size_t cow_size; /* Copy on write size. */
size_t keys;
size_t cow;
double progress;
childInfoType information_type; /* Type of information */
} child_info_data;
/* Open a child-parent channel used in order to move information about the
@ -64,39 +65,48 @@ void closeChildInfoPipe(void) {
}
}
/* Send COW data to parent. */
void sendChildInfo(int process_type, int on_exit, size_t cow_size) {
/* Send save data to parent. */
void sendChildInfoGeneric(childInfoType info_type, size_t keys, double progress, char *pname) {
if (server.child_info_pipe[1] == -1) return;
child_info_data buffer = {.process_type = process_type, .on_exit = on_exit, .cow_size = cow_size};
ssize_t wlen = sizeof(buffer);
child_info_data data = {.information_type=info_type,
.keys=keys,
.cow=zmalloc_get_private_dirty(-1),
.progress=progress};
if (write(server.child_info_pipe[1],&buffer,wlen) != wlen) {
if (data.cow) {
serverLog((info_type == CHILD_INFO_TYPE_CURRENT_INFO) ? LL_VERBOSE : LL_NOTICE,
"%s: %zu MB of memory used by copy-on-write",
pname, data.cow/(1024*1024));
}
ssize_t wlen = sizeof(data);
if (write(server.child_info_pipe[1], &data, wlen) != wlen) {
/* Nothing to do on error, this will be detected by the other side. */
}
}
/* Update COW data. */
void updateChildInfo(int process_type, int on_exit, size_t cow_size) {
if (!on_exit) {
server.stat_current_cow_bytes = cow_size;
return;
}
if (process_type == CHILD_TYPE_RDB) {
server.stat_rdb_cow_bytes = cow_size;
} else if (process_type == CHILD_TYPE_AOF) {
server.stat_aof_cow_bytes = cow_size;
} else if (process_type == CHILD_TYPE_MODULE) {
server.stat_module_cow_bytes = cow_size;
/* Update Child info. */
void updateChildInfo(childInfoType information_type, size_t cow, size_t keys, double progress) {
if (information_type == CHILD_INFO_TYPE_CURRENT_INFO) {
server.stat_current_cow_bytes = cow;
server.stat_current_save_keys_processed = keys;
if (progress != -1) server.stat_module_progress = progress;
} else if (information_type == CHILD_INFO_TYPE_AOF_COW_SIZE) {
server.stat_aof_cow_bytes = cow;
} else if (information_type == CHILD_INFO_TYPE_RDB_COW_SIZE) {
server.stat_rdb_cow_bytes = cow;
} else if (information_type == CHILD_INFO_TYPE_MODULE_COW_SIZE) {
server.stat_module_cow_bytes = cow;
}
}
/* Read COW info data from the pipe.
* if complete data read into the buffer, process type, copy-on-write type and copy-on-write size
* are stored into *process_type, *on_exit and *cow_size respectively and returns 1.
/* Read child info data from the pipe.
* if complete data read into the buffer,
* data is stored into *buffer, and returns 1.
* otherwise, the partial data is left in the buffer, waiting for the next read, and returns 0. */
int readChildInfo(int *process_type, int *on_exit, size_t *cow_size) {
int readChildInfo(childInfoType *information_type, size_t *cow, size_t *keys, double* progress) {
/* We are using here a static buffer in combination with the server.child_info_nread to handle short reads */
static child_info_data buffer;
ssize_t wlen = sizeof(buffer);
@ -111,25 +121,27 @@ int readChildInfo(int *process_type, int *on_exit, size_t *cow_size) {
/* We have complete child info */
if (server.child_info_nread == wlen) {
*process_type = buffer.process_type;
*on_exit = buffer.on_exit;
*cow_size = buffer.cow_size;
*information_type = buffer.information_type;
*cow = buffer.cow;
*keys = buffer.keys;
*progress = buffer.progress;
return 1;
} else {
return 0;
}
}
/* Receive COW data from child. */
/* Receive info data from child. */
void receiveChildInfo(void) {
if (server.child_info_pipe[0] == -1) return;
int process_type;
int on_exit;
size_t cow_size;
size_t cow;
size_t keys;
double progress;
childInfoType information_type;
/* Drain the pipe and update child info so that we get the final message. */
while (readChildInfo(&process_type, &on_exit, &cow_size)) {
updateChildInfo(process_type, on_exit, cow_size);
while (readChildInfo(&information_type, &cow, &keys, &progress)) {
updateChildInfo(information_type, cow, keys, progress);
}
}

View File

@ -7696,16 +7696,18 @@ int RM_Fork(RedisModuleForkDoneHandler cb, void *user_data) {
}
/* The module is advised to call this function from the fork child once in a while,
* so that it can report COW memory to the parent which will be reported in INFO */
void RM_SendChildCOWInfo(void) {
sendChildCOWInfo(CHILD_TYPE_MODULE, 0, "Module fork");
* so that it can report progress and COW memory to the parent which will be
* reported in INFO.
* The `progress` argument should between 0 and 1, or -1 when not available. */
void RM_SendChildHeartbeat(double progress) {
sendChildInfoGeneric(CHILD_INFO_TYPE_CURRENT_INFO, 0, progress, "Module fork");
}
/* Call from the child process when you want to terminate it.
* retcode will be provided to the done handler executed on the parent process.
*/
int RM_ExitFromChild(int retcode) {
sendChildCOWInfo(CHILD_TYPE_MODULE, 1, "Module fork");
sendChildCowInfo(CHILD_INFO_TYPE_MODULE_COW_SIZE, "Module fork");
exitFromChild(retcode);
return REDISMODULE_OK;
}
@ -9239,7 +9241,7 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(CommandFilterArgReplace);
REGISTER_API(CommandFilterArgDelete);
REGISTER_API(Fork);
REGISTER_API(SendChildCOWInfo);
REGISTER_API(SendChildHeartbeat);
REGISTER_API(ExitFromChild);
REGISTER_API(KillForkChild);
REGISTER_API(RegisterInfoFunc);

View File

@ -1223,7 +1223,8 @@ int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
size_t processed = 0;
int j;
long key_count = 0;
long long cow_updated_time = 0;
long long info_updated_time = 0;
char *pname = (rdbflags & RDBFLAGS_AOF_PREAMBLE) ? "AOF rewrite" : "RDB";
if (server.rdb_checksum)
rdb->update_cksum = rioGenericUpdateChecksum;
@ -1270,22 +1271,16 @@ int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
aofReadDiffFromParent();
}
/* Update COW info every 1 second (approximately).
/* Update child info every 1 second (approximately).
* in order to avoid calling mstime() on each iteration, we will
* check the diff every 1024 keys */
if ((key_count & 1023) == 0) {
key_count = 0;
if ((key_count++ & 1023) == 0) {
long long now = mstime();
if (now - cow_updated_time >= 1000) {
if (rdbflags & RDBFLAGS_AOF_PREAMBLE) {
sendChildCOWInfo(CHILD_TYPE_AOF, 0, "AOF rewrite");
} else {
sendChildCOWInfo(CHILD_TYPE_RDB, 0, "RDB");
}
cow_updated_time = now;
if (now - info_updated_time >= 1000) {
sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, key_count, pname);
info_updated_time = now;
}
}
key_count++;
}
dictReleaseIterator(di);
di = NULL; /* So that we don't release it again on error. */
@ -1438,7 +1433,7 @@ int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
redisSetCpuAffinity(server.bgsave_cpulist);
retval = rdbSave(filename,rsi);
if (retval == C_OK) {
sendChildCOWInfo(CHILD_TYPE_RDB, 1, "RDB");
sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB");
}
exitFromChild((retval == C_OK) ? 0 : 1);
} else {
@ -2805,7 +2800,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
retval = C_ERR;
if (retval == C_OK) {
sendChildCOWInfo(CHILD_TYPE_RDB, 1, "RDB");
sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB");
}
rioFreeFd(&rdb);

View File

@ -813,7 +813,7 @@ REDISMODULE_API int (*RedisModule_CommandFilterArgInsert)(RedisModuleCommandFilt
REDISMODULE_API int (*RedisModule_CommandFilterArgReplace)(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_CommandFilterArgDelete)(RedisModuleCommandFilterCtx *fctx, int pos) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_Fork)(RedisModuleForkDoneHandler cb, void *user_data) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_SendChildCOWInfo)(void) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_SendChildHeartbeat)(double progress) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_ExitFromChild)(int retcode) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_KillForkChild)(int child_pid) REDISMODULE_ATTR;
REDISMODULE_API float (*RedisModule_GetUsedMemoryRatio)() REDISMODULE_ATTR;
@ -1082,7 +1082,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(CommandFilterArgReplace);
REDISMODULE_GET_API(CommandFilterArgDelete);
REDISMODULE_GET_API(Fork);
REDISMODULE_GET_API(SendChildCOWInfo);
REDISMODULE_GET_API(SendChildHeartbeat);
REDISMODULE_GET_API(ExitFromChild);
REDISMODULE_GET_API(KillForkChild);
REDISMODULE_GET_API(GetUsedMemoryRatio);

View File

@ -1621,6 +1621,9 @@ void resetChildState() {
server.child_type = CHILD_TYPE_NONE;
server.child_pid = -1;
server.stat_current_cow_bytes = 0;
server.stat_current_save_keys_processed = 0;
server.stat_module_progress = 0;
server.stat_current_save_keys_total = 0;
updateDictResizePolicy();
closeChildInfoPipe();
moduleFireServerEvent(REDISMODULE_EVENT_FORK_CHILD,
@ -3236,9 +3239,12 @@ void initServer(void) {
server.stat_starttime = time(NULL);
server.stat_peak_memory = 0;
server.stat_current_cow_bytes = 0;
server.stat_current_save_keys_processed = 0;
server.stat_current_save_keys_total = 0;
server.stat_rdb_cow_bytes = 0;
server.stat_aof_cow_bytes = 0;
server.stat_module_cow_bytes = 0;
server.stat_module_progress = 0;
for (int j = 0; j < CLIENT_TYPE_COUNT; j++)
server.stat_clients_type_memory[j] = 0;
server.cron_malloc_stats.zmalloc_used = 0;
@ -4769,10 +4775,20 @@ sds genRedisInfoString(const char *section) {
/* Persistence */
if (allsections || defsections || !strcasecmp(section,"persistence")) {
if (sections++) info = sdscat(info,"\r\n");
double fork_perc = 0;
if (server.stat_module_progress) {
fork_perc = server.stat_module_progress * 100;
} else if (server.stat_current_save_keys_total) {
fork_perc = ((double)server.stat_current_save_keys_processed / server.stat_current_save_keys_total) * 100;
}
info = sdscatprintf(info,
"# Persistence\r\n"
"loading:%d\r\n"
"current_cow_size:%zu\r\n"
"current_fork_perc:%.2f%%\r\n"
"current_save_keys_processed:%zu\r\n"
"current_save_keys_total:%zu\r\n"
"rdb_changes_since_last_save:%lld\r\n"
"rdb_bgsave_in_progress:%d\r\n"
"rdb_last_save_time:%jd\r\n"
@ -4792,6 +4808,9 @@ sds genRedisInfoString(const char *section) {
"module_fork_last_cow_size:%zu\r\n",
(int)server.loading,
server.stat_current_cow_bytes,
fork_perc,
server.stat_current_save_keys_processed,
server.stat_current_save_keys_total,
server.dirty,
server.child_type == CHILD_TYPE_RDB,
(intmax_t)server.lastsave,
@ -5662,6 +5681,9 @@ int redisFork(int purpose) {
server.child_pid = childpid;
server.child_type = purpose;
server.stat_current_cow_bytes = 0;
server.stat_current_save_keys_processed = 0;
server.stat_module_progress = 0;
server.stat_current_save_keys_total = dbTotalServerKeyCount();
}
updateDictResizePolicy();
@ -5672,16 +5694,12 @@ int redisFork(int purpose) {
return childpid;
}
void sendChildCOWInfo(int ptype, int on_exit, char *pname) {
size_t private_dirty = zmalloc_get_private_dirty(-1);
if (private_dirty) {
serverLog(on_exit ? LL_NOTICE : LL_VERBOSE,
"%s: %zu MB of memory used by copy-on-write",
pname, private_dirty/(1024*1024));
void sendChildCowInfo(childInfoType info_type, char *pname) {
sendChildInfoGeneric(info_type, 0, -1, pname);
}
sendChildInfo(ptype, on_exit, private_dirty);
void sendChildInfo(childInfoType info_type, size_t keys, char *pname) {
sendChildInfoGeneric(info_type, keys, -1, pname);
}
void memtest(size_t megabytes, int passes);

View File

@ -1142,6 +1142,13 @@ struct clusterState;
#define CHILD_TYPE_LDB 3
#define CHILD_TYPE_MODULE 4
typedef enum childInfoType {
CHILD_INFO_TYPE_CURRENT_INFO,
CHILD_INFO_TYPE_AOF_COW_SIZE,
CHILD_INFO_TYPE_RDB_COW_SIZE,
CHILD_INFO_TYPE_MODULE_COW_SIZE
} childInfoType;
struct redisServer {
/* General */
pid_t pid; /* Main process pid. */
@ -1270,9 +1277,12 @@ struct redisServer {
redisAtomic long long stat_net_input_bytes; /* Bytes read from network. */
redisAtomic long long stat_net_output_bytes; /* Bytes written to network. */
size_t stat_current_cow_bytes; /* Copy on write bytes while child is active. */
size_t stat_current_save_keys_processed; /* Processed keys while child is active. */
size_t stat_current_save_keys_total; /* Number of keys when child started. */
size_t stat_rdb_cow_bytes; /* Copy on write bytes during RDB saving. */
size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. */
size_t stat_module_cow_bytes; /* Copy on write bytes during module fork. */
double stat_module_progress; /* Module save progress. */
uint64_t stat_clients_type_memory[CLIENT_TYPE_COUNT];/* Mem usage by type */
long long stat_unexpected_error_replies; /* Number of unexpected (aof-loading, replica to master, etc.) error replies */
long long stat_total_error_replies; /* Total number of issued error replies ( command + rejected errors ) */
@ -2060,7 +2070,9 @@ void restartAOFAfterSYNC();
/* Child info */
void openChildInfoPipe(void);
void closeChildInfoPipe(void);
void sendChildInfo(int process_type, int on_exit, size_t cow_size);
void sendChildInfoGeneric(childInfoType info_type, size_t keys, double progress, char *pname);
void sendChildCowInfo(childInfoType info_type, char *pname);
void sendChildInfo(childInfoType info_type, size_t keys, char *pname);
void receiveChildInfo(void);
/* Fork helpers */
@ -2068,7 +2080,6 @@ int redisFork(int type);
int hasActiveChildProcess();
void resetChildState();
int isMutuallyExclusiveChildType(int type);
void sendChildCOWInfo(int ptype, int on_exit, char *pname);
/* acl.c -- Authentication related prototypes. */
extern rax *Users;

View File

@ -206,10 +206,13 @@ set system_name [string tolower [exec uname -s]]
if {$system_name eq {linux}} {
start_server {overrides {save ""}} {
test {Test child sending COW info} {
test {Test child sending info} {
# make sure that rdb_last_cow_size and current_cow_size are zero (the test using new server),
# so that the comparisons during the test will be valid
assert {[s current_cow_size] == 0}
assert {[s current_save_keys_processed] == 0}
assert {[s current_save_keys_total] == 0}
assert {[s rdb_last_cow_size] == 0}
# using a 200us delay, the bgsave is empirically taking about 10 seconds.
@ -234,23 +237,35 @@ start_server {overrides {save ""}} {
# start background rdb save
r bgsave
set current_save_keys_total [s current_save_keys_total]
if {$::verbose} {
puts "Keys before bgsave start: current_save_keys_total"
}
# on each iteration, we will write some key to the server to trigger copy-on-write, and
# wait to see that it reflected in INFO.
set iteration 1
while 1 {
# take a sample before writing new data to the server
# take samples before writing new data to the server
set cow_size [s current_cow_size]
if {$::verbose} {
puts "COW info before copy-on-write: $cow_size"
}
set keys_processed [s current_save_keys_processed]
if {$::verbose} {
puts "current_save_keys_processed info : $keys_processed"
}
# trigger copy-on-write
r setrange key$iteration 0 [string repeat B $size]
# wait to see that current_cow_size value updated (as long as the child is in progress)
wait_for_condition 80 100 {
[s rdb_bgsave_in_progress] == 0 ||
[s current_cow_size] >= $cow_size + $size
[s current_cow_size] >= $cow_size + $size &&
[s current_save_keys_processed] > $keys_processed &&
[s current_fork_perc] > 0
} else {
if {$::verbose} {
puts "COW info on fail: [s current_cow_size]"
@ -259,6 +274,9 @@ start_server {overrides {save ""}} {
fail "COW info wasn't reported"
}
# assert that $keys_processed is not greater than total keys.
assert_morethan_equal $current_save_keys_total $keys_processed
# for no accurate, stop after 2 iterations
if {!$::accurate && $iteration == 2} {
break