diff --git a/src/aof.c b/src/aof.c index 39229b5df..104d8fe39 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1015,10 +1015,12 @@ ssize_t aofReadDiffFromParent(void) { return total; } -void rewriteAppendOnlyFileRio(rio *aof) { +int rewriteAppendOnlyFileRio(rio *aof) { dictIterator *di = NULL; dictEntry *de; size_t processed = 0; + long long now = mstime(); + int j; for (j = 0; j < server.dbnum; j++) { char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n"; @@ -1026,14 +1028,10 @@ void rewriteAppendOnlyFileRio(rio *aof) { dict *d = db->dict; if (dictSize(d) == 0) continue; di = dictGetSafeIterator(d); - if (!di) { - fclose(fp); - return C_ERR; - } /* SELECT the new DB */ - if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr; - if (rioWriteBulkLongLong(&aof,j) == 0) goto werr; + if (rioWrite(aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr; + if (rioWriteBulkLongLong(aof,j) == 0) goto werr; /* Iterate this DB writing every entry */ while((de = dictNext(di)) != NULL) { @@ -1054,33 +1052,33 @@ void rewriteAppendOnlyFileRio(rio *aof) { if (o->type == OBJ_STRING) { /* Emit a SET command */ char cmd[]="*3\r\n$3\r\nSET\r\n"; - if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; + if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr; /* Key and value */ - if (rioWriteBulkObject(&aof,&key) == 0) goto werr; - if (rioWriteBulkObject(&aof,o) == 0) goto werr; + if (rioWriteBulkObject(aof,&key) == 0) goto werr; + if (rioWriteBulkObject(aof,o) == 0) goto werr; } else if (o->type == OBJ_LIST) { - if (rewriteListObject(&aof,&key,o) == 0) goto werr; + if (rewriteListObject(aof,&key,o) == 0) goto werr; } else if (o->type == OBJ_SET) { - if (rewriteSetObject(&aof,&key,o) == 0) goto werr; + if (rewriteSetObject(aof,&key,o) == 0) goto werr; } else if (o->type == OBJ_ZSET) { - if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr; + if (rewriteSortedSetObject(aof,&key,o) == 0) goto werr; } else if (o->type == OBJ_HASH) { - if (rewriteHashObject(&aof,&key,o) == 0) goto werr; + if (rewriteHashObject(aof,&key,o) == 0) goto werr; } else if (o->type == OBJ_MODULE) { - if (rewriteModuleObject(&aof,&key,o) == 0) goto werr; + if (rewriteModuleObject(aof,&key,o) == 0) goto werr; } else { serverPanic("Unknown object type"); } /* Save the expire time */ if (expiretime != -1) { char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n"; - if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; - if (rioWriteBulkObject(&aof,&key) == 0) goto werr; - if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr; + if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr; + if (rioWriteBulkObject(aof,&key) == 0) goto werr; + if (rioWriteBulkLongLong(aof,expiretime) == 0) goto werr; } /* Read some diff from the parent process from time to time. */ - if (aof.processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) { - processed = aof.processed_bytes; + if (aof->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) { + processed = aof->processed_bytes; aofReadDiffFromParent(); } } @@ -1105,8 +1103,6 @@ int rewriteAppendOnlyFile(char *filename) { rio aof; FILE *fp; char tmpfile[256]; - int j; - long long now = mstime(); char byte; /* Note that we have to use a different temp name here compared to the @@ -1124,14 +1120,14 @@ int rewriteAppendOnlyFile(char *filename) { if (server.aof_rewrite_incremental_fsync) rioSetAutoSync(&aof,AOF_AUTOSYNC_BYTES); - if (server.aof_use_rdb_prefix) { + if (server.aof_use_rdb_preamble) { int error; - if (rdbSaveRio(&rdb,&error,RDB_SAVE_AOF_PREAMBLE) == C_ERR) { + if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE) == C_ERR) { errno = error; goto werr; } } else { - rewriteAppendOnlyFileRio(&aof); + if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr; } /* Do an initial slow fsync here while the parent is still sending diff --git a/src/config.c b/src/config.c index dd21a0aca..1d81180b7 100644 --- a/src/config.c +++ b/src/config.c @@ -475,6 +475,10 @@ void loadServerConfigFromString(char *config) { if ((server.aof_load_truncated = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; } + } else if (!strcasecmp(argv[0],"aof-use-rdb-preamble") && argc == 2) { + if ((server.aof_use_rdb_preamble = yesnotoi(argv[1])) == -1) { + err = "argument must be 'yes' or 'no'"; goto loaderr; + } } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) { if (strlen(argv[1]) > CONFIG_AUTHPASS_MAX_LEN) { err = "Password is longer than CONFIG_AUTHPASS_MAX_LEN"; @@ -953,6 +957,8 @@ void configSetCommand(client *c) { "aof-rewrite-incremental-fsync",server.aof_rewrite_incremental_fsync) { } config_set_bool_field( "aof-load-truncated",server.aof_load_truncated) { + } config_set_bool_field( + "aof-use-rdb-preamble",server.aof_use_rdb_preamble) { } config_set_bool_field( "slave-serve-stale-data",server.repl_serve_stale_data) { } config_set_bool_field( @@ -1227,6 +1233,8 @@ void configGetCommand(client *c) { server.aof_rewrite_incremental_fsync); config_get_bool_field("aof-load-truncated", server.aof_load_truncated); + config_get_bool_field("aof-use-rdb-preamble", + server.aof_use_rdb_preamble); config_get_bool_field("lazyfree-lazy-eviction", server.lazyfree_lazy_eviction); config_get_bool_field("lazyfree-lazy-expire", @@ -1947,6 +1955,7 @@ int rewriteConfig(char *path) { rewriteConfigNumericalOption(state,"hz",server.hz,CONFIG_DEFAULT_HZ); rewriteConfigYesNoOption(state,"aof-rewrite-incremental-fsync",server.aof_rewrite_incremental_fsync,CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC); rewriteConfigYesNoOption(state,"aof-load-truncated",server.aof_load_truncated,CONFIG_DEFAULT_AOF_LOAD_TRUNCATED); + rewriteConfigYesNoOption(state,"aof-use-rdb-preamble",server.aof_use_rdb_preamble,CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE); rewriteConfigEnumOption(state,"supervised",server.supervised_mode,supervised_mode_enum,SUPERVISED_NONE); rewriteConfigYesNoOption(state,"lazyfree-lazy-eviction",server.lazyfree_lazy_eviction,CONFIG_DEFAULT_LAZYFREE_LAZY_EVICTION); rewriteConfigYesNoOption(state,"lazyfree-lazy-expire",server.lazyfree_lazy_expire,CONFIG_DEFAULT_LAZYFREE_LAZY_EXPIRE); diff --git a/src/rdb.c b/src/rdb.c index 83e5868cd..570e20f4d 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -827,7 +827,7 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags) { if (rdbSaveAuxFieldStrInt(rdb,"redis-bits",redis_bits) == -1) return -1; if (rdbSaveAuxFieldStrInt(rdb,"ctime",time(NULL)) == -1) return -1; if (rdbSaveAuxFieldStrInt(rdb,"used-mem",zmalloc_used_memory()) == -1) return -1; - if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble)) return -1; + if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble) == -1) return -1; return 1; } @@ -894,9 +894,9 @@ int rdbSaveRio(rio *rdb, int *error, int flags) { * accumulated diff from parent to child while rewriting in * order to have a smaller final write. */ if (flags & RDB_SAVE_AOF_PREAMBLE && - rdb.processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) + rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) { - processed = rdb.processed_bytes; + processed = rdb->processed_bytes; aofReadDiffFromParent(); } } diff --git a/src/server.c b/src/server.c index a77582592..d7ce68852 100644 --- a/src/server.c +++ b/src/server.c @@ -1345,6 +1345,7 @@ void initServerConfig(void) { server.aof_flush_postponed_start = 0; server.aof_rewrite_incremental_fsync = CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC; server.aof_load_truncated = CONFIG_DEFAULT_AOF_LOAD_TRUNCATED; + server.aof_use_rdb_preamble = CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE; server.pidfile = NULL; server.rdb_filename = zstrdup(CONFIG_DEFAULT_RDB_FILENAME); server.aof_filename = zstrdup(CONFIG_DEFAULT_AOF_FILENAME); diff --git a/src/server.h b/src/server.h index 2bc985cbb..a5f0ee1a6 100644 --- a/src/server.h +++ b/src/server.h @@ -137,6 +137,7 @@ typedef long long mstime_t; /* millisecond time type. */ #define CONFIG_DEFAULT_AOF_FILENAME "appendonly.aof" #define CONFIG_DEFAULT_AOF_NO_FSYNC_ON_REWRITE 0 #define CONFIG_DEFAULT_AOF_LOAD_TRUNCATED 1 +#define CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE 0 #define CONFIG_DEFAULT_ACTIVE_REHASHING 1 #define CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC 1 #define CONFIG_DEFAULT_MIN_SLAVES_TO_WRITE 0 @@ -901,6 +902,7 @@ struct redisServer { int aof_last_write_status; /* C_OK or C_ERR */ int aof_last_write_errno; /* Valid if aof_last_write_status is ERR */ int aof_load_truncated; /* Don't stop on unexpected AOF EOF. */ + int aof_use_rdb_preamble; /* Use RDB preamble on AOF rewrites. */ /* AOF pipes used to communicate between parent and child during rewrite. */ int aof_pipe_write_data_to_child; int aof_pipe_read_data_from_parent; @@ -1379,6 +1381,7 @@ int startAppendOnly(void); void backgroundRewriteDoneHandler(int exitcode, int bysignal); void aofRewriteBufferReset(void); unsigned long aofRewriteBufferSize(void); +ssize_t aofReadDiffFromParent(void); /* Sorted sets data type */