PR 2813 fix ported to unstable.

This commit is contained in:
antirez 2015-10-15 10:20:09 +02:00
parent 35a0c772b5
commit ed6228851c
2 changed files with 25 additions and 20 deletions

View File

@ -909,6 +909,13 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
/* ----------------------------------- SLAVE -------------------------------- */ /* ----------------------------------- SLAVE -------------------------------- */
/* Returns 1 if the given replication state is a handshake state,
* 0 otherwise. */
int slaveIsInHandshakeState(void) {
return server.repl_state >= REPL_STATE_RECEIVE_PONG &&
server.repl_state <= REPL_STATE_RECEIVE_PSYNC;
}
/* Abort the async download of the bulk dataset while SYNC-ing with master */ /* Abort the async download of the bulk dataset while SYNC-ing with master */
void replicationAbortSyncTransfer(void) { void replicationAbortSyncTransfer(void) {
serverAssert(server.repl_state == REPL_STATE_TRANSFER); serverAssert(server.repl_state == REPL_STATE_TRANSFER);
@ -1206,6 +1213,7 @@ char *sendSynchronousCommand(int flags, int fd, ...) {
return sdscatprintf(sdsempty(),"-Reading from master: %s", return sdscatprintf(sdsempty(),"-Reading from master: %s",
strerror(errno)); strerror(errno));
} }
server.repl_transfer_lastio = server.unixtime;
return sdsnew(buf); return sdsnew(buf);
} }
return NULL; return NULL;
@ -1636,7 +1644,7 @@ void undoConnectWithMaster(void) {
int fd = server.repl_transfer_s; int fd = server.repl_transfer_s;
serverAssert(server.repl_state == REPL_STATE_CONNECTING || serverAssert(server.repl_state == REPL_STATE_CONNECTING ||
server.repl_state == REPL_STATE_RECEIVE_PONG); slaveIsInHandshakeState());
aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE); aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
close(fd); close(fd);
server.repl_transfer_s = -1; server.repl_transfer_s = -1;
@ -1655,7 +1663,7 @@ int cancelReplicationHandshake(void) {
if (server.repl_state == REPL_STATE_TRANSFER) { if (server.repl_state == REPL_STATE_TRANSFER) {
replicationAbortSyncTransfer(); replicationAbortSyncTransfer();
} else if (server.repl_state == REPL_STATE_CONNECTING || } else if (server.repl_state == REPL_STATE_CONNECTING ||
server.repl_state == REPL_STATE_RECEIVE_PONG) slaveIsInHandshakeState())
{ {
undoConnectWithMaster(); undoConnectWithMaster();
} else { } else {
@ -1790,22 +1798,17 @@ void roleCommand(client *c) {
addReplyBulkCBuffer(c,"slave",5); addReplyBulkCBuffer(c,"slave",5);
addReplyBulkCString(c,server.masterhost); addReplyBulkCString(c,server.masterhost);
addReplyLongLong(c,server.masterport); addReplyLongLong(c,server.masterport);
switch(server.repl_state) { if (slaveIsInHandshakeState()) {
case REPL_STATE_NONE: slavestate = "none"; break; slavestate = "handshake";
case REPL_STATE_CONNECT: slavestate = "connect"; break; } else {
case REPL_STATE_CONNECTING: slavestate = "connecting"; break; switch(server.repl_state) {
case REPL_STATE_RECEIVE_PONG: case REPL_STATE_NONE: slavestate = "none"; break;
case REPL_STATE_SEND_AUTH: case REPL_STATE_CONNECT: slavestate = "connect"; break;
case REPL_STATE_RECEIVE_AUTH: case REPL_STATE_CONNECTING: slavestate = "connecting"; break;
case REPL_STATE_SEND_PORT: case REPL_STATE_TRANSFER: slavestate = "sync"; break;
case REPL_STATE_RECEIVE_PORT: case REPL_STATE_CONNECTED: slavestate = "connected"; break;
case REPL_STATE_SEND_CAPA: default: slavestate = "unknown"; break;
case REPL_STATE_RECEIVE_CAPA: }
case REPL_STATE_SEND_PSYNC:
case REPL_STATE_RECEIVE_PSYNC: slavestate = "handshake"; break;
case REPL_STATE_TRANSFER: slavestate = "sync"; break;
case REPL_STATE_CONNECTED: slavestate = "connected"; break;
default: slavestate = "unknown"; break;
} }
addReplyBulkCString(c,slavestate); addReplyBulkCString(c,slavestate);
addReplyLongLong(c,server.master ? server.master->reploff : -1); addReplyLongLong(c,server.master ? server.master->reploff : -1);
@ -2182,8 +2185,8 @@ void replicationCron(void) {
/* Non blocking connection timeout? */ /* Non blocking connection timeout? */
if (server.masterhost && if (server.masterhost &&
(server.repl_state == REPL_STATE_CONNECTING || (server.repl_state == REPL_STATE_CONNECTING ||
server.repl_state == REPL_STATE_RECEIVE_PONG) && slaveIsInHandshakeState()) &&
(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout) (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
{ {
serverLog(LL_WARNING,"Timeout connecting to the MASTER..."); serverLog(LL_WARNING,"Timeout connecting to the MASTER...");
undoConnectWithMaster(); undoConnectWithMaster();

View File

@ -271,6 +271,7 @@ typedef long long mstime_t; /* millisecond time type. */
#define REPL_STATE_NONE 0 /* No active replication */ #define REPL_STATE_NONE 0 /* No active replication */
#define REPL_STATE_CONNECT 1 /* Must connect to master */ #define REPL_STATE_CONNECT 1 /* Must connect to master */
#define REPL_STATE_CONNECTING 2 /* Connecting to master */ #define REPL_STATE_CONNECTING 2 /* Connecting to master */
/* --- Handshake states, must be ordered --- */
#define REPL_STATE_RECEIVE_PONG 3 /* Wait for PING reply */ #define REPL_STATE_RECEIVE_PONG 3 /* Wait for PING reply */
#define REPL_STATE_SEND_AUTH 4 /* Send AUTH to master */ #define REPL_STATE_SEND_AUTH 4 /* Send AUTH to master */
#define REPL_STATE_RECEIVE_AUTH 5 /* Wait for AUTH reply */ #define REPL_STATE_RECEIVE_AUTH 5 /* Wait for AUTH reply */
@ -280,6 +281,7 @@ typedef long long mstime_t; /* millisecond time type. */
#define REPL_STATE_RECEIVE_CAPA 9 /* Wait for REPLCONF reply */ #define REPL_STATE_RECEIVE_CAPA 9 /* Wait for REPLCONF reply */
#define REPL_STATE_SEND_PSYNC 10 /* Send PSYNC */ #define REPL_STATE_SEND_PSYNC 10 /* Send PSYNC */
#define REPL_STATE_RECEIVE_PSYNC 11 /* Wait for PSYNC reply */ #define REPL_STATE_RECEIVE_PSYNC 11 /* Wait for PSYNC reply */
/* --- End of handshake states --- */
#define REPL_STATE_TRANSFER 12 /* Receiving .rdb from master */ #define REPL_STATE_TRANSFER 12 /* Receiving .rdb from master */
#define REPL_STATE_CONNECTED 13 /* Connected to master */ #define REPL_STATE_CONNECTED 13 /* Connected to master */