Merge branch 'faster-rdb-loading' into unstable

This commit is contained in:
antirez 2020-04-09 19:15:13 +02:00
commit 3072498665
9 changed files with 129 additions and 43 deletions

View File

@ -4966,7 +4966,7 @@ void restoreCommand(client *c) {
rioInitWithBuffer(&payload,c->argv[3]->ptr);
if (((type = rdbLoadObjectType(&payload)) == -1) ||
((obj = rdbLoadObject(type,&payload,c->argv[1])) == NULL))
((obj = rdbLoadObject(type,&payload,c->argv[1]->ptr)) == NULL))
{
addReplyError(c,"Bad data format");
return;

View File

@ -185,7 +185,25 @@ void dbAdd(redisDb *db, robj *key, robj *val) {
val->type == OBJ_ZSET ||
val->type == OBJ_STREAM)
signalKeyAsReady(db, key);
if (server.cluster_enabled) slotToKeyAdd(key->ptr);
}
/* This is a special version of dbAdd() that is used only when loading
* keys from the RDB file: the key is passed as an SDS string that is
* retained by the function (and not freed by the caller).
*
* Moreover this function will not abort if the key is already busy, to
* give more control to the caller, nor will signal the key as ready
* since it is not useful in this context.
*
* The function returns 1 if the key was added to the database, taking
* ownership of the SDS string, otherwise 0 is returned, and is up to the
* caller to free the SDS string. */
int dbAddRDBLoad(redisDb *db, sds key, robj *val) {
int retval = dictAdd(db->dict, key, val);
if (retval != DICT_OK) return 0;
if (server.cluster_enabled) slotToKeyAdd(key);
return 1;
}
/* Overwrite an existing key with a new value. Incrementing the reference
@ -288,7 +306,7 @@ int dbSyncDelete(redisDb *db, robj *key) {
* the key, because it is shared with the main dictionary. */
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
if (dictDelete(db->dict,key->ptr) == DICT_OK) {
if (server.cluster_enabled) slotToKeyDel(key);
if (server.cluster_enabled) slotToKeyDel(key->ptr);
return 1;
} else {
return 0;
@ -1647,17 +1665,17 @@ int *xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys)
* a fast way a key that belongs to a specified hash slot. This is useful
* while rehashing the cluster and in other conditions when we need to
* understand if we have keys for a given hash slot. */
void slotToKeyUpdateKey(robj *key, int add) {
unsigned int hashslot = keyHashSlot(key->ptr,sdslen(key->ptr));
void slotToKeyUpdateKey(sds key, int add) {
size_t keylen = sdslen(key);
unsigned int hashslot = keyHashSlot(key,keylen);
unsigned char buf[64];
unsigned char *indexed = buf;
size_t keylen = sdslen(key->ptr);
server.cluster->slots_keys_count[hashslot] += add ? 1 : -1;
if (keylen+2 > 64) indexed = zmalloc(keylen+2);
indexed[0] = (hashslot >> 8) & 0xff;
indexed[1] = hashslot & 0xff;
memcpy(indexed+2,key->ptr,keylen);
memcpy(indexed+2,key,keylen);
if (add) {
raxInsert(server.cluster->slots_to_keys,indexed,keylen+2,NULL,NULL);
} else {
@ -1666,11 +1684,11 @@ void slotToKeyUpdateKey(robj *key, int add) {
if (indexed != buf) zfree(indexed);
}
void slotToKeyAdd(robj *key) {
void slotToKeyAdd(sds key) {
slotToKeyUpdateKey(key,1);
}
void slotToKeyDel(robj *key) {
void slotToKeyDel(sds key) {
slotToKeyUpdateKey(key,0);
}

View File

@ -366,7 +366,7 @@ void debugCommand(client *c) {
"OOM -- Crash the server simulating an out-of-memory error.",
"PANIC -- Crash the server simulating a panic.",
"POPULATE <count> [prefix] [size] -- Create <count> string keys named key:<num>. If a prefix is specified is used instead of the 'key' prefix.",
"RELOAD -- Save the RDB on disk and reload it back in memory.",
"RELOAD [MERGE] [NOFLUSH] [NOSAVE] -- Save the RDB on disk and reload it back in memory. By default it will save the RDB file and load it back. With the NOFLUSH option the current database is not removed before loading the new one, but conficts in keys will kill the server with an exception. When MERGE is used, conflicting keys will be loaded (the key in the loaded RDB file will win). When NOSAVE is used, the server will not save the current dataset in the RDB file before loading. Use DEBUG RELOAD NOSAVE when you want just to load the RDB file you placed in the Redis working directory in order to replace the current dataset in memory. Use DEBUG RELOAD NOSAVE NOFLUSH MERGE when you want to add what is in the current RDB file placed in the Redis current directory, with the current memory content. Use DEBUG RELOAD when you want to verify Redis is able to persist the current dataset in the RDB file, flush the memory content, and load it back.",
"RESTART -- Graceful restart: save config, db, restart.",
"SDSLEN <key> -- Show low level SDS string info representing key and value.",
"SEGFAULT -- Crash the server with sigsegv.",
@ -411,15 +411,44 @@ NULL
serverLog(LL_WARNING, "DEBUG LOG: %s", (char*)c->argv[2]->ptr);
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"reload")) {
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
if (rdbSave(server.rdb_filename,rsiptr) != C_OK) {
addReply(c,shared.err);
return;
int flush = 1, save = 1;
int flags = RDBFLAGS_NONE;
/* Parse the additional options that modify the RELOAD
* behavior. */
for (int j = 2; j < c->argc; j++) {
char *opt = c->argv[j]->ptr;
if (!strcasecmp(opt,"MERGE")) {
flags |= RDBFLAGS_ALLOW_DUP;
} else if (!strcasecmp(opt,"NOFLUSH")) {
flush = 0;
} else if (!strcasecmp(opt,"NOSAVE")) {
save = 0;
} else {
addReplyError(c,"DEBUG RELOAD only supports the "
"MERGE, NOFLUSH and NOSAVE options.");
return;
}
}
emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
/* The default beahvior is to save the RDB file before loading
* it back. */
if (save) {
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
if (rdbSave(server.rdb_filename,rsiptr) != C_OK) {
addReply(c,shared.err);
return;
}
}
/* The default behavior is to remove the current dataset from
* memory before loading the RDB file, however when MERGE is
* used together with NOFLUSH, we are able to merge two datasets. */
if (flush) emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
protectClient(c);
int ret = rdbLoad(server.rdb_filename,NULL,RDBFLAGS_NONE);
int ret = rdbLoad(server.rdb_filename,NULL,flags);
unprotectClient(c);
if (ret != C_OK) {
addReplyError(c,"Error trying to load the RDB dump");

View File

@ -83,7 +83,7 @@ int dbAsyncDelete(redisDb *db, robj *key) {
* field to NULL in order to lazy free it later. */
if (de) {
dictFreeUnlinkedEntry(db->dict,de);
if (server.cluster_enabled) slotToKeyDel(key);
if (server.cluster_enabled) slotToKeyDel(key->ptr);
return 1;
} else {
return 0;

View File

@ -347,7 +347,15 @@ void freeStreamObject(robj *o) {
}
void incrRefCount(robj *o) {
if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount++;
if (o->refcount < OBJ_FIRST_SPECIAL_REFCOUNT) {
o->refcount++;
} else {
if (o->refcount == OBJ_SHARED_REFCOUNT) {
/* Nothing to do: this refcount is immutable. */
} else if (o->refcount == OBJ_STATIC_REFCOUNT) {
serverPanic("You tried to retain an object allocated in the stack");
}
}
}
void decrRefCount(robj *o) {

View File

@ -1422,7 +1422,7 @@ robj *rdbLoadCheckModuleValue(rio *rdb, char *modulename) {
/* Load a Redis object of the specified type from the specified file.
* On success a newly allocated object is returned, otherwise NULL. */
robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) {
robj *o = NULL, *ele, *dec;
uint64_t len;
unsigned int i;
@ -1886,7 +1886,9 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
exit(1);
}
RedisModuleIO io;
moduleInitIOContext(io,mt,rdb,key);
robj keyobj;
initStaticStringObject(keyobj,key);
moduleInitIOContext(io,mt,rdb,&keyobj);
io.ver = (rdbtype == RDB_TYPE_MODULE) ? 1 : 2;
/* Call the rdb_load method of the module providing the 10 bit
* encoding version in the lower 10 bits of the module ID. */
@ -2044,7 +2046,8 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
long long lru_clock = LRU_CLOCK();
while(1) {
robj *key, *val;
sds key;
robj *val;
/* Read type. */
if ((type = rdbLoadType(rdb)) == -1) goto eoferr;
@ -2216,10 +2219,11 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
}
/* Read key */
if ((key = rdbLoadStringObject(rdb)) == NULL) goto eoferr;
if ((key = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL)
goto eoferr;
/* Read value */
if ((val = rdbLoadObject(type,rdb,key)) == NULL) {
decrRefCount(key);
sdsfree(key);
goto eoferr;
}
@ -2227,26 +2231,49 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
* an RDB file from disk, either at startup, or when an RDB was
* received from the master. In the latter case, the master is
* responsible for key expiry. If we would expire keys here, the
* snapshot taken by the master may not be reflected on the slave. */
if (iAmMaster() && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now) {
decrRefCount(key);
* snapshot taken by the master may not be reflected on the slave.
* Similarly if the RDB is the preamble of an AOF file, we want to
* load all the keys as they are, since the log of operations later
* assume to work in an exact keyspace state. */
if (iAmMaster() &&
!(rdbflags&RDBFLAGS_AOF_PREAMBLE) &&
expiretime != -1 && expiretime < now)
{
sdsfree(key);
decrRefCount(val);
} else {
robj keyobj;
/* Add the new object in the hash table */
dbAdd(db,key,val);
int added = dbAddRDBLoad(db,key,val);
if (!added) {
if (rdbflags & RDBFLAGS_ALLOW_DUP) {
/* This flag is useful for DEBUG RELOAD special modes.
* When it's set we allow new keys to replace the current
* keys with the same name. */
initStaticStringObject(keyobj,key);
dbSyncDelete(db,&keyobj);
dbAddRDBLoad(db,key,val);
} else {
serverLog(LL_WARNING,
"RDB has duplicated key '%s' in DB %d",key,db->id);
serverPanic("Duplicated key found in RDB file");
}
}
/* Set the expire time if needed */
if (expiretime != -1) setExpire(NULL,db,key,expiretime);
if (expiretime != -1) {
initStaticStringObject(keyobj,key);
setExpire(NULL,db,&keyobj,expiretime);
}
/* Set usage information (for eviction). */
objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock,1000);
/* Decrement the key refcount since dbAdd() will take its
* own reference. */
decrRefCount(key);
}
if (server.key_load_delay)
usleep(server.key_load_delay);
/* Loading the database more slowly is useful in order to test
* certain edge cases. */
if (server.key_load_delay) usleep(server.key_load_delay);
/* Reset the state that is key-specified and is populated by
* opcodes before the key, so that we start from scratch again. */

View File

@ -122,9 +122,10 @@
#define RDB_LOAD_SDS (1<<2)
/* flags on the purpose of rdb save or load */
#define RDBFLAGS_NONE 0
#define RDBFLAGS_AOF_PREAMBLE (1<<0)
#define RDBFLAGS_REPLICATION (1<<1)
#define RDBFLAGS_NONE 0 /* No special RDB loading. */
#define RDBFLAGS_AOF_PREAMBLE (1<<0) /* Load/save the RDB as AOF preamble. */
#define RDBFLAGS_REPLICATION (1<<1) /* Load/save for SYNC. */
#define RDBFLAGS_ALLOW_DUP (1<<2) /* Allow duplicated keys when loading.*/
int rdbSaveType(rio *rdb, unsigned char type);
int rdbLoadType(rio *rdb);
@ -144,7 +145,7 @@ void rdbRemoveTempFile(pid_t childpid);
int rdbSave(char *filename, rdbSaveInfo *rsi);
ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key);
size_t rdbSavedObjectLen(robj *o, robj *key);
robj *rdbLoadObject(int type, rio *rdb, robj *key);
robj *rdbLoadObject(int type, rio *rdb, sds key);
void backgroundSaveDoneHandler(int exitcode, int bysignal);
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime);
ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt);

View File

@ -287,7 +287,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) {
rdbstate.keys++;
/* Read value */
rdbstate.doing = RDB_CHECK_DOING_READ_OBJECT_VALUE;
if ((val = rdbLoadObject(type,&rdb,key)) == NULL) goto eoferr;
if ((val = rdbLoadObject(type,&rdb,key->ptr)) == NULL) goto eoferr;
/* Check if the key already expired. */
if (expiretime != -1 && expiretime < now)
rdbstate.already_expired++;

View File

@ -597,7 +597,9 @@ typedef struct RedisModuleDigest {
#define LRU_CLOCK_MAX ((1<<LRU_BITS)-1) /* Max value of obj->lru */
#define LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution in ms */
#define OBJ_SHARED_REFCOUNT INT_MAX
#define OBJ_SHARED_REFCOUNT INT_MAX /* Global object never destroyed. */
#define OBJ_STATIC_REFCOUNT (INT_MAX-1) /* Object allocated in the stack. */
#define OBJ_FIRST_SPECIAL_REFCOUNT OBJ_STATIC_REFCOUNT
typedef struct redisObject {
unsigned type:4;
unsigned encoding:4;
@ -618,7 +620,7 @@ char *getObjectTypeName(robj*);
* we'll update it when the structure is changed, to avoid bugs like
* bug #85 introduced exactly in this way. */
#define initStaticStringObject(_var,_ptr) do { \
_var.refcount = 1; \
_var.refcount = OBJ_STATIC_REFCOUNT; \
_var.type = OBJ_STRING; \
_var.encoding = OBJ_ENCODING_RAW; \
_var.ptr = _ptr; \
@ -2067,6 +2069,7 @@ int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
#define LOOKUP_NONE 0
#define LOOKUP_NOTOUCH (1<<0)
void dbAdd(redisDb *db, robj *key, robj *val);
int dbAddRDBLoad(redisDb *db, sds key, robj *val);
void dbOverwrite(redisDb *db, robj *key, robj *val);
void genericSetKey(redisDb *db, robj *key, robj *val, int keepttl, int signal);
void setKey(redisDb *db, robj *key, robj *val);
@ -2093,8 +2096,8 @@ unsigned int delKeysInSlot(unsigned int hashslot);
int verifyClusterConfigWithData(void);
void scanGenericCommand(client *c, robj *o, unsigned long cursor);
int parseScanCursorOrReply(client *c, robj *o, unsigned long *cursor);
void slotToKeyAdd(robj *key);
void slotToKeyDel(robj *key);
void slotToKeyAdd(sds key);
void slotToKeyDel(sds key);
void slotToKeyFlush(void);
int dbAsyncDelete(redisDb *db, robj *key);
void emptyDbAsync(redisDb *db);