Keep track of meaningful replication offset in replicas too

Now both master and replicas keep track of the last replication offset
that contains meaningful data (ignoring the tailing pings), and both
trim that tail from the replication backlog, and the offset with which
they try to use for psync.

the implication is that if someone missed some pings, or even have
excessive pings that the promoted replica has, it'll still be able to
psync (avoid full sync).

the downside (which was already committed) is that replicas running old
code may fail to psync, since the promoted replica trims pings form it's
backlog.

This commit adds a test that reproduces several cases of promotions and
demotions with stale and non-stale pings

Background:
The mearningful offset on the master was added recently to solve a problem were
the master is left all alone, injecting PINGs into it's backlog when no one is
listening and then gets demoted and tries to replicate from a replica that didn't
have any of the PINGs (or at least not the last ones).

however, consider this case:
master A has two replicas (B and C) replicating directly from it.
there's no traffic at all, and also no network issues, just many pings in the
tail of the backlog. now B gets promoted, A becomes a replica of B, and C
remains a replica of A. when A gets demoted, it trims the pings from its
backlog, and successfully replicate from B. however, C is still aware of
these PINGs, when it'll disconnect and re-connect to A, it'll ask for something
that's not in the backlog anymore (since A trimmed the tail of it's backlog),
and be forced to do a full sync (something it didn't have to do before the
meaningful offset fix).

Besides that, the psync2 test was always failing randomly here and there, it
turns out the reason were PINGs. Investigating it shows the following scenario:

cycle 1: redis #1 is master, and all the rest are direct replicas of #1
cycle 2: redis #2 is promoted to master, #1 is a replica of #2 and #3 is replica of #1
now we see that when #1 is demoted it prints:
17339:S 21 Apr 2020 11:16:38.523 * Using the meaningful offset 3929963 instead of 3929977 to exclude the final PINGs (14 bytes difference)
17339:S 21 Apr 2020 11:16:39.391 * Trying a partial resynchronization (request e2b3f8817735fdfe5fa4626766daa938b61419e5:3929964).
17339:S 21 Apr 2020 11:16:39.392 * Successful partial resynchronization with master.
and when #3 connects to the demoted #2, #2 says:
17339:S 21 Apr 2020 11:16:40.084 * Partial resynchronization not accepted: Requested offset for secondary ID was 3929978, but I can reply up to 3929964

so the issue here is that the meaningful offset feature saved the day for the
demoted master (since it needs to sync from a replica that didn't get the last
ping), but it didn't help one of the other replicas which did get the last ping.
This commit is contained in:
Oran Agra 2020-04-23 15:04:42 +03:00 committed by antirez
parent 3497fd007f
commit 4447ddc8bb
5 changed files with 213 additions and 93 deletions

View File

@ -110,7 +110,7 @@ void processUnblockedClients(void) {
* the code is conceptually more correct this way. */
if (!(c->flags & CLIENT_BLOCKED)) {
if (c->querybuf && sdslen(c->querybuf) > 0) {
processInputBufferAndReplicate(c);
processInputBuffer(c);
}
}
}

View File

