mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 16:18:28 -05:00
Accelerate diskless master connections, and general re-connections (#6271)
Diskless master has some inherent latencies. 1) fork starts with delay from cron rather than immediately 2) replica is put online only after an ACK. but the ACK was sent only once a second. 3) but even if it would arrive immediately, it will not register in case cron didn't yet detect that the fork is done. Besides that, when a replica disconnects, it doesn't immediately attempts to re-connect, it waits for replication cron (one per second). in case it was already online, it may be important to try to re-connect as soon as possible, so that the backlog at the master doesn't vanish. In case it disconnected during rdb transfer, one can argue that it's not very important to re-connect immediately, but this is needed for the "diskless loading short read" test to be able to run 100 iterations in 5 seconds, rather than 3 (waiting for replication cron re-connection) changes in this commit: 1) sync command starts a fork immediately if no sync_delay is configured 2) replica sends REPLCONF ACK when done reading the rdb (rather than on 1s cron) 3) when a replica unexpectedly disconnets, it immediately tries to re-connect rather than waiting 1s 4) when when a child exits, if there is another replica waiting, we spawn a new one right away, instead of waiting for 1s replicationCron. 5) added a call to connectWithMaster from replicationSetMaster. which is called from the REPLICAOF command but also in 3 places in cluster.c, in all of these the connection attempt will now be immediate instead of delayed by 1 second. side note: we can add a call to rdbPipeReadHandler in replconfCommand when getting a REPLCONF ACK from the replica to solve a race where the replica got the entire rdb and EOF marker before we detected that the pipe was closed. in the test i did see this race happens in one about of some 300 runs, but i concluded that this race is unlikely in real life (where the replica is on another host and we're more likely to first detect the pipe was closed. the test runs 100 iterations in 3 seconds, so in some cases it'll take 4 seconds instead (waiting for another REPLCONF ACK). Removing unneeded startBgsaveForReplication from updateSlavesWaitingForBgsave Now that CheckChildrenDone is calling the new replicationStartPendingFork (extracted from serverCron) there's actually no need to call startBgsaveForReplication from updateSlavesWaitingForBgsave anymore, since as soon as updateSlavesWaitingForBgsave returns, CheckChildrenDone is calling replicationStartPendingFork that handles that anyway. The code in updateSlavesWaitingForBgsave had a bug in which it ignored repl-diskless-sync-delay, but removing that code shows that this bug was hiding another bug, which is that the max_idle should have used >= and not >, this one second delay has a big impact on my new test.
This commit is contained in:
parent
81f8524a12
commit
c17e597d05
@ -43,7 +43,7 @@ void replicationDiscardCachedMaster(void);
|
||||
void replicationResurrectCachedMaster(connection *conn);
|
||||
void replicationSendAck(void);
|
||||
void putSlaveOnline(client *slave);
|
||||
int cancelReplicationHandshake(void);
|
||||
int cancelReplicationHandshake(int reconnect);
|
||||
|
||||
/* We take a global flag to remember if this instance generated an RDB
|
||||
* because of replication, so that we can remove the RDB file in case
|
||||
@ -832,6 +832,8 @@ void syncCommand(client *c) {
|
||||
* few seconds to wait for more slaves to arrive. */
|
||||
if (server.repl_diskless_sync_delay)
|
||||
serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC");
|
||||
else
|
||||
startBgsaveForReplication(c->slave_capa);
|
||||
} else {
|
||||
/* Target is disk (or the slave is not capable of supporting
|
||||
* diskless replication) and we don't have a BGSAVE in progress,
|
||||
@ -1225,19 +1227,13 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData,
|
||||
* (if it had a disk or socket target). */
|
||||
void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
|
||||
listNode *ln;
|
||||
int startbgsave = 0;
|
||||
int mincapa = -1;
|
||||
listIter li;
|
||||
|
||||
listRewind(server.slaves,&li);
|
||||
while((ln = listNext(&li))) {
|
||||
client *slave = ln->value;
|
||||
|
||||
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
|
||||
startbgsave = 1;
|
||||
mincapa = (mincapa == -1) ? slave->slave_capa :
|
||||
(mincapa & slave->slave_capa);
|
||||
} else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
|
||||
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
|
||||
struct redis_stat buf;
|
||||
|
||||
if (bgsaveerr != C_OK) {
|
||||
@ -1304,7 +1300,6 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
|
||||
}
|
||||
}
|
||||
}
|
||||
if (startbgsave) startBgsaveForReplication(mincapa);
|
||||
}
|
||||
|
||||
/* Change the current instance replication ID with a new, random one.
|
||||
@ -1568,7 +1563,7 @@ void readSyncBulkPayload(connection *conn) {
|
||||
}
|
||||
serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s",
|
||||
(nread == -1) ? strerror(errno) : "connection lost");
|
||||
cancelReplicationHandshake();
|
||||
cancelReplicationHandshake(1);
|
||||
return;
|
||||
}
|
||||
server.stat_net_input_bytes += nread;
|
||||
@ -1695,7 +1690,7 @@ void readSyncBulkPayload(connection *conn) {
|
||||
serverLog(LL_WARNING,
|
||||
"Failed trying to load the MASTER synchronization DB "
|
||||
"from socket");
|
||||
cancelReplicationHandshake();
|
||||
cancelReplicationHandshake(1);
|
||||
rioFreeConn(&rdb, NULL);
|
||||
if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
|
||||
/* Restore the backed up databases. */
|
||||
@ -1728,7 +1723,7 @@ void readSyncBulkPayload(connection *conn) {
|
||||
memcmp(buf,eofmark,CONFIG_RUN_ID_SIZE) != 0)
|
||||
{
|
||||
serverLog(LL_WARNING,"Replication stream EOF marker is broken");
|
||||
cancelReplicationHandshake();
|
||||
cancelReplicationHandshake(1);
|
||||
rioFreeConn(&rdb, NULL);
|
||||
return;
|
||||
}
|
||||
@ -1758,7 +1753,7 @@ void readSyncBulkPayload(connection *conn) {
|
||||
"Failed trying to rename the temp DB into %s in "
|
||||
"MASTER <-> REPLICA synchronization: %s",
|
||||
server.rdb_filename, strerror(errno));
|
||||
cancelReplicationHandshake();
|
||||
cancelReplicationHandshake(1);
|
||||
if (old_rdb_fd != -1) close(old_rdb_fd);
|
||||
return;
|
||||
}
|
||||
@ -1769,7 +1764,7 @@ void readSyncBulkPayload(connection *conn) {
|
||||
serverLog(LL_WARNING,
|
||||
"Failed trying to load the MASTER synchronization "
|
||||
"DB from disk");
|
||||
cancelReplicationHandshake();
|
||||
cancelReplicationHandshake(1);
|
||||
if (server.rdb_del_sync_files && allPersistenceDisabled()) {
|
||||
serverLog(LL_NOTICE,"Removing the RDB file obtained from "
|
||||
"the master. This replica has persistence "
|
||||
@ -1824,6 +1819,9 @@ void readSyncBulkPayload(connection *conn) {
|
||||
redisCommunicateSystemd("READY=1\n");
|
||||
}
|
||||
|
||||
/* Send the initial ACK immediately to put this replica in online state. */
|
||||
if (usemark) replicationSendAck();
|
||||
|
||||
/* Restart the AOF subsystem now that we finished the sync. This
|
||||
* will trigger an AOF rewrite, and when done will start appending
|
||||
* to the new file. */
|
||||
@ -1831,7 +1829,7 @@ void readSyncBulkPayload(connection *conn) {
|
||||
return;
|
||||
|
||||
error:
|
||||
cancelReplicationHandshake();
|
||||
cancelReplicationHandshake(1);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -2421,6 +2419,7 @@ int connectWithMaster(void) {
|
||||
|
||||
server.repl_transfer_lastio = server.unixtime;
|
||||
server.repl_state = REPL_STATE_CONNECTING;
|
||||
serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started");
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
@ -2456,7 +2455,7 @@ void replicationAbortSyncTransfer(void) {
|
||||
* the replication state (server.repl_state) set to REPL_STATE_CONNECT.
|
||||
*
|
||||
* Otherwise zero is returned and no operation is perforemd at all. */
|
||||
int cancelReplicationHandshake(void) {
|
||||
int cancelReplicationHandshake(int reconnect) {
|
||||
if (server.repl_state == REPL_STATE_TRANSFER) {
|
||||
replicationAbortSyncTransfer();
|
||||
server.repl_state = REPL_STATE_CONNECT;
|
||||
@ -2468,6 +2467,16 @@ int cancelReplicationHandshake(void) {
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (!reconnect)
|
||||
return 1;
|
||||
|
||||
/* try to re-connect without waiting for replicationCron, this is needed
|
||||
* for the "diskless loading short read" test. */
|
||||
serverLog(LL_NOTICE,"Reconnecting to MASTER %s:%d after failure",
|
||||
server.masterhost, server.masterport);
|
||||
connectWithMaster();
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
@ -2476,17 +2485,22 @@ void replicationSetMaster(char *ip, int port) {
|
||||
int was_master = server.masterhost == NULL;
|
||||
|
||||
sdsfree(server.masterhost);
|
||||
server.masterhost = sdsnew(ip);
|
||||
server.masterport = port;
|
||||
server.masterhost = NULL;
|
||||
if (server.master) {
|
||||
freeClient(server.master);
|
||||
}
|
||||
disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */
|
||||
|
||||
/* Setting masterhost only after the call to freeClient since it calls
|
||||
* replicationHandleMasterDisconnection which can trigger a re-connect
|
||||
* directly from within that call. */
|
||||
server.masterhost = sdsnew(ip);
|
||||
server.masterport = port;
|
||||
|
||||
/* Force our slaves to resync with us as well. They may hopefully be able
|
||||
* to partially resync with us, but we can notify the replid change. */
|
||||
disconnectSlaves();
|
||||
cancelReplicationHandshake();
|
||||
cancelReplicationHandshake(0);
|
||||
/* Before destroying our master state, create a cached master using
|
||||
* our own parameters, to later PSYNC with the new master. */
|
||||
if (was_master) {
|
||||
@ -2506,6 +2520,9 @@ void replicationSetMaster(char *ip, int port) {
|
||||
NULL);
|
||||
|
||||
server.repl_state = REPL_STATE_CONNECT;
|
||||
serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
|
||||
server.masterhost, server.masterport);
|
||||
connectWithMaster();
|
||||
}
|
||||
|
||||
/* Cancel replication, setting the instance as a master itself. */
|
||||
@ -2518,11 +2535,13 @@ void replicationUnsetMaster(void) {
|
||||
REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
|
||||
NULL);
|
||||
|
||||
/* Clear masterhost first, since the freeClient calls
|
||||
* replicationHandleMasterDisconnection which can attempt to re-connect. */
|
||||
sdsfree(server.masterhost);
|
||||
server.masterhost = NULL;
|
||||
if (server.master) freeClient(server.master);
|
||||
replicationDiscardCachedMaster();
|
||||
cancelReplicationHandshake();
|
||||
cancelReplicationHandshake(0);
|
||||
/* When a slave is turned into a master, the current replication ID
|
||||
* (that was inherited from the master at synchronization time) is
|
||||
* used as secondary ID up to the current offset, and a new replication
|
||||
@ -2576,6 +2595,14 @@ void replicationHandleMasterDisconnection(void) {
|
||||
/* We lost connection with our master, don't disconnect slaves yet,
|
||||
* maybe we'll be able to PSYNC with our master later. We'll disconnect
|
||||
* the slaves only if we'll have to do a full resync with our master. */
|
||||
|
||||
/* Try to re-connect immediately rather than wait for replicationCron
|
||||
* waiting 1 second may risk backlog being recycled. */
|
||||
if (server.masterhost) {
|
||||
serverLog(LL_NOTICE,"Reconnecting to MASTER %s:%d",
|
||||
server.masterhost, server.masterport);
|
||||
connectWithMaster();
|
||||
}
|
||||
}
|
||||
|
||||
void replicaofCommand(client *c) {
|
||||
@ -3121,7 +3148,7 @@ void replicationCron(void) {
|
||||
(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
|
||||
{
|
||||
serverLog(LL_WARNING,"Timeout connecting to the MASTER...");
|
||||
cancelReplicationHandshake();
|
||||
cancelReplicationHandshake(1);
|
||||
}
|
||||
|
||||
/* Bulk transfer I/O timeout? */
|
||||
@ -3129,7 +3156,7 @@ void replicationCron(void) {
|
||||
(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
|
||||
{
|
||||
serverLog(LL_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
|
||||
cancelReplicationHandshake();
|
||||
cancelReplicationHandshake(1);
|
||||
}
|
||||
|
||||
/* Timed out master when we are an already connected slave? */
|
||||
@ -3144,9 +3171,7 @@ void replicationCron(void) {
|
||||
if (server.repl_state == REPL_STATE_CONNECT) {
|
||||
serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
|
||||
server.masterhost, server.masterport);
|
||||
if (connectWithMaster() == C_OK) {
|
||||
serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started");
|
||||
}
|
||||
connectWithMaster();
|
||||
}
|
||||
|
||||
/* Send ACK to master from time to time.
|
||||
@ -3280,6 +3305,18 @@ void replicationCron(void) {
|
||||
replicationScriptCacheFlush();
|
||||
}
|
||||
|
||||
replicationStartPendingFork();
|
||||
|
||||
/* Remove the RDB file used for replication if Redis is not running
|
||||
* with any persistence. */
|
||||
removeRDBUsedToSyncReplicas();
|
||||
|
||||
/* Refresh the number of slaves with lag <= min-slaves-max-lag. */
|
||||
refreshGoodSlavesCount();
|
||||
replication_cron_loops++; /* Incremented with frequency 1 HZ. */
|
||||
}
|
||||
|
||||
void replicationStartPendingFork(void) {
|
||||
/* Start a BGSAVE good for replication if we have slaves in
|
||||
* WAIT_BGSAVE_START state.
|
||||
*
|
||||
@ -3307,7 +3344,7 @@ void replicationCron(void) {
|
||||
|
||||
if (slaves_waiting &&
|
||||
(!server.repl_diskless_sync ||
|
||||
max_idle > server.repl_diskless_sync_delay))
|
||||
max_idle >= server.repl_diskless_sync_delay))
|
||||
{
|
||||
/* Start the BGSAVE. The called function may start a
|
||||
* BGSAVE with socket target or disk target depending on the
|
||||
@ -3315,12 +3352,4 @@ void replicationCron(void) {
|
||||
startBgsaveForReplication(mincapa);
|
||||
}
|
||||
}
|
||||
|
||||
/* Remove the RDB file used for replication if Redis is not running
|
||||
* with any persistence. */
|
||||
removeRDBUsedToSyncReplicas();
|
||||
|
||||
/* Refresh the number of slaves with lag <= min-slaves-max-lag. */
|
||||
refreshGoodSlavesCount();
|
||||
replication_cron_loops++; /* Incremented with frequency 1 HZ. */
|
||||
}
|
||||
|
@ -1828,6 +1828,9 @@ void checkChildrenDone(void) {
|
||||
}
|
||||
updateDictResizePolicy();
|
||||
closeChildInfoPipe();
|
||||
|
||||
/* start any pending forks immediately. */
|
||||
replicationStartPendingFork();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1807,6 +1807,7 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle
|
||||
void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc);
|
||||
void updateSlavesWaitingBgsave(int bgsaveerr, int type);
|
||||
void replicationCron(void);
|
||||
void replicationStartPendingFork(void);
|
||||
void replicationHandleMasterDisconnection(void);
|
||||
void replicationCacheMaster(client *c);
|
||||
void resizeReplicationBacklog(long long newsize);
|
||||
|
@ -407,7 +407,12 @@ test {diskless loading short read} {
|
||||
$master config set repl-diskless-sync yes
|
||||
$master config set rdbcompression no
|
||||
$replica config set repl-diskless-load swapdb
|
||||
$master config set hz 500
|
||||
$replica config set hz 500
|
||||
$master config set dynamic-hz no
|
||||
$replica config set dynamic-hz no
|
||||
# Try to fill the master with all types of data types / encodings
|
||||
set start [clock clicks -milliseconds]
|
||||
for {set k 0} {$k < 3} {incr k} {
|
||||
for {set i 0} {$i < 10} {incr i} {
|
||||
r set "$k int_$i" [expr {int(rand()*10000)}]
|
||||
@ -429,14 +434,21 @@ test {diskless loading short read} {
|
||||
}
|
||||
}
|
||||
|
||||
if {$::verbose} {
|
||||
set end [clock clicks -milliseconds]
|
||||
set duration [expr $end - $start]
|
||||
puts "filling took $duration ms (TODO: use pipeline)"
|
||||
set start [clock clicks -milliseconds]
|
||||
}
|
||||
|
||||
# Start the replication process...
|
||||
set loglines [count_log_lines -1]
|
||||
$master config set repl-diskless-sync-delay 0
|
||||
$replica replicaof $master_host $master_port
|
||||
|
||||
# kill the replication at various points
|
||||
set attempts 3
|
||||
if {$::accurate} { set attempts 10 }
|
||||
set attempts 100
|
||||
if {$::accurate} { set attempts 500 }
|
||||
for {set i 0} {$i < $attempts} {incr i} {
|
||||
# wait for the replica to start reading the rdb
|
||||
# using the log file since the replica only responds to INFO once in 2mb
|
||||
@ -469,6 +481,11 @@ test {diskless loading short read} {
|
||||
fail "Replica didn't disconnect"
|
||||
}
|
||||
}
|
||||
if {$::verbose} {
|
||||
set end [clock clicks -milliseconds]
|
||||
set duration [expr $end - $start]
|
||||
puts "test took $duration ms"
|
||||
}
|
||||
# enable fast shutdown
|
||||
$master config set rdb-key-save-delay 0
|
||||
}
|
||||
|
@ -62,18 +62,30 @@ tags "modules" {
|
||||
$master config set repl-diskless-sync yes
|
||||
$master config set rdbcompression no
|
||||
$replica config set repl-diskless-load swapdb
|
||||
$master config set hz 500
|
||||
$replica config set hz 500
|
||||
$master config set dynamic-hz no
|
||||
$replica config set dynamic-hz no
|
||||
set start [clock clicks -milliseconds]
|
||||
for {set k 0} {$k < 30} {incr k} {
|
||||
r testrdb.set.key key$k [string repeat A [expr {int(rand()*1000000)}]]
|
||||
}
|
||||
|
||||
if {$::verbose} {
|
||||
set end [clock clicks -milliseconds]
|
||||
set duration [expr $end - $start]
|
||||
puts "filling took $duration ms (TODO: use pipeline)"
|
||||
set start [clock clicks -milliseconds]
|
||||
}
|
||||
|
||||
# Start the replication process...
|
||||
set loglines [count_log_lines -1]
|
||||
$master config set repl-diskless-sync-delay 0
|
||||
$replica replicaof $master_host $master_port
|
||||
|
||||
# kill the replication at various points
|
||||
set attempts 3
|
||||
if {$::accurate} { set attempts 10 }
|
||||
set attempts 100
|
||||
if {$::accurate} { set attempts 500 }
|
||||
for {set i 0} {$i < $attempts} {incr i} {
|
||||
# wait for the replica to start reading the rdb
|
||||
# using the log file since the replica only responds to INFO once in 2mb
|
||||
@ -106,6 +118,11 @@ tags "modules" {
|
||||
fail "Replica didn't disconnect"
|
||||
}
|
||||
}
|
||||
if {$::verbose} {
|
||||
set end [clock clicks -milliseconds]
|
||||
set duration [expr $end - $start]
|
||||
puts "test took $duration ms"
|
||||
}
|
||||
# enable fast shutdown
|
||||
$master config set rdb-key-save-delay 0
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user