From 89ffba9133bc78f6c797d248f22335da490d20d0 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 12 Aug 2013 10:29:14 +0200 Subject: [PATCH] Replication: better way to send a preamble before RDB payload. During the replication full resynchronization process, the RDB file is transfered from the master to the slave. However there is a short preamble to send, that is currently just the bulk payload length of the file in the usual Redis form $..length... This preamble used to be sent with a direct write call, assuming that there was alway room in the socket output buffer to hold the few bytes needed, however this does not scale in case we'll need to send more stuff, and is not very robust code in general. This commit introduces a more general mechanism to send a preamble up to 2GB in size (the max length of an sds string) in a non blocking way. --- src/networking.c | 6 ++++-- src/redis.h | 1 + src/replication.c | 34 +++++++++++++++++++++------------- 3 files changed, 26 insertions(+), 15 deletions(-) diff --git a/src/networking.c b/src/networking.c index c131c9c67..d128646f1 100644 --- a/src/networking.c +++ b/src/networking.c @@ -701,8 +701,10 @@ void freeClient(redisClient *c) { /* Master/slave cleanup. * Case 1: we lost the connection with a slave. */ if (c->flags & REDIS_SLAVE) { - if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1) - close(c->repldbfd); + if (c->replstate == REDIS_REPL_SEND_BULK) { + if (c->repldbfd != -1) close(c->repldbfd); + if (c->replpreamble) sdsfree(c->replpreamble); + } list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves; ln = listSearchKey(l,c); redisAssert(ln != NULL); diff --git a/src/redis.h b/src/redis.h index 058fa7d32..b5241b3fd 100644 --- a/src/redis.h +++ b/src/redis.h @@ -470,6 +470,7 @@ typedef struct redisClient { int repldbfd; /* replication DB file descriptor */ long repldboff; /* replication DB file offset */ off_t repldbsize; /* replication DB file size */ + sds replpreamble; /* replication DB preamble. */ long long reploff; /* replication offset if this is our master */ long long repl_ack_off; /* replication ack offset, if this is a slave */ long long repl_ack_time;/* replication ack time, if this is a slave */ diff --git a/src/replication.c b/src/replication.c index 2d7ee809d..d9e26039c 100644 --- a/src/replication.c +++ b/src/replication.c @@ -628,23 +628,28 @@ void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { char buf[REDIS_IOBUF_LEN]; ssize_t nwritten, buflen; - if (slave->repldboff == 0) { - /* Write the bulk write count before to transfer the DB. In theory here - * we don't know how much room there is in the output buffer of the - * socket, but in practice SO_SNDLOWAT (the minimum count for output - * operations) will never be smaller than the few bytes we need. */ - sds bulkcount; - - bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long) - slave->repldbsize); - if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount)) - { - sdsfree(bulkcount); + /* Before sending the RDB file, we send the preamble as configured by the + * replication process. Currently the preamble is just the bulk count of + * the file in the form "$\r\n". */ + if (slave->replpreamble) { + nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble)); + if (nwritten == -1) { + redisLog(REDIS_VERBOSE,"Write error sending RDB preamble to slave: %s", + strerror(errno)); freeClient(slave); return; } - sdsfree(bulkcount); + sdsrange(slave->replpreamble,nwritten,-1); + if (sdslen(slave->replpreamble) == 0) { + sdsfree(slave->replpreamble); + slave->replpreamble = NULL; + /* fall through sending data. */ + } else { + return; + } } + + /* If the preamble was already transfered, send the RDB bulk data. */ lseek(slave->repldbfd,slave->repldboff,SEEK_SET); buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN); if (buflen <= 0) { @@ -711,6 +716,9 @@ void updateSlavesWaitingBgsave(int bgsaveerr) { slave->repldboff = 0; slave->repldbsize = buf.st_size; slave->replstate = REDIS_REPL_SEND_BULK; + slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n", + (unsigned long long) slave->repldbsize); + aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) { freeClient(slave);