Merge branch 'aaslave'

This commit is contained in:
antirez 2010-11-05 10:59:49 +01:00
commit 3b5e72d402
6 changed files with 206 additions and 66 deletions

View File

@ -110,6 +110,19 @@ dir ./
#
# masterauth <master-password>
# When a slave lost the connection with the master, or when the replication
# is still in progress, the slave can act in two different ways:
#
# 1) if slave-serve-stale-data is set to 'yes' (the default) the slave will
# still reply to client requests, possibly with out of data data, or the
# data set may just be empty if this is the first synchronization.
#
# 2) if slave-serve-stale data is set to 'no' the slave will reply with
# an error "SYNC with master in progress" to all the kind of commands
# but to INFO and SLAVEOF.
#
slave-serve-stale-data yes
################################## SECURITY ###################################
# Require clients to issue AUTH <PASSWORD> before processing any other

View File

@ -152,6 +152,10 @@ void loadServerConfig(char *filename) {
server.replstate = REDIS_REPL_CONNECT;
} else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
server.masterauth = zstrdup(argv[1]);
} else if (!strcasecmp(argv[0],"slave-serve-stale-data") && argc == 2) {
if ((server.repl_serve_stale_data = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;
}
} else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;
@ -379,6 +383,11 @@ void configSetCommand(redisClient *c) {
appendServerSaveParams(seconds, changes);
}
sdsfreesplitres(v,vlen);
} else if (!strcasecmp(c->argv[2]->ptr,"slave-serve-stale-data")) {
int yn = yesnotoi(o->ptr);
if (yn == -1) goto badfmt;
server.repl_serve_stale_data = yn;
} else {
addReplyErrorFormat(c,"Unsupported CONFIG parameter: %s",
(char*)c->argv[2]->ptr);
@ -488,6 +497,11 @@ void configGetCommand(redisClient *c) {
sdsfree(buf);
matches++;
}
if (stringmatch(pattern,"slave-serve-stale-data",0)) {
addReplyBulkCString(c,"slave-serve-stale-data");
addReplyBulkCString(c,server.repl_serve_stale_data ? "yes" : "no");
matches++;
}
setDeferredMultiBulkLength(c,replylen,matches*2);
}

View File

