RDB: refactor some RDB loading code into dbAddRDBLoad().

This commit is contained in:
antirez 2020-04-09 16:21:48 +02:00
parent 399a6b2b47
commit d88f52ee7d
3 changed files with 22 additions and 4 deletions

View File

@ -188,6 +188,24 @@ void dbAdd(redisDb *db, robj *key, robj *val) {
if (server.cluster_enabled) slotToKeyAdd(key->ptr); if (server.cluster_enabled) slotToKeyAdd(key->ptr);
} }
/* This is a special version of dbAdd() that is used only when loading
* keys from the RDB file: the key is passed as an SDS string that is
* retained by the function (and not freed by the caller).
*
* Moreover this function will not abort if the key is already busy, to
* give more control to the caller, nor will signal the key as ready
* since it is not useful in this context.
*
* The function returns 1 if the key was added to the database, taking
* ownership of the SDS string, otherwise 0 is returned, and is up to the
* caller to free the SDS string. */
int dbAddRDBLoad(redisDb *db, sds key, robj *val) {
int retval = dictAdd(db->dict, key, val);
if (retval != DICT_OK) return 0;
if (server.cluster_enabled) slotToKeyAdd(key);
return 1;
}
/* Overwrite an existing key with a new value. Incrementing the reference /* Overwrite an existing key with a new value. Incrementing the reference
* count of the new value is up to the caller. * count of the new value is up to the caller.
* This function does not modify the expire time of the existing key. * This function does not modify the expire time of the existing key.

View File

@ -2245,22 +2245,21 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
robj keyobj; robj keyobj;
/* Add the new object in the hash table */ /* Add the new object in the hash table */
int retval = dictAdd(db->dict, key, val); int added = dbAddRDBLoad(db,key,val);
if (retval != DICT_OK) { if (!added) {
if (rdbflags & RDBFLAGS_ALLOW_DUP) { if (rdbflags & RDBFLAGS_ALLOW_DUP) {
/* This flag is useful for DEBUG RELOAD special modes. /* This flag is useful for DEBUG RELOAD special modes.
* When it's set we allow new keys to replace the current * When it's set we allow new keys to replace the current
* keys with the same name. */ * keys with the same name. */
initStaticStringObject(keyobj,key); initStaticStringObject(keyobj,key);
dbSyncDelete(db,&keyobj); dbSyncDelete(db,&keyobj);
dictAdd(db->dict, key, val); dbAddRDBLoad(db,key,val);
} else { } else {
serverLog(LL_WARNING, serverLog(LL_WARNING,
"RDB has duplicated key '%s' in DB %d",key,db->id); "RDB has duplicated key '%s' in DB %d",key,db->id);
serverPanic("Duplicated key found in RDB file"); serverPanic("Duplicated key found in RDB file");
} }
} }
if (server.cluster_enabled) slotToKeyAdd(key);
/* Set the expire time if needed */ /* Set the expire time if needed */
if (expiretime != -1) { if (expiretime != -1) {

View File

@ -2069,6 +2069,7 @@ int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
#define LOOKUP_NONE 0 #define LOOKUP_NONE 0
#define LOOKUP_NOTOUCH (1<<0) #define LOOKUP_NOTOUCH (1<<0)
void dbAdd(redisDb *db, robj *key, robj *val); void dbAdd(redisDb *db, robj *key, robj *val);
int dbAddRDBLoad(redisDb *db, sds key, robj *val);
void dbOverwrite(redisDb *db, robj *key, robj *val); void dbOverwrite(redisDb *db, robj *key, robj *val);
void genericSetKey(redisDb *db, robj *key, robj *val, int keepttl, int signal); void genericSetKey(redisDb *db, robj *key, robj *val, int keepttl, int signal);
void setKey(redisDb *db, robj *key, robj *val); void setKey(redisDb *db, robj *key, robj *val);