Merge branch 'psync2' into unstable

This commit is contained in:
antirez 2016-11-17 09:37:03 +01:00
commit 8fb3ad2444
10 changed files with 468 additions and 143 deletions

View File

@ -402,6 +402,10 @@ repl-disable-tcp-nodelay no
# need to elapse, starting from the time the last slave disconnected, for
# the backlog buffer to be freed.
#
# Note that slaves never free the backlog for timeout, since they may be
# promoted to masters later, and should be able to correctly "partially
# resynchronize" with the slaves: hence they should always accumulate backlog.
#
# A value of 0 means to never release the backlog.
#
# repl-backlog-ttl 3600

View File

@ -653,7 +653,7 @@ int loadAppendOnlyFile(char *filename) {
serverLog(LL_NOTICE,"Reading RDB preamble from AOF file...");
if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
rioInitWithFile(&rdb,fp);
if (rdbLoadRio(&rdb) != C_OK) {
if (rdbLoadRio(&rdb,NULL) != C_OK) {
serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted");
goto readerr;
} else {
@ -1152,7 +1152,7 @@ int rewriteAppendOnlyFile(char *filename) {
if (server.aof_use_rdb_preamble) {
int error;
if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE) == C_ERR) {
if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) {
errno = error;
goto werr;
}

View File

@ -413,7 +413,7 @@ void flushallCommand(client *c) {
/* Normally rdbSave() will reset dirty, but we don't want this here
* as otherwise FLUSHALL will not be replicated nor put into the AOF. */
int saved_dirty = server.dirty;
rdbSave(server.rdb_filename);
rdbSave(server.rdb_filename,NULL);
server.dirty = saved_dirty;
}
server.dirty++;

View File

@ -320,12 +320,12 @@ void debugCommand(client *c) {
if (c->argc >= 3) c->argv[2] = tryObjectEncoding(c->argv[2]);
serverAssertWithInfo(c,c->argv[0],1 == 2);
} else if (!strcasecmp(c->argv[1]->ptr,"reload")) {
if (rdbSave(server.rdb_filename) != C_OK) {
if (rdbSave(server.rdb_filename,NULL) != C_OK) {
addReply(c,shared.err);
return;
}
emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
if (rdbLoad(server.rdb_filename) != C_OK) {
if (rdbLoad(server.rdb_filename,NULL) != C_OK) {
addReplyError(c,"Error trying to load the RDB dump");
return;
}

View File

@ -352,6 +352,14 @@ void addReplySds(client *c, sds s) {
}
}
/* This low level function just adds whatever protocol you send it to the
* client buffer, trying the static buffer initially, and using the string
* of objects if not possible.
*
* It is efficient because does not create an SDS object nor an Redis object
* if not needed. The object will only be created by calling
* _addReplyStringToList() if we fail to extend the existing tail object
* in the list of objects. */
void addReplyString(client *c, const char *s, size_t len) {
if (prepareClientToWrite(c) != C_OK) return;
if (_addReplyToBuffer(c,s,len) != C_OK)
@ -1022,7 +1030,7 @@ int processInlineBuffer(client *c) {
char *newline;
int argc, j;
sds *argv, aux;
size_t querylen;
size_t querylen, protolen;
/* Search for end of line */
newline = strchr(c->querybuf,'\n');
@ -1035,6 +1043,7 @@ int processInlineBuffer(client *c) {
}
return C_ERR;
}
protolen = (newline - c->querybuf)+1; /* Total protocol bytes of command. */
/* Handle the \r\n case. */
if (newline && newline != c->querybuf && *(newline-1) == '\r')
@ -1057,6 +1066,15 @@ int processInlineBuffer(client *c) {
if (querylen == 0 && c->flags & CLIENT_SLAVE)
c->repl_ack_time = server.unixtime;
/* Newline from masters can be used to prevent timeouts, but should
* not affect the replication offset since they are always sent
* "out of band" directly writing to the socket and without passing
* from the output buffers. */
if (querylen == 0 && c->flags & CLIENT_MASTER) {
c->reploff -= protolen;
while (protolen--) chopReplicationBacklog();
}
/* Leave data after the first line of the query in the buffer */
sdsrange(c->querybuf,querylen+2,-1);
@ -1321,7 +1339,11 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
sdsIncrLen(c->querybuf,nread);
c->lastinteraction = server.unixtime;
if (c->flags & CLIENT_MASTER) c->reploff += nread;
if (c->flags & CLIENT_MASTER) {
c->reploff += nread;
replicationFeedSlavesFromMasterStream(server.slaves,
c->querybuf+qblen,nread);
}
server.stat_net_input_bytes += nread;
if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

View File

@ -835,7 +835,7 @@ int rdbSaveAuxFieldStrInt(rio *rdb, char *key, long long val) {
}
/* Save a few default AUX fields with information about the RDB generated. */
int rdbSaveInfoAuxFields(rio *rdb, int flags) {
int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) {
int redis_bits = (sizeof(void*) == 8) ? 64 : 32;
int aof_preamble = (flags & RDB_SAVE_AOF_PREAMBLE) != 0;
@ -844,7 +844,19 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags) {
if (rdbSaveAuxFieldStrInt(rdb,"redis-bits",redis_bits) == -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"ctime",time(NULL)) == -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"used-mem",zmalloc_used_memory()) == -1) return -1;
/* Handle saving options that generate aux fields. */
if (rsi) {
if (rsi->repl_stream_db &&
rdbSaveAuxFieldStrInt(rdb,"repl-stream-db",rsi->repl_stream_db)
== -1)
{
return -1;
}
}
if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble) == -1) return -1;
if (rdbSaveAuxFieldStrStr(rdb,"repl-id",server.replid) == -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",server.master_repl_offset) == -1) return -1;
return 1;
}
@ -856,7 +868,7 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags) {
* When the function returns C_ERR and if 'error' is not NULL, the
* integer pointed by 'error' is set to the value of errno just after the I/O
* error. */
int rdbSaveRio(rio *rdb, int *error, int flags) {
int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
dictIterator *di = NULL;
dictEntry *de;
char magic[10];
@ -869,7 +881,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags) {
rdb->update_cksum = rioGenericUpdateChecksum;
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
if (rdbSaveInfoAuxFields(rdb,flags) == -1) goto werr;
if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;
for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j;
@ -945,7 +957,7 @@ werr:
* While the suffix is the 40 bytes hex string we announced in the prefix.
* This way processes receiving the payload can understand when it ends
* without doing any processing of the content. */
int rdbSaveRioWithEOFMark(rio *rdb, int *error) {
int rdbSaveRioWithEOFMark(rio *rdb, int *error, rdbSaveInfo *rsi) {
char eofmark[RDB_EOF_MARK_SIZE];
getRandomHexChars(eofmark,RDB_EOF_MARK_SIZE);
@ -953,7 +965,7 @@ int rdbSaveRioWithEOFMark(rio *rdb, int *error) {
if (rioWrite(rdb,"$EOF:",5) == 0) goto werr;
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
if (rioWrite(rdb,"\r\n",2) == 0) goto werr;
if (rdbSaveRio(rdb,error,RDB_SAVE_NONE) == C_ERR) goto werr;
if (rdbSaveRio(rdb,error,RDB_SAVE_NONE,rsi) == C_ERR) goto werr;
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
return C_OK;
@ -964,7 +976,7 @@ werr: /* Write error. */
}
/* Save the DB on disk. Return C_ERR on error, C_OK on success. */
int rdbSave(char *filename) {
int rdbSave(char *filename, rdbSaveInfo *rsi) {
char tmpfile[256];
char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
FILE *fp;
@ -985,7 +997,7 @@ int rdbSave(char *filename) {
}
rioInitWithFile(&rdb,fp);
if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE) == C_ERR) {
if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) {
errno = error;
goto werr;
}
@ -1023,7 +1035,7 @@ werr:
return C_ERR;
}
int rdbSaveBackground(char *filename) {
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
pid_t childpid;
long long start;
@ -1040,7 +1052,7 @@ int rdbSaveBackground(char *filename) {
/* Child */
closeListeningSockets(0);
redisSetProcTitle("redis-rdb-bgsave");
retval = rdbSave(filename);
retval = rdbSave(filename,rsi);
if (retval == C_OK) {
size_t private_dirty = zmalloc_get_private_dirty(-1);
@ -1410,7 +1422,7 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
* otherwise C_ERR is returned and 'errno' is set accordingly. */
int rdbLoadRio(rio *rdb) {
int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi) {
uint64_t dbid;
int type, rdbver;
redisDb *db = server.db+0;
@ -1501,6 +1513,15 @@ int rdbLoadRio(rio *rdb) {
serverLog(LL_NOTICE,"RDB '%s': %s",
(char*)auxkey->ptr,
(char*)auxval->ptr);
} else if (!strcasecmp(auxkey->ptr,"repl-stream-db")) {
if (rsi) rsi->repl_stream_db = atoi(auxval->ptr);
} else if (!strcasecmp(auxkey->ptr,"repl-id")) {
if (rsi && sdslen(auxval->ptr) == CONFIG_RUN_ID_SIZE) {
memcpy(rsi->repl_id,auxval->ptr,CONFIG_RUN_ID_SIZE+1);
rsi->repl_id_is_set = 1;
}
} else if (!strcasecmp(auxkey->ptr,"repl-offset")) {
if (rsi) rsi->repl_offset = strtoll(auxval->ptr,NULL,10);
} else {
/* We ignore fields we don't understand, as by AUX field
* contract. */
@ -1559,8 +1580,11 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */
/* Like rdbLoadRio() but takes a filename instead of a rio stream. The
* filename is open for reading and a rio stream object created in order
* to do the actual loading. Moreover the ETA displayed in the INFO
* output is initialized and finalized. */
int rdbLoad(char *filename) {
* output is initialized and finalized.
*
* If you pass an 'rsi' structure initialied with RDB_SAVE_OPTION_INIT, the
* loading code will fiil the information fields in the structure. */
int rdbLoad(char *filename, rdbSaveInfo *rsi) {
FILE *fp;
rio rdb;
int retval;
@ -1568,7 +1592,7 @@ int rdbLoad(char *filename) {
if ((fp = fopen(filename,"r")) == NULL) return C_ERR;
startLoading(fp);
rioInitWithFile(&rdb,fp);
retval = rdbLoadRio(&rdb);
retval = rdbLoadRio(&rdb,rsi);
fclose(fp);
stopLoading();
return retval;
@ -1721,7 +1745,7 @@ void backgroundSaveDoneHandler(int exitcode, int bysignal) {
/* Spawn an RDB child that writes the RDB to the sockets of the slaves
* that are currently in SLAVE_STATE_WAIT_BGSAVE_START state. */
int rdbSaveToSlavesSockets(void) {
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
int *fds;
uint64_t *clientids;
int numfds;
@ -1779,7 +1803,7 @@ int rdbSaveToSlavesSockets(void) {
closeListeningSockets(0);
redisSetProcTitle("redis-rdb-to-slaves");
retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL);
retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL,rsi);
if (retval == C_OK && rioFlush(&slave_sockets) == 0)
retval = C_ERR;
@ -1884,7 +1908,7 @@ void saveCommand(client *c) {
addReplyError(c,"Background save already in progress");
return;
}
if (rdbSave(server.rdb_filename) == C_OK) {
if (rdbSave(server.rdb_filename,NULL) == C_OK) {
addReply(c,shared.ok);
} else {
addReply(c,shared.err);
@ -1918,7 +1942,7 @@ void bgsaveCommand(client *c) {
"Use BGSAVE SCHEDULE in order to schedule a BGSAVE whenver "
"possible.");
}
} else if (rdbSaveBackground(server.rdb_filename) == C_OK) {
} else if (rdbSaveBackground(server.rdb_filename,NULL) == C_OK) {
addReplyStatus(c,"Background saving started");
} else {
addReply(c,shared.err);

View File

@ -118,11 +118,11 @@ uint64_t rdbLoadLen(rio *rdb, int *isencoded);
int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr);
int rdbSaveObjectType(rio *rdb, robj *o);
int rdbLoadObjectType(rio *rdb);
int rdbLoad(char *filename);
int rdbSaveBackground(char *filename);
int rdbSaveToSlavesSockets(void);
int rdbLoad(char *filename, rdbSaveInfo *rsi);
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi);
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi);
void rdbRemoveTempFile(pid_t childpid);
int rdbSave(char *filename);
int rdbSave(char *filename, rdbSaveInfo *rsi);
ssize_t rdbSaveObject(rio *rdb, robj *o);
size_t rdbSavedObjectLen(robj *o);
robj *rdbLoadObject(int type, rio *rdb);
@ -136,6 +136,6 @@ int rdbSaveBinaryDoubleValue(rio *rdb, double val);
int rdbLoadBinaryDoubleValue(rio *rdb, double *val);
int rdbSaveBinaryFloatValue(rio *rdb, float val);
int rdbLoadBinaryFloatValue(rio *rdb, float *val);
int rdbLoadRio(rio *rdb);
int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi);
#endif

View File

@ -79,11 +79,6 @@ void createReplicationBacklog(void) {
server.repl_backlog = zmalloc(server.repl_backlog_size);
server.repl_backlog_histlen = 0;
server.repl_backlog_idx = 0;
/* When a new backlog buffer is created, we increment the replication
* offset by one to make sure we'll not be able to PSYNC with any
* previous slave. This is needed because we avoid incrementing the
* master_repl_offset if no backlog exists nor slaves are attached. */
server.master_repl_offset++;
/* We don't have any data inside our buffer, but virtually the first
* byte we have is the next byte that will be generated for the
@ -153,6 +148,22 @@ void feedReplicationBacklog(void *ptr, size_t len) {
server.repl_backlog_histlen + 1;
}
/* Remove the last byte from the replication backlog. This
* is useful when we receive an out of band "\n" to keep the connection
* alive but don't want to count it as replication stream.
*
* As a side effect this function adjusts the master replication offset
* of this instance to account for the missing byte. */
void chopReplicationBacklog(void) {
if (!server.repl_backlog || !server.repl_backlog_histlen) return;
if (server.repl_backlog_idx == 0)
server.repl_backlog_idx = server.repl_backlog_size-1;
else
server.repl_backlog_idx--;
server.master_repl_offset--;
server.repl_backlog_histlen--;
}
/* Wrapper for feedReplicationBacklog() that takes Redis string objects
* as input. */
void feedReplicationBacklogWithObject(robj *o) {
@ -170,12 +181,24 @@ void feedReplicationBacklogWithObject(robj *o) {
feedReplicationBacklog(p,len);
}
/* Propagate write commands to slaves, and populate the replication backlog
* as well. This function is used if the instance is a master: we use
* the commands received by our clients in order to create the replication
* stream. Instead if the instance is a slave and has sub-slaves attached,
* we use replicationFeedSlavesFromMaster() */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
listNode *ln;
listIter li;
int j, len;
char llstr[LONG_STR_SIZE];
/* If the instance is not a top level master, return ASAP: we'll just proxy
* the stream of data we receive from our master instead, in order to
* propagate *identical* replication stream. In this way this slave can
* advertise the same replication ID as the master (since it shares the
* master replication history and has the same backlog and offsets). */
if (server.masterhost != NULL) return;
/* If there aren't slaves, and there is no backlog buffer to populate,
* we can return ASAP. */
if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
@ -265,6 +288,34 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
}
}
/* This function is used in order to proxy what we receive from our master
* to our sub-slaves. */
#include <ctype.h>
void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen) {
listNode *ln;
listIter li;
/* Debugging: this is handy to see the stream sent from master
* to slaves. Disabled with if(0). */
if (0) {
printf("%zu:",buflen);
for (size_t j = 0; j < buflen; j++) {
printf("%c", isprint(buf[j]) ? buf[j] : '.');
}
printf("\n");
}
if (server.repl_backlog) feedReplicationBacklog(buf,buflen);
listRewind(slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
/* Don't feed slaves that are still waiting for BGSAVE to start */
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
addReplyString(slave,buf,buflen);
}
}
void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) {
listNode *ln;
listIter li;
@ -329,7 +380,7 @@ long long addReplyReplicationBacklog(client *c, long long offset) {
skip = offset - server.repl_backlog_off;
serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip);
/* Point j to the oldest byte, that is actaully our
/* Point j to the oldest byte, that is actually our
* server.repl_backlog_off byte. */
j = (server.repl_backlog_idx +
(server.repl_backlog_size-server.repl_backlog_histlen)) %
@ -361,18 +412,14 @@ long long addReplyReplicationBacklog(client *c, long long offset) {
* the BGSAVE process started and before executing any other command
* from clients. */
long long getPsyncInitialOffset(void) {
long long psync_offset = server.master_repl_offset;
/* Add 1 to psync_offset if it the replication backlog does not exists
* as when it will be created later we'll increment the offset by one. */
if (server.repl_backlog == NULL) psync_offset++;
return psync_offset;
return server.master_repl_offset;
}
/* Send a FULLRESYNC reply in the specific case of a full resynchronization,
* as a side effect setup the slave for a full sync in different ways:
*
* 1) Remember, into the slave client structure, the offset we sent
* here, so that if new slaves will later attach to the same
* 1) Remember, into the slave client structure, the replication offset
* we sent here, so that if new slaves will later attach to the same
* background RDB saving process (by duplicating this client output
* buffer), we can get the right offset from this slave.
* 2) Set the replication state of the slave to WAIT_BGSAVE_END so that
@ -392,14 +439,14 @@ int replicationSetupSlaveForFullResync(client *slave, long long offset) {
slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
/* We are going to accumulate the incremental changes for this
* slave as well. Set slaveseldb to -1 in order to force to re-emit
* a SLEECT statement in the replication stream. */
* a SELECT statement in the replication stream. */
server.slaveseldb = -1;
/* Don't send this reply to slaves that approached us with
* the old SYNC command. */
if (!(slave->flags & CLIENT_PRE_PSYNC)) {
buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
server.runid,offset);
server.replid,offset);
if (write(slave->fd,buf,buflen) != buflen) {
freeClientAsync(slave);
return C_ERR;
@ -415,19 +462,32 @@ int replicationSetupSlaveForFullResync(client *slave, long long offset) {
* with the usual full resync. */
int masterTryPartialResynchronization(client *c) {
long long psync_offset, psync_len;
char *master_runid = c->argv[1]->ptr;
char *master_replid = c->argv[1]->ptr;
char buf[128];
int buflen;
/* Is the runid of this master the same advertised by the wannabe slave
* via PSYNC? If runid changed this master is a different instance and
* there is no way to continue. */
if (strcasecmp(master_runid, server.runid)) {
/* Parse the replication offset asked by the slave. Go to full sync
* on parse error: this should never happen but we try to handle
* it in a robust way compared to aborting. */
if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
C_OK) goto need_full_resync;
/* Is the replication ID of this master the same advertised by the wannabe
* slave via PSYNC? If the replication ID changed this master has a
* different replication history, and there is no way to continue.
*
* Note that there are two potentially valid replication IDs: the ID1
* and the ID2. The ID2 however is only valid up to a specific offset. */
if (strcasecmp(master_replid, server.replid) &&
(strcasecmp(master_replid, server.replid2) ||
psync_offset > server.second_replid_offset))
{
/* Run id "?" is used by slaves that want to force a full resync. */
if (master_runid[0] != '?') {
if (master_replid[0] != '?') {
serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
"Runid mismatch (Client asked for runid '%s', my runid is '%s')",
master_runid, server.runid);
"Replication ID mismatch (Slave asked for '%s', my replication "
"ID is '%s')",
master_replid, server.replid);
} else {
serverLog(LL_NOTICE,"Full resync requested by slave %s",
replicationGetSlaveName(c));
@ -436,8 +496,6 @@ int masterTryPartialResynchronization(client *c) {
}
/* We still have the data our slave is asking for? */
if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
C_OK) goto need_full_resync;
if (!server.repl_backlog ||
psync_offset < server.repl_backlog_off ||
psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
@ -463,7 +521,11 @@ int masterTryPartialResynchronization(client *c) {
/* We can't use the connection buffers since they are used to accumulate
* new commands at this stage. But we are sure the socket send buffer is
* empty so this write will never fail actually. */
buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
if (c->slave_capa & SLAVE_CAPA_PSYNC2) {
buflen = snprintf(buf,sizeof(buf),"+CONTINUE %s\r\n", server.replid);
} else {
buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
}
if (write(c->fd,buf,buflen) != buflen) {
freeClientAsync(c);
return C_OK;
@ -515,10 +577,18 @@ int startBgsaveForReplication(int mincapa) {
serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s",
socket_target ? "slaves sockets" : "disk");
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
/* If we are saving for a chained slave (that is, if we are,
* in turn, a slave of another instance), make sure after
* loadig the RDB, our slaves select the right DB: we'll just
* send the replication stream we receive from our master, so
* no way to send SELECT commands. */
if (server.master) rsi.repl_stream_db = server.master->db->id;
if (socket_target)
retval = rdbSaveToSlavesSockets();
retval = rdbSaveToSlavesSockets(&rsi);
else
retval = rdbSaveBackground(server.rdb_filename);
retval = rdbSaveBackground(server.rdb_filename,&rsi);
/* If we failed to BGSAVE, remove the slaves waiting for a full
* resynchorinization from the list of salves, inform them with
@ -589,22 +659,22 @@ void syncCommand(client *c) {
* when this happens masterTryPartialResynchronization() already
* replied with:
*
* +FULLRESYNC <runid> <offset>
* +FULLRESYNC <replid> <offset>
*
* So the slave knows the new runid and offset to try a PSYNC later
* So the slave knows the new replid and offset to try a PSYNC later
* if the connection with the master is lost. */
if (!strcasecmp(c->argv[0]->ptr,"psync")) {
if (masterTryPartialResynchronization(c) == C_OK) {
server.stat_sync_partial_ok++;
return; /* No full resync needed, return. */
} else {
char *master_runid = c->argv[1]->ptr;
char *master_replid = c->argv[1]->ptr;
/* Increment stats for failed PSYNCs, but only if the
* runid is not "?", as this is used by slaves to force a full
* replid is not "?", as this is used by slaves to force a full
* resync on purpose when they are not albe to partially
* resync. */
if (master_runid[0] != '?') server.stat_sync_partial_err++;
if (master_replid[0] != '?') server.stat_sync_partial_err++;
}
} else {
/* If a slave uses SYNC, we are dealing with an old implementation
@ -625,6 +695,16 @@ void syncCommand(client *c) {
c->flags |= CLIENT_SLAVE;
listAddNodeTail(server.slaves,c);
/* Create the replication backlog if needed. */
if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) {
/* When we create the backlog from scratch, we always use a new
* replication ID and clear the ID2, since there is no valid
* past history. */
changeReplicationId();
clearReplicationId2();
createReplicationBacklog();
}
/* CASE 1: BGSAVE is in progress, with disk target. */
if (server.rdb_child_pid != -1 &&
server.rdb_child_type == RDB_CHILD_TYPE_DISK)
@ -685,9 +765,6 @@ void syncCommand(client *c) {
}
}
}
if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
createReplicationBacklog();
return;
}
@ -735,6 +812,8 @@ void replconfCommand(client *c) {
/* Ignore capabilities not understood by this master. */
if (!strcasecmp(c->argv[j+1]->ptr,"eof"))
c->slave_capa |= SLAVE_CAPA_EOF;
else if (!strcasecmp(c->argv[j+1]->ptr,"psync2"))
c->slave_capa |= SLAVE_CAPA_PSYNC2;
} else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
/* REPLCONF ACK is used by slave to inform the master the amount
* of replication stream that it processed so far. It is an
@ -928,6 +1007,43 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
if (startbgsave) startBgsaveForReplication(mincapa);
}
/* Change the current instance replication ID with a new, random one.
* This will prevent successful PSYNCs between this master and other
* slaves, so the command should be called when something happens that
* alters the current story of the dataset. */
void changeReplicationId(void) {
getRandomHexChars(server.replid,CONFIG_RUN_ID_SIZE);
server.replid[CONFIG_RUN_ID_SIZE] = '\0';
}
/* Clear (invalidate) the secondary replication ID. This happens, for
* example, after a full resynchronization, when we start a new replication
* history. */
void clearReplicationId2(void) {
memset(server.replid2,'0',sizeof(server.replid));
server.replid2[CONFIG_RUN_ID_SIZE] = '\0';
server.second_replid_offset = -1;
}
/* Use the current replication ID / offset as secondary replication
* ID, and change the current one in order to start a new history.
* This should be used when an instance is switched from slave to master
* so that it can serve PSYNC requests performed using the master
* replication ID. */
void shiftReplicationId(void) {
memcpy(server.replid2,server.replid,sizeof(server.replid));
/* We set the second replid offset to the master offset + 1, since
* the slave will ask for the first byte it has not yet received, so
* we need to add one to the offset: for example if, as a slave, we are
* sure we have the same history as the master for 50 bytes, after we
* are turned into a master, we can accept a PSYNC request with offset
* 51, since the slave asking has the same history up to the 50th
* byte, and is asking for the new bytes starting at offset 51. */
server.second_replid_offset = server.master_repl_offset+1;
changeReplicationId();
serverLog(LL_WARNING,"Setting secondary replication ID to %s, valid up to offset: %lld. New replication ID is %s", server.replid2, server.second_replid_offset, server.replid);
}
/* ----------------------------------- SLAVE -------------------------------- */
/* Returns 1 if the given replication state is a handshake state,
@ -965,18 +1081,18 @@ void replicationEmptyDbCallback(void *privdata) {
/* Once we have a link with the master and the synchroniziation was
* performed, this function materializes the master client we store
* at server.master, starting from the specified file descriptor. */
void replicationCreateMasterClient(int fd) {
void replicationCreateMasterClient(int fd, int dbid) {
server.master = createClient(fd);
server.master->flags |= CLIENT_MASTER;
server.master->authenticated = 1;
server.repl_state = REPL_STATE_CONNECTED;
server.master->reploff = server.repl_master_initial_offset;
memcpy(server.master->replrunid, server.repl_master_runid,
sizeof(server.repl_master_runid));
server.master->reploff = server.master_initial_offset;
memcpy(server.master->replid, server.master_replid,
sizeof(server.master_replid));
/* If master offset is set to -1, this master is old and is not
* PSYNC capable, so we flag it accordingly. */
if (server.master->reploff == -1)
server.master->flags |= CLIENT_PRE_PSYNC;
if (dbid != -1) selectDb(server.master,dbid);
}
/* Asynchronously read the SYNC payload we receive from a master */
@ -1137,7 +1253,8 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
* time for non blocking loading. */
aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory");
if (rdbLoad(server.rdb_filename) != C_OK) {
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
if (rdbLoad(server.rdb_filename,&rsi) != C_OK) {
serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
cancelReplicationHandshake();
return;
@ -1145,7 +1262,20 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
/* Final setup of the connected slave <- master link */
zfree(server.repl_transfer_tmpfile);
close(server.repl_transfer_fd);
replicationCreateMasterClient(server.repl_transfer_s);
replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db);
server.repl_state = REPL_STATE_CONNECTED;
/* After a full resynchroniziation we use the replication ID and
* offset of the master. The secondary ID / offset are cleared since
* we are starting a new history. */
memcpy(server.replid,server.master->replid,sizeof(server.replid));
server.master_repl_offset = server.master->reploff;
clearReplicationId2();
/* Let's create the replication backlog if needed. Slaves need to
* accumulate the backlog regardless of the fact they have sub-slaves
* or not, in order to behave correctly if they are promoted to
* masters after a failover. */
if (server.repl_backlog == NULL) createReplicationBacklog();
serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Finished with success");
/* Restart the AOF subsystem now that we finished the sync. This
* will trigger an AOF rewrite, and when done will start appending
@ -1270,7 +1400,7 @@ char *sendSynchronousCommand(int flags, int fd, ...) {
*
* 1) As a side effect of the function call the function removes the readable
* event handler from "fd", unless the return value is PSYNC_WAIT_REPLY.
* 2) server.repl_master_initial_offset is set to the right value according
* 2) server.master_initial_offset is set to the right value according
* to the master reply. This will be used to populate the 'server.master'
* structure replication offset.
*/
@ -1281,31 +1411,31 @@ char *sendSynchronousCommand(int flags, int fd, ...) {
#define PSYNC_FULLRESYNC 3
#define PSYNC_NOT_SUPPORTED 4
int slaveTryPartialResynchronization(int fd, int read_reply) {
char *psync_runid;
char *psync_replid;
char psync_offset[32];
sds reply;
/* Writing half */
if (!read_reply) {
/* Initially set repl_master_initial_offset to -1 to mark the current
/* Initially set master_initial_offset to -1 to mark the current
* master run_id and offset as not valid. Later if we'll be able to do
* a FULL resync using the PSYNC command we'll set the offset at the
* right value, so that this information will be propagated to the
* client structure representing the master into server.master. */
server.repl_master_initial_offset = -1;
server.master_initial_offset = -1;
if (server.cached_master) {
psync_runid = server.cached_master->replrunid;
psync_replid = server.cached_master->replid;
snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset);
serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset);
} else {
serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)");
psync_runid = "?";
psync_replid = "?";
memcpy(psync_offset,"-1",3);
}
/* Issue the PSYNC command */
reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_runid,psync_offset,NULL);
reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_replid,psync_offset,NULL);
if (reply != NULL) {
serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
sdsfree(reply);
@ -1327,31 +1457,31 @@ int slaveTryPartialResynchronization(int fd, int read_reply) {
aeDeleteFileEvent(server.el,fd,AE_READABLE);
if (!strncmp(reply,"+FULLRESYNC",11)) {
char *runid = NULL, *offset = NULL;
char *replid = NULL, *offset = NULL;
/* FULL RESYNC, parse the reply in order to extract the run id
* and the replication offset. */
runid = strchr(reply,' ');
if (runid) {
runid++;
offset = strchr(runid,' ');
replid = strchr(reply,' ');
if (replid) {
replid++;
offset = strchr(replid,' ');
if (offset) offset++;
}
if (!runid || !offset || (offset-runid-1) != CONFIG_RUN_ID_SIZE) {
if (!replid || !offset || (offset-replid-1) != CONFIG_RUN_ID_SIZE) {
serverLog(LL_WARNING,
"Master replied with wrong +FULLRESYNC syntax.");
/* This is an unexpected condition, actually the +FULLRESYNC
* reply means that the master supports PSYNC, but the reply
* format seems wrong. To stay safe we blank the master
* runid to make sure next PSYNCs will fail. */
memset(server.repl_master_runid,0,CONFIG_RUN_ID_SIZE+1);
* replid to make sure next PSYNCs will fail. */
memset(server.master_replid,0,CONFIG_RUN_ID_SIZE+1);
} else {
memcpy(server.repl_master_runid, runid, offset-runid-1);
server.repl_master_runid[CONFIG_RUN_ID_SIZE] = '\0';
server.repl_master_initial_offset = strtoll(offset,NULL,10);
memcpy(server.master_replid, replid, offset-replid-1);
server.master_replid[CONFIG_RUN_ID_SIZE] = '\0';
server.master_initial_offset = strtoll(offset,NULL,10);
serverLog(LL_NOTICE,"Full resync from master: %s:%lld",
server.repl_master_runid,
server.repl_master_initial_offset);
server.master_replid,
server.master_initial_offset);
}
/* We are going to full resync, discard the cached master structure. */
replicationDiscardCachedMaster();
@ -1360,9 +1490,40 @@ int slaveTryPartialResynchronization(int fd, int read_reply) {
}
if (!strncmp(reply,"+CONTINUE",9)) {
/* Partial resync was accepted, set the replication state accordingly */
/* Partial resync was accepted. */
serverLog(LL_NOTICE,
"Successful partial resynchronization with master.");
/* Check the new replication ID advertised by the master. If it
* changed, we need to set the new ID as primary ID, and set or
* secondary ID as the old master ID up to the current offset, so
* that our sub-slaves will be able to PSYNC with us after a
* disconnection. */
char *start = reply+10;
char *end = reply+9;
while(end[0] != '\r' && end[0] != '\n' && end[0] != '\0') end++;
if (end-start == CONFIG_RUN_ID_SIZE) {
char new[CONFIG_RUN_ID_SIZE+1];
memcpy(new,start,CONFIG_RUN_ID_SIZE);
new[CONFIG_RUN_ID_SIZE] = '\0';
if (strcmp(new,server.cached_master->replid)) {
/* Master ID changed. */
serverLog(LL_WARNING,"Master replication ID changed to %s",new);
/* Set the old ID as our ID2, up to the current offset+1. */
memcpy(server.replid2,server.cached_master->replid,
sizeof(server.replid2));
server.second_replid_offset = server.master_repl_offset+1;
/* Update the cached master ID and our own primary ID to the
* new one. */
memcpy(server.replid,new,sizeof(server.replid));
memcpy(server.cached_master->replid,new,sizeof(server.replid));
}
}
/* Setup the replication to continue. */
sdsfree(reply);
replicationResurrectCachedMaster(fd);
return PSYNC_CONTINUE;
@ -1386,6 +1547,8 @@ int slaveTryPartialResynchronization(int fd, int read_reply) {
return PSYNC_NOT_SUPPORTED;
}
/* This handler fires when the non blocking connect was able to
* establish a connection with the master. */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
char tmpfile[256], *err = NULL;
int dfd, maxtries = 5;
@ -1402,7 +1565,8 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
return;
}
/* Check for errors in the socket. */
/* Check for errors in the socket: after a non blocking connect() we
* may find that the socket is in error state. */
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
sockerr = errno;
if (sockerr) {
@ -1531,13 +1695,15 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
server.repl_state = REPL_STATE_SEND_CAPA;
}
/* Inform the master of our capabilities. While we currently send
* just one capability, it is possible to chain new capabilities here
* in the form of REPLCONF capa X capa Y capa Z ...
/* Inform the master of our (slave) capabilities.
*
* EOF: supports EOF-style RDB transfer for diskless replication.
* PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
*
* The master will ignore capabilities it does not understand. */
if (server.repl_state == REPL_STATE_SEND_CAPA) {
err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
"capa","eof",NULL);
"capa","eof","capa","psync2",NULL);
if (err) goto write_error;
sdsfree(err);
server.repl_state = REPL_STATE_RECEIVE_CAPA;
@ -1591,14 +1757,14 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
}
/* PSYNC failed or is not supported: we want our slaves to resync with us
* as well, if we have any (chained replication case). The mater may
* transfer us an entirely different data set and we have no way to
* incrementally feed our slaves after that. */
* as well, if we have any sub-slaves. The mater may transfer us an
* entirely different data set and we have no way to incrementally feed
* our slaves after that. */
disconnectSlaves(); /* Force our slaves to resync with us as well. */
freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
/* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
* and the server.repl_master_runid and repl_master_initial_offset are
* and the server.master_replid and master_initial_offset are
* already populated. */
if (psync_result == PSYNC_NOT_SUPPORTED) {
serverLog(LL_NOTICE,"Retrying with SYNC...");
@ -1727,15 +1893,23 @@ int cancelReplicationHandshake(void) {
/* Set replication to the specified master address and port. */
void replicationSetMaster(char *ip, int port) {
int was_master = server.masterhost == NULL;
sdsfree(server.masterhost);
server.masterhost = sdsnew(ip);
server.masterport = port;
if (server.master) freeClient(server.master);
if (server.master) {
freeClient(server.master);
}
disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */
disconnectSlaves(); /* Force our slaves to resync with us as well. */
replicationDiscardCachedMaster(); /* Don't try a PSYNC. */
freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
/* Force our slaves to resync with us as well. They may hopefully be able
* to partially resync with us, but we can notify the replid change. */
disconnectSlaves();
cancelReplicationHandshake();
/* Before destroying our master state, create a cached master using
* our own parameters, to later PSYNC with the new master. */
if (was_master) replicationCacheMasterUsingMyself();
server.repl_state = REPL_STATE_CONNECT;
server.master_repl_offset = 0;
server.repl_down_since = 0;
@ -1746,20 +1920,26 @@ void replicationUnsetMaster(void) {
if (server.masterhost == NULL) return; /* Nothing to do. */
sdsfree(server.masterhost);
server.masterhost = NULL;
if (server.master) {
if (listLength(server.slaves) == 0) {
/* If this instance is turned into a master and there are no
* slaves, it inherits the replication offset from the master.
* Under certain conditions this makes replicas comparable by
* replication offset to understand what is the most updated. */
server.master_repl_offset = server.master->reploff;
freeReplicationBacklog();
}
freeClient(server.master);
}
/* When a slave is turned into a master, the current replication ID
* (that was inherited from the master at synchronization time) is
* used as secondary ID up to the current offset, and a new replication
* ID is created to continue with a new replication history. */
shiftReplicationId();
if (server.master) freeClient(server.master);
replicationDiscardCachedMaster();
cancelReplicationHandshake();
/* Disconnecting all the slaves is required: we need to inform slaves
* of the replication ID change (see shiftReplicationId() call). However
* the slaves will be able to partially resync with us, so it will be
* a very fast reconnection. */
disconnectSlaves();
server.repl_state = REPL_STATE_NONE;
/* We need to make sure the new master will start the replication stream
* with a SELECT statement. This is forced after a full resync, but
* with PSYNC version 2, there is no need for full resync after a
* master switch. */
server.slaveseldb = -1;
}
/* This function is called when the slave lose the connection with the
@ -1931,6 +2111,31 @@ void replicationCacheMaster(client *c) {
replicationHandleMasterDisconnection();
}
/* This function is called when a master is turend into a slave, in order to
* create from scratch a cached master for the new client, that will allow
* to PSYNC with the slave that was promoted as the new master after a
* failover.
*
* Assuming this instance was previously the master instance of the new master,
* the new master will accept its replication ID, and potentiall also the
* current offset if no data was lost during the failover. So we use our
* current replication ID and offset in order to synthesize a cached master. */
void replicationCacheMasterUsingMyself(void) {
/* The master client we create can be set to any DBID, because
* the new master will start its replication stream with SELECT. */
server.master_initial_offset = server.master_repl_offset;
replicationCreateMasterClient(-1,-1);
/* Use our own ID / offset. */
memcpy(server.master->replid, server.replid, sizeof(server.replid));
/* Set as cached master. */
unlinkClient(server.master);
server.cached_master = server.master;
server.master = NULL;
serverLog(LL_NOTICE,"Before turning into a slave, using my master parameters to synthesize a cached master: I may be able to synchronize with the new master with just a partial transfer.");
}
/* Free a cached master, called when there are no longer the conditions for
* a partial resync on reconnection. */
void replicationDiscardCachedMaster(void) {
@ -2290,7 +2495,9 @@ void replicationCron(void) {
robj *ping_argv[1];
/* First, send PING according to ping_slave_period. */
if ((replication_cron_loops % server.repl_ping_slave_period) == 0) {
if ((replication_cron_loops % server.repl_ping_slave_period) == 0 &&
listLength(server.slaves))
{
ping_argv[0] = createStringObject("PING",4);
replicationFeedSlaves(server.slaves, server.slaveseldb,
ping_argv, 1);
@ -2299,20 +2506,32 @@ void replicationCron(void) {
/* Second, send a newline to all the slaves in pre-synchronization
* stage, that is, slaves waiting for the master to create the RDB file.
*
* Also send the a newline to all the chained slaves we have, if we lost
* connection from our master, to keep the slaves aware that their
* master is online. This is needed since sub-slaves only receive proxied
* data from top-level masters, so there is no explicit pinging in order
* to avoid altering the replication offsets. This special out of band
* pings (newlines) can be sent, they will have no effect in the offset.
*
* The newline will be ignored by the slave but will refresh the
* last-io timer preventing a timeout. In this case we ignore the
* last interaction timer preventing a timeout. In this case we ignore the
* ping period and refresh the connection once per second since certain
* timeouts are set at a few seconds (example: PSYNC response). */
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
int is_presync =
(slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
(slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
server.rdb_child_type != RDB_CHILD_TYPE_SOCKET))
{
server.rdb_child_type != RDB_CHILD_TYPE_SOCKET));
int is_subslave = server.masterhost && server.master == NULL &&
slave->replstate == SLAVE_STATE_ONLINE;
if (is_presync || is_subslave) {
if (write(slave->fd, "\n", 1) == -1) {
/* Don't worry, it's just a ping. */
/* Don't worry about socket errors, it's just a ping. */
}
}
}
@ -2337,10 +2556,14 @@ void replicationCron(void) {
}
}
/* If we have no attached slaves and there is a replication backlog
* using memory, free it after some (configured) time. */
/* If this is a master without attached slaves and there is a replication
* backlog active, in order to reclaim memory we can free it after some
* (configured) time. Note that this cannot be done for slaves: slaves
* without sub-slaves attached should still accumulate data into the
* backlog, in order to reply to PSYNC queries if they are turned into
* masters after a failover. */
if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&
server.repl_backlog)
server.repl_backlog && server.masterhost == NULL)
{
time_t idle = server.unixtime - server.repl_no_slaves_since;

View File

@ -1079,7 +1079,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
{
serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
sp->changes, (int)sp->seconds);
rdbSaveBackground(server.rdb_filename);
rdbSaveBackground(server.rdb_filename,NULL);
break;
}
}
@ -1151,7 +1151,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
(server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == C_OK))
{
if (rdbSaveBackground(server.rdb_filename) == C_OK)
if (rdbSaveBackground(server.rdb_filename,NULL) == C_OK)
server.rdb_bgsave_scheduled = 0;
}
@ -1309,10 +1309,11 @@ void initServerConfig(void) {
int j;
getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE);
server.runid[CONFIG_RUN_ID_SIZE] = '\0';
changeReplicationId();
server.configfile = NULL;
server.executable = NULL;
server.hz = CONFIG_DEFAULT_HZ;
server.runid[CONFIG_RUN_ID_SIZE] = '\0';
server.arch_bits = (sizeof(long) == 8) ? 64 : 32;
server.port = CONFIG_DEFAULT_SERVER_PORT;
server.tcp_backlog = CONFIG_DEFAULT_TCP_BACKLOG;
@ -1409,7 +1410,7 @@ void initServerConfig(void) {
server.masterport = 6379;
server.master = NULL;
server.cached_master = NULL;
server.repl_master_initial_offset = -1;
server.master_initial_offset = -1;
server.repl_state = REPL_STATE_NONE;
server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT;
server.repl_serve_stale_data = CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA;
@ -2471,7 +2472,7 @@ int prepareForShutdown(int flags) {
if ((server.saveparamslen > 0 && !nosave) || save) {
serverLog(LL_NOTICE,"Saving the final RDB snapshot before exiting.");
/* Snapshotting. Perform a SYNC SAVE and exit */
if (rdbSave(server.rdb_filename) != C_OK) {
if (rdbSave(server.rdb_filename,NULL) != C_OK) {
/* Ooops.. error saving! The best we can do is to continue
* operating. Note that if there was a background saving process,
* in the next cron() Redis will be notified that the background
@ -3135,12 +3136,18 @@ sds genRedisInfoString(char *section) {
}
}
info = sdscatprintf(info,
"master_replid:%s\r\n"
"master_replid2:%s\r\n"
"master_repl_offset:%lld\r\n"
"second_repl_offset:%lld\r\n"
"repl_backlog_active:%d\r\n"
"repl_backlog_size:%lld\r\n"
"repl_backlog_first_byte_offset:%lld\r\n"
"repl_backlog_histlen:%lld\r\n",
server.replid,
server.replid2,
server.master_repl_offset,
server.second_replid_offset,
server.repl_backlog != NULL,
server.repl_backlog_size,
server.repl_backlog_off,
@ -3416,9 +3423,20 @@ void loadDataFromDisk(void) {
if (loadAppendOnlyFile(server.aof_filename) == C_OK)
serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
} else {
if (rdbLoad(server.rdb_filename) == C_OK) {
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
if (rdbLoad(server.rdb_filename,&rsi) == C_OK) {
serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds",
(float)(ustime()-start)/1000000);
/* Restore the replication ID / offset from the RDB file. */
if (rsi.repl_id_is_set && rsi.repl_offset != -1) {
memcpy(server.replid,rsi.repl_id,sizeof(server.replid));
server.master_repl_offset = rsi.repl_offset;
/* If we are a slave, create a cached master from this
* information, in order to allow partial resynchronizations
* with masters. */
if (server.masterhost) replicationCacheMasterUsingMyself();
}
} else if (errno != ENOENT) {
serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
exit(1);

View File

@ -293,7 +293,8 @@ typedef long long mstime_t; /* millisecond time type. */
/* Slave capabilities. */
#define SLAVE_CAPA_NONE 0
#define SLAVE_CAPA_EOF (1<<0) /* Can parse the RDB EOF streaming format. */
#define SLAVE_CAPA_EOF (1<<0) /* Can parse the RDB EOF streaming format. */
#define SLAVE_CAPA_PSYNC2 (1<<1) /* Supports PSYNC2 protocol. */
/* Synchronous read timeout - slave side */
#define CONFIG_REPL_SYNCIO_TIMEOUT 5
@ -679,8 +680,8 @@ typedef struct client {
long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
copying this slave output buffer
should use. */
char replrunid[CONFIG_RUN_ID_SIZE+1]; /* Master run id if is a master. */
int slave_listening_port; /* As configured with: REPLCONF listening-port */
char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
int slave_listening_port; /* As configured with: SLAVECONF listening-port */
char slave_ip[NET_IP_STR_LEN]; /* Optionally given by REPLCONF ip-address */
int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
multiState mstate; /* MULTI/EXEC state */
@ -803,6 +804,26 @@ struct redisMemOverhead {
} *db;
};
/* This structure can be optionally passed to RDB save/load functions in
* order to implement additional functionalities, by storing and loading
* metadata to the RDB file.
*
* Currently the only use is to select a DB at load time, useful in
* replication in order to make sure that chained slaves (slaves of slaves)
* select the correct DB and are able to accept the stream coming from the
* top-level master. */
typedef struct rdbSaveInfo {
/* Used saving and loading. */
int repl_stream_db; /* DB to select in server.master client. */
/* Used only loading. */
int repl_id_is_set; /* True if repl_id field is set. */
char repl_id[CONFIG_RUN_ID_SIZE+1]; /* Replication ID. */
long long repl_offset; /* Replication offset. */
} rdbSaveInfo;
#define RDB_SAVE_INFO_INIT {-1,0,"000000000000000000000000000000",-1}
/*-----------------------------------------------------------------------------
* Global server state
*----------------------------------------------------------------------------*/
@ -988,15 +1009,19 @@ struct redisServer {
char *syslog_ident; /* Syslog ident */
int syslog_facility; /* Syslog facility */
/* Replication (master) */
char replid[CONFIG_RUN_ID_SIZE+1]; /* My current replication ID. */
char replid2[CONFIG_RUN_ID_SIZE+1]; /* replid inherited from master*/
long long master_repl_offset; /* My current replication offset */
long long second_replid_offset; /* Accept offsets up to this for replid2. */
int slaveseldb; /* Last SELECTed DB in replication output */
long long master_repl_offset; /* Global replication offset */
int repl_ping_slave_period; /* Master pings the slave every N seconds */
char *repl_backlog; /* Replication backlog for partial syncs */
long long repl_backlog_size; /* Backlog circular buffer size */
long long repl_backlog_histlen; /* Backlog actual data length */
long long repl_backlog_idx; /* Backlog circular buffer current offset */
long long repl_backlog_off; /* Replication offset of first byte in the
backlog buffer. */
long long repl_backlog_idx; /* Backlog circular buffer current offset,
that is the next byte will'll write to.*/
long long repl_backlog_off; /* Replication "master offset" of first
byte in the replication backlog buffer.*/
time_t repl_backlog_time_limit; /* Time without slaves after the backlog
gets released. */
time_t repl_no_slaves_since; /* We have no slaves since that time.
@ -1029,8 +1054,11 @@ struct redisServer {
int slave_priority; /* Reported in INFO and used by Sentinel. */
int slave_announce_port; /* Give the master this listening port. */
char *slave_announce_ip; /* Give the master this ip address. */
char repl_master_runid[CONFIG_RUN_ID_SIZE+1]; /* Master run id for PSYNC.*/
long long repl_master_initial_offset; /* Master PSYNC offset. */
/* The following two fields is where we store master PSYNC replid/offset
* while the PSYNC is in progress. At the end we'll copy the fields into
* the server->master client structure. */
char master_replid[CONFIG_RUN_ID_SIZE+1]; /* Master PSYNC runid. */
long long master_initial_offset; /* Master PSYNC offset. */
int repl_slave_lazy_flush; /* Lazy FLUSHALL before loading DB? */
/* Replication script cache. */
dict *repl_scriptcache_dict; /* SHA1 all slaves are aware of. */
@ -1259,6 +1287,7 @@ void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask);
void addReplyString(client *c, const char *s, size_t len);
void addReplyBulk(client *c, robj *obj);
void addReplyBulkCString(client *c, const char *s);
void addReplyBulkCBuffer(client *c, const void *p, size_t len);
@ -1393,6 +1422,7 @@ ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout);
/* Replication */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen);
void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc);
void updateSlavesWaitingBgsave(int bgsaveerr, int type);
void replicationCron(void);
@ -1414,6 +1444,10 @@ long long replicationGetSlaveOffset(void);
char *replicationGetSlaveName(client *c);
long long getPsyncInitialOffset(void);
int replicationSetupSlaveForFullResync(client *slave, long long offset);
void changeReplicationId(void);
void clearReplicationId2(void);
void chopReplicationBacklog(void);
void replicationCacheMasterUsingMyself(void);
/* Generic persistence functions */
void startLoading(FILE *fp);
@ -1422,7 +1456,7 @@ void stopLoading(void);
/* RDB persistence */
#include "rdb.h"
int rdbSaveRio(rio *rdb, int *error, int flags);
int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi);
/* AOF persistence */
void flushAppendOnlyFile(int force);