RDB AOF preamble: WIP 1.

This commit is contained in:
antirez 2016-08-09 11:07:32 +02:00
parent 9f779b33b5
commit 4426cb11e2
4 changed files with 72 additions and 35 deletions

View File

@ -989,7 +989,7 @@ int rewriteHashObject(rio *r, robj *key, robj *o) {
}
/* Call the module type callback in order to rewrite a data type
* taht is exported by a module and is not handled by Redis itself.
* that is exported by a module and is not handled by Redis itself.
* The function returns 0 on error, 1 on success. */
int rewriteModuleObject(rio *r, robj *key, robj *o) {
RedisModuleIO io;
@ -1015,37 +1015,11 @@ ssize_t aofReadDiffFromParent(void) {
return total;
}
/* Write a sequence of commands able to fully rebuild the dataset into
* "filename". Used both by REWRITEAOF and BGREWRITEAOF.
*
* In order to minimize the number of commands needed in the rewritten
* log Redis uses variadic commands when possible, such as RPUSH, SADD
* and ZADD. However at max AOF_REWRITE_ITEMS_PER_CMD items per time
* are inserted using a single command. */
int rewriteAppendOnlyFile(char *filename) {
void rewriteAppendOnlyFileRio(rio *aof) {
dictIterator *di = NULL;
dictEntry *de;
rio aof;
FILE *fp;
char tmpfile[256];
int j;
long long now = mstime();
char byte;
size_t processed = 0;
/* Note that we have to use a different temp name here compared to the
* one used by rewriteAppendOnlyFileBackground() function. */
snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
fp = fopen(tmpfile,"w");
if (!fp) {
serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
return C_ERR;
}
server.aof_child_diff = sdsempty();
rioInitWithFile(&aof,fp);
if (server.aof_rewrite_incremental_fsync)
rioSetAutoSync(&aof,AOF_AUTOSYNC_BYTES);
for (j = 0; j < server.dbnum; j++) {
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
redisDb *db = server.db+j;
@ -1105,7 +1079,7 @@ int rewriteAppendOnlyFile(char *filename) {
if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr;
}
/* Read some diff from the parent process from time to time. */
if (aof.processed_bytes > processed+1024*10) {
if (aof.processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) {
processed = aof.processed_bytes;
aofReadDiffFromParent();
}
@ -1113,6 +1087,52 @@ int rewriteAppendOnlyFile(char *filename) {
dictReleaseIterator(di);
di = NULL;
}
return C_OK;
werr:
if (di) dictReleaseIterator(di);
return C_ERR;
}
/* Write a sequence of commands able to fully rebuild the dataset into
* "filename". Used both by REWRITEAOF and BGREWRITEAOF.
*
* In order to minimize the number of commands needed in the rewritten
* log Redis uses variadic commands when possible, such as RPUSH, SADD
* and ZADD. However at max AOF_REWRITE_ITEMS_PER_CMD items per time
* are inserted using a single command. */
int rewriteAppendOnlyFile(char *filename) {
rio aof;
FILE *fp;
char tmpfile[256];
int j;
long long now = mstime();
char byte;
/* Note that we have to use a different temp name here compared to the
* one used by rewriteAppendOnlyFileBackground() function. */
snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
fp = fopen(tmpfile,"w");
if (!fp) {
serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
return C_ERR;
}
server.aof_child_diff = sdsempty();
rioInitWithFile(&aof,fp);
if (server.aof_rewrite_incremental_fsync)
rioSetAutoSync(&aof,AOF_AUTOSYNC_BYTES);
if (server.aof_use_rdb_prefix) {
int error;
if (rdbSaveRio(&rdb,&error,RDB_SAVE_AOF_PREAMBLE) == C_ERR) {
errno = error;
goto werr;
}
} else {
rewriteAppendOnlyFileRio(&aof);
}
/* Do an initial slow fsync here while the parent is still sending
* data, in order to make the next final fsync faster. */
@ -1178,7 +1198,6 @@ werr:
serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
fclose(fp);
unlink(tmpfile);
if (di) dictReleaseIterator(di);
return C_ERR;
}

View File

@ -818,14 +818,16 @@ int rdbSaveAuxFieldStrInt(rio *rdb, char *key, long long val) {
}
/* Save a few default AUX fields with information about the RDB generated. */
int rdbSaveInfoAuxFields(rio *rdb) {
int rdbSaveInfoAuxFields(rio *rdb, int flags) {
int redis_bits = (sizeof(void*) == 8) ? 64 : 32;
int aof_preamble = (flags & RDB_SAVE_AOF_PREAMBLE) != 0;
/* Add a few fields about the state when the RDB was created. */
if (rdbSaveAuxFieldStrStr(rdb,"redis-ver",REDIS_VERSION) == -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"redis-bits",redis_bits) == -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"ctime",time(NULL)) == -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"used-mem",zmalloc_used_memory()) == -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble)) return -1;
return 1;
}
@ -837,19 +839,20 @@ int rdbSaveInfoAuxFields(rio *rdb) {
* When the function returns C_ERR and if 'error' is not NULL, the
* integer pointed by 'error' is set to the value of errno just after the I/O
* error. */
int rdbSaveRio(rio *rdb, int *error) {
int rdbSaveRio(rio *rdb, int *error, int flags) {
dictIterator *di = NULL;
dictEntry *de;
char magic[10];
int j;
long long now = mstime();
uint64_t cksum;
size_t processed = 0;
if (server.rdb_checksum)
rdb->update_cksum = rioGenericUpdateChecksum;
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
if (rdbSaveInfoAuxFields(rdb) == -1) goto werr;
if (rdbSaveInfoAuxFields(rdb,flags) == -1) goto werr;
for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j;
@ -886,6 +889,16 @@ int rdbSaveRio(rio *rdb, int *error) {
initStaticStringObject(key,keystr);
expire = getExpire(db,&key);
if (rdbSaveKeyValuePair(rdb,&key,o,expire,now) == -1) goto werr;
/* When this RDB is produced as part of an AOF rewrite, move
* accumulated diff from parent to child while rewriting in
* order to have a smaller final write. */
if (flags & RDB_SAVE_AOF_PREAMBLE &&
rdb.processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
{
processed = rdb.processed_bytes;
aofReadDiffFromParent();
}
}
dictReleaseIterator(di);
}
@ -923,7 +936,7 @@ int rdbSaveRioWithEOFMark(rio *rdb, int *error) {
if (rioWrite(rdb,"$EOF:",5) == 0) goto werr;
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
if (rioWrite(rdb,"\r\n",2) == 0) goto werr;
if (rdbSaveRio(rdb,error) == C_ERR) goto werr;
if (rdbSaveRio(rdb,error,RDB_SAVE_NONE) == C_ERR) goto werr;
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
return C_OK;
@ -955,7 +968,7 @@ int rdbSave(char *filename) {
}
rioInitWithFile(&rdb,fp);
if (rdbSaveRio(&rdb,&error) == C_ERR) {
if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE) == C_ERR) {
errno = error;
goto werr;
}

View File

@ -106,6 +106,9 @@
#define RDB_LOAD_PLAIN (1<<1)
#define RDB_LOAD_SDS (1<<2)
#define RDB_SAVE_NONE 0
#define RDB_SAVE_AOF_PREAMBLE (1<<0)
int rdbSaveType(rio *rdb, unsigned char type);
int rdbLoadType(rio *rdb);
int rdbSaveTime(rio *rdb, time_t t);

View File

@ -93,6 +93,7 @@ typedef long long mstime_t; /* millisecond time type. */
#define AOF_REWRITE_PERC 100
#define AOF_REWRITE_MIN_SIZE (64*1024*1024)
#define AOF_REWRITE_ITEMS_PER_CMD 64
#define AOF_READ_DIFF_INTERVAL_BYTES (1024*10)
#define CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN 10000
#define CONFIG_DEFAULT_SLOWLOG_MAX_LEN 128
#define CONFIG_DEFAULT_MAX_CLIENTS 10000
@ -1365,6 +1366,7 @@ void stopLoading(void);
/* RDB persistence */
#include "rdb.h"
int rdbSaveRio(rio *rdb, int *error, int flags);
/* AOF persistence */
void flushAppendOnlyFile(int force);