mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 16:18:28 -05:00
Merge branch 'unstable' of github.com:antirez/redis into unstable
This commit is contained in:
commit
c6dc8d5288
14
redis.conf
14
redis.conf
@ -755,6 +755,20 @@ auto-aof-rewrite-min-size 64mb
|
||||
# will be found.
|
||||
aof-load-truncated yes
|
||||
|
||||
# When rewriting the AOF file, Redis is able to use an RDB preamble in the
|
||||
# AOF file for faster rewrites and recoveries. When this option is turned
|
||||
# on the rewritten AOF file is composed of two different stanzas:
|
||||
#
|
||||
# [RDB file][AOF tail]
|
||||
#
|
||||
# When loading Redis recognizes that the AOF file starts with the "REDIS"
|
||||
# string and loads the prefixed RDB file, and continues loading the AOF
|
||||
# tail.
|
||||
#
|
||||
# This is currently turned off by default in order to avoid the surprise
|
||||
# of a format change, but will at some point be used as the default.
|
||||
aof-use-rdb-preamble no
|
||||
|
||||
################################ LUA SCRIPTING ###############################
|
||||
|
||||
# Max execution time of a Lua script in milliseconds.
|
||||
|
@ -486,7 +486,7 @@ static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backl
|
||||
goto end;
|
||||
}
|
||||
if (p == NULL) {
|
||||
anetSetError(err, "unable to bind socket");
|
||||
anetSetError(err, "unable to bind socket, errno: %d", errno);
|
||||
goto error;
|
||||
}
|
||||
|
||||
|
151
src/aof.c
151
src/aof.c
@ -616,19 +616,23 @@ int loadAppendOnlyFile(char *filename) {
|
||||
struct redis_stat sb;
|
||||
int old_aof_state = server.aof_state;
|
||||
long loops = 0;
|
||||
off_t valid_up_to = 0; /* Offset of the latest well-formed command loaded. */
|
||||
|
||||
if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
|
||||
server.aof_current_size = 0;
|
||||
fclose(fp);
|
||||
return C_ERR;
|
||||
}
|
||||
off_t valid_up_to = 0; /* Offset of latest well-formed command loaded. */
|
||||
|
||||
if (fp == NULL) {
|
||||
serverLog(LL_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
/* Handle a zero-length AOF file as a special case. An emtpy AOF file
|
||||
* is a valid AOF because an empty server with AOF enabled will create
|
||||
* a zero length file at startup, that will remain like that if no write
|
||||
* operation is received. */
|
||||
if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
|
||||
server.aof_current_size = 0;
|
||||
fclose(fp);
|
||||
return C_ERR;
|
||||
}
|
||||
|
||||
/* Temporarily disable AOF, to prevent EXEC from feeding a MULTI
|
||||
* to the same file we're about to read. */
|
||||
server.aof_state = AOF_OFF;
|
||||
@ -636,6 +640,28 @@ int loadAppendOnlyFile(char *filename) {
|
||||
fakeClient = createFakeClient();
|
||||
startLoading(fp);
|
||||
|
||||
/* Check if this AOF file has an RDB preamble. In that case we need to
|
||||
* load the RDB file and later continue loading the AOF tail. */
|
||||
char sig[5]; /* "REDIS" */
|
||||
if (fread(sig,1,5,fp) != 5 || memcmp(sig,"REDIS",5) != 0) {
|
||||
/* No RDB preamble, seek back at 0 offset. */
|
||||
if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
|
||||
} else {
|
||||
/* RDB preamble. Pass loading the RDB functions. */
|
||||
rio rdb;
|
||||
|
||||
serverLog(LL_NOTICE,"Reading RDB preamble from AOF file...");
|
||||
if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
|
||||
rioInitWithFile(&rdb,fp);
|
||||
if (rdbLoadRio(&rdb) != C_OK) {
|
||||
serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted");
|
||||
goto readerr;
|
||||
} else {
|
||||
serverLog(LL_NOTICE,"Reading the remaining AOF tail...");
|
||||
}
|
||||
}
|
||||
|
||||
/* Read the actual AOF file, in REPL format, command by command. */
|
||||
while(1) {
|
||||
int argc, j;
|
||||
unsigned long len;
|
||||
@ -989,7 +1015,7 @@ int rewriteHashObject(rio *r, robj *key, robj *o) {
|
||||
}
|
||||
|
||||
/* Call the module type callback in order to rewrite a data type
|
||||
* taht is exported by a module and is not handled by Redis itself.
|
||||
* that is exported by a module and is not handled by Redis itself.
|
||||
* The function returns 0 on error, 1 on success. */
|
||||
int rewriteModuleObject(rio *r, robj *key, robj *o) {
|
||||
RedisModuleIO io;
|
||||
@ -1015,51 +1041,23 @@ ssize_t aofReadDiffFromParent(void) {
|
||||
return total;
|
||||
}
|
||||
|
||||
/* Write a sequence of commands able to fully rebuild the dataset into
|
||||
* "filename". Used both by REWRITEAOF and BGREWRITEAOF.
|
||||
*
|
||||
* In order to minimize the number of commands needed in the rewritten
|
||||
* log Redis uses variadic commands when possible, such as RPUSH, SADD
|
||||
* and ZADD. However at max AOF_REWRITE_ITEMS_PER_CMD items per time
|
||||
* are inserted using a single command. */
|
||||
int rewriteAppendOnlyFile(char *filename) {
|
||||
int rewriteAppendOnlyFileRio(rio *aof) {
|
||||
dictIterator *di = NULL;
|
||||
dictEntry *de;
|
||||
rio aof;
|
||||
FILE *fp;
|
||||
char tmpfile[256];
|
||||
int j;
|
||||
long long now = mstime();
|
||||
char byte;
|
||||
size_t processed = 0;
|
||||
long long now = mstime();
|
||||
int j;
|
||||
|
||||
/* Note that we have to use a different temp name here compared to the
|
||||
* one used by rewriteAppendOnlyFileBackground() function. */
|
||||
snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
|
||||
fp = fopen(tmpfile,"w");
|
||||
if (!fp) {
|
||||
serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
|
||||
return C_ERR;
|
||||
}
|
||||
|
||||
server.aof_child_diff = sdsempty();
|
||||
rioInitWithFile(&aof,fp);
|
||||
if (server.aof_rewrite_incremental_fsync)
|
||||
rioSetAutoSync(&aof,AOF_AUTOSYNC_BYTES);
|
||||
for (j = 0; j < server.dbnum; j++) {
|
||||
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
|
||||
redisDb *db = server.db+j;
|
||||
dict *d = db->dict;
|
||||
if (dictSize(d) == 0) continue;
|
||||
di = dictGetSafeIterator(d);
|
||||
if (!di) {
|
||||
fclose(fp);
|
||||
return C_ERR;
|
||||
}
|
||||
|
||||
/* SELECT the new DB */
|
||||
if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
|
||||
if (rioWriteBulkLongLong(&aof,j) == 0) goto werr;
|
||||
if (rioWrite(aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
|
||||
if (rioWriteBulkLongLong(aof,j) == 0) goto werr;
|
||||
|
||||
/* Iterate this DB writing every entry */
|
||||
while((de = dictNext(di)) != NULL) {
|
||||
@ -1080,39 +1078,83 @@ int rewriteAppendOnlyFile(char *filename) {
|
||||
if (o->type == OBJ_STRING) {
|
||||
/* Emit a SET command */
|
||||
char cmd[]="*3\r\n$3\r\nSET\r\n";
|
||||
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
|
||||
if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
|
||||
/* Key and value */
|
||||
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
|
||||
if (rioWriteBulkObject(&aof,o) == 0) goto werr;
|
||||
if (rioWriteBulkObject(aof,&key) == 0) goto werr;
|
||||
if (rioWriteBulkObject(aof,o) == 0) goto werr;
|
||||
} else if (o->type == OBJ_LIST) {
|
||||
if (rewriteListObject(&aof,&key,o) == 0) goto werr;
|
||||
if (rewriteListObject(aof,&key,o) == 0) goto werr;
|
||||
} else if (o->type == OBJ_SET) {
|
||||
if (rewriteSetObject(&aof,&key,o) == 0) goto werr;
|
||||
if (rewriteSetObject(aof,&key,o) == 0) goto werr;
|
||||
} else if (o->type == OBJ_ZSET) {
|
||||
if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr;
|
||||
if (rewriteSortedSetObject(aof,&key,o) == 0) goto werr;
|
||||
} else if (o->type == OBJ_HASH) {
|
||||
if (rewriteHashObject(&aof,&key,o) == 0) goto werr;
|
||||
if (rewriteHashObject(aof,&key,o) == 0) goto werr;
|
||||
} else if (o->type == OBJ_MODULE) {
|
||||
if (rewriteModuleObject(&aof,&key,o) == 0) goto werr;
|
||||
if (rewriteModuleObject(aof,&key,o) == 0) goto werr;
|
||||
} else {
|
||||
serverPanic("Unknown object type");
|
||||
}
|
||||
/* Save the expire time */
|
||||
if (expiretime != -1) {
|
||||
char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
|
||||
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
|
||||
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
|
||||
if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr;
|
||||
if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
|
||||
if (rioWriteBulkObject(aof,&key) == 0) goto werr;
|
||||
if (rioWriteBulkLongLong(aof,expiretime) == 0) goto werr;
|
||||
}
|
||||
/* Read some diff from the parent process from time to time. */
|
||||
if (aof.processed_bytes > processed+1024*10) {
|
||||
processed = aof.processed_bytes;
|
||||
if (aof->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) {
|
||||
processed = aof->processed_bytes;
|
||||
aofReadDiffFromParent();
|
||||
}
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
di = NULL;
|
||||
}
|
||||
return C_OK;
|
||||
|
||||
werr:
|
||||
if (di) dictReleaseIterator(di);
|
||||
return C_ERR;
|
||||
}
|
||||
|
||||
/* Write a sequence of commands able to fully rebuild the dataset into
|
||||
* "filename". Used both by REWRITEAOF and BGREWRITEAOF.
|
||||
*
|
||||
* In order to minimize the number of commands needed in the rewritten
|
||||
* log Redis uses variadic commands when possible, such as RPUSH, SADD
|
||||
* and ZADD. However at max AOF_REWRITE_ITEMS_PER_CMD items per time
|
||||
* are inserted using a single command. */
|
||||
int rewriteAppendOnlyFile(char *filename) {
|
||||
rio aof;
|
||||
FILE *fp;
|
||||
char tmpfile[256];
|
||||
char byte;
|
||||
|
||||
/* Note that we have to use a different temp name here compared to the
|
||||
* one used by rewriteAppendOnlyFileBackground() function. */
|
||||
snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
|
||||
fp = fopen(tmpfile,"w");
|
||||
if (!fp) {
|
||||
serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
|
||||
return C_ERR;
|
||||
}
|
||||
|
||||
server.aof_child_diff = sdsempty();
|
||||
rioInitWithFile(&aof,fp);
|
||||
|
||||
if (server.aof_rewrite_incremental_fsync)
|
||||
rioSetAutoSync(&aof,AOF_AUTOSYNC_BYTES);
|
||||
|
||||
if (server.aof_use_rdb_preamble) {
|
||||
int error;
|
||||
if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE) == C_ERR) {
|
||||
errno = error;
|
||||
goto werr;
|
||||
}
|
||||
} else {
|
||||
if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
|
||||
}
|
||||
|
||||
/* Do an initial slow fsync here while the parent is still sending
|
||||
* data, in order to make the next final fsync faster. */
|
||||
@ -1178,7 +1220,6 @@ werr:
|
||||
serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
|
||||
fclose(fp);
|
||||
unlink(tmpfile);
|
||||
if (di) dictReleaseIterator(di);
|
||||
return C_ERR;
|
||||
}
|
||||
|
||||
|
@ -475,6 +475,10 @@ void loadServerConfigFromString(char *config) {
|
||||
if ((server.aof_load_truncated = yesnotoi(argv[1])) == -1) {
|
||||
err = "argument must be 'yes' or 'no'"; goto loaderr;
|
||||
}
|
||||
} else if (!strcasecmp(argv[0],"aof-use-rdb-preamble") && argc == 2) {
|
||||
if ((server.aof_use_rdb_preamble = yesnotoi(argv[1])) == -1) {
|
||||
err = "argument must be 'yes' or 'no'"; goto loaderr;
|
||||
}
|
||||
} else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
|
||||
if (strlen(argv[1]) > CONFIG_AUTHPASS_MAX_LEN) {
|
||||
err = "Password is longer than CONFIG_AUTHPASS_MAX_LEN";
|
||||
@ -953,6 +957,8 @@ void configSetCommand(client *c) {
|
||||
"aof-rewrite-incremental-fsync",server.aof_rewrite_incremental_fsync) {
|
||||
} config_set_bool_field(
|
||||
"aof-load-truncated",server.aof_load_truncated) {
|
||||
} config_set_bool_field(
|
||||
"aof-use-rdb-preamble",server.aof_use_rdb_preamble) {
|
||||
} config_set_bool_field(
|
||||
"slave-serve-stale-data",server.repl_serve_stale_data) {
|
||||
} config_set_bool_field(
|
||||
@ -1227,6 +1233,8 @@ void configGetCommand(client *c) {
|
||||
server.aof_rewrite_incremental_fsync);
|
||||
config_get_bool_field("aof-load-truncated",
|
||||
server.aof_load_truncated);
|
||||
config_get_bool_field("aof-use-rdb-preamble",
|
||||
server.aof_use_rdb_preamble);
|
||||
config_get_bool_field("lazyfree-lazy-eviction",
|
||||
server.lazyfree_lazy_eviction);
|
||||
config_get_bool_field("lazyfree-lazy-expire",
|
||||
@ -1947,6 +1955,7 @@ int rewriteConfig(char *path) {
|
||||
rewriteConfigNumericalOption(state,"hz",server.hz,CONFIG_DEFAULT_HZ);
|
||||
rewriteConfigYesNoOption(state,"aof-rewrite-incremental-fsync",server.aof_rewrite_incremental_fsync,CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC);
|
||||
rewriteConfigYesNoOption(state,"aof-load-truncated",server.aof_load_truncated,CONFIG_DEFAULT_AOF_LOAD_TRUNCATED);
|
||||
rewriteConfigYesNoOption(state,"aof-use-rdb-preamble",server.aof_use_rdb_preamble,CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE);
|
||||
rewriteConfigEnumOption(state,"supervised",server.supervised_mode,supervised_mode_enum,SUPERVISED_NONE);
|
||||
rewriteConfigYesNoOption(state,"lazyfree-lazy-eviction",server.lazyfree_lazy_eviction,CONFIG_DEFAULT_LAZYFREE_LAZY_EVICTION);
|
||||
rewriteConfigYesNoOption(state,"lazyfree-lazy-expire",server.lazyfree_lazy_expire,CONFIG_DEFAULT_LAZYFREE_LAZY_EXPIRE);
|
||||
|
90
src/rdb.c
90
src/rdb.c
@ -818,14 +818,16 @@ int rdbSaveAuxFieldStrInt(rio *rdb, char *key, long long val) {
|
||||
}
|
||||
|
||||
/* Save a few default AUX fields with information about the RDB generated. */
|
||||
int rdbSaveInfoAuxFields(rio *rdb) {
|
||||
int rdbSaveInfoAuxFields(rio *rdb, int flags) {
|
||||
int redis_bits = (sizeof(void*) == 8) ? 64 : 32;
|
||||
int aof_preamble = (flags & RDB_SAVE_AOF_PREAMBLE) != 0;
|
||||
|
||||
/* Add a few fields about the state when the RDB was created. */
|
||||
if (rdbSaveAuxFieldStrStr(rdb,"redis-ver",REDIS_VERSION) == -1) return -1;
|
||||
if (rdbSaveAuxFieldStrInt(rdb,"redis-bits",redis_bits) == -1) return -1;
|
||||
if (rdbSaveAuxFieldStrInt(rdb,"ctime",time(NULL)) == -1) return -1;
|
||||
if (rdbSaveAuxFieldStrInt(rdb,"used-mem",zmalloc_used_memory()) == -1) return -1;
|
||||
if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble) == -1) return -1;
|
||||
return 1;
|
||||
}
|
||||
|
||||
@ -837,19 +839,20 @@ int rdbSaveInfoAuxFields(rio *rdb) {
|
||||
* When the function returns C_ERR and if 'error' is not NULL, the
|
||||
* integer pointed by 'error' is set to the value of errno just after the I/O
|
||||
* error. */
|
||||
int rdbSaveRio(rio *rdb, int *error) {
|
||||
int rdbSaveRio(rio *rdb, int *error, int flags) {
|
||||
dictIterator *di = NULL;
|
||||
dictEntry *de;
|
||||
char magic[10];
|
||||
int j;
|
||||
long long now = mstime();
|
||||
uint64_t cksum;
|
||||
size_t processed = 0;
|
||||
|
||||
if (server.rdb_checksum)
|
||||
rdb->update_cksum = rioGenericUpdateChecksum;
|
||||
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
|
||||
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
|
||||
if (rdbSaveInfoAuxFields(rdb) == -1) goto werr;
|
||||
if (rdbSaveInfoAuxFields(rdb,flags) == -1) goto werr;
|
||||
|
||||
for (j = 0; j < server.dbnum; j++) {
|
||||
redisDb *db = server.db+j;
|
||||
@ -886,6 +889,16 @@ int rdbSaveRio(rio *rdb, int *error) {
|
||||
initStaticStringObject(key,keystr);
|
||||
expire = getExpire(db,&key);
|
||||
if (rdbSaveKeyValuePair(rdb,&key,o,expire,now) == -1) goto werr;
|
||||
|
||||
/* When this RDB is produced as part of an AOF rewrite, move
|
||||
* accumulated diff from parent to child while rewriting in
|
||||
* order to have a smaller final write. */
|
||||
if (flags & RDB_SAVE_AOF_PREAMBLE &&
|
||||
rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
|
||||
{
|
||||
processed = rdb->processed_bytes;
|
||||
aofReadDiffFromParent();
|
||||
}
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
}
|
||||
@ -923,7 +936,7 @@ int rdbSaveRioWithEOFMark(rio *rdb, int *error) {
|
||||
if (rioWrite(rdb,"$EOF:",5) == 0) goto werr;
|
||||
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
|
||||
if (rioWrite(rdb,"\r\n",2) == 0) goto werr;
|
||||
if (rdbSaveRio(rdb,error) == C_ERR) goto werr;
|
||||
if (rdbSaveRio(rdb,error,RDB_SAVE_NONE) == C_ERR) goto werr;
|
||||
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
|
||||
return C_OK;
|
||||
|
||||
@ -955,7 +968,7 @@ int rdbSave(char *filename) {
|
||||
}
|
||||
|
||||
rioInitWithFile(&rdb,fp);
|
||||
if (rdbSaveRio(&rdb,&error) == C_ERR) {
|
||||
if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE) == C_ERR) {
|
||||
errno = error;
|
||||
goto werr;
|
||||
}
|
||||
@ -1373,67 +1386,61 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
|
||||
}
|
||||
}
|
||||
|
||||
int rdbLoad(char *filename) {
|
||||
/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
|
||||
* otherwise C_ERR is returned and 'errno' is set accordingly. */
|
||||
int rdbLoadRio(rio *rdb) {
|
||||
uint64_t dbid;
|
||||
int type, rdbver;
|
||||
redisDb *db = server.db+0;
|
||||
char buf[1024];
|
||||
long long expiretime, now = mstime();
|
||||
FILE *fp;
|
||||
rio rdb;
|
||||
|
||||
if ((fp = fopen(filename,"r")) == NULL) return C_ERR;
|
||||
|
||||
rioInitWithFile(&rdb,fp);
|
||||
rdb.update_cksum = rdbLoadProgressCallback;
|
||||
rdb.max_processing_chunk = server.loading_process_events_interval_bytes;
|
||||
if (rioRead(&rdb,buf,9) == 0) goto eoferr;
|
||||
rdb->update_cksum = rdbLoadProgressCallback;
|
||||
rdb->max_processing_chunk = server.loading_process_events_interval_bytes;
|
||||
if (rioRead(rdb,buf,9) == 0) goto eoferr;
|
||||
buf[9] = '\0';
|
||||
if (memcmp(buf,"REDIS",5) != 0) {
|
||||
fclose(fp);
|
||||
serverLog(LL_WARNING,"Wrong signature trying to load DB from file");
|
||||
errno = EINVAL;
|
||||
return C_ERR;
|
||||
}
|
||||
rdbver = atoi(buf+5);
|
||||
if (rdbver < 1 || rdbver > RDB_VERSION) {
|
||||
fclose(fp);
|
||||
serverLog(LL_WARNING,"Can't handle RDB format version %d",rdbver);
|
||||
errno = EINVAL;
|
||||
return C_ERR;
|
||||
}
|
||||
|
||||
startLoading(fp);
|
||||
while(1) {
|
||||
robj *key, *val;
|
||||
expiretime = -1;
|
||||
|
||||
/* Read type. */
|
||||
if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
|
||||
if ((type = rdbLoadType(rdb)) == -1) goto eoferr;
|
||||
|
||||
/* Handle special types. */
|
||||
if (type == RDB_OPCODE_EXPIRETIME) {
|
||||
/* EXPIRETIME: load an expire associated with the next key
|
||||
* to load. Note that after loading an expire we need to
|
||||
* load the actual type, and continue. */
|
||||
if ((expiretime = rdbLoadTime(&rdb)) == -1) goto eoferr;
|
||||
if ((expiretime = rdbLoadTime(rdb)) == -1) goto eoferr;
|
||||
/* We read the time so we need to read the object type again. */
|
||||
if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
|
||||
if ((type = rdbLoadType(rdb)) == -1) goto eoferr;
|
||||
/* the EXPIRETIME opcode specifies time in seconds, so convert
|
||||
* into milliseconds. */
|
||||
expiretime *= 1000;
|
||||
} else if (type == RDB_OPCODE_EXPIRETIME_MS) {
|
||||
/* EXPIRETIME_MS: milliseconds precision expire times introduced
|
||||
* with RDB v3. Like EXPIRETIME but no with more precision. */
|
||||
if ((expiretime = rdbLoadMillisecondTime(&rdb)) == -1) goto eoferr;
|
||||
if ((expiretime = rdbLoadMillisecondTime(rdb)) == -1) goto eoferr;
|
||||
/* We read the time so we need to read the object type again. */
|
||||
if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
|
||||
if ((type = rdbLoadType(rdb)) == -1) goto eoferr;
|
||||
} else if (type == RDB_OPCODE_EOF) {
|
||||
/* EOF: End of file, exit the main loop. */
|
||||
break;
|
||||
} else if (type == RDB_OPCODE_SELECTDB) {
|
||||
/* SELECTDB: Select the specified database. */
|
||||
if ((dbid = rdbLoadLen(&rdb,NULL)) == RDB_LENERR)
|
||||
if ((dbid = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
|
||||
goto eoferr;
|
||||
if (dbid >= (unsigned)server.dbnum) {
|
||||
serverLog(LL_WARNING,
|
||||
@ -1448,9 +1455,9 @@ int rdbLoad(char *filename) {
|
||||
/* RESIZEDB: Hint about the size of the keys in the currently
|
||||
* selected data base, in order to avoid useless rehashing. */
|
||||
uint64_t db_size, expires_size;
|
||||
if ((db_size = rdbLoadLen(&rdb,NULL)) == RDB_LENERR)
|
||||
if ((db_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
|
||||
goto eoferr;
|
||||
if ((expires_size = rdbLoadLen(&rdb,NULL)) == RDB_LENERR)
|
||||
if ((expires_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
|
||||
goto eoferr;
|
||||
dictExpand(db->dict,db_size);
|
||||
dictExpand(db->expires,expires_size);
|
||||
@ -1462,8 +1469,8 @@ int rdbLoad(char *filename) {
|
||||
*
|
||||
* An AUX field is composed of two strings: key and value. */
|
||||
robj *auxkey, *auxval;
|
||||
if ((auxkey = rdbLoadStringObject(&rdb)) == NULL) goto eoferr;
|
||||
if ((auxval = rdbLoadStringObject(&rdb)) == NULL) goto eoferr;
|
||||
if ((auxkey = rdbLoadStringObject(rdb)) == NULL) goto eoferr;
|
||||
if ((auxval = rdbLoadStringObject(rdb)) == NULL) goto eoferr;
|
||||
|
||||
if (((char*)auxkey->ptr)[0] == '%') {
|
||||
/* All the fields with a name staring with '%' are considered
|
||||
@ -1485,9 +1492,9 @@ int rdbLoad(char *filename) {
|
||||
}
|
||||
|
||||
/* Read key */
|
||||
if ((key = rdbLoadStringObject(&rdb)) == NULL) goto eoferr;
|
||||
if ((key = rdbLoadStringObject(rdb)) == NULL) goto eoferr;
|
||||
/* Read value */
|
||||
if ((val = rdbLoadObject(type,&rdb)) == NULL) goto eoferr;
|
||||
if ((val = rdbLoadObject(type,rdb)) == NULL) goto eoferr;
|
||||
/* Check if the key already expired. This function is used when loading
|
||||
* an RDB file from disk, either at startup, or when an RDB was
|
||||
* received from the master. In the latter case, the master is
|
||||
@ -1508,9 +1515,9 @@ int rdbLoad(char *filename) {
|
||||
}
|
||||
/* Verify the checksum if RDB version is >= 5 */
|
||||
if (rdbver >= 5 && server.rdb_checksum) {
|
||||
uint64_t cksum, expected = rdb.cksum;
|
||||
uint64_t cksum, expected = rdb->cksum;
|
||||
|
||||
if (rioRead(&rdb,&cksum,8) == 0) goto eoferr;
|
||||
if (rioRead(rdb,&cksum,8) == 0) goto eoferr;
|
||||
memrev64ifbe(&cksum);
|
||||
if (cksum == 0) {
|
||||
serverLog(LL_WARNING,"RDB file was saved with checksum disabled: no check performed.");
|
||||
@ -1519,9 +1526,6 @@ int rdbLoad(char *filename) {
|
||||
rdbExitReportCorruptRDB("RDB CRC error");
|
||||
}
|
||||
}
|
||||
|
||||
fclose(fp);
|
||||
stopLoading();
|
||||
return C_OK;
|
||||
|
||||
eoferr: /* unexpected end of file is handled here with a fatal exit */
|
||||
@ -1530,6 +1534,24 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */
|
||||
return C_ERR; /* Just to avoid warning */
|
||||
}
|
||||
|
||||
/* Like rdbLoadRio() but takes a filename instead of a rio stream. The
|
||||
* filename is open for reading and a rio stream object created in order
|
||||
* to do the actual loading. Moreover the ETA displayed in the INFO
|
||||
* output is initialized and finalized. */
|
||||
int rdbLoad(char *filename) {
|
||||
FILE *fp;
|
||||
rio rdb;
|
||||
int retval;
|
||||
|
||||
if ((fp = fopen(filename,"r")) == NULL) return C_ERR;
|
||||
startLoading(fp);
|
||||
rioInitWithFile(&rdb,fp);
|
||||
retval = rdbLoadRio(&rdb);
|
||||
fclose(fp);
|
||||
stopLoading();
|
||||
return retval;
|
||||
}
|
||||
|
||||
/* A background saving child (BGSAVE) terminated its work. Handle this.
|
||||
* This function covers the case of actual BGSAVEs. */
|
||||
void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) {
|
||||
|
@ -106,6 +106,9 @@
|
||||
#define RDB_LOAD_PLAIN (1<<1)
|
||||
#define RDB_LOAD_SDS (1<<2)
|
||||
|
||||
#define RDB_SAVE_NONE 0
|
||||
#define RDB_SAVE_AOF_PREAMBLE (1<<0)
|
||||
|
||||
int rdbSaveType(rio *rdb, unsigned char type);
|
||||
int rdbLoadType(rio *rdb);
|
||||
int rdbSaveTime(rio *rdb, time_t t);
|
||||
@ -131,5 +134,6 @@ ssize_t rdbSaveRawString(rio *rdb, unsigned char *s, size_t len);
|
||||
void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr);
|
||||
int rdbSaveBinaryDoubleValue(rio *rdb, double val);
|
||||
int rdbLoadBinaryDoubleValue(rio *rdb, double *val);
|
||||
int rdbLoadRio(rio *rdb);
|
||||
|
||||
#endif
|
||||
|
12
src/server.c
12
src/server.c
@ -1345,6 +1345,7 @@ void initServerConfig(void) {
|
||||
server.aof_flush_postponed_start = 0;
|
||||
server.aof_rewrite_incremental_fsync = CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC;
|
||||
server.aof_load_truncated = CONFIG_DEFAULT_AOF_LOAD_TRUNCATED;
|
||||
server.aof_use_rdb_preamble = CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE;
|
||||
server.pidfile = NULL;
|
||||
server.rdb_filename = zstrdup(CONFIG_DEFAULT_RDB_FILENAME);
|
||||
server.aof_filename = zstrdup(CONFIG_DEFAULT_AOF_FILENAME);
|
||||
@ -1636,6 +1637,7 @@ int listenToPort(int port, int *fds, int *count) {
|
||||
if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
|
||||
for (j = 0; j < server.bindaddr_count || j == 0; j++) {
|
||||
if (server.bindaddr[j] == NULL) {
|
||||
int unsupported = 0;
|
||||
/* Bind * for both IPv6 and IPv4, we enter here only if
|
||||
* server.bindaddr_count == 0. */
|
||||
fds[*count] = anetTcp6Server(server.neterr,port,NULL,
|
||||
@ -1643,19 +1645,27 @@ int listenToPort(int port, int *fds, int *count) {
|
||||
if (fds[*count] != ANET_ERR) {
|
||||
anetNonBlock(NULL,fds[*count]);
|
||||
(*count)++;
|
||||
} else if (errno == EAFNOSUPPORT) {
|
||||
unsupported++;
|
||||
serverLog(LL_WARNING,"Not listening to IPv6: unsupproted");
|
||||
}
|
||||
|
||||
if (*count == 1 || unsupported) {
|
||||
/* Bind the IPv4 address as well. */
|
||||
fds[*count] = anetTcpServer(server.neterr,port,NULL,
|
||||
server.tcp_backlog);
|
||||
if (fds[*count] != ANET_ERR) {
|
||||
anetNonBlock(NULL,fds[*count]);
|
||||
(*count)++;
|
||||
} else if (errno == EAFNOSUPPORT) {
|
||||
unsupported++;
|
||||
serverLog(LL_WARNING,"Not listening to IPv4: unsupproted");
|
||||
}
|
||||
}
|
||||
/* Exit the loop if we were able to bind * on IPv4 and IPv6,
|
||||
* otherwise fds[*count] will be ANET_ERR and we'll print an
|
||||
* error and return to the caller with an error. */
|
||||
if (*count == 2) break;
|
||||
if (*count + unsupported == 2) break;
|
||||
} else if (strchr(server.bindaddr[j],':')) {
|
||||
/* Bind IPv6 address. */
|
||||
fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j],
|
||||
|
@ -93,6 +93,7 @@ typedef long long mstime_t; /* millisecond time type. */
|
||||
#define AOF_REWRITE_PERC 100
|
||||
#define AOF_REWRITE_MIN_SIZE (64*1024*1024)
|
||||
#define AOF_REWRITE_ITEMS_PER_CMD 64
|
||||
#define AOF_READ_DIFF_INTERVAL_BYTES (1024*10)
|
||||
#define CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN 10000
|
||||
#define CONFIG_DEFAULT_SLOWLOG_MAX_LEN 128
|
||||
#define CONFIG_DEFAULT_MAX_CLIENTS 10000
|
||||
@ -136,6 +137,7 @@ typedef long long mstime_t; /* millisecond time type. */
|
||||
#define CONFIG_DEFAULT_AOF_FILENAME "appendonly.aof"
|
||||
#define CONFIG_DEFAULT_AOF_NO_FSYNC_ON_REWRITE 0
|
||||
#define CONFIG_DEFAULT_AOF_LOAD_TRUNCATED 1
|
||||
#define CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE 0
|
||||
#define CONFIG_DEFAULT_ACTIVE_REHASHING 1
|
||||
#define CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC 1
|
||||
#define CONFIG_DEFAULT_MIN_SLAVES_TO_WRITE 0
|
||||
@ -900,6 +902,7 @@ struct redisServer {
|
||||
int aof_last_write_status; /* C_OK or C_ERR */
|
||||
int aof_last_write_errno; /* Valid if aof_last_write_status is ERR */
|
||||
int aof_load_truncated; /* Don't stop on unexpected AOF EOF. */
|
||||
int aof_use_rdb_preamble; /* Use RDB preamble on AOF rewrites. */
|
||||
/* AOF pipes used to communicate between parent and child during rewrite. */
|
||||
int aof_pipe_write_data_to_child;
|
||||
int aof_pipe_read_data_from_parent;
|
||||
@ -1365,6 +1368,7 @@ void stopLoading(void);
|
||||
|
||||
/* RDB persistence */
|
||||
#include "rdb.h"
|
||||
int rdbSaveRio(rio *rdb, int *error, int flags);
|
||||
|
||||
/* AOF persistence */
|
||||
void flushAppendOnlyFile(int force);
|
||||
@ -1377,6 +1381,7 @@ int startAppendOnly(void);
|
||||
void backgroundRewriteDoneHandler(int exitcode, int bysignal);
|
||||
void aofRewriteBufferReset(void);
|
||||
unsigned long aofRewriteBufferSize(void);
|
||||
ssize_t aofReadDiffFromParent(void);
|
||||
|
||||
/* Sorted sets data type */
|
||||
|
||||
|
@ -4,60 +4,63 @@ start_server {tags {"aofrw"}} {
|
||||
r config set auto-aof-rewrite-percentage 0 ; # Disable auto-rewrite.
|
||||
waitForBgrewriteaof r
|
||||
|
||||
test {AOF rewrite during write load} {
|
||||
# Start a write load for 10 seconds
|
||||
set master [srv 0 client]
|
||||
set master_host [srv 0 host]
|
||||
set master_port [srv 0 port]
|
||||
set load_handle0 [start_write_load $master_host $master_port 10]
|
||||
set load_handle1 [start_write_load $master_host $master_port 10]
|
||||
set load_handle2 [start_write_load $master_host $master_port 10]
|
||||
set load_handle3 [start_write_load $master_host $master_port 10]
|
||||
set load_handle4 [start_write_load $master_host $master_port 10]
|
||||
foreach rdbpre {yes no} {
|
||||
r config set aof-use-rdb-preamble $rdbpre
|
||||
test "AOF rewrite during write load: RDB preamble=$rdbpre" {
|
||||
# Start a write load for 10 seconds
|
||||
set master [srv 0 client]
|
||||
set master_host [srv 0 host]
|
||||
set master_port [srv 0 port]
|
||||
set load_handle0 [start_write_load $master_host $master_port 10]
|
||||
set load_handle1 [start_write_load $master_host $master_port 10]
|
||||
set load_handle2 [start_write_load $master_host $master_port 10]
|
||||
set load_handle3 [start_write_load $master_host $master_port 10]
|
||||
set load_handle4 [start_write_load $master_host $master_port 10]
|
||||
|
||||
# Make sure the instance is really receiving data
|
||||
wait_for_condition 50 100 {
|
||||
[r dbsize] > 0
|
||||
} else {
|
||||
fail "No write load detected."
|
||||
# Make sure the instance is really receiving data
|
||||
wait_for_condition 50 100 {
|
||||
[r dbsize] > 0
|
||||
} else {
|
||||
fail "No write load detected."
|
||||
}
|
||||
|
||||
# After 3 seconds, start a rewrite, while the write load is still
|
||||
# active.
|
||||
after 3000
|
||||
r bgrewriteaof
|
||||
waitForBgrewriteaof r
|
||||
|
||||
# Let it run a bit more so that we'll append some data to the new
|
||||
# AOF.
|
||||
after 1000
|
||||
|
||||
# Stop the processes generating the load if they are still active
|
||||
stop_write_load $load_handle0
|
||||
stop_write_load $load_handle1
|
||||
stop_write_load $load_handle2
|
||||
stop_write_load $load_handle3
|
||||
stop_write_load $load_handle4
|
||||
|
||||
# Make sure that we remain the only connected client.
|
||||
# This step is needed to make sure there are no pending writes
|
||||
# that will be processed between the two "debug digest" calls.
|
||||
wait_for_condition 50 100 {
|
||||
[llength [split [string trim [r client list]] "\n"]] == 1
|
||||
} else {
|
||||
puts [r client list]
|
||||
fail "Clients generating loads are not disconnecting"
|
||||
}
|
||||
|
||||
# Get the data set digest
|
||||
set d1 [r debug digest]
|
||||
|
||||
# Load the AOF
|
||||
r debug loadaof
|
||||
set d2 [r debug digest]
|
||||
|
||||
# Make sure they are the same
|
||||
assert {$d1 eq $d2}
|
||||
}
|
||||
|
||||
# After 3 seconds, start a rewrite, while the write load is still
|
||||
# active.
|
||||
after 3000
|
||||
r bgrewriteaof
|
||||
waitForBgrewriteaof r
|
||||
|
||||
# Let it run a bit more so that we'll append some data to the new
|
||||
# AOF.
|
||||
after 1000
|
||||
|
||||
# Stop the processes generating the load if they are still active
|
||||
stop_write_load $load_handle0
|
||||
stop_write_load $load_handle1
|
||||
stop_write_load $load_handle2
|
||||
stop_write_load $load_handle3
|
||||
stop_write_load $load_handle4
|
||||
|
||||
# Make sure that we remain the only connected client.
|
||||
# This step is needed to make sure there are no pending writes
|
||||
# that will be processed between the two "debug digest" calls.
|
||||
wait_for_condition 50 100 {
|
||||
[llength [split [string trim [r client list]] "\n"]] == 1
|
||||
} else {
|
||||
puts [r client list]
|
||||
fail "Clients generating loads are not disconnecting"
|
||||
}
|
||||
|
||||
# Get the data set digest
|
||||
set d1 [r debug digest]
|
||||
|
||||
# Load the AOF
|
||||
r debug loadaof
|
||||
set d2 [r debug digest]
|
||||
|
||||
# Make sure they are the same
|
||||
assert {$d1 eq $d2}
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user