@ -1671,12 +1671,55 @@ int processMultibulkBuffer(client *c) {
return C_ERR;
}
/* Perform necessary tasks after a command was executed:
*
* 1. The client is reset unless there are reasons to avoid doing it.
* 2. In the case of master clients, the replication offset is updated.
* 3. Propagate commands we got from our master to replicas down the line. */
void commandProcessed(client *c) {
int cmd_is_ping = c->cmd && c->cmd->proc == pingCommand;
long long prev_offset = c->reploff;
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
/* Update the applied replication offset of our master. */
c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
}
/* Don't reset the client structure for clients blocked in a
* module blocking command, so that the reply callback will
* still be able to access the client argv and argc field.
* The client will be reset in unblockClientFromModule(). */
if (!(c->flags & CLIENT_BLOCKED) ||
c->btype != BLOCKED_MODULE)
{
resetClient(c);
}
/* If the client is a master we need to compute the difference
* between the applied offset before and after processing the buffer,
* to understand how much of the replication stream was actually
* applied to the master state: this quantity, and its corresponding
* part of the replication stream, will be propagated to the
* sub-replicas and to the replication backlog. */
if (c->flags & CLIENT_MASTER) {
long long applied = c->reploff - prev_offset;
long long prev_master_repl_meaningful_offset = server.master_repl_meaningful_offset;
if (applied) {
replicationFeedSlavesFromMasterStream(server.slaves,
c->pending_querybuf, applied);
sdsrange(c->pending_querybuf,applied,-1);
}
/* The server.master_repl_meaningful_offset variable represents
* the offset of the replication stream without the pending PINGs. */
if (cmd_is_ping)
server.master_repl_meaningful_offset = prev_master_repl_meaningful_offset;
}
}
/* This function calls processCommand(), but also performs a few sub tasks
* that are useful in that context:
* for the client that are useful in that context:
*
* 1. It sets the current client to the client 'c'.
* 2. In the case of master clients, the replication offset is updated.
* 3. The client is reset unless there are reasons to avoid doing it.
* 2. calls commandProcessed() if the command was handled.
*
* The function returns C_ERR in case the client was freed as a side effect
* of processing the command, otherwise C_OK is returned. */
@ -1684,20 +1727,7 @@ int processCommandAndResetClient(client *c) {
int deadclient = 0;
server.current_client = c;
if (processCommand(c) == C_OK) {
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
/* Update the applied replication offset of our master. */
c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
}
/* Don't reset the client structure for clients blocked in a
* module blocking command, so that the reply callback will
* still be able to access the client argv and argc field.
* The client will be reset in unblockClientFromModule(). */
if (!(c->flags & CLIENT_BLOCKED) ||
c->btype != BLOCKED_MODULE)
{
resetClient(c);
}
commandProcessed(c);
}
if (server.current_client == NULL) deadclient = 1;
server.current_client = NULL;
@ -1794,31 +1824,6 @@ void processInputBuffer(client *c) {
}
}
/* This is a wrapper for processInputBuffer that also cares about handling
* the replication forwarding to the sub-replicas, in case the client 'c'
* is flagged as master. Usually you want to call this instead of the
* raw processInputBuffer(). */
void processInputBufferAndReplicate(client *c) {
if (!(c->flags & CLIENT_MASTER)) {
processInputBuffer(c);
} else {
/* If the client is a master we need to compute the difference
* between the applied offset before and after processing the buffer,
* to understand how much of the replication stream was actually
* applied to the master state: this quantity, and its corresponding
* part of the replication stream, will be propagated to the
* sub-replicas and to the replication backlog. */
size_t prev_offset = c->reploff;
processInputBuffer(c);
size_t applied = c->reploff - prev_offset;
if (applied) {
replicationFeedSlavesFromMasterStream(server.slaves,
c->pending_querybuf, applied);
sdsrange(c->pending_querybuf,applied,-1);
}
}
}
void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
int nread, readlen;
@ -1886,7 +1891,7 @@ void readQueryFromClient(connection *conn) {
/* There is more data in the client input buffer, continue parsing it
* in case to check if there is a full command to execute. */
processInputBufferAndReplicate(c);
processInputBuffer(c);
}
void getClientsMaxBuffers(unsigned long *longest_output_list,
@ -3101,7 +3106,7 @@ int handleClientsWithPendingReadsUsingThreads(void) {
continue;
}
}
processInputBufferAndReplicate(c);
processInputBuffer(c);
}
return processed;
}

View File

