Fix PSYNC2 incomplete command bug as described in #3899.

This bug was discovered by @kevinmcgehee and constituted a major hidden
bug in the PSYNC2 implementation, caused by the propagation from the
master of incomplete commands to slaves.

The bug had several results:

1. Borrowing from Kevin text in the issue: "Given that slaves blindly
copy over their master's input into their own replication backlog over
successive read syscalls, it's possible that with large commands or
small TCP buffers, partial commands are present in this buffer. If the
master were to fail before successfully propagating the entire command
to a slave, the slaves will never execute the partial command (since the
client is invalidated) but will copy it to replication backlog which may
relay those invalid bytes to its slaves on PSYNC2, corrupting the
backlog and possibly other valid commands that follow the failover.
Simple command boundaries aren't sufficient to capture this, either,
because in the case of a MULTI/EXEC block, if the master successfully
propagates a subset of the commands but not the EXEC, then the
transaction in the backlog becomes corrupt and could corrupt other
slaves that consume this data."

2. As identified by @yangsiran later, there is another effect of the
bug. For the same mechanism of the first problem, a slave having another
slave, could receive a full resynchronization request with an already
half-applied command in the backlog. Once the RDB is ready, it will be
sent to the slave, and the replication will continue sending to the
sub-slave the other half of the command, which is not valid.

The fix, designed by @yangsiran and @antirez, and implemented by
@antirez, uses a secondary buffer in order to feed the sub-masters and
update the replication backlog and offsets, only when a given part of
the query buffer is actually *applied* to the state of the instance,
that is, when the command gets processed and the command is not pending
in the Redis transaction buffer because of CLIENT_MULTI state.

Given that now the backlog and offsets representation are in agreement
with the actual processed commands, both issue 1 and 2 should no longer
be possible.

Thanks to @kevinmcgehee, @yangsiran and @oranagra for their work in
identifying and designing a fix for this problem.
This commit is contained in:
antirez 2017-04-19 10:25:45 +02:00
parent 27fe8e9fb2
commit 22be435efe
3 changed files with 47 additions and 8 deletions

View File

@ -93,6 +93,7 @@ client *createClient(int fd) {
c->name = NULL;
c->bufpos = 0;
c->querybuf = sdsempty();
c->pending_querybuf = sdsempty();
c->querybuf_peak = 0;
c->reqtype = 0;
c->argc = 0;
@ -107,6 +108,7 @@ client *createClient(int fd) {
c->replstate = REPL_STATE_NONE;
c->repl_put_online_on_ack = 0;
c->reploff = 0;
c->read_reploff = 0;
c->repl_ack_off = 0;
c->repl_ack_time = 0;
c->slave_listening_port = 0;
@ -796,6 +798,7 @@ void freeClient(client *c) {
/* Free the query buffer */
sdsfree(c->querybuf);
sdsfree(c->pending_querybuf);
c->querybuf = NULL;
/* Deallocate structures used to block on blocking ops. */
@ -1318,8 +1321,13 @@ void processInputBuffer(client *c) {
resetClient(c);
} else {
/* Only reset the client when the command was executed. */
if (processCommand(c) == C_OK)
if (processCommand(c) == C_OK) {
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
/* Update the applied replication offset of our master. */
c->reploff = c->read_reploff - sdslen(c->querybuf);
}
resetClient(c);
}
/* freeMemoryIfNeeded may flush slave output buffers. This may result
* into a slave, that may be the active client, to be freed. */
if (server.current_client == NULL) break;
@ -1366,15 +1374,17 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
serverLog(LL_VERBOSE, "Client closed connection");
freeClient(c);
return;
} else if (c->flags & CLIENT_MASTER) {
/* Append the query buffer to the pending (not applied) buffer
* of the master. We'll use this buffer later in order to have a
* copy of the string applied by the last command executed. */
c->pending_querybuf = sdscatlen(c->pending_querybuf,
c->querybuf+qblen,nread);
}
sdsIncrLen(c->querybuf,nread);
c->lastinteraction = server.unixtime;
if (c->flags & CLIENT_MASTER) {
c->reploff += nread;
replicationFeedSlavesFromMasterStream(server.slaves,
c->querybuf+qblen,nread);
}
if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
server.stat_net_input_bytes += nread;
if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
@ -1386,7 +1396,25 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
freeClient(c);
return;
}
processInputBuffer(c);
/* Time to process the buffer. If the client is a master we need to
* compute the difference between the applied offset before and after
* processing the buffer, to understand how much of the replication stream
* was actually applied to the master state: this quantity, and its
* corresponding part of the replication stream, will be propagated to
* the sub-slaves and to the replication backlog. */
if (!(c->flags & CLIENT_MASTER)) {
processInputBuffer(c);
} else {
size_t prev_offset = c->reploff;
processInputBuffer(c);
size_t applied = c->reploff - prev_offset;
if (applied) {
replicationFeedSlavesFromMasterStream(server.slaves,
c->pending_querybuf, applied);
sdsrange(c->pending_querybuf,applied,-1);
}
}
}
void getClientsMaxBuffers(unsigned long *longest_output_list,

View File

@ -1078,6 +1078,7 @@ void replicationCreateMasterClient(int fd, int dbid) {
server.master->flags |= CLIENT_MASTER;
server.master->authenticated = 1;
server.master->reploff = server.master_initial_offset;
server.master->read_reploff = server.master->reploff;
memcpy(server.master->replid, server.master_replid,
sizeof(server.master_replid));
/* If master offset is set to -1, this master is old and is not
@ -2118,6 +2119,12 @@ void replicationCacheMaster(client *c) {
/* Unlink the client from the server structures. */
unlinkClient(c);
/* Fix the master specific fields: we want to discard to non processed
* query buffers and non processed offsets. */
sdsclear(server.master->querybuf);
sdsclear(server.master->pending_querybuf);
server.master->read_reploff = server.master->reploff;
/* Save the master. Server.master will be set to null later by
* replicationHandleMasterDisconnection(). */
server.cached_master = server.master;

View File

@ -663,6 +663,9 @@ typedef struct client {
redisDb *db; /* Pointer to currently SELECTed DB. */
robj *name; /* As set by CLIENT SETNAME. */
sds querybuf; /* Buffer we use to accumulate client queries. */
sds pending_querybuf; /* If this is a master, this buffer represents the
yet not applied replication stream that we
are receiving from the master. */
size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size. */
int argc; /* Num of arguments of current command. */
robj **argv; /* Arguments of current command. */
@ -685,7 +688,8 @@ typedef struct client {
off_t repldboff; /* Replication DB file offset. */
off_t repldbsize; /* Replication DB file size. */
sds replpreamble; /* Replication DB preamble. */
long long reploff; /* Replication offset if this is our master. */
long long read_reploff; /* Read replication offset if this is a master. */
long long reploff; /* Applied replication offset if this is a master. */
long long repl_ack_off; /* Replication ack offset, if this is a slave. */
long long repl_ack_time;/* Replication ack time, if this is a slave. */
long long psync_initial_offset; /* FULLRESYNC reply offset other slaves