diff --git a/src/db.c b/src/db.c
index 5ab8d32c1..50d6bd460 100644
--- a/src/db.c
+++ b/src/db.c
@@ -669,9 +669,21 @@ long long emptyDbStructure(redisDb *dbarray, int dbnum, int async,
         if (async) {
             emptyDbAsync(&dbarray[j]);
         } else {
+            dbDictMetadata *metadata;
             for (int k = 0; k < dbarray[j].dict_count; k++) {
                 dictEmpty(dbarray[j].dict[k],callback);
+                metadata = (dbDictMetadata *)dictMetadata(dbarray[j].dict[k]);
+                if (metadata->rehashing_node) {
+                    listDelNode(server.rehashing, metadata->rehashing_node);
+                    metadata->rehashing_node = NULL;
+                }
+
                 dictEmpty(dbarray[j].expires[k],callback);
+                metadata = (dbDictMetadata *)dictMetadata(dbarray[j].expires[k]);
+                if (metadata->rehashing_node) {
+                    listDelNode(server.rehashing, metadata->rehashing_node);
+                    metadata->rehashing_node = NULL;
+                }
             }
         }
         /* Because all keys of database are removed, reset average ttl. */
@@ -682,8 +694,6 @@ long long emptyDbStructure(redisDb *dbarray, int dbnum, int async,
             dbarray[j].sub_dict[subdict].key_count = 0;
             dbarray[j].sub_dict[subdict].resize_cursor = -1;
             if (server.cluster_enabled) {
-                if (dbarray[j].sub_dict[subdict].rehashing)
-                    listEmpty(dbarray[j].sub_dict[subdict].rehashing);
                 dbarray[j].sub_dict[subdict].bucket_count = 0;
                 unsigned long long *slot_size_index = dbarray[j].sub_dict[subdict].slot_size_index;
                 memset(slot_size_index, 0, sizeof(unsigned long long) * (CLUSTER_SLOTS + 1));
@@ -757,7 +767,6 @@ redisDb *initTempDb(void) {
         tempDb[i].dict = dictCreateMultiple(&dbDictType, tempDb[i].dict_count);
         tempDb[i].expires = dictCreateMultiple(&dbExpiresDictType, tempDb[i].dict_count);
         for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
-            tempDb[i].sub_dict[subdict].rehashing = listCreate();
             tempDb[i].sub_dict[subdict].slot_size_index = server.cluster_enabled ? zcalloc(sizeof(unsigned long long) * (CLUSTER_SLOTS + 1)) : NULL;
         }
     }
@@ -779,7 +788,6 @@ void discardTempDb(redisDb *tempDb, void(callback)(dict*)) {
         zfree(tempDb[i].dict);
         zfree(tempDb[i].expires);
         for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
-            listRelease(tempDb[i].sub_dict[subdict].rehashing);
             if (server.cluster_enabled) {
                 zfree(tempDb[i].sub_dict[subdict].slot_size_index);
             }
@@ -1445,7 +1453,7 @@ size_t dbMemUsage(redisDb *db, dbKeyType keyType) {
     unsigned long long keys_count = dbSize(db, keyType);
     mem += keys_count * dictEntryMemUsage() +
            dbBuckets(db, keyType) * sizeof(dictEntry*) +
-           db->dict_count * sizeof(dict);
+           db->dict_count * (sizeof(dict) + dictMetadataSize(db->dict[0]));
     if (keyType == DB_MAIN) {
         mem+=keys_count * sizeof(robj);
     }
@@ -1890,7 +1898,6 @@ int dbSwapDatabases(int id1, int id2) {
     db1->expires_cursor = db2->expires_cursor;
     db1->dict_count = db2->dict_count;
     for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
-        db1->sub_dict[subdict].rehashing = db2->sub_dict[subdict].rehashing;
         db1->sub_dict[subdict].key_count = db2->sub_dict[subdict].key_count;
         db1->sub_dict[subdict].bucket_count = db2->sub_dict[subdict].bucket_count;
         db1->sub_dict[subdict].non_empty_slots = db2->sub_dict[subdict].non_empty_slots;
@@ -1904,7 +1911,6 @@ int dbSwapDatabases(int id1, int id2) {
     db2->expires_cursor = aux.expires_cursor;
     db2->dict_count = aux.dict_count;
     for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
-        db2->sub_dict[subdict].rehashing = aux.sub_dict[subdict].rehashing;
         db2->sub_dict[subdict].key_count = aux.sub_dict[subdict].key_count;
         db2->sub_dict[subdict].bucket_count = aux.sub_dict[subdict].bucket_count;
         db2->sub_dict[subdict].non_empty_slots = aux.sub_dict[subdict].non_empty_slots;
@@ -1950,7 +1956,6 @@ void swapMainDbWithTempDb(redisDb *tempDb) {
         activedb->expires_cursor = newdb->expires_cursor;
         activedb->dict_count = newdb->dict_count;
         for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
-            activedb->sub_dict[subdict].rehashing = newdb->sub_dict[subdict].rehashing;
             activedb->sub_dict[subdict].key_count = newdb->sub_dict[subdict].key_count;
             activedb->sub_dict[subdict].bucket_count = newdb->sub_dict[subdict].bucket_count;
             activedb->sub_dict[subdict].non_empty_slots = newdb->sub_dict[subdict].non_empty_slots;
@@ -1964,7 +1969,6 @@ void swapMainDbWithTempDb(redisDb *tempDb) {
         newdb->expires_cursor = aux.expires_cursor;
         newdb->dict_count = aux.dict_count;
         for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
-            newdb->sub_dict[subdict].rehashing = aux.sub_dict[subdict].rehashing;
             newdb->sub_dict[subdict].key_count = aux.sub_dict[subdict].key_count;
             newdb->sub_dict[subdict].bucket_count = aux.sub_dict[subdict].bucket_count;
             newdb->sub_dict[subdict].non_empty_slots = aux.sub_dict[subdict].non_empty_slots;
diff --git a/src/dict.c b/src/dict.c
index f41575a99..328c2dc81 100644
--- a/src/dict.c
+++ b/src/dict.c
@@ -181,7 +181,11 @@ static void _dictReset(dict *d, int htidx)
 /* Create a new hash table */
 dict *dictCreate(dictType *type)
 {
-    dict *d = zmalloc(sizeof(*d));
+    size_t metasize = type->dictMetadataBytes ? type->dictMetadataBytes(NULL) : 0;
+    dict *d = zmalloc(sizeof(*d)+metasize);
+    if (metasize > 0) {
+        memset(dictMetadata(d), 0, metasize);
+    }
     _dictInit(d,type);
     return d;
 }
@@ -399,10 +403,10 @@ long long timeInMilliseconds(void) {
     return (((long long)tv.tv_sec)*1000)+(tv.tv_usec/1000);
 }
 
-/* Rehash in ms+"delta" milliseconds. The value of "delta" is larger
The value of "delta" is larger - * than 0, and is smaller than 1 in most cases. The exact upper bound +/* Rehash in us+"delta" microseconds. The value of "delta" is larger + * than 0, and is smaller than 1000 in most cases. The exact upper bound * depends on the running time of dictRehash(d,100).*/ -int dictRehashMilliseconds(dict *d, unsigned int ms) { +int dictRehashMicroseconds(dict *d, uint64_t us) { if (d->pauserehash > 0) return 0; monotime timer; @@ -411,7 +415,7 @@ int dictRehashMilliseconds(dict *d, unsigned int ms) { while(dictRehash(d,100)) { rehashes += 100; - if (elapsedMs(timer) >= ms) break; + if (elapsedUs(timer) >= us) break; } return rehashes; } @@ -1714,7 +1718,7 @@ int dictTest(int argc, char **argv, int flags) { /* Wait for rehashing. */ while (dictIsRehashing(dict)) { - dictRehashMilliseconds(dict,100); + dictRehashMicroseconds(dict,100*1000); } start_benchmark(); diff --git a/src/dict.h b/src/dict.h index 334dc441e..3d4de3be2 100644 --- a/src/dict.h +++ b/src/dict.h @@ -60,6 +60,9 @@ typedef struct dictType { /* Invoked at the end of dict initialization/rehashing of all the entries from old to new ht. Both ht still exists * and are cleaned up after this callback. */ void (*rehashingCompleted)(dict *d); + /* Allow a dict to carry extra caller-defined metadata. The + * extra memory is initialized to 0 when a dict is allocated. */ + size_t (*dictMetadataBytes)(dict *d); /* Flags */ /* The 'no_value' flag, if set, indicates that values are not used, i.e. the * dict is a set. When this flag is set, it's not possible to access the @@ -88,6 +91,7 @@ struct dict { /* Keep small vars at end for optimal (minimal) struct padding */ int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */ signed char ht_size_exp[2]; /* exponent of size. (size = 1<type->keyCompare((d), key1, key2) : \ (key1) == (key2)) +#define dictMetadata(d) (&(d)->metadata) +#define dictMetadataSize(d) ((d)->type->dictMetadataBytes \ + ? (d)->type->dictMetadataBytes(d) : 0) + #define dictHashKey(d, key) ((d)->type->hashFunction(key)) #define dictBuckets(d) (DICTHT_SIZE((d)->ht_size_exp[0])+DICTHT_SIZE((d)->ht_size_exp[1])) #define dictSize(d) ((d)->ht_used[0]+(d)->ht_used[1]) @@ -166,7 +174,6 @@ dict *dictCreate(dictType *type); dict **dictCreateMultiple(dictType *type, int count); int dictExpand(dict *d, unsigned long size); int dictTryExpand(dict *d, unsigned long size); -void *dictMetadata(dict *d); int dictAdd(dict *d, void *key, void *val); dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing); void *dictFindPositionForInsert(dict *d, const void *key, dictEntry **existing); @@ -215,7 +222,7 @@ uint64_t dictGenCaseHashFunction(const unsigned char *buf, size_t len); void dictEmpty(dict *d, void(callback)(dict*)); void dictSetResizeEnabled(dictResizeEnable enable); int dictRehash(dict *d, int n); -int dictRehashMilliseconds(dict *d, unsigned int ms); +int dictRehashMicroseconds(dict *d, uint64_t us); void dictSetHashFunctionSeed(uint8_t *seed); uint8_t *dictGetHashFunctionSeed(void); unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, void *privdata); diff --git a/src/lazyfree.c b/src/lazyfree.c index 2a6d1b7e1..1b58bb78e 100644 --- a/src/lazyfree.c +++ b/src/lazyfree.c @@ -179,6 +179,20 @@ void freeObjAsync(robj *key, robj *obj, int dbid) { * create a new empty set of hash tables and scheduling the old ones for * lazy freeing. 
 void emptyDbAsync(redisDb *db) {
+    dbDictMetadata *metadata;
+    for (int i = 0; i < db->dict_count; i++) {
+        metadata = (dbDictMetadata *)dictMetadata(db->dict[i]);
+        if (metadata->rehashing_node) {
+            listDelNode(server.rehashing, metadata->rehashing_node);
+            metadata->rehashing_node = NULL;
+        }
+
+        metadata = (dbDictMetadata *)dictMetadata(db->expires[i]);
+        if (metadata->rehashing_node) {
+            listDelNode(server.rehashing, metadata->rehashing_node);
+            metadata->rehashing_node = NULL;
+        }
+    }
     dict **oldDict = db->dict;
     dict **oldExpires = db->expires;
     atomicIncr(lazyfree_objects,dbSize(db, DB_MAIN));
diff --git a/src/server.c b/src/server.c
index 25a20a49b..361a0928f 100644
--- a/src/server.c
+++ b/src/server.c
@@ -419,52 +419,61 @@ int dictExpandAllowed(size_t moreMem, double usedRatio) {
     }
 }
 
-/* Updates the bucket count in cluster-mode for the given dictionary in a DB. bucket count
- * incremented with the new ht size during the rehashing phase.
- * And also adds dictionary to the rehashing list in cluster mode, which allows us
+/* Adds the dictionary to the rehashing list, which allows us
  * to quickly find rehash targets during incremental rehashing.
- *
- * In non-cluster mode, bucket count can be retrieved directly from single dict bucket and
- * we don't need this list as there is only one dictionary per DB. */
-void dictRehashingStarted(dict *d) {
-    if (!server.cluster_enabled) return;
+ *
+ * Updates the bucket count in cluster-mode for the given dictionary in a DB; the bucket count
+ * is incremented with the new ht size during the rehashing phase. In non-cluster mode,
+ * the bucket count can be retrieved directly from the single dict bucket. */
+void dictRehashingStarted(dict *d, dbKeyType keyType) {
+    dbDictMetadata *metadata = (dbDictMetadata *)dictMetadata(d);
+    listAddNodeTail(server.rehashing, d);
+    metadata->rehashing_node = listLast(server.rehashing);
 
+    if (!server.cluster_enabled) return;
     unsigned long long from, to;
     dictRehashingInfo(d, &from, &to);
-    server.db[0].sub_dict[DB_MAIN].bucket_count += to; /* Started rehashing (Add the new ht size) */
-    if (from == 0) return; /* No entries are to be moved. */
-    if (server.activerehashing) {
-        listAddNodeTail(server.db[0].sub_dict[DB_MAIN].rehashing, d);
-    }
+    server.db[0].sub_dict[keyType].bucket_count += to; /* Started rehashing (Add the new ht size) */
 }
 
-/* Updates the bucket count for the given dictionary in a DB. It removes
+/* Removes the dictionary from the rehashing list.
+ *
+ * Updates the bucket count for the given dictionary in a DB. It removes
  * the old ht size of the dictionary from the total sum of buckets for a DB. */
-void dictRehashingCompleted(dict *d) {
-    if (!server.cluster_enabled) return;
-    unsigned long long from, to;
-    dictRehashingInfo(d, &from, &to);
-    server.db[0].sub_dict[DB_MAIN].bucket_count -= from; /* Finished rehashing (Remove the old ht size) */
-}
-
-void dictRehashingStartedForExpires(dict *d) {
-    if (!server.cluster_enabled) return;
-
-    unsigned long long from, to;
-    dictRehashingInfo(d, &from, &to);
-    server.db[0].sub_dict[DB_EXPIRES].bucket_count += to; /* Started rehashing (Add the new ht size) */
-    if (from == 0) return; /* No entries are to be moved. */
-    if (server.activerehashing) {
-        listAddNodeTail(server.db[0].sub_dict[DB_EXPIRES].rehashing, d);
+void dictRehashingCompleted(dict *d, dbKeyType keyType) {
+    dbDictMetadata *metadata = (dbDictMetadata *)dictMetadata(d);
+    if (metadata->rehashing_node) {
+        listDelNode(server.rehashing, metadata->rehashing_node);
+        metadata->rehashing_node = NULL;
     }
-}
 
-void dictRehashingCompletedForExpires(dict *d) {
     if (!server.cluster_enabled) return;
-
     unsigned long long from, to;
     dictRehashingInfo(d, &from, &to);
-    server.db[0].sub_dict[DB_EXPIRES].bucket_count -= from; /* Finished rehashing (Remove the old ht size) */
+    server.db[0].sub_dict[keyType].bucket_count -= from; /* Finished rehashing (Remove the old ht size) */
+}
+
+void dbDictRehashingStarted(dict *d) {
+    dictRehashingStarted(d, DB_MAIN);
+}
+
+void dbDictRehashingCompleted(dict *d) {
+    dictRehashingCompleted(d, DB_MAIN);
+}
+
+void dbExpiresRehashingStarted(dict *d) {
+    dictRehashingStarted(d, DB_EXPIRES);
+}
+
+void dbExpiresRehashingCompleted(dict *d) {
+    dictRehashingCompleted(d, DB_EXPIRES);
+}
+
+/* Returns the size of the DB dict metadata in bytes. */
+size_t dbDictMetadataSize(dict *d) {
+    UNUSED(d);
+    /* NOTICE: this also affects overhead_ht_main and overhead_ht_expires in getMemoryOverheadData. */
+    return sizeof(dbDictMetadata);
 }
 
 /* Generic hash table type where keys are Redis Objects, Values
@@ -522,8 +531,9 @@
     dictSdsDestructor,          /* key destructor */
     dictObjectDestructor,       /* val destructor */
     dictExpandAllowed,          /* allow to expand */
-    dictRehashingStarted,
-    dictRehashingCompleted,
+    dbDictRehashingStarted,
+    dbDictRehashingCompleted,
+    dbDictMetadataSize,
 };
 
 /* Db->expires */
@@ -535,8 +545,9 @@
     NULL,                       /* key destructor */
     NULL,                       /* val destructor */
     dictExpandAllowed,          /* allow to expand */
-    dictRehashingStartedForExpires,
-    dictRehashingCompletedForExpires,
+    dbExpiresRehashingStarted,
+    dbExpiresRehashingCompleted,
+    dbDictMetadataSize,
 };
 
 /* Command table. sds string -> command struct pointer. */
@@ -683,45 +694,23 @@ void tryResizeHashTables(int dbid) {
  *
  * The function returns 1 if some rehashing was performed, otherwise 0
  * is returned. */
-int incrementallyRehash(int dbid) {
-    /* Rehash main and expire dictionary . */
-    if (server.cluster_enabled) {
-        listNode *node, *nextNode;
-        monotime timer;
-        elapsedStart(&timer);
-        /* Our goal is to rehash as many slot specific dictionaries as we can before reaching predefined threshold,
-         * while removing those that already finished rehashing from the queue. */
-        for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
-            serverLog(LL_DEBUG,"Rehashing list length: %lu", listLength(server.db[dbid].sub_dict[subdict].rehashing));
-            while ((node = listFirst(server.db[dbid].sub_dict[subdict].rehashing))) {
-                if (dictIsRehashing((dict *) listNodeValue(node))) {
-                    dictRehashMilliseconds(listNodeValue(node), INCREMENTAL_REHASHING_THRESHOLD_MS);
-                    if (elapsedMs(timer) >= INCREMENTAL_REHASHING_THRESHOLD_MS) {
-                        return 1; /* Reached the time limit. */
-                    }
-                } else { /* It is possible that rehashing has already completed for this dictionary, simply remove it from the queue. */
-                    nextNode = listNextNode(node);
-                    listDelNode(server.db[dbid].sub_dict[subdict].rehashing, node);
-                    node = nextNode;
-                }
-            }
-        }
-        /* When cluster mode is disabled, only one dict is used for the entire DB and rehashing list isn't populated. */
-    } else {
-        /* Rehash main dict. */
-        dict *main_dict = server.db[dbid].dict[0];
-        if (dictIsRehashing(main_dict)) {
-            dictRehashMilliseconds(main_dict, INCREMENTAL_REHASHING_THRESHOLD_MS);
-            return 1; /* already used our millisecond for this loop... */
-        }
-        /* Rehash expires. */
-        dict *expires_dict = server.db[dbid].expires[0];
-        if (dictIsRehashing(expires_dict)) {
-            dictRehashMilliseconds(expires_dict, INCREMENTAL_REHASHING_THRESHOLD_MS);
-            return 1; /* already used our millisecond for this loop... */
+int incrementallyRehash(void) {
+    if (listLength(server.rehashing) == 0) return 0;
+    serverLog(LL_DEBUG,"Rehashing list length: %lu", listLength(server.rehashing));
+
+    /* Our goal is to rehash as many dictionaries as we can before reaching the predefined
+     * threshold; after a dictionary completes rehashing, it removes itself from the list. */
+    listNode *node;
+    monotime timer;
+    elapsedStart(&timer);
+    while ((node = listFirst(server.rehashing))) {
+        uint64_t elapsed_us = elapsedUs(timer);
+        if (elapsed_us >= INCREMENTAL_REHASHING_THRESHOLD_US) {
+            break; /* Reached the time limit. */
         }
+        dictRehashMicroseconds(listNodeValue(node), INCREMENTAL_REHASHING_THRESHOLD_US - elapsed_us);
     }
-    return 0;
+    return 1;
 }
 
 /* This function is called once a background process of some kind terminates,
@@ -1162,7 +1151,6 @@ void databasesCron(void) {
      * DB we'll be able to start from the successive in the next
      * cron loop iteration. */
     static unsigned int resize_db = 0;
-    static unsigned int rehash_db = 0;
    int dbs_per_call = CRON_DBS_PER_CALL;
     int j;
 
@@ -1177,18 +1165,7 @@ void databasesCron(void) {
 
         /* Rehash */
         if (server.activerehashing) {
-            for (j = 0; j < dbs_per_call; j++) {
-                int work_done = incrementallyRehash(rehash_db);
-                if (work_done) {
-                    /* If the function did some work, stop here, we'll do
-                     * more at the next cron loop. */
-                    break;
-                } else {
-                    /* If this db didn't need rehash, we'll try the next one. */
-                    rehash_db++;
-                    rehash_db %= server.dbnum;
-                }
-            }
+            incrementallyRehash();
         }
     }
 }
@@ -2654,7 +2631,6 @@ void makeThreadKillable(void) {
 /* When adding fields, please check the initTempDb related logic. */
 void initDbState(redisDb *db){
     for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
-        db->sub_dict[subdict].rehashing = listCreate();
         db->sub_dict[subdict].non_empty_slots = 0;
         db->sub_dict[subdict].key_count = 0;
         db->sub_dict[subdict].resize_cursor = -1;
@@ -2754,6 +2730,7 @@ void initServer(void) {
         initDbState(&server.db[j]);
         listSetFreeMethod(server.db[j].defrag_later,(void (*)(void*))sdsfree);
     }
+    server.rehashing = listCreate();
     evictionPoolAlloc(); /* Initialize the LRU keys pool. */
     server.pubsub_channels = dictCreate(&keylistDictType);
     server.pubsub_patterns = dictCreate(&keylistDictType);
diff --git a/src/server.h b/src/server.h
index a0b028f00..c0a49f538 100644
--- a/src/server.h
+++ b/src/server.h
@@ -137,7 +137,7 @@ struct hdr_histogram;
 #define CONFIG_BINDADDR_MAX 16
 #define CONFIG_MIN_RESERVED_FDS 32
 #define CONFIG_DEFAULT_PROC_TITLE_TEMPLATE "{title} {listen-addr} {server-mode}"
-#define INCREMENTAL_REHASHING_THRESHOLD_MS 1
+#define INCREMENTAL_REHASHING_THRESHOLD_US 1000
 
 /* Bucket sizes for client eviction pools. Each bucket stores clients with
  * memory usage of up to twice the size of the bucket below it. */
@@ -971,7 +971,6 @@ typedef struct replBufBlock {
 
 /* When adding fields, please check the swap db related logic. */
 typedef struct dbDictState {
-    list *rehashing;                       /* List of dictionaries in this DB that are currently rehashing. */
     int resize_cursor;                     /* Cron job uses this cursor to gradually resize dictionaries (only used for cluster-enabled). */
     int non_empty_slots;                   /* The number of non-empty slots. */
     unsigned long long key_count;          /* Total number of keys in this DB. */
@@ -984,6 +983,11 @@ typedef enum dbKeyType {
     DB_EXPIRES
 } dbKeyType;
 
+/* Dict metadata for the database, used to record the position in the rehashing list. */
+typedef struct dbDictMetadata {
+    listNode *rehashing_node;   /* list node in the rehashing list */
+} dbDictMetadata;
+
 /* Redis database representation. There are multiple databases identified
  * by integers from 0 (the default database) up to the max configured
  * database. The database number is the 'id' field in the structure. */
@@ -1569,6 +1573,7 @@ struct redisServer {
     int hz;                     /* serverCron() calls frequency in hertz */
     int in_fork_child;          /* indication that this is a fork child */
     redisDb *db;
+    list *rehashing;            /* List of dictionaries in DBs that are currently rehashing. */
     dict *commands;             /* Command table */
     dict *orig_commands;        /* Command table before command renaming. */
     aeEventLoop *el;
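
---

Reviewer note: for anyone who wants to see the shape of the mechanism outside the Redis tree, below is a minimal, self-contained C sketch of the two ideas this patch combines: a per-dict metadata area whose size comes from a type callback and is zeroed at allocation, and a single global rehashing list in which each dict remembers its own node so it can unlink itself in O(1). This is illustrative only; `mdict`, `mdictType`, `dbMeta`, and the list helpers are made-up stand-ins, not the real `dict`/`adlist` APIs from this repo.

```c
/* Standalone model of the metadata + rehashing-list scheme (not Redis code). */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct listNode { struct listNode *prev, *next; void *value; } listNode;
typedef struct list { listNode head; size_t len; } list;   /* circular, sentinel head */

static void listInit(list *l) { l->head.prev = l->head.next = &l->head; l->len = 0; }
static listNode *listAddTail(list *l, void *value) {
    listNode *n = malloc(sizeof(*n));
    n->value = value;
    n->prev = l->head.prev; n->next = &l->head;
    l->head.prev->next = n; l->head.prev = n;
    l->len++;
    return n;
}
static void listDelNode(list *l, listNode *n) {
    n->prev->next = n->next; n->next->prev = n->prev;
    free(n); l->len--;
}

typedef struct mdict mdict;
typedef struct mdictType {
    size_t (*metadataBytes)(mdict *d);     /* optional extra space per dict */
    void (*rehashingStarted)(mdict *d);
    void (*rehashingCompleted)(mdict *d);
} mdictType;

struct mdict {
    mdictType *type;
    /* ... hash table fields would live here ... */
    void *metadata[];                      /* flexible array, zeroed on creation */
};

#define mdictMetadata(d) (&(d)->metadata)

mdict *mdictCreate(mdictType *type) {
    /* Mirrors the patched dictCreate(): allocate the struct plus the
     * caller-declared metadata, then zero the metadata region. */
    size_t metasize = type->metadataBytes ? type->metadataBytes(NULL) : 0;
    mdict *d = malloc(sizeof(*d) + metasize);
    d->type = type;
    if (metasize) memset(mdictMetadata(d), 0, metasize);
    return d;
}

/* Caller-defined metadata: remembers this dict's node in the global list. */
typedef struct dbMeta { listNode *rehashing_node; } dbMeta;
static list rehashing;                     /* models server.rehashing */

static size_t dbMetaBytes(mdict *d) { (void)d; return sizeof(dbMeta); }
static void started(mdict *d) {
    dbMeta *m = (dbMeta *)mdictMetadata(d);
    m->rehashing_node = listAddTail(&rehashing, d);   /* enqueue, remember node */
}
static void completed(mdict *d) {
    dbMeta *m = (dbMeta *)mdictMetadata(d);
    if (m->rehashing_node) {                          /* O(1) unlink, no scan */
        listDelNode(&rehashing, m->rehashing_node);
        m->rehashing_node = NULL;
    }
}

int main(void) {
    listInit(&rehashing);
    mdictType t = { dbMetaBytes, started, completed };
    mdict *d = mdictCreate(&t);
    d->type->rehashingStarted(d);                     /* dict begins rehashing */
    printf("rehashing dicts: %zu\n", rehashing.len);  /* prints 1 */
    d->type->rehashingCompleted(d);                   /* unlink via stored node */
    printf("rehashing dicts: %zu\n", rehashing.len);  /* prints 0 */
    free(d);
    return 0;
}
```

The other half of the change is the time budget. The new `incrementallyRehash()` drains one global list and hands each dict only the remaining budget, `INCREMENTAL_REHASHING_THRESHOLD_US - elapsed_us`, so a cron pass stays bounded near 1000 microseconds in total regardless of how many dicts are queued (e.g. if the first dict consumes 400us of rehash work, the next is given at most 600us). Under the old code, each call to `dictRehashMilliseconds(..., INCREMENTAL_REHASHING_THRESHOLD_MS)` received the full 1ms budget and the elapsed check only ran afterwards, so a pass could overshoot by nearly a millisecond per dictionary.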