@ -39,6 +39,7 @@
#include <sys/socket.h>
#include <sys/stat.h>
long long adjustMeaningfulReplOffset();
void replicationDiscardCachedMaster(void);
void replicationResurrectCachedMaster(connection *conn);
void replicationSendAck(void);
@ -2693,6 +2694,9 @@ void replicationCacheMaster(client *c) {
* pending outputs to the master. */
sdsclear(server.master->querybuf);
sdsclear(server.master->pending_querybuf);
/* Adjust reploff and read_reploff to the last meaningful offset we executed.
* this is the offset the replica will use for future PSYNC. */
server.master->reploff = adjustMeaningfulReplOffset();
server.master->read_reploff = server.master->reploff;
if (c->flags & CLIENT_MULTI) discardTransaction(c);
listEmpty(c->reply);
@ -2717,6 +2721,38 @@ void replicationCacheMaster(client *c) {
replicationHandleMasterDisconnection();
}
/* If the "meaningful" offset, that is the offset without the final PINGs
* in the stream, is different than the last offset, use it instead:
* often when the master is no longer reachable, replicas will never
* receive the PINGs, however the master will end with an incremented
* offset because of the PINGs and will not be able to incrementally
* PSYNC with the new master.
* This function trims the replication backlog when needed, and returns
* the offset to be used for future partial sync. */
long long adjustMeaningfulReplOffset() {
if (server.master_repl_offset > server.master_repl_meaningful_offset) {
long long delta = server.master_repl_offset -
server.master_repl_meaningful_offset;
serverLog(LL_NOTICE,
"Using the meaningful offset %lld instead of %lld to exclude "
"the final PINGs (%lld bytes difference)",
server.master_repl_meaningful_offset,
server.master_repl_offset,
delta);
server.master_repl_offset = server.master_repl_meaningful_offset;
if (server.repl_backlog_histlen <= delta) {
server.repl_backlog_histlen = 0;
server.repl_backlog_idx = 0;
} else {
server.repl_backlog_histlen -= delta;
server.repl_backlog_idx =
(server.repl_backlog_idx + (server.repl_backlog_size - delta)) %
server.repl_backlog_size;
}
}
return server.master_repl_offset;
}
/* This function is called when a master is turend into a slave, in order to
* create from scratch a cached master for the new client, that will allow
* to PSYNC with the slave that was promoted as the new master after a
@ -2736,35 +2772,7 @@ void replicationCacheMasterUsingMyself(void) {
* by replicationCreateMasterClient(). We'll later set the created
* master as server.cached_master, so the replica will use such
* offset for PSYNC. */
server.master_initial_offset = server.master_repl_offset;
/* However if the "meaningful" offset, that is the offset without
* the final PINGs in the stream, is different, use this instead:
* often when the master is no longer reachable, replicas will never
* receive the PINGs, however the master will end with an incremented
* offset because of the PINGs and will not be able to incrementally
* PSYNC with the new master. */
if (server.master_repl_offset > server.master_repl_meaningful_offset) {
long long delta = server.master_repl_offset -
server.master_repl_meaningful_offset;
serverLog(LL_NOTICE,
"Using the meaningful offset %lld instead of %lld to exclude "
"the final PINGs (%lld bytes difference)",
server.master_repl_meaningful_offset,
server.master_repl_offset,
delta);
server.master_initial_offset = server.master_repl_meaningful_offset;
server.master_repl_offset = server.master_repl_meaningful_offset;
if (server.repl_backlog_histlen <= delta) {
server.repl_backlog_histlen = 0;
server.repl_backlog_idx = 0;
} else {
server.repl_backlog_histlen -= delta;
server.repl_backlog_idx =
(server.repl_backlog_idx + (server.repl_backlog_size - delta)) %
server.repl_backlog_size;
}
}
server.master_initial_offset = adjustMeaningfulReplOffset();
/* The master client we create can be set to any DBID, because
* the new master will start its replication stream with SELECT. */

View File

@ -1600,7 +1600,6 @@ void setDeferredSetLen(client *c, void *node, long length);
void setDeferredAttributeLen(client *c, void *node, long length);
void setDeferredPushLen(client *c, void *node, long length);
void processInputBuffer(client *c);
void processInputBufferAndReplicate(client *c);
void processGopherRequest(client *c);
void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask);

