Diskless replica: a few aesthetic changes to replication.c.

This commit is contained in:
antirez 2019-07-08 18:32:47 +02:00
parent d984732b35
commit 81b18fa3a0
2 changed files with 96 additions and 42 deletions

View File

@ -1127,7 +1127,8 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
ssize_t nread, readlen, nwritten;
int use_diskless_load;
redisDb *diskless_load_backup = NULL;
int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS;
int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC :
EMPTYDB_NO_FLAGS;
int i;
off_t left;
UNUSED(el);
@ -1199,8 +1200,8 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
use_diskless_load = useDisklessLoad();
if (!use_diskless_load) {
/* read the data from the socket, store it to a file and search for the EOF */
/* Read the data from the socket, store it to a file and search
* for the EOF. */
if (usemark) {
readlen = sizeof(buf);
} else {
@ -1222,20 +1223,28 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
int eof_reached = 0;
if (usemark) {
/* Update the last bytes array, and check if it matches our delimiter.*/
/* Update the last bytes array, and check if it matches our
* delimiter. */
if (nread >= CONFIG_RUN_ID_SIZE) {
memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,CONFIG_RUN_ID_SIZE);
memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,
CONFIG_RUN_ID_SIZE);
} else {
int rem = CONFIG_RUN_ID_SIZE-nread;
memmove(lastbytes,lastbytes+nread,rem);
memcpy(lastbytes+rem,buf,nread);
}
if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) eof_reached = 1;
if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0)
eof_reached = 1;
}
/* Update the last I/O time for the replication transfer (used in
* order to detect timeouts during replication), and write what we
* got from the socket to the dump file on disk. */
server.repl_transfer_lastio = server.unixtime;
if ((nwritten = write(server.repl_transfer_fd,buf,nread)) != nread) {
serverLog(LL_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> REPLICA synchronization: %s",
serverLog(LL_WARNING,
"Write error or short write writing to the DB dump file "
"needed for MASTER <-> REPLICA synchronization: %s",
(nwritten == -1) ? strerror(errno) : "short write");
goto error;
}
@ -1246,14 +1255,16 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
if (ftruncate(server.repl_transfer_fd,
server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1)
{
serverLog(LL_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno));
serverLog(LL_WARNING,
"Error truncating the RDB file received from the master "
"for SYNC: %s", strerror(errno));
goto error;
}
}
/* Sync data on disk from time to time, otherwise at the end of the transfer
* we may suffer a big delay as the memory buffers are copied into the
* actual disk. */
/* Sync data on disk from time to time, otherwise at the end of the
* transfer we may suffer a big delay as the memory buffers are copied
* into the actual disk. */
if (server.repl_transfer_read >=
server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
{
@ -1269,19 +1280,34 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
if (server.repl_transfer_read == server.repl_transfer_size)
eof_reached = 1;
}
if (!eof_reached)
return;
/* If the transfer is yet not complete, we need to read more, so
* return ASAP and wait for the handler to be called again. */
if (!eof_reached) return;
}
/* We reach here when the slave is using diskless replication,
* or when we are done reading from the socket to the rdb file. */
/* We reach this point in one of the following cases:
*
* 1. The replica is using diskless replication, that is, it reads data
* directly from the socket to the Redis memory, without using
* a temporary RDB file on disk. In that case we just block and
* read everything from the socket.
*
* 2. Or when we are done reading from the socket to the RDB file, in
* such case we want just to read the RDB file in memory. */
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data");
/* We need to stop any AOFRW fork before flusing and parsing
* RDB, otherwise we'll create a copy-on-write disaster. */
/* We need to stop any AOF rewriting child before flusing and parsing
* the RDB, otherwise we'll create a copy-on-write disaster. */
if (server.aof_state != AOF_OFF) stopAppendOnly();
signalFlushedDb(-1);
if (use_diskless_load && server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
/* create a backup of the current db */
/* When diskless RDB loading is used by replicas, it may be configured
* in order to save the current DB instead of throwing it away,
* so that we can restore it in case of failed transfer. */
if (use_diskless_load &&
server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB)
{
diskless_load_backup = zmalloc(sizeof(redisDb)*server.dbnum);
for (i=0; i<server.dbnum; i++) {
diskless_load_backup[i] = server.db[i];
@ -1291,6 +1317,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
} else {
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
}
/* Before loading the DB into memory we need to delete the readable
* handler, otherwise it will get called recursively since
* rdbLoad() will call the event loop to process events from time to
@ -1301,21 +1328,25 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
if (use_diskless_load) {
rio rdb;
rioInitWithFd(&rdb,fd,server.repl_transfer_size);
/* Put the socket in blocking mode to simplify RDB transfer.
* We'll restore it when the RDB is received. */
anetBlock(NULL,fd);
anetRecvTimeout(NULL,fd,server.repl_timeout*1000);
startLoading(server.repl_transfer_size);
if (rdbLoadRio(&rdb,&rsi,0) != C_OK) {
/* rdbloading failed */
/* RDB loading failed. */
stopLoading();
serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from socket");
serverLog(LL_WARNING,
"Failed trying to load the MASTER synchronization DB "
"from socket");
cancelReplicationHandshake();
rioFreeFd(&rdb, NULL);
if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
/* restore the backed up db */
emptyDbGeneric(server.db,-1,empty_db_flags,replicationEmptyDbCallback);
/* Restore the backed up databases. */
emptyDbGeneric(server.db,-1,empty_db_flags,
replicationEmptyDbCallback);
for (i=0; i<server.dbnum; i++) {
dictRelease(server.db[i].dict);
dictRelease(server.db[i].expires);
@ -1323,35 +1354,47 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
}
zfree(diskless_load_backup);
} else {
/* Remove the half-loaded data */
/* Remove the half-loaded data in case we started with
* an empty replica. */
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
}
/* Note that there's no point in restarting the AOF on sync failure,
it'll be restarted when sync succeeds or slave promoted. */
/* Note that there's no point in restarting the AOF on SYNC
* failure, it'll be restarted when sync succeeds or the replica
* gets promoted. */
return;
}
stopLoading();
/* rdbloading succeeded */
/* RDB loading succeeded if we reach this point. */
if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
/* delete the backup db that we created before starting to load the new rdb */
emptyDbGeneric(diskless_load_backup,-1,empty_db_flags,replicationEmptyDbCallback);
/* Delete the backup databases we created before starting to load
* the new RDB. Now the RDB was loaded with success so the old
* data is useless. */
emptyDbGeneric(diskless_load_backup,-1,empty_db_flags,
replicationEmptyDbCallback);
for (i=0; i<server.dbnum; i++) {
dictRelease(diskless_load_backup[i].dict);
dictRelease(diskless_load_backup[i].expires);
}
zfree(diskless_load_backup);
}
/* Verify the end mark is correct. */
if (usemark) {
if (!rioRead(&rdb,buf,CONFIG_RUN_ID_SIZE) || memcmp(buf,eofmark,CONFIG_RUN_ID_SIZE) != 0) {
if (!rioRead(&rdb,buf,CONFIG_RUN_ID_SIZE) ||
memcmp(buf,eofmark,CONFIG_RUN_ID_SIZE) != 0)
{
serverLog(LL_WARNING,"Replication stream EOF marker is broken");
cancelReplicationHandshake();
rioFreeFd(&rdb, NULL);
return;
}
}
/* get the unread command stream from the rio buffer */
/* Cleanup and restore the socket to the original state to continue
* with the normal replication. */
rioFreeFd(&rdb, NULL);
/* Restore the socket as non-blocking. */
anetNonBlock(NULL,fd);
anetRecvTimeout(NULL,fd,0);
} else {
@ -1367,40 +1410,49 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
}
if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
serverLog(LL_WARNING,"Failed trying to rename the temp DB into %s in MASTER <-> REPLICA synchronization: %s",
serverLog(LL_WARNING,
"Failed trying to rename the temp DB into %s in "
"MASTER <-> REPLICA synchronization: %s",
server.rdb_filename, strerror(errno));
cancelReplicationHandshake();
return;
}
if (rdbLoad(server.rdb_filename,&rsi) != C_OK) {
serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
serverLog(LL_WARNING,
"Failed trying to load the MASTER synchronization "
"DB from disk");
cancelReplicationHandshake();
/* Note that there's no point in restarting the AOF on sync failure,
it'll be restarted when sync succeeds or slave promoted. */
it'll be restarted when sync succeeds or replica promoted. */
return;
}
/* Cleanup. */
zfree(server.repl_transfer_tmpfile);
close(server.repl_transfer_fd);
server.repl_transfer_fd = -1;
server.repl_transfer_tmpfile = NULL;
}
/* Final setup of the connected slave <- master link */
replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db);
server.repl_state = REPL_STATE_CONNECTED;
server.repl_down_since = 0;
/* After a full resynchroniziation we use the replication ID and
* offset of the master. The secondary ID / offset are cleared since
* we are starting a new history. */
memcpy(server.replid,server.master->replid,sizeof(server.replid));
server.master_repl_offset = server.master->reploff;
clearReplicationId2();
/* Let's create the replication backlog if needed. Slaves need to
* accumulate the backlog regardless of the fact they have sub-slaves
* or not, in order to behave correctly if they are promoted to
* masters after a failover. */
if (server.repl_backlog == NULL) createReplicationBacklog();
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success");
/* Restart the AOF subsystem now that we finished the sync. This
* will trigger an AOF rewrite, and when done will start appending
* to the new file. */