@ -467,6 +467,7 @@ void freeClient(redisClient *c) {
/* Case 2: we lost the connection with the master. */
if (c->flags & REDIS_MASTER) {
server.master = NULL;
/* FIXME */
server.replstate = REDIS_REPL_CONNECT;
/* Since we lost the connection with the master, we should also
* close the connection with all our slaves if we have any, so

View File

@ -633,14 +633,10 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
}
}
/* Check if we should connect to a MASTER */
if (server.replstate == REDIS_REPL_CONNECT && !(loops % 10)) {
redisLog(REDIS_NOTICE,"Connecting to MASTER...");
if (syncWithMaster() == REDIS_OK) {
redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
if (server.appendonly) rewriteAppendOnlyFileBackground();
}
}
/* Replication cron function -- used to reconnect to master and
* to detect transfer failures. */
if (!(loops % 10)) replicationCron();
return 100;
}
@ -790,6 +786,7 @@ void initServerConfig() {
server.masterport = 6379;
server.master = NULL;
server.replstate = REDIS_REPL_NONE;
server.repl_serve_stale_data = 1;
/* Double constants initialization */
R_Zero = 0.0;
@ -998,6 +995,17 @@ int processCommand(redisClient *c) {
return REDIS_OK;
}
/* Only allow INFO and SLAVEOF when slave-serve-stale-data is no and
* we are a slave with a broken link with master. */
if (server.masterhost && server.replstate != REDIS_REPL_CONNECTED &&
server.repl_serve_stale_data == 0 &&
cmd->proc != infoCommand && cmd->proc != slaveofCommand)
{
addReplyError(c,
"link with MASTER is down and slave-serve-stale-data is set to no");
return REDIS_OK;
}
/* Exec the command */
if (c->flags & REDIS_MULTI &&
cmd->proc != execCommand && cmd->proc != discardCommand &&
@ -1187,12 +1195,23 @@ sds genRedisInfoString(void) {
"master_port:%d\r\n"
"master_link_status:%s\r\n"
"master_last_io_seconds_ago:%d\r\n"
"master_sync_in_progress:%d\r\n"
,server.masterhost,
server.masterport,
(server.replstate == REDIS_REPL_CONNECTED) ?
"up" : "down",
server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1,
server.replstate == REDIS_REPL_TRANSFER
);
if (server.replstate == REDIS_REPL_TRANSFER) {
info = sdscatprintf(info,
"master_sync_left_bytes:%ld\r\n"
"master_sync_last_io_seconds_ago:%d\r\n"
,(long)server.repl_transfer_left,
(int)(time(NULL)-server.repl_transfer_lastio)
);
}
}
if (server.vm_enabled) {
lockThreadedIO();

View File

@ -152,7 +152,8 @@
/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0 /* No active replication */
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
#define REDIS_REPL_CONNECTED 2 /* Connected to master */
#define REDIS_REPL_TRANSFER 2 /* Receiving .rdb from master */
#define REDIS_REPL_CONNECTED 3 /* Connected to master */
/* Slave replication state - from the point of view of master
* Note that in SEND_BULK and ONLINE state the slave receives new updates
@ -360,7 +361,8 @@ struct redisServer {
long long dirty_before_bgsave; /* used to restore dirty on failed BGSAVE */
list *clients;
dict *commands; /* Command table hahs table */
struct redisCommand *delCommand, *multiCommand; /* often lookedup cmds */
/* Fast pointers to often looked up command */
struct redisCommand *delCommand, *multiCommand;
list *slaves, *monitors;
char neterr[ANET_ERR_LEN];
aeEventLoop *el;
@ -401,15 +403,24 @@ struct redisServer {
int activerehashing;
/* Replication related */
int isslave;
/* Slave specific fields */
char *masterauth;
char *masterhost;
int masterport;
redisClient *master; /* client that is master for this slave */
int replstate;
int replstate; /* replication status if the instance is a slave */
off_t repl_transfer_left; /* bytes left reading .rdb */
int repl_transfer_s; /* slave -> master SYNC socket */
int repl_transfer_fd; /* slave -> master SYNC temp file descriptor */
char *repl_transfer_tmpfile; /* slave-> master SYNC temp file name */
time_t repl_transfer_lastio; /* unix time of the latest read, for timeout */
int repl_serve_stale_data; /* Serve stale data when link is down? */
/* Limits */
unsigned int maxclients;
unsigned long long maxmemory;
int maxmemory_policy;
int maxmemory_samples;
/* Blocked clients */
unsigned int blpop_blocked_clients;
unsigned int vm_blocked_clients;
/* Sort parameters - qsort_r() is only available under BSD so we
@ -713,6 +724,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
void replicationFeedMonitors(list *monitors, int dictid, robj **argv, int argc);
int syncWithMaster(void);
void updateSlavesWaitingBgsave(int bgsaveerr);
void replicationCron(void);
/* RDB persistence */
int rdbLoad(char *filename);

View File

@ -5,6 +5,8 @@
#include <fcntl.h>
#include <sys/stat.h>
/* ---------------------------------- MASTER -------------------------------- */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
listNode *ln;
listIter li;
@ -288,9 +290,101 @@ void updateSlavesWaitingBgsave(int bgsaveerr) {
}
}
/* ----------------------------------- SLAVE -------------------------------- */
/* Abort the async download of the bulk dataset while SYNC-ing with master */
void replicationAbortSyncTransfer(void) {
redisAssert(server.replstate == REDIS_REPL_TRANSFER);
aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
close(server.repl_transfer_s);
close(server.repl_transfer_fd);
unlink(server.repl_transfer_tmpfile);
zfree(server.repl_transfer_tmpfile);
server.replstate = REDIS_REPL_CONNECT;
}
/* Asynchronously read the SYNC payload we receive from a master */
void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
char buf[4096];
ssize_t nread, readlen;
REDIS_NOTUSED(el);
REDIS_NOTUSED(privdata);
REDIS_NOTUSED(mask);
/* If repl_transfer_left == -1 we still have to read the bulk length
* from the master reply. */
if (server.repl_transfer_left == -1) {
if (syncReadLine(fd,buf,1024,3600) == -1) {
redisLog(REDIS_WARNING,
"I/O error reading bulk count from MASTER: %s",
strerror(errno));
replicationAbortSyncTransfer();
return;
}
if (buf[0] == '-') {
redisLog(REDIS_WARNING,
"MASTER aborted replication with an error: %s",
buf+1);
replicationAbortSyncTransfer();
return;
} else if (buf[0] != '$') {
redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
replicationAbortSyncTransfer();
return;
}
server.repl_transfer_left = strtol(buf+1,NULL,10);
redisLog(REDIS_NOTICE,
"MASTER <-> SLAVE sync: receiving %ld bytes from master",
server.repl_transfer_left);
return;
}
/* Read bulk data */
readlen = (server.repl_transfer_left < (signed)sizeof(buf)) ?
server.repl_transfer_left : (signed)sizeof(buf);
nread = read(fd,buf,readlen);
if (nread <= 0) {
redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
(nread == -1) ? strerror(errno) : "connection lost");
replicationAbortSyncTransfer();
return;
}
server.repl_transfer_lastio = time(NULL);
if (write(server.repl_transfer_fd,buf,nread) != nread) {
redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
replicationAbortSyncTransfer();
return;
}
server.repl_transfer_left -= nread;
/* Check if the transfer is now complete */
if (server.repl_transfer_left == 0) {
if (rename(server.repl_transfer_tmpfile,server.dbfilename) == -1) {
redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
replicationAbortSyncTransfer();
return;
}
redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory");
emptyDb();
if (rdbLoad(server.dbfilename) != REDIS_OK) {
redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
replicationAbortSyncTransfer();
return;
}
/* Final setup of the connected slave <- master link */
aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
zfree(server.repl_transfer_tmpfile);
close(server.repl_transfer_fd);
server.master = createClient(server.repl_transfer_s);
server.master->flags |= REDIS_MASTER;
server.master->authenticated = 1;
server.replstate = REDIS_REPL_CONNECTED;
redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Finished with success");
}
}
int syncWithMaster(void) {
char buf[1024], tmpfile[256], authcmd[1024];
long dumpsize;
int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
int dfd, maxtries = 5;
@ -330,26 +424,8 @@ int syncWithMaster(void) {
strerror(errno));
return REDIS_ERR;
}
/* Read the bulk write count */
if (syncReadLine(fd,buf,1024,3600) == -1) {
close(fd);
redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
strerror(errno));
return REDIS_ERR;
}
if (buf[0] == '-') {
close(fd);
redisLog(REDIS_WARNING,"MASTER aborted replication with an error: %s",
buf+1);
return REDIS_ERR;
} else if (buf[0] != '$') {
close(fd);
redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
return REDIS_ERR;
}
dumpsize = strtol(buf+1,NULL,10);
redisLog(REDIS_NOTICE,"Receiving %ld bytes data dump from MASTER",dumpsize);
/* Read the bulk write data on a temp file */
/* Prepare a suitable temp file for bulk transfer */
while(maxtries--) {
snprintf(tmpfile,256,
"temp-%d.%ld.rdb",(int)time(NULL),(long int)getpid());
@ -362,43 +438,21 @@ int syncWithMaster(void) {
redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
return REDIS_ERR;
}
while(dumpsize) {
int nread, nwritten;
nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
if (nread <= 0) {
redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
(nread == -1) ? strerror(errno) : "connection lost");
close(fd);
close(dfd);
return REDIS_ERR;
}
nwritten = write(dfd,buf,nread);
if (nwritten == -1) {
redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
close(fd);
close(dfd);
return REDIS_ERR;
}
dumpsize -= nread;
}
close(dfd);
if (rename(tmpfile,server.dbfilename) == -1) {
redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
unlink(tmpfile);
/* Setup the non blocking download of the bulk file. */
if (aeCreateFileEvent(server.el, fd, AE_READABLE, readSyncBulkPayload, NULL)
== AE_ERR)
{
close(fd);
redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
return REDIS_ERR;
}
emptyDb();
if (rdbLoad(server.dbfilename) != REDIS_OK) {
redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
close(fd);
return REDIS_ERR;
}
server.master = createClient(fd);
server.master->flags |= REDIS_MASTER;
server.master->authenticated = 1;
server.replstate = REDIS_REPL_CONNECTED;
server.replstate = REDIS_REPL_TRANSFER;
server.repl_transfer_left = -1;
server.repl_transfer_s = fd;
server.repl_transfer_fd = dfd;
server.repl_transfer_lastio = time(NULL);
server.repl_transfer_tmpfile = zstrdup(tmpfile);
return REDIS_OK;
}
@ -409,6 +463,8 @@ void slaveofCommand(redisClient *c) {
sdsfree(server.masterhost);
server.masterhost = NULL;
if (server.master) freeClient(server.master);
if (server.replstate == REDIS_REPL_TRANSFER)
replicationAbortSyncTransfer();
server.replstate = REDIS_REPL_NONE;
redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
}
@ -417,9 +473,34 @@ void slaveofCommand(redisClient *c) {
server.masterhost = sdsdup(c->argv[1]->ptr);
server.masterport = atoi(c->argv[2]->ptr);
if (server.master) freeClient(server.master);
if (server.replstate == REDIS_REPL_TRANSFER)
replicationAbortSyncTransfer();
server.replstate = REDIS_REPL_CONNECT;
redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
server.masterhost, server.masterport);
}
addReply(c,shared.ok);
}
/* --------------------------- REPLICATION CRON ---------------------------- */
#define REDIS_REPL_TRANSFER_TIMEOUT 60
void replicationCron(void) {
/* Bulk transfer I/O timeout? */
if (server.masterhost && server.replstate == REDIS_REPL_TRANSFER &&
(time(NULL)-server.repl_transfer_lastio) > REDIS_REPL_TRANSFER_TIMEOUT)
{
redisLog(REDIS_WARNING,"Timeout receiving bulk data from MASTER...");
replicationAbortSyncTransfer();
}
/* Check if we should connect to a MASTER */
if (server.replstate == REDIS_REPL_CONNECT) {
redisLog(REDIS_NOTICE,"Connecting to MASTER...");
if (syncWithMaster() == REDIS_OK) {
redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started: SYNC sent");
if (server.appendonly) rewriteAppendOnlyFileBackground();
}
}
}