mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 08:08:53 -05:00
Implement defragmentation for pubsub kvstore (#13058)
After #13013 ### This PR make effort to defrag the pubsub kvstore in the following ways: 1. Till now server.pubsub(shard)_channels only share channel name obj with the first subscribed client, now change it so that the clients and the pubsub kvstore share the channel name robj. This would save a lot of memory when there are many subscribers to the same channel. It also means that we only need to defrag the channel name robj in the pubsub kvstore, and then update all client references for the current channel, avoiding the need to iterate through all the clients to do the same things. 2. Refactor the code to defragment pubsub(shard) in the same way as defragment of keys and EXPIRES, with the exception that we only defragment pubsub(without shard) when slot is zero. ### Other Fix an overlook in #11695, if defragment doesn't reach the end time, we should wait for the current db's keys and expires, pubsub and pubsubshard to finish before leaving, now it's possible to exit early when the keys are defragmented. --------- Co-authored-by: oranagra <oran@redislabs.com>
This commit is contained in:
parent
33ea432585
commit
ad12730333
202
src/defrag.c
202
src/defrag.c
@ -39,10 +39,15 @@
|
||||
#ifdef HAVE_DEFRAG
|
||||
|
||||
typedef struct defragCtx {
|
||||
redisDb *db;
|
||||
void *privdata;
|
||||
int slot;
|
||||
} defragCtx;
|
||||
|
||||
typedef struct defragPubSubCtx {
|
||||
kvstore *pubsub_channels;
|
||||
dict *(*clientPubSubChannels)(client*);
|
||||
} defragPubSubCtx;
|
||||
|
||||
/* this method was added to jemalloc in order to help us understand which
|
||||
* pointers are worthwhile moving and which aren't */
|
||||
int je_get_defrag_hint(void* ptr);
|
||||
@ -86,14 +91,16 @@ sds activeDefragSds(sds sdsptr) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Defrag helper for robj and/or string objects
|
||||
/* Defrag helper for robj and/or string objects with expected refcount.
|
||||
*
|
||||
* returns NULL in case the allocation wasn't moved.
|
||||
* when it returns a non-null value, the old pointer was already released
|
||||
* and should NOT be accessed. */
|
||||
robj *activeDefragStringOb(robj* ob) {
|
||||
* Like activeDefragStringOb, but it requires the caller to pass in the expected
|
||||
* reference count. In some cases, the caller needs to update a robj whose
|
||||
* reference count is not 1, in these cases, the caller must explicitly pass
|
||||
* in the reference count, otherwise defragmentation will not be performed.
|
||||
* Note that the caller is responsible for updating any other references to the robj. */
|
||||
robj *activeDefragStringObEx(robj* ob, int expected_refcount) {
|
||||
robj *ret = NULL;
|
||||
if (ob->refcount!=1)
|
||||
if (ob->refcount!=expected_refcount)
|
||||
return NULL;
|
||||
|
||||
/* try to defrag robj (only if not an EMBSTR type (handled below). */
|
||||
@ -124,6 +131,15 @@ robj *activeDefragStringOb(robj* ob) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* Defrag helper for robj and/or string objects
|
||||
*
|
||||
* returns NULL in case the allocation wasn't moved.
|
||||
* when it returns a non-null value, the old pointer was already released
|
||||
* and should NOT be accessed. */
|
||||
robj *activeDefragStringOb(robj* ob) {
|
||||
return activeDefragStringObEx(ob, 1);
|
||||
}
|
||||
|
||||
/* Defrag helper for lua scripts
|
||||
*
|
||||
* returns NULL in case the allocation wasn't moved.
|
||||
@ -145,12 +161,20 @@ luaScript *activeDefragLuaScript(luaScript *script) {
|
||||
}
|
||||
|
||||
/* Defrag helper for dict main allocations (dict struct, and hash tables).
|
||||
* receives a pointer to the dict* and implicitly updates it when the dict
|
||||
* struct itself was moved. */
|
||||
void dictDefragTables(dict* d) {
|
||||
* Receives a pointer to the dict* and return a new dict* when the dict
|
||||
* struct itself was moved.
|
||||
*
|
||||
* Returns NULL in case the allocation wasn't moved.
|
||||
* When it returns a non-null value, the old pointer was already released
|
||||
* and should NOT be accessed. */
|
||||
dict *dictDefragTables(dict *d) {
|
||||
dict *ret = NULL;
|
||||
dictEntry **newtable;
|
||||
/* handle the dict struct */
|
||||
if ((ret = activeDefragAlloc(d)))
|
||||
d = ret;
|
||||
/* handle the first hash table */
|
||||
if (!d->ht_table[0]) return; /* created but unused */
|
||||
if (!d->ht_table[0]) return ret; /* created but unused */
|
||||
newtable = activeDefragAlloc(d->ht_table[0]);
|
||||
if (newtable)
|
||||
d->ht_table[0] = newtable;
|
||||
@ -160,6 +184,7 @@ void dictDefragTables(dict* d) {
|
||||
if (newtable)
|
||||
d->ht_table[1] = newtable;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* Internal function used by zslDefrag */
|
||||
@ -460,11 +485,9 @@ void defragZsetSkiplist(redisDb *db, dictEntry *kde) {
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
}
|
||||
/* handle the dict struct */
|
||||
if ((newdict = activeDefragAlloc(zs->dict)))
|
||||
/* defrag the dict struct and tables */
|
||||
if ((newdict = dictDefragTables(zs->dict)))
|
||||
zs->dict = newdict;
|
||||
/* defrag the dict tables */
|
||||
dictDefragTables(zs->dict);
|
||||
}
|
||||
|
||||
void defragHash(redisDb *db, dictEntry *kde) {
|
||||
@ -476,11 +499,9 @@ void defragHash(redisDb *db, dictEntry *kde) {
|
||||
defragLater(db, kde);
|
||||
else
|
||||
activeDefragSdsDict(d, DEFRAG_SDS_DICT_VAL_IS_SDS);
|
||||
/* handle the dict struct */
|
||||
if ((newd = activeDefragAlloc(ob->ptr)))
|
||||
/* defrag the dict struct and tables */
|
||||
if ((newd = dictDefragTables(ob->ptr)))
|
||||
ob->ptr = newd;
|
||||
/* defrag the dict tables */
|
||||
dictDefragTables(ob->ptr);
|
||||
}
|
||||
|
||||
void defragSet(redisDb *db, dictEntry *kde) {
|
||||
@ -492,11 +513,9 @@ void defragSet(redisDb *db, dictEntry *kde) {
|
||||
defragLater(db, kde);
|
||||
else
|
||||
activeDefragSdsDict(d, DEFRAG_SDS_DICT_NO_VAL);
|
||||
/* handle the dict struct */
|
||||
if ((newd = activeDefragAlloc(ob->ptr)))
|
||||
/* defrag the dict struct and tables */
|
||||
if ((newd = dictDefragTables(ob->ptr)))
|
||||
ob->ptr = newd;
|
||||
/* defrag the dict tables */
|
||||
dictDefragTables(ob->ptr);
|
||||
}
|
||||
|
||||
/* Defrag callback for radix tree iterator, called for each node,
|
||||
@ -677,7 +696,7 @@ void defragKey(defragCtx *ctx, dictEntry *de) {
|
||||
robj *newob, *ob;
|
||||
unsigned char *newzl;
|
||||
sds newsds;
|
||||
redisDb *db = ctx->db;
|
||||
redisDb *db = ctx->privdata;
|
||||
int slot = ctx->slot;
|
||||
/* Try to defrag the key name. */
|
||||
newsds = activeDefragSds(keysds);
|
||||
@ -761,18 +780,6 @@ void defragScanCallback(void *privdata, const dictEntry *de) {
|
||||
server.stat_active_defrag_scanned++;
|
||||
}
|
||||
|
||||
static void defragKvstoreDefragScanCallBack(dict **d) {
|
||||
dict *newd;
|
||||
/* handle the dict struct */
|
||||
if ((newd = activeDefragAlloc(*d)))
|
||||
*d = newd;
|
||||
dictDefragTables(*d);
|
||||
}
|
||||
|
||||
void activeDefragKvstore(kvstore *kvs) {
|
||||
kvstoreDictLUTDefrag(kvs, defragKvstoreDefragScanCallBack);
|
||||
}
|
||||
|
||||
/* Utility function to get the fragmentation ratio from jemalloc.
|
||||
* It is critical to do that by comparing only heap maps that belong to
|
||||
* jemalloc, and skip ones the jemalloc keeps as spare. Since we use this
|
||||
@ -798,6 +805,41 @@ float getAllocatorFragmentation(size_t *out_frag_bytes) {
|
||||
return frag_pct;
|
||||
}
|
||||
|
||||
/* Defrag scan callback for the pubsub dictionary. */
|
||||
void defragPubsubScanCallback(void *privdata, const dictEntry *de) {
|
||||
defragCtx *ctx = privdata;
|
||||
defragPubSubCtx *pubsub_ctx = ctx->privdata;
|
||||
kvstore *pubsub_channels = pubsub_ctx->pubsub_channels;
|
||||
robj *newchannel, *channel = dictGetKey(de);
|
||||
dict *newclients, *clients = dictGetVal(de);
|
||||
|
||||
/* Try to defrag the channel name. */
|
||||
serverAssert(channel->refcount == (int)dictSize(clients) + 1);
|
||||
newchannel = activeDefragStringObEx(channel, dictSize(clients) + 1);
|
||||
if (newchannel) {
|
||||
kvstoreDictSetKey(pubsub_channels, ctx->slot, (dictEntry*)de, newchannel);
|
||||
|
||||
/* The channel name is shared by the client's pubsub(shard) and server's
|
||||
* pubsub(shard), after defraging the channel name, we need to update
|
||||
* the reference in the clients' dictionary. */
|
||||
dictIterator *di = dictGetIterator(clients);
|
||||
dictEntry *clientde;
|
||||
while((clientde = dictNext(di)) != NULL) {
|
||||
client *c = dictGetKey(clientde);
|
||||
dictEntry *pubsub_channel = dictFind(pubsub_ctx->clientPubSubChannels(c), newchannel);
|
||||
serverAssert(pubsub_channel);
|
||||
dictSetKey(pubsub_ctx->clientPubSubChannels(c), pubsub_channel, newchannel);
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
}
|
||||
|
||||
/* Try to defrag the dictionary of clients that is stored as the value part. */
|
||||
if ((newclients = dictDefragTables(clients)))
|
||||
kvstoreDictSetVal(pubsub_channels, ctx->slot, (dictEntry*)de, newclients);
|
||||
|
||||
server.stat_active_defrag_scanned++;
|
||||
}
|
||||
|
||||
/* We may need to defrag other globals, one small allocation can hold a full allocator run.
|
||||
* so although small, it is still important to defrag these */
|
||||
void defragOtherGlobals(void) {
|
||||
@ -807,6 +849,8 @@ void defragOtherGlobals(void) {
|
||||
* that remain static for a long time */
|
||||
activeDefragSdsDict(evalScriptsDict(), DEFRAG_SDS_DICT_VAL_LUA_SCRIPT);
|
||||
moduleDefragGlobals();
|
||||
kvstoreDictLUTDefrag(server.pubsub_channels, dictDefragTables);
|
||||
kvstoreDictLUTDefrag(server.pubsubshard_channels, dictDefragTables);
|
||||
}
|
||||
|
||||
/* returns 0 more work may or may not be needed (see non-zero cursor),
|
||||
@ -944,12 +988,11 @@ void computeDefragCycles(void) {
|
||||
* This works in a similar way to activeExpireCycle, in the sense that
|
||||
* we do incremental work across calls. */
|
||||
void activeDefragCycle(void) {
|
||||
static defragCtx ctx;
|
||||
static int slot = -1;
|
||||
static int current_db = -1;
|
||||
static int defrag_later_item_in_progress = 0;
|
||||
static unsigned long cursor = 0;
|
||||
static unsigned long expires_cursor = 0;
|
||||
static int defrag_stage = 0;
|
||||
static unsigned long defrag_cursor = 0;
|
||||
static redisDb *db = NULL;
|
||||
static long long start_scan, start_stat;
|
||||
unsigned int iterations = 0;
|
||||
@ -957,6 +1000,7 @@ void activeDefragCycle(void) {
|
||||
unsigned long long prev_scanned = server.stat_active_defrag_scanned;
|
||||
long long start, timelimit, endtime;
|
||||
mstime_t latency;
|
||||
int all_stages_finished = 0;
|
||||
int quit = 0;
|
||||
|
||||
if (!server.active_defrag_enabled) {
|
||||
@ -969,8 +1013,8 @@ void activeDefragCycle(void) {
|
||||
defrag_later_current_key = NULL;
|
||||
defrag_later_cursor = 0;
|
||||
current_db = -1;
|
||||
cursor = 0;
|
||||
expires_cursor = 0;
|
||||
defrag_stage = 0;
|
||||
defrag_cursor = 0;
|
||||
slot = -1;
|
||||
defrag_later_item_in_progress = 0;
|
||||
db = NULL;
|
||||
@ -1008,7 +1052,7 @@ void activeDefragCycle(void) {
|
||||
dictDefragFunctions defragfns = {.defragAlloc = activeDefragAlloc};
|
||||
do {
|
||||
/* if we're not continuing a scan from the last call or loop, start a new one */
|
||||
if (!cursor && !expires_cursor && (slot < 0)) {
|
||||
if (!defrag_stage && !defrag_cursor && (slot < 0)) {
|
||||
/* finish any leftovers from previous db before moving to the next one */
|
||||
if (db && defragLaterStep(db, slot, endtime)) {
|
||||
quit = 1; /* time is up, we didn't finish all the work */
|
||||
@ -1029,12 +1073,11 @@ void activeDefragCycle(void) {
|
||||
|
||||
start_scan = now;
|
||||
current_db = -1;
|
||||
cursor = 0;
|
||||
expires_cursor = 0;
|
||||
defrag_stage = 0;
|
||||
defrag_cursor = 0;
|
||||
slot = -1;
|
||||
defrag_later_item_in_progress = 0;
|
||||
db = NULL;
|
||||
memset(&ctx, -1, sizeof(ctx));
|
||||
server.active_defrag_running = 0;
|
||||
|
||||
computeDefragCycles(); /* if another scan is needed, start it right away */
|
||||
@ -1049,16 +1092,33 @@ void activeDefragCycle(void) {
|
||||
}
|
||||
|
||||
db = &server.db[current_db];
|
||||
activeDefragKvstore(db->keys);
|
||||
activeDefragKvstore(db->expires);
|
||||
cursor = 0;
|
||||
expires_cursor = 0;
|
||||
slot = kvstoreFindDictIndexByKeyIndex(db->keys, 1);
|
||||
kvstoreDictLUTDefrag(db->keys, dictDefragTables);
|
||||
kvstoreDictLUTDefrag(db->expires, dictDefragTables);
|
||||
defrag_stage = 0;
|
||||
defrag_cursor = 0;
|
||||
slot = -1;
|
||||
defrag_later_item_in_progress = 0;
|
||||
ctx.db = db;
|
||||
ctx.slot = slot;
|
||||
}
|
||||
|
||||
/* This array of structures holds the parameters for all defragmentation stages. */
|
||||
typedef struct defragStage {
|
||||
kvstore *kvs;
|
||||
dictScanFunction *scanfn;
|
||||
void *privdata;
|
||||
} defragStage;
|
||||
defragStage defrag_stages[] = {
|
||||
{db->keys, defragScanCallback, db},
|
||||
{db->expires, scanCallbackCountScanned, NULL},
|
||||
{server.pubsub_channels, defragPubsubScanCallback,
|
||||
&(defragPubSubCtx){server.pubsub_channels, getClientPubSubChannels}},
|
||||
{server.pubsubshard_channels, defragPubsubScanCallback,
|
||||
&(defragPubSubCtx){server.pubsubshard_channels, getClientPubSubShardChannels}},
|
||||
};
|
||||
do {
|
||||
int num_stages = sizeof(defrag_stages) / sizeof(defrag_stages[0]);
|
||||
serverAssert(defrag_stage < num_stages);
|
||||
defragStage *current_stage = &defrag_stages[defrag_stage];
|
||||
|
||||
/* before scanning the next bucket, see if we have big keys left from the previous bucket to scan */
|
||||
if (defragLaterStep(db, slot, endtime)) {
|
||||
quit = 1; /* time is up, we didn't finish all the work */
|
||||
@ -1066,26 +1126,31 @@ void activeDefragCycle(void) {
|
||||
}
|
||||
|
||||
if (!defrag_later_item_in_progress) {
|
||||
/* Scan the keyspace dict unless we're scanning the expire dict. */
|
||||
if (!expires_cursor)
|
||||
cursor = kvstoreDictScanDefrag(db->keys, slot, cursor,
|
||||
defragScanCallback,
|
||||
&defragfns, &ctx);
|
||||
/* When done scanning the keyspace dict, we scan the expire dict. */
|
||||
if (!cursor)
|
||||
expires_cursor = kvstoreDictScanDefrag(db->expires, slot, expires_cursor,
|
||||
scanCallbackCountScanned,
|
||||
&defragfns, NULL);
|
||||
/* Continue defragmentation from the previous stage.
|
||||
* If slot is -1, it means this stage starts from the first non-empty slot. */
|
||||
if (slot == -1) slot = kvstoreGetFirstNonEmptyDictIndex(current_stage->kvs);
|
||||
defrag_cursor = kvstoreDictScanDefrag(current_stage->kvs, slot, defrag_cursor,
|
||||
current_stage->scanfn, &defragfns, &(defragCtx){current_stage->privdata, slot});
|
||||
}
|
||||
if (!(cursor || expires_cursor)) {
|
||||
|
||||
if (!defrag_cursor) {
|
||||
/* Move to the next slot only if regular and large item scanning has been completed. */
|
||||
if (listLength(db->defrag_later) > 0) {
|
||||
defrag_later_item_in_progress = 1;
|
||||
continue;
|
||||
}
|
||||
slot = kvstoreGetNextNonEmptyDictIndex(db->keys, slot);
|
||||
|
||||
/* Move to the next slot in the current stage. If we've reached the end, move to the next stage. */
|
||||
if ((slot = kvstoreGetNextNonEmptyDictIndex(current_stage->kvs, slot)) == -1)
|
||||
defrag_stage++;
|
||||
defrag_later_item_in_progress = 0;
|
||||
ctx.slot = slot;
|
||||
}
|
||||
|
||||
/* Check if all defragmentation stages have been processed.
|
||||
* If so, mark as finished and reset the stage counter to move on to next database. */
|
||||
if (defrag_stage == num_stages) {
|
||||
all_stages_finished = 1;
|
||||
defrag_stage = 0;
|
||||
}
|
||||
|
||||
/* Once in 16 scan iterations, 512 pointer reallocations. or 64 keys
|
||||
@ -1093,12 +1158,13 @@ void activeDefragCycle(void) {
|
||||
* check if we reached the time limit.
|
||||
* But regardless, don't start a new db in this loop, this is because after
|
||||
* the last db we call defragOtherGlobals, which must be done in one cycle */
|
||||
if ((!(cursor || expires_cursor) && slot == -1) ||
|
||||
if (all_stages_finished ||
|
||||
++iterations > 16 ||
|
||||
server.stat_active_defrag_hits - prev_defragged > 512 ||
|
||||
server.stat_active_defrag_scanned - prev_scanned > 64)
|
||||
{
|
||||
if (!cursor || ustime() > endtime) {
|
||||
/* Quit if all stages were finished or timeout. */
|
||||
if (all_stages_finished || ustime() > endtime) {
|
||||
quit = 1;
|
||||
break;
|
||||
}
|
||||
@ -1106,7 +1172,7 @@ void activeDefragCycle(void) {
|
||||
prev_defragged = server.stat_active_defrag_hits;
|
||||
prev_scanned = server.stat_active_defrag_scanned;
|
||||
}
|
||||
} while(((cursor || expires_cursor) || slot > 0) && !quit);
|
||||
} while(!all_stages_finished && !quit);
|
||||
} while(!quit);
|
||||
|
||||
latencyEndMonitor(latency);
|
||||
|
@ -525,8 +525,17 @@ int kvstoreFindDictIndexByKeyIndex(kvstore *kvs, unsigned long target) {
|
||||
return result;
|
||||
}
|
||||
|
||||
/* Wrapper for kvstoreFindDictIndexByKeyIndex to get the first non-empty dict index in the kvstore. */
|
||||
int kvstoreGetFirstNonEmptyDictIndex(kvstore *kvs) {
|
||||
return kvstoreFindDictIndexByKeyIndex(kvs, 1);
|
||||
}
|
||||
|
||||
/* Returns next non-empty dict index strictly after given one, or -1 if provided didx is the last one. */
|
||||
int kvstoreGetNextNonEmptyDictIndex(kvstore *kvs, int didx) {
|
||||
if (kvs->num_dicts == 1) {
|
||||
assert(didx == 0);
|
||||
return -1;
|
||||
}
|
||||
unsigned long long next_key = cumulativeKeyCountRead(kvs, didx) + 1;
|
||||
return next_key <= kvstoreSize(kvs) ? kvstoreFindDictIndexByKeyIndex(kvs, next_key) : -1;
|
||||
}
|
||||
@ -550,7 +559,7 @@ kvstoreIterator *kvstoreIteratorInit(kvstore *kvs) {
|
||||
kvstoreIterator *kvs_it = zmalloc(sizeof(*kvs_it));
|
||||
kvs_it->kvs = kvs;
|
||||
kvs_it->didx = -1;
|
||||
kvs_it->next_didx = kvstoreFindDictIndexByKeyIndex(kvs_it->kvs, 1); /* Finds first non-empty dict index. */
|
||||
kvs_it->next_didx = kvstoreGetFirstNonEmptyDictIndex(kvs_it->kvs); /* Finds first non-empty dict index. */
|
||||
dictInitSafeIterator(&kvs_it->di, NULL);
|
||||
return kvs_it;
|
||||
}
|
||||
@ -752,10 +761,11 @@ unsigned long kvstoreDictScanDefrag(kvstore *kvs, int didx, unsigned long v, dic
|
||||
* that callback can reallocate. */
|
||||
void kvstoreDictLUTDefrag(kvstore *kvs, kvstoreDictLUTDefragFunction *defragfn) {
|
||||
for (int didx = 0; didx < kvs->num_dicts; didx++) {
|
||||
dict **d = kvstoreGetDictRef(kvs, didx);
|
||||
dict **d = kvstoreGetDictRef(kvs, didx), *newd;
|
||||
if (!*d)
|
||||
continue;
|
||||
defragfn(d);
|
||||
if ((newd = defragfn(*d)))
|
||||
*d = newd;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -28,6 +28,7 @@ int kvstoreGetFairRandomDictIndex(kvstore *kvs);
|
||||
void kvstoreGetStats(kvstore *kvs, char *buf, size_t bufsize, int full);
|
||||
|
||||
int kvstoreFindDictIndexByKeyIndex(kvstore *kvs, unsigned long target);
|
||||
int kvstoreGetFirstNonEmptyDictIndex(kvstore *kvs);
|
||||
int kvstoreGetNextNonEmptyDictIndex(kvstore *kvs, int didx);
|
||||
int kvstoreNumNonEmptyDicts(kvstore *kvs);
|
||||
int kvstoreNumAllocatedDicts(kvstore *kvs);
|
||||
@ -60,7 +61,7 @@ dictEntry *kvstoreDictFindEntryByPtrAndHash(kvstore *kvs, int didx, const void *
|
||||
unsigned int kvstoreDictGetSomeKeys(kvstore *kvs, int didx, dictEntry **des, unsigned int count);
|
||||
int kvstoreDictExpand(kvstore *kvs, int didx, unsigned long size);
|
||||
unsigned long kvstoreDictScanDefrag(kvstore *kvs, int didx, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata);
|
||||
typedef void (kvstoreDictLUTDefragFunction)(dict **d);
|
||||
typedef dict *(kvstoreDictLUTDefragFunction)(dict *d);
|
||||
void kvstoreDictLUTDefrag(kvstore *kvs, kvstoreDictLUTDefragFunction *defragfn);
|
||||
void *kvstoreDictFetchValue(kvstore *kvs, int didx, const void *key);
|
||||
dictEntry *kvstoreDictFind(kvstore *kvs, int didx, void *key);
|
||||
|
@ -263,9 +263,9 @@ int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) {
|
||||
unsigned int slot = 0;
|
||||
|
||||
/* Add the channel to the client -> channels hash table */
|
||||
if (dictAdd(type.clientPubSubChannels(c),channel,NULL) == DICT_OK) {
|
||||
void *position = dictFindPositionForInsert(type.clientPubSubChannels(c),channel,NULL);
|
||||
if (position) { /* Not yet subscribed to this channel */
|
||||
retval = 1;
|
||||
incrRefCount(channel);
|
||||
/* Add the client to the channel -> list of clients hash table */
|
||||
if (server.cluster_enabled && type.shard) {
|
||||
slot = getKeySlot(channel->ptr);
|
||||
@ -275,6 +275,7 @@ int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) {
|
||||
|
||||
if (existing) {
|
||||
clients = dictGetVal(existing);
|
||||
channel = dictGetKey(existing);
|
||||
} else {
|
||||
clients = dictCreate(&clientDictType);
|
||||
kvstoreDictSetVal(*type.serverPubSubChannels, slot, de, clients);
|
||||
@ -282,6 +283,8 @@ int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) {
|
||||
}
|
||||
|
||||
serverAssert(dictAdd(clients, c, NULL) != DICT_ERR);
|
||||
serverAssert(dictInsertAtPosition(type.clientPubSubChannels(c), channel, position));
|
||||
incrRefCount(channel);
|
||||
}
|
||||
/* Notify the client */
|
||||
addReplyPubsubSubscribed(c,channel,type);
|
||||
|
@ -3183,6 +3183,8 @@ int serverPubsubShardSubscriptionCount(void);
|
||||
size_t pubsubMemOverhead(client *c);
|
||||
void unmarkClientAsPubSub(client *c);
|
||||
int pubsubTotalSubscriptions(void);
|
||||
dict *getClientPubSubChannels(client *c);
|
||||
dict *getClientPubSubShardChannels(client *c);
|
||||
|
||||
/* Keyspace events notification */
|
||||
void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid);
|
||||
|
@ -404,6 +404,105 @@ run_solo {defrag} {
|
||||
r save ;# saving an rdb iterates over all the data / pointers
|
||||
} {OK}
|
||||
|
||||
test "Active defrag pubsub: $type" {
|
||||
r flushdb
|
||||
r config resetstat
|
||||
r config set hz 100
|
||||
r config set activedefrag no
|
||||
r config set active-defrag-threshold-lower 5
|
||||
r config set active-defrag-cycle-min 65
|
||||
r config set active-defrag-cycle-max 75
|
||||
r config set active-defrag-ignore-bytes 1500kb
|
||||
r config set maxmemory 0
|
||||
|
||||
# Populate memory with interleaving pubsub-key pattern of same size
|
||||
set n 50000
|
||||
set dummy_channel "[string repeat x 400]"
|
||||
set rd [redis_deferring_client]
|
||||
set rd_pubsub [redis_deferring_client]
|
||||
for {set j 0} {$j < $n} {incr j} {
|
||||
set channel_name "$dummy_channel[format "%06d" $j]"
|
||||
$rd_pubsub subscribe $channel_name
|
||||
$rd_pubsub read ; # Discard subscribe replies
|
||||
$rd_pubsub ssubscribe $channel_name
|
||||
$rd_pubsub read ; # Discard ssubscribe replies
|
||||
$rd set k$j $channel_name
|
||||
$rd read ; # Discard set replies
|
||||
}
|
||||
|
||||
after 120 ;# serverCron only updates the info once in 100ms
|
||||
if {$::verbose} {
|
||||
puts "used [s allocator_allocated]"
|
||||
puts "rss [s allocator_active]"
|
||||
puts "frag [s allocator_frag_ratio]"
|
||||
puts "frag_bytes [s allocator_frag_bytes]"
|
||||
}
|
||||
assert_lessthan [s allocator_frag_ratio] 1.05
|
||||
|
||||
# Delete all the keys to create fragmentation
|
||||
for {set j 0} {$j < $n} {incr j} { $rd del k$j }
|
||||
for {set j 0} {$j < $n} {incr j} { $rd read } ; # Discard del replies
|
||||
$rd close
|
||||
after 120 ;# serverCron only updates the info once in 100ms
|
||||
if {$::verbose} {
|
||||
puts "used [s allocator_allocated]"
|
||||
puts "rss [s allocator_active]"
|
||||
puts "frag [s allocator_frag_ratio]"
|
||||
puts "frag_bytes [s allocator_frag_bytes]"
|
||||
}
|
||||
assert_morethan [s allocator_frag_ratio] 1.35
|
||||
|
||||
catch {r config set activedefrag yes} e
|
||||
if {[r config get activedefrag] eq "activedefrag yes"} {
|
||||
|
||||
# wait for the active defrag to start working (decision once a second)
|
||||
wait_for_condition 50 100 {
|
||||
[s total_active_defrag_time] ne 0
|
||||
} else {
|
||||
after 120 ;# serverCron only updates the info once in 100ms
|
||||
puts [r info memory]
|
||||
puts [r info stats]
|
||||
puts [r memory malloc-stats]
|
||||
fail "defrag not started."
|
||||
}
|
||||
|
||||
# wait for the active defrag to stop working
|
||||
wait_for_condition 500 100 {
|
||||
[s active_defrag_running] eq 0
|
||||
} else {
|
||||
after 120 ;# serverCron only updates the info once in 100ms
|
||||
puts [r info memory]
|
||||
puts [r memory malloc-stats]
|
||||
fail "defrag didn't stop."
|
||||
}
|
||||
|
||||
# test the fragmentation is lower
|
||||
after 120 ;# serverCron only updates the info once in 100ms
|
||||
if {$::verbose} {
|
||||
puts "used [s allocator_allocated]"
|
||||
puts "rss [s allocator_active]"
|
||||
puts "frag [s allocator_frag_ratio]"
|
||||
puts "frag_bytes [s allocator_frag_bytes]"
|
||||
}
|
||||
assert_lessthan_equal [s allocator_frag_ratio] 1.05
|
||||
}
|
||||
|
||||
# Publishes some message to all the pubsub clients to make sure that
|
||||
# we didn't break the data structure.
|
||||
for {set j 0} {$j < $n} {incr j} {
|
||||
set channel "$dummy_channel[format "%06d" $j]"
|
||||
r publish $channel "hello"
|
||||
assert_equal "message $channel hello" [$rd_pubsub read]
|
||||
$rd_pubsub unsubscribe $channel
|
||||
$rd_pubsub read
|
||||
r spublish $channel "hello"
|
||||
assert_equal "smessage $channel hello" [$rd_pubsub read]
|
||||
$rd_pubsub sunsubscribe $channel
|
||||
$rd_pubsub read
|
||||
}
|
||||
$rd_pubsub close
|
||||
}
|
||||
|
||||
if {$type eq "standalone"} { ;# skip in cluster mode
|
||||
test "Active defrag big list: $type" {
|
||||
r flushdb
|
||||
|
Loading…
Reference in New Issue
Block a user