mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 08:08:53 -05:00
Unified db rehash method for both standalone and cluster (#12848)
After #11695, we added two functions `rehashingStarted` and `rehashingCompleted` to the dict structure. We also registered two handlers for the main database's dict and expire structures. This allows the main database to record the dict in `rehashing` list when rehashing starts. Later, in `serverCron`, the `incrementallyRehash` function is continuously called to perform the rehashing operation. However, currently, when rehashing is completed, `rehashingCompleted` does not remove the dict from the `rehashing` list. This results in the `rehashing` list containing many invalid dicts. Although subsequent cron checks and removes dicts that don't require rehashing, it is still inefficient. This PR implements the functionality to remove the dict from the `rehashing` list in `rehashingCompleted`. This is achieved by adding `metadata` to the dict structure, which keeps track of its position in the `rehashing` list, allowing for quick removal. This approach avoids storing duplicate dicts in the `rehashing` list. Additionally, there are other modifications: 1. Whether in standalone or cluster mode, the dict in database is inserted into the rehashing linked list when rehashing starts. This eliminates the need to distinguish between standalone and cluster mode in `incrementallyRehash`. The function only needs to focus on the dicts in the `rehashing` list that require rehashing. 2. `rehashing` list is moved from per-database to Redis server level. This decouples `incrementallyRehash` from the database ID, and in standalone mode, there is no need to iterate over all databases, avoiding unnecessary access to databases that do not require rehashing. In the future, even if unsharded-cluster mode supports multiple databases, there will be no risk involved. 3. The insertion and removal operations of dict structures in the `rehashing` list are decoupled from `activerehashing` config. `activerehashing` only controls whether `incrementallyRehash` is executed in serverCron. There is no need for additional steps when modifying the `activerehashing` switch, as in #12705.
This commit is contained in:
parent
967fb3c6e8
commit
d8a21c5767
22
src/db.c
22
src/db.c
@ -669,9 +669,21 @@ long long emptyDbStructure(redisDb *dbarray, int dbnum, int async,
|
||||
if (async) {
|
||||
emptyDbAsync(&dbarray[j]);
|
||||
} else {
|
||||
dbDictMetadata *metadata;
|
||||
for (int k = 0; k < dbarray[j].dict_count; k++) {
|
||||
dictEmpty(dbarray[j].dict[k],callback);
|
||||
metadata = (dbDictMetadata *)dictMetadata(dbarray[j].dict[k]);
|
||||
if (metadata->rehashing_node) {
|
||||
listDelNode(server.rehashing, metadata->rehashing_node);
|
||||
metadata->rehashing_node = NULL;
|
||||
}
|
||||
|
||||
dictEmpty(dbarray[j].expires[k],callback);
|
||||
metadata = (dbDictMetadata *)dictMetadata(dbarray[j].expires[k]);
|
||||
if (metadata->rehashing_node) {
|
||||
listDelNode(server.rehashing, metadata->rehashing_node);
|
||||
metadata->rehashing_node = NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
/* Because all keys of database are removed, reset average ttl. */
|
||||
@ -682,8 +694,6 @@ long long emptyDbStructure(redisDb *dbarray, int dbnum, int async,
|
||||
dbarray[j].sub_dict[subdict].key_count = 0;
|
||||
dbarray[j].sub_dict[subdict].resize_cursor = -1;
|
||||
if (server.cluster_enabled) {
|
||||
if (dbarray[j].sub_dict[subdict].rehashing)
|
||||
listEmpty(dbarray[j].sub_dict[subdict].rehashing);
|
||||
dbarray[j].sub_dict[subdict].bucket_count = 0;
|
||||
unsigned long long *slot_size_index = dbarray[j].sub_dict[subdict].slot_size_index;
|
||||
memset(slot_size_index, 0, sizeof(unsigned long long) * (CLUSTER_SLOTS + 1));
|
||||
@ -757,7 +767,6 @@ redisDb *initTempDb(void) {
|
||||
tempDb[i].dict = dictCreateMultiple(&dbDictType, tempDb[i].dict_count);
|
||||
tempDb[i].expires = dictCreateMultiple(&dbExpiresDictType, tempDb[i].dict_count);
|
||||
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
|
||||
tempDb[i].sub_dict[subdict].rehashing = listCreate();
|
||||
tempDb[i].sub_dict[subdict].slot_size_index = server.cluster_enabled ? zcalloc(sizeof(unsigned long long) * (CLUSTER_SLOTS + 1)) : NULL;
|
||||
}
|
||||
}
|
||||
@ -779,7 +788,6 @@ void discardTempDb(redisDb *tempDb, void(callback)(dict*)) {
|
||||
zfree(tempDb[i].dict);
|
||||
zfree(tempDb[i].expires);
|
||||
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
|
||||
listRelease(tempDb[i].sub_dict[subdict].rehashing);
|
||||
if (server.cluster_enabled) {
|
||||
zfree(tempDb[i].sub_dict[subdict].slot_size_index);
|
||||
}
|
||||
@ -1445,7 +1453,7 @@ size_t dbMemUsage(redisDb *db, dbKeyType keyType) {
|
||||
unsigned long long keys_count = dbSize(db, keyType);
|
||||
mem += keys_count * dictEntryMemUsage() +
|
||||
dbBuckets(db, keyType) * sizeof(dictEntry*) +
|
||||
db->dict_count * sizeof(dict);
|
||||
db->dict_count * (sizeof(dict) + dictMetadataSize(db->dict[0]));
|
||||
if (keyType == DB_MAIN) {
|
||||
mem+=keys_count * sizeof(robj);
|
||||
}
|
||||
@ -1890,7 +1898,6 @@ int dbSwapDatabases(int id1, int id2) {
|
||||
db1->expires_cursor = db2->expires_cursor;
|
||||
db1->dict_count = db2->dict_count;
|
||||
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
|
||||
db1->sub_dict[subdict].rehashing = db2->sub_dict[subdict].rehashing;
|
||||
db1->sub_dict[subdict].key_count = db2->sub_dict[subdict].key_count;
|
||||
db1->sub_dict[subdict].bucket_count = db2->sub_dict[subdict].bucket_count;
|
||||
db1->sub_dict[subdict].non_empty_slots = db2->sub_dict[subdict].non_empty_slots;
|
||||
@ -1904,7 +1911,6 @@ int dbSwapDatabases(int id1, int id2) {
|
||||
db2->expires_cursor = aux.expires_cursor;
|
||||
db2->dict_count = aux.dict_count;
|
||||
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
|
||||
db2->sub_dict[subdict].rehashing = aux.sub_dict[subdict].rehashing;
|
||||
db2->sub_dict[subdict].key_count = aux.sub_dict[subdict].key_count;
|
||||
db2->sub_dict[subdict].bucket_count = aux.sub_dict[subdict].bucket_count;
|
||||
db2->sub_dict[subdict].non_empty_slots = aux.sub_dict[subdict].non_empty_slots;
|
||||
@ -1950,7 +1956,6 @@ void swapMainDbWithTempDb(redisDb *tempDb) {
|
||||
activedb->expires_cursor = newdb->expires_cursor;
|
||||
activedb->dict_count = newdb->dict_count;
|
||||
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
|
||||
activedb->sub_dict[subdict].rehashing = newdb->sub_dict[subdict].rehashing;
|
||||
activedb->sub_dict[subdict].key_count = newdb->sub_dict[subdict].key_count;
|
||||
activedb->sub_dict[subdict].bucket_count = newdb->sub_dict[subdict].bucket_count;
|
||||
activedb->sub_dict[subdict].non_empty_slots = newdb->sub_dict[subdict].non_empty_slots;
|
||||
@ -1964,7 +1969,6 @@ void swapMainDbWithTempDb(redisDb *tempDb) {
|
||||
newdb->expires_cursor = aux.expires_cursor;
|
||||
newdb->dict_count = aux.dict_count;
|
||||
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
|
||||
newdb->sub_dict[subdict].rehashing = aux.sub_dict[subdict].rehashing;
|
||||
newdb->sub_dict[subdict].key_count = aux.sub_dict[subdict].key_count;
|
||||
newdb->sub_dict[subdict].bucket_count = aux.sub_dict[subdict].bucket_count;
|
||||
newdb->sub_dict[subdict].non_empty_slots = aux.sub_dict[subdict].non_empty_slots;
|
||||
|
16
src/dict.c
16
src/dict.c
@ -181,7 +181,11 @@ static void _dictReset(dict *d, int htidx)
|
||||
/* Create a new hash table */
|
||||
dict *dictCreate(dictType *type)
|
||||
{
|
||||
dict *d = zmalloc(sizeof(*d));
|
||||
size_t metasize = type->dictMetadataBytes ? type->dictMetadataBytes(NULL) : 0;
|
||||
dict *d = zmalloc(sizeof(*d)+metasize);
|
||||
if (metasize > 0) {
|
||||
memset(dictMetadata(d), 0, metasize);
|
||||
}
|
||||
_dictInit(d,type);
|
||||
return d;
|
||||
}
|
||||
@ -399,10 +403,10 @@ long long timeInMilliseconds(void) {
|
||||
return (((long long)tv.tv_sec)*1000)+(tv.tv_usec/1000);
|
||||
}
|
||||
|
||||
/* Rehash in ms+"delta" milliseconds. The value of "delta" is larger
|
||||
* than 0, and is smaller than 1 in most cases. The exact upper bound
|
||||
/* Rehash in us+"delta" microseconds. The value of "delta" is larger
|
||||
* than 0, and is smaller than 1000 in most cases. The exact upper bound
|
||||
* depends on the running time of dictRehash(d,100).*/
|
||||
int dictRehashMilliseconds(dict *d, unsigned int ms) {
|
||||
int dictRehashMicroseconds(dict *d, uint64_t us) {
|
||||
if (d->pauserehash > 0) return 0;
|
||||
|
||||
monotime timer;
|
||||
@ -411,7 +415,7 @@ int dictRehashMilliseconds(dict *d, unsigned int ms) {
|
||||
|
||||
while(dictRehash(d,100)) {
|
||||
rehashes += 100;
|
||||
if (elapsedMs(timer) >= ms) break;
|
||||
if (elapsedUs(timer) >= us) break;
|
||||
}
|
||||
return rehashes;
|
||||
}
|
||||
@ -1714,7 +1718,7 @@ int dictTest(int argc, char **argv, int flags) {
|
||||
|
||||
/* Wait for rehashing. */
|
||||
while (dictIsRehashing(dict)) {
|
||||
dictRehashMilliseconds(dict,100);
|
||||
dictRehashMicroseconds(dict,100*1000);
|
||||
}
|
||||
|
||||
start_benchmark();
|
||||
|
11
src/dict.h
11
src/dict.h
@ -60,6 +60,9 @@ typedef struct dictType {
|
||||
/* Invoked at the end of dict initialization/rehashing of all the entries from old to new ht. Both ht still exists
|
||||
* and are cleaned up after this callback. */
|
||||
void (*rehashingCompleted)(dict *d);
|
||||
/* Allow a dict to carry extra caller-defined metadata. The
|
||||
* extra memory is initialized to 0 when a dict is allocated. */
|
||||
size_t (*dictMetadataBytes)(dict *d);
|
||||
/* Flags */
|
||||
/* The 'no_value' flag, if set, indicates that values are not used, i.e. the
|
||||
* dict is a set. When this flag is set, it's not possible to access the
|
||||
@ -88,6 +91,7 @@ struct dict {
|
||||
/* Keep small vars at end for optimal (minimal) struct padding */
|
||||
int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */
|
||||
signed char ht_size_exp[2]; /* exponent of size. (size = 1<<exp) */
|
||||
void *metadata[];
|
||||
};
|
||||
|
||||
/* If safe is set to 1 this is a safe iterator, that means, you can call
|
||||
@ -140,6 +144,10 @@ typedef struct {
|
||||
(d)->type->keyCompare((d), key1, key2) : \
|
||||
(key1) == (key2))
|
||||
|
||||
#define dictMetadata(d) (&(d)->metadata)
|
||||
#define dictMetadataSize(d) ((d)->type->dictMetadataBytes \
|
||||
? (d)->type->dictMetadataBytes(d) : 0)
|
||||
|
||||
#define dictHashKey(d, key) ((d)->type->hashFunction(key))
|
||||
#define dictBuckets(d) (DICTHT_SIZE((d)->ht_size_exp[0])+DICTHT_SIZE((d)->ht_size_exp[1]))
|
||||
#define dictSize(d) ((d)->ht_used[0]+(d)->ht_used[1])
|
||||
@ -166,7 +174,6 @@ dict *dictCreate(dictType *type);
|
||||
dict **dictCreateMultiple(dictType *type, int count);
|
||||
int dictExpand(dict *d, unsigned long size);
|
||||
int dictTryExpand(dict *d, unsigned long size);
|
||||
void *dictMetadata(dict *d);
|
||||
int dictAdd(dict *d, void *key, void *val);
|
||||
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing);
|
||||
void *dictFindPositionForInsert(dict *d, const void *key, dictEntry **existing);
|
||||
@ -215,7 +222,7 @@ uint64_t dictGenCaseHashFunction(const unsigned char *buf, size_t len);
|
||||
void dictEmpty(dict *d, void(callback)(dict*));
|
||||
void dictSetResizeEnabled(dictResizeEnable enable);
|
||||
int dictRehash(dict *d, int n);
|
||||
int dictRehashMilliseconds(dict *d, unsigned int ms);
|
||||
int dictRehashMicroseconds(dict *d, uint64_t us);
|
||||
void dictSetHashFunctionSeed(uint8_t *seed);
|
||||
uint8_t *dictGetHashFunctionSeed(void);
|
||||
unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, void *privdata);
|
||||
|
@ -179,6 +179,20 @@ void freeObjAsync(robj *key, robj *obj, int dbid) {
|
||||
* create a new empty set of hash tables and scheduling the old ones for
|
||||
* lazy freeing. */
|
||||
void emptyDbAsync(redisDb *db) {
|
||||
dbDictMetadata *metadata;
|
||||
for (int i = 0; i < db->dict_count; i++) {
|
||||
metadata = (dbDictMetadata *)dictMetadata(db->dict[i]);
|
||||
if (metadata->rehashing_node) {
|
||||
listDelNode(server.rehashing, metadata->rehashing_node);
|
||||
metadata->rehashing_node = NULL;
|
||||
}
|
||||
|
||||
metadata = (dbDictMetadata *)dictMetadata(db->expires[i]);
|
||||
if (metadata->rehashing_node) {
|
||||
listDelNode(server.rehashing, metadata->rehashing_node);
|
||||
metadata->rehashing_node = NULL;
|
||||
}
|
||||
}
|
||||
dict **oldDict = db->dict;
|
||||
dict **oldExpires = db->expires;
|
||||
atomicIncr(lazyfree_objects,dbSize(db, DB_MAIN));
|
||||
|
155
src/server.c
155
src/server.c
@ -419,52 +419,61 @@ int dictExpandAllowed(size_t moreMem, double usedRatio) {
|
||||
}
|
||||
}
|
||||
|
||||
/* Updates the bucket count in cluster-mode for the given dictionary in a DB. bucket count
|
||||
* incremented with the new ht size during the rehashing phase.
|
||||
* And also adds dictionary to the rehashing list in cluster mode, which allows us
|
||||
/* Adds dictionary to the rehashing list, which allows us
|
||||
* to quickly find rehash targets during incremental rehashing.
|
||||
*
|
||||
* In non-cluster mode, bucket count can be retrieved directly from single dict bucket and
|
||||
* we don't need this list as there is only one dictionary per DB. */
|
||||
void dictRehashingStarted(dict *d) {
|
||||
if (!server.cluster_enabled) return;
|
||||
*
|
||||
* Updates the bucket count in cluster-mode for the given dictionary in a DB, bucket count
|
||||
* incremented with the new ht size during the rehashing phase. In non-cluster mode,
|
||||
* bucket count can be retrieved directly from single dict bucket. */
|
||||
void dictRehashingStarted(dict *d, dbKeyType keyType) {
|
||||
dbDictMetadata *metadata = (dbDictMetadata *)dictMetadata(d);
|
||||
listAddNodeTail(server.rehashing, d);
|
||||
metadata->rehashing_node = listLast(server.rehashing);
|
||||
|
||||
if (!server.cluster_enabled) return;
|
||||
unsigned long long from, to;
|
||||
dictRehashingInfo(d, &from, &to);
|
||||
server.db[0].sub_dict[DB_MAIN].bucket_count += to; /* Started rehashing (Add the new ht size) */
|
||||
if (from == 0) return; /* No entries are to be moved. */
|
||||
if (server.activerehashing) {
|
||||
listAddNodeTail(server.db[0].sub_dict[DB_MAIN].rehashing, d);
|
||||
}
|
||||
server.db[0].sub_dict[keyType].bucket_count += to; /* Started rehashing (Add the new ht size) */
|
||||
}
|
||||
|
||||
/* Updates the bucket count for the given dictionary in a DB. It removes
|
||||
/* Remove dictionary from the rehashing list.
|
||||
*
|
||||
* Updates the bucket count for the given dictionary in a DB. It removes
|
||||
* the old ht size of the dictionary from the total sum of buckets for a DB. */
|
||||
void dictRehashingCompleted(dict *d) {
|
||||
if (!server.cluster_enabled) return;
|
||||
unsigned long long from, to;
|
||||
dictRehashingInfo(d, &from, &to);
|
||||
server.db[0].sub_dict[DB_MAIN].bucket_count -= from; /* Finished rehashing (Remove the old ht size) */
|
||||
}
|
||||
|
||||
void dictRehashingStartedForExpires(dict *d) {
|
||||
if (!server.cluster_enabled) return;
|
||||
|
||||
unsigned long long from, to;
|
||||
dictRehashingInfo(d, &from, &to);
|
||||
server.db[0].sub_dict[DB_EXPIRES].bucket_count += to; /* Started rehashing (Add the new ht size) */
|
||||
if (from == 0) return; /* No entries are to be moved. */
|
||||
if (server.activerehashing) {
|
||||
listAddNodeTail(server.db[0].sub_dict[DB_EXPIRES].rehashing, d);
|
||||
void dictRehashingCompleted(dict *d, dbKeyType keyType) {
|
||||
dbDictMetadata *metadata = (dbDictMetadata *)dictMetadata(d);
|
||||
if (metadata->rehashing_node) {
|
||||
listDelNode(server.rehashing, metadata->rehashing_node);
|
||||
metadata->rehashing_node = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
void dictRehashingCompletedForExpires(dict *d) {
|
||||
if (!server.cluster_enabled) return;
|
||||
|
||||
unsigned long long from, to;
|
||||
dictRehashingInfo(d, &from, &to);
|
||||
server.db[0].sub_dict[DB_EXPIRES].bucket_count -= from; /* Finished rehashing (Remove the old ht size) */
|
||||
server.db[0].sub_dict[keyType].bucket_count -= from; /* Finished rehashing (Remove the old ht size) */
|
||||
}
|
||||
|
||||
void dbDictRehashingStarted(dict *d) {
|
||||
dictRehashingStarted(d, DB_MAIN);
|
||||
}
|
||||
|
||||
void dbDictRehashingCompleted(dict *d) {
|
||||
dictRehashingCompleted(d, DB_MAIN);
|
||||
}
|
||||
|
||||
void dbExpiresRehashingStarted(dict *d) {
|
||||
dictRehashingStarted(d, DB_EXPIRES);
|
||||
}
|
||||
|
||||
void dbExpiresRehashingCompleted(dict *d) {
|
||||
dictRehashingCompleted(d, DB_EXPIRES);
|
||||
}
|
||||
|
||||
/* Returns the size of the DB dict metadata in bytes. */
|
||||
size_t dbDictMetadataSize(dict *d) {
|
||||
UNUSED(d);
|
||||
/* NOTICE: this also affects overhead_ht_main and overhead_ht_expires in getMemoryOverheadData. */
|
||||
return sizeof(dbDictMetadata);
|
||||
}
|
||||
|
||||
/* Generic hash table type where keys are Redis Objects, Values
|
||||
@ -522,8 +531,9 @@ dictType dbDictType = {
|
||||
dictSdsDestructor, /* key destructor */
|
||||
dictObjectDestructor, /* val destructor */
|
||||
dictExpandAllowed, /* allow to expand */
|
||||
dictRehashingStarted,
|
||||
dictRehashingCompleted,
|
||||
dbDictRehashingStarted,
|
||||
dbDictRehashingCompleted,
|
||||
dbDictMetadataSize,
|
||||
};
|
||||
|
||||
/* Db->expires */
|
||||
@ -535,8 +545,9 @@ dictType dbExpiresDictType = {
|
||||
NULL, /* key destructor */
|
||||
NULL, /* val destructor */
|
||||
dictExpandAllowed, /* allow to expand */
|
||||
dictRehashingStartedForExpires,
|
||||
dictRehashingCompletedForExpires,
|
||||
dbExpiresRehashingStarted,
|
||||
dbExpiresRehashingCompleted,
|
||||
dbDictMetadataSize,
|
||||
};
|
||||
|
||||
/* Command table. sds string -> command struct pointer. */
|
||||
@ -683,45 +694,23 @@ void tryResizeHashTables(int dbid) {
|
||||
*
|
||||
* The function returns 1 if some rehashing was performed, otherwise 0
|
||||
* is returned. */
|
||||
int incrementallyRehash(int dbid) {
|
||||
/* Rehash main and expire dictionary . */
|
||||
if (server.cluster_enabled) {
|
||||
listNode *node, *nextNode;
|
||||
monotime timer;
|
||||
elapsedStart(&timer);
|
||||
/* Our goal is to rehash as many slot specific dictionaries as we can before reaching predefined threshold,
|
||||
* while removing those that already finished rehashing from the queue. */
|
||||
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
|
||||
serverLog(LL_DEBUG,"Rehashing list length: %lu", listLength(server.db[dbid].sub_dict[subdict].rehashing));
|
||||
while ((node = listFirst(server.db[dbid].sub_dict[subdict].rehashing))) {
|
||||
if (dictIsRehashing((dict *) listNodeValue(node))) {
|
||||
dictRehashMilliseconds(listNodeValue(node), INCREMENTAL_REHASHING_THRESHOLD_MS);
|
||||
if (elapsedMs(timer) >= INCREMENTAL_REHASHING_THRESHOLD_MS) {
|
||||
return 1; /* Reached the time limit. */
|
||||
}
|
||||
} else { /* It is possible that rehashing has already completed for this dictionary, simply remove it from the queue. */
|
||||
nextNode = listNextNode(node);
|
||||
listDelNode(server.db[dbid].sub_dict[subdict].rehashing, node);
|
||||
node = nextNode;
|
||||
}
|
||||
}
|
||||
}
|
||||
/* When cluster mode is disabled, only one dict is used for the entire DB and rehashing list isn't populated. */
|
||||
} else {
|
||||
/* Rehash main dict. */
|
||||
dict *main_dict = server.db[dbid].dict[0];
|
||||
if (dictIsRehashing(main_dict)) {
|
||||
dictRehashMilliseconds(main_dict, INCREMENTAL_REHASHING_THRESHOLD_MS);
|
||||
return 1; /* already used our millisecond for this loop... */
|
||||
}
|
||||
/* Rehash expires. */
|
||||
dict *expires_dict = server.db[dbid].expires[0];
|
||||
if (dictIsRehashing(expires_dict)) {
|
||||
dictRehashMilliseconds(expires_dict, INCREMENTAL_REHASHING_THRESHOLD_MS);
|
||||
return 1; /* already used our millisecond for this loop... */
|
||||
int incrementallyRehash(void) {
|
||||
if (listLength(server.rehashing) == 0) return 0;
|
||||
serverLog(LL_DEBUG,"Rehashing list length: %lu", listLength(server.rehashing));
|
||||
|
||||
/* Our goal is to rehash as many dictionaries as we can before reaching predefined threshold,
|
||||
* after each dictionary completes rehashing, it removes itself from the list. */
|
||||
listNode *node;
|
||||
monotime timer;
|
||||
elapsedStart(&timer);
|
||||
while ((node = listFirst(server.rehashing))) {
|
||||
uint64_t elapsed_us = elapsedUs(timer);
|
||||
if (elapsed_us >= INCREMENTAL_REHASHING_THRESHOLD_US) {
|
||||
break; /* Reached the time limit. */
|
||||
}
|
||||
dictRehashMicroseconds(listNodeValue(node), INCREMENTAL_REHASHING_THRESHOLD_US - elapsed_us);
|
||||
}
|
||||
return 0;
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* This function is called once a background process of some kind terminates,
|
||||
@ -1162,7 +1151,6 @@ void databasesCron(void) {
|
||||
* DB we'll be able to start from the successive in the next
|
||||
* cron loop iteration. */
|
||||
static unsigned int resize_db = 0;
|
||||
static unsigned int rehash_db = 0;
|
||||
int dbs_per_call = CRON_DBS_PER_CALL;
|
||||
int j;
|
||||
|
||||
@ -1177,18 +1165,7 @@ void databasesCron(void) {
|
||||
|
||||
/* Rehash */
|
||||
if (server.activerehashing) {
|
||||
for (j = 0; j < dbs_per_call; j++) {
|
||||
int work_done = incrementallyRehash(rehash_db);
|
||||
if (work_done) {
|
||||
/* If the function did some work, stop here, we'll do
|
||||
* more at the next cron loop. */
|
||||
break;
|
||||
} else {
|
||||
/* If this db didn't need rehash, we'll try the next one. */
|
||||
rehash_db++;
|
||||
rehash_db %= server.dbnum;
|
||||
}
|
||||
}
|
||||
incrementallyRehash();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -2654,7 +2631,6 @@ void makeThreadKillable(void) {
|
||||
/* When adding fields, please check the initTempDb related logic. */
|
||||
void initDbState(redisDb *db){
|
||||
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
|
||||
db->sub_dict[subdict].rehashing = listCreate();
|
||||
db->sub_dict[subdict].non_empty_slots = 0;
|
||||
db->sub_dict[subdict].key_count = 0;
|
||||
db->sub_dict[subdict].resize_cursor = -1;
|
||||
@ -2754,6 +2730,7 @@ void initServer(void) {
|
||||
initDbState(&server.db[j]);
|
||||
listSetFreeMethod(server.db[j].defrag_later,(void (*)(void*))sdsfree);
|
||||
}
|
||||
server.rehashing = listCreate();
|
||||
evictionPoolAlloc(); /* Initialize the LRU keys pool. */
|
||||
server.pubsub_channels = dictCreate(&keylistDictType);
|
||||
server.pubsub_patterns = dictCreate(&keylistDictType);
|
||||
|
@ -137,7 +137,7 @@ struct hdr_histogram;
|
||||
#define CONFIG_BINDADDR_MAX 16
|
||||
#define CONFIG_MIN_RESERVED_FDS 32
|
||||
#define CONFIG_DEFAULT_PROC_TITLE_TEMPLATE "{title} {listen-addr} {server-mode}"
|
||||
#define INCREMENTAL_REHASHING_THRESHOLD_MS 1
|
||||
#define INCREMENTAL_REHASHING_THRESHOLD_US 1000
|
||||
|
||||
/* Bucket sizes for client eviction pools. Each bucket stores clients with
|
||||
* memory usage of up to twice the size of the bucket below it. */
|
||||
@ -971,7 +971,6 @@ typedef struct replBufBlock {
|
||||
|
||||
/* When adding fields, please check the swap db related logic. */
|
||||
typedef struct dbDictState {
|
||||
list *rehashing; /* List of dictionaries in this DB that are currently rehashing. */
|
||||
int resize_cursor; /* Cron job uses this cursor to gradually resize dictionaries (only used for cluster-enabled). */
|
||||
int non_empty_slots; /* The number of non-empty slots. */
|
||||
unsigned long long key_count; /* Total number of keys in this DB. */
|
||||
@ -984,6 +983,11 @@ typedef enum dbKeyType {
|
||||
DB_EXPIRES
|
||||
} dbKeyType;
|
||||
|
||||
/* Dict metadata for database, used for record the position in rehashing list. */
|
||||
typedef struct dbDictMetadata {
|
||||
listNode *rehashing_node; /* list node in rehashing list */
|
||||
} dbDictMetadata;
|
||||
|
||||
/* Redis database representation. There are multiple databases identified
|
||||
* by integers from 0 (the default database) up to the max configured
|
||||
* database. The database number is the 'id' field in the structure. */
|
||||
@ -1569,6 +1573,7 @@ struct redisServer {
|
||||
int hz; /* serverCron() calls frequency in hertz */
|
||||
int in_fork_child; /* indication that this is a fork child */
|
||||
redisDb *db;
|
||||
list *rehashing; /* List of dictionaries in DBs that are currently rehashing. */
|
||||
dict *commands; /* Command table */
|
||||
dict *orig_commands; /* Command table before command renaming. */
|
||||
aeEventLoop *el;
|
||||
|
Loading…
Reference in New Issue
Block a user