mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 08:08:53 -05:00
RDB AOF preamble: WIP 2.
This commit is contained in:
parent
4426cb11e2
commit
feda52381d
46
src/aof.c
46
src/aof.c
@ -1015,10 +1015,12 @@ ssize_t aofReadDiffFromParent(void) {
|
||||
return total;
|
||||
}
|
||||
|
||||
void rewriteAppendOnlyFileRio(rio *aof) {
|
||||
int rewriteAppendOnlyFileRio(rio *aof) {
|
||||
dictIterator *di = NULL;
|
||||
dictEntry *de;
|
||||
size_t processed = 0;
|
||||
long long now = mstime();
|
||||
int j;
|
||||
|
||||
for (j = 0; j < server.dbnum; j++) {
|
||||
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
|
||||
@ -1026,14 +1028,10 @@ void rewriteAppendOnlyFileRio(rio *aof) {
|
||||
dict *d = db->dict;
|
||||
if (dictSize(d) == 0) continue;
|
||||
di = dictGetSafeIterator(d);
|
||||
if (!di) {
|
||||
fclose(fp);
|
||||
return C_ERR;
|
||||
}
|
||||
|
||||
/* SELECT the new DB */
|
||||
if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
|
||||
if (rioWriteBulkLongLong(&aof,j) == 0) goto werr;
|
||||
if (rioWrite(aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
|
||||
if (rioWriteBulkLongLong(aof,j) == 0) goto werr;
|
||||
|
||||
/* Iterate this DB writing every entry */
|
||||
while((de = dictNext(di)) != NULL) {
|
||||
@ -1054,33 +1052,33 @@ void rewriteAppendOnlyFileRio(rio *aof) {
|
||||
if (o->type == OBJ_STRING) {
|
||||
/* Emit a SET command */
|
||||
char cmd[]="*3\r\n$3\r\nSET\r\n";
|
||||
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
|
||||
if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
|
||||
/* Key and value */
|
||||
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
|
||||
if (rioWriteBulkObject(&aof,o) == 0) goto werr;
|
||||
if (rioWriteBulkObject(aof,&key) == 0) goto werr;
|
||||
if (rioWriteBulkObject(aof,o) == 0) goto werr;
|
||||
} else if (o->type == OBJ_LIST) {
|
||||
if (rewriteListObject(&aof,&key,o) == 0) goto werr;
|
||||
if (rewriteListObject(aof,&key,o) == 0) goto werr;
|
||||
} else if (o->type == OBJ_SET) {
|
||||
if (rewriteSetObject(&aof,&key,o) == 0) goto werr;
|
||||
if (rewriteSetObject(aof,&key,o) == 0) goto werr;
|
||||
} else if (o->type == OBJ_ZSET) {
|
||||
if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr;
|
||||
if (rewriteSortedSetObject(aof,&key,o) == 0) goto werr;
|
||||
} else if (o->type == OBJ_HASH) {
|
||||
if (rewriteHashObject(&aof,&key,o) == 0) goto werr;
|
||||
if (rewriteHashObject(aof,&key,o) == 0) goto werr;
|
||||
} else if (o->type == OBJ_MODULE) {
|
||||
if (rewriteModuleObject(&aof,&key,o) == 0) goto werr;
|
||||
if (rewriteModuleObject(aof,&key,o) == 0) goto werr;
|
||||
} else {
|
||||
serverPanic("Unknown object type");
|
||||
}
|
||||
/* Save the expire time */
|
||||
if (expiretime != -1) {
|
||||
char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
|
||||
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
|
||||
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
|
||||
if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr;
|
||||
if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
|
||||
if (rioWriteBulkObject(aof,&key) == 0) goto werr;
|
||||
if (rioWriteBulkLongLong(aof,expiretime) == 0) goto werr;
|
||||
}
|
||||
/* Read some diff from the parent process from time to time. */
|
||||
if (aof.processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) {
|
||||
processed = aof.processed_bytes;
|
||||
if (aof->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) {
|
||||
processed = aof->processed_bytes;
|
||||
aofReadDiffFromParent();
|
||||
}
|
||||
}
|
||||
@ -1105,8 +1103,6 @@ int rewriteAppendOnlyFile(char *filename) {
|
||||
rio aof;
|
||||
FILE *fp;
|
||||
char tmpfile[256];
|
||||
int j;
|
||||
long long now = mstime();
|
||||
char byte;
|
||||
|
||||
/* Note that we have to use a different temp name here compared to the
|
||||
@ -1124,14 +1120,14 @@ int rewriteAppendOnlyFile(char *filename) {
|
||||
if (server.aof_rewrite_incremental_fsync)
|
||||
rioSetAutoSync(&aof,AOF_AUTOSYNC_BYTES);
|
||||
|
||||
if (server.aof_use_rdb_prefix) {
|
||||
if (server.aof_use_rdb_preamble) {
|
||||
int error;
|
||||
if (rdbSaveRio(&rdb,&error,RDB_SAVE_AOF_PREAMBLE) == C_ERR) {
|
||||
if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE) == C_ERR) {
|
||||
errno = error;
|
||||
goto werr;
|
||||
}
|
||||
} else {
|
||||
rewriteAppendOnlyFileRio(&aof);
|
||||
if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
|
||||
}
|
||||
|
||||
/* Do an initial slow fsync here while the parent is still sending
|
||||
|
@ -475,6 +475,10 @@ void loadServerConfigFromString(char *config) {
|
||||
if ((server.aof_load_truncated = yesnotoi(argv[1])) == -1) {
|
||||
err = "argument must be 'yes' or 'no'"; goto loaderr;
|
||||
}
|
||||
} else if (!strcasecmp(argv[0],"aof-use-rdb-preamble") && argc == 2) {
|
||||
if ((server.aof_use_rdb_preamble = yesnotoi(argv[1])) == -1) {
|
||||
err = "argument must be 'yes' or 'no'"; goto loaderr;
|
||||
}
|
||||
} else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
|
||||
if (strlen(argv[1]) > CONFIG_AUTHPASS_MAX_LEN) {
|
||||
err = "Password is longer than CONFIG_AUTHPASS_MAX_LEN";
|
||||
@ -953,6 +957,8 @@ void configSetCommand(client *c) {
|
||||
"aof-rewrite-incremental-fsync",server.aof_rewrite_incremental_fsync) {
|
||||
} config_set_bool_field(
|
||||
"aof-load-truncated",server.aof_load_truncated) {
|
||||
} config_set_bool_field(
|
||||
"aof-use-rdb-preamble",server.aof_use_rdb_preamble) {
|
||||
} config_set_bool_field(
|
||||
"slave-serve-stale-data",server.repl_serve_stale_data) {
|
||||
} config_set_bool_field(
|
||||
@ -1227,6 +1233,8 @@ void configGetCommand(client *c) {
|
||||
server.aof_rewrite_incremental_fsync);
|
||||
config_get_bool_field("aof-load-truncated",
|
||||
server.aof_load_truncated);
|
||||
config_get_bool_field("aof-use-rdb-preamble",
|
||||
server.aof_use_rdb_preamble);
|
||||
config_get_bool_field("lazyfree-lazy-eviction",
|
||||
server.lazyfree_lazy_eviction);
|
||||
config_get_bool_field("lazyfree-lazy-expire",
|
||||
@ -1947,6 +1955,7 @@ int rewriteConfig(char *path) {
|
||||
rewriteConfigNumericalOption(state,"hz",server.hz,CONFIG_DEFAULT_HZ);
|
||||
rewriteConfigYesNoOption(state,"aof-rewrite-incremental-fsync",server.aof_rewrite_incremental_fsync,CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC);
|
||||
rewriteConfigYesNoOption(state,"aof-load-truncated",server.aof_load_truncated,CONFIG_DEFAULT_AOF_LOAD_TRUNCATED);
|
||||
rewriteConfigYesNoOption(state,"aof-use-rdb-preamble",server.aof_use_rdb_preamble,CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE);
|
||||
rewriteConfigEnumOption(state,"supervised",server.supervised_mode,supervised_mode_enum,SUPERVISED_NONE);
|
||||
rewriteConfigYesNoOption(state,"lazyfree-lazy-eviction",server.lazyfree_lazy_eviction,CONFIG_DEFAULT_LAZYFREE_LAZY_EVICTION);
|
||||
rewriteConfigYesNoOption(state,"lazyfree-lazy-expire",server.lazyfree_lazy_expire,CONFIG_DEFAULT_LAZYFREE_LAZY_EXPIRE);
|
||||
|
@ -827,7 +827,7 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags) {
|
||||
if (rdbSaveAuxFieldStrInt(rdb,"redis-bits",redis_bits) == -1) return -1;
|
||||
if (rdbSaveAuxFieldStrInt(rdb,"ctime",time(NULL)) == -1) return -1;
|
||||
if (rdbSaveAuxFieldStrInt(rdb,"used-mem",zmalloc_used_memory()) == -1) return -1;
|
||||
if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble)) return -1;
|
||||
if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble) == -1) return -1;
|
||||
return 1;
|
||||
}
|
||||
|
||||
@ -894,9 +894,9 @@ int rdbSaveRio(rio *rdb, int *error, int flags) {
|
||||
* accumulated diff from parent to child while rewriting in
|
||||
* order to have a smaller final write. */
|
||||
if (flags & RDB_SAVE_AOF_PREAMBLE &&
|
||||
rdb.processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
|
||||
rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
|
||||
{
|
||||
processed = rdb.processed_bytes;
|
||||
processed = rdb->processed_bytes;
|
||||
aofReadDiffFromParent();
|
||||
}
|
||||
}
|
||||
|
@ -1345,6 +1345,7 @@ void initServerConfig(void) {
|
||||
server.aof_flush_postponed_start = 0;
|
||||
server.aof_rewrite_incremental_fsync = CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC;
|
||||
server.aof_load_truncated = CONFIG_DEFAULT_AOF_LOAD_TRUNCATED;
|
||||
server.aof_use_rdb_preamble = CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE;
|
||||
server.pidfile = NULL;
|
||||
server.rdb_filename = zstrdup(CONFIG_DEFAULT_RDB_FILENAME);
|
||||
server.aof_filename = zstrdup(CONFIG_DEFAULT_AOF_FILENAME);
|
||||
|
@ -137,6 +137,7 @@ typedef long long mstime_t; /* millisecond time type. */
|
||||
#define CONFIG_DEFAULT_AOF_FILENAME "appendonly.aof"
|
||||
#define CONFIG_DEFAULT_AOF_NO_FSYNC_ON_REWRITE 0
|
||||
#define CONFIG_DEFAULT_AOF_LOAD_TRUNCATED 1
|
||||
#define CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE 0
|
||||
#define CONFIG_DEFAULT_ACTIVE_REHASHING 1
|
||||
#define CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC 1
|
||||
#define CONFIG_DEFAULT_MIN_SLAVES_TO_WRITE 0
|
||||
@ -901,6 +902,7 @@ struct redisServer {
|
||||
int aof_last_write_status; /* C_OK or C_ERR */
|
||||
int aof_last_write_errno; /* Valid if aof_last_write_status is ERR */
|
||||
int aof_load_truncated; /* Don't stop on unexpected AOF EOF. */
|
||||
int aof_use_rdb_preamble; /* Use RDB preamble on AOF rewrites. */
|
||||
/* AOF pipes used to communicate between parent and child during rewrite. */
|
||||
int aof_pipe_write_data_to_child;
|
||||
int aof_pipe_read_data_from_parent;
|
||||
@ -1379,6 +1381,7 @@ int startAppendOnly(void);
|
||||
void backgroundRewriteDoneHandler(int exitcode, int bysignal);
|
||||
void aofRewriteBufferReset(void);
|
||||
unsigned long aofRewriteBufferSize(void);
|
||||
ssize_t aofReadDiffFromParent(void);
|
||||
|
||||
/* Sorted sets data type */
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user