dict.c: added optional callback to dictEmpty().

Redis hash table implementation has many non-blocking features like
incremental rehashing, however while deleting a large hash table there
was no way to have a callback called to do some incremental work.

This commit adds this support, as an optiona callback argument to
dictEmpty() that is currently called at a fixed interval (one time every
65k deletions).
This commit is contained in:
antirez 2013-12-10 18:18:24 +01:00
parent 2c4ab8a534
commit 2eb781b35b
8 changed files with 22 additions and 21 deletions

View File

@ -170,14 +170,14 @@ int dbDelete(redisDb *db, robj *key) {
} }
} }
long long emptyDb() { long long emptyDb(void(callback)(void*)) {
int j; int j;
long long removed = 0; long long removed = 0;
for (j = 0; j < server.dbnum; j++) { for (j = 0; j < server.dbnum; j++) {
removed += dictSize(server.db[j].dict); removed += dictSize(server.db[j].dict);
dictEmpty(server.db[j].dict); dictEmpty(server.db[j].dict,callback);
dictEmpty(server.db[j].expires); dictEmpty(server.db[j].expires,callback);
} }
if (server.cluster_enabled) slotToKeyFlush(); if (server.cluster_enabled) slotToKeyFlush();
return removed; return removed;
@ -214,15 +214,15 @@ void signalFlushedDb(int dbid) {
void flushdbCommand(redisClient *c) { void flushdbCommand(redisClient *c) {
server.dirty += dictSize(c->db->dict); server.dirty += dictSize(c->db->dict);
signalFlushedDb(c->db->id); signalFlushedDb(c->db->id);
dictEmpty(c->db->dict); dictEmpty(c->db->dict,NULL);
dictEmpty(c->db->expires); dictEmpty(c->db->expires,NULL);
if (server.cluster_enabled) slotToKeyFlush(); if (server.cluster_enabled) slotToKeyFlush();
addReply(c,shared.ok); addReply(c,shared.ok);
} }
void flushallCommand(redisClient *c) { void flushallCommand(redisClient *c) {
signalFlushedDb(-1); signalFlushedDb(-1);
server.dirty += emptyDb(); server.dirty += emptyDb(NULL);
addReply(c,shared.ok); addReply(c,shared.ok);
if (server.rdb_child_pid != -1) { if (server.rdb_child_pid != -1) {
kill(server.rdb_child_pid,SIGUSR1); kill(server.rdb_child_pid,SIGUSR1);

View File

@ -261,7 +261,7 @@ void debugCommand(redisClient *c) {
addReply(c,shared.err); addReply(c,shared.err);
return; return;
} }
emptyDb(); emptyDb(NULL);
if (rdbLoad(server.rdb_filename) != REDIS_OK) { if (rdbLoad(server.rdb_filename) != REDIS_OK) {
addReplyError(c,"Error trying to load the RDB dump"); addReplyError(c,"Error trying to load the RDB dump");
return; return;
@ -269,7 +269,7 @@ void debugCommand(redisClient *c) {
redisLog(REDIS_WARNING,"DB reloaded by DEBUG RELOAD"); redisLog(REDIS_WARNING,"DB reloaded by DEBUG RELOAD");
addReply(c,shared.ok); addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"loadaof")) { } else if (!strcasecmp(c->argv[1]->ptr,"loadaof")) {
emptyDb(); emptyDb(NULL);
if (loadAppendOnlyFile(server.aof_filename) != REDIS_OK) { if (loadAppendOnlyFile(server.aof_filename) != REDIS_OK) {
addReply(c,shared.err); addReply(c,shared.err);
return; return;

View File

@ -444,14 +444,15 @@ int dictDeleteNoFree(dict *ht, const void *key) {
} }
/* Destroy an entire dictionary */ /* Destroy an entire dictionary */
int _dictClear(dict *d, dictht *ht) int _dictClear(dict *d, dictht *ht, void(callback)(void *)) {
{
unsigned long i; unsigned long i;
/* Free all the elements */ /* Free all the elements */
for (i = 0; i < ht->size && ht->used > 0; i++) { for (i = 0; i < ht->size && ht->used > 0; i++) {
dictEntry *he, *nextHe; dictEntry *he, *nextHe;
if (callback && (i & 65535) == 0) callback(d->privdata);
if ((he = ht->table[i]) == NULL) continue; if ((he = ht->table[i]) == NULL) continue;
while(he) { while(he) {
nextHe = he->next; nextHe = he->next;
@ -472,8 +473,8 @@ int _dictClear(dict *d, dictht *ht)
/* Clear & Release the hash table */ /* Clear & Release the hash table */
void dictRelease(dict *d) void dictRelease(dict *d)
{ {
_dictClear(d,&d->ht[0]); _dictClear(d,&d->ht[0],NULL);
_dictClear(d,&d->ht[1]); _dictClear(d,&d->ht[1],NULL);
zfree(d); zfree(d);
} }
@ -882,9 +883,9 @@ static int _dictKeyIndex(dict *d, const void *key)
return idx; return idx;
} }
void dictEmpty(dict *d) { void dictEmpty(dict *d, void(callback)(void*)) {
_dictClear(d,&d->ht[0]); _dictClear(d,&d->ht[0],callback);
_dictClear(d,&d->ht[1]); _dictClear(d,&d->ht[1],callback);
d->rehashidx = -1; d->rehashidx = -1;
d->iterators = 0; d->iterators = 0;
} }

View File

@ -160,7 +160,7 @@ dictEntry *dictGetRandomKey(dict *d);
void dictPrintStats(dict *d); void dictPrintStats(dict *d);
unsigned int dictGenHashFunction(const void *key, int len); unsigned int dictGenHashFunction(const void *key, int len);
unsigned int dictGenCaseHashFunction(const unsigned char *buf, int len); unsigned int dictGenCaseHashFunction(const unsigned char *buf, int len);
void dictEmpty(dict *d); void dictEmpty(dict *d, void(callback)(void*));
void dictEnableResize(void); void dictEnableResize(void);
void dictDisableResize(void); void dictDisableResize(void);
int dictRehash(dict *d, int n); int dictRehash(dict *d, int n);

View File

@ -1212,7 +1212,7 @@ void setKey(redisDb *db, robj *key, robj *val);
int dbExists(redisDb *db, robj *key); int dbExists(redisDb *db, robj *key);
robj *dbRandomKey(redisDb *db); robj *dbRandomKey(redisDb *db);
int dbDelete(redisDb *db, robj *key); int dbDelete(redisDb *db, robj *key);
long long emptyDb(); long long emptyDb(void(callback)(void*));
int selectDb(redisClient *c, int id); int selectDb(redisClient *c, int id);
void signalModifiedKey(redisDb *db, robj *key); void signalModifiedKey(redisDb *db, robj *key);
void signalFlushedDb(int dbid); void signalFlushedDb(int dbid);

View File

@ -796,7 +796,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
} }
redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Flushing old data"); redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Flushing old data");
signalFlushedDb(-1); signalFlushedDb(-1);
emptyDb(); emptyDb(NULL);
/* Before loading the DB into memory we need to delete the readable /* Before loading the DB into memory we need to delete the readable
* handler, otherwise it will get called recursively since * handler, otherwise it will get called recursively since
* rdbLoad() will call the event loop to process events from time to * rdbLoad() will call the event loop to process events from time to
@ -1468,7 +1468,7 @@ void replicationScriptCacheInit(void) {
* to reclaim otherwise unused memory. * to reclaim otherwise unused memory.
*/ */
void replicationScriptCacheFlush(void) { void replicationScriptCacheFlush(void) {
dictEmpty(server.repl_scriptcache_dict); dictEmpty(server.repl_scriptcache_dict,NULL);
listRelease(server.repl_scriptcache_fifo); listRelease(server.repl_scriptcache_fifo);
server.repl_scriptcache_fifo = listCreate(); server.repl_scriptcache_fifo = listCreate();
} }

View File

@ -391,7 +391,7 @@ void initSentinel(void) {
/* Remove usual Redis commands from the command table, then just add /* Remove usual Redis commands from the command table, then just add
* the SENTINEL command. */ * the SENTINEL command. */
dictEmpty(server.commands); dictEmpty(server.commands,NULL);
for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) { for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) {
int retval; int retval;
struct redisCommand *cmd = sentinelcmds+j; struct redisCommand *cmd = sentinelcmds+j;

View File

@ -835,7 +835,7 @@ void unblockClientWaitingData(redisClient *c) {
dictReleaseIterator(di); dictReleaseIterator(di);
/* Cleanup the client structure */ /* Cleanup the client structure */
dictEmpty(c->bpop.keys); dictEmpty(c->bpop.keys,NULL);
if (c->bpop.target) { if (c->bpop.target) {
decrRefCount(c->bpop.target); decrRefCount(c->bpop.target);
c->bpop.target = NULL; c->bpop.target = NULL;