diff --git a/src/aof.c b/src/aof.c index 6a92a0cd9..39229b5df 100644 --- a/src/aof.c +++ b/src/aof.c @@ -989,7 +989,7 @@ int rewriteHashObject(rio *r, robj *key, robj *o) { } /* Call the module type callback in order to rewrite a data type - * taht is exported by a module and is not handled by Redis itself. + * that is exported by a module and is not handled by Redis itself. * The function returns 0 on error, 1 on success. */ int rewriteModuleObject(rio *r, robj *key, robj *o) { RedisModuleIO io; @@ -1015,37 +1015,11 @@ ssize_t aofReadDiffFromParent(void) { return total; } -/* Write a sequence of commands able to fully rebuild the dataset into - * "filename". Used both by REWRITEAOF and BGREWRITEAOF. - * - * In order to minimize the number of commands needed in the rewritten - * log Redis uses variadic commands when possible, such as RPUSH, SADD - * and ZADD. However at max AOF_REWRITE_ITEMS_PER_CMD items per time - * are inserted using a single command. */ -int rewriteAppendOnlyFile(char *filename) { +void rewriteAppendOnlyFileRio(rio *aof) { dictIterator *di = NULL; dictEntry *de; - rio aof; - FILE *fp; - char tmpfile[256]; - int j; - long long now = mstime(); - char byte; size_t processed = 0; - /* Note that we have to use a different temp name here compared to the - * one used by rewriteAppendOnlyFileBackground() function. */ - snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid()); - fp = fopen(tmpfile,"w"); - if (!fp) { - serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno)); - return C_ERR; - } - - server.aof_child_diff = sdsempty(); - rioInitWithFile(&aof,fp); - if (server.aof_rewrite_incremental_fsync) - rioSetAutoSync(&aof,AOF_AUTOSYNC_BYTES); for (j = 0; j < server.dbnum; j++) { char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n"; redisDb *db = server.db+j; @@ -1105,7 +1079,7 @@ int rewriteAppendOnlyFile(char *filename) { if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr; } /* Read some diff from the parent process from time to time. */ - if (aof.processed_bytes > processed+1024*10) { + if (aof.processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) { processed = aof.processed_bytes; aofReadDiffFromParent(); } @@ -1113,6 +1087,52 @@ int rewriteAppendOnlyFile(char *filename) { dictReleaseIterator(di); di = NULL; } + return C_OK; + +werr: + if (di) dictReleaseIterator(di); + return C_ERR; +} + +/* Write a sequence of commands able to fully rebuild the dataset into + * "filename". Used both by REWRITEAOF and BGREWRITEAOF. + * + * In order to minimize the number of commands needed in the rewritten + * log Redis uses variadic commands when possible, such as RPUSH, SADD + * and ZADD. However at max AOF_REWRITE_ITEMS_PER_CMD items per time + * are inserted using a single command. */ +int rewriteAppendOnlyFile(char *filename) { + rio aof; + FILE *fp; + char tmpfile[256]; + int j; + long long now = mstime(); + char byte; + + /* Note that we have to use a different temp name here compared to the + * one used by rewriteAppendOnlyFileBackground() function. */ + snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid()); + fp = fopen(tmpfile,"w"); + if (!fp) { + serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno)); + return C_ERR; + } + + server.aof_child_diff = sdsempty(); + rioInitWithFile(&aof,fp); + + if (server.aof_rewrite_incremental_fsync) + rioSetAutoSync(&aof,AOF_AUTOSYNC_BYTES); + + if (server.aof_use_rdb_prefix) { + int error; + if (rdbSaveRio(&rdb,&error,RDB_SAVE_AOF_PREAMBLE) == C_ERR) { + errno = error; + goto werr; + } + } else { + rewriteAppendOnlyFileRio(&aof); + } /* Do an initial slow fsync here while the parent is still sending * data, in order to make the next final fsync faster. */ @@ -1178,7 +1198,6 @@ werr: serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno)); fclose(fp); unlink(tmpfile); - if (di) dictReleaseIterator(di); return C_ERR; } diff --git a/src/rdb.c b/src/rdb.c index 859297943..83e5868cd 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -818,14 +818,16 @@ int rdbSaveAuxFieldStrInt(rio *rdb, char *key, long long val) { } /* Save a few default AUX fields with information about the RDB generated. */ -int rdbSaveInfoAuxFields(rio *rdb) { +int rdbSaveInfoAuxFields(rio *rdb, int flags) { int redis_bits = (sizeof(void*) == 8) ? 64 : 32; + int aof_preamble = (flags & RDB_SAVE_AOF_PREAMBLE) != 0; /* Add a few fields about the state when the RDB was created. */ if (rdbSaveAuxFieldStrStr(rdb,"redis-ver",REDIS_VERSION) == -1) return -1; if (rdbSaveAuxFieldStrInt(rdb,"redis-bits",redis_bits) == -1) return -1; if (rdbSaveAuxFieldStrInt(rdb,"ctime",time(NULL)) == -1) return -1; if (rdbSaveAuxFieldStrInt(rdb,"used-mem",zmalloc_used_memory()) == -1) return -1; + if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble)) return -1; return 1; } @@ -837,19 +839,20 @@ int rdbSaveInfoAuxFields(rio *rdb) { * When the function returns C_ERR and if 'error' is not NULL, the * integer pointed by 'error' is set to the value of errno just after the I/O * error. */ -int rdbSaveRio(rio *rdb, int *error) { +int rdbSaveRio(rio *rdb, int *error, int flags) { dictIterator *di = NULL; dictEntry *de; char magic[10]; int j; long long now = mstime(); uint64_t cksum; + size_t processed = 0; if (server.rdb_checksum) rdb->update_cksum = rioGenericUpdateChecksum; snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION); if (rdbWriteRaw(rdb,magic,9) == -1) goto werr; - if (rdbSaveInfoAuxFields(rdb) == -1) goto werr; + if (rdbSaveInfoAuxFields(rdb,flags) == -1) goto werr; for (j = 0; j < server.dbnum; j++) { redisDb *db = server.db+j; @@ -886,6 +889,16 @@ int rdbSaveRio(rio *rdb, int *error) { initStaticStringObject(key,keystr); expire = getExpire(db,&key); if (rdbSaveKeyValuePair(rdb,&key,o,expire,now) == -1) goto werr; + + /* When this RDB is produced as part of an AOF rewrite, move + * accumulated diff from parent to child while rewriting in + * order to have a smaller final write. */ + if (flags & RDB_SAVE_AOF_PREAMBLE && + rdb.processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) + { + processed = rdb.processed_bytes; + aofReadDiffFromParent(); + } } dictReleaseIterator(di); } @@ -923,7 +936,7 @@ int rdbSaveRioWithEOFMark(rio *rdb, int *error) { if (rioWrite(rdb,"$EOF:",5) == 0) goto werr; if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr; if (rioWrite(rdb,"\r\n",2) == 0) goto werr; - if (rdbSaveRio(rdb,error) == C_ERR) goto werr; + if (rdbSaveRio(rdb,error,RDB_SAVE_NONE) == C_ERR) goto werr; if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr; return C_OK; @@ -955,7 +968,7 @@ int rdbSave(char *filename) { } rioInitWithFile(&rdb,fp); - if (rdbSaveRio(&rdb,&error) == C_ERR) { + if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE) == C_ERR) { errno = error; goto werr; } diff --git a/src/rdb.h b/src/rdb.h index a71ecb16e..2c9a99850 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -106,6 +106,9 @@ #define RDB_LOAD_PLAIN (1<<1) #define RDB_LOAD_SDS (1<<2) +#define RDB_SAVE_NONE 0 +#define RDB_SAVE_AOF_PREAMBLE (1<<0) + int rdbSaveType(rio *rdb, unsigned char type); int rdbLoadType(rio *rdb); int rdbSaveTime(rio *rdb, time_t t); diff --git a/src/server.h b/src/server.h index d410d5b2a..2bc985cbb 100644 --- a/src/server.h +++ b/src/server.h @@ -93,6 +93,7 @@ typedef long long mstime_t; /* millisecond time type. */ #define AOF_REWRITE_PERC 100 #define AOF_REWRITE_MIN_SIZE (64*1024*1024) #define AOF_REWRITE_ITEMS_PER_CMD 64 +#define AOF_READ_DIFF_INTERVAL_BYTES (1024*10) #define CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN 10000 #define CONFIG_DEFAULT_SLOWLOG_MAX_LEN 128 #define CONFIG_DEFAULT_MAX_CLIENTS 10000 @@ -1365,6 +1366,7 @@ void stopLoading(void); /* RDB persistence */ #include "rdb.h" +int rdbSaveRio(rio *rdb, int *error, int flags); /* AOF persistence */ void flushAppendOnlyFile(int force);