Diskless SYNC: fix RDB EOF detection.

RDB EOF detection was relying on the final part of the RDB transfer to
be a magic 40 bytes EOF marker. However as the slave is put online
immediately, and because of sockets timeouts, the replication stream is
actually contiguous with the RDB file.

This means that to detect the EOF correctly we should either:

1) Scan all the stream searching for the mark. Sucks CPU-wise.
2) Start to send the replication stream only after an acknowledge.
3) Implement a proper chunked encoding.

For now solution "2" was picked, so the master does not start to send
ASAP the stream of commands in the case of diskless replication. We wait
for the first REPLCONF ACK command from the slave, that certifies us
that the slave correctly loaded the RDB file and is ready to get more
data.
This commit is contained in:
antirez 2014-11-11 17:12:12 +01:00
parent f5c6ebbfe3
commit bb7fea0d5c
3 changed files with 21 additions and 4 deletions

View File

@ -100,6 +100,7 @@ redisClient *createClient(int fd) {
c->ctime = c->lastinteraction = server.unixtime;
c->authenticated = 0;
c->replstate = REDIS_REPL_NONE;
c->repl_put_online_on_ack = 0;
c->reploff = 0;
c->repl_ack_off = 0;
c->repl_ack_time = 0;

View File

@ -532,6 +532,7 @@ typedef struct redisClient {
int flags; /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */
int authenticated; /* when requirepass is non-NULL */
int replstate; /* replication state if this is a slave */
int repl_put_online_on_ack; /* Install slave write handler on ACK. */
int repldbfd; /* replication DB file descriptor */
off_t repldboff; /* replication DB file offset */
off_t repldbsize; /* replication DB file size */

View File

@ -40,6 +40,7 @@
void replicationDiscardCachedMaster(void);
void replicationResurrectCachedMaster(int newfd);
void replicationSendAck(void);
void putSlaveOnline(redisClient *slave);
/* --------------------------- Utility functions ---------------------------- */
@ -398,6 +399,7 @@ int masterTryPartialResynchronization(redisClient *c) {
c->flags |= REDIS_SLAVE;
c->replstate = REDIS_REPL_ONLINE;
c->repl_ack_time = server.unixtime;
c->repl_put_online_on_ack = 0;
listAddNodeTail(server.slaves,c);
/* We can't use the connection buffers since they are used to accumulate
* new commands at this stage. But we are sure the socket send buffer is
@ -623,6 +625,11 @@ void replconfCommand(redisClient *c) {
if (offset > c->repl_ack_off)
c->repl_ack_off = offset;
c->repl_ack_time = server.unixtime;
/* If this was a diskless replication, we need to really put
* the slave online when the first ACK is received (which
* confirms slave is online and ready to get more data). */
if (c->repl_put_online_on_ack && c->replstate == REDIS_REPL_ONLINE)
putSlaveOnline(c);
/* Note: this command does not reply anything! */
return;
} else if (!strcasecmp(c->argv[j]->ptr,"getack")) {
@ -652,6 +659,7 @@ void replconfCommand(redisClient *c) {
* 3) Update the count of good slaves. */
void putSlaveOnline(redisClient *slave) {
slave->replstate = REDIS_REPL_ONLINE;
slave->repl_put_online_on_ack = 0;
slave->repl_ack_time = server.unixtime;
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
sendReplyToClient, slave) == AE_ERR) {
@ -660,6 +668,8 @@ void putSlaveOnline(redisClient *slave) {
return;
}
refreshGoodSlavesCount();
redisLog(REDIS_NOTICE,"Synchronization with slave %s succeeded",
replicationGetSlaveName(slave));
}
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
@ -713,7 +723,6 @@ void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
slave->repldbfd = -1;
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
putSlaveOnline(slave);
redisLog(REDIS_NOTICE,"Synchronization with slave succeeded (disk)");
}
}
@ -752,10 +761,16 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
* diskless replication, our work is trivial, we can just put
* the slave online. */
if (type == REDIS_RDB_CHILD_TYPE_SOCKET) {
putSlaveOnline(slave);
redisLog(REDIS_NOTICE,
"Synchronization with slave %s succeeded (socket)",
"Streamed RDB transfer with slave %s succeeded (socket). Waiting for REPLCONF ACK from slave to enable streaming",
replicationGetSlaveName(slave));
/* Note: we wait for a REPLCONF ACK message from slave in
* order to really put it online (install the write handler
* so that the accumulated data can be transfered). However
* we change the replication state ASAP, since our slave
* is technically online now. */
slave->replstate = REDIS_REPL_ONLINE;
slave->repl_put_online_on_ack = 1;
} else {
if (bgsaveerr != REDIS_OK) {
freeClient(slave);
@ -929,7 +944,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
int eof_reached = 0;
if (usemark) {
/* Update the last bytes array, and check if it matches our delimiter. */
/* Update the last bytes array, and check if it matches our delimiter.*/
if (nread >= REDIS_RUN_ID_SIZE) {
memcpy(lastbytes,buf+nread-REDIS_RUN_ID_SIZE,REDIS_RUN_ID_SIZE);
} else {