Replication: better way to send a preamble before RDB payload.

During the replication full resynchronization process, the RDB file is
transfered from the master to the slave. However there is a short
preamble to send, that is currently just the bulk payload length of the
file in the usual Redis form $..length..<CR><LF>.

This preamble used to be sent with a direct write call, assuming that
there was alway room in the socket output buffer to hold the few bytes
needed, however this does not scale in case we'll need to send more
stuff, and is not very robust code in general.

This commit introduces a more general mechanism to send a preamble up to
2GB in size (the max length of an sds string) in a non blocking way.
This commit is contained in:
antirez 2013-08-12 10:29:14 +02:00
parent db862e8ef0
commit 89ffba9133
3 changed files with 26 additions and 15 deletions

View File

@ -701,8 +701,10 @@ void freeClient(redisClient *c) {
/* Master/slave cleanup. /* Master/slave cleanup.
* Case 1: we lost the connection with a slave. */ * Case 1: we lost the connection with a slave. */
if (c->flags & REDIS_SLAVE) { if (c->flags & REDIS_SLAVE) {
if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1) if (c->replstate == REDIS_REPL_SEND_BULK) {
close(c->repldbfd); if (c->repldbfd != -1) close(c->repldbfd);
if (c->replpreamble) sdsfree(c->replpreamble);
}
list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves; list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
ln = listSearchKey(l,c); ln = listSearchKey(l,c);
redisAssert(ln != NULL); redisAssert(ln != NULL);

View File

@ -470,6 +470,7 @@ typedef struct redisClient {
int repldbfd; /* replication DB file descriptor */ int repldbfd; /* replication DB file descriptor */
long repldboff; /* replication DB file offset */ long repldboff; /* replication DB file offset */
off_t repldbsize; /* replication DB file size */ off_t repldbsize; /* replication DB file size */
sds replpreamble; /* replication DB preamble. */
long long reploff; /* replication offset if this is our master */ long long reploff; /* replication offset if this is our master */
long long repl_ack_off; /* replication ack offset, if this is a slave */ long long repl_ack_off; /* replication ack offset, if this is a slave */
long long repl_ack_time;/* replication ack time, if this is a slave */ long long repl_ack_time;/* replication ack time, if this is a slave */

View File

@ -628,23 +628,28 @@ void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
char buf[REDIS_IOBUF_LEN]; char buf[REDIS_IOBUF_LEN];
ssize_t nwritten, buflen; ssize_t nwritten, buflen;
if (slave->repldboff == 0) { /* Before sending the RDB file, we send the preamble as configured by the
/* Write the bulk write count before to transfer the DB. In theory here * replication process. Currently the preamble is just the bulk count of
* we don't know how much room there is in the output buffer of the * the file in the form "$<length>\r\n". */
* socket, but in practice SO_SNDLOWAT (the minimum count for output if (slave->replpreamble) {
* operations) will never be smaller than the few bytes we need. */ nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble));
sds bulkcount; if (nwritten == -1) {
redisLog(REDIS_VERBOSE,"Write error sending RDB preamble to slave: %s",
bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long) strerror(errno));
slave->repldbsize);
if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
{
sdsfree(bulkcount);
freeClient(slave); freeClient(slave);
return; return;
} }
sdsfree(bulkcount); sdsrange(slave->replpreamble,nwritten,-1);
if (sdslen(slave->replpreamble) == 0) {
sdsfree(slave->replpreamble);
slave->replpreamble = NULL;
/* fall through sending data. */
} else {
return;
}
} }
/* If the preamble was already transfered, send the RDB bulk data. */
lseek(slave->repldbfd,slave->repldboff,SEEK_SET); lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN); buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
if (buflen <= 0) { if (buflen <= 0) {
@ -711,6 +716,9 @@ void updateSlavesWaitingBgsave(int bgsaveerr) {
slave->repldboff = 0; slave->repldboff = 0;
slave->repldbsize = buf.st_size; slave->repldbsize = buf.st_size;
slave->replstate = REDIS_REPL_SEND_BULK; slave->replstate = REDIS_REPL_SEND_BULK;
slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
(unsigned long long) slave->repldbsize);
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) { if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
freeClient(slave); freeClient(slave);