mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 16:18:28 -05:00
RDB: load files faster avoiding useless free+realloc.
Reloading of the RDB generated by DEBUG POPULATE 5000000 SAVE is now 25% faster. This commit also prepares the ability to have more flexibility when loading stuff from the RDB, since we no longer use dbAdd() but can control exactly how things are added in the database.
This commit is contained in:
parent
96688aa646
commit
30adc62232
@ -4966,7 +4966,7 @@ void restoreCommand(client *c) {
|
||||
|
||||
rioInitWithBuffer(&payload,c->argv[3]->ptr);
|
||||
if (((type = rdbLoadObjectType(&payload)) == -1) ||
|
||||
((obj = rdbLoadObject(type,&payload,c->argv[1])) == NULL))
|
||||
((obj = rdbLoadObject(type,&payload,c->argv[1]->ptr)) == NULL))
|
||||
{
|
||||
addReplyError(c,"Bad data format");
|
||||
return;
|
||||
|
16
src/db.c
16
src/db.c
@ -185,7 +185,7 @@ void dbAdd(redisDb *db, robj *key, robj *val) {
|
||||
val->type == OBJ_ZSET ||
|
||||
val->type == OBJ_STREAM)
|
||||
signalKeyAsReady(db, key);
|
||||
if (server.cluster_enabled) slotToKeyAdd(key);
|
||||
if (server.cluster_enabled) slotToKeyAdd(key->ptr);
|
||||
}
|
||||
|
||||
/* Overwrite an existing key with a new value. Incrementing the reference
|
||||
@ -288,7 +288,7 @@ int dbSyncDelete(redisDb *db, robj *key) {
|
||||
* the key, because it is shared with the main dictionary. */
|
||||
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
|
||||
if (dictDelete(db->dict,key->ptr) == DICT_OK) {
|
||||
if (server.cluster_enabled) slotToKeyDel(key);
|
||||
if (server.cluster_enabled) slotToKeyDel(key->ptr);
|
||||
return 1;
|
||||
} else {
|
||||
return 0;
|
||||
@ -1647,17 +1647,17 @@ int *xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys)
|
||||
* a fast way a key that belongs to a specified hash slot. This is useful
|
||||
* while rehashing the cluster and in other conditions when we need to
|
||||
* understand if we have keys for a given hash slot. */
|
||||
void slotToKeyUpdateKey(robj *key, int add) {
|
||||
unsigned int hashslot = keyHashSlot(key->ptr,sdslen(key->ptr));
|
||||
void slotToKeyUpdateKey(sds key, int add) {
|
||||
size_t keylen = sdslen(key);
|
||||
unsigned int hashslot = keyHashSlot(key,keylen);
|
||||
unsigned char buf[64];
|
||||
unsigned char *indexed = buf;
|
||||
size_t keylen = sdslen(key->ptr);
|
||||
|
||||
server.cluster->slots_keys_count[hashslot] += add ? 1 : -1;
|
||||
if (keylen+2 > 64) indexed = zmalloc(keylen+2);
|
||||
indexed[0] = (hashslot >> 8) & 0xff;
|
||||
indexed[1] = hashslot & 0xff;
|
||||
memcpy(indexed+2,key->ptr,keylen);
|
||||
memcpy(indexed+2,key,keylen);
|
||||
if (add) {
|
||||
raxInsert(server.cluster->slots_to_keys,indexed,keylen+2,NULL,NULL);
|
||||
} else {
|
||||
@ -1666,11 +1666,11 @@ void slotToKeyUpdateKey(robj *key, int add) {
|
||||
if (indexed != buf) zfree(indexed);
|
||||
}
|
||||
|
||||
void slotToKeyAdd(robj *key) {
|
||||
void slotToKeyAdd(sds key) {
|
||||
slotToKeyUpdateKey(key,1);
|
||||
}
|
||||
|
||||
void slotToKeyDel(robj *key) {
|
||||
void slotToKeyDel(sds key) {
|
||||
slotToKeyUpdateKey(key,0);
|
||||
}
|
||||
|
||||
|
@ -83,7 +83,7 @@ int dbAsyncDelete(redisDb *db, robj *key) {
|
||||
* field to NULL in order to lazy free it later. */
|
||||
if (de) {
|
||||
dictFreeUnlinkedEntry(db->dict,de);
|
||||
if (server.cluster_enabled) slotToKeyDel(key);
|
||||
if (server.cluster_enabled) slotToKeyDel(key->ptr);
|
||||
return 1;
|
||||
} else {
|
||||
return 0;
|
||||
|
40
src/rdb.c
40
src/rdb.c
@ -1422,7 +1422,7 @@ robj *rdbLoadCheckModuleValue(rio *rdb, char *modulename) {
|
||||
|
||||
/* Load a Redis object of the specified type from the specified file.
|
||||
* On success a newly allocated object is returned, otherwise NULL. */
|
||||
robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
|
||||
robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) {
|
||||
robj *o = NULL, *ele, *dec;
|
||||
uint64_t len;
|
||||
unsigned int i;
|
||||
@ -1886,7 +1886,9 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
|
||||
exit(1);
|
||||
}
|
||||
RedisModuleIO io;
|
||||
moduleInitIOContext(io,mt,rdb,key);
|
||||
robj keyobj;
|
||||
initStaticStringObject(keyobj,key);
|
||||
moduleInitIOContext(io,mt,rdb,&keyobj);
|
||||
io.ver = (rdbtype == RDB_TYPE_MODULE) ? 1 : 2;
|
||||
/* Call the rdb_load method of the module providing the 10 bit
|
||||
* encoding version in the lower 10 bits of the module ID. */
|
||||
@ -2044,7 +2046,8 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
|
||||
long long lru_clock = LRU_CLOCK();
|
||||
|
||||
while(1) {
|
||||
robj *key, *val;
|
||||
sds key;
|
||||
robj *val;
|
||||
|
||||
/* Read type. */
|
||||
if ((type = rdbLoadType(rdb)) == -1) goto eoferr;
|
||||
@ -2216,10 +2219,11 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
|
||||
}
|
||||
|
||||
/* Read key */
|
||||
if ((key = rdbLoadStringObject(rdb)) == NULL) goto eoferr;
|
||||
if ((key = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL)
|
||||
goto eoferr;
|
||||
/* Read value */
|
||||
if ((val = rdbLoadObject(type,rdb,key)) == NULL) {
|
||||
decrRefCount(key);
|
||||
sdsfree(key);
|
||||
goto eoferr;
|
||||
}
|
||||
|
||||
@ -2229,24 +2233,32 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
|
||||
* responsible for key expiry. If we would expire keys here, the
|
||||
* snapshot taken by the master may not be reflected on the slave. */
|
||||
if (iAmMaster() && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now) {
|
||||
decrRefCount(key);
|
||||
sdsfree(key);
|
||||
decrRefCount(val);
|
||||
} else {
|
||||
/* Add the new object in the hash table */
|
||||
dbAdd(db,key,val);
|
||||
int retval = dictAdd(db->dict, key, val);
|
||||
if (retval != DICT_OK) {
|
||||
serverLog(LL_WARNING,
|
||||
"RDB has duplicated key '%s' in DB %d",key,db->id);
|
||||
serverPanic("Duplicated key found in RDB file");
|
||||
}
|
||||
if (server.cluster_enabled) slotToKeyAdd(key);
|
||||
|
||||
/* Set the expire time if needed */
|
||||
if (expiretime != -1) setExpire(NULL,db,key,expiretime);
|
||||
if (expiretime != -1) {
|
||||
robj keyobj;
|
||||
initStaticStringObject(keyobj,key);
|
||||
setExpire(NULL,db,&keyobj,expiretime);
|
||||
}
|
||||
|
||||
/* Set usage information (for eviction). */
|
||||
objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock,1000);
|
||||
|
||||
/* Decrement the key refcount since dbAdd() will take its
|
||||
* own reference. */
|
||||
decrRefCount(key);
|
||||
}
|
||||
if (server.key_load_delay)
|
||||
usleep(server.key_load_delay);
|
||||
|
||||
/* Loading the database more slowly is useful in order to test
|
||||
* certain edge cases. */
|
||||
if (server.key_load_delay) usleep(server.key_load_delay);
|
||||
|
||||
/* Reset the state that is key-specified and is populated by
|
||||
* opcodes before the key, so that we start from scratch again. */
|
||||
|
@ -144,7 +144,7 @@ void rdbRemoveTempFile(pid_t childpid);
|
||||
int rdbSave(char *filename, rdbSaveInfo *rsi);
|
||||
ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key);
|
||||
size_t rdbSavedObjectLen(robj *o, robj *key);
|
||||
robj *rdbLoadObject(int type, rio *rdb, robj *key);
|
||||
robj *rdbLoadObject(int type, rio *rdb, sds key);
|
||||
void backgroundSaveDoneHandler(int exitcode, int bysignal);
|
||||
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime);
|
||||
ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt);
|
||||
|
@ -287,7 +287,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) {
|
||||
rdbstate.keys++;
|
||||
/* Read value */
|
||||
rdbstate.doing = RDB_CHECK_DOING_READ_OBJECT_VALUE;
|
||||
if ((val = rdbLoadObject(type,&rdb,key)) == NULL) goto eoferr;
|
||||
if ((val = rdbLoadObject(type,&rdb,key->ptr)) == NULL) goto eoferr;
|
||||
/* Check if the key already expired. */
|
||||
if (expiretime != -1 && expiretime < now)
|
||||
rdbstate.already_expired++;
|
||||
|
@ -2093,8 +2093,8 @@ unsigned int delKeysInSlot(unsigned int hashslot);
|
||||
int verifyClusterConfigWithData(void);
|
||||
void scanGenericCommand(client *c, robj *o, unsigned long cursor);
|
||||
int parseScanCursorOrReply(client *c, robj *o, unsigned long *cursor);
|
||||
void slotToKeyAdd(robj *key);
|
||||
void slotToKeyDel(robj *key);
|
||||
void slotToKeyAdd(sds key);
|
||||
void slotToKeyDel(sds key);
|
||||
void slotToKeyFlush(void);
|
||||
int dbAsyncDelete(redisDb *db, robj *key);
|
||||
void emptyDbAsync(redisDb *db);
|
||||
|
Loading…
Reference in New Issue
Block a user