mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 08:08:53 -05:00
active memory defragmentation
This commit is contained in:
parent
6712bce92c
commit
7aa9e6d2ae
32
deps/jemalloc/src/jemalloc.c
vendored
32
deps/jemalloc/src/jemalloc.c
vendored
@ -2591,3 +2591,35 @@ jemalloc_postfork_child(void)
|
||||
}
|
||||
|
||||
/******************************************************************************/
|
||||
|
||||
/* Helps the application decide if a pointer is worth re-allocating in order to reduce fragmentation.
|
||||
* returns 0 if the allocation is in the currently active run,
|
||||
* or when it is not causing any frag issue (large or huge bin)
|
||||
* returns the bin utilization and run utilization both in fixed point 16:16.
|
||||
* If the application decides to re-allocate it should use MALLOCX_TCACHE_NONE when doing so. */
|
||||
JEMALLOC_EXPORT int JEMALLOC_NOTHROW
|
||||
je_get_defrag_hint(void* ptr, int *bin_util, int *run_util) {
|
||||
int defrag = 0;
|
||||
arena_chunk_t *chunk = (arena_chunk_t *)CHUNK_ADDR2BASE(ptr);
|
||||
if (likely(chunk != ptr)) { /* indication that this is not a HUGE alloc */
|
||||
size_t pageind = ((uintptr_t)ptr - (uintptr_t)chunk) >> LG_PAGE;
|
||||
size_t mapbits = arena_mapbits_get(chunk, pageind);
|
||||
if (likely((mapbits & CHUNK_MAP_LARGE) == 0)) { /* indication that this is not a LARGE alloc */
|
||||
arena_t *arena = extent_node_arena_get(&chunk->node);
|
||||
size_t rpages_ind = pageind - arena_mapbits_small_runind_get(chunk, pageind);
|
||||
arena_run_t *run = &arena_miscelm_get(chunk, rpages_ind)->run;
|
||||
arena_bin_t *bin = &arena->bins[run->binind];
|
||||
malloc_mutex_lock(&bin->lock);
|
||||
/* runs that are in the same chunk in as the current chunk, are likely to be the next currun */
|
||||
if (chunk != (arena_chunk_t *)CHUNK_ADDR2BASE(bin->runcur)) {
|
||||
arena_bin_info_t *bin_info = &arena_bin_info[run->binind];
|
||||
size_t availregs = bin_info->nregs * bin->stats.curruns;
|
||||
*bin_util = (bin->stats.curregs<<16) / availregs;
|
||||
*run_util = ((bin_info->nregs - run->nfree)<<16) / bin_info->nregs;
|
||||
defrag = 1;
|
||||
}
|
||||
malloc_mutex_unlock(&bin->lock);
|
||||
}
|
||||
}
|
||||
return defrag;
|
||||
}
|
||||
|
20
redis.conf
20
redis.conf
@ -1228,3 +1228,23 @@ aof-rewrite-incremental-fsync yes
|
||||
#
|
||||
# lfu-log-factor 10
|
||||
# lfu-decay-time 1
|
||||
|
||||
########################### ACTIVE DEFRAGMENTATION #######################
|
||||
|
||||
# enabled active defragmentation
|
||||
# activedefrag yes
|
||||
|
||||
# minimum amount of fragmentation waste to start active defrag
|
||||
# active-defrag-ignore-bytes 100mb
|
||||
|
||||
# minimum percentage of fragmentation to start active defrag
|
||||
# active-defrag-threshold-lower 10
|
||||
|
||||
# maximum percentage of fragmentation at which we use maximum effort
|
||||
# active-defrag-threshold-upper 100
|
||||
|
||||
# minimal effort for defrag in CPU percentage
|
||||
# active-defrag-cycle-min 25
|
||||
|
||||
# maximal effort for defrag in CPU percentage
|
||||
# active-defrag-cycle-max 75
|
||||
|
@ -128,7 +128,7 @@ endif
|
||||
|
||||
REDIS_SERVER_NAME=redis-server
|
||||
REDIS_SENTINEL_NAME=redis-sentinel
|
||||
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o
|
||||
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o
|
||||
REDIS_CLI_NAME=redis-cli
|
||||
REDIS_CLI_OBJ=anet.o adlist.o redis-cli.o zmalloc.o release.o anet.o ae.o crc64.o
|
||||
REDIS_BENCHMARK_NAME=redis-benchmark
|
||||
|
58
src/config.c
58
src/config.c
@ -423,6 +423,10 @@ void loadServerConfigFromString(char *config) {
|
||||
if ((server.repl_slave_lazy_flush = yesnotoi(argv[1])) == -1) {
|
||||
err = "argument must be 'yes' or 'no'"; goto loaderr;
|
||||
}
|
||||
} else if (!strcasecmp(argv[0],"activedefrag") && argc == 2) {
|
||||
if ((server.active_defrag_enabled = yesnotoi(argv[1])) == -1) {
|
||||
err = "argument must be 'yes' or 'no'"; goto loaderr;
|
||||
}
|
||||
} else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
|
||||
if ((server.daemonize = yesnotoi(argv[1])) == -1) {
|
||||
err = "argument must be 'yes' or 'no'"; goto loaderr;
|
||||
@ -499,6 +503,36 @@ void loadServerConfigFromString(char *config) {
|
||||
}
|
||||
zfree(server.rdb_filename);
|
||||
server.rdb_filename = zstrdup(argv[1]);
|
||||
} else if (!strcasecmp(argv[0],"active-defrag-threshold-lower") && argc == 2) {
|
||||
server.active_defrag_threshold_lower = atoi(argv[1]);
|
||||
if (server.active_defrag_threshold_lower < 0) {
|
||||
err = "active-defrag-threshold-lower must be 0 or greater";
|
||||
goto loaderr;
|
||||
}
|
||||
} else if (!strcasecmp(argv[0],"active-defrag-threshold-upper") && argc == 2) {
|
||||
server.active_defrag_threshold_upper = atoi(argv[1]);
|
||||
if (server.active_defrag_threshold_upper < 0) {
|
||||
err = "active-defrag-threshold-upper must be 0 or greater";
|
||||
goto loaderr;
|
||||
}
|
||||
} else if (!strcasecmp(argv[0],"active-defrag-ignore-bytes") && argc == 2) {
|
||||
server.active_defrag_ignore_bytes = memtoll(argv[1], NULL);
|
||||
if (server.active_defrag_ignore_bytes <= 0) {
|
||||
err = "active-defrag-ignore-bytes must above 0";
|
||||
goto loaderr;
|
||||
}
|
||||
} else if (!strcasecmp(argv[0],"active-defrag-cycle-min") && argc == 2) {
|
||||
server.active_defrag_cycle_min = atoi(argv[1]);
|
||||
if (server.active_defrag_cycle_min < 1 || server.active_defrag_cycle_min > 99) {
|
||||
err = "active-defrag-cycle-min must be between 1 and 99";
|
||||
goto loaderr;
|
||||
}
|
||||
} else if (!strcasecmp(argv[0],"active-defrag-cycle-max") && argc == 2) {
|
||||
server.active_defrag_cycle_max = atoi(argv[1]);
|
||||
if (server.active_defrag_cycle_max < 1 || server.active_defrag_cycle_max > 99) {
|
||||
err = "active-defrag-cycle-max must be between 1 and 99";
|
||||
goto loaderr;
|
||||
}
|
||||
} else if (!strcasecmp(argv[0],"hash-max-ziplist-entries") && argc == 2) {
|
||||
server.hash_max_ziplist_entries = memtoll(argv[1], NULL);
|
||||
} else if (!strcasecmp(argv[0],"hash-max-ziplist-value") && argc == 2) {
|
||||
@ -971,6 +1005,8 @@ void configSetCommand(client *c) {
|
||||
"slave-read-only",server.repl_slave_ro) {
|
||||
} config_set_bool_field(
|
||||
"activerehashing",server.activerehashing) {
|
||||
} config_set_bool_field(
|
||||
"activedefrag",server.active_defrag_enabled) {
|
||||
} config_set_bool_field(
|
||||
"protected-mode",server.protected_mode) {
|
||||
} config_set_bool_field(
|
||||
@ -998,6 +1034,16 @@ void configSetCommand(client *c) {
|
||||
"lfu-decay-time",server.lfu_decay_time,0,LLONG_MAX) {
|
||||
} config_set_numerical_field(
|
||||
"timeout",server.maxidletime,0,LONG_MAX) {
|
||||
} config_set_numerical_field(
|
||||
"active-defrag-threshold-lower",server.active_defrag_threshold_lower,0,1000) {
|
||||
} config_set_numerical_field(
|
||||
"active-defrag-threshold-upper",server.active_defrag_threshold_upper,0,1000) {
|
||||
} config_set_memory_field(
|
||||
"active-defrag-ignore-bytes",server.active_defrag_ignore_bytes) {
|
||||
} config_set_numerical_field(
|
||||
"active-defrag-cycle-min",server.active_defrag_cycle_min,1,99) {
|
||||
} config_set_numerical_field(
|
||||
"active-defrag-cycle-max",server.active_defrag_cycle_max,1,99) {
|
||||
} config_set_numerical_field(
|
||||
"auto-aof-rewrite-percentage",server.aof_rewrite_perc,0,LLONG_MAX){
|
||||
} config_set_numerical_field(
|
||||
@ -1166,6 +1212,11 @@ void configGetCommand(client *c) {
|
||||
config_get_numerical_field("maxmemory",server.maxmemory);
|
||||
config_get_numerical_field("maxmemory-samples",server.maxmemory_samples);
|
||||
config_get_numerical_field("timeout",server.maxidletime);
|
||||
config_get_numerical_field("active-defrag-threshold-lower",server.active_defrag_threshold_lower);
|
||||
config_get_numerical_field("active-defrag-threshold-upper",server.active_defrag_threshold_upper);
|
||||
config_get_numerical_field("active-defrag-ignore-bytes",server.active_defrag_ignore_bytes);
|
||||
config_get_numerical_field("active-defrag-cycle-min",server.active_defrag_cycle_min);
|
||||
config_get_numerical_field("active-defrag-cycle-max",server.active_defrag_cycle_max);
|
||||
config_get_numerical_field("auto-aof-rewrite-percentage",
|
||||
server.aof_rewrite_perc);
|
||||
config_get_numerical_field("auto-aof-rewrite-min-size",
|
||||
@ -1230,6 +1281,7 @@ void configGetCommand(client *c) {
|
||||
config_get_bool_field("rdbcompression", server.rdb_compression);
|
||||
config_get_bool_field("rdbchecksum", server.rdb_checksum);
|
||||
config_get_bool_field("activerehashing", server.activerehashing);
|
||||
config_get_bool_field("activedefrag", server.active_defrag_enabled);
|
||||
config_get_bool_field("protected-mode", server.protected_mode);
|
||||
config_get_bool_field("repl-disable-tcp-nodelay",
|
||||
server.repl_disable_tcp_nodelay);
|
||||
@ -1930,6 +1982,11 @@ int rewriteConfig(char *path) {
|
||||
rewriteConfigBytesOption(state,"maxmemory",server.maxmemory,CONFIG_DEFAULT_MAXMEMORY);
|
||||
rewriteConfigEnumOption(state,"maxmemory-policy",server.maxmemory_policy,maxmemory_policy_enum,CONFIG_DEFAULT_MAXMEMORY_POLICY);
|
||||
rewriteConfigNumericalOption(state,"maxmemory-samples",server.maxmemory_samples,CONFIG_DEFAULT_MAXMEMORY_SAMPLES);
|
||||
rewriteConfigNumericalOption(state,"active-defrag-threshold-lower",server.active_defrag_threshold_lower,CONFIG_DEFAULT_DEFRAG_THRESHOLD_LOWER);
|
||||
rewriteConfigNumericalOption(state,"active-defrag-threshold-upper",server.active_defrag_threshold_upper,CONFIG_DEFAULT_DEFRAG_THRESHOLD_UPPER);
|
||||
rewriteConfigBytesOption(state,"active-defrag-ignore-bytes",server.active_defrag_ignore_bytes,CONFIG_DEFAULT_DEFRAG_IGNORE_BYTES);
|
||||
rewriteConfigNumericalOption(state,"active-defrag-cycle-min",server.active_defrag_cycle_min,CONFIG_DEFAULT_DEFRAG_CYCLE_MIN);
|
||||
rewriteConfigNumericalOption(state,"active-defrag-cycle-max",server.active_defrag_cycle_max,CONFIG_DEFAULT_DEFRAG_CYCLE_MAX);
|
||||
rewriteConfigYesNoOption(state,"appendonly",server.aof_state != AOF_OFF,0);
|
||||
rewriteConfigStringOption(state,"appendfilename",server.aof_filename,CONFIG_DEFAULT_AOF_FILENAME);
|
||||
rewriteConfigEnumOption(state,"appendfsync",server.aof_fsync,aof_fsync_enum,CONFIG_DEFAULT_AOF_FSYNC);
|
||||
@ -1956,6 +2013,7 @@ int rewriteConfig(char *path) {
|
||||
rewriteConfigNumericalOption(state,"zset-max-ziplist-value",server.zset_max_ziplist_value,OBJ_ZSET_MAX_ZIPLIST_VALUE);
|
||||
rewriteConfigNumericalOption(state,"hll-sparse-max-bytes",server.hll_sparse_max_bytes,CONFIG_DEFAULT_HLL_SPARSE_MAX_BYTES);
|
||||
rewriteConfigYesNoOption(state,"activerehashing",server.activerehashing,CONFIG_DEFAULT_ACTIVE_REHASHING);
|
||||
rewriteConfigYesNoOption(state,"activedefrag",server.active_defrag_enabled,CONFIG_DEFAULT_ACTIVE_DEFRAG);
|
||||
rewriteConfigYesNoOption(state,"protected-mode",server.protected_mode,CONFIG_DEFAULT_PROTECTED_MODE);
|
||||
rewriteConfigClientoutputbufferlimitOption(state);
|
||||
rewriteConfigNumericalOption(state,"hz",server.hz,CONFIG_DEFAULT_HZ);
|
||||
|
2
src/db.c
2
src/db.c
@ -665,7 +665,7 @@ void scanGenericCommand(client *c, robj *o, unsigned long cursor) {
|
||||
privdata[0] = keys;
|
||||
privdata[1] = o;
|
||||
do {
|
||||
cursor = dictScan(ht, cursor, scanCallback, privdata);
|
||||
cursor = dictScan(ht, cursor, scanCallback, NULL, privdata);
|
||||
} while (cursor &&
|
||||
maxiterations-- &&
|
||||
listLength(keys) < (unsigned long)count);
|
||||
|
15
src/debug.c
15
src/debug.c
@ -282,7 +282,7 @@ void debugCommand(client *c) {
|
||||
blen++; addReplyStatus(c,
|
||||
"ziplist <key> -- Show low level info about the ziplist encoding.");
|
||||
blen++; addReplyStatus(c,
|
||||
"populate <count> [prefix] -- Create <count> string keys named key:<num>. If a prefix is specified is used instead of the 'key' prefix.");
|
||||
"populate <count> [prefix] [size] -- Create <count> string keys named key:<num>. If a prefix is specified is used instead of the 'key' prefix.");
|
||||
blen++; addReplyStatus(c,
|
||||
"digest -- Outputs an hex signature representing the current DB content.");
|
||||
blen++; addReplyStatus(c,
|
||||
@ -433,7 +433,7 @@ void debugCommand(client *c) {
|
||||
addReplyStatus(c,"Ziplist structure printed on stdout");
|
||||
}
|
||||
} else if (!strcasecmp(c->argv[1]->ptr,"populate") &&
|
||||
(c->argc == 3 || c->argc == 4)) {
|
||||
c->argc >= 3 && c->argc <= 5) {
|
||||
long keys, j;
|
||||
robj *key, *val;
|
||||
char buf[128];
|
||||
@ -442,15 +442,24 @@ void debugCommand(client *c) {
|
||||
return;
|
||||
dictExpand(c->db->dict,keys);
|
||||
for (j = 0; j < keys; j++) {
|
||||
long valsize = 0;
|
||||
snprintf(buf,sizeof(buf),"%s:%lu",
|
||||
(c->argc == 3) ? "key" : (char*)c->argv[3]->ptr, j);
|
||||
key = createStringObject(buf,strlen(buf));
|
||||
if (c->argc == 5)
|
||||
if (getLongFromObjectOrReply(c, c->argv[4], &valsize, NULL) != C_OK)
|
||||
return;
|
||||
if (lookupKeyWrite(c->db,key) != NULL) {
|
||||
decrRefCount(key);
|
||||
continue;
|
||||
}
|
||||
snprintf(buf,sizeof(buf),"value:%lu",j);
|
||||
val = createStringObject(buf,strlen(buf));
|
||||
if (valsize==0)
|
||||
val = createStringObject(buf,strlen(buf));
|
||||
else {
|
||||
val = createStringObject(NULL,valsize);
|
||||
memset(val->ptr, 0, valsize);
|
||||
}
|
||||
dbAdd(c->db,key,val);
|
||||
signalModifiedKey(c->db,key);
|
||||
decrRefCount(key);
|
||||
|
527
src/defrag.c
Normal file
527
src/defrag.c
Normal file
@ -0,0 +1,527 @@
|
||||
/*
|
||||
* Active memory defragmentation
|
||||
* Try to find key / value allocations that need to be re-allocated in order
|
||||
* to reduce external fragmentation.
|
||||
* We do that by scanning the keyspace and for each pointer we have, we can try to
|
||||
* ask the allocator if moving it to a new address will help reduce fragmentation.
|
||||
*
|
||||
* Copyright (c) 2017, Oran Agra
|
||||
* Copyright (c) 2017, Redis Labs, Inc
|
||||
* All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modification, are permitted provided that the following conditions are met:
|
||||
*
|
||||
* * Redistributions of source code must retain the above copyright notice,
|
||||
* this list of conditions and the following disclaimer.
|
||||
* * Redistributions in binary form must reproduce the above copyright
|
||||
* notice, this list of conditions and the following disclaimer in the
|
||||
* documentation and/or other materials provided with the distribution.
|
||||
* * Neither the name of Redis nor the names of its contributors may be used
|
||||
* to endorse or promote products derived from this software without
|
||||
* specific prior written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
||||
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
||||
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
||||
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
||||
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||||
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
||||
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
||||
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
||||
* POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
|
||||
#include "server.h"
|
||||
#include <time.h>
|
||||
#include <assert.h>
|
||||
#include <stddef.h>
|
||||
|
||||
#if defined(USE_JEMALLOC) && defined(MALLOCX_TCACHE_NONE)
|
||||
|
||||
/* this method was added to jemalloc in order to help us understand which
|
||||
* pointers are worthwhile moving and which aren't */
|
||||
int je_get_defrag_hint(void* ptr, int *bin_util, int *run_util);
|
||||
|
||||
/* Defrag helper for generic allocations.
|
||||
*
|
||||
* returns NULL in case the allocatoin wasn't moved.
|
||||
* when it returns a non-null value, the old pointer was already released
|
||||
* and should NOT be accessed. */
|
||||
void* activeDefragAlloc(void *ptr) {
|
||||
int bin_util, run_util;
|
||||
size_t size;
|
||||
void *newptr;
|
||||
if(!je_get_defrag_hint(ptr, &bin_util, &run_util)) {
|
||||
server.stat_active_defrag_misses++;
|
||||
return NULL;
|
||||
}
|
||||
/* if this run is more utilized than the average utilization in this bin (or it is full), skip it.
|
||||
* this will eventually move all the allocations from relatively empty runs into relatively full runs. */
|
||||
if (run_util > bin_util || run_util == 1<<16) {
|
||||
server.stat_active_defrag_misses++;
|
||||
return NULL;
|
||||
}
|
||||
/* move this allocation to a new allocation.
|
||||
* make sure not to use the thread cache. so that we don't get back the same pointers we try to free */
|
||||
size = zmalloc_size(ptr);
|
||||
newptr = zmalloc_no_tcache(size);
|
||||
memcpy(newptr, ptr, size);
|
||||
zfree_no_tcache(ptr);
|
||||
return newptr;
|
||||
}
|
||||
|
||||
/*Defrag helper for sds strings
|
||||
*
|
||||
* returns NULL in case the allocatoin wasn't moved.
|
||||
* when it returns a non-null value, the old pointer was already released
|
||||
* and should NOT be accessed. */
|
||||
sds activeDefragSds(sds sdsptr) {
|
||||
void* ptr = sdsAllocPtr(sdsptr);
|
||||
void* newptr = activeDefragAlloc(ptr);
|
||||
if (newptr) {
|
||||
size_t offset = sdsptr - (char*)ptr;
|
||||
sdsptr = (char*)newptr + offset;
|
||||
return sdsptr;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Defrag helper for robj and/or string objects
|
||||
*
|
||||
* returns NULL in case the allocatoin wasn't moved.
|
||||
* when it returns a non-null value, the old pointer was already released
|
||||
* and should NOT be accessed. */
|
||||
robj *activeDefragStringOb(robj* ob) {
|
||||
robj *ret = NULL;
|
||||
if (ob->refcount!=1)
|
||||
return NULL;
|
||||
|
||||
/* try to defrag robj (only if not an EMBSTR type (handled below) */
|
||||
if (ob->type!=OBJ_STRING || ob->encoding!=OBJ_ENCODING_EMBSTR) {
|
||||
if ((ret = activeDefragAlloc(ob)))
|
||||
ob = ret;
|
||||
}
|
||||
|
||||
/* try to defrag string object */
|
||||
if (ob->type == OBJ_STRING) {
|
||||
if(ob->encoding==OBJ_ENCODING_RAW) {
|
||||
sds newsds = activeDefragSds((sds)ob->ptr);
|
||||
if (newsds) {
|
||||
ob->ptr = newsds;
|
||||
/* we don't need to change the return value here.
|
||||
* we can return NULL if 'ret' is still NULL (since the object pointer itself wasn't changed).
|
||||
* but we set return value to ob as an indication that we defragged a pointer (for stats).
|
||||
* NOTE: if ret is already set and the robj was moved, then our stats will be a bit off
|
||||
* since two pointers were moved, but we show only one in the stats */
|
||||
ret = ob;
|
||||
}
|
||||
} else if (ob->encoding==OBJ_ENCODING_EMBSTR) {
|
||||
/* the sds is embedded in the object allocation, calculate the offset and update the pointer in the new allocation */
|
||||
long ofs = (intptr_t)ob->ptr - (intptr_t)ob;
|
||||
if ((ret = activeDefragAlloc(ob))) {
|
||||
ret->ptr = (void*)((intptr_t)ret + ofs);
|
||||
}
|
||||
} else if (ob->encoding!=OBJ_ENCODING_INT) {
|
||||
serverPanic("Unknown string encoding");
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* Defrag helper for dictEntries to be used during dict iteration (called on each step).
|
||||
* returns a stat of how many pointers were moved. */
|
||||
int dictIterDefragEntry(dictIterator *iter) {
|
||||
/* This function is a little bit dirty since it messes with the internals of the dict and it's iterator,
|
||||
* but the benefit is that it is very easy to use, and require no other chagnes in the dict. */
|
||||
int defragged = 0;
|
||||
dictht *ht;
|
||||
/* handle the next entry (if there is one), and update the pointer in the current entry. */
|
||||
if (iter->nextEntry) {
|
||||
dictEntry *newde = activeDefragAlloc(iter->nextEntry);
|
||||
if (newde) {
|
||||
defragged++;
|
||||
iter->nextEntry = newde;
|
||||
iter->entry->next = newde;
|
||||
}
|
||||
}
|
||||
/* handle the case of the first entry in the hash bucket. */
|
||||
ht = &iter->d->ht[iter->table];
|
||||
if (ht->table[iter->index] == iter->entry) {
|
||||
dictEntry *newde = activeDefragAlloc(iter->entry);
|
||||
if (newde) {
|
||||
iter->entry = newde;
|
||||
ht->table[iter->index] = newde;
|
||||
defragged++;
|
||||
}
|
||||
}
|
||||
return defragged;
|
||||
}
|
||||
|
||||
/* Defrag helper for dict main allocations (dict struct, and hash tables).
|
||||
* receives a pointer to the dict* and implicitly updates it when the dict struct itself was moved.
|
||||
* returns a stat of how many pointers were moved. */
|
||||
int dictDefragTables(dict** dictRef) {
|
||||
dict *d = *dictRef;
|
||||
dictEntry **newtable;
|
||||
int defragged = 0;
|
||||
/* handle the dict struct */
|
||||
dict *newd = activeDefragAlloc(d);
|
||||
if (newd)
|
||||
defragged++, *dictRef = d = newd;
|
||||
/* handle the first hash table */
|
||||
newtable = activeDefragAlloc(d->ht[0].table);
|
||||
if (newtable)
|
||||
defragged++, d->ht[0].table = newtable;
|
||||
/* handle the second hash table */
|
||||
if (d->ht[1].table) {
|
||||
newtable = activeDefragAlloc(d->ht[1].table);
|
||||
if (newtable)
|
||||
defragged++, d->ht[1].table = newtable;
|
||||
}
|
||||
return defragged;
|
||||
}
|
||||
|
||||
/* Internal function used by zslDefrag */
|
||||
void zslUpdateNode(zskiplist *zsl, zskiplistNode *oldnode, zskiplistNode *newnode, zskiplistNode **update) {
|
||||
int i;
|
||||
for (i = 0; i < zsl->level; i++) {
|
||||
if (update[i]->level[i].forward == oldnode)
|
||||
update[i]->level[i].forward = newnode;
|
||||
}
|
||||
if (zsl->header==oldnode)
|
||||
zsl->header = newnode;
|
||||
if (zsl->tail==oldnode)
|
||||
zsl->tail = newnode;
|
||||
if (newnode->level[0].forward) {
|
||||
serverAssert(newnode->level[0].forward->backward==oldnode);
|
||||
newnode->level[0].forward->backward = newnode;
|
||||
}
|
||||
}
|
||||
|
||||
/* Defrag helper for sorted set.
|
||||
* Update the robj pointer, defrag the struct and return the new score reference.
|
||||
* we may not access oldele pointer (not even the pointer stored in the skiplist), as it was already freed.
|
||||
* newele may be null, in which case we only need to defrag the skiplist, but not update the obj pointer.
|
||||
* when return value is non-NULL, it is the score reference that must be updated in the dict record. */
|
||||
double *zslDefrag(zskiplist *zsl, double score, sds oldele, sds newele) {
|
||||
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x, *newx;
|
||||
int i;
|
||||
sds ele = newele? newele: oldele;
|
||||
|
||||
/* find the skiplist node referring to the object that was moved,
|
||||
* and all pointers that need to be updated if we'll end up moving the skiplist node. */
|
||||
x = zsl->header;
|
||||
for (i = zsl->level-1; i >= 0; i--) {
|
||||
while (x->level[i].forward &&
|
||||
x->level[i].forward->ele != oldele && /* make sure not to access the ->obj pointer if it matches oldele */
|
||||
(x->level[i].forward->score < score ||
|
||||
(x->level[i].forward->score == score &&
|
||||
sdscmp(x->level[i].forward->ele,ele) < 0)))
|
||||
x = x->level[i].forward;
|
||||
update[i] = x;
|
||||
}
|
||||
|
||||
/* update the robj pointer inside the skip list record. */
|
||||
x = x->level[0].forward;
|
||||
serverAssert(x && score == x->score && x->ele==oldele);
|
||||
if (newele)
|
||||
x->ele = newele;
|
||||
|
||||
/* try to defrag the skiplist record itself */
|
||||
newx = activeDefragAlloc(x);
|
||||
if (newx) {
|
||||
zslUpdateNode(zsl, x, newx, update);
|
||||
return &newx->score;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* for each key we scan in the main dict, this function will attempt to defrag all the various pointers it has.
|
||||
* returns a stat of how many pointers were moved. */
|
||||
int defargKey(redisDb *db, dictEntry *de) {
|
||||
sds keysds = dictGetKey(de);
|
||||
robj *newob, *ob;
|
||||
unsigned char *newzl;
|
||||
dict *d;
|
||||
dictIterator *di;
|
||||
int defragged = 0;
|
||||
sds newsds;
|
||||
|
||||
/* try to defrag the key name */
|
||||
newsds = activeDefragSds(keysds);
|
||||
if (newsds) {
|
||||
de->key = newsds;
|
||||
if (dictSize(db->expires)) {
|
||||
/* Dirty code:
|
||||
* i can't search in db->expires for that key after i already released the pointer it holds
|
||||
* it won't be able to do the string compare */
|
||||
unsigned int hash = dictGetHash(db->dict, newsds);
|
||||
dictReplaceKeyPtr(db->expires, keysds, newsds, hash);
|
||||
}
|
||||
defragged++;
|
||||
}
|
||||
|
||||
/* try to defrag robj and / or string value */
|
||||
ob = dictGetVal(de);
|
||||
if ((newob = activeDefragStringOb(ob))) {
|
||||
de->v.val = newob;
|
||||
ob = newob;
|
||||
defragged++;
|
||||
}
|
||||
|
||||
if (ob->type == OBJ_STRING) {
|
||||
/* already handled in activeDefragStringOb */
|
||||
} else if (ob->type == OBJ_LIST) {
|
||||
if (ob->encoding == OBJ_ENCODING_QUICKLIST) {
|
||||
quicklist *ql = ob->ptr, *newql;
|
||||
quicklistNode *node = ql->head, *newnode;
|
||||
if ((newql = activeDefragAlloc(ql)))
|
||||
defragged++, ob->ptr = ql = newql;
|
||||
do {
|
||||
if ((newnode = activeDefragAlloc(node))) {
|
||||
if (newnode->prev)
|
||||
newnode->prev->next = newnode;
|
||||
else
|
||||
ql->head = newnode;
|
||||
if (newnode->next)
|
||||
newnode->next->prev = newnode;
|
||||
else
|
||||
ql->tail = newnode;
|
||||
node = newnode;
|
||||
defragged++;
|
||||
}
|
||||
if ((newzl = activeDefragAlloc(node->zl)))
|
||||
defragged++, node->zl = newzl;
|
||||
} while ((node = node->next));
|
||||
} else if (ob->encoding == OBJ_ENCODING_ZIPLIST) {
|
||||
if ((newzl = activeDefragAlloc(ob->ptr)))
|
||||
defragged++, ob->ptr = newzl;
|
||||
} else {
|
||||
serverPanic("Unknown list encoding");
|
||||
}
|
||||
} else if (ob->type == OBJ_SET) {
|
||||
if (ob->encoding == OBJ_ENCODING_HT) {
|
||||
d = ob->ptr;
|
||||
di = dictGetIterator(d);
|
||||
while((de = dictNext(di)) != NULL) {
|
||||
sds sdsele = dictGetKey(de);
|
||||
if ((newsds = activeDefragSds(sdsele)))
|
||||
defragged++, de->key = newsds;
|
||||
defragged += dictIterDefragEntry(di);
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
dictDefragTables((dict**)&ob->ptr);
|
||||
} else if (ob->encoding == OBJ_ENCODING_INTSET) {
|
||||
intset *is = ob->ptr;
|
||||
intset *newis = activeDefragAlloc(is);
|
||||
if (newis)
|
||||
defragged++, ob->ptr = newis;
|
||||
} else {
|
||||
serverPanic("Unknown set encoding");
|
||||
}
|
||||
} else if (ob->type == OBJ_ZSET) {
|
||||
if (ob->encoding == OBJ_ENCODING_ZIPLIST) {
|
||||
if ((newzl = activeDefragAlloc(ob->ptr)))
|
||||
defragged++, ob->ptr = newzl;
|
||||
} else if (ob->encoding == OBJ_ENCODING_SKIPLIST) {
|
||||
zset *zs = (zset*)ob->ptr;
|
||||
zset *newzs = activeDefragAlloc(zs);
|
||||
zskiplist *newzsl;
|
||||
if (newzs)
|
||||
defragged++, ob->ptr = zs = newzs;
|
||||
newzsl = activeDefragAlloc(zs->zsl);
|
||||
if (newzsl)
|
||||
defragged++, zs->zsl = newzsl;
|
||||
d = zs->dict;
|
||||
di = dictGetIterator(d);
|
||||
while((de = dictNext(di)) != NULL) {
|
||||
double* newscore;
|
||||
sds sdsele = dictGetKey(de);
|
||||
if ((newsds = activeDefragSds(sdsele)))
|
||||
defragged++, de->key = newsds;
|
||||
newscore = zslDefrag(zs->zsl, *(double*)dictGetVal(de), sdsele, newsds);
|
||||
if (newscore) {
|
||||
dictSetVal(d, de, newscore);
|
||||
defragged++;
|
||||
}
|
||||
defragged += dictIterDefragEntry(di);
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
dictDefragTables(&zs->dict);
|
||||
} else {
|
||||
serverPanic("Unknown sorted set encoding");
|
||||
}
|
||||
} else if (ob->type == OBJ_HASH) {
|
||||
if (ob->encoding == OBJ_ENCODING_ZIPLIST) {
|
||||
if ((newzl = activeDefragAlloc(ob->ptr)))
|
||||
defragged++, ob->ptr = newzl;
|
||||
} else if (ob->encoding == OBJ_ENCODING_HT) {
|
||||
d = ob->ptr;
|
||||
di = dictGetIterator(d);
|
||||
while((de = dictNext(di)) != NULL) {
|
||||
sds sdsele = dictGetKey(de);
|
||||
if ((newsds = activeDefragSds(sdsele)))
|
||||
defragged++, de->key = newsds;
|
||||
sdsele = dictGetVal(de);
|
||||
if ((newsds = activeDefragSds(sdsele)))
|
||||
defragged++, de->v.val = newsds;
|
||||
defragged += dictIterDefragEntry(di);
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
dictDefragTables((dict**)&ob->ptr);
|
||||
} else {
|
||||
serverPanic("Unknown hash encoding");
|
||||
}
|
||||
} else {
|
||||
serverPanic("Unknown object type");
|
||||
}
|
||||
return defragged;
|
||||
}
|
||||
|
||||
/* defrag scan callback for the main db dictionary */
|
||||
void defragScanCallback(void *privdata, const dictEntry *de) {
|
||||
/* TODO: defrag the dictEntry (and also the entriy in expire dict). */
|
||||
int defragged = defargKey((redisDb*)privdata, (dictEntry*)de);
|
||||
server.stat_active_defrag_hits += defragged;
|
||||
if(defragged)
|
||||
server.stat_active_defrag_key_hits++;
|
||||
else
|
||||
server.stat_active_defrag_key_misses++;
|
||||
}
|
||||
|
||||
/* defrag scan callback for for each hash table bicket,
|
||||
* used in order to defrag the dictEntry allocations */
|
||||
void defragDictBucketCallback(void *privdata, dictEntry **bucketref) {
|
||||
UNUSED(privdata);
|
||||
while(*bucketref) {
|
||||
dictEntry *de = *bucketref, *newde;
|
||||
if ((newde = activeDefragAlloc(de))) {
|
||||
*bucketref = newde;
|
||||
}
|
||||
bucketref = &(*bucketref)->next;
|
||||
}
|
||||
}
|
||||
|
||||
/* Utility function to get the fragmentation ratio from jemalloc.
|
||||
* it is critical to do that by comparing only heap maps that belown to jemalloc, and skip ones the jemalloc keeps as spare.
|
||||
* since we use this fragmentation ratio in order to decide if a defrag action should be taken or not,
|
||||
* a false detection can cause the defragmenter to waste a lot of CPU without the possibility of getting any results. */
|
||||
float getAllocatorFragmentation(size_t *out_frag_bytes) {
|
||||
size_t epoch = 1, allocated = 0, resident = 0, active = 0, sz = sizeof(size_t);
|
||||
je_mallctl("epoch", &epoch, &sz, &epoch, sz); /* Update the statistics cached by mallctl. */
|
||||
je_mallctl("stats.resident", &resident, &sz, NULL, 0); /* unlike RSS, this does not include RSS from shared libraries and other non heap mappings */
|
||||
je_mallctl("stats.active", &active, &sz, NULL, 0); /* unlike resident, this doesn't not include the pages jemalloc reserves for re-use (purge will clean that) */
|
||||
je_mallctl("stats.allocated", &allocated, &sz, NULL, 0); /* unlike zmalloc_used_memory, this matches the stats.resident by taking into account all allocations done by this process (not only zmalloc) */
|
||||
float frag_pct = ((float)active / allocated)*100 - 100;
|
||||
size_t frag_bytes = active - allocated;
|
||||
float rss_pct = ((float)resident / allocated)*100 - 100;
|
||||
size_t rss_bytes = resident - allocated;
|
||||
if(out_frag_bytes)
|
||||
*out_frag_bytes = frag_bytes;
|
||||
serverLog(LL_DEBUG,
|
||||
"allocated=%zu, active=%zu, resident=%zu, frag=%.0f%% (%.0f%% rss), frag_bytes=%zu (%zu%% rss)",
|
||||
allocated, active, resident, frag_pct, rss_pct, frag_bytes, rss_bytes);
|
||||
return frag_pct;
|
||||
}
|
||||
|
||||
#define INTERPOLATE(x, x1, x2, y1, y2) ( (y1) + ((x)-(x1)) * ((y2)-(y1)) / ((x2)-(x1)) )
|
||||
#define LIMIT(y, min, max) ((y)<(min)? min: ((y)>(max)? max: (y)))
|
||||
|
||||
/* Perform incremental defragmentation work from the serverCron.
|
||||
* This works in a similar way to activeExpireCycle, in the sense that
|
||||
* we do incremental work across calls. */
|
||||
void activeDefragCycle(void) {
|
||||
static int current_db = -1;
|
||||
static unsigned long cursor = 0;
|
||||
static redisDb *db = NULL;
|
||||
static long long start_scan, start_stat;
|
||||
unsigned int iterations = 0;
|
||||
unsigned long long defragged = server.stat_active_defrag_hits;
|
||||
long long start, timelimit;
|
||||
|
||||
if (server.aof_child_pid!=-1 || server.rdb_child_pid!=-1)
|
||||
return; /* defragging memory while there's a fork will just do damage. */
|
||||
|
||||
/* once a second, check if we the fragmentation justfies starting a scan or making it more aggressive */
|
||||
run_with_period(1000) {
|
||||
size_t frag_bytes;
|
||||
float frag_pct = getAllocatorFragmentation(&frag_bytes);
|
||||
/* if we're not already running, and below the threshold, exit. */
|
||||
if (!server.active_defrag_running) {
|
||||
if(frag_pct < server.active_defrag_threshold_lower || frag_bytes < server.active_defrag_ignore_bytes)
|
||||
return;
|
||||
}
|
||||
|
||||
/* calculate the adaptive aggressiveness of the defrag */
|
||||
int cpu_pct = INTERPOLATE(frag_pct, server.active_defrag_threshold_lower, server.active_defrag_threshold_upper,
|
||||
server.active_defrag_cycle_min, server.active_defrag_cycle_max);
|
||||
cpu_pct = LIMIT(cpu_pct, server.active_defrag_cycle_min, server.active_defrag_cycle_max);
|
||||
/* we allow increasing the aggressiveness during a scan, but don't reduce it */
|
||||
if (!server.active_defrag_running || cpu_pct > server.active_defrag_running) {
|
||||
server.active_defrag_running = cpu_pct;
|
||||
serverLog(LL_VERBOSE,
|
||||
"Starting active defrag, frag=%.0f%%, frag_bytes=%zu, cpu=%d%%",
|
||||
frag_pct, frag_bytes, cpu_pct);
|
||||
}
|
||||
}
|
||||
if (!server.active_defrag_running)
|
||||
return;
|
||||
|
||||
/* See activeExpireCycle for how timelimit is handled. */
|
||||
start = ustime();
|
||||
timelimit = 1000000*server.active_defrag_running/server.hz/100;
|
||||
if (timelimit <= 0) timelimit = 1;
|
||||
|
||||
do {
|
||||
if (!cursor) {
|
||||
/* Move on to next database, and stop if we reached the last one */
|
||||
if (++current_db >= server.dbnum) {
|
||||
long long now = ustime();
|
||||
size_t frag_bytes;
|
||||
float frag_pct = getAllocatorFragmentation(&frag_bytes);
|
||||
serverLog(LL_VERBOSE,
|
||||
"Active defrag done in %dms, reallocated=%d, frag=%.0f%%, frag_bytes=%zu",
|
||||
(int)((now - start_scan)/1000), (int)(server.stat_active_defrag_hits - start_stat), frag_pct, frag_bytes);
|
||||
|
||||
start_scan = now;
|
||||
current_db = -1;
|
||||
cursor = 0;
|
||||
db = NULL;
|
||||
server.active_defrag_running = 0;
|
||||
return;
|
||||
}
|
||||
else if (current_db==0) {
|
||||
/* start a scan from the first database */
|
||||
start_scan = ustime();
|
||||
start_stat = server.stat_active_defrag_hits;
|
||||
}
|
||||
|
||||
db = &server.db[current_db];
|
||||
cursor = 0;
|
||||
}
|
||||
|
||||
do {
|
||||
cursor = dictScan(db->dict, cursor, defragScanCallback, defragDictBucketCallback, db);
|
||||
/* once in 16 scan iterations, or 1000 pointer reallocations (if we have a lot of pointers in one hash bucket),
|
||||
* check if we reached the tiem limit */
|
||||
if (cursor && (++iterations > 16 || server.stat_active_defrag_hits - defragged > 1000)) {
|
||||
if ((ustime() - start) > timelimit) {
|
||||
return;
|
||||
}
|
||||
iterations = 0;
|
||||
defragged = server.stat_active_defrag_hits;
|
||||
}
|
||||
} while(cursor);
|
||||
} while(1);
|
||||
}
|
||||
|
||||
#else /* USE_JEMALLOC */
|
||||
|
||||
void activeDefragCycle(void) {
|
||||
/* not implemented yet*/
|
||||
}
|
||||
|
||||
#endif
|
33
src/dict.c
33
src/dict.c
@ -885,6 +885,7 @@ static unsigned long rev(unsigned long v) {
|
||||
unsigned long dictScan(dict *d,
|
||||
unsigned long v,
|
||||
dictScanFunction *fn,
|
||||
dictScanBucketFunction* bucketfn,
|
||||
void *privdata)
|
||||
{
|
||||
dictht *t0, *t1;
|
||||
@ -898,6 +899,7 @@ unsigned long dictScan(dict *d,
|
||||
m0 = t0->sizemask;
|
||||
|
||||
/* Emit entries at cursor */
|
||||
if (bucketfn) bucketfn(privdata, &t0->table[v & m0]);
|
||||
de = t0->table[v & m0];
|
||||
while (de) {
|
||||
next = de->next;
|
||||
@ -919,6 +921,7 @@ unsigned long dictScan(dict *d,
|
||||
m1 = t1->sizemask;
|
||||
|
||||
/* Emit entries at cursor */
|
||||
if (bucketfn) bucketfn(privdata, &t0->table[v & m0]);
|
||||
de = t0->table[v & m0];
|
||||
while (de) {
|
||||
next = de->next;
|
||||
@ -930,6 +933,7 @@ unsigned long dictScan(dict *d,
|
||||
* of the index pointed to by the cursor in the smaller table */
|
||||
do {
|
||||
/* Emit entries at cursor */
|
||||
if (bucketfn) bucketfn(privdata, &t1->table[v & m1]);
|
||||
de = t1->table[v & m1];
|
||||
while (de) {
|
||||
next = de->next;
|
||||
@ -1040,6 +1044,35 @@ void dictDisableResize(void) {
|
||||
dict_can_resize = 0;
|
||||
}
|
||||
|
||||
unsigned int dictGetHash(dict *d, const void *key) {
|
||||
return dictHashKey(d, key);
|
||||
}
|
||||
|
||||
/* Replace an old key pointer in the dictionary with a new pointer.
|
||||
* oldkey is a dead pointer and should not be accessed.
|
||||
* the hash value should be provided using dictGetHash.
|
||||
* no string / key comparison is performed.
|
||||
* return value is the dictEntry if found, or NULL if not found. */
|
||||
dictEntry *dictReplaceKeyPtr(dict *d, const void *oldptr, void *newptr, unsigned int hash) {
|
||||
dictEntry *he;
|
||||
unsigned int idx, table;
|
||||
|
||||
if (d->ht[0].used + d->ht[1].used == 0) return NULL; /* dict is empty */
|
||||
for (table = 0; table <= 1; table++) {
|
||||
idx = hash & d->ht[table].sizemask;
|
||||
he = d->ht[table].table[idx];
|
||||
while(he) {
|
||||
if (oldptr==he->key) {
|
||||
he->key = newptr;
|
||||
return he;
|
||||
}
|
||||
he = he->next;
|
||||
}
|
||||
if (!dictIsRehashing(d)) return NULL;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* ------------------------------- Debugging ---------------------------------*/
|
||||
|
||||
#define DICT_STATS_VECTLEN 50
|
||||
|
@ -95,6 +95,7 @@ typedef struct dictIterator {
|
||||
} dictIterator;
|
||||
|
||||
typedef void (dictScanFunction)(void *privdata, const dictEntry *de);
|
||||
typedef void (dictScanBucketFunction)(void *privdata, dictEntry **bucketref);
|
||||
|
||||
/* This is the initial size of every hash table */
|
||||
#define DICT_HT_INITIAL_SIZE 4
|
||||
@ -176,7 +177,9 @@ int dictRehash(dict *d, int n);
|
||||
int dictRehashMilliseconds(dict *d, int ms);
|
||||
void dictSetHashFunctionSeed(unsigned int initval);
|
||||
unsigned int dictGetHashFunctionSeed(void);
|
||||
unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, void *privdata);
|
||||
unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, dictScanBucketFunction *bucketfn, void *privdata);
|
||||
unsigned int dictGetHash(dict *d, const void *key);
|
||||
dictEntry *dictReplaceKeyPtr(dict *d, const void *oldptr, void *newptr, unsigned int hash);
|
||||
|
||||
/* Hash table types */
|
||||
extern dictType dictTypeHeapStringCopyKey;
|
||||
|
29
src/server.c
29
src/server.c
@ -876,6 +876,10 @@ void databasesCron(void) {
|
||||
expireSlaveKeys();
|
||||
}
|
||||
|
||||
/* Defrag keys gradually. */
|
||||
if (server.active_defrag_enabled)
|
||||
activeDefragCycle();
|
||||
|
||||
/* Perform hash tables rehashing if needed, but only if there are no
|
||||
* other processes saving the DB on disk. Otherwise rehashing is bad
|
||||
* as will cause a lot of copy-on-write of memory pages. */
|
||||
@ -1332,6 +1336,12 @@ void initServerConfig(void) {
|
||||
server.maxidletime = CONFIG_DEFAULT_CLIENT_TIMEOUT;
|
||||
server.tcpkeepalive = CONFIG_DEFAULT_TCP_KEEPALIVE;
|
||||
server.active_expire_enabled = 1;
|
||||
server.active_defrag_enabled = CONFIG_DEFAULT_ACTIVE_DEFRAG;
|
||||
server.active_defrag_ignore_bytes = CONFIG_DEFAULT_DEFRAG_IGNORE_BYTES;
|
||||
server.active_defrag_threshold_lower = CONFIG_DEFAULT_DEFRAG_THRESHOLD_LOWER;
|
||||
server.active_defrag_threshold_upper = CONFIG_DEFAULT_DEFRAG_THRESHOLD_UPPER;
|
||||
server.active_defrag_cycle_min = CONFIG_DEFAULT_DEFRAG_CYCLE_MIN;
|
||||
server.active_defrag_cycle_max = CONFIG_DEFAULT_DEFRAG_CYCLE_MAX;
|
||||
server.client_max_querybuf_len = PROTO_MAX_QUERYBUF_LEN;
|
||||
server.saveparams = NULL;
|
||||
server.loading = 0;
|
||||
@ -1368,6 +1378,7 @@ void initServerConfig(void) {
|
||||
server.rdb_checksum = CONFIG_DEFAULT_RDB_CHECKSUM;
|
||||
server.stop_writes_on_bgsave_err = CONFIG_DEFAULT_STOP_WRITES_ON_BGSAVE_ERROR;
|
||||
server.activerehashing = CONFIG_DEFAULT_ACTIVE_REHASHING;
|
||||
server.active_defrag_running = 0;
|
||||
server.notify_keyspace_events = 0;
|
||||
server.maxclients = CONFIG_DEFAULT_MAX_CLIENTS;
|
||||
server.bpop_blocked_clients = 0;
|
||||
@ -1718,6 +1729,10 @@ void resetServerStats(void) {
|
||||
server.stat_evictedkeys = 0;
|
||||
server.stat_keyspace_misses = 0;
|
||||
server.stat_keyspace_hits = 0;
|
||||
server.stat_active_defrag_hits = 0;
|
||||
server.stat_active_defrag_misses = 0;
|
||||
server.stat_active_defrag_key_hits = 0;
|
||||
server.stat_active_defrag_key_misses = 0;
|
||||
server.stat_fork_time = 0;
|
||||
server.stat_fork_rate = 0;
|
||||
server.stat_rejected_conn = 0;
|
||||
@ -2873,6 +2888,7 @@ sds genRedisInfoString(char *section) {
|
||||
"maxmemory_policy:%s\r\n"
|
||||
"mem_fragmentation_ratio:%.2f\r\n"
|
||||
"mem_allocator:%s\r\n"
|
||||
"active_defrag_running:%d\r\n"
|
||||
"lazyfree_pending_objects:%zu\r\n",
|
||||
zmalloc_used,
|
||||
hmem,
|
||||
@ -2894,6 +2910,7 @@ sds genRedisInfoString(char *section) {
|
||||
evict_policy,
|
||||
mh->fragmentation,
|
||||
ZMALLOC_LIB,
|
||||
server.active_defrag_running,
|
||||
lazyfreeGetPendingObjectsCount()
|
||||
);
|
||||
freeMemoryOverheadData(mh);
|
||||
@ -3013,7 +3030,11 @@ sds genRedisInfoString(char *section) {
|
||||
"pubsub_patterns:%lu\r\n"
|
||||
"latest_fork_usec:%lld\r\n"
|
||||
"migrate_cached_sockets:%ld\r\n"
|
||||
"slave_expires_tracked_keys:%zu\r\n",
|
||||
"slave_expires_tracked_keys:%zu\r\n"
|
||||
"active_defrag_hits:%lld\r\n"
|
||||
"active_defrag_misses:%lld\r\n"
|
||||
"active_defrag_key_hits:%lld\r\n"
|
||||
"active_defrag_key_misses:%lld\r\n",
|
||||
server.stat_numconnections,
|
||||
server.stat_numcommands,
|
||||
getInstantaneousMetric(STATS_METRIC_COMMAND),
|
||||
@ -3033,7 +3054,11 @@ sds genRedisInfoString(char *section) {
|
||||
listLength(server.pubsub_patterns),
|
||||
server.stat_fork_time,
|
||||
dictSize(server.migrate_cached_sockets),
|
||||
getSlaveKeyWithExpireCount());
|
||||
getSlaveKeyWithExpireCount(),
|
||||
server.stat_active_defrag_hits,
|
||||
server.stat_active_defrag_misses,
|
||||
server.stat_active_defrag_key_hits,
|
||||
server.stat_active_defrag_key_misses);
|
||||
}
|
||||
|
||||
/* Replication */
|
||||
|
18
src/server.h
18
src/server.h
@ -152,6 +152,12 @@ typedef long long mstime_t; /* millisecond time type. */
|
||||
#define CONFIG_DEFAULT_LAZYFREE_LAZY_EXPIRE 0
|
||||
#define CONFIG_DEFAULT_LAZYFREE_LAZY_SERVER_DEL 0
|
||||
#define CONFIG_DEFAULT_ALWAYS_SHOW_LOGO 0
|
||||
#define CONFIG_DEFAULT_ACTIVE_DEFRAG 1
|
||||
#define CONFIG_DEFAULT_DEFRAG_THRESHOLD_LOWER 10 /* don't defrag when fragmentation is below 10% */
|
||||
#define CONFIG_DEFAULT_DEFRAG_THRESHOLD_UPPER 100 /* maximum defrag force at 100% fragmentation */
|
||||
#define CONFIG_DEFAULT_DEFRAG_IGNORE_BYTES (100<<20) /* don't defrag if frag overhead is below 100mb */
|
||||
#define CONFIG_DEFAULT_DEFRAG_CYCLE_MIN 25 /* 25% CPU min (at lower threshold) */
|
||||
#define CONFIG_DEFAULT_DEFRAG_CYCLE_MAX 75 /* 75% CPU max (at upper threshold) */
|
||||
|
||||
#define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20 /* Loopkups per loop. */
|
||||
#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds */
|
||||
@ -857,6 +863,7 @@ struct redisServer {
|
||||
unsigned lruclock:LRU_BITS; /* Clock for LRU eviction */
|
||||
int shutdown_asap; /* SHUTDOWN needed ASAP */
|
||||
int activerehashing; /* Incremental rehash in serverCron() */
|
||||
int active_defrag_running; /* Active defragmentation running (holds current scan aggressiveness) */
|
||||
char *requirepass; /* Pass for AUTH command, or NULL */
|
||||
char *pidfile; /* PID file path */
|
||||
int arch_bits; /* 32 or 64 depending on sizeof(long) */
|
||||
@ -908,6 +915,10 @@ struct redisServer {
|
||||
long long stat_evictedkeys; /* Number of evicted keys (maxmemory) */
|
||||
long long stat_keyspace_hits; /* Number of successful lookups of keys */
|
||||
long long stat_keyspace_misses; /* Number of failed lookups of keys */
|
||||
long long stat_active_defrag_hits; /* number of allocations moved */
|
||||
long long stat_active_defrag_misses; /* number of allocations scanned but not moved */
|
||||
long long stat_active_defrag_key_hits; /* number of keys with moved allocations */
|
||||
long long stat_active_defrag_key_misses;/* number of keys scanned and not moved */
|
||||
size_t stat_peak_memory; /* Max used memory record */
|
||||
long long stat_fork_time; /* Time needed to perform latest fork() */
|
||||
double stat_fork_rate; /* Fork rate in GB/sec. */
|
||||
@ -937,6 +948,12 @@ struct redisServer {
|
||||
int maxidletime; /* Client timeout in seconds */
|
||||
int tcpkeepalive; /* Set SO_KEEPALIVE if non-zero. */
|
||||
int active_expire_enabled; /* Can be disabled for testing purposes. */
|
||||
int active_defrag_enabled;
|
||||
size_t active_defrag_ignore_bytes; /* minimum amount of fragmentation waste to start active defrag */
|
||||
int active_defrag_threshold_lower; /* minimum percentage of fragmentation to start active defrag */
|
||||
int active_defrag_threshold_upper; /* maximum percentage of fragmentation at which we use maximum effort */
|
||||
int active_defrag_cycle_min; /* minimal effort for defrag in CPU percentage */
|
||||
int active_defrag_cycle_max; /* maximal effort for defrag in CPU percentage */
|
||||
size_t client_max_querybuf_len; /* Limit for client query buffer length */
|
||||
int dbnum; /* Total number of configured DBs */
|
||||
int supervised; /* 1 if supervised, 0 otherwise. */
|
||||
@ -1576,6 +1593,7 @@ void adjustOpenFilesLimit(void);
|
||||
void closeListeningSockets(int unlink_unix_socket);
|
||||
void updateCachedTime(void);
|
||||
void resetServerStats(void);
|
||||
void activeDefragCycle(void);
|
||||
unsigned int getLRUClock(void);
|
||||
const char *evictPolicyToString(void);
|
||||
struct redisMemOverhead *getMemoryOverheadData(void);
|
||||
|
@ -66,6 +66,8 @@ void zlibc_free(void *ptr) {
|
||||
#define calloc(count,size) je_calloc(count,size)
|
||||
#define realloc(ptr,size) je_realloc(ptr,size)
|
||||
#define free(ptr) je_free(ptr)
|
||||
#define mallocx(size,flags) je_mallocx(size,flags)
|
||||
#define dallocx(ptr,flags) je_dallocx(ptr,flags)
|
||||
#endif
|
||||
|
||||
#define update_zmalloc_stat_alloc(__n) do { \
|
||||
@ -115,6 +117,24 @@ void *zmalloc(size_t size) {
|
||||
#endif
|
||||
}
|
||||
|
||||
/* Allocation and free functions that bypass the thread cache
|
||||
* and go straight to the allocator arena bins.
|
||||
* Currently implemented only for jemalloc */
|
||||
#if defined(USE_JEMALLOC) && defined(MALLOCX_TCACHE_NONE)
|
||||
void *zmalloc_no_tcache(size_t size) {
|
||||
void *ptr = mallocx(size+PREFIX_SIZE, MALLOCX_TCACHE_NONE);
|
||||
if (!ptr) zmalloc_oom_handler(size);
|
||||
update_zmalloc_stat_alloc(zmalloc_size(ptr));
|
||||
return ptr;
|
||||
}
|
||||
|
||||
void zfree_no_tcache(void *ptr) {
|
||||
if (ptr == NULL) return;
|
||||
update_zmalloc_stat_free(zmalloc_size(ptr));
|
||||
dallocx(ptr, MALLOCX_TCACHE_NONE);
|
||||
}
|
||||
#endif
|
||||
|
||||
void *zcalloc(size_t size) {
|
||||
void *ptr = calloc(1, size+PREFIX_SIZE);
|
||||
|
||||
|
@ -69,6 +69,8 @@ void *zmalloc(size_t size);
|
||||
void *zcalloc(size_t size);
|
||||
void *zrealloc(void *ptr, size_t size);
|
||||
void zfree(void *ptr);
|
||||
void zfree_no_tcache(void *ptr);
|
||||
void *zmalloc_no_tcache(size_t size);
|
||||
char *zstrdup(const char *s);
|
||||
size_t zmalloc_used_memory(void);
|
||||
void zmalloc_enable_thread_safeness(void);
|
||||
|
Loading…
Reference in New Issue
Block a user