diff --git a/redis.conf b/redis.conf index 3a66f23d4..adc09020c 100644 --- a/redis.conf +++ b/redis.conf @@ -402,6 +402,10 @@ repl-disable-tcp-nodelay no # need to elapse, starting from the time the last slave disconnected, for # the backlog buffer to be freed. # +# Note that slaves never free the backlog for timeout, since they may be +# promoted to masters later, and should be able to correctly "partially +# resynchronize" with the slaves: hence they should always accumulate backlog. +# # A value of 0 means to never release the backlog. # # repl-backlog-ttl 3600 diff --git a/src/aof.c b/src/aof.c index c75153cc7..07d8561da 100644 --- a/src/aof.c +++ b/src/aof.c @@ -653,7 +653,7 @@ int loadAppendOnlyFile(char *filename) { serverLog(LL_NOTICE,"Reading RDB preamble from AOF file..."); if (fseek(fp,0,SEEK_SET) == -1) goto readerr; rioInitWithFile(&rdb,fp); - if (rdbLoadRio(&rdb) != C_OK) { + if (rdbLoadRio(&rdb,NULL) != C_OK) { serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted"); goto readerr; } else { @@ -1152,7 +1152,7 @@ int rewriteAppendOnlyFile(char *filename) { if (server.aof_use_rdb_preamble) { int error; - if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE) == C_ERR) { + if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) { errno = error; goto werr; } diff --git a/src/db.c b/src/db.c index 268e7c384..55ae663c2 100644 --- a/src/db.c +++ b/src/db.c @@ -413,7 +413,7 @@ void flushallCommand(client *c) { /* Normally rdbSave() will reset dirty, but we don't want this here * as otherwise FLUSHALL will not be replicated nor put into the AOF. 
*/ int saved_dirty = server.dirty; - rdbSave(server.rdb_filename); + rdbSave(server.rdb_filename,NULL); server.dirty = saved_dirty; } server.dirty++; diff --git a/src/debug.c b/src/debug.c index d48caedcc..f4689d532 100644 --- a/src/debug.c +++ b/src/debug.c @@ -320,12 +320,12 @@ void debugCommand(client *c) { if (c->argc >= 3) c->argv[2] = tryObjectEncoding(c->argv[2]); serverAssertWithInfo(c,c->argv[0],1 == 2); } else if (!strcasecmp(c->argv[1]->ptr,"reload")) { - if (rdbSave(server.rdb_filename) != C_OK) { + if (rdbSave(server.rdb_filename,NULL) != C_OK) { addReply(c,shared.err); return; } emptyDb(-1,EMPTYDB_NO_FLAGS,NULL); - if (rdbLoad(server.rdb_filename) != C_OK) { + if (rdbLoad(server.rdb_filename,NULL) != C_OK) { addReplyError(c,"Error trying to load the RDB dump"); return; } diff --git a/src/networking.c b/src/networking.c index 2be40ae15..b2cec8631 100644 --- a/src/networking.c +++ b/src/networking.c @@ -352,6 +352,14 @@ void addReplySds(client *c, sds s) { } } +/* This low level function just adds whatever protocol you send it to the + * client buffer, trying the static buffer initially, and using the string + * of objects if not possible. + * + * It is efficient because does not create an SDS object nor an Redis object + * if not needed. The object will only be created by calling + * _addReplyStringToList() if we fail to extend the existing tail object + * in the list of objects. */ void addReplyString(client *c, const char *s, size_t len) { if (prepareClientToWrite(c) != C_OK) return; if (_addReplyToBuffer(c,s,len) != C_OK) @@ -1022,7 +1030,7 @@ int processInlineBuffer(client *c) { char *newline; int argc, j; sds *argv, aux; - size_t querylen; + size_t querylen, protolen; /* Search for end of line */ newline = strchr(c->querybuf,'\n'); @@ -1035,6 +1043,7 @@ int processInlineBuffer(client *c) { } return C_ERR; } + protolen = (newline - c->querybuf)+1; /* Total protocol bytes of command. */ /* Handle the \r\n case. 
*/ if (newline && newline != c->querybuf && *(newline-1) == '\r') @@ -1057,6 +1066,15 @@ int processInlineBuffer(client *c) { if (querylen == 0 && c->flags & CLIENT_SLAVE) c->repl_ack_time = server.unixtime; + /* Newline from masters can be used to prevent timeouts, but should + * not affect the replication offset since they are always sent + * "out of band" directly writing to the socket and without passing + * from the output buffers. */ + if (querylen == 0 && c->flags & CLIENT_MASTER) { + c->reploff -= protolen; + while (protolen--) chopReplicationBacklog(); + } + /* Leave data after the first line of the query in the buffer */ sdsrange(c->querybuf,querylen+2,-1); @@ -1321,7 +1339,11 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { sdsIncrLen(c->querybuf,nread); c->lastinteraction = server.unixtime; - if (c->flags & CLIENT_MASTER) c->reploff += nread; + if (c->flags & CLIENT_MASTER) { + c->reploff += nread; + replicationFeedSlavesFromMasterStream(server.slaves, + c->querybuf+qblen,nread); + } server.stat_net_input_bytes += nread; if (sdslen(c->querybuf) > server.client_max_querybuf_len) { sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty(); diff --git a/src/rdb.c b/src/rdb.c index 29f880dac..765e13374 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -835,7 +835,7 @@ int rdbSaveAuxFieldStrInt(rio *rdb, char *key, long long val) { } /* Save a few default AUX fields with information about the RDB generated. */ -int rdbSaveInfoAuxFields(rio *rdb, int flags) { +int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) { int redis_bits = (sizeof(void*) == 8) ? 
64 : 32; int aof_preamble = (flags & RDB_SAVE_AOF_PREAMBLE) != 0; @@ -844,7 +844,19 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags) { if (rdbSaveAuxFieldStrInt(rdb,"redis-bits",redis_bits) == -1) return -1; if (rdbSaveAuxFieldStrInt(rdb,"ctime",time(NULL)) == -1) return -1; if (rdbSaveAuxFieldStrInt(rdb,"used-mem",zmalloc_used_memory()) == -1) return -1; + + /* Handle saving options that generate aux fields. */ + if (rsi) { + if (rsi->repl_stream_db && + rdbSaveAuxFieldStrInt(rdb,"repl-stream-db",rsi->repl_stream_db) + == -1) + { + return -1; + } + } if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble) == -1) return -1; + if (rdbSaveAuxFieldStrStr(rdb,"repl-id",server.replid) == -1) return -1; + if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",server.master_repl_offset) == -1) return -1; return 1; } @@ -856,7 +868,7 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags) { * When the function returns C_ERR and if 'error' is not NULL, the * integer pointed by 'error' is set to the value of errno just after the I/O * error. */ -int rdbSaveRio(rio *rdb, int *error, int flags) { +int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { dictIterator *di = NULL; dictEntry *de; char magic[10]; @@ -869,7 +881,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags) { rdb->update_cksum = rioGenericUpdateChecksum; snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION); if (rdbWriteRaw(rdb,magic,9) == -1) goto werr; - if (rdbSaveInfoAuxFields(rdb,flags) == -1) goto werr; + if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr; for (j = 0; j < server.dbnum; j++) { redisDb *db = server.db+j; @@ -945,7 +957,7 @@ werr: * While the suffix is the 40 bytes hex string we announced in the prefix. * This way processes receiving the payload can understand when it ends * without doing any processing of the content. 
*/ -int rdbSaveRioWithEOFMark(rio *rdb, int *error) { +int rdbSaveRioWithEOFMark(rio *rdb, int *error, rdbSaveInfo *rsi) { char eofmark[RDB_EOF_MARK_SIZE]; getRandomHexChars(eofmark,RDB_EOF_MARK_SIZE); @@ -953,7 +965,7 @@ int rdbSaveRioWithEOFMark(rio *rdb, int *error) { if (rioWrite(rdb,"$EOF:",5) == 0) goto werr; if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr; if (rioWrite(rdb,"\r\n",2) == 0) goto werr; - if (rdbSaveRio(rdb,error,RDB_SAVE_NONE) == C_ERR) goto werr; + if (rdbSaveRio(rdb,error,RDB_SAVE_NONE,rsi) == C_ERR) goto werr; if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr; return C_OK; @@ -964,7 +976,7 @@ werr: /* Write error. */ } /* Save the DB on disk. Return C_ERR on error, C_OK on success. */ -int rdbSave(char *filename) { +int rdbSave(char *filename, rdbSaveInfo *rsi) { char tmpfile[256]; char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */ FILE *fp; @@ -985,7 +997,7 @@ int rdbSave(char *filename) { } rioInitWithFile(&rdb,fp); - if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE) == C_ERR) { + if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) { errno = error; goto werr; } @@ -1023,7 +1035,7 @@ werr: return C_ERR; } -int rdbSaveBackground(char *filename) { +int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) { pid_t childpid; long long start; @@ -1040,7 +1052,7 @@ int rdbSaveBackground(char *filename) { /* Child */ closeListeningSockets(0); redisSetProcTitle("redis-rdb-bgsave"); - retval = rdbSave(filename); + retval = rdbSave(filename,rsi); if (retval == C_OK) { size_t private_dirty = zmalloc_get_private_dirty(-1); @@ -1410,7 +1422,7 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { /* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned, * otherwise C_ERR is returned and 'errno' is set accordingly. 
*/ -int rdbLoadRio(rio *rdb) { +int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi) { uint64_t dbid; int type, rdbver; redisDb *db = server.db+0; @@ -1501,6 +1513,15 @@ int rdbLoadRio(rio *rdb) { serverLog(LL_NOTICE,"RDB '%s': %s", (char*)auxkey->ptr, (char*)auxval->ptr); + } else if (!strcasecmp(auxkey->ptr,"repl-stream-db")) { + if (rsi) rsi->repl_stream_db = atoi(auxval->ptr); + } else if (!strcasecmp(auxkey->ptr,"repl-id")) { + if (rsi && sdslen(auxval->ptr) == CONFIG_RUN_ID_SIZE) { + memcpy(rsi->repl_id,auxval->ptr,CONFIG_RUN_ID_SIZE+1); + rsi->repl_id_is_set = 1; + } + } else if (!strcasecmp(auxkey->ptr,"repl-offset")) { + if (rsi) rsi->repl_offset = strtoll(auxval->ptr,NULL,10); } else { /* We ignore fields we don't understand, as by AUX field * contract. */ @@ -1559,8 +1580,11 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */ /* Like rdbLoadRio() but takes a filename instead of a rio stream. The * filename is open for reading and a rio stream object created in order * to do the actual loading. Moreover the ETA displayed in the INFO - * output is initialized and finalized. */ -int rdbLoad(char *filename) { + * output is initialized and finalized. + * + * If you pass an 'rsi' structure initialized with RDB_SAVE_INFO_INIT, the + * loading code will fill the information fields in the structure. */ +int rdbLoad(char *filename, rdbSaveInfo *rsi) { FILE *fp; rio rdb; int retval; @@ -1568,7 +1592,7 @@ int rdbLoad(char *filename) { if ((fp = fopen(filename,"r")) == NULL) return C_ERR; startLoading(fp); rioInitWithFile(&rdb,fp); - retval = rdbLoadRio(&rdb); + retval = rdbLoadRio(&rdb,rsi); fclose(fp); stopLoading(); return retval; @@ -1721,7 +1745,7 @@ void backgroundSaveDoneHandler(int exitcode, int bysignal) { /* Spawn an RDB child that writes the RDB to the sockets of the slaves * that are currently in SLAVE_STATE_WAIT_BGSAVE_START state.
*/ -int rdbSaveToSlavesSockets(void) { +int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { int *fds; uint64_t *clientids; int numfds; @@ -1779,7 +1803,7 @@ int rdbSaveToSlavesSockets(void) { closeListeningSockets(0); redisSetProcTitle("redis-rdb-to-slaves"); - retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL); + retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL,rsi); if (retval == C_OK && rioFlush(&slave_sockets) == 0) retval = C_ERR; @@ -1884,7 +1908,7 @@ void saveCommand(client *c) { addReplyError(c,"Background save already in progress"); return; } - if (rdbSave(server.rdb_filename) == C_OK) { + if (rdbSave(server.rdb_filename,NULL) == C_OK) { addReply(c,shared.ok); } else { addReply(c,shared.err); @@ -1918,7 +1942,7 @@ void bgsaveCommand(client *c) { "Use BGSAVE SCHEDULE in order to schedule a BGSAVE whenver " "possible."); } - } else if (rdbSaveBackground(server.rdb_filename) == C_OK) { + } else if (rdbSaveBackground(server.rdb_filename,NULL) == C_OK) { addReplyStatus(c,"Background saving started"); } else { addReply(c,shared.err); diff --git a/src/rdb.h b/src/rdb.h index 60c52a7c1..efe932255 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -118,11 +118,11 @@ uint64_t rdbLoadLen(rio *rdb, int *isencoded); int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr); int rdbSaveObjectType(rio *rdb, robj *o); int rdbLoadObjectType(rio *rdb); -int rdbLoad(char *filename); -int rdbSaveBackground(char *filename); -int rdbSaveToSlavesSockets(void); +int rdbLoad(char *filename, rdbSaveInfo *rsi); +int rdbSaveBackground(char *filename, rdbSaveInfo *rsi); +int rdbSaveToSlavesSockets(rdbSaveInfo *rsi); void rdbRemoveTempFile(pid_t childpid); -int rdbSave(char *filename); +int rdbSave(char *filename, rdbSaveInfo *rsi); ssize_t rdbSaveObject(rio *rdb, robj *o); size_t rdbSavedObjectLen(robj *o); robj *rdbLoadObject(int type, rio *rdb); @@ -136,6 +136,6 @@ int rdbSaveBinaryDoubleValue(rio *rdb, double val); int rdbLoadBinaryDoubleValue(rio *rdb, double *val); int 
rdbSaveBinaryFloatValue(rio *rdb, float val); int rdbLoadBinaryFloatValue(rio *rdb, float *val); -int rdbLoadRio(rio *rdb); +int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi); #endif diff --git a/src/replication.c b/src/replication.c index 67091dd0b..1a9b2e574 100644 --- a/src/replication.c +++ b/src/replication.c @@ -79,11 +79,6 @@ void createReplicationBacklog(void) { server.repl_backlog = zmalloc(server.repl_backlog_size); server.repl_backlog_histlen = 0; server.repl_backlog_idx = 0; - /* When a new backlog buffer is created, we increment the replication - * offset by one to make sure we'll not be able to PSYNC with any - * previous slave. This is needed because we avoid incrementing the - * master_repl_offset if no backlog exists nor slaves are attached. */ - server.master_repl_offset++; /* We don't have any data inside our buffer, but virtually the first * byte we have is the next byte that will be generated for the @@ -153,6 +148,22 @@ void feedReplicationBacklog(void *ptr, size_t len) { server.repl_backlog_histlen + 1; } +/* Remove the last byte from the replication backlog. This + * is useful when we receive an out of band "\n" to keep the connection + * alive but don't want to count it as replication stream. + * + * As a side effect this function adjusts the master replication offset + * of this instance to account for the missing byte. */ +void chopReplicationBacklog(void) { + if (!server.repl_backlog || !server.repl_backlog_histlen) return; + if (server.repl_backlog_idx == 0) + server.repl_backlog_idx = server.repl_backlog_size-1; + else + server.repl_backlog_idx--; + server.master_repl_offset--; + server.repl_backlog_histlen--; +} + /* Wrapper for feedReplicationBacklog() that takes Redis string objects * as input. */ void feedReplicationBacklogWithObject(robj *o) { @@ -170,12 +181,24 @@ void feedReplicationBacklogWithObject(robj *o) { feedReplicationBacklog(p,len); } +/* Propagate write commands to slaves, and populate the replication backlog + * as well. 
This function is used if the instance is a master: we use + * the commands received by our clients in order to create the replication + * stream. Instead if the instance is a slave and has sub-slaves attached, + * we use replicationFeedSlavesFromMasterStream() */ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { listNode *ln; listIter li; int j, len; char llstr[LONG_STR_SIZE]; + /* If the instance is not a top level master, return ASAP: we'll just proxy + * the stream of data we receive from our master instead, in order to + * propagate *identical* replication stream. In this way this slave can + * advertise the same replication ID as the master (since it shares the + * master replication history and has the same backlog and offsets). */ + if (server.masterhost != NULL) return; + /* If there aren't slaves, and there is no backlog buffer to populate, * we can return ASAP. */ if (server.repl_backlog == NULL && listLength(slaves) == 0) return; @@ -265,6 +288,34 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { } } +/* This function is used in order to proxy what we receive from our master + * to our sub-slaves. */ +#include <ctype.h> +void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen) { + listNode *ln; + listIter li; + + /* Debugging: this is handy to see the stream sent from master + * to slaves. Disabled with if(0). */ + if (0) { + printf("%zu:",buflen); + for (size_t j = 0; j < buflen; j++) { + printf("%c", isprint(buf[j]) ?
buf[j] : '.'); + } + printf("\n"); + } + + if (server.repl_backlog) feedReplicationBacklog(buf,buflen); + listRewind(slaves,&li); + while((ln = listNext(&li))) { + client *slave = ln->value; + + /* Don't feed slaves that are still waiting for BGSAVE to start */ + if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; + addReplyString(slave,buf,buflen); + } +} + void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) { listNode *ln; listIter li; @@ -329,7 +380,7 @@ long long addReplyReplicationBacklog(client *c, long long offset) { skip = offset - server.repl_backlog_off; serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip); - /* Point j to the oldest byte, that is actaully our + /* Point j to the oldest byte, that is actually our * server.repl_backlog_off byte. */ j = (server.repl_backlog_idx + (server.repl_backlog_size-server.repl_backlog_histlen)) % @@ -361,18 +412,14 @@ long long addReplyReplicationBacklog(client *c, long long offset) { * the BGSAVE process started and before executing any other command * from clients. */ long long getPsyncInitialOffset(void) { - long long psync_offset = server.master_repl_offset; - /* Add 1 to psync_offset if it the replication backlog does not exists - * as when it will be created later we'll increment the offset by one. */ - if (server.repl_backlog == NULL) psync_offset++; - return psync_offset; + return server.master_repl_offset; } /* Send a FULLRESYNC reply in the specific case of a full resynchronization, * as a side effect setup the slave for a full sync in different ways: * - * 1) Remember, into the slave client structure, the offset we sent - * here, so that if new slaves will later attach to the same + * 1) Remember, into the slave client structure, the replication offset + * we sent here, so that if new slaves will later attach to the same * background RDB saving process (by duplicating this client output * buffer), we can get the right offset from this slave. 
* 2) Set the replication state of the slave to WAIT_BGSAVE_END so that @@ -392,14 +439,14 @@ int replicationSetupSlaveForFullResync(client *slave, long long offset) { slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END; /* We are going to accumulate the incremental changes for this * slave as well. Set slaveseldb to -1 in order to force to re-emit - * a SLEECT statement in the replication stream. */ + * a SELECT statement in the replication stream. */ server.slaveseldb = -1; /* Don't send this reply to slaves that approached us with * the old SYNC command. */ if (!(slave->flags & CLIENT_PRE_PSYNC)) { buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n", - server.runid,offset); + server.replid,offset); if (write(slave->fd,buf,buflen) != buflen) { freeClientAsync(slave); return C_ERR; @@ -415,19 +462,32 @@ int replicationSetupSlaveForFullResync(client *slave, long long offset) { * with the usual full resync. */ int masterTryPartialResynchronization(client *c) { long long psync_offset, psync_len; - char *master_runid = c->argv[1]->ptr; + char *master_replid = c->argv[1]->ptr; char buf[128]; int buflen; - /* Is the runid of this master the same advertised by the wannabe slave - * via PSYNC? If runid changed this master is a different instance and - * there is no way to continue. */ - if (strcasecmp(master_runid, server.runid)) { + /* Parse the replication offset asked by the slave. Go to full sync + * on parse error: this should never happen but we try to handle + * it in a robust way compared to aborting. */ + if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) != + C_OK) goto need_full_resync; + + /* Is the replication ID of this master the same advertised by the wannabe + * slave via PSYNC? If the replication ID changed this master has a + * different replication history, and there is no way to continue. + * + * Note that there are two potentially valid replication IDs: the ID1 + * and the ID2. The ID2 however is only valid up to a specific offset. 
*/ + if (strcasecmp(master_replid, server.replid) && + (strcasecmp(master_replid, server.replid2) || + psync_offset > server.second_replid_offset)) + { /* Run id "?" is used by slaves that want to force a full resync. */ - if (master_runid[0] != '?') { + if (master_replid[0] != '?') { serverLog(LL_NOTICE,"Partial resynchronization not accepted: " - "Runid mismatch (Client asked for runid '%s', my runid is '%s')", - master_runid, server.runid); + "Replication ID mismatch (Slave asked for '%s', my replication " + "ID is '%s')", + master_replid, server.replid); } else { serverLog(LL_NOTICE,"Full resync requested by slave %s", replicationGetSlaveName(c)); @@ -436,8 +496,6 @@ int masterTryPartialResynchronization(client *c) { } /* We still have the data our slave is asking for? */ - if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) != - C_OK) goto need_full_resync; if (!server.repl_backlog || psync_offset < server.repl_backlog_off || psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen)) @@ -463,7 +521,11 @@ int masterTryPartialResynchronization(client *c) { /* We can't use the connection buffers since they are used to accumulate * new commands at this stage. But we are sure the socket send buffer is * empty so this write will never fail actually. */ - buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n"); + if (c->slave_capa & SLAVE_CAPA_PSYNC2) { + buflen = snprintf(buf,sizeof(buf),"+CONTINUE %s\r\n", server.replid); + } else { + buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n"); + } if (write(c->fd,buf,buflen) != buflen) { freeClientAsync(c); return C_OK; @@ -515,10 +577,18 @@ int startBgsaveForReplication(int mincapa) { serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s", socket_target ?
"slaves sockets" : "disk"); + rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; + /* If we are saving for a chained slave (that is, if we are, + * in turn, a slave of another instance), make sure after + * loading the RDB, our slaves select the right DB: we'll just + * send the replication stream we receive from our master, so + * no way to send SELECT commands. */ + if (server.master) rsi.repl_stream_db = server.master->db->id; + if (socket_target) - retval = rdbSaveToSlavesSockets(); + retval = rdbSaveToSlavesSockets(&rsi); else - retval = rdbSaveBackground(server.rdb_filename); + retval = rdbSaveBackground(server.rdb_filename,&rsi); /* If we failed to BGSAVE, remove the slaves waiting for a full * resynchorinization from the list of salves, inform them with @@ -589,22 +659,22 @@ void syncCommand(client *c) { * when this happens masterTryPartialResynchronization() already * replied with: * - * +FULLRESYNC <runid> <offset> + * +FULLRESYNC <replid> <offset> * - * So the slave knows the new runid and offset to try a PSYNC later + * So the slave knows the new replid and offset to try a PSYNC later * if the connection with the master is lost. */ if (!strcasecmp(c->argv[0]->ptr,"psync")) { if (masterTryPartialResynchronization(c) == C_OK) { server.stat_sync_partial_ok++; return; /* No full resync needed, return. */ } else { - char *master_runid = c->argv[1]->ptr; + char *master_replid = c->argv[1]->ptr; /* Increment stats for failed PSYNCs, but only if the - * runid is not "?", as this is used by slaves to force a full + * replid is not "?", as this is used by slaves to force a full * resync on purpose when they are not albe to partially * resync. */ - if (master_runid[0] != '?') server.stat_sync_partial_err++; + if (master_replid[0] != '?') server.stat_sync_partial_err++; } } else { /* If a slave uses SYNC, we are dealing with an old implementation @@ -625,6 +695,16 @@ void syncCommand(client *c) { c->flags |= CLIENT_SLAVE; listAddNodeTail(server.slaves,c); + /* Create the replication backlog if needed.
*/ + if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) { + /* When we create the backlog from scratch, we always use a new + * replication ID and clear the ID2, since there is no valid + * past history. */ + changeReplicationId(); + clearReplicationId2(); + createReplicationBacklog(); + } + /* CASE 1: BGSAVE is in progress, with disk target. */ if (server.rdb_child_pid != -1 && server.rdb_child_type == RDB_CHILD_TYPE_DISK) @@ -685,9 +765,6 @@ void syncCommand(client *c) { } } } - - if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) - createReplicationBacklog(); return; } @@ -735,6 +812,8 @@ void replconfCommand(client *c) { /* Ignore capabilities not understood by this master. */ if (!strcasecmp(c->argv[j+1]->ptr,"eof")) c->slave_capa |= SLAVE_CAPA_EOF; + else if (!strcasecmp(c->argv[j+1]->ptr,"psync2")) + c->slave_capa |= SLAVE_CAPA_PSYNC2; } else if (!strcasecmp(c->argv[j]->ptr,"ack")) { /* REPLCONF ACK is used by slave to inform the master the amount * of replication stream that it processed so far. It is an @@ -928,6 +1007,43 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) { if (startbgsave) startBgsaveForReplication(mincapa); } +/* Change the current instance replication ID with a new, random one. + * This will prevent successful PSYNCs between this master and other + * slaves, so the command should be called when something happens that + * alters the current story of the dataset. */ +void changeReplicationId(void) { + getRandomHexChars(server.replid,CONFIG_RUN_ID_SIZE); + server.replid[CONFIG_RUN_ID_SIZE] = '\0'; +} + +/* Clear (invalidate) the secondary replication ID. This happens, for + * example, after a full resynchronization, when we start a new replication + * history. 
*/ +void clearReplicationId2(void) { + memset(server.replid2,'0',sizeof(server.replid)); + server.replid2[CONFIG_RUN_ID_SIZE] = '\0'; + server.second_replid_offset = -1; +} + +/* Use the current replication ID / offset as secondary replication + * ID, and change the current one in order to start a new history. + * This should be used when an instance is switched from slave to master + * so that it can serve PSYNC requests performed using the master + * replication ID. */ +void shiftReplicationId(void) { + memcpy(server.replid2,server.replid,sizeof(server.replid)); + /* We set the second replid offset to the master offset + 1, since + * the slave will ask for the first byte it has not yet received, so + * we need to add one to the offset: for example if, as a slave, we are + * sure we have the same history as the master for 50 bytes, after we + * are turned into a master, we can accept a PSYNC request with offset + * 51, since the slave asking has the same history up to the 50th + * byte, and is asking for the new bytes starting at offset 51. */ + server.second_replid_offset = server.master_repl_offset+1; + changeReplicationId(); + serverLog(LL_WARNING,"Setting secondary replication ID to %s, valid up to offset: %lld. New replication ID is %s", server.replid2, server.second_replid_offset, server.replid); +} + /* ----------------------------------- SLAVE -------------------------------- */ /* Returns 1 if the given replication state is a handshake state, @@ -965,18 +1081,18 @@ void replicationEmptyDbCallback(void *privdata) { /* Once we have a link with the master and the synchroniziation was * performed, this function materializes the master client we store * at server.master, starting from the specified file descriptor. 
*/ -void replicationCreateMasterClient(int fd) { +void replicationCreateMasterClient(int fd, int dbid) { server.master = createClient(fd); server.master->flags |= CLIENT_MASTER; server.master->authenticated = 1; - server.repl_state = REPL_STATE_CONNECTED; - server.master->reploff = server.repl_master_initial_offset; - memcpy(server.master->replrunid, server.repl_master_runid, - sizeof(server.repl_master_runid)); + server.master->reploff = server.master_initial_offset; + memcpy(server.master->replid, server.master_replid, + sizeof(server.master_replid)); /* If master offset is set to -1, this master is old and is not * PSYNC capable, so we flag it accordingly. */ if (server.master->reploff == -1) server.master->flags |= CLIENT_PRE_PSYNC; + if (dbid != -1) selectDb(server.master,dbid); } /* Asynchronously read the SYNC payload we receive from a master */ @@ -1137,7 +1253,8 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { * time for non blocking loading. */ aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE); serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory"); - if (rdbLoad(server.rdb_filename) != C_OK) { + rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; + if (rdbLoad(server.rdb_filename,&rsi) != C_OK) { serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk"); cancelReplicationHandshake(); return; @@ -1145,7 +1262,20 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { /* Final setup of the connected slave <- master link */ zfree(server.repl_transfer_tmpfile); close(server.repl_transfer_fd); - replicationCreateMasterClient(server.repl_transfer_s); + replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db); + server.repl_state = REPL_STATE_CONNECTED; + /* After a full resynchronization we use the replication ID and + * offset of the master. The secondary ID / offset are cleared since + * we are starting a new history.
*/ + memcpy(server.replid,server.master->replid,sizeof(server.replid)); + server.master_repl_offset = server.master->reploff; + clearReplicationId2(); + /* Let's create the replication backlog if needed. Slaves need to + * accumulate the backlog regardless of the fact they have sub-slaves + * or not, in order to behave correctly if they are promoted to + * masters after a failover. */ + if (server.repl_backlog == NULL) createReplicationBacklog(); + serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Finished with success"); /* Restart the AOF subsystem now that we finished the sync. This * will trigger an AOF rewrite, and when done will start appending @@ -1270,7 +1400,7 @@ char *sendSynchronousCommand(int flags, int fd, ...) { * * 1) As a side effect of the function call the function removes the readable * event handler from "fd", unless the return value is PSYNC_WAIT_REPLY. - * 2) server.repl_master_initial_offset is set to the right value according + * 2) server.master_initial_offset is set to the right value according * to the master reply. This will be used to populate the 'server.master' * structure replication offset. */ @@ -1281,31 +1411,31 @@ char *sendSynchronousCommand(int flags, int fd, ...) { #define PSYNC_FULLRESYNC 3 #define PSYNC_NOT_SUPPORTED 4 int slaveTryPartialResynchronization(int fd, int read_reply) { - char *psync_runid; + char *psync_replid; char psync_offset[32]; sds reply; /* Writing half */ if (!read_reply) { - /* Initially set repl_master_initial_offset to -1 to mark the current + /* Initially set master_initial_offset to -1 to mark the current * master run_id and offset as not valid. Later if we'll be able to do * a FULL resync using the PSYNC command we'll set the offset at the * right value, so that this information will be propagated to the * client structure representing the master into server.master. 
*/ - server.repl_master_initial_offset = -1; + server.master_initial_offset = -1; if (server.cached_master) { - psync_runid = server.cached_master->replrunid; + psync_replid = server.cached_master->replid; snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1); - serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset); + serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset); } else { serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)"); - psync_runid = "?"; + psync_replid = "?"; memcpy(psync_offset,"-1",3); } /* Issue the PSYNC command */ - reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_runid,psync_offset,NULL); + reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_replid,psync_offset,NULL); if (reply != NULL) { serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply); sdsfree(reply); @@ -1327,31 +1457,31 @@ int slaveTryPartialResynchronization(int fd, int read_reply) { aeDeleteFileEvent(server.el,fd,AE_READABLE); if (!strncmp(reply,"+FULLRESYNC",11)) { - char *runid = NULL, *offset = NULL; + char *replid = NULL, *offset = NULL; /* FULL RESYNC, parse the reply in order to extract the run id * and the replication offset. */ - runid = strchr(reply,' '); - if (runid) { - runid++; - offset = strchr(runid,' '); + replid = strchr(reply,' '); + if (replid) { + replid++; + offset = strchr(replid,' '); if (offset) offset++; } - if (!runid || !offset || (offset-runid-1) != CONFIG_RUN_ID_SIZE) { + if (!replid || !offset || (offset-replid-1) != CONFIG_RUN_ID_SIZE) { serverLog(LL_WARNING, "Master replied with wrong +FULLRESYNC syntax."); /* This is an unexpected condition, actually the +FULLRESYNC * reply means that the master supports PSYNC, but the reply * format seems wrong. To stay safe we blank the master - * runid to make sure next PSYNCs will fail. 
*/ - memset(server.repl_master_runid,0,CONFIG_RUN_ID_SIZE+1); + * replid to make sure next PSYNCs will fail. */ + memset(server.master_replid,0,CONFIG_RUN_ID_SIZE+1); } else { - memcpy(server.repl_master_runid, runid, offset-runid-1); - server.repl_master_runid[CONFIG_RUN_ID_SIZE] = '\0'; - server.repl_master_initial_offset = strtoll(offset,NULL,10); + memcpy(server.master_replid, replid, offset-replid-1); + server.master_replid[CONFIG_RUN_ID_SIZE] = '\0'; + server.master_initial_offset = strtoll(offset,NULL,10); serverLog(LL_NOTICE,"Full resync from master: %s:%lld", - server.repl_master_runid, - server.repl_master_initial_offset); + server.master_replid, + server.master_initial_offset); } /* We are going to full resync, discard the cached master structure. */ replicationDiscardCachedMaster(); @@ -1360,9 +1490,40 @@ int slaveTryPartialResynchronization(int fd, int read_reply) { } if (!strncmp(reply,"+CONTINUE",9)) { - /* Partial resync was accepted, set the replication state accordingly */ + /* Partial resync was accepted. */ serverLog(LL_NOTICE, "Successful partial resynchronization with master."); + + /* Check the new replication ID advertised by the master. If it + * changed, we need to set the new ID as primary ID, and set our + * secondary ID as the old master ID up to the current offset, so + * that our sub-slaves will be able to PSYNC with us after a + * disconnection. */ + char *start = reply+10; + char *end = reply+9; + while(end[0] != '\r' && end[0] != '\n' && end[0] != '\0') end++; + if (end-start == CONFIG_RUN_ID_SIZE) { + char new[CONFIG_RUN_ID_SIZE+1]; + memcpy(new,start,CONFIG_RUN_ID_SIZE); + new[CONFIG_RUN_ID_SIZE] = '\0'; + + if (strcmp(new,server.cached_master->replid)) { + /* Master ID changed. */ + serverLog(LL_WARNING,"Master replication ID changed to %s",new); + + /* Set the old ID as our ID2, up to the current offset+1.
*/ + memcpy(server.replid2,server.cached_master->replid, + sizeof(server.replid2)); + server.second_replid_offset = server.master_repl_offset+1; + + /* Update the cached master ID and our own primary ID to the + * new one. */ + memcpy(server.replid,new,sizeof(server.replid)); + memcpy(server.cached_master->replid,new,sizeof(server.replid)); + } + } + + /* Setup the replication to continue. */ sdsfree(reply); replicationResurrectCachedMaster(fd); return PSYNC_CONTINUE; @@ -1386,6 +1547,8 @@ int slaveTryPartialResynchronization(int fd, int read_reply) { return PSYNC_NOT_SUPPORTED; } +/* This handler fires when the non blocking connect was able to + * establish a connection with the master. */ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { char tmpfile[256], *err = NULL; int dfd, maxtries = 5; @@ -1402,7 +1565,8 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { return; } - /* Check for errors in the socket. */ + /* Check for errors in the socket: after a non blocking connect() we + * may find that the socket is in error state. */ if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1) sockerr = errno; if (sockerr) { @@ -1531,13 +1695,15 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { server.repl_state = REPL_STATE_SEND_CAPA; } - /* Inform the master of our capabilities. While we currently send - * just one capability, it is possible to chain new capabilities here - * in the form of REPLCONF capa X capa Y capa Z ... + /* Inform the master of our (slave) capabilities. + * + * EOF: supports EOF-style RDB transfer for diskless replication. + * PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>. + * + * The master will ignore capabilities it does not understand.
*/ if (server.repl_state == REPL_STATE_SEND_CAPA) { err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF", - "capa","eof",NULL); + "capa","eof","capa","psync2",NULL); if (err) goto write_error; sdsfree(err); server.repl_state = REPL_STATE_RECEIVE_CAPA; @@ -1591,14 +1757,14 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { } /* PSYNC failed or is not supported: we want our slaves to resync with us - * as well, if we have any (chained replication case). The mater may - * transfer us an entirely different data set and we have no way to - * incrementally feed our slaves after that. */ + * as well, if we have any sub-slaves. The master may transfer us an + * entirely different data set and we have no way to incrementally feed + * our slaves after that. */ disconnectSlaves(); /* Force our slaves to resync with us as well. */ freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */ /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC - * and the server.repl_master_runid and repl_master_initial_offset are + * and the server.master_replid and master_initial_offset are * already populated. */ if (psync_result == PSYNC_NOT_SUPPORTED) { serverLog(LL_NOTICE,"Retrying with SYNC..."); @@ -1727,15 +1893,23 @@ int cancelReplicationHandshake(void) { /* Set replication to the specified master address and port. */ void replicationSetMaster(char *ip, int port) { + int was_master = server.masterhost == NULL; + sdsfree(server.masterhost); server.masterhost = sdsnew(ip); server.masterport = port; - if (server.master) freeClient(server.master); + if (server.master) { + freeClient(server.master); + } disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */ - disconnectSlaves(); /* Force our slaves to resync with us as well. */ - replicationDiscardCachedMaster(); /* Don't try a PSYNC. */ - freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC.
*/ + + /* Force our slaves to resync with us as well. They may hopefully be able + * to partially resync with us, but we can notify the replid change. */ + disconnectSlaves(); cancelReplicationHandshake(); + /* Before destroying our master state, create a cached master using + * our own parameters, to later PSYNC with the new master. */ + if (was_master) replicationCacheMasterUsingMyself(); server.repl_state = REPL_STATE_CONNECT; server.master_repl_offset = 0; server.repl_down_since = 0; @@ -1746,20 +1920,26 @@ void replicationUnsetMaster(void) { if (server.masterhost == NULL) return; /* Nothing to do. */ sdsfree(server.masterhost); server.masterhost = NULL; - if (server.master) { - if (listLength(server.slaves) == 0) { - /* If this instance is turned into a master and there are no - * slaves, it inherits the replication offset from the master. - * Under certain conditions this makes replicas comparable by - * replication offset to understand what is the most updated. */ - server.master_repl_offset = server.master->reploff; - freeReplicationBacklog(); - } - freeClient(server.master); - } + /* When a slave is turned into a master, the current replication ID + * (that was inherited from the master at synchronization time) is + * used as secondary ID up to the current offset, and a new replication + * ID is created to continue with a new replication history. */ + shiftReplicationId(); + if (server.master) freeClient(server.master); replicationDiscardCachedMaster(); cancelReplicationHandshake(); + /* Disconnecting all the slaves is required: we need to inform slaves + * of the replication ID change (see shiftReplicationId() call). However + * the slaves will be able to partially resync with us, so it will be + * a very fast reconnection. */ + disconnectSlaves(); server.repl_state = REPL_STATE_NONE; + + /* We need to make sure the new master will start the replication stream + * with a SELECT statement. 
This is forced after a full resync, but + * with PSYNC version 2, there is no need for full resync after a + * master switch. */ + server.slaveseldb = -1; } /* This function is called when the slave lose the connection with the @@ -1931,6 +2111,31 @@ void replicationCacheMaster(client *c) { replicationHandleMasterDisconnection(); } +/* This function is called when a master is turned into a slave, in order to + * create from scratch a cached master for the new client, that will allow + * to PSYNC with the slave that was promoted as the new master after a + * failover. + * + * Assuming this instance was previously the master instance of the new master, + * the new master will accept its replication ID, and potentially also the + * current offset if no data was lost during the failover. So we use our + * current replication ID and offset in order to synthesize a cached master. */ +void replicationCacheMasterUsingMyself(void) { + /* The master client we create can be set to any DBID, because + * the new master will start its replication stream with SELECT. */ + server.master_initial_offset = server.master_repl_offset; + replicationCreateMasterClient(-1,-1); + + /* Use our own ID / offset. */ + memcpy(server.master->replid, server.replid, sizeof(server.replid)); + + /* Set as cached master. */ + unlinkClient(server.master); + server.cached_master = server.master; + server.master = NULL; + serverLog(LL_NOTICE,"Before turning into a slave, using my master parameters to synthesize a cached master: I may be able to synchronize with the new master with just a partial transfer."); +} + /* Free a cached master, called when there are no longer the conditions for * a partial resync on reconnection. */ void replicationDiscardCachedMaster(void) { @@ -2290,7 +2495,9 @@ void replicationCron(void) { robj *ping_argv[1]; /* First, send PING according to ping_slave_period.
*/ - if ((replication_cron_loops % server.repl_ping_slave_period) == 0) { + if ((replication_cron_loops % server.repl_ping_slave_period) == 0 && + listLength(server.slaves)) + { ping_argv[0] = createStringObject("PING",4); replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1); @@ -2299,20 +2506,32 @@ void replicationCron(void) { /* Second, send a newline to all the slaves in pre-synchronization * stage, that is, slaves waiting for the master to create the RDB file. + * + * Also send a newline to all the chained slaves we have, if we lost + * connection from our master, to keep the slaves aware that their + * master is online. This is needed since sub-slaves only receive proxied + * data from top-level masters, so there is no explicit pinging in order + * to avoid altering the replication offsets. These special out of band + * pings (newlines) can be sent, they will have no effect on the offset. + * * The newline will be ignored by the slave but will refresh the - * last-io timer preventing a timeout. In this case we ignore the + * last interaction timer preventing a timeout. In this case we ignore the * ping period and refresh the connection once per second since certain * timeouts are set at a few seconds (example: PSYNC response). */ listRewind(server.slaves,&li); while((ln = listNext(&li))) { client *slave = ln->value; - if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START || + int is_presync = + (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START || (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END && - server.rdb_child_type != RDB_CHILD_TYPE_SOCKET)) - { + server.rdb_child_type != RDB_CHILD_TYPE_SOCKET)); + int is_subslave = server.masterhost && server.master == NULL && + slave->replstate == SLAVE_STATE_ONLINE; + + if (is_presync || is_subslave) { if (write(slave->fd, "\n", 1) == -1) { - /* Don't worry, it's just a ping. */ + /* Don't worry about socket errors, it's just a ping.
*/ } } } @@ -2337,10 +2556,14 @@ void replicationCron(void) { } } - /* If we have no attached slaves and there is a replication backlog - * using memory, free it after some (configured) time. */ + /* If this is a master without attached slaves and there is a replication + * backlog active, in order to reclaim memory we can free it after some + * (configured) time. Note that this cannot be done for slaves: slaves + * without sub-slaves attached should still accumulate data into the + * backlog, in order to reply to PSYNC queries if they are turned into + * masters after a failover. */ if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit && - server.repl_backlog) + server.repl_backlog && server.masterhost == NULL) { time_t idle = server.unixtime - server.repl_no_slaves_since; diff --git a/src/server.c b/src/server.c index 7e9b962b3..d17ded9b0 100644 --- a/src/server.c +++ b/src/server.c @@ -1079,7 +1079,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { { serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...", sp->changes, (int)sp->seconds); - rdbSaveBackground(server.rdb_filename); + rdbSaveBackground(server.rdb_filename,NULL); break; } } @@ -1151,7 +1151,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { (server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY || server.lastbgsave_status == C_OK)) { - if (rdbSaveBackground(server.rdb_filename) == C_OK) + if (rdbSaveBackground(server.rdb_filename,NULL) == C_OK) server.rdb_bgsave_scheduled = 0; } @@ -1309,10 +1309,11 @@ void initServerConfig(void) { int j; getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE); + server.runid[CONFIG_RUN_ID_SIZE] = '\0'; + changeReplicationId(); server.configfile = NULL; server.executable = NULL; server.hz = CONFIG_DEFAULT_HZ; - server.runid[CONFIG_RUN_ID_SIZE] = '\0'; server.arch_bits = (sizeof(long) == 8) ? 
64 : 32; server.port = CONFIG_DEFAULT_SERVER_PORT; server.tcp_backlog = CONFIG_DEFAULT_TCP_BACKLOG; @@ -1409,7 +1410,7 @@ void initServerConfig(void) { server.masterport = 6379; server.master = NULL; server.cached_master = NULL; - server.repl_master_initial_offset = -1; + server.master_initial_offset = -1; server.repl_state = REPL_STATE_NONE; server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT; server.repl_serve_stale_data = CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA; @@ -2471,7 +2472,7 @@ int prepareForShutdown(int flags) { if ((server.saveparamslen > 0 && !nosave) || save) { serverLog(LL_NOTICE,"Saving the final RDB snapshot before exiting."); /* Snapshotting. Perform a SYNC SAVE and exit */ - if (rdbSave(server.rdb_filename) != C_OK) { + if (rdbSave(server.rdb_filename,NULL) != C_OK) { /* Ooops.. error saving! The best we can do is to continue * operating. Note that if there was a background saving process, * in the next cron() Redis will be notified that the background @@ -3135,12 +3136,18 @@ sds genRedisInfoString(char *section) { } } info = sdscatprintf(info, + "master_replid:%s\r\n" + "master_replid2:%s\r\n" "master_repl_offset:%lld\r\n" + "second_repl_offset:%lld\r\n" "repl_backlog_active:%d\r\n" "repl_backlog_size:%lld\r\n" "repl_backlog_first_byte_offset:%lld\r\n" "repl_backlog_histlen:%lld\r\n", + server.replid, + server.replid2, server.master_repl_offset, + server.second_replid_offset, server.repl_backlog != NULL, server.repl_backlog_size, server.repl_backlog_off, @@ -3416,9 +3423,20 @@ void loadDataFromDisk(void) { if (loadAppendOnlyFile(server.aof_filename) == C_OK) serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000); } else { - if (rdbLoad(server.rdb_filename) == C_OK) { + rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; + if (rdbLoad(server.rdb_filename,&rsi) == C_OK) { serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds", (float)(ustime()-start)/1000000); + + /* Restore the replication ID / offset from the 
RDB file. */ + if (rsi.repl_id_is_set && rsi.repl_offset != -1) { + memcpy(server.replid,rsi.repl_id,sizeof(server.replid)); + server.master_repl_offset = rsi.repl_offset; + /* If we are a slave, create a cached master from this + * information, in order to allow partial resynchronizations + * with masters. */ + if (server.masterhost) replicationCacheMasterUsingMyself(); + } } else if (errno != ENOENT) { serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno)); exit(1); diff --git a/src/server.h b/src/server.h index b5dbaf0a5..b7f909933 100644 --- a/src/server.h +++ b/src/server.h @@ -293,7 +293,8 @@ typedef long long mstime_t; /* millisecond time type. */ /* Slave capabilities. */ #define SLAVE_CAPA_NONE 0 -#define SLAVE_CAPA_EOF (1<<0) /* Can parse the RDB EOF streaming format. */ +#define SLAVE_CAPA_EOF (1<<0) /* Can parse the RDB EOF streaming format. */ +#define SLAVE_CAPA_PSYNC2 (1<<1) /* Supports PSYNC2 protocol. */ /* Synchronous read timeout - slave side */ #define CONFIG_REPL_SYNCIO_TIMEOUT 5 @@ -679,8 +680,8 @@ typedef struct client { long long psync_initial_offset; /* FULLRESYNC reply offset other slaves copying this slave output buffer should use. */ - char replrunid[CONFIG_RUN_ID_SIZE+1]; /* Master run id if is a master. */ - int slave_listening_port; /* As configured with: REPLCONF listening-port */ + char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */ + int slave_listening_port; /* As configured with: REPLCONF listening-port */ char slave_ip[NET_IP_STR_LEN]; /* Optionally given by REPLCONF ip-address */ int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */ multiState mstate; /* MULTI/EXEC state */ @@ -803,6 +804,26 @@ struct redisMemOverhead { } *db; }; +/* This structure can be optionally passed to RDB save/load functions in + * order to implement additional functionalities, by storing and loading + * metadata to the RDB file.
+ * + * Currently the only use is to select a DB at load time, useful in + * replication in order to make sure that chained slaves (slaves of slaves) + * select the correct DB and are able to accept the stream coming from the + * top-level master. */ +typedef struct rdbSaveInfo { + /* Used saving and loading. */ + int repl_stream_db; /* DB to select in server.master client. */ + + /* Used only loading. */ + int repl_id_is_set; /* True if repl_id field is set. */ + char repl_id[CONFIG_RUN_ID_SIZE+1]; /* Replication ID. */ + long long repl_offset; /* Replication offset. */ +} rdbSaveInfo; + +#define RDB_SAVE_INFO_INIT {-1,0,"000000000000000000000000000000",-1} + /*----------------------------------------------------------------------------- * Global server state *----------------------------------------------------------------------------*/ @@ -988,15 +1009,19 @@ struct redisServer { char *syslog_ident; /* Syslog ident */ int syslog_facility; /* Syslog facility */ /* Replication (master) */ + char replid[CONFIG_RUN_ID_SIZE+1]; /* My current replication ID. */ + char replid2[CONFIG_RUN_ID_SIZE+1]; /* replid inherited from master*/ + long long master_repl_offset; /* My current replication offset */ + long long second_replid_offset; /* Accept offsets up to this for replid2. */ int slaveseldb; /* Last SELECTed DB in replication output */ - long long master_repl_offset; /* Global replication offset */ int repl_ping_slave_period; /* Master pings the slave every N seconds */ char *repl_backlog; /* Replication backlog for partial syncs */ long long repl_backlog_size; /* Backlog circular buffer size */ long long repl_backlog_histlen; /* Backlog actual data length */ - long long repl_backlog_idx; /* Backlog circular buffer current offset */ - long long repl_backlog_off; /* Replication offset of first byte in the - backlog buffer. 
*/ + long long repl_backlog_idx; /* Backlog circular buffer current offset, + that is the next byte we'll write to.*/ + long long repl_backlog_off; /* Replication "master offset" of first + byte in the replication backlog buffer.*/ time_t repl_backlog_time_limit; /* Time without slaves after the backlog gets released. */ time_t repl_no_slaves_since; /* We have no slaves since that time. @@ -1029,8 +1054,11 @@ struct redisServer { int slave_priority; /* Reported in INFO and used by Sentinel. */ int slave_announce_port; /* Give the master this listening port. */ char *slave_announce_ip; /* Give the master this ip address. */ - char repl_master_runid[CONFIG_RUN_ID_SIZE+1]; /* Master run id for PSYNC.*/ - long long repl_master_initial_offset; /* Master PSYNC offset. */ + /* The following two fields are where we store master PSYNC replid/offset + * while the PSYNC is in progress. At the end we'll copy the fields into + * the server->master client structure. */ + char master_replid[CONFIG_RUN_ID_SIZE+1]; /* Master PSYNC replid. */ + long long master_initial_offset; /* Master PSYNC offset. */ int repl_slave_lazy_flush; /* Lazy FLUSHALL before loading DB? */ /* Replication script cache. */ dict *repl_scriptcache_dict; /* SHA1 all slaves are aware of.
*/ @@ -1259,6 +1287,7 @@ void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask); void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask); void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask); void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask); +void addReplyString(client *c, const char *s, size_t len); void addReplyBulk(client *c, robj *obj); void addReplyBulkCString(client *c, const char *s); void addReplyBulkCBuffer(client *c, const void *p, size_t len); @@ -1393,6 +1422,7 @@ ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout); /* Replication */ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc); +void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen); void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc); void updateSlavesWaitingBgsave(int bgsaveerr, int type); void replicationCron(void); @@ -1414,6 +1444,10 @@ long long replicationGetSlaveOffset(void); char *replicationGetSlaveName(client *c); long long getPsyncInitialOffset(void); int replicationSetupSlaveForFullResync(client *slave, long long offset); +void changeReplicationId(void); +void clearReplicationId2(void); +void chopReplicationBacklog(void); +void replicationCacheMasterUsingMyself(void); /* Generic persistence functions */ void startLoading(FILE *fp); @@ -1422,7 +1456,7 @@ void stopLoading(void); /* RDB persistence */ #include "rdb.h" -int rdbSaveRio(rio *rdb, int *error, int flags); +int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi); /* AOF persistence */ void flushAppendOnlyFile(int force);