View File

@ -44,6 +44,7 @@ start_server {} {
set used [list $master_id]
test "PSYNC2: \[NEW LAYOUT\] Set #$master_id as master" {
$R($master_id) slaveof no one
$R($master_id) config set repl-ping-replica-period 1 ;# increse the chance that random ping will cause issues
if {$counter_value == 0} {
$R($master_id) set x $counter_value
}
@ -114,23 +115,20 @@ start_server {} {
}
}
# wait for all the slaves to be in sync with the master
set master_ofs [status $R($master_id) master_repl_offset]
# wait for all the slaves to be in sync with the master, due to pings, we have to re-sample the master constantly too
wait_for_condition 500 100 {
$master_ofs == [status $R(0) master_repl_offset] &&
$master_ofs == [status $R(1) master_repl_offset] &&
$master_ofs == [status $R(2) master_repl_offset] &&
$master_ofs == [status $R(3) master_repl_offset] &&
$master_ofs == [status $R(4) master_repl_offset]
[status $R($master_id) master_repl_offset] == [status $R(0) master_repl_offset] &&
[status $R($master_id) master_repl_offset] == [status $R(1) master_repl_offset] &&
[status $R($master_id) master_repl_offset] == [status $R(2) master_repl_offset] &&
[status $R($master_id) master_repl_offset] == [status $R(3) master_repl_offset] &&
[status $R($master_id) master_repl_offset] == [status $R(4) master_repl_offset]
} else {
if {$debug_msg} {
for {set j 0} {$j < 5} {incr j} {
puts "$j: sync_full: [status $R($j) sync_full]"
puts "$j: id1 : [status $R($j) master_replid]:[status $R($j) master_repl_offset]"
puts "$j: id2 : [status $R($j) master_replid2]:[status $R($j) second_repl_offset]"
puts "$j: backlog : firstbyte=[status $R($j) repl_backlog_first_byte_offset] len=[status $R($j) repl_backlog_histlen]"
puts "---"
}
for {set j 0} {$j < 5} {incr j} {
puts "$j: sync_full: [status $R($j) sync_full]"
puts "$j: id1 : [status $R($j) master_replid]:[status $R($j) master_repl_offset]"
puts "$j: id2 : [status $R($j) master_replid2]:[status $R($j) second_repl_offset]"
puts "$j: backlog : firstbyte=[status $R($j) repl_backlog_first_byte_offset] len=[status $R($j) repl_backlog_histlen]"
puts "---"
}
fail "Slaves are not in sync with the master after too long time."
}
@ -175,9 +173,14 @@ start_server {} {
$R($j) slaveof $master_host $master_port
}
# Wait for slaves to sync
# Wait for replicas to sync. it is not enough to just wait for connected_slaves==4
# since we might do the check before the master realized that they're disconnected
wait_for_condition 50 1000 {
[status $R($master_id) connected_slaves] == 4
[status $R($master_id) connected_slaves] == 4 &&
[status $R([expr {($master_id+1)%5}]) master_link_status] == "up" &&
[status $R([expr {($master_id+2)%5}]) master_link_status] == "up" &&
[status $R([expr {($master_id+3)%5}]) master_link_status] == "up" &&
[status $R([expr {($master_id+4)%5}]) master_link_status] == "up"
} else {
fail "Replica not reconnecting"
}
@ -188,6 +191,7 @@ start_server {} {
set slave_id [expr {($master_id+1)%5}]
set sync_count [status $R($master_id) sync_full]
set sync_partial [status $R($master_id) sync_partial_ok]
set sync_partial_err [status $R($master_id) sync_partial_err]
catch {
$R($slave_id) config rewrite
$R($slave_id) debug restart
@ -197,7 +201,11 @@ start_server {} {
wait_for_condition 50 1000 {
[status $R($master_id) sync_partial_ok] == $sync_partial + 1
} else {
fail "Replica not reconnecting"
puts "prev sync_full: $sync_count"
puts "prev sync_partial_ok: $sync_partial"
puts "prev sync_partial_err: $sync_partial_err"
puts [$R($master_id) info stats]
fail "Replica didn't partial sync"
}
set new_sync_count [status $R($master_id) sync_full]
assert {$sync_count == $new_sync_count}
@ -271,3 +279,103 @@ start_server {} {
}
}}}}}
start_server {tags {"psync2"}} {
start_server {} {
start_server {} {
start_server {} {
start_server {} {
test {pings at the end of replication stream are ignored for psync} {
set master [srv -4 client]
set master_host [srv -4 host]
set master_port [srv -4 port]
set replica1 [srv -3 client]
set replica2 [srv -2 client]
set replica3 [srv -1 client]
set replica4 [srv -0 client]
$replica1 replicaof $master_host $master_port
$replica2 replicaof $master_host $master_port
$replica3 replicaof $master_host $master_port
$replica4 replicaof $master_host $master_port
wait_for_condition 50 1000 {
[status $master connected_slaves] == 4
} else {
fail "replicas didn't connect"
}
$master incr x
wait_for_condition 50 1000 {
[$replica1 get x] == 1 && [$replica2 get x] == 1 &&
[$replica3 get x] == 1 && [$replica4 get x] == 1
} else {
fail "replicas didn't get incr"
}
# disconnect replica1 and replica2
# and wait for the master to send a ping to replica3 and replica4
$replica1 replicaof no one
$replica2 replicaof 127.0.0.1 1 ;# we can't promote it to master since that will cycle the replication id
$master config set repl-ping-replica-period 1
after 1500
# make everyone sync from the replica1 that didn't get the last ping from the old master
# replica4 will keep syncing from the old master which now syncs from replica1
# and replica2 will re-connect to the old master (which went back in time)
set new_master_host [srv -3 host]
set new_master_port [srv -3 port]
$replica3 replicaof $new_master_host $new_master_port
$master replicaof $new_master_host $new_master_port
$replica2 replicaof $master_host $master_port
wait_for_condition 50 1000 {
[status $replica2 master_link_status] == "up" &&
[status $replica3 master_link_status] == "up" &&
[status $replica4 master_link_status] == "up" &&
[status $master master_link_status] == "up"
} else {
fail "replicas didn't connect"
}
# make sure replication is still alive and kicking
$replica1 incr x
wait_for_condition 50 1000 {
[$replica2 get x] == 2 &&
[$replica3 get x] == 2 &&
[$replica4 get x] == 2 &&
[$master get x] == 2
} else {
fail "replicas didn't get incr"
}
# make sure there are full syncs other than the initial ones
assert_equal [status $master sync_full] 4
assert_equal [status $replica1 sync_full] 0
assert_equal [status $replica2 sync_full] 0
assert_equal [status $replica3 sync_full] 0
assert_equal [status $replica4 sync_full] 0
# force psync
$master client kill type master
$replica2 client kill type master
$replica3 client kill type master
$replica4 client kill type master
# make sure replication is still alive and kicking
$replica1 incr x
wait_for_condition 50 1000 {
[$replica2 get x] == 3 &&
[$replica3 get x] == 3 &&
[$replica4 get x] == 3 &&
[$master get x] == 3
} else {
fail "replicas didn't get incr"
}
# make sure there are full syncs other than the initial ones
assert_equal [status $master sync_full] 4
assert_equal [status $replica1 sync_full] 0
assert_equal [status $replica2 sync_full] 0
assert_equal [status $replica3 sync_full] 0
assert_equal [status $replica4 sync_full] 0
}
}}}}}