From e8099cabd19c4e3a46c94c39e69e13191d43f5eb Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Tue, 26 Jun 2018 14:13:24 +0300 Subject: [PATCH 1/2] add defrag hint support into jemalloc 5 --- .../internal/jemalloc_internal_inlines_c.h | 28 +++++++++++++++++++ .../include/jemalloc/jemalloc_macros.h.in | 4 +++ deps/jemalloc/src/jemalloc.c | 11 ++++++++ 3 files changed, 43 insertions(+) diff --git a/deps/jemalloc/include/jemalloc/internal/jemalloc_internal_inlines_c.h b/deps/jemalloc/include/jemalloc/internal/jemalloc_internal_inlines_c.h index c829ac60c..540c168e5 100644 --- a/deps/jemalloc/include/jemalloc/internal/jemalloc_internal_inlines_c.h +++ b/deps/jemalloc/include/jemalloc/internal/jemalloc_internal_inlines_c.h @@ -215,4 +215,32 @@ ixalloc(tsdn_t *tsdn, void *ptr, size_t oldsize, size_t size, size_t extra, return arena_ralloc_no_move(tsdn, ptr, oldsize, size, extra, zero); } +JEMALLOC_ALWAYS_INLINE int +iget_defrag_hint(tsdn_t *tsdn, void* ptr, int *bin_util, int *run_util) { + int defrag = 0; + rtree_ctx_t rtree_ctx_fallback; + rtree_ctx_t *rtree_ctx = tsdn_rtree_ctx(tsdn, &rtree_ctx_fallback); + szind_t szind; + bool is_slab; + rtree_szind_slab_read(tsdn, &extents_rtree, rtree_ctx, (uintptr_t)ptr, true, &szind, &is_slab); + if (likely(is_slab)) { + /* Small allocation. 
*/ + extent_t *slab = iealloc(tsdn, ptr); + arena_t *arena = extent_arena_get(slab); + szind_t binind = extent_szind_get(slab); + bin_t *bin = &arena->bins[binind]; + malloc_mutex_lock(tsdn, &bin->lock); + /* don't bother moving allocations from the slab currently used for new allocations */ + if (slab != bin->slabcur) { + const bin_info_t *bin_info = &bin_infos[binind]; + size_t availregs = bin_info->nregs * bin->stats.curslabs; + *bin_util = (bin->stats.curregs<<16) / availregs; + *run_util = ((bin_info->nregs - extent_nfree_get(slab))<<16) / bin_info->nregs; + defrag = 1; + } + malloc_mutex_unlock(tsdn, &bin->lock); + } + return defrag; +} + #endif /* JEMALLOC_INTERNAL_INLINES_C_H */ diff --git a/deps/jemalloc/include/jemalloc/jemalloc_macros.h.in b/deps/jemalloc/include/jemalloc/jemalloc_macros.h.in index aee55438c..daf9e571b 100644 --- a/deps/jemalloc/include/jemalloc/jemalloc_macros.h.in +++ b/deps/jemalloc/include/jemalloc/jemalloc_macros.h.in @@ -120,3 +120,7 @@ # define JEMALLOC_RESTRICT_RETURN # define JEMALLOC_ALLOCATOR #endif + +/* This version of Jemalloc, modified for Redis, has the je_get_defrag_hint() + * function. */ +#define JEMALLOC_FRAG_HINT diff --git a/deps/jemalloc/src/jemalloc.c b/deps/jemalloc/src/jemalloc.c index f93c16fa3..5b936cb48 100644 --- a/deps/jemalloc/src/jemalloc.c +++ b/deps/jemalloc/src/jemalloc.c @@ -3324,3 +3324,14 @@ jemalloc_postfork_child(void) { } /******************************************************************************/ + +/* Helps the application decide if a pointer is worth re-allocating in order to reduce fragmentation. + * returns 0 if the allocation is in the currently active run, + * or when it is not causing any frag issue (large or huge bin) + * returns the bin utilization and run utilization both in fixed point 16:16. + * If the application decides to re-allocate it should use MALLOCX_TCACHE_NONE when doing so. 
*/ +JEMALLOC_EXPORT int JEMALLOC_NOTHROW +get_defrag_hint(void* ptr, int *bin_util, int *run_util) { + assert(ptr != NULL); + return iget_defrag_hint(TSDN_NULL, ptr, bin_util, run_util); +} From 5616d4c6036eeea416f38aa2e41a57249e69277f Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Tue, 26 Jun 2018 14:14:35 +0300 Subject: [PATCH 2/2] add active defrag support for streams --- src/config.c | 5 + src/defrag.c | 210 ++++++++++++++++++++++++++++++++--- src/rax.c | 7 +- src/rax.h | 10 ++ tests/integration/rdb.tcl | 2 + tests/unit/memefficiency.tcl | 21 +++- 6 files changed, 230 insertions(+), 25 deletions(-) diff --git a/src/config.c b/src/config.c index c7fc11556..c39b61e6c 100644 --- a/src/config.c +++ b/src/config.c @@ -431,6 +431,11 @@ void loadServerConfigFromString(char *config) { if ((server.active_defrag_enabled = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; } + if (server.active_defrag_enabled) { +#ifndef HAVE_DEFRAG + err = "active defrag can't be enabled without proper jemalloc support"; goto loaderr; +#endif + } } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) { if ((server.daemonize = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; diff --git a/src/defrag.c b/src/defrag.c index aae72adcb..b25fceb1e 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -592,6 +592,171 @@ long defragSet(redisDb *db, dictEntry *kde) { return defragged; } +/* Defrag callback for radix tree iterator, called for each node, + * used in order to defrag the nodes allocations. */ +int defragRaxNode(raxNode **noderef) { + raxNode *newnode = activeDefragAlloc(*noderef); + if (newnode) { + *noderef = newnode; + return 1; + } + return 0; +} + +/* returns 0 if no more work needs to be done, and 1 if time is up and more work is needed.
*/ +int scanLaterStraemListpacks(robj *ob, unsigned long *cursor, long long endtime, long long *defragged) { + static unsigned char last[sizeof(streamID)]; + raxIterator ri; + long iterations = 0; + if (ob->type != OBJ_STREAM || ob->encoding != OBJ_ENCODING_STREAM) { + *cursor = 0; + return 0; + } + + stream *s = ob->ptr; + raxStart(&ri,s->rax); + if (*cursor == 0) { + /* if cursor is 0, we start a new iteration */ + defragRaxNode(&s->rax->head); + /* assign the iterator node callback before the seek, so that the + * initial nodes that are processed till the first item are covered */ + ri.node_cb = defragRaxNode; + raxSeek(&ri,"^",NULL,0); + } else { + /* if cursor is non-zero, we seek to the static 'last' */ + if (!raxSeek(&ri,">", last, sizeof(last))) { + *cursor = 0; + return 0; + } + /* assign the iterator node callback after the seek, so that the + * initial nodes that are processed till now aren't covered */ + ri.node_cb = defragRaxNode; + } + + (*cursor)++; + while (raxNext(&ri)) { + void *newdata = activeDefragAlloc(ri.data); + if (newdata) + raxSetData(ri.node, ri.data=newdata), (*defragged)++; + if (++iterations > 16) { + if (ustime() > endtime) { + serverAssert(ri.key_len==sizeof(last)); + memcpy(last,ri.key,ri.key_len); + raxStop(&ri); + return 1; + } + iterations = 0; + } + } + raxStop(&ri); + *cursor = 0; + return 0; +} + +/* optional callback used to defrag each rax element (not including the element pointer itself) */ +typedef void *(raxDefragFunction)(raxIterator *ri, void *privdata, long *defragged); + +/* defrag radix tree including: + * 1) rax struct + * 2) rax nodes + * 3) rax entry data (only if defrag_data is specified) + * 4) call a callback per element, and allow the callback to return a new pointer for the element */ +long defragRadixTree(rax **raxref, int defrag_data, raxDefragFunction *element_cb, void *element_cb_data) { + long defragged = 0; + raxIterator ri; + rax* rax; + if ((rax = activeDefragAlloc(*raxref))) + defragged++, *raxref =
rax; + rax = *raxref; + raxStart(&ri,rax); + ri.node_cb = defragRaxNode; + defragRaxNode(&rax->head); + raxSeek(&ri,"^",NULL,0); + while (raxNext(&ri)) { + void *newdata = NULL; + if (element_cb) + newdata = element_cb(&ri, element_cb_data, &defragged); + if (defrag_data && !newdata) + newdata = activeDefragAlloc(ri.data); + if (newdata) + raxSetData(ri.node, ri.data=newdata), defragged++; + } + raxStop(&ri); + return defragged; +} + +typedef struct { + streamCG *cg; + streamConsumer *c; +} PendingEntryContext; + +void* defragStreamConsumerPendingEntry(raxIterator *ri, void *privdata, long *defragged) { + UNUSED(defragged); + PendingEntryContext *ctx = privdata; + streamNACK *nack = ri->data, *newnack; + nack->consumer = ctx->c; /* update nack pointer to consumer */ + newnack = activeDefragAlloc(nack); + if (newnack) { + /* update consumer group pointer to the nack */ + void *prev; + raxInsert(ctx->cg->pel, ri->key, ri->key_len, newnack, &prev); + serverAssert(prev==nack); + /* note: we don't increment 'defragged' that's done by the caller */ + } + return newnack; +} + +void* defragStreamConsumer(raxIterator *ri, void *privdata, long *defragged) { + streamConsumer *c = ri->data; + streamCG *cg = privdata; + void *newc = activeDefragAlloc(c); + if (newc) { + /* note: we don't increment 'defragged' that's done by the caller */ + c = newc; + } + sds newsds = activeDefragSds(c->name); + if (newsds) + (*defragged)++, c->name = newsds; + if (c->pel) { + PendingEntryContext pel_ctx = {cg, c}; + *defragged += defragRadixTree(&c->pel, 0, defragStreamConsumerPendingEntry, &pel_ctx); + } + return newc; /* returns NULL if c was not defragged */ +} + +void* defragStreamConsumerGroup(raxIterator *ri, void *privdata, long *defragged) { + streamCG *cg = ri->data; + UNUSED(privdata); + if (cg->consumers) + *defragged += defragRadixTree(&cg->consumers, 0, defragStreamConsumer, cg); + if (cg->pel) + *defragged += defragRadixTree(&cg->pel, 0, NULL, NULL); + return NULL; +} + +long 
defragStream(redisDb *db, dictEntry *kde) { + long defragged = 0; + robj *ob = dictGetVal(kde); + serverAssert(ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM); + stream *s = ob->ptr, *news; + + /* handle the main struct */ + if ((news = activeDefragAlloc(s))) + defragged++, ob->ptr = s = news; + + if (raxSize(s->rax) > server.active_defrag_max_scan_fields) { + rax *newrax = activeDefragAlloc(s->rax); + if (newrax) + defragged++, s->rax = newrax; + defragLater(db, kde); + } else + defragged += defragRadixTree(&s->rax, 1, NULL, NULL); + + if (s->cgroups) + defragged += defragRadixTree(&s->cgroups, 1, defragStreamConsumerGroup, NULL); + return defragged; +} + /* for each key we scan in the main dict, this function will attempt to defrag * all the various pointers it has. Returns a stat of how many pointers were * moved. */ @@ -660,6 +825,8 @@ long defragKey(redisDb *db, dictEntry *de) { } else { serverPanic("Unknown hash encoding"); } + } else if (ob->type == OBJ_STREAM) { + defragged += defragStream(db, de); } else if (ob->type == OBJ_MODULE) { /* Currently defragmenting modules private data types * is not supported. */ @@ -680,7 +847,7 @@ void defragScanCallback(void *privdata, const dictEntry *de) { server.stat_active_defrag_scanned++; } -/* Defrag scan callback for for each hash table bicket, +/* Defrag scan callback for each hash table bucket, * used in order to defrag the dictEntry allocations. */ void defragDictBucketCallback(void *privdata, dictEntry **bucketref) { UNUSED(privdata); /* NOTE: this function is also used by both activeDefragCycle and scanLaterHash, etc. don't use privdata */ @@ -728,27 +895,29 @@ long defragOtherGlobals() { return defragged; } -unsigned long defragLaterItem(dictEntry *de, unsigned long cursor) { - long defragged = 0; +/* returns 0 if more work may or may not be needed (see non-zero cursor), + * and 1 if time is up and more work is needed.
*/ +int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime) { if (de) { robj *ob = dictGetVal(de); if (ob->type == OBJ_LIST) { - defragged += scanLaterList(ob); - cursor = 0; /* list has no scan, we must finish it in one go */ + server.stat_active_defrag_hits += scanLaterList(ob); + *cursor = 0; /* list has no scan, we must finish it in one go */ } else if (ob->type == OBJ_SET) { - defragged += scanLaterSet(ob, &cursor); + server.stat_active_defrag_hits += scanLaterSet(ob, cursor); } else if (ob->type == OBJ_ZSET) { - defragged += scanLaterZset(ob, &cursor); + server.stat_active_defrag_hits += scanLaterZset(ob, cursor); } else if (ob->type == OBJ_HASH) { - defragged += scanLaterHash(ob, &cursor); + server.stat_active_defrag_hits += scanLaterHash(ob, cursor); + } else if (ob->type == OBJ_STREAM) { + return scanLaterStraemListpacks(ob, cursor, endtime, &server.stat_active_defrag_hits); } else { - cursor = 0; /* object type may have changed since we schedule it for later */ + *cursor = 0; /* object type may have changed since we schedule it for later */ } } else { - cursor = 0; /* object may have been deleted already */ + *cursor = 0; /* object may have been deleted already */ } - server.stat_active_defrag_hits += defragged; - return cursor; + return 0; } /* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. 
*/ @@ -788,17 +957,22 @@ int defragLaterStep(redisDb *db, long long endtime) { dictEntry *de = dictFind(db->dict, current_key); key_defragged = server.stat_active_defrag_hits; do { - cursor = defragLaterItem(de, cursor); + int quit = 0; + if (defragLaterItem(de, &cursor, endtime)) + quit = 1; /* time is up, we didn't finish all the work */ + + /* Don't start a new BIG key in this loop, this is because the + * next key can be a list, and scanLaterList must be done in one cycle */ + if (!cursor) + quit = 1; /* Once in 16 scan iterations, 512 pointer reallocations, or 64 fields * (if we have a lot of pointers in one hash bucket, or rehashing), - * check if we reached the time limit. - * But regardless, don't start a new BIG key in this loop, this is because the - * next key can be a list, and scanLaterList must be done in once cycle */ - if (!cursor || (++iterations > 16 || + * check if we reached the time limit. */ + if (quit || (++iterations > 16 || server.stat_active_defrag_hits - prev_defragged > 512 || server.stat_active_defrag_scanned - prev_scanned > 64)) { - if (!cursor || ustime() > endtime) { + if (quit || ustime() > endtime) { if(key_defragged != server.stat_active_defrag_hits) server.stat_active_defrag_key_hits++; else diff --git a/src/rax.c b/src/rax.c index 8764dd8c9..fff580272 100644 --- a/src/rax.c +++ b/src/rax.c @@ -1167,6 +1167,7 @@ void raxStart(raxIterator *it, rax *rt) { it->key = it->key_static_string; it->key_max = RAX_ITER_STATIC_LEN; it->data = NULL; + it->node_cb = NULL; raxStackInit(&it->stack); } @@ -1240,6 +1241,8 @@ int raxIteratorNextStep(raxIterator *it, int noup) { if (!raxIteratorAddChars(it,it->node->data, it->node->iscompr ?
it->node->size : 1)) return 0; memcpy(&it->node,cp,sizeof(it->node)); + if (it->node_cb && it->node_cb(&it->node)) + memcpy(cp,&it->node,sizeof(it->node)); /* For "next" step, stop every time we find a key along the * way, since the key is lexicograhically smaller compared to * what follows in the sub-children. */ @@ -1292,6 +1295,8 @@ int raxIteratorNextStep(raxIterator *it, int noup) { raxIteratorAddChars(it,it->node->data+i,1); if (!raxStackPush(&it->stack,it->node)) return 0; memcpy(&it->node,cp,sizeof(it->node)); + if (it->node_cb && it->node_cb(&it->node)) + memcpy(cp,&it->node,sizeof(it->node)); if (it->node->iskey) { it->data = raxGetData(it->node); return 1; @@ -1325,7 +1330,7 @@ int raxSeekGreatest(raxIterator *it) { /* Like raxIteratorNextStep() but implements an iteration step moving * to the lexicographically previous element. The 'noup' option has a similar - * effect to the one of raxIteratorPrevSte(). */ + * effect to the one of raxIteratorNextStep(). */ int raxIteratorPrevStep(raxIterator *it, int noup) { if (it->flags & RAX_ITER_EOF) { return 1; diff --git a/src/rax.h b/src/rax.h index 5b4d45167..9e6bc0b51 100644 --- a/src/rax.h +++ b/src/rax.h @@ -119,6 +119,12 @@ typedef struct raxStack { int oom; /* True if pushing into this stack failed for OOM at some point. */ } raxStack; +/* Optional callback used by iterators to be notified on each rax node. + * This is used by active defrag, the return value is an indication that + * the noderef was changed, and the tree needs to be updated. + * This is currently only supported in forward iterations (raxNext) */ +typedef int (*raxNodeCallback)(raxNode **noderef); + /* Radix tree iterator state is encapsulated into this data structure. */ #define RAX_ITER_STATIC_LEN 128 #define RAX_ITER_JUST_SEEKED (1<<0) /* Iterator was just seeked. Return current @@ -137,6 +143,7 @@ typedef struct raxIterator { unsigned char key_static_string[RAX_ITER_STATIC_LEN]; raxNode *node; /* Current node.
Only for unsafe iteration. */ raxStack stack; /* Stack used for unsafe iteration. */ + raxNodeCallback node_cb; } raxIterator; /* A special pointer returned for not found items. */ @@ -161,4 +168,7 @@ int raxEOF(raxIterator *it); void raxShow(rax *rax); uint64_t raxSize(rax *rax); +/* internals */ +void raxSetData(raxNode *n, void *data); + #endif diff --git a/tests/integration/rdb.tcl b/tests/integration/rdb.tcl index d645b2b54..f51a3c13d 100644 --- a/tests/integration/rdb.tcl +++ b/tests/integration/rdb.tcl @@ -48,6 +48,8 @@ start_server [list overrides [list "dir" $server_path]] { r xadd stream * bar $j } } + r xgroup create stream mygroup $ + r xreadgroup GROUP mygroup Alice COUNT 1 STREAMS stream > set digest [r debug digest] r debug reload set newdigest [r debug digest] diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl index 1796d695c..ed37a68ed 100644 --- a/tests/unit/memefficiency.tcl +++ b/tests/unit/memefficiency.tcl @@ -97,10 +97,15 @@ start_server {tags {"defrag"}} { r config set active-defrag-ignore-bytes 2mb r config set maxmemory 0 r config set list-max-ziplist-size 5 ;# list of 10k items will have 2000 quicklist nodes + r config set stream-node-max-entries 5 r hmset hash h1 v1 h2 v2 h3 v3 r lpush list a b c d r zadd zset 0 a 1 b 2 c 3 d r sadd set a b c d + r xadd stream * item 1 value a + r xadd stream * item 2 value b + r xgroup create stream mygroup $ + r xreadgroup GROUP mygroup Alice COUNT 1 STREAMS stream > # create big keys with 10k items set rd [redis_deferring_client] @@ -109,8 +114,9 @@ start_server {tags {"defrag"}} { $rd lpush biglist [concat "asdfasdfasdf" $j] $rd zadd bigzset $j [concat "asdfasdfasdf" $j] $rd sadd bigset [concat "asdfasdfasdf" $j] + $rd xadd bigstream * item 1 value a } - for {set j 0} {$j < 40000} {incr j} { + for {set j 0} {$j < 50000} {incr j} { $rd read ; # Discard replies } @@ -134,7 +140,7 @@ start_server {tags {"defrag"}} { for {set j 0} {$j < 500000} {incr j} { $rd read ; # Discard 
replies } - assert {[r dbsize] == 500008} + assert {[r dbsize] == 500010} # create some fragmentation for {set j 0} {$j < 500000} {incr j 2} { @@ -143,7 +149,7 @@ start_server {tags {"defrag"}} { for {set j 0} {$j < 500000} {incr j 2} { $rd read ; # Discard replies } - assert {[r dbsize] == 250008} + assert {[r dbsize] == 250010} # start defrag after 120 ;# serverCron only updates the info once in 100ms @@ -155,6 +161,7 @@ start_server {tags {"defrag"}} { r config set latency-monitor-threshold 5 r latency reset + set digest [r debug digest] catch {r config set activedefrag yes} e if {![string match {DISABLED*} $e]} { # wait for the active defrag to start working (decision once a second) @@ -193,9 +200,11 @@ start_server {tags {"defrag"}} { # due to high fragmentation, 10hz, and active-defrag-cycle-max set to 75, # we expect max latency to be not much higher than 75ms assert {$max_latency <= 80} - } else { - set _ "" } - } {} + # verify the data isn't corrupted or changed + set newdigest [r debug digest] + assert {$digest eq $newdigest} + r save ;# saving an rdb iterates over all the data / pointers + } {OK} } }