View File

@ -173,13 +173,13 @@ static size_t rioFdRead(rio *r, void *buf, size_t len) {
/* if the buffer is too small for the entire request: realloc */
if (sdslen(r->io.fd.buf) + sdsavail(r->io.fd.buf) < len)
r->io.fd.buf = sdsMakeRoomFor(r->io.fd.buf, len - sdslen(r->io.fd.buf));
/* if the remaining unused buffer is not large enough: memmove so that we can read the rest */
if (len > avail && sdsavail(r->io.fd.buf) < len - avail) {
sdsrange(r->io.fd.buf, r->io.fd.pos, -1);
r->io.fd.pos = 0;
}
/* if we don't already have all the data in the sds, read more */
while (len > sdslen(r->io.fd.buf) - r->io.fd.pos) {
size_t buffered = sdslen(r->io.fd.buf) - r->io.fd.pos;
@ -251,8 +251,10 @@ void rioInitWithFd(rio *r, int fd, size_t read_limit) {
/* release the rio stream.
* optionally returns the unread buffered data. */
void rioFreeFd(rio *r, sds* out_remainingBufferedData) {
if(out_remainingBufferedData && (size_t)r->io.fd.pos < sdslen(r->io.fd.buf)) {
void rioFreeFd(rio *r, sds *out_remainingBufferedData) {
if (out_remainingBufferedData &&
(size_t)r->io.fd.pos < sdslen(r->io.fd.buf))
{
if (r->io.fd.pos > 0)
sdsrange(r->io.fd.buf, r->io.fd.pos, -1);
*out_remainingBufferedData = r->io.fd.buf;
@ -264,7 +266,7 @@ void rioFreeFd(rio *r, sds* out_remainingBufferedData) {
r->io.fd.buf = NULL;
}
/* ------------------- File descriptors set implementation ------------------- */
/* ------------------- File descriptors set implementation ------------------ */
/* Returns 1 or 0 for success/failure.
* The function returns success as long as we are able to correctly write