diff --git a/src/aof.c b/src/aof.c index cbc0989d0..3f33bda3a 100644 --- a/src/aof.c +++ b/src/aof.c @@ -773,6 +773,7 @@ int loadAppendOnlyFile(char *filename) { if (!(loops++ % 1000)) { loadingProgress(ftello(fp)); processEventsWhileBlocked(); + loadingCron(); processModuleLoadingProgressEvent(1); } diff --git a/src/module.c b/src/module.c index ce1f1edc9..de66af506 100644 --- a/src/module.c +++ b/src/module.c @@ -7258,7 +7258,7 @@ void moduleUnsubscribeAllServerEvents(RedisModule *module) { } void processModuleLoadingProgressEvent(int is_aof) { - long long now = ustime(); + long long now = server.ustime; static long long next_event = 0; if (now >= next_event) { /* Fire the loading progress modules end event. */ diff --git a/src/networking.c b/src/networking.c index 24424ab90..295f75283 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2887,6 +2887,10 @@ int clientsArePaused(void) { void processEventsWhileBlocked(void) { int iterations = 4; /* See the function top-comment. */ + /* Update our cached time since it is used to create and update the last + * interaction time with clients and for other important things. */ + updateCachedTime(0); + /* Note: when we are processing events while blocked (for instance during * busy Lua scripts), we set a global flag. When such flag is set, we * avoid handling the read part of clients using threaded I/O. diff --git a/src/rdb.c b/src/rdb.c index 54a169cd8..eab77fb9e 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -2069,14 +2069,11 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { if (server.loading_process_events_interval_bytes && (r->processed_bytes + len)/server.loading_process_events_interval_bytes > r->processed_bytes/server.loading_process_events_interval_bytes) { - /* The DB can take some non trivial amount of time to load. Update - * our cached time since it is used to create and update the last - * interaction time with clients and for other important things. */ - updateCachedTime(0); if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER) replicationSendNewlineToMaster(); loadingProgress(r->processed_bytes); processEventsWhileBlocked(); + loadingCron(); processModuleLoadingProgressEvent(0); } } diff --git a/src/server.c b/src/server.c index 42fa87898..6363a9b3b 100644 --- a/src/server.c +++ b/src/server.c @@ -1839,6 +1839,41 @@ void checkChildrenDone(void) { } } +/* Called from serverCron and loadingCron to update cached memory metrics. */ +void cronUpdateMemoryStats() { + /* Record the max memory used since the server was started. */ + if (zmalloc_used_memory() > server.stat_peak_memory) + server.stat_peak_memory = zmalloc_used_memory(); + + run_with_period(100) { + /* Sample the RSS and other metrics here since this is a relatively slow call. + * We must sample the zmalloc_used at the same time we take the rss, otherwise + * the frag ratio calculate may be off (ratio of two samples at different times) */ + server.cron_malloc_stats.process_rss = zmalloc_get_rss(); + server.cron_malloc_stats.zmalloc_used = zmalloc_used_memory(); + /* Sampling the allcator info can be slow too. + * The fragmentation ratio it'll show is potentically more accurate + * it excludes other RSS pages such as: shared libraries, LUA and other non-zmalloc + * allocations, and allocator reserved pages that can be pursed (all not actual frag) */ + zmalloc_get_allocator_info(&server.cron_malloc_stats.allocator_allocated, + &server.cron_malloc_stats.allocator_active, + &server.cron_malloc_stats.allocator_resident); + /* in case the allocator isn't providing these stats, fake them so that + * fragmention info still shows some (inaccurate metrics) */ + if (!server.cron_malloc_stats.allocator_resident) { + /* LUA memory isn't part of zmalloc_used, but it is part of the process RSS, + * so we must desuct it in order to be able to calculate correct + * "allocator fragmentation" ratio */ + size_t lua_memory = lua_gc(server.lua,LUA_GCCOUNT,0)*1024LL; + server.cron_malloc_stats.allocator_resident = server.cron_malloc_stats.process_rss - lua_memory; + } + if (!server.cron_malloc_stats.allocator_active) + server.cron_malloc_stats.allocator_active = server.cron_malloc_stats.allocator_resident; + if (!server.cron_malloc_stats.allocator_allocated) + server.cron_malloc_stats.allocator_allocated = server.cron_malloc_stats.zmalloc_used; + } +} + /* This is our timer interrupt, called server.hz times per second. * Here is where we do a number of things that need to be done asynchronously. * For instance: @@ -1907,37 +1942,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { * LRU_CLOCK_RESOLUTION define. */ server.lruclock = getLRUClock(); - /* Record the max memory used since the server was started. */ - if (zmalloc_used_memory() > server.stat_peak_memory) - server.stat_peak_memory = zmalloc_used_memory(); - - run_with_period(100) { - /* Sample the RSS and other metrics here since this is a relatively slow call. - * We must sample the zmalloc_used at the same time we take the rss, otherwise - * the frag ratio calculate may be off (ratio of two samples at different times) */ - server.cron_malloc_stats.process_rss = zmalloc_get_rss(); - server.cron_malloc_stats.zmalloc_used = zmalloc_used_memory(); - /* Sampling the allcator info can be slow too. - * The fragmentation ratio it'll show is potentically more accurate - * it excludes other RSS pages such as: shared libraries, LUA and other non-zmalloc - * allocations, and allocator reserved pages that can be pursed (all not actual frag) */ - zmalloc_get_allocator_info(&server.cron_malloc_stats.allocator_allocated, - &server.cron_malloc_stats.allocator_active, - &server.cron_malloc_stats.allocator_resident); - /* in case the allocator isn't providing these stats, fake them so that - * fragmention info still shows some (inaccurate metrics) */ - if (!server.cron_malloc_stats.allocator_resident) { - /* LUA memory isn't part of zmalloc_used, but it is part of the process RSS, - * so we must desuct it in order to be able to calculate correct - * "allocator fragmentation" ratio */ - size_t lua_memory = lua_gc(server.lua,LUA_GCCOUNT,0)*1024LL; - server.cron_malloc_stats.allocator_resident = server.cron_malloc_stats.process_rss - lua_memory; - } - if (!server.cron_malloc_stats.allocator_active) - server.cron_malloc_stats.allocator_active = server.cron_malloc_stats.allocator_resident; - if (!server.cron_malloc_stats.allocator_allocated) - server.cron_malloc_stats.allocator_allocated = server.cron_malloc_stats.zmalloc_used; - } + cronUpdateMemoryStats(); /* We received a SIGTERM, shutting down here in a safe way, as it is * not ok doing so inside the signal handler. */ @@ -2103,6 +2108,24 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { return 1000/server.hz; } +/* This function fill in the role of serverCron during RDB or AOF loading. + * It attempts to do its duties at a similar rate as the configured server.hz, + * and updates cronloops variable so that similarly to serverCron, the + * run_with_period can be used. */ +void loadingCron() { + long long now = server.ustime; + static long long next_event = 0; + if (now >= next_event) { + cronUpdateMemoryStats(); + + /* Increment cronloop so that run_with_period works. */ + server.cronloops++; + + /* Decide when the next event should fire. */ + next_event = now + 1000000 / server.hz; + } +} + extern int ProcessingEventsWhileBlocked; /* This function gets called every time Redis is entering the diff --git a/src/server.h b/src/server.h index 9badea9e8..97f663076 100644 --- a/src/server.h +++ b/src/server.h @@ -1696,6 +1696,7 @@ int listenToPort(int port, int *fds, int *count); void pauseClients(mstime_t duration); int clientsArePaused(void); void processEventsWhileBlocked(void); +void loadingCron(void); int handleClientsWithPendingWrites(void); int handleClientsWithPendingWritesUsingThreads(void); int handleClientsWithPendingReadsUsingThreads(void);