fix rare replication stream corruption with disk-based replication

The slave sends \n keepalive messages to the master while parsing the rdb,
and later sends REPLCONF ACK once a second. Rarely, the master receives both
a linefeed char and a REPLCONF in the same read, \n*3\r\n$8\r\nREPLCONF\r\n...
and it tries to trim two chars (as if the line ended with \r\n) from the
query buffer, even though only a single \n was present, thereby
trimming the '*' from *3\r\n$8\r\nREPLCONF\r\n...

Then the master tries to process a command starting with '3' and replies to
the slave with a bunch of -ERR and one +OK.
Although the slave silently ignores these (it only prints a log message), this
corrupts the replication offset at the slave, since the slave increases its
replication offset and the master does not.

Other than the fix in processInlineBuffer, I made several other improvements
while hunting this very rare bug.

- When redis replies with "unknown command" it now includes a portion of the
  arguments, not just the command name, so it is easier to understand
  what was received. In my case, on the slave side, the command was -ERR, but
  the "arguments" were the interesting part (containing info on the error).
- About a year ago I added code in addReplyErrorLength to print the error to
  the log in case of a reply to the master (since this string isn't actually
  transmitted to the master). That block is now changed to also print a similar
  log message to indicate an error being sent from the master to the slave.
  Note that the slave is marked as CLIENT_SLAVE only after PSYNC was received,
  so this will not cause any harm for REPLCONF, and will only indicate problems
  that are going to corrupt the replication stream anyway.
- There were two places where c->reply was emptied, and I wanted to reset
  sentlen there as well. This is a precaution (I did not actually see such a
  problem), since a non-zero sentlen will cause corruption to be transmitted
  on the socket.
This commit is contained in:
Oran Agra 2018-07-14 16:49:15 +03:00
parent cefe21d28a
commit d55598988b
3 changed files with 18 additions and 9 deletions

View File

@ -342,11 +342,13 @@ void addReplyErrorLength(client *c, const char *s, size_t len) {
if (!len || s[0] != '-') addReplyString(c,"-ERR ",5); if (!len || s[0] != '-') addReplyString(c,"-ERR ",5);
addReplyString(c,s,len); addReplyString(c,s,len);
addReplyString(c,"\r\n",2); addReplyString(c,"\r\n",2);
if (c->flags & CLIENT_MASTER) { if (c->flags & (CLIENT_MASTER|CLIENT_SLAVE)) {
char* to = c->flags & CLIENT_MASTER? "master": "slave";
char* from = c->flags & CLIENT_MASTER? "slave": "master";
char *cmdname = c->lastcmd ? c->lastcmd->name : "<unknown>"; char *cmdname = c->lastcmd ? c->lastcmd->name : "<unknown>";
serverLog(LL_WARNING,"== CRITICAL == This slave is sending an error " serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error "
"to its master: '%s' after processing the command " "to its %s: '%s' after processing the command "
"'%s'", s, cmdname); "'%s'", from, to, s, cmdname);
} }
} }
@ -608,6 +610,7 @@ void addReplySubcommandSyntaxError(client *c) {
* destination client. */ * destination client. */
void copyClientOutputBuffer(client *dst, client *src) { void copyClientOutputBuffer(client *dst, client *src) {
listRelease(dst->reply); listRelease(dst->reply);
dst->sentlen = 0;
dst->reply = listDup(src->reply); dst->reply = listDup(src->reply);
memcpy(dst->buf,src->buf,src->bufpos); memcpy(dst->buf,src->buf,src->bufpos);
dst->bufpos = src->bufpos; dst->bufpos = src->bufpos;
@ -1094,7 +1097,7 @@ void resetClient(client *c) {
* with the error and close the connection. */ * with the error and close the connection. */
int processInlineBuffer(client *c) { int processInlineBuffer(client *c) {
char *newline; char *newline;
int argc, j; int argc, j, linefeed_chars = 1;
sds *argv, aux; sds *argv, aux;
size_t querylen; size_t querylen;
@ -1112,7 +1115,7 @@ int processInlineBuffer(client *c) {
/* Handle the \r\n case. */ /* Handle the \r\n case. */
if (newline && newline != c->querybuf && *(newline-1) == '\r') if (newline && newline != c->querybuf && *(newline-1) == '\r')
newline--; newline--, linefeed_chars++;
/* Split the input buffer up to the \r\n */ /* Split the input buffer up to the \r\n */
querylen = newline-(c->querybuf); querylen = newline-(c->querybuf);
@ -1132,7 +1135,7 @@ int processInlineBuffer(client *c) {
c->repl_ack_time = server.unixtime; c->repl_ack_time = server.unixtime;
/* Leave data after the first line of the query in the buffer */ /* Leave data after the first line of the query in the buffer */
sdsrange(c->querybuf,querylen+2,-1); sdsrange(c->querybuf,querylen+linefeed_chars,-1);
/* Setup argv array on client structure */ /* Setup argv array on client structure */
if (argc) { if (argc) {

View File

@ -2148,6 +2148,7 @@ void replicationCacheMaster(client *c) {
server.master->read_reploff = server.master->reploff; server.master->read_reploff = server.master->reploff;
if (c->flags & CLIENT_MULTI) discardTransaction(c); if (c->flags & CLIENT_MULTI) discardTransaction(c);
listEmpty(c->reply); listEmpty(c->reply);
c->sentlen = 0;
c->reply_bytes = 0; c->reply_bytes = 0;
c->bufpos = 0; c->bufpos = 0;
resetClient(c); resetClient(c);

View File

@ -2449,8 +2449,13 @@ int processCommand(client *c) {
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
if (!c->cmd) { if (!c->cmd) {
flagTransaction(c); flagTransaction(c);
addReplyErrorFormat(c,"unknown command '%s'", sds args = sdsempty();
(char*)c->argv[0]->ptr); int i;
for (i=1; i < c->argc && sdslen(args) < 128; i++)
args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s",
(char*)c->argv[0]->ptr, args);
sdsfree(args);
return C_OK; return C_OK;
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < -c->cmd->arity)) { (c->argc < -c->cmd->arity)) {