diskless replication on slave side (don't store rdb to file), plus some other related fixes

Until now, the implementation of diskless replication was diskless only on the master side.
The slave side still stored the received rdb file to disk before loading it back and parsing it.

This commit adds two modes for loading the rdb directly from the socket:
1) "on-empty-db" - only when the current dataset is completely empty
2) "swapdb" - keeping a backup of the current dataset in RAM while loading
A third mode, flushing the db before loading from the socket (flushdb), is risky and currently not included.
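For illustration, a minimal usage sketch in the style of the Tcl test harness used by the tests below (assuming $master and $replica are client handles from start_server, and $master_host/$master_port point at the master; this snippet is not part of the commit):

# Master side: stream the RDB directly to the replica sockets (diskless sync).
$master config set repl-diskless-sync yes
$master config set repl-diskless-sync-delay 0
# Replica side: parse the RDB straight from the socket, keeping a RAM backup
# of the current dataset until the load succeeds (the "swapdb" mode).
$replica config set repl-diskless-load swapdb
# Trigger the full sync.
$replica slaveof $master_host $master_port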

other changes:
--------------
Distinguish between the AOF configuration and its runtime state, so that we re-enable AOF only when sync
eventually succeeds (and not when exiting from readSyncBulkPayload after a failed attempt).
Previously, CONFIG GET and INFO would also report a misleading value while the rdb was being loaded.
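A rough sketch of the intended behavior, in the same harness style (hypothetical: assumes $replica is a client handle and assert_equal from the test suite; not part of the commit):

# AOF is configured on; during a full sync it is stopped internally, but the
# configured value keeps being reported, and the AOF is restarted only once a
# sync eventually succeeds.
$replica config set appendonly yes
$replica slaveof $master_host $master_port
# Even while the transfer/load is in progress, or after a failed attempt,
# CONFIG GET reflects the configuration rather than the transient state.
assert_equal {appendonly yes} [$replica config get appendonly]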

When loading the rdb from the network, don't kill the server on a short read (which may just be a network error)

Fix the rdb check when performed on an AOF file with an RDB preamble

tests:
Run the replication tests for the diskless slave too
Make the replication tests a bit more aggressive
Add a test for diskless load with swapdb
Oran Agra 2019-07-01 15:22:29 +03:00
parent 722446510f
commit 2de544cfcc
17 changed files with 648 additions and 252 deletions


@ -377,6 +377,22 @@ repl-diskless-sync no
# it entirely just set it to 0 seconds and the transfer will start ASAP.
repl-diskless-sync-delay 5
# Replica can load the rdb it reads from the replication link directly from the
# socket, or store the rdb to a file and read that file after it was completely
# received from the master.
# In many cases the disk is slower than the network, and storing and loading
# the rdb file may increase replication time (and even increase the master's
# Copy on Write memory and slave buffers).
# However, parsing the rdb file directly from the socket may mean that we have
# to flush the contents of the current database before the full rdb is received.
# For this reason we have the following options:
# "disabled" - Don't use diskless load (store the rdb file to the disk first)
# "on-empty-db" - Use diskless load only when it is completely safe.
# "swapdb" - Keep a copy of the current db contents in RAM while parsing
# the data directly from the socket. Note that this requires
# sufficient memory; if you don't have it, you risk an OOM kill.
repl-diskless-load disabled
# Replicas send PINGs to server in a predefined interval. It's possible to change
# this interval with the repl_ping_replica_period option. The default value is 10
# seconds.


@ -193,6 +193,20 @@ int anetSendTimeout(char *err, int fd, long long ms) {
return ANET_OK;
}
/* Set the socket receive timeout (SO_RCVTIMEO socket option) to the specified
* number of milliseconds, or disable it if the 'ms' argument is zero. */
int anetRecvTimeout(char *err, int fd, long long ms) {
struct timeval tv;
tv.tv_sec = ms/1000;
tv.tv_usec = (ms%1000)*1000;
if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) == -1) {
anetSetError(err, "setsockopt SO_RCVTIMEO: %s", strerror(errno));
return ANET_ERR;
}
return ANET_OK;
}
/* anetGenericResolve() is called by anetResolve() and anetResolveIP() to
* do the actual work. It resolves the hostname "host" and set the string
* representation of the IP address into the buffer pointed by "ipbuf".


@ -70,6 +70,7 @@ int anetEnableTcpNoDelay(char *err, int fd);
int anetDisableTcpNoDelay(char *err, int fd);
int anetTcpKeepAlive(char *err, int fd);
int anetSendTimeout(char *err, int fd, long long ms);
int anetRecvTimeout(char *err, int fd, long long ms);
int anetPeerToString(int fd, char *ip, size_t ip_len, int *port);
int anetKeepAlive(char *err, int fd, int interval);
int anetSockName(int fd, char *ip, size_t ip_len, int *port);


@ -729,7 +729,7 @@ int loadAppendOnlyFile(char *filename) {
server.aof_state = AOF_OFF;
fakeClient = createFakeClient();
startLoading(fp);
startLoadingFile(fp, filename);
/* Check if this AOF file has an RDB preamble. In that case we need to
* load the RDB file and later continue loading the AOF tail. */


@ -91,6 +91,13 @@ configEnum aof_fsync_enum[] = {
{NULL, 0}
};
configEnum repl_diskless_load_enum[] = {
{"disabled", REPL_DISKLESS_LOAD_DISABLED},
{"on-empty-db", REPL_DISKLESS_LOAD_WHEN_DB_EMPTY},
{"swapdb", REPL_DISKLESS_LOAD_SWAPDB},
{NULL, 0}
};
/* Output buffer limits presets. */
clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUNT] = {
{0, 0, 0}, /* normal */
@ -427,6 +434,11 @@ void loadServerConfigFromString(char *config) {
err = "repl-timeout must be 1 or greater";
goto loaderr;
}
} else if (!strcasecmp(argv[0],"repl-diskless-load") && argc==2) {
server.repl_diskless_load = configEnumGetValue(repl_diskless_load_enum,argv[1]);
if (server.repl_diskless_load == INT_MIN) {
err = "argument must be 'disabled', 'on-empty-db', 'swapdb' or 'flushdb'";
}
} else if (!strcasecmp(argv[0],"repl-diskless-sync-delay") && argc==2) {
server.repl_diskless_sync_delay = atoi(argv[1]);
if (server.repl_diskless_sync_delay < 0) {
@ -466,12 +478,10 @@ void loadServerConfigFromString(char *config) {
if (server.config_hz < CONFIG_MIN_HZ) server.config_hz = CONFIG_MIN_HZ;
if (server.config_hz > CONFIG_MAX_HZ) server.config_hz = CONFIG_MAX_HZ;
} else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
int yes;
if ((yes = yesnotoi(argv[1])) == -1) {
if ((server.aof_enabled = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;
}
server.aof_state = yes ? AOF_ON : AOF_OFF;
server.aof_state = server.aof_enabled ? AOF_ON : AOF_OFF;
} else if (!strcasecmp(argv[0],"appendfilename") && argc == 2) {
if (!pathIsBaseName(argv[1])) {
err = "appendfilename can't be a path, just a filename";
@ -497,6 +507,12 @@ void loadServerConfigFromString(char *config) {
argc == 2)
{
server.aof_rewrite_min_size = memtoll(argv[1],NULL);
} else if (!strcasecmp(argv[0],"rdb-key-save-delay") && argc==2) {
server.rdb_key_save_delay = atoi(argv[1]);
if (server.rdb_key_save_delay < 0) {
err = "rdb-key-save-delay can't be negative";
goto loaderr;
}
} else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
if (strlen(argv[1]) > CONFIG_AUTHPASS_MAX_LEN) {
err = "Password is longer than CONFIG_AUTHPASS_MAX_LEN";
@ -942,6 +958,7 @@ void configSetCommand(client *c) {
int enable = yesnotoi(o->ptr);
if (enable == -1) goto badfmt;
server.aof_enabled = enable;
if (enable == 0 && server.aof_state != AOF_OFF) {
stopAppendOnly();
} else if (enable && server.aof_state == AOF_OFF) {
@ -1132,6 +1149,8 @@ void configSetCommand(client *c) {
"slave-priority",server.slave_priority,0,INT_MAX) {
} config_set_numerical_field(
"replica-priority",server.slave_priority,0,INT_MAX) {
} config_set_numerical_field(
"rdb-key-save-delay",server.rdb_key_save_delay,0,LLONG_MAX) {
} config_set_numerical_field(
"slave-announce-port",server.slave_announce_port,0,65535) {
} config_set_numerical_field(
@ -1199,6 +1218,8 @@ void configSetCommand(client *c) {
"maxmemory-policy",server.maxmemory_policy,maxmemory_policy_enum) {
} config_set_enum_field(
"appendfsync",server.aof_fsync,aof_fsync_enum) {
} config_set_enum_field(
"repl-diskless-load",server.repl_diskless_load,repl_diskless_load_enum) {
/* Everyhing else is an error... */
} config_set_else {
@ -1346,6 +1367,7 @@ void configGetCommand(client *c) {
config_get_numerical_field("cluster-slave-validity-factor",server.cluster_slave_validity_factor);
config_get_numerical_field("cluster-replica-validity-factor",server.cluster_slave_validity_factor);
config_get_numerical_field("repl-diskless-sync-delay",server.repl_diskless_sync_delay);
config_get_numerical_field("rdb-key-save-delay",server.rdb_key_save_delay);
config_get_numerical_field("tcp-keepalive",server.tcpkeepalive);
/* Bool (yes/no) values */
@ -1370,12 +1392,14 @@ void configGetCommand(client *c) {
server.aof_fsync,aof_fsync_enum);
config_get_enum_field("syslog-facility",
server.syslog_facility,syslog_facility_enum);
config_get_enum_field("repl-diskless-load",
server.repl_diskless_load,repl_diskless_load_enum);
/* Everything we can't handle with macros follows. */
if (stringmatch(pattern,"appendonly",1)) {
addReplyBulkCString(c,"appendonly");
addReplyBulkCString(c,server.aof_state == AOF_OFF ? "no" : "yes");
addReplyBulkCString(c,server.aof_enabled ? "yes" : "no");
matches++;
}
if (stringmatch(pattern,"dir",1)) {
@ -2109,6 +2133,7 @@ int rewriteConfig(char *path) {
rewriteConfigNumericalOption(state,"repl-timeout",server.repl_timeout,CONFIG_DEFAULT_REPL_TIMEOUT);
rewriteConfigBytesOption(state,"repl-backlog-size",server.repl_backlog_size,CONFIG_DEFAULT_REPL_BACKLOG_SIZE);
rewriteConfigBytesOption(state,"repl-backlog-ttl",server.repl_backlog_time_limit,CONFIG_DEFAULT_REPL_BACKLOG_TIME_LIMIT);
rewriteConfigEnumOption(state,"repl-diskless-load",server.repl_diskless_load,repl_diskless_load_enum,CONFIG_DEFAULT_REPL_DISKLESS_LOAD);
rewriteConfigNumericalOption(state,"repl-diskless-sync-delay",server.repl_diskless_sync_delay,CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY);
rewriteConfigNumericalOption(state,"replica-priority",server.slave_priority,CONFIG_DEFAULT_SLAVE_PRIORITY);
rewriteConfigNumericalOption(state,"min-replicas-to-write",server.repl_min_slaves_to_write,CONFIG_DEFAULT_MIN_SLAVES_TO_WRITE);
@ -2128,7 +2153,7 @@ int rewriteConfig(char *path) {
rewriteConfigNumericalOption(state,"active-defrag-cycle-min",server.active_defrag_cycle_min,CONFIG_DEFAULT_DEFRAG_CYCLE_MIN);
rewriteConfigNumericalOption(state,"active-defrag-cycle-max",server.active_defrag_cycle_max,CONFIG_DEFAULT_DEFRAG_CYCLE_MAX);
rewriteConfigNumericalOption(state,"active-defrag-max-scan-fields",server.active_defrag_max_scan_fields,CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS);
rewriteConfigYesNoOption(state,"appendonly",server.aof_state != AOF_OFF,0);
rewriteConfigYesNoOption(state,"appendonly",server.aof_enabled,0);
rewriteConfigStringOption(state,"appendfilename",server.aof_filename,CONFIG_DEFAULT_AOF_FILENAME);
rewriteConfigEnumOption(state,"appendfsync",server.aof_fsync,aof_fsync_enum,CONFIG_DEFAULT_AOF_FSYNC);
rewriteConfigNumericalOption(state,"auto-aof-rewrite-percentage",server.aof_rewrite_perc,AOF_REWRITE_PERC);
@ -2157,6 +2182,7 @@ int rewriteConfig(char *path) {
rewriteConfigClientoutputbufferlimitOption(state);
rewriteConfigNumericalOption(state,"hz",server.config_hz,CONFIG_DEFAULT_HZ);
rewriteConfigEnumOption(state,"supervised",server.supervised_mode,supervised_mode_enum,SUPERVISED_NONE);
rewriteConfigNumericalOption(state,"rdb-key-save-delay",server.rdb_key_save_delay,CONFIG_DEFAULT_RDB_KEY_SAVE_DELAY);
/* Rewrite Sentinel config if in Sentinel mode. */
if (server.sentinel_mode) rewriteConfigSentinelOption(state);


@ -344,7 +344,7 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) {
* On success the fuction returns the number of keys removed from the
* database(s). Otherwise -1 is returned in the specific case the
* DB number is out of range, and errno is set to EINVAL. */
long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(void*)) {
int async = (flags & EMPTYDB_ASYNC);
long long removed = 0;
@ -362,12 +362,12 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
}
for (int j = startdb; j <= enddb; j++) {
removed += dictSize(server.db[j].dict);
removed += dictSize(dbarray[j].dict);
if (async) {
emptyDbAsync(&server.db[j]);
emptyDbAsync(&dbarray[j]);
} else {
dictEmpty(server.db[j].dict,callback);
dictEmpty(server.db[j].expires,callback);
dictEmpty(dbarray[j].dict,callback);
dictEmpty(dbarray[j].expires,callback);
}
}
if (server.cluster_enabled) {
@ -381,6 +381,10 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
return removed;
}
long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
return emptyDbGeneric(server.db, dbnum, flags, callback);
}
int selectDb(client *c, int id) {
if (id < 0 || id >= server.dbnum)
return C_ERR;
@ -388,6 +392,15 @@ int selectDb(client *c, int id) {
return C_OK;
}
long long dbTotalServerKeyCount() {
long long total = 0;
int j;
for (j = 0; j < server.dbnum; j++) {
total += dictSize(server.db[j].dict);
}
return total;
}
/*-----------------------------------------------------------------------------
* Hooks for key space changes.
*


@ -44,6 +44,7 @@
#define rdbExitReportCorruptRDB(...) rdbCheckThenExit(__LINE__,__VA_ARGS__)
char* rdbFileBeingLoaded = NULL; /* used for rdb checking on read error */
extern int rdbCheckMode;
void rdbCheckError(const char *fmt, ...);
void rdbCheckSetError(const char *fmt, ...);
@ -61,11 +62,17 @@ void rdbCheckThenExit(int linenum, char *reason, ...) {
if (!rdbCheckMode) {
serverLog(LL_WARNING, "%s", msg);
char *argv[2] = {"",server.rdb_filename};
redis_check_rdb_main(2,argv,NULL);
if (rdbFileBeingLoaded) {
char *argv[2] = {"",rdbFileBeingLoaded};
redis_check_rdb_main(2,argv,NULL);
} else {
serverLog(LL_WARNING, "Failure loading rdb format from socket, assuming connection error, resuming operation.");
return;
}
} else {
rdbCheckError("%s",msg);
}
serverLog(LL_WARNING, "Terminating server after rdb file reading failure.");
exit(1);
}
@ -1039,6 +1046,11 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
if (rdbSaveObjectType(rdb,val) == -1) return -1;
if (rdbSaveStringObject(rdb,key) == -1) return -1;
if (rdbSaveObject(rdb,val,key) == -1) return -1;
/* Delay return if required (for testing) */
if (server.rdb_key_save_delay)
usleep(server.rdb_key_save_delay);
return 1;
}
@ -1800,18 +1812,23 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
/* Mark that we are loading in the global state and setup the fields
* needed to provide loading stats. */
void startLoading(FILE *fp) {
struct stat sb;
void startLoading(size_t size) {
/* Load the DB */
server.loading = 1;
server.loading_start_time = time(NULL);
server.loading_loaded_bytes = 0;
if (fstat(fileno(fp), &sb) == -1) {
server.loading_total_bytes = 0;
} else {
server.loading_total_bytes = sb.st_size;
}
server.loading_total_bytes = size;
}
/* Mark that we are loading in the global state and setup the fields
* needed to provide loading stats.
* 'filename' is optional and used for rdb-check on error */
void startLoadingFile(FILE *fp, char* filename) {
struct stat sb;
if (fstat(fileno(fp), &sb) == -1)
sb.st_size = 0;
rdbFileBeingLoaded = filename;
startLoading(sb.st_size);
}
/* Refresh the loading progress info */
@ -1824,6 +1841,7 @@ void loadingProgress(off_t pos) {
/* Loading finished */
void stopLoading(void) {
server.loading = 0;
rdbFileBeingLoaded = NULL;
}
/* Track loading progress in order to serve client's from time to time
@ -2089,7 +2107,7 @@ int rdbLoad(char *filename, rdbSaveInfo *rsi) {
int retval;
if ((fp = fopen(filename,"r")) == NULL) return C_ERR;
startLoading(fp);
startLoadingFile(fp, filename);
rioInitWithFile(&rdb,fp);
retval = rdbLoadRio(&rdb,rsi,0);
fclose(fp);


@ -202,7 +202,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) {
}
expiretime = -1;
startLoading(fp);
startLoadingFile(fp, rdbfilename);
while(1) {
robj *key, *val;
@ -314,6 +314,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) {
}
if (closefile) fclose(fp);
stopLoading();
return 0;
eoferr: /* unexpected end of file is handled here with a fatal exit */
@ -324,6 +325,7 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */
}
err:
if (closefile) fclose(fp);
stopLoading();
return 1;
}


@ -1113,11 +1113,22 @@ void restartAOFAfterSYNC() {
}
}
static int useDisklessLoad() {
/* compute boolean decision to use diskless load */
return server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB ||
(server.repl_diskless_load == REPL_DISKLESS_LOAD_WHEN_DB_EMPTY && dbTotalServerKeyCount()==0);
}
/* Asynchronously read the SYNC payload we receive from a master */
#define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
char buf[4096];
ssize_t nread, readlen, nwritten;
int use_diskless_load;
redisDb *diskless_load_backup = NULL;
int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS;
int i;
off_t left;
UNUSED(el);
UNUSED(privdata);
@ -1173,90 +1184,177 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
* at the next call. */
server.repl_transfer_size = 0;
serverLog(LL_NOTICE,
"MASTER <-> REPLICA sync: receiving streamed RDB from master");
"MASTER <-> REPLICA sync: receiving streamed RDB from master with EOF %s",
useDisklessLoad()? "to parser":"to disk");
} else {
usemark = 0;
server.repl_transfer_size = strtol(buf+1,NULL,10);
serverLog(LL_NOTICE,
"MASTER <-> REPLICA sync: receiving %lld bytes from master",
(long long) server.repl_transfer_size);
"MASTER <-> REPLICA sync: receiving %lld bytes from master %s",
(long long) server.repl_transfer_size,
useDisklessLoad()? "to parser":"to disk");
}
return;
}
/* Read bulk data */
if (usemark) {
readlen = sizeof(buf);
} else {
left = server.repl_transfer_size - server.repl_transfer_read;
readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
}
use_diskless_load = useDisklessLoad();
if (!use_diskless_load) {
nread = read(fd,buf,readlen);
if (nread <= 0) {
serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s",
(nread == -1) ? strerror(errno) : "connection lost");
cancelReplicationHandshake();
return;
}
server.stat_net_input_bytes += nread;
/* When a mark is used, we want to detect EOF asap in order to avoid
* writing the EOF mark into the file... */
int eof_reached = 0;
if (usemark) {
/* Update the last bytes array, and check if it matches our delimiter.*/
if (nread >= CONFIG_RUN_ID_SIZE) {
memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,CONFIG_RUN_ID_SIZE);
/* read the data from the socket, store it to a file and search for the EOF */
if (usemark) {
readlen = sizeof(buf);
} else {
int rem = CONFIG_RUN_ID_SIZE-nread;
memmove(lastbytes,lastbytes+nread,rem);
memcpy(lastbytes+rem,buf,nread);
left = server.repl_transfer_size - server.repl_transfer_read;
readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
}
if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) eof_reached = 1;
}
server.repl_transfer_lastio = server.unixtime;
if ((nwritten = write(server.repl_transfer_fd,buf,nread)) != nread) {
serverLog(LL_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> REPLICA synchronization: %s",
(nwritten == -1) ? strerror(errno) : "short write");
goto error;
}
server.repl_transfer_read += nread;
nread = read(fd,buf,readlen);
if (nread <= 0) {
serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s",
(nread == -1) ? strerror(errno) : "connection lost");
cancelReplicationHandshake();
return;
}
server.stat_net_input_bytes += nread;
/* Delete the last 40 bytes from the file if we reached EOF. */
if (usemark && eof_reached) {
if (ftruncate(server.repl_transfer_fd,
server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1)
{
serverLog(LL_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno));
/* When a mark is used, we want to detect EOF asap in order to avoid
* writing the EOF mark into the file... */
int eof_reached = 0;
if (usemark) {
/* Update the last bytes array, and check if it matches our delimiter.*/
if (nread >= CONFIG_RUN_ID_SIZE) {
memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,CONFIG_RUN_ID_SIZE);
} else {
int rem = CONFIG_RUN_ID_SIZE-nread;
memmove(lastbytes,lastbytes+nread,rem);
memcpy(lastbytes+rem,buf,nread);
}
if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) eof_reached = 1;
}
server.repl_transfer_lastio = server.unixtime;
if ((nwritten = write(server.repl_transfer_fd,buf,nread)) != nread) {
serverLog(LL_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> REPLICA synchronization: %s",
(nwritten == -1) ? strerror(errno) : "short write");
goto error;
}
server.repl_transfer_read += nread;
/* Delete the last 40 bytes from the file if we reached EOF. */
if (usemark && eof_reached) {
if (ftruncate(server.repl_transfer_fd,
server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1)
{
serverLog(LL_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno));
goto error;
}
}
/* Sync data on disk from time to time, otherwise at the end of the transfer
* we may suffer a big delay as the memory buffers are copied into the
* actual disk. */
if (server.repl_transfer_read >=
server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
{
off_t sync_size = server.repl_transfer_read -
server.repl_transfer_last_fsync_off;
rdb_fsync_range(server.repl_transfer_fd,
server.repl_transfer_last_fsync_off, sync_size);
server.repl_transfer_last_fsync_off += sync_size;
}
/* Check if the transfer is now complete */
if (!usemark) {
if (server.repl_transfer_read == server.repl_transfer_size)
eof_reached = 1;
}
if (!eof_reached)
return;
}
/* Sync data on disk from time to time, otherwise at the end of the transfer
* we may suffer a big delay as the memory buffers are copied into the
* actual disk. */
if (server.repl_transfer_read >=
server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
{
off_t sync_size = server.repl_transfer_read -
server.repl_transfer_last_fsync_off;
rdb_fsync_range(server.repl_transfer_fd,
server.repl_transfer_last_fsync_off, sync_size);
server.repl_transfer_last_fsync_off += sync_size;
/* We reach here when the slave is using diskless replication,
* or when we are done reading from the socket to the rdb file. */
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data");
/* We need to stop any AOFRW fork before flushing and parsing
* RDB, otherwise we'll create a copy-on-write disaster. */
if (server.aof_state != AOF_OFF) stopAppendOnly();
signalFlushedDb(-1);
if (use_diskless_load && server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
/* create a backup of the current db */
diskless_load_backup = zmalloc(sizeof(redisDb)*server.dbnum);
for (i=0; i<server.dbnum; i++) {
diskless_load_backup[i] = server.db[i];
server.db[i].dict = dictCreate(&dbDictType,NULL);
server.db[i].expires = dictCreate(&keyptrDictType,NULL);
}
} else {
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
}
/* Before loading the DB into memory we need to delete the readable
* handler, otherwise it will get called recursively since
* rdbLoad() will call the event loop to process events from time to
* time for non blocking loading. */
aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory");
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
if (use_diskless_load) {
rio rdb;
rioInitWithFd(&rdb,fd,server.repl_transfer_size);
/* Put the socket in blocking mode to simplify RDB transfer.
* We'll restore it when the RDB is received. */
anetBlock(NULL,fd);
anetRecvTimeout(NULL,fd,server.repl_timeout*1000);
/* Check if the transfer is now complete */
if (!usemark) {
if (server.repl_transfer_read == server.repl_transfer_size)
eof_reached = 1;
}
if (eof_reached) {
int aof_is_enabled = server.aof_state != AOF_OFF;
startLoading(server.repl_transfer_size);
if (rdbLoadRio(&rdb,&rsi,0) != C_OK) {
/* RDB loading failed. */
stopLoading();
serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from socket");
cancelReplicationHandshake();
rioFreeFd(&rdb, NULL);
if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
/* restore the backed up db */
emptyDbGeneric(server.db,-1,empty_db_flags,replicationEmptyDbCallback);
for (i=0; i<server.dbnum; i++) {
dictRelease(server.db[i].dict);
dictRelease(server.db[i].expires);
server.db[i] = diskless_load_backup[i];
}
zfree(diskless_load_backup);
} else {
/* Remove the half-loaded data */
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
}
/* Note that there's no point in restarting the AOF on sync failure,
it'll be restarted when sync succeeds or slave promoted. */
return;
}
stopLoading();
/* RDB loading succeeded. */
if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
/* delete the backup db that we created before starting to load the new rdb */
emptyDbGeneric(diskless_load_backup,-1,empty_db_flags,replicationEmptyDbCallback);
for (i=0; i<server.dbnum; i++) {
dictRelease(diskless_load_backup[i].dict);
dictRelease(diskless_load_backup[i].expires);
}
zfree(diskless_load_backup);
}
if (usemark) {
if (!rioRead(&rdb,buf,CONFIG_RUN_ID_SIZE) || memcmp(buf,eofmark,CONFIG_RUN_ID_SIZE) != 0) {
serverLog(LL_WARNING,"Replication stream EOF marker is broken");
cancelReplicationHandshake();
rioFreeFd(&rdb, NULL);
return;
}
}
/* get the unread command stream from the rio buffer */
rioFreeFd(&rdb, NULL);
/* Restore the socket as non-blocking. */
anetNonBlock(NULL,fd);
anetRecvTimeout(NULL,fd,0);
} else {
/* Ensure background save doesn't overwrite synced data */
if (server.rdb_child_pid != -1) {
serverLog(LL_NOTICE,
@ -1270,58 +1368,43 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
serverLog(LL_WARNING,"Failed trying to rename the temp DB into %s in MASTER <-> REPLICA synchronization: %s",
server.rdb_filename, strerror(errno));
server.rdb_filename, strerror(errno));
cancelReplicationHandshake();
return;
}
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data");
/* We need to stop any AOFRW fork before flusing and parsing
* RDB, otherwise we'll create a copy-on-write disaster. */
if(aof_is_enabled) stopAppendOnly();
signalFlushedDb(-1);
emptyDb(
-1,
server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS,
replicationEmptyDbCallback);
/* Before loading the DB into memory we need to delete the readable
* handler, otherwise it will get called recursively since
* rdbLoad() will call the event loop to process events from time to
* time for non blocking loading. */
aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory");
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
if (rdbLoad(server.rdb_filename,&rsi) != C_OK) {
serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
cancelReplicationHandshake();
/* Re-enable the AOF if we disabled it earlier, in order to restore
* the original configuration. */
if (aof_is_enabled) restartAOFAfterSYNC();
/* Note that there's no point in restarting the AOF on sync failure,
it'll be restarted when sync succeeds or slave promoted. */
return;
}
/* Final setup of the connected slave <- master link */
zfree(server.repl_transfer_tmpfile);
close(server.repl_transfer_fd);
replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db);
server.repl_state = REPL_STATE_CONNECTED;
server.repl_down_since = 0;
/* After a full resynchroniziation we use the replication ID and
* offset of the master. The secondary ID / offset are cleared since
* we are starting a new history. */
memcpy(server.replid,server.master->replid,sizeof(server.replid));
server.master_repl_offset = server.master->reploff;
clearReplicationId2();
/* Let's create the replication backlog if needed. Slaves need to
* accumulate the backlog regardless of the fact they have sub-slaves
* or not, in order to behave correctly if they are promoted to
* masters after a failover. */
if (server.repl_backlog == NULL) createReplicationBacklog();
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success");
/* Restart the AOF subsystem now that we finished the sync. This
* will trigger an AOF rewrite, and when done will start appending
* to the new file. */
if (aof_is_enabled) restartAOFAfterSYNC();
server.repl_transfer_fd = -1;
server.repl_transfer_tmpfile = NULL;
}
/* Final setup of the connected slave <- master link */
replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db);
server.repl_state = REPL_STATE_CONNECTED;
server.repl_down_since = 0;
/* After a full resynchronization we use the replication ID and
* offset of the master. The secondary ID / offset are cleared since
* we are starting a new history. */
memcpy(server.replid,server.master->replid,sizeof(server.replid));
server.master_repl_offset = server.master->reploff;
clearReplicationId2();
/* Let's create the replication backlog if needed. Slaves need to
* accumulate the backlog regardless of the fact they have sub-slaves
* or not, in order to behave correctly if they are promoted to
* masters after a failover. */
if (server.repl_backlog == NULL) createReplicationBacklog();
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success");
/* Restart the AOF subsystem now that we finished the sync. This
* will trigger an AOF rewrite, and when done will start appending
* to the new file. */
if (server.aof_enabled) restartAOFAfterSYNC();
return;
error:
@ -1845,16 +1928,20 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
}
/* Prepare a suitable temp file for bulk transfer */
while(maxtries--) {
snprintf(tmpfile,256,
"temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
if (dfd != -1) break;
sleep(1);
}
if (dfd == -1) {
serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> REPLICA synchronization: %s",strerror(errno));
goto error;
if (!useDisklessLoad()) {
while(maxtries--) {
snprintf(tmpfile,256,
"temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
if (dfd != -1) break;
sleep(1);
}
if (dfd == -1) {
serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> REPLICA synchronization: %s",strerror(errno));
goto error;
}
server.repl_transfer_tmpfile = zstrdup(tmpfile);
server.repl_transfer_fd = dfd;
}
/* Setup the non blocking download of the bulk file. */
@ -1871,15 +1958,19 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
server.repl_transfer_size = -1;
server.repl_transfer_read = 0;
server.repl_transfer_last_fsync_off = 0;
server.repl_transfer_fd = dfd;
server.repl_transfer_lastio = server.unixtime;
server.repl_transfer_tmpfile = zstrdup(tmpfile);
return;
error:
aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
if (dfd != -1) close(dfd);
close(fd);
if (server.repl_transfer_fd != -1)
close(server.repl_transfer_fd);
if (server.repl_transfer_tmpfile)
zfree(server.repl_transfer_tmpfile);
server.repl_transfer_tmpfile = NULL;
server.repl_transfer_fd = -1;
server.repl_transfer_s = -1;
server.repl_state = REPL_STATE_CONNECT;
return;
@ -1933,9 +2024,13 @@ void undoConnectWithMaster(void) {
void replicationAbortSyncTransfer(void) {
serverAssert(server.repl_state == REPL_STATE_TRANSFER);
undoConnectWithMaster();
close(server.repl_transfer_fd);
unlink(server.repl_transfer_tmpfile);
zfree(server.repl_transfer_tmpfile);
if (server.repl_transfer_fd!=-1) {
close(server.repl_transfer_fd);
unlink(server.repl_transfer_tmpfile);
zfree(server.repl_transfer_tmpfile);
server.repl_transfer_tmpfile = NULL;
server.repl_transfer_fd = -1;
}
}
/* This function aborts a non blocking replication attempt if there is one
@ -2045,6 +2140,9 @@ void replicaofCommand(client *c) {
serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",
client);
sdsfree(client);
/* Restart the AOF subsystem in case we shut it down during a sync when
* we were still a slave. */
if (server.aof_enabled && server.aof_state == AOF_OFF) restartAOFAfterSYNC();
}
} else {
long port;

src/rio.c

@ -157,6 +157,113 @@ void rioInitWithFile(rio *r, FILE *fp) {
r->io.file.autosync = 0;
}
/* ------------------- File descriptor implementation ------------------- */
static size_t rioFdWrite(rio *r, const void *buf, size_t len) {
UNUSED(r);
UNUSED(buf);
UNUSED(len);
return 0; /* Error, this target does not yet support writing. */
}
/* Returns 1 or 0 for success/failure. */
static size_t rioFdRead(rio *r, void *buf, size_t len) {
size_t avail = sdslen(r->io.fd.buf)-r->io.fd.pos;
/* if the buffer is too small for the entire request: realloc */
if (sdslen(r->io.fd.buf) + sdsavail(r->io.fd.buf) < len)
r->io.fd.buf = sdsMakeRoomFor(r->io.fd.buf, len - sdslen(r->io.fd.buf));
/* if the remaining unused buffer is not large enough: memmove so that we can read the rest */
if (len > avail && sdsavail(r->io.fd.buf) < len - avail) {
sdsrange(r->io.fd.buf, r->io.fd.pos, -1);
r->io.fd.pos = 0;
}
/* if we don't already have all the data in the sds, read more */
while (len > sdslen(r->io.fd.buf) - r->io.fd.pos) {
size_t buffered = sdslen(r->io.fd.buf) - r->io.fd.pos;
size_t toread = len - buffered;
/* read either what's missing, or PROTO_IOBUF_LEN, the bigger of the two */
if (toread < PROTO_IOBUF_LEN)
toread = PROTO_IOBUF_LEN;
if (toread > sdsavail(r->io.fd.buf))
toread = sdsavail(r->io.fd.buf);
if (r->io.fd.read_limit != 0 &&
r->io.fd.read_so_far + buffered + toread > r->io.fd.read_limit) {
if (r->io.fd.read_limit >= r->io.fd.read_so_far + buffered)
toread = r->io.fd.read_limit - r->io.fd.read_so_far - buffered;
else {
errno = EOVERFLOW;
return 0;
}
}
int retval = read(r->io.fd.fd, (char*)r->io.fd.buf + sdslen(r->io.fd.buf), toread);
if (retval <= 0) {
if (errno == EWOULDBLOCK) errno = ETIMEDOUT;
return 0;
}
sdsIncrLen(r->io.fd.buf, retval);
}
memcpy(buf, (char*)r->io.fd.buf + r->io.fd.pos, len);
r->io.fd.read_so_far += len;
r->io.fd.pos += len;
return len;
}
/* Returns read/write position in file. */
static off_t rioFdTell(rio *r) {
return r->io.fd.read_so_far;
}
/* Flushes any buffer to target device if applicable. Returns 1 on success
* and 0 on failures. */
static int rioFdFlush(rio *r) {
/* Our flush is implemented by the write method, that recognizes a
* buffer set to NULL with a count of zero as a flush request. */
return rioFdWrite(r,NULL,0);
}
static const rio rioFdIO = {
rioFdRead,
rioFdWrite,
rioFdTell,
rioFdFlush,
NULL, /* update_checksum */
0, /* current checksum */
0, /* bytes read or written */
0, /* read/write chunk size */
{ { NULL, 0 } } /* union for io-specific vars */
};
/* Create an rio that implements a buffered read from an fd.
* The read_limit argument stops buffering when reaching the limit. */
void rioInitWithFd(rio *r, int fd, size_t read_limit) {
*r = rioFdIO;
r->io.fd.fd = fd;
r->io.fd.pos = 0;
r->io.fd.read_limit = read_limit;
r->io.fd.read_so_far = 0;
r->io.fd.buf = sdsnewlen(NULL, PROTO_IOBUF_LEN);
sdsclear(r->io.fd.buf);
}
/* release the rio stream.
* optionally returns the unread buffered data. */
void rioFreeFd(rio *r, sds* out_remainingBufferedData) {
if(out_remainingBufferedData && (size_t)r->io.fd.pos < sdslen(r->io.fd.buf)) {
if (r->io.fd.pos > 0)
sdsrange(r->io.fd.buf, r->io.fd.pos, -1);
*out_remainingBufferedData = r->io.fd.buf;
} else {
sdsfree(r->io.fd.buf);
if (out_remainingBufferedData)
*out_remainingBufferedData = NULL;
}
r->io.fd.buf = NULL;
}
/* ------------------- File descriptors set implementation ------------------- */
/* Returns 1 or 0 for success/failure.
@ -300,7 +407,7 @@ void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) {
* disk I/O concentrated in very little time. When we fsync in an explicit
* way instead the I/O pressure is more distributed across time. */
void rioSetAutoSync(rio *r, off_t bytes) {
serverAssert(r->read == rioFileIO.read);
if(r->write != rioFileIO.write) return;
r->io.file.autosync = bytes;
}


@ -73,6 +73,14 @@ struct _rio {
off_t buffered; /* Bytes written since last fsync. */
off_t autosync; /* fsync after 'autosync' bytes written. */
} file;
/* file descriptor */
struct {
int fd; /* File descriptor. */
off_t pos; /* pos in buf that was returned */
sds buf; /* buffered data */
size_t read_limit; /* don't allow to buffer/read more than that */
size_t read_so_far; /* amount of data read from the rio (not buffered) */
} fd;
/* Multiple FDs target (used to write to N sockets). */
struct {
int *fds; /* File descriptors. */
@ -126,9 +134,11 @@ static inline int rioFlush(rio *r) {
void rioInitWithFile(rio *r, FILE *fp);
void rioInitWithBuffer(rio *r, sds s);
void rioInitWithFd(rio *r, int fd, size_t read_limit);
void rioInitWithFdset(rio *r, int *fds, int numfds);
void rioFreeFdset(rio *r);
void rioFreeFd(rio *r, sds* out_remainingBufferedData);
size_t rioWriteBulkCount(rio *r, char prefix, long count);
size_t rioWriteBulkString(rio *r, const char *buf, size_t len);


@ -2265,6 +2265,7 @@ void initServerConfig(void) {
server.aof_flush_postponed_start = 0;
server.aof_rewrite_incremental_fsync = CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC;
server.rdb_save_incremental_fsync = CONFIG_DEFAULT_RDB_SAVE_INCREMENTAL_FSYNC;
server.rdb_key_save_delay = CONFIG_DEFAULT_RDB_KEY_SAVE_DELAY;
server.aof_load_truncated = CONFIG_DEFAULT_AOF_LOAD_TRUNCATED;
server.aof_use_rdb_preamble = CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE;
server.pidfile = NULL;
@ -2334,6 +2335,9 @@ void initServerConfig(void) {
server.cached_master = NULL;
server.master_initial_offset = -1;
server.repl_state = REPL_STATE_NONE;
server.repl_transfer_tmpfile = NULL;
server.repl_transfer_fd = -1;
server.repl_transfer_s = -1;
server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT;
server.repl_serve_stale_data = CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA;
server.repl_slave_ro = CONFIG_DEFAULT_SLAVE_READ_ONLY;
@ -2342,6 +2346,7 @@ void initServerConfig(void) {
server.repl_down_since = 0; /* Never connected, repl is down since EVER. */
server.repl_disable_tcp_nodelay = CONFIG_DEFAULT_REPL_DISABLE_TCP_NODELAY;
server.repl_diskless_sync = CONFIG_DEFAULT_REPL_DISKLESS_SYNC;
server.repl_diskless_load = CONFIG_DEFAULT_REPL_DISKLESS_LOAD;
server.repl_diskless_sync_delay = CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY;
server.repl_ping_slave_period = CONFIG_DEFAULT_REPL_PING_SLAVE_PERIOD;
server.repl_timeout = CONFIG_DEFAULT_REPL_TIMEOUT;
@ -4053,7 +4058,7 @@ sds genRedisInfoString(char *section) {
(server.aof_last_write_status == C_OK) ? "ok" : "err",
server.stat_aof_cow_bytes);
if (server.aof_state != AOF_OFF) {
if (server.aof_enabled) {
info = sdscatprintf(info,
"aof_current_size:%lld\r\n"
"aof_base_size:%lld\r\n"


@ -132,6 +132,7 @@ typedef long long mstime_t; /* millisecond time type. */
#define CONFIG_DEFAULT_RDB_FILENAME "dump.rdb"
#define CONFIG_DEFAULT_REPL_DISKLESS_SYNC 0
#define CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY 5
#define CONFIG_DEFAULT_RDB_KEY_SAVE_DELAY 0
#define CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA 1
#define CONFIG_DEFAULT_SLAVE_READ_ONLY 1
#define CONFIG_DEFAULT_SLAVE_IGNORE_MAXMEMORY 1
@ -394,6 +395,12 @@ typedef long long mstime_t; /* millisecond time type. */
#define AOF_FSYNC_EVERYSEC 2
#define CONFIG_DEFAULT_AOF_FSYNC AOF_FSYNC_EVERYSEC
/* Replication diskless load defines */
#define REPL_DISKLESS_LOAD_DISABLED 0
#define REPL_DISKLESS_LOAD_WHEN_DB_EMPTY 1
#define REPL_DISKLESS_LOAD_SWAPDB 2
#define CONFIG_DEFAULT_REPL_DISKLESS_LOAD REPL_DISKLESS_LOAD_DISABLED
/* Zipped structures related defaults */
#define OBJ_HASH_MAX_ZIPLIST_ENTRIES 512
#define OBJ_HASH_MAX_ZIPLIST_VALUE 64
@ -1158,6 +1165,7 @@ struct redisServer {
int daemonize; /* True if running as a daemon */
clientBufferLimitsConfig client_obuf_limits[CLIENT_TYPE_OBUF_COUNT];
/* AOF persistence */
int aof_enabled; /* AOF configuration */
int aof_state; /* AOF_(ON|OFF|WAIT_REWRITE) */
int aof_fsync; /* Kind of fsync() policy */
char *aof_filename; /* Name of the AOF file */
@ -1214,6 +1222,8 @@ struct redisServer {
int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */
int rdb_pipe_write_result_to_parent; /* RDB pipes used to return the state */
int rdb_pipe_read_result_from_child; /* of each slave in diskless SYNC. */
int rdb_key_save_delay; /* Delay in microseconds between keys while
* writing the RDB. (for testing) */
/* Pipe and data structures for child -> parent info sharing. */
int child_info_pipe[2]; /* Pipe used to write the child_info_data. */
struct {
@ -1249,7 +1259,9 @@ struct redisServer {
int repl_min_slaves_to_write; /* Min number of slaves to write. */
int repl_min_slaves_max_lag; /* Max lag of <count> slaves to write. */
int repl_good_slaves_count; /* Number of slaves with lag <= max_lag. */
int repl_diskless_sync; /* Send RDB to slaves sockets directly. */
int repl_diskless_sync; /* Master sends RDB to slave sockets directly. */
int repl_diskless_load; /* Slave parses RDB directly from the socket.
* see REPL_DISKLESS_LOAD_* enum */
int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */
/* Replication (slave) */
char *masteruser; /* AUTH with this user and masterauth with master */
@ -1739,7 +1751,8 @@ void replicationCacheMasterUsingMyself(void);
void feedReplicationBacklog(void *ptr, size_t len);
/* Generic persistence functions */
void startLoading(FILE *fp);
void startLoadingFile(FILE* fp, char* filename);
void startLoading(size_t size);
void loadingProgress(off_t pos);
void stopLoading(void);
@ -1996,6 +2009,8 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o);
#define EMPTYDB_NO_FLAGS 0 /* No flags. */
#define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */
long long emptyDb(int dbnum, int flags, void(callback)(void*));
long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(void*));
long long dbTotalServerKeyCount();
int selectDb(client *c, int id);
void signalModifiedKey(redisDb *db, robj *key);


@ -1,12 +1,3 @@
proc start_bg_complex_data {host port db ops} {
set tclsh [info nameofexecutable]
exec $tclsh tests/helpers/bg_complex_data.tcl $host $port $db $ops &
}
proc stop_bg_complex_data {handle} {
catch {exec /bin/kill -9 $handle}
}
start_server {tags {"repl"}} {
start_server {} {


@ -1,12 +1,3 @@
proc start_bg_complex_data {host port db ops} {
set tclsh [info nameofexecutable]
exec $tclsh tests/helpers/bg_complex_data.tcl $host $port $db $ops &
}
proc stop_bg_complex_data {handle} {
catch {exec /bin/kill -9 $handle}
}
# Creates a master-slave pair and breaks the link continuously to force
# partial resyncs attempts, all this while flooding the master with
# write queries.
@ -17,7 +8,7 @@ proc stop_bg_complex_data {handle} {
# If reconnect is > 0, the test actually try to break the connection and
# reconnect with the master, otherwise just the initial synchronization is
# checked for consistency.
proc test_psync {descr duration backlog_size backlog_ttl delay cond diskless reconnect} {
proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reconnect} {
start_server {tags {"repl"}} {
start_server {} {
@ -28,8 +19,9 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond diskless rec
$master config set repl-backlog-size $backlog_size
$master config set repl-backlog-ttl $backlog_ttl
$master config set repl-diskless-sync $diskless
$master config set repl-diskless-sync $mdl
$master config set repl-diskless-sync-delay 1
$slave config set repl-diskless-load $sdl
set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000]
set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000]
@ -54,7 +46,7 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond diskless rec
}
}
test "Test replication partial resync: $descr (diskless: $diskless, reconnect: $reconnect)" {
test "Test replication partial resync: $descr (diskless: $mdl, $sdl, reconnect: $reconnect)" {
# Now while the clients are writing data, break the maste-slave
# link multiple times.
if ($reconnect) {
@ -132,23 +124,25 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond diskless rec
}
}
foreach diskless {no yes} {
test_psync {no reconnection, just sync} 6 1000000 3600 0 {
} $diskless 0
foreach mdl {no yes} {
foreach sdl {disabled swapdb} {
test_psync {no reconnection, just sync} 6 1000000 3600 0 {
} $mdl $sdl 0
test_psync {ok psync} 6 100000000 3600 0 {
test_psync {ok psync} 6 100000000 3600 0 {
assert {[s -1 sync_partial_ok] > 0}
} $diskless 1
} $mdl $sdl 1
test_psync {no backlog} 6 100 3600 0.5 {
test_psync {no backlog} 6 100 3600 0.5 {
assert {[s -1 sync_partial_err] > 0}
} $diskless 1
} $mdl $sdl 1
test_psync {ok after delay} 3 100000000 3600 3 {
test_psync {ok after delay} 3 100000000 3600 3 {
assert {[s -1 sync_partial_ok] > 0}
} $diskless 1
} $mdl $sdl 1
test_psync {backlog expired} 3 100000000 1 3 {
test_psync {backlog expired} 3 100000000 1 3 {
assert {[s -1 sync_partial_err] > 0}
} $diskless 1
} $mdl $sdl 1
}
}


@ -183,85 +183,92 @@ start_server {tags {"repl"}} {
}
}
foreach dl {no yes} {
start_server {tags {"repl"}} {
set master [srv 0 client]
$master config set repl-diskless-sync $dl
set master_host [srv 0 host]
set master_port [srv 0 port]
set slaves {}
set load_handle0 [start_write_load $master_host $master_port 3]
set load_handle1 [start_write_load $master_host $master_port 5]
set load_handle2 [start_write_load $master_host $master_port 20]
set load_handle3 [start_write_load $master_host $master_port 8]
set load_handle4 [start_write_load $master_host $master_port 4]
start_server {} {
lappend slaves [srv 0 client]
foreach mdl {no yes} {
foreach sdl {disabled swapdb} {
start_server {tags {"repl"}} {
set master [srv 0 client]
$master config set repl-diskless-sync $mdl
$master config set repl-diskless-sync-delay 1
set master_host [srv 0 host]
set master_port [srv 0 port]
set slaves {}
set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000000]
set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000000]
set load_handle2 [start_bg_complex_data $master_host $master_port 12 100000000]
set load_handle3 [start_write_load $master_host $master_port 8]
set load_handle4 [start_write_load $master_host $master_port 4]
after 5000 ;# wait for some data to accumulate so that we have an RDB part for the fork
start_server {} {
lappend slaves [srv 0 client]
start_server {} {
lappend slaves [srv 0 client]
test "Connect multiple replicas at the same time (issue #141), diskless=$dl" {
# Send SLAVEOF commands to slaves
[lindex $slaves 0] slaveof $master_host $master_port
[lindex $slaves 1] slaveof $master_host $master_port
[lindex $slaves 2] slaveof $master_host $master_port
start_server {} {
lappend slaves [srv 0 client]
test "Connect multiple replicas at the same time (issue #141), master diskless=$mdl, replica diskless=$sdl" {
# Send SLAVEOF commands to slaves
[lindex $slaves 0] config set repl-diskless-load $sdl
[lindex $slaves 1] config set repl-diskless-load $sdl
[lindex $slaves 2] config set repl-diskless-load $sdl
[lindex $slaves 0] slaveof $master_host $master_port
[lindex $slaves 1] slaveof $master_host $master_port
[lindex $slaves 2] slaveof $master_host $master_port
# Wait for all the three slaves to reach the "online"
# state from the POV of the master.
set retry 500
while {$retry} {
set info [r -3 info]
if {[string match {*slave0:*state=online*slave1:*state=online*slave2:*state=online*} $info]} {
break
} else {
incr retry -1
after 100
# Wait for all the three slaves to reach the "online"
# state from the POV of the master.
set retry 500
while {$retry} {
set info [r -3 info]
if {[string match {*slave0:*state=online*slave1:*state=online*slave2:*state=online*} $info]} {
break
} else {
incr retry -1
after 100
}
}
if {$retry == 0} {
error "assertion:Slaves not correctly synchronized"
}
}
if {$retry == 0} {
error "assertion:Replicas not correctly synchronized"
}
# Wait that slaves acknowledge they are online so
# we are sure that DBSIZE and DEBUG DIGEST will not
# fail because of timing issues.
wait_for_condition 500 100 {
[lindex [[lindex $slaves 0] role] 3] eq {connected} &&
[lindex [[lindex $slaves 1] role] 3] eq {connected} &&
[lindex [[lindex $slaves 2] role] 3] eq {connected}
} else {
fail "Replicas still not connected after some time"
# Wait that slaves acknowledge they are online so
# we are sure that DBSIZE and DEBUG DIGEST will not
# fail because of timing issues.
wait_for_condition 500 100 {
[lindex [[lindex $slaves 0] role] 3] eq {connected} &&
[lindex [[lindex $slaves 1] role] 3] eq {connected} &&
[lindex [[lindex $slaves 2] role] 3] eq {connected}
} else {
fail "Slaves still not connected after some time"
}
# Stop the write load
stop_bg_complex_data $load_handle0
stop_bg_complex_data $load_handle1
stop_bg_complex_data $load_handle2
stop_write_load $load_handle3
stop_write_load $load_handle4
# Make sure that slaves and master have same
# number of keys
wait_for_condition 500 100 {
[$master dbsize] == [[lindex $slaves 0] dbsize] &&
[$master dbsize] == [[lindex $slaves 1] dbsize] &&
[$master dbsize] == [[lindex $slaves 2] dbsize]
} else {
fail "Different number of keys between master and replica after too long time."
}
# Check digests
set digest [$master debug digest]
set digest0 [[lindex $slaves 0] debug digest]
set digest1 [[lindex $slaves 1] debug digest]
set digest2 [[lindex $slaves 2] debug digest]
assert {$digest ne 0000000000000000000000000000000000000000}
assert {$digest eq $digest0}
assert {$digest eq $digest1}
assert {$digest eq $digest2}
}
# Stop the write load
stop_write_load $load_handle0
stop_write_load $load_handle1
stop_write_load $load_handle2
stop_write_load $load_handle3
stop_write_load $load_handle4
# Make sure that slaves and master have same
# number of keys
wait_for_condition 500 100 {
[$master dbsize] == [[lindex $slaves 0] dbsize] &&
[$master dbsize] == [[lindex $slaves 1] dbsize] &&
[$master dbsize] == [[lindex $slaves 2] dbsize]
} else {
fail "Different number of keys between masted and replica after too long time."
}
# Check digests
set digest [$master debug digest]
set digest0 [[lindex $slaves 0] debug digest]
set digest1 [[lindex $slaves 1] debug digest]
set digest2 [[lindex $slaves 2] debug digest]
assert {$digest ne 0000000000000000000000000000000000000000}
assert {$digest eq $digest0}
assert {$digest eq $digest1}
assert {$digest eq $digest2}
}
}
}
}
}
}
}
@ -309,3 +316,70 @@ start_server {tags {"repl"}} {
}
}
}
test {slave fails full sync and diskless load swapdb recovers it} {
start_server {tags {"repl"}} {
set slave [srv 0 client]
set slave_host [srv 0 host]
set slave_port [srv 0 port]
set slave_log [srv 0 stdout]
start_server {} {
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]
# Put different data sets on the master and slave
# we need to put large keys on the master, since while loading the slave only replies to INFO once per 2mb it reads
$slave debug populate 2000 slave 10
$master debug populate 200 master 100000
$master config set rdbcompression no
# Set master and slave to use diskless replication
$master config set repl-diskless-sync yes
$master config set repl-diskless-sync-delay 0
$slave config set repl-diskless-load swapdb
# Set master with a slow rdb generation, so that we can easily disconnect it mid sync
# 10ms per key, with 200 keys that's 2 seconds
$master config set rdb-key-save-delay 10000
# Start the replication process...
$slave slaveof $master_host $master_port
# wait for the slave to start reading the rdb
wait_for_condition 50 100 {
[s -1 loading] eq 1
} else {
fail "Replica didn't get into loading mode"
}
# make sure that the next sync will not start immediately, so that we can catch the slave in between syncs
$master config set repl-diskless-sync-delay 5
# for faster server shutdown, make rdb saving fast again (the fork already uses the slow one)
$master config set rdb-key-save-delay 0
# wait for the slave to do the flushdb (key count drops)
wait_for_condition 50 100 {
2000 != [scan [regexp -inline {keys\=([\d]*)} [$slave info keyspace]] keys=%d]
} else {
fail "Replica didn't flush"
}
# make sure we're still loading
assert_equal [s -1 loading] 1
# kill the slave connection on the master
set killed [$master client kill type slave]
# wait for loading to stop (fail)
wait_for_condition 50 100 {
[s -1 loading] eq 0
} else {
fail "Replica didn't disconnect"
}
# make sure the original keys were restored
assert_equal [$slave dbsize] 2000
}
}
}


@ -399,3 +399,15 @@ proc lshuffle {list} {
}
return $slist
}
# Execute a background process writing complex data for the specified number
# of ops to the specified Redis instance.
proc start_bg_complex_data {host port db ops} {
set tclsh [info nameofexecutable]
exec $tclsh tests/helpers/bg_complex_data.tcl $host $port $db $ops &
}
# Stop a process generating write load executed with start_bg_complex_data.
proc stop_bg_complex_data {handle} {
catch {exec /bin/kill -9 $handle}
}