Merge pull request #5065 from oranagra/defrag_jemalloc5

defrag hint support for jemalloc 5, and active defrag for streams
This commit is contained in:
Salvatore Sanfilippo 2018-06-27 14:05:16 +02:00 committed by GitHub
commit 35c5f3fa7d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 273 additions and 25 deletions

View File

@ -215,4 +215,32 @@ ixalloc(tsdn_t *tsdn, void *ptr, size_t oldsize, size_t size, size_t extra,
return arena_ralloc_no_move(tsdn, ptr, oldsize, size, extra, zero);
}
JEMALLOC_ALWAYS_INLINE int
iget_defrag_hint(tsdn_t *tsdn, void* ptr, int *bin_util, int *run_util) {
int defrag = 0;
rtree_ctx_t rtree_ctx_fallback;
rtree_ctx_t *rtree_ctx = tsdn_rtree_ctx(tsdn, &rtree_ctx_fallback);
szind_t szind;
bool is_slab;
rtree_szind_slab_read(tsdn, &extents_rtree, rtree_ctx, (uintptr_t)ptr, true, &szind, &is_slab);
if (likely(is_slab)) {
/* Small allocation. */
extent_t *slab = iealloc(tsdn, ptr);
arena_t *arena = extent_arena_get(slab);
szind_t binind = extent_szind_get(slab);
bin_t *bin = &arena->bins[binind];
malloc_mutex_lock(tsdn, &bin->lock);
/* don't bother moving allocations from the slab currently used for new allocations */
if (slab != bin->slabcur) {
const bin_info_t *bin_info = &bin_infos[binind];
size_t availregs = bin_info->nregs * bin->stats.curslabs;
*bin_util = (bin->stats.curregs<<16) / availregs;
*run_util = ((bin_info->nregs - extent_nfree_get(slab))<<16) / bin_info->nregs;
defrag = 1;
}
malloc_mutex_unlock(tsdn, &bin->lock);
}
return defrag;
}
#endif /* JEMALLOC_INTERNAL_INLINES_C_H */

View File

@ -120,3 +120,7 @@
# define JEMALLOC_RESTRICT_RETURN
# define JEMALLOC_ALLOCATOR
#endif
/* This version of Jemalloc, modified for Redis, has the je_get_defrag_hint()
* function. */
#define JEMALLOC_FRAG_HINT

View File

@ -3324,3 +3324,14 @@ jemalloc_postfork_child(void) {
}
/******************************************************************************/
/* Helps the application decide if a pointer is worth re-allocating in order to reduce fragmentation.
* returns 0 if the allocation is in the currently active run,
* or when it is not causing any frag issue (large or huge bin)
* returns the bin utilization and run utilization both in fixed point 16:16.
* If the application decides to re-allocate it should use MALLOCX_TCACHE_NONE when doing so. */
JEMALLOC_EXPORT int JEMALLOC_NOTHROW
get_defrag_hint(void* ptr, int *bin_util, int *run_util) {
assert(ptr != NULL);
return iget_defrag_hint(TSDN_NULL, ptr, bin_util, run_util);
}

View File

@ -431,6 +431,11 @@ void loadServerConfigFromString(char *config) {
if ((server.active_defrag_enabled = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;
}
if (server.active_defrag_enabled) {
#ifndef HAVE_DEFRAG
err = "active defrag can't be enabled without proper jemalloc support"; goto loaderr;
#endif
}
} else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
if ((server.daemonize = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;

View File

@ -592,6 +592,171 @@ long defragSet(redisDb *db, dictEntry *kde) {
return defragged;
}
/* Defrag callback for radix tree iterator, called for each node,
* used in order to defrag the nodes allocations. */
int defragRaxNode(raxNode **noderef) {
raxNode *newnode = activeDefragAlloc(*noderef);
if (newnode) {
*noderef = newnode;
return 1;
}
return 0;
}
/* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */
int scanLaterStraemListpacks(robj *ob, unsigned long *cursor, long long endtime, long long *defragged) {
static unsigned char last[sizeof(streamID)];
raxIterator ri;
long iterations = 0;
if (ob->type != OBJ_STREAM || ob->encoding != OBJ_ENCODING_STREAM) {
*cursor = 0;
return 0;
}
stream *s = ob->ptr;
raxStart(&ri,s->rax);
if (*cursor == 0) {
/* if cursor is 0, we start new iteration */
defragRaxNode(&s->rax->head);
/* assign the iterator node callback before the seek, so that the
* initial nodes that are processed till the first item are covered */
ri.node_cb = defragRaxNode;
raxSeek(&ri,"^",NULL,0);
} else {
/* if cursor is non-zero, we seek to the static 'last' */
if (!raxSeek(&ri,">", last, sizeof(last))) {
*cursor = 0;
return 0;
}
/* assign the iterator node callback after the seek, so that the
* initial nodes that are processed till now aren't covered */
ri.node_cb = defragRaxNode;
}
(*cursor)++;
while (raxNext(&ri)) {
void *newdata = activeDefragAlloc(ri.data);
if (newdata)
raxSetData(ri.node, ri.data=newdata), (*defragged)++;
if (++iterations > 16) {
if (ustime() > endtime) {
serverAssert(ri.key_len==sizeof(last));
memcpy(last,ri.key,ri.key_len);
raxStop(&ri);
return 1;
}
iterations = 0;
}
}
raxStop(&ri);
*cursor = 0;
return 0;
}
/* optional callback used defrag each rax element (not including the element pointer itself) */
typedef void *(raxDefragFunction)(raxIterator *ri, void *privdata, long *defragged);
/* defrag radix tree including:
* 1) rax struct
* 2) rax nodes
* 3) rax entry data (only if defrag_data is specified)
* 4) call a callback per element, and allow the callback to return a new pointer for the element */
long defragRadixTree(rax **raxref, int defrag_data, raxDefragFunction *element_cb, void *element_cb_data) {
long defragged = 0;
raxIterator ri;
rax* rax;
if ((rax = activeDefragAlloc(*raxref)))
defragged++, *raxref = rax;
rax = *raxref;
raxStart(&ri,rax);
ri.node_cb = defragRaxNode;
defragRaxNode(&rax->head);
raxSeek(&ri,"^",NULL,0);
while (raxNext(&ri)) {
void *newdata = NULL;
if (element_cb)
newdata = element_cb(&ri, element_cb_data, &defragged);
if (defrag_data && !newdata)
newdata = activeDefragAlloc(ri.data);
if (newdata)
raxSetData(ri.node, ri.data=newdata), defragged++;
}
raxStop(&ri);
return defragged;
}
typedef struct {
streamCG *cg;
streamConsumer *c;
} PendingEntryContext;
void* defragStreamConsumerPendingEntry(raxIterator *ri, void *privdata, long *defragged) {
UNUSED(defragged);
PendingEntryContext *ctx = privdata;
streamNACK *nack = ri->data, *newnack;
nack->consumer = ctx->c; /* update nack pointer to consumer */
newnack = activeDefragAlloc(nack);
if (newnack) {
/* update consumer group pointer to the nack */
void *prev;
raxInsert(ctx->cg->pel, ri->key, ri->key_len, newnack, &prev);
serverAssert(prev==nack);
/* note: we don't increment 'defragged' that's done by the caller */
}
return newnack;
}
void* defragStreamConsumer(raxIterator *ri, void *privdata, long *defragged) {
streamConsumer *c = ri->data;
streamCG *cg = privdata;
void *newc = activeDefragAlloc(c);
if (newc) {
/* note: we don't increment 'defragged' that's done by the caller */
c = newc;
}
sds newsds = activeDefragSds(c->name);
if (newsds)
(*defragged)++, c->name = newsds;
if (c->pel) {
PendingEntryContext pel_ctx = {cg, c};
*defragged += defragRadixTree(&c->pel, 0, defragStreamConsumerPendingEntry, &pel_ctx);
}
return newc; /* returns NULL if c was not defragged */
}
void* defragStreamConsumerGroup(raxIterator *ri, void *privdata, long *defragged) {
streamCG *cg = ri->data;
UNUSED(privdata);
if (cg->consumers)
*defragged += defragRadixTree(&cg->consumers, 0, defragStreamConsumer, cg);
if (cg->pel)
*defragged += defragRadixTree(&cg->pel, 0, NULL, NULL);
return NULL;
}
long defragStream(redisDb *db, dictEntry *kde) {
long defragged = 0;
robj *ob = dictGetVal(kde);
serverAssert(ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM);
stream *s = ob->ptr, *news;
/* handle the main struct */
if ((news = activeDefragAlloc(s)))
defragged++, ob->ptr = s = news;
if (raxSize(s->rax) > server.active_defrag_max_scan_fields) {
rax *newrax = activeDefragAlloc(s->rax);
if (newrax)
defragged++, s->rax = newrax;
defragLater(db, kde);
} else
defragged += defragRadixTree(&s->rax, 1, NULL, NULL);
if (s->cgroups)
defragged += defragRadixTree(&s->cgroups, 1, defragStreamConsumerGroup, NULL);
return defragged;
}
/* for each key we scan in the main dict, this function will attempt to defrag
* all the various pointers it has. Returns a stat of how many pointers were
* moved. */
@ -660,6 +825,8 @@ long defragKey(redisDb *db, dictEntry *de) {
} else {
serverPanic("Unknown hash encoding");
}
} else if (ob->type == OBJ_STREAM) {
defragged += defragStream(db, de);
} else if (ob->type == OBJ_MODULE) {
/* Currently defragmenting modules private data types
* is not supported. */
@ -680,7 +847,7 @@ void defragScanCallback(void *privdata, const dictEntry *de) {
server.stat_active_defrag_scanned++;
}
/* Defrag scan callback for for each hash table bicket,
/* Defrag scan callback for each hash table bicket,
* used in order to defrag the dictEntry allocations. */
void defragDictBucketCallback(void *privdata, dictEntry **bucketref) {
UNUSED(privdata); /* NOTE: this function is also used by both activeDefragCycle and scanLaterHash, etc. don't use privdata */
@ -728,27 +895,29 @@ long defragOtherGlobals() {
return defragged;
}
unsigned long defragLaterItem(dictEntry *de, unsigned long cursor) {
long defragged = 0;
/* returns 0 more work may or may not be needed (see non-zero cursor),
* and 1 if time is up and more work is needed. */
int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime) {
if (de) {
robj *ob = dictGetVal(de);
if (ob->type == OBJ_LIST) {
defragged += scanLaterList(ob);
cursor = 0; /* list has no scan, we must finish it in one go */
server.stat_active_defrag_hits += scanLaterList(ob);
*cursor = 0; /* list has no scan, we must finish it in one go */
} else if (ob->type == OBJ_SET) {
defragged += scanLaterSet(ob, &cursor);
server.stat_active_defrag_hits += scanLaterSet(ob, cursor);
} else if (ob->type == OBJ_ZSET) {
defragged += scanLaterZset(ob, &cursor);
server.stat_active_defrag_hits += scanLaterZset(ob, cursor);
} else if (ob->type == OBJ_HASH) {
defragged += scanLaterHash(ob, &cursor);
server.stat_active_defrag_hits += scanLaterHash(ob, cursor);
} else if (ob->type == OBJ_STREAM) {
return scanLaterStraemListpacks(ob, cursor, endtime, &server.stat_active_defrag_hits);
} else {
cursor = 0; /* object type may have changed since we schedule it for later */
*cursor = 0; /* object type may have changed since we schedule it for later */
}
} else {
cursor = 0; /* object may have been deleted already */
*cursor = 0; /* object may have been deleted already */
}
server.stat_active_defrag_hits += defragged;
return cursor;
return 0;
}
/* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */
@ -788,17 +957,22 @@ int defragLaterStep(redisDb *db, long long endtime) {
dictEntry *de = dictFind(db->dict, current_key);
key_defragged = server.stat_active_defrag_hits;
do {
cursor = defragLaterItem(de, cursor);
int quit = 0;
if (defragLaterItem(de, &cursor, endtime))
quit = 1; /* time is up, we didn't finish all the work */
/* Don't start a new BIG key in this loop, this is because the
* next key can be a list, and scanLaterList must be done in once cycle */
if (!cursor)
quit = 1;
/* Once in 16 scan iterations, 512 pointer reallocations, or 64 fields
* (if we have a lot of pointers in one hash bucket, or rehashing),
* check if we reached the time limit.
* But regardless, don't start a new BIG key in this loop, this is because the
* next key can be a list, and scanLaterList must be done in once cycle */
if (!cursor || (++iterations > 16 ||
* check if we reached the time limit. */
if (quit || (++iterations > 16 ||
server.stat_active_defrag_hits - prev_defragged > 512 ||
server.stat_active_defrag_scanned - prev_scanned > 64)) {
if (!cursor || ustime() > endtime) {
if (quit || ustime() > endtime) {
if(key_defragged != server.stat_active_defrag_hits)
server.stat_active_defrag_key_hits++;
else

View File

@ -1167,6 +1167,7 @@ void raxStart(raxIterator *it, rax *rt) {
it->key = it->key_static_string;
it->key_max = RAX_ITER_STATIC_LEN;
it->data = NULL;
it->node_cb = NULL;
raxStackInit(&it->stack);
}
@ -1240,6 +1241,8 @@ int raxIteratorNextStep(raxIterator *it, int noup) {
if (!raxIteratorAddChars(it,it->node->data,
it->node->iscompr ? it->node->size : 1)) return 0;
memcpy(&it->node,cp,sizeof(it->node));
if (it->node_cb && it->node_cb(&it->node))
memcpy(cp,&it->node,sizeof(it->node));
/* For "next" step, stop every time we find a key along the
* way, since the key is lexicograhically smaller compared to
* what follows in the sub-children. */
@ -1292,6 +1295,8 @@ int raxIteratorNextStep(raxIterator *it, int noup) {
raxIteratorAddChars(it,it->node->data+i,1);
if (!raxStackPush(&it->stack,it->node)) return 0;
memcpy(&it->node,cp,sizeof(it->node));
if (it->node_cb && it->node_cb(&it->node))
memcpy(cp,&it->node,sizeof(it->node));
if (it->node->iskey) {
it->data = raxGetData(it->node);
return 1;
@ -1325,7 +1330,7 @@ int raxSeekGreatest(raxIterator *it) {
/* Like raxIteratorNextStep() but implements an iteration step moving
* to the lexicographically previous element. The 'noup' option has a similar
* effect to the one of raxIteratorPrevSte(). */
* effect to the one of raxIteratorNextStep(). */
int raxIteratorPrevStep(raxIterator *it, int noup) {
if (it->flags & RAX_ITER_EOF) {
return 1;

View File

@ -119,6 +119,12 @@ typedef struct raxStack {
int oom; /* True if pushing into this stack failed for OOM at some point. */
} raxStack;
/* Optional callback used for iterators and be notified on each rax node.
* This is used by active defrag, the return value is an indication that
* the noderef was chagned, and the tree needs to be updated.
* This is currently only supported in forward iterations (raxNext) */
typedef int (*raxNodeCallback)(raxNode **noderef);
/* Radix tree iterator state is encapsulated into this data structure. */
#define RAX_ITER_STATIC_LEN 128
#define RAX_ITER_JUST_SEEKED (1<<0) /* Iterator was just seeked. Return current
@ -137,6 +143,7 @@ typedef struct raxIterator {
unsigned char key_static_string[RAX_ITER_STATIC_LEN];
raxNode *node; /* Current node. Only for unsafe iteration. */
raxStack stack; /* Stack used for unsafe iteration. */
raxNodeCallback node_cb;
} raxIterator;
/* A special pointer returned for not found items. */
@ -161,4 +168,7 @@ int raxEOF(raxIterator *it);
void raxShow(rax *rax);
uint64_t raxSize(rax *rax);
/* internals */
void raxSetData(raxNode *n, void *data);
#endif

View File

@ -48,6 +48,8 @@ start_server [list overrides [list "dir" $server_path]] {
r xadd stream * bar $j
}
}
r xgroup create stream mygroup $
r xreadgroup GROUP mygroup Alice COUNT 1 STREAMS stream >
set digest [r debug digest]
r debug reload
set newdigest [r debug digest]

View File

@ -97,10 +97,15 @@ start_server {tags {"defrag"}} {
r config set active-defrag-ignore-bytes 2mb
r config set maxmemory 0
r config set list-max-ziplist-size 5 ;# list of 10k items will have 2000 quicklist nodes
r config set stream-node-max-entries 5
r hmset hash h1 v1 h2 v2 h3 v3
r lpush list a b c d
r zadd zset 0 a 1 b 2 c 3 d
r sadd set a b c d
r xadd stream * item 1 value a
r xadd stream * item 2 value b
r xgroup create stream mygroup $
r xreadgroup GROUP mygroup Alice COUNT 1 STREAMS stream >
# create big keys with 10k items
set rd [redis_deferring_client]
@ -109,8 +114,9 @@ start_server {tags {"defrag"}} {
$rd lpush biglist [concat "asdfasdfasdf" $j]
$rd zadd bigzset $j [concat "asdfasdfasdf" $j]
$rd sadd bigset [concat "asdfasdfasdf" $j]
$rd xadd bigstream * item 1 value a
}
for {set j 0} {$j < 40000} {incr j} {
for {set j 0} {$j < 50000} {incr j} {
$rd read ; # Discard replies
}
@ -134,7 +140,7 @@ start_server {tags {"defrag"}} {
for {set j 0} {$j < 500000} {incr j} {
$rd read ; # Discard replies
}
assert {[r dbsize] == 500008}
assert {[r dbsize] == 500010}
# create some fragmentation
for {set j 0} {$j < 500000} {incr j 2} {
@ -143,7 +149,7 @@ start_server {tags {"defrag"}} {
for {set j 0} {$j < 500000} {incr j 2} {
$rd read ; # Discard replies
}
assert {[r dbsize] == 250008}
assert {[r dbsize] == 250010}
# start defrag
after 120 ;# serverCron only updates the info once in 100ms
@ -155,6 +161,7 @@ start_server {tags {"defrag"}} {
r config set latency-monitor-threshold 5
r latency reset
set digest [r debug digest]
catch {r config set activedefrag yes} e
if {![string match {DISABLED*} $e]} {
# wait for the active defrag to start working (decision once a second)
@ -193,9 +200,11 @@ start_server {tags {"defrag"}} {
# due to high fragmentation, 10hz, and active-defrag-cycle-max set to 75,
# we expect max latency to be not much higher than 75ms
assert {$max_latency <= 80}
} else {
set _ ""
}
} {}
# verify the data isn't corrupted or changed
set newdigest [r debug digest]
assert {$digest eq $newdigest}
r save ;# saving an rdb iterates over all the data / pointers
} {OK}
}
}