diff --git a/src/aof.c b/src/aof.c index 518b98c84..f1586cf90 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1440,7 +1440,7 @@ int rewriteAppendOnlyFileRio(rio *aof) { size_t processed = 0; int j; long key_count = 0; - long long cow_updated_time = 0; + long long updated_time = 0; for (j = 0; j < server.dbnum; j++) { char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n"; @@ -1501,18 +1501,16 @@ int rewriteAppendOnlyFileRio(rio *aof) { aofReadDiffFromParent(); } - /* Update COW info every 1 second (approximately). + /* Update info every 1 second (approximately). * in order to avoid calling mstime() on each iteration, we will * check the diff every 1024 keys */ - if ((key_count & 1023) == 0) { - key_count = 0; + if ((key_count++ & 1023) == 0) { long long now = mstime(); - if (now - cow_updated_time >= 1000) { - sendChildCOWInfo(CHILD_TYPE_AOF, 0, "AOF rewrite"); - cow_updated_time = now; + if (now - updated_time >= 1000) { + sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, key_count, "AOF rewrite"); + updated_time = now; } } - key_count++; } dictReleaseIterator(di); di = NULL; @@ -1613,7 +1611,7 @@ int rewriteAppendOnlyFile(char *filename) { size_t bytes_to_write = sdslen(server.aof_child_diff); const char *buf = server.aof_child_diff; long long cow_updated_time = mstime(); - + long long key_count = dbTotalServerKeyCount(); while (bytes_to_write) { /* We write the AOF buffer in chunk of 8MB so that we can check the time in between them */ size_t chunk_size = bytes_to_write < (8<<20) ? bytes_to_write : (8<<20); @@ -1627,7 +1625,7 @@ int rewriteAppendOnlyFile(char *filename) { /* Update COW info */ long long now = mstime(); if (now - cow_updated_time >= 1000) { - sendChildCOWInfo(CHILD_TYPE_AOF, 0, "AOF rewrite"); + sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, key_count, "AOF rewrite"); cow_updated_time = now; } } @@ -1761,7 +1759,7 @@ int rewriteAppendOnlyFileBackground(void) { redisSetCpuAffinity(server.aof_rewrite_cpulist); snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid()); if (rewriteAppendOnlyFile(tmpfile) == C_OK) { - sendChildCOWInfo(CHILD_TYPE_AOF, 1, "AOF rewrite"); + sendChildCowInfo(CHILD_INFO_TYPE_AOF_COW_SIZE, "AOF rewrite"); exitFromChild(0); } else { exitFromChild(1); diff --git a/src/childinfo.c b/src/childinfo.c index cae73fe46..d9dd17025 100644 --- a/src/childinfo.c +++ b/src/childinfo.c @@ -31,9 +31,10 @@ #include typedef struct { - int process_type; /* AOF or RDB child? */ - int on_exit; /* COW size of active or exited child */ - size_t cow_size; /* Copy on write size. */ + size_t keys; + size_t cow; + double progress; + childInfoType information_type; /* Type of information */ } child_info_data; /* Open a child-parent channel used in order to move information about the @@ -64,39 +65,48 @@ void closeChildInfoPipe(void) { } } -/* Send COW data to parent. */ -void sendChildInfo(int process_type, int on_exit, size_t cow_size) { +/* Send save data to parent. */ +void sendChildInfoGeneric(childInfoType info_type, size_t keys, double progress, char *pname) { if (server.child_info_pipe[1] == -1) return; - child_info_data buffer = {.process_type = process_type, .on_exit = on_exit, .cow_size = cow_size}; - ssize_t wlen = sizeof(buffer); + child_info_data data = {.information_type=info_type, + .keys=keys, + .cow=zmalloc_get_private_dirty(-1), + .progress=progress}; - if (write(server.child_info_pipe[1],&buffer,wlen) != wlen) { + if (data.cow) { + serverLog((info_type == CHILD_INFO_TYPE_CURRENT_INFO) ? LL_VERBOSE : LL_NOTICE, + "%s: %zu MB of memory used by copy-on-write", + pname, data.cow/(1024*1024)); + } + + ssize_t wlen = sizeof(data); + + if (write(server.child_info_pipe[1], &data, wlen) != wlen) { /* Nothing to do on error, this will be detected by the other side. */ } } -/* Update COW data. */ -void updateChildInfo(int process_type, int on_exit, size_t cow_size) { - if (!on_exit) { - server.stat_current_cow_bytes = cow_size; - return; - } - - if (process_type == CHILD_TYPE_RDB) { - server.stat_rdb_cow_bytes = cow_size; - } else if (process_type == CHILD_TYPE_AOF) { - server.stat_aof_cow_bytes = cow_size; - } else if (process_type == CHILD_TYPE_MODULE) { - server.stat_module_cow_bytes = cow_size; +/* Update Child info. */ +void updateChildInfo(childInfoType information_type, size_t cow, size_t keys, double progress) { + if (information_type == CHILD_INFO_TYPE_CURRENT_INFO) { + server.stat_current_cow_bytes = cow; + server.stat_current_save_keys_processed = keys; + if (progress != -1) server.stat_module_progress = progress; + } else if (information_type == CHILD_INFO_TYPE_AOF_COW_SIZE) { + server.stat_aof_cow_bytes = cow; + } else if (information_type == CHILD_INFO_TYPE_RDB_COW_SIZE) { + server.stat_rdb_cow_bytes = cow; + } else if (information_type == CHILD_INFO_TYPE_MODULE_COW_SIZE) { + server.stat_module_cow_bytes = cow; } } -/* Read COW info data from the pipe. - * if complete data read into the buffer, process type, copy-on-write type and copy-on-write size - * are stored into *process_type, *on_exit and *cow_size respectively and returns 1. +/* Read child info data from the pipe. + * if complete data read into the buffer, + * data is stored into *buffer, and returns 1. * otherwise, the partial data is left in the buffer, waiting for the next read, and returns 0. */ -int readChildInfo(int *process_type, int *on_exit, size_t *cow_size) { +int readChildInfo(childInfoType *information_type, size_t *cow, size_t *keys, double* progress) { /* We are using here a static buffer in combination with the server.child_info_nread to handle short reads */ static child_info_data buffer; ssize_t wlen = sizeof(buffer); @@ -111,25 +121,27 @@ int readChildInfo(int *process_type, int *on_exit, size_t *cow_size) { /* We have complete child info */ if (server.child_info_nread == wlen) { - *process_type = buffer.process_type; - *on_exit = buffer.on_exit; - *cow_size = buffer.cow_size; + *information_type = buffer.information_type; + *cow = buffer.cow; + *keys = buffer.keys; + *progress = buffer.progress; return 1; } else { return 0; } } -/* Receive COW data from child. */ +/* Receive info data from child. */ void receiveChildInfo(void) { if (server.child_info_pipe[0] == -1) return; - int process_type; - int on_exit; - size_t cow_size; + size_t cow; + size_t keys; + double progress; + childInfoType information_type; /* Drain the pipe and update child info so that we get the final message. */ - while (readChildInfo(&process_type, &on_exit, &cow_size)) { - updateChildInfo(process_type, on_exit, cow_size); + while (readChildInfo(&information_type, &cow, &keys, &progress)) { + updateChildInfo(information_type, cow, keys, progress); } } diff --git a/src/module.c b/src/module.c index 5282bd742..fe0cd8345 100644 --- a/src/module.c +++ b/src/module.c @@ -7696,16 +7696,18 @@ int RM_Fork(RedisModuleForkDoneHandler cb, void *user_data) { } /* The module is advised to call this function from the fork child once in a while, - * so that it can report COW memory to the parent which will be reported in INFO */ -void RM_SendChildCOWInfo(void) { - sendChildCOWInfo(CHILD_TYPE_MODULE, 0, "Module fork"); + * so that it can report progress and COW memory to the parent which will be + * reported in INFO. + * The `progress` argument should between 0 and 1, or -1 when not available. */ +void RM_SendChildHeartbeat(double progress) { + sendChildInfoGeneric(CHILD_INFO_TYPE_CURRENT_INFO, 0, progress, "Module fork"); } /* Call from the child process when you want to terminate it. * retcode will be provided to the done handler executed on the parent process. */ int RM_ExitFromChild(int retcode) { - sendChildCOWInfo(CHILD_TYPE_MODULE, 1, "Module fork"); + sendChildCowInfo(CHILD_INFO_TYPE_MODULE_COW_SIZE, "Module fork"); exitFromChild(retcode); return REDISMODULE_OK; } @@ -9239,7 +9241,7 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(CommandFilterArgReplace); REGISTER_API(CommandFilterArgDelete); REGISTER_API(Fork); - REGISTER_API(SendChildCOWInfo); + REGISTER_API(SendChildHeartbeat); REGISTER_API(ExitFromChild); REGISTER_API(KillForkChild); REGISTER_API(RegisterInfoFunc); diff --git a/src/rdb.c b/src/rdb.c index 7deed2a2d..630417302 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1223,7 +1223,8 @@ int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) { size_t processed = 0; int j; long key_count = 0; - long long cow_updated_time = 0; + long long info_updated_time = 0; + char *pname = (rdbflags & RDBFLAGS_AOF_PREAMBLE) ? "AOF rewrite" : "RDB"; if (server.rdb_checksum) rdb->update_cksum = rioGenericUpdateChecksum; @@ -1270,22 +1271,16 @@ int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) { aofReadDiffFromParent(); } - /* Update COW info every 1 second (approximately). + /* Update child info every 1 second (approximately). * in order to avoid calling mstime() on each iteration, we will * check the diff every 1024 keys */ - if ((key_count & 1023) == 0) { - key_count = 0; + if ((key_count++ & 1023) == 0) { long long now = mstime(); - if (now - cow_updated_time >= 1000) { - if (rdbflags & RDBFLAGS_AOF_PREAMBLE) { - sendChildCOWInfo(CHILD_TYPE_AOF, 0, "AOF rewrite"); - } else { - sendChildCOWInfo(CHILD_TYPE_RDB, 0, "RDB"); - } - cow_updated_time = now; + if (now - info_updated_time >= 1000) { + sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, key_count, pname); + info_updated_time = now; } } - key_count++; } dictReleaseIterator(di); di = NULL; /* So that we don't release it again on error. */ @@ -1438,7 +1433,7 @@ int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) { redisSetCpuAffinity(server.bgsave_cpulist); retval = rdbSave(filename,rsi); if (retval == C_OK) { - sendChildCOWInfo(CHILD_TYPE_RDB, 1, "RDB"); + sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB"); } exitFromChild((retval == C_OK) ? 0 : 1); } else { @@ -2805,7 +2800,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { retval = C_ERR; if (retval == C_OK) { - sendChildCOWInfo(CHILD_TYPE_RDB, 1, "RDB"); + sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB"); } rioFreeFd(&rdb); diff --git a/src/redismodule.h b/src/redismodule.h index 60a152452..0c2801bea 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -813,7 +813,7 @@ REDISMODULE_API int (*RedisModule_CommandFilterArgInsert)(RedisModuleCommandFilt REDISMODULE_API int (*RedisModule_CommandFilterArgReplace)(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_CommandFilterArgDelete)(RedisModuleCommandFilterCtx *fctx, int pos) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_Fork)(RedisModuleForkDoneHandler cb, void *user_data) REDISMODULE_ATTR; -REDISMODULE_API void (*RedisModule_SendChildCOWInfo)(void) REDISMODULE_ATTR; +REDISMODULE_API void (*RedisModule_SendChildHeartbeat)(double progress) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_ExitFromChild)(int retcode) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_KillForkChild)(int child_pid) REDISMODULE_ATTR; REDISMODULE_API float (*RedisModule_GetUsedMemoryRatio)() REDISMODULE_ATTR; @@ -1082,7 +1082,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(CommandFilterArgReplace); REDISMODULE_GET_API(CommandFilterArgDelete); REDISMODULE_GET_API(Fork); - REDISMODULE_GET_API(SendChildCOWInfo); + REDISMODULE_GET_API(SendChildHeartbeat); REDISMODULE_GET_API(ExitFromChild); REDISMODULE_GET_API(KillForkChild); REDISMODULE_GET_API(GetUsedMemoryRatio); diff --git a/src/server.c b/src/server.c index 0d8a55000..993f3cb8b 100644 --- a/src/server.c +++ b/src/server.c @@ -1621,6 +1621,9 @@ void resetChildState() { server.child_type = CHILD_TYPE_NONE; server.child_pid = -1; server.stat_current_cow_bytes = 0; + server.stat_current_save_keys_processed = 0; + server.stat_module_progress = 0; + server.stat_current_save_keys_total = 0; updateDictResizePolicy(); closeChildInfoPipe(); moduleFireServerEvent(REDISMODULE_EVENT_FORK_CHILD, @@ -3236,9 +3239,12 @@ void initServer(void) { server.stat_starttime = time(NULL); server.stat_peak_memory = 0; server.stat_current_cow_bytes = 0; + server.stat_current_save_keys_processed = 0; + server.stat_current_save_keys_total = 0; server.stat_rdb_cow_bytes = 0; server.stat_aof_cow_bytes = 0; server.stat_module_cow_bytes = 0; + server.stat_module_progress = 0; for (int j = 0; j < CLIENT_TYPE_COUNT; j++) server.stat_clients_type_memory[j] = 0; server.cron_malloc_stats.zmalloc_used = 0; @@ -4769,10 +4775,20 @@ sds genRedisInfoString(const char *section) { /* Persistence */ if (allsections || defsections || !strcasecmp(section,"persistence")) { if (sections++) info = sdscat(info,"\r\n"); + double fork_perc = 0; + if (server.stat_module_progress) { + fork_perc = server.stat_module_progress * 100; + } else if (server.stat_current_save_keys_total) { + fork_perc = ((double)server.stat_current_save_keys_processed / server.stat_current_save_keys_total) * 100; + } + info = sdscatprintf(info, "# Persistence\r\n" "loading:%d\r\n" "current_cow_size:%zu\r\n" + "current_fork_perc:%.2f%%\r\n" + "current_save_keys_processed:%zu\r\n" + "current_save_keys_total:%zu\r\n" "rdb_changes_since_last_save:%lld\r\n" "rdb_bgsave_in_progress:%d\r\n" "rdb_last_save_time:%jd\r\n" @@ -4792,6 +4808,9 @@ sds genRedisInfoString(const char *section) { "module_fork_last_cow_size:%zu\r\n", (int)server.loading, server.stat_current_cow_bytes, + fork_perc, + server.stat_current_save_keys_processed, + server.stat_current_save_keys_total, server.dirty, server.child_type == CHILD_TYPE_RDB, (intmax_t)server.lastsave, @@ -5662,6 +5681,9 @@ int redisFork(int purpose) { server.child_pid = childpid; server.child_type = purpose; server.stat_current_cow_bytes = 0; + server.stat_current_save_keys_processed = 0; + server.stat_module_progress = 0; + server.stat_current_save_keys_total = dbTotalServerKeyCount(); } updateDictResizePolicy(); @@ -5672,16 +5694,12 @@ int redisFork(int purpose) { return childpid; } -void sendChildCOWInfo(int ptype, int on_exit, char *pname) { - size_t private_dirty = zmalloc_get_private_dirty(-1); +void sendChildCowInfo(childInfoType info_type, char *pname) { + sendChildInfoGeneric(info_type, 0, -1, pname); +} - if (private_dirty) { - serverLog(on_exit ? LL_NOTICE : LL_VERBOSE, - "%s: %zu MB of memory used by copy-on-write", - pname, private_dirty/(1024*1024)); - } - - sendChildInfo(ptype, on_exit, private_dirty); +void sendChildInfo(childInfoType info_type, size_t keys, char *pname) { + sendChildInfoGeneric(info_type, keys, -1, pname); } void memtest(size_t megabytes, int passes); diff --git a/src/server.h b/src/server.h index a9841886d..2bfcc015d 100644 --- a/src/server.h +++ b/src/server.h @@ -1142,6 +1142,13 @@ struct clusterState; #define CHILD_TYPE_LDB 3 #define CHILD_TYPE_MODULE 4 +typedef enum childInfoType { + CHILD_INFO_TYPE_CURRENT_INFO, + CHILD_INFO_TYPE_AOF_COW_SIZE, + CHILD_INFO_TYPE_RDB_COW_SIZE, + CHILD_INFO_TYPE_MODULE_COW_SIZE +} childInfoType; + struct redisServer { /* General */ pid_t pid; /* Main process pid. */ @@ -1270,9 +1277,12 @@ struct redisServer { redisAtomic long long stat_net_input_bytes; /* Bytes read from network. */ redisAtomic long long stat_net_output_bytes; /* Bytes written to network. */ size_t stat_current_cow_bytes; /* Copy on write bytes while child is active. */ + size_t stat_current_save_keys_processed; /* Processed keys while child is active. */ + size_t stat_current_save_keys_total; /* Number of keys when child started. */ size_t stat_rdb_cow_bytes; /* Copy on write bytes during RDB saving. */ size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. */ size_t stat_module_cow_bytes; /* Copy on write bytes during module fork. */ + double stat_module_progress; /* Module save progress. */ uint64_t stat_clients_type_memory[CLIENT_TYPE_COUNT];/* Mem usage by type */ long long stat_unexpected_error_replies; /* Number of unexpected (aof-loading, replica to master, etc.) error replies */ long long stat_total_error_replies; /* Total number of issued error replies ( command + rejected errors ) */ @@ -2060,7 +2070,9 @@ void restartAOFAfterSYNC(); /* Child info */ void openChildInfoPipe(void); void closeChildInfoPipe(void); -void sendChildInfo(int process_type, int on_exit, size_t cow_size); +void sendChildInfoGeneric(childInfoType info_type, size_t keys, double progress, char *pname); +void sendChildCowInfo(childInfoType info_type, char *pname); +void sendChildInfo(childInfoType info_type, size_t keys, char *pname); void receiveChildInfo(void); /* Fork helpers */ @@ -2068,7 +2080,6 @@ int redisFork(int type); int hasActiveChildProcess(); void resetChildState(); int isMutuallyExclusiveChildType(int type); -void sendChildCOWInfo(int ptype, int on_exit, char *pname); /* acl.c -- Authentication related prototypes. */ extern rax *Users; diff --git a/tests/integration/rdb.tcl b/tests/integration/rdb.tcl index a89221197..7df1e2f74 100644 --- a/tests/integration/rdb.tcl +++ b/tests/integration/rdb.tcl @@ -206,10 +206,13 @@ set system_name [string tolower [exec uname -s]] if {$system_name eq {linux}} { start_server {overrides {save ""}} { - test {Test child sending COW info} { + test {Test child sending info} { # make sure that rdb_last_cow_size and current_cow_size are zero (the test using new server), # so that the comparisons during the test will be valid assert {[s current_cow_size] == 0} + assert {[s current_save_keys_processed] == 0} + assert {[s current_save_keys_total] == 0} + assert {[s rdb_last_cow_size] == 0} # using a 200us delay, the bgsave is empirically taking about 10 seconds. @@ -234,23 +237,35 @@ start_server {overrides {save ""}} { # start background rdb save r bgsave + set current_save_keys_total [s current_save_keys_total] + if {$::verbose} { + puts "Keys before bgsave start: current_save_keys_total" + } + # on each iteration, we will write some key to the server to trigger copy-on-write, and # wait to see that it reflected in INFO. set iteration 1 while 1 { - # take a sample before writing new data to the server + # take samples before writing new data to the server set cow_size [s current_cow_size] if {$::verbose} { puts "COW info before copy-on-write: $cow_size" } + set keys_processed [s current_save_keys_processed] + if {$::verbose} { + puts "current_save_keys_processed info : $keys_processed" + } + # trigger copy-on-write r setrange key$iteration 0 [string repeat B $size] # wait to see that current_cow_size value updated (as long as the child is in progress) wait_for_condition 80 100 { [s rdb_bgsave_in_progress] == 0 || - [s current_cow_size] >= $cow_size + $size + [s current_cow_size] >= $cow_size + $size && + [s current_save_keys_processed] > $keys_processed && + [s current_fork_perc] > 0 } else { if {$::verbose} { puts "COW info on fail: [s current_cow_size]" @@ -259,6 +274,9 @@ start_server {overrides {save ""}} { fail "COW info wasn't reported" } + # assert that $keys_processed is not greater than total keys. + assert_morethan_equal $current_save_keys_total $keys_processed + # for no accurate, stop after 2 iterations if {!$::accurate && $iteration == 2} { break