2012-11-08 12:25:23 -05:00
/* Asynchronous replication implementation.
*
* Copyright ( c ) 2009 - 2012 , Salvatore Sanfilippo < antirez at gmail dot com >
* All rights reserved .
*
* Redistribution and use in source and binary forms , with or without
* modification , are permitted provided that the following conditions are met :
*
* * Redistributions of source code must retain the above copyright notice ,
* this list of conditions and the following disclaimer .
* * Redistributions in binary form must reproduce the above copyright
* notice , this list of conditions and the following disclaimer in the
* documentation and / or other materials provided with the distribution .
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission .
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS " AS IS "
* AND ANY EXPRESS OR IMPLIED WARRANTIES , INCLUDING , BUT NOT LIMITED TO , THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED . IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT , INDIRECT , INCIDENTAL , SPECIAL , EXEMPLARY , OR
* CONSEQUENTIAL DAMAGES ( INCLUDING , BUT NOT LIMITED TO , PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES ; LOSS OF USE , DATA , OR PROFITS ; OR BUSINESS
* INTERRUPTION ) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY , WHETHER IN
* CONTRACT , STRICT LIABILITY , OR TORT ( INCLUDING NEGLIGENCE OR OTHERWISE )
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE , EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE .
*/
2010-06-21 18:07:48 -04:00
# include "redis.h"
# include <sys/time.h>
# include <unistd.h>
# include <fcntl.h>
2012-09-17 06:45:57 -04:00
# include <sys/socket.h>
2010-06-21 18:07:48 -04:00
# include <sys/stat.h>
2013-01-30 12:33:16 -05:00
/* Prototypes for functions defined elsewhere (not visible in this view;
 * presumably further down this file, in the slave-side code). */
void replicationDiscardCachedMaster ( void ) ;
void replicationResurrectCachedMaster ( int newfd ) ;
2010-11-04 12:29:53 -04:00
/* ---------------------------------- MASTER -------------------------------- */
2013-01-30 12:33:16 -05:00
/* Allocate an empty replication backlog of server.repl_backlog_size bytes.
 *
 * The master replication offset is bumped by one so that no slave known
 * from before the backlog existed can successfully PSYNC against it: the
 * offset is not advanced while there is neither a backlog nor any attached
 * slave, so stale offsets could otherwise look valid.
 *
 * The backlog starts empty, so the logical offset of its first byte is the
 * next offset that will be produced. */
void createReplicationBacklog(void) {
    redisAssert(server.repl_backlog == NULL);

    server.repl_backlog = zmalloc(server.repl_backlog_size);
    server.repl_backlog_idx = 0;
    server.repl_backlog_histlen = 0;

    server.master_repl_offset++;
    server.repl_backlog_off = server.master_repl_offset + 1;
}
/* This function is called when the user modifies the replication backlog
* size at runtime . It is up to the function to both update the
* server . repl_backlog_size and to resize the buffer and setup it so that
* it contains the same data as the previous one ( possibly less data , but
* the most recent bytes , or the same data and more free space in case the
* buffer is enlarged ) . */
void resizeReplicationBacklog ( long long newsize ) {
if ( newsize < REDIS_REPL_BACKLOG_MIN_SIZE )
newsize = REDIS_REPL_BACKLOG_MIN_SIZE ;
if ( server . repl_backlog_size = = newsize ) return ;
server . repl_backlog_size = newsize ;
if ( server . repl_backlog ! = NULL ) {
/* What we actually do is to flush the old buffer and realloc a new
* empty one . It will refill with new data incrementally .
* The reason is that copying a few gigabytes adds latency and even
* worse often we need to alloc additional space before freeing the
* old buffer . */
zfree ( server . repl_backlog ) ;
server . repl_backlog = zmalloc ( server . repl_backlog_size ) ;
server . repl_backlog_histlen = 0 ;
server . repl_backlog_idx = 0 ;
/* Next byte we have is... the next since the buffer is emtpy. */
server . repl_backlog_off = server . master_repl_offset + 1 ;
}
}
/* Release the replication backlog buffer. Asserts that no slave is
 * attached. Afterwards server.repl_backlog is NULL, so the next call to
 * createReplicationBacklog() can allocate a fresh one. */
void freeReplicationBacklog(void) {
    void *backlog = server.repl_backlog;

    redisAssert(listLength(server.slaves) == 0);
    server.repl_backlog = NULL;
    zfree(backlog);
}
/* Append 'len' bytes starting at 'ptr' to the circular replication backlog.
 *
 * This also advances the global replication offset
 * (server.master_repl_offset) by 'len', since feeding the backlog always
 * goes together with advancing the offset. Writes wrap around at
 * server.repl_backlog_size, the stored history length saturates at the
 * buffer size, and server.repl_backlog_off is updated to the offset of the
 * oldest byte still held. */
void feedReplicationBacklog(void *ptr, size_t len) {
    unsigned char *src = ptr;
    size_t remaining = len;

    server.master_repl_offset += len;

    while (remaining > 0) {
        /* Copy up to the physical end of the buffer, then wrap to 0. */
        size_t room = server.repl_backlog_size - server.repl_backlog_idx;
        size_t chunk = (remaining < room) ? remaining : room;

        memcpy(server.repl_backlog + server.repl_backlog_idx, src, chunk);
        server.repl_backlog_idx += chunk;
        if (server.repl_backlog_idx == server.repl_backlog_size)
            server.repl_backlog_idx = 0;
        server.repl_backlog_histlen += chunk;
        src += chunk;
        remaining -= chunk;
    }

    if (server.repl_backlog_histlen > server.repl_backlog_size)
        server.repl_backlog_histlen = server.repl_backlog_size;

    /* Offset of the oldest byte currently stored in the backlog. */
    server.repl_backlog_off =
        server.master_repl_offset - server.repl_backlog_histlen + 1;
}
/* Feed the replication backlog with the payload of string object 'o'.
 * Integer-encoded objects are rendered as their decimal representation;
 * any other encoding is treated as an sds string. */
void feedReplicationBacklogWithObject(robj *o) {
    char numbuf[REDIS_LONGSTR_SIZE];

    if (o->encoding != REDIS_ENCODING_INT) {
        feedReplicationBacklog(o->ptr, sdslen(o->ptr));
        return;
    }
    feedReplicationBacklog(numbuf,
        ll2string(numbuf, sizeof(numbuf), (long)o->ptr));
}
# define FEEDSLAVE_BUF_SIZE (1024*64)
2010-06-21 18:07:48 -04:00
void replicationFeedSlaves ( list * slaves , int dictid , robj * * argv , int argc ) {
listNode * ln ;
listIter li ;
2013-01-30 12:33:16 -05:00
int j , i , len ;
char buf [ FEEDSLAVE_BUF_SIZE ] , * b = buf ;
char llstr [ REDIS_LONGSTR_SIZE ] ;
int buf_left = FEEDSLAVE_BUF_SIZE ;
robj * o ;
/* If there aren't slaves, and there is no backlog buffer to populate,
* we can return ASAP . */
if ( server . repl_backlog = = NULL & & listLength ( slaves ) = = 0 ) return ;
/* We can't have slaves attached and no backlog. */
redisAssert ( ! ( listLength ( slaves ) ! = 0 & & server . repl_backlog = = NULL ) ) ;
/* What we do here is to try to write as much data as possible in a static
* buffer " buf " that is used to create an object that is later sent to all
* the slaves . This way we do the decoding only one time for most commands
* not containing big payloads . */
/* Create the SELECT command into the static buffer if needed. */
if ( server . slaveseldb ! = dictid ) {
char * selectcmd ;
size_t sclen ;
if ( dictid > = 0 & & dictid < REDIS_SHARED_SELECT_CMDS ) {
selectcmd = shared . select [ dictid ] - > ptr ;
sclen = sdslen ( selectcmd ) ;
memcpy ( b , selectcmd , sclen ) ;
b + = sclen ;
buf_left - = sclen ;
} else {
int dictid_len ;
dictid_len = ll2string ( llstr , sizeof ( llstr ) , dictid ) ;
sclen = snprintf ( b , buf_left , " *2 \r \n $6 \r \n SELECT \r \n $%d \r \n %s \r \n " ,
dictid_len , llstr ) ;
b + = sclen ;
buf_left - = sclen ;
}
}
server . slaveseldb = dictid ;
/* Add the multi bulk reply size to the static buffer, that is, the number
* of arguments of the command to send to every slave . */
b [ 0 ] = ' * ' ;
len = ll2string ( b + 1 , REDIS_LONGSTR_SIZE , argc ) ;
b + = len + 1 ;
buf_left - = len ;
b [ 0 ] = ' \r ' ;
b [ 1 ] = ' \n ' ;
b + = 2 ;
buf_left - = 2 ;
/* Try to use the static buffer for as much arguments is possible. */
for ( j = 0 ; j < argc ; j + + ) {
int objlen ;
char * objptr ;
if ( argv [ j ] - > encoding ! = REDIS_ENCODING_RAW & &
argv [ j ] - > encoding ! = REDIS_ENCODING_INT ) {
redisPanic ( " Unexpected encoding " ) ;
}
if ( argv [ j ] - > encoding = = REDIS_ENCODING_RAW ) {
objlen = sdslen ( argv [ j ] - > ptr ) ;
objptr = argv [ j ] - > ptr ;
} else {
objlen = ll2string ( llstr , REDIS_LONGSTR_SIZE , ( long ) argv [ j ] - > ptr ) ;
objptr = llstr ;
}
/* We need enough space for bulk reply encoding, newlines, and
* the data itself . */
if ( buf_left < objlen + REDIS_LONGSTR_SIZE + 32 ) break ;
/* Write $...CRLF */
b [ 0 ] = ' $ ' ;
len = ll2string ( b + 1 , REDIS_LONGSTR_SIZE , objlen ) ;
b + = len + 1 ;
buf_left - = len ;
b [ 0 ] = ' \r ' ;
b [ 1 ] = ' \n ' ;
b + = 2 ;
buf_left - = 2 ;
/* And data plus CRLF */
memcpy ( b , objptr , objlen ) ;
b + = objlen ;
buf_left - = objlen ;
b [ 0 ] = ' \r ' ;
b [ 1 ] = ' \n ' ;
b + = 2 ;
buf_left - = 2 ;
}
/* Create an object with the static buffer content. */
redisAssert ( buf_left < FEEDSLAVE_BUF_SIZE ) ;
o = createStringObject ( buf , b - buf ) ;
/* If we have a backlog, populate it with data and increment
* the global replication offset . */
if ( server . repl_backlog ) {
feedReplicationBacklogWithObject ( o ) ;
for ( i = j ; i < argc ; i + + ) {
char aux [ REDIS_LONGSTR_SIZE + 3 ] ;
long objlen = stringObjectLen ( argv [ i ] ) ;
/* We need to feed the buffer with the object as a bulk reply
* not just as a plain string , so create the $ . . CRLF payload len
* ad add the final CRLF */
aux [ 0 ] = ' $ ' ;
len = ll2string ( aux + 1 , objlen , sizeof ( aux ) - 1 ) ;
aux [ len + 1 ] = ' \r ' ;
aux [ len + 2 ] = ' \n ' ;
feedReplicationBacklog ( aux , len + 3 ) ;
feedReplicationBacklogWithObject ( argv [ j ] ) ;
feedReplicationBacklogWithObject ( shared . crlf ) ;
}
}
2010-06-21 18:07:48 -04:00
2013-01-30 12:33:16 -05:00
/* Write data to slaves. Here we do two things:
* 1 ) We write the " o " object that was created using the accumulated
* static buffer .
* 2 ) We write any additional argument of the command to replicate that
* was not written inside the static buffer for lack of space .
*/
2010-06-21 18:07:48 -04:00
listRewind ( slaves , & li ) ;
while ( ( ln = listNext ( & li ) ) ) {
redisClient * slave = ln - > value ;
/* Don't feed slaves that are still waiting for BGSAVE to start */
if ( slave - > replstate = = REDIS_REPL_WAIT_BGSAVE_START ) continue ;
2011-05-29 20:55:13 -04:00
/* Feed slaves that are waiting for the initial SYNC (so these commands
2013-01-16 12:00:20 -05:00
* are queued in the output buffer until the initial SYNC completes ) ,
2011-05-29 20:55:13 -04:00
* or are already in sync with the master . */
2013-01-30 12:33:16 -05:00
/* First, trasmit the object created from the static buffer. */
addReply ( slave , o ) ;
/* Finally any additional argument that was not stored inside the
* static buffer if any ( from j to argc ) . */
for ( i = j ; i < argc ; i + + )
addReplyBulk ( slave , argv [ i ] ) ;
2010-06-21 18:07:48 -04:00
}
2013-01-30 12:33:16 -05:00
decrRefCount ( o ) ;
2010-06-21 18:07:48 -04:00
}
2012-03-07 06:12:15 -05:00
/* Build the MONITOR line for a command and queue it to every client in
 * 'monitors'.
 *
 * The line has the form:
 *   +<sec>.<usec> [<db> <origin>] <arg0> <arg1> ...\r\n
 * <origin> is "lua" for the Lua scripting client, "unix:<socket path>" for
 * clients on the unix socket, and "<ip>:<port>" otherwise. Integer-encoded
 * arguments are printed as quoted decimals. Other arguments are quoted and
 * escaped with sdscatrepr(). */
void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc) {
    struct timeval now;
    sds line;
    robj *lineobj;
    listIter iter;
    listNode *node;
    int k;

    gettimeofday(&now,NULL);
    line = sdsnew("+");
    line = sdscatprintf(line,"%ld.%06ld ",(long)now.tv_sec,(long)now.tv_usec);

    if (c->flags & REDIS_LUA_CLIENT) {
        line = sdscatprintf(line,"[%d lua] ",dictid);
    } else if (c->flags & REDIS_UNIX_SOCKET) {
        line = sdscatprintf(line,"[%d unix:%s] ",dictid,server.unixsocket);
    } else {
        char ip[32];
        int port;

        anetPeerToString(c->fd,ip,&port);
        line = sdscatprintf(line,"[%d %s:%d] ",dictid,ip,port);
    }

    for (k = 0; k < argc; k++) {
        robj *arg = argv[k];

        /* Arguments are separated by a single space. */
        if (k > 0) line = sdscatlen(line," ",1);
        if (arg->encoding == REDIS_ENCODING_INT)
            line = sdscatprintf(line,"\"%ld\"",(long)arg->ptr);
        else
            line = sdscatrepr(line,(char*)arg->ptr,sdslen(arg->ptr));
    }
    line = sdscatlen(line,"\r\n",2);

    /* One shared object for all monitors, released once queued. */
    lineobj = createObject(REDIS_STRING,line);
    listRewind(monitors,&iter);
    while ((node = listNext(&iter)) != NULL)
        addReply((redisClient*)node->value,lineobj);
    decrRefCount(lineobj);
}
2013-01-30 12:33:16 -05:00
/* Queue to slave 'c' the backlog contents from replication offset 'offset'
 * up to the newest byte held, and return the number of bytes queued.
 * The caller is responsible for checking that 'offset' lies inside the
 * backlog window. The backlog is a circular buffer, so the data may be
 * queued as two replies: one up to the physical end of the buffer and one
 * from its start. */
long long addReplyReplicationBacklog(redisClient *c, long long offset) {
    long long skip, pos, remaining, total;

    redisLog(REDIS_DEBUG, "[PSYNC] Slave request offset: %lld", offset);

    if (server.repl_backlog_histlen == 0) {
        redisLog(REDIS_DEBUG, "[PSYNC] Backlog history len is zero");
        return 0;
    }

    redisLog(REDIS_DEBUG, "[PSYNC] Backlog size: %lld",
             server.repl_backlog_size);
    redisLog(REDIS_DEBUG, "[PSYNC] First byte: %lld",
             server.repl_backlog_off);
    redisLog(REDIS_DEBUG, "[PSYNC] History len: %lld",
             server.repl_backlog_histlen);
    redisLog(REDIS_DEBUG, "[PSYNC] Current index: %lld",
             server.repl_backlog_idx);

    /* Bytes between the oldest byte held and the requested offset. */
    skip = offset - server.repl_backlog_off;
    redisLog(REDIS_DEBUG, "[PSYNC] Skipping: %lld", skip);

    /* Physical index of the oldest byte (logical offset repl_backlog_off):
     * it sits histlen bytes behind the write index, modulo the size. */
    pos = (server.repl_backlog_idx +
           (server.repl_backlog_size - server.repl_backlog_histlen)) %
          server.repl_backlog_size;
    redisLog(REDIS_DEBUG, "[PSYNC] Index of first byte: %lld", pos);

    /* Seek forward to the requested offset. */
    pos = (pos + skip) % server.repl_backlog_size;

    total = server.repl_backlog_histlen - skip;
    redisLog(REDIS_DEBUG, "[PSYNC] Reply total length: %lld", total);

    remaining = total;
    while (remaining) {
        long long chunk = server.repl_backlog_size - pos;

        if (chunk > remaining) chunk = remaining;
        redisLog(REDIS_DEBUG, "[PSYNC] addReply() length: %lld", chunk);
        addReplySds(c, sdsnewlen(server.repl_backlog + pos, chunk));
        remaining -= chunk;
        pos = 0; /* Any second chunk starts at the buffer's beginning. */
    }
    return total;
}
/* Handle a "PSYNC <runid> <offset>" request on the master side, trying a
 * partial resynchronization.
 *
 * The request is accepted only when <runid> matches server.runid and
 * <offset> lies inside the backlog window (see below). On acceptance:
 * - the client becomes an online slave;
 * - "+CONTINUE" is written directly to its socket;
 * - the backlog from <offset> onward is queued;
 * - REDIS_OK is returned.
 *
 * Otherwise "+FULLRESYNC <runid> <offset>" is written directly to the
 * socket and REDIS_ERR is returned, so the caller proceeds with the usual
 * full resync. If a direct socket write fails, the client is freed
 * asynchronously and REDIS_OK is returned, so the caller does nothing more
 * with it. */
int masterTryPartialResynchronization(redisClient *c) {
    long long psync_offset, psync_len;
    char *master_runid = c->argv[1]->ptr;
    char buf[128];
    int buflen;

    /* A different runid means a different master instance: there is no way
     * to continue. Run id "?" is used by slaves to force a full resync. */
    if (strcasecmp(master_runid, server.runid)) {
        if (master_runid[0] != '?') {
            redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: "
                "Runid mismatch (Client asked for '%s', I'm '%s')",
                master_runid, server.runid);
        } else {
            redisLog(REDIS_NOTICE,"Full resync requested by slave.");
        }
        goto need_full_resync;
    }

    /* Do we still have the data the slave is asking for? The backlog holds
     * the bytes from repl_backlog_off to repl_backlog_off+histlen-1. So the
     * valid requests go from repl_backlog_off up to repl_backlog_off+histlen
     * inclusive; the latter means the slave is missing nothing. The window
     * must be bounded by histlen, not by the buffer size. Otherwise, while
     * the backlog is not yet full, an offset past the newest byte is
     * accepted and addReplyReplicationBacklog() gets a negative length.
     * Also, once the backlog is full, a caught-up slave would be refused. */
    if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
       REDIS_OK) goto need_full_resync;
    if (!server.repl_backlog ||
        psync_offset < server.repl_backlog_off ||
        psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
    {
        redisLog(REDIS_NOTICE,
            "Unable to partial resync with the slave for lack of backlog (Slave request was: %lld).", psync_offset);
        goto need_full_resync;
    }

    /* We can perform a partial resync:
     * 1) Set client state to make it a slave.
     * 2) Inform the client we can continue with +CONTINUE.
     * 3) Send the backlog data (from the offset to the end) to the slave. */
    c->flags |= REDIS_SLAVE;
    c->replstate = REDIS_REPL_ONLINE;
    c->repl_ack_time = server.unixtime;
    listAddNodeTail(server.slaves,c);

    /* The connection buffers can't be used here, because they are used to
     * accumulate new commands at this stage. The socket send buffer is
     * empty, though, so this short write is not expected to fail. */
    buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
    if (write(c->fd,buf,buflen) != buflen) {
        freeClientAsync(c);
        return REDIS_OK;
    }
    psync_len = addReplyReplicationBacklog(c,psync_offset);
    redisLog(REDIS_NOTICE,
        "Partial resynchronization request accepted. Sending %lld bytes of backlog starting from offset %lld.", psync_len, psync_offset);
    /* server.slaveseldb need not be reset to -1 to force a SELECT: the
     * slave keeps the selected DB from its previous link with us. */
    return REDIS_OK; /* The caller can return, no full resync needed. */

need_full_resync:
    /* Tell the slave the runid and offset to use for a future PSYNC. */
    psync_offset = server.master_repl_offset;
    /* If no backlog exists yet, creating it later increments the offset by
     * one (see createReplicationBacklog()), so anticipate that here. */
    if (server.repl_backlog == NULL) psync_offset++;
    /* Again, we can't use the connection buffers (see above). */
    buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
                      server.runid,psync_offset);
    if (write(c->fd,buf,buflen) != buflen) {
        freeClientAsync(c);
        return REDIS_OK;
    }
    return REDIS_ERR;
}
/* SYNC ad PSYNC command implemenation. */
2010-06-21 18:07:48 -04:00
void syncCommand ( redisClient * c ) {
2013-01-16 12:00:20 -05:00
/* ignore SYNC if already slave or in monitor mode */
2010-06-21 18:07:48 -04:00
if ( c - > flags & REDIS_SLAVE ) return ;
2010-08-24 10:04:13 -04:00
/* Refuse SYNC requests if we are a slave but the link with our master
* is not ok . . . */
2011-12-21 06:23:18 -05:00
if ( server . masterhost & & server . repl_state ! = REDIS_REPL_CONNECTED ) {
2010-09-02 13:52:24 -04:00
addReplyError ( c , " Can't SYNC while not connected with my master " ) ;
2010-08-24 10:04:13 -04:00
return ;
}
2010-06-21 18:07:48 -04:00
/* SYNC can't be issued when the server has pending data to send to
* the client about already issued commands . We need a fresh reply
* buffer registering the differences between the BGSAVE and the current
* dataset , so that we can copy to other slaves if needed . */
2013-02-01 09:05:43 -05:00
if ( listLength ( c - > reply ) ! = 0 | | c - > bufpos ! = 0 ) {
addReplyError ( c , " SYNC and PSYNC are invalid with pending output " ) ;
2010-06-21 18:07:48 -04:00
return ;
}
2013-01-30 12:33:16 -05:00
redisLog ( REDIS_NOTICE , " Slave asks for synchronization " ) ;
/* Try a partial resynchronization if this is a PSYNC command.
* If it fails , we continue with usual full resynchronization , however
* when this happens masterTryPartialResynchronization ( ) already
* replied with :
*
* + FULLRESYNC < runid > < offset >
*
* So the slave knows the new runid and offset to try a PSYNC later
* if the connection with the master is lost . */
2013-02-12 09:24:25 -05:00
if ( ! strcasecmp ( c - > argv [ 0 ] - > ptr , " psync " ) ) {
if ( masterTryPartialResynchronization ( c ) = = REDIS_OK ) {
server . stat_sync_partial_ok + + ;
return ; /* No full resync needed, return. */
} else {
char * master_runid = c - > argv [ 1 ] - > ptr ;
/* Increment stats for failed PSYNCs, but only if the
* runid is not " ? " , as this is used by slaves to force a full
* resync on purpose when they are not albe to partially
* resync . */
if ( master_runid [ 0 ] ! = ' ? ' ) server . stat_sync_partial_err + + ;
}
}
/* Full resynchronization. */
server . stat_sync_full + + ;
2013-01-30 12:33:16 -05:00
2010-06-21 18:07:48 -04:00
/* Here we need to check if there is a background saving operation
* in progress , or if it is required to start one */
2011-12-21 06:22:13 -05:00
if ( server . rdb_child_pid ! = - 1 ) {
2010-06-21 18:07:48 -04:00
/* Ok a background save is in progress. Let's check if it is a good
* one for replication , i . e . if there is another slave that is
* registering differences since the server forked to save */
redisClient * slave ;
listNode * ln ;
listIter li ;
listRewind ( server . slaves , & li ) ;
while ( ( ln = listNext ( & li ) ) ) {
slave = ln - > value ;
if ( slave - > replstate = = REDIS_REPL_WAIT_BGSAVE_END ) break ;
}
if ( ln ) {
/* Perfect, the server is already registering differences for
* another slave . Set the right state , and copy the buffer . */
2011-12-30 13:34:40 -05:00
copyClientOutputBuffer ( c , slave ) ;
2010-06-21 18:07:48 -04:00
c - > replstate = REDIS_REPL_WAIT_BGSAVE_END ;
redisLog ( REDIS_NOTICE , " Waiting for end of BGSAVE for SYNC " ) ;
} else {
/* No way, we need to wait for the next BGSAVE in order to
* register differences */
c - > replstate = REDIS_REPL_WAIT_BGSAVE_START ;
redisLog ( REDIS_NOTICE , " Waiting for next BGSAVE for SYNC " ) ;
}
} else {
/* Ok we don't have a BGSAVE in progress, let's start one */
redisLog ( REDIS_NOTICE , " Starting BGSAVE for SYNC " ) ;
2011-12-21 06:22:13 -05:00
if ( rdbSaveBackground ( server . rdb_filename ) ! = REDIS_OK ) {
2010-06-21 18:07:48 -04:00
redisLog ( REDIS_NOTICE , " Replication failed, can't BGSAVE " ) ;
2010-09-02 13:52:24 -04:00
addReplyError ( c , " Unable to perform background save " ) ;
2010-06-21 18:07:48 -04:00
return ;
}
c - > replstate = REDIS_REPL_WAIT_BGSAVE_END ;
}
2013-01-31 05:14:15 -05:00
if ( server . repl_disable_tcp_nodelay )
anetDisableTcpNoDelay ( NULL , c - > fd ) ; /* Non critical if it fails. */
2010-06-21 18:07:48 -04:00
c - > repldbfd = - 1 ;
c - > flags | = REDIS_SLAVE ;
2012-11-02 11:31:28 -04:00
server . slaveseldb = - 1 ; /* Force to re-emit the SELECT command. */
2010-06-21 18:07:48 -04:00
listAddNodeTail ( server . slaves , c ) ;
2013-01-30 12:33:16 -05:00
if ( listLength ( server . slaves ) = = 1 & & server . repl_backlog = = NULL )
createReplicationBacklog ( ) ;
2010-06-21 18:07:48 -04:00
return ;
}
2012-06-26 03:47:47 -04:00
/* REPLCONF <option> <value> <option> <value> ...
 * Used by a slave to configure its replication link with the master.
 *
 * Supported options:
 *
 * listening-port <port>
 *   The port the slave listens on. It is stored in
 *   c->slave_listening_port, so the master can list slaves with their
 *   listening ports (e.g. in the INFO output).
 *
 * ack <offset>
 *   Sent by an attached slave to report how much of the replication
 *   stream it has processed so far. c->repl_ack_off is updated only if
 *   <offset> is larger, and c->repl_ack_time is refreshed. This option
 *   never replies, and it stops processing of any following pairs. It is
 *   silently ignored when the client is not a slave or <offset> is not a
 *   valid integer. It is an internal command that normal clients should
 *   never use.
 *
 * An even argc (an option without a value) is a syntax error. An unknown
 * option gets an error reply. If every pair is processed, +OK is replied. */
void replconfCommand(redisClient *c) {
    int j;

    if ((c->argc % 2) == 0) {
        /* Number of arguments must be odd to make sure that every
         * option has a corresponding value. */
        addReply(c,shared.syntaxerr);
        return;
    }

    /* Process every option-value pair. */
    for (j = 1; j < c->argc; j+=2) {
        if (!strcasecmp(c->argv[j]->ptr,"listening-port")) {
            long port;

            if ((getLongFromObjectOrReply(c,c->argv[j+1],
                    &port,NULL) != REDIS_OK))
                return;
            c->slave_listening_port = port;
        } else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
            long long offset;

            if (!(c->flags & REDIS_SLAVE)) return;
            if ((getLongLongFromObject(c->argv[j+1], &offset) != REDIS_OK))
                return;
            if (offset > c->repl_ack_off)
                c->repl_ack_off = offset;
            c->repl_ack_time = server.unixtime;
            /* Note: this command does not reply anything! */
            return;
        } else {
            addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
                (char*)c->argv[j]->ptr);
            return;
        }
    }
    addReply(c,shared.ok);
}
2010-06-21 18:07:48 -04:00
/* Writable-event handler that streams the RDB file to a slave in state
 * REDIS_REPL_SEND_BULK.
 *
 * On the first call (repldboff == 0), the "$<size>\r\n" bulk header is
 * written first. Each call then sends up to REDIS_IOBUF_LEN bytes, read
 * from offset repldboff of repldbfd. Once the whole file has been sent:
 * - the file is closed;
 * - the slave becomes REDIS_REPL_ONLINE;
 * - the writable handler is switched to sendReplyToClient, so the commands
 *   accumulated in the slave's output buffer are delivered.
 * Any seek, read or write error frees the slave. */
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *slave = privdata;
    char buf[REDIS_IOBUF_LEN];
    ssize_t nwritten, buflen;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    if (slave->repldboff == 0) {
        /* Write the bulk count before transferring the DB. How much room
         * the socket output buffer has is unknown. In practice SO_SNDLOWAT
         * (the minimum count for output operations) is never smaller than
         * the few bytes needed here. */
        sds bulkcount;

        bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(long long)
            slave->repldbsize);
        if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
        {
            sdsfree(bulkcount);
            freeClient(slave);
            return;
        }
        sdsfree(bulkcount);
    }

    /* A failed seek would make us send data from the wrong position. */
    if (lseek(slave->repldbfd,slave->repldboff,SEEK_SET) == -1) {
        redisLog(REDIS_WARNING,"Seek error sending DB to slave: %s",
            strerror(errno));
        freeClient(slave);
        return;
    }
    buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
    if (buflen <= 0) {
        redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
            (buflen == 0) ? "premature EOF" : strerror(errno));
        freeClient(slave);
        return;
    }
    if ((nwritten = write(fd,buf,buflen)) == -1) {
        redisLog(REDIS_VERBOSE,"Write error sending DB to slave: %s",
            strerror(errno));
        freeClient(slave);
        return;
    }
    slave->repldboff += nwritten;

    if (slave->repldboff == slave->repldbsize) {
        close(slave->repldbfd);
        slave->repldbfd = -1;
        aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
        slave->replstate = REDIS_REPL_ONLINE;
        slave->repl_ack_time = server.unixtime;
        if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
            sendReplyToClient, slave) == AE_ERR) {
            freeClient(slave);
            return;
        }
        redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
    }
}
2013-01-16 12:00:20 -05:00
/* This function is called at the end of every background saving.
2010-06-21 18:07:48 -04:00
* The argument bgsaveerr is REDIS_OK if the background saving succeeded
* otherwise REDIS_ERR is passed to the function .
*
* The goal of this function is to handle slaves waiting for a successful
* background saving in order to perform non - blocking synchronization . */
void updateSlavesWaitingBgsave ( int bgsaveerr ) {
listNode * ln ;
int startbgsave = 0 ;
listIter li ;
listRewind ( server . slaves , & li ) ;
while ( ( ln = listNext ( & li ) ) ) {
redisClient * slave = ln - > value ;
if ( slave - > replstate = = REDIS_REPL_WAIT_BGSAVE_START ) {
startbgsave = 1 ;
slave - > replstate = REDIS_REPL_WAIT_BGSAVE_END ;
} else if ( slave - > replstate = = REDIS_REPL_WAIT_BGSAVE_END ) {
struct redis_stat buf ;
if ( bgsaveerr ! = REDIS_OK ) {
freeClient ( slave ) ;
redisLog ( REDIS_WARNING , " SYNC failed. BGSAVE child returned an error " ) ;
continue ;
}
2011-12-21 06:22:13 -05:00
if ( ( slave - > repldbfd = open ( server . rdb_filename , O_RDONLY ) ) = = - 1 | |
2010-06-21 18:07:48 -04:00
redis_fstat ( slave - > repldbfd , & buf ) = = - 1 ) {
freeClient ( slave ) ;
redisLog ( REDIS_WARNING , " SYNC failed. Can't open/stat DB after BGSAVE: %s " , strerror ( errno ) ) ;
continue ;
}
slave - > repldboff = 0 ;
slave - > repldbsize = buf . st_size ;
slave - > replstate = REDIS_REPL_SEND_BULK ;
aeDeleteFileEvent ( server . el , slave - > fd , AE_WRITABLE ) ;
if ( aeCreateFileEvent ( server . el , slave - > fd , AE_WRITABLE , sendBulkToSlave , slave ) = = AE_ERR ) {
freeClient ( slave ) ;
continue ;
}
}
}
if ( startbgsave ) {
2011-12-21 06:22:13 -05:00
if ( rdbSaveBackground ( server . rdb_filename ) ! = REDIS_OK ) {
2010-06-21 18:07:48 -04:00
listIter li ;
listRewind ( server . slaves , & li ) ;
redisLog ( REDIS_WARNING , " SYNC failed. BGSAVE failed " ) ;
while ( ( ln = listNext ( & li ) ) ) {
redisClient * slave = ln - > value ;
if ( slave - > replstate = = REDIS_REPL_WAIT_BGSAVE_START )
freeClient ( slave ) ;
}
}
}
}
2010-11-04 12:29:53 -04:00
/* ----------------------------------- SLAVE -------------------------------- */
/* Abort the async download of the bulk dataset while SYNC-ing with master */
void replicationAbortSyncTransfer ( void ) {
2011-12-21 06:23:18 -05:00
redisAssert ( server . repl_state = = REDIS_REPL_TRANSFER ) ;
2010-11-04 12:29:53 -04:00
aeDeleteFileEvent ( server . el , server . repl_transfer_s , AE_READABLE ) ;
close ( server . repl_transfer_s ) ;
close ( server . repl_transfer_fd ) ;
unlink ( server . repl_transfer_tmpfile ) ;
zfree ( server . repl_transfer_tmpfile ) ;
2011-12-21 06:23:18 -05:00
server . repl_state = REDIS_REPL_CONNECT ;
2010-11-04 12:29:53 -04:00
}
/* Asynchronously read the SYNC payload we receive from a master */
2012-08-24 13:28:44 -04:00
/* Byte threshold (8 MB). Judging by the name, it presumably limits how much
 * received RDB data readSyncBulkPayload() writes before forcing a sync to
 * disk. The use site is outside this view; confirm. */
# define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
2010-11-04 12:29:53 -04:00
void readSyncBulkPayload ( aeEventLoop * el , int fd , void * privdata , int mask ) {
2010-11-04 13:09:35 -04:00
char buf [ 4096 ] ;
2010-11-04 12:35:03 -04:00
ssize_t nread , readlen ;
2012-08-24 13:28:44 -04:00
off_t left ;
2010-11-04 12:35:03 -04:00
REDIS_NOTUSED ( el ) ;
REDIS_NOTUSED ( privdata ) ;
REDIS_NOTUSED ( mask ) ;
2010-11-04 12:29:53 -04:00
2012-08-24 13:28:44 -04:00
/* If repl_transfer_size == -1 we still have to read the bulk length
2010-11-04 13:09:35 -04:00
* from the master reply . */
2012-08-24 13:28:44 -04:00
if ( server . repl_transfer_size = = - 1 ) {
2012-03-31 05:23:30 -04:00
if ( syncReadLine ( fd , buf , 1024 , server . repl_syncio_timeout * 1000 ) = = - 1 ) {
2010-11-04 13:09:35 -04:00
redisLog ( REDIS_WARNING ,
" I/O error reading bulk count from MASTER: %s " ,
strerror ( errno ) ) ;
2011-05-22 06:41:24 -04:00
goto error ;
2010-11-04 13:09:35 -04:00
}
2011-05-22 06:41:24 -04:00
2010-11-04 13:09:35 -04:00
if ( buf [ 0 ] = = ' - ' ) {
redisLog ( REDIS_WARNING ,
" MASTER aborted replication with an error: %s " ,
buf + 1 ) ;
2011-05-22 06:41:24 -04:00
goto error ;
2011-01-20 07:18:23 -05:00
} else if ( buf [ 0 ] = = ' \0 ' ) {
/* At this stage just a newline works as a PING in order to take
* the connection live . So we refresh our last interaction
* timestamp . */
2012-03-27 11:39:58 -04:00
server . repl_transfer_lastio = server . unixtime ;
2011-01-20 07:18:23 -05:00
return ;
2010-11-04 13:09:35 -04:00
} else if ( buf [ 0 ] ! = ' $ ' ) {
2013-02-01 07:01:01 -05:00
redisLog ( REDIS_WARNING , " Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right? " , buf ) ;
2011-05-22 06:41:24 -04:00
goto error ;
2010-11-04 13:09:35 -04:00
}
2012-08-24 13:28:44 -04:00
server . repl_transfer_size = strtol ( buf + 1 , NULL , 10 ) ;
2010-11-04 13:09:35 -04:00
redisLog ( REDIS_NOTICE ,
2013-02-27 06:27:15 -05:00
" MASTER <-> SLAVE sync: receiving %lld bytes from master " ,
( long long ) server . repl_transfer_size ) ;
2010-11-04 13:09:35 -04:00
return ;
}
/* Read bulk data */
2012-08-24 13:28:44 -04:00
left = server . repl_transfer_size - server . repl_transfer_read ;
readlen = ( left < ( signed ) sizeof ( buf ) ) ? left : ( signed ) sizeof ( buf ) ;
2010-11-04 12:29:53 -04:00
nread = read ( fd , buf , readlen ) ;
if ( nread < = 0 ) {
redisLog ( REDIS_WARNING , " I/O error trying to sync with MASTER: %s " ,
( nread = = - 1 ) ? strerror ( errno ) : " connection lost " ) ;
replicationAbortSyncTransfer ( ) ;
return ;
}
2012-03-27 11:39:58 -04:00
server . repl_transfer_lastio = server . unixtime ;
2010-11-04 12:29:53 -04:00
if ( write ( server . repl_transfer_fd , buf , nread ) ! = nread ) {
2012-04-25 15:21:56 -04:00
redisLog ( REDIS_WARNING , " Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s " , strerror ( errno ) ) ;
2011-05-22 06:41:24 -04:00
goto error ;
2010-11-04 12:29:53 -04:00
}
2012-08-24 13:28:44 -04:00
server . repl_transfer_read + = nread ;
/* Sync data on disk from time to time, otherwise at the end of the transfer
* we may suffer a big delay as the memory buffers are copied into the
* actual disk . */
if ( server . repl_transfer_read > =
server . repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC )
{
off_t sync_size = server . repl_transfer_read -
server . repl_transfer_last_fsync_off ;
rdb_fsync_range ( server . repl_transfer_fd ,
server . repl_transfer_last_fsync_off , sync_size ) ;
server . repl_transfer_last_fsync_off + = sync_size ;
}
2010-11-04 12:29:53 -04:00
/* Check if the transfer is now complete */
2012-08-24 13:28:44 -04:00
if ( server . repl_transfer_read = = server . repl_transfer_size ) {
2011-12-21 06:22:13 -05:00
if ( rename ( server . repl_transfer_tmpfile , server . rdb_filename ) = = - 1 ) {
2010-11-04 12:29:53 -04:00
redisLog ( REDIS_WARNING , " Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s " , strerror ( errno ) ) ;
replicationAbortSyncTransfer ( ) ;
return ;
}
2010-11-04 13:14:20 -04:00
redisLog ( REDIS_NOTICE , " MASTER <-> SLAVE sync: Loading DB in memory " ) ;
2013-02-08 04:26:19 -05:00
signalFlushedDb ( - 1 ) ;
2010-11-04 12:29:53 -04:00
emptyDb ( ) ;
2010-11-12 14:02:20 -05:00
/* Before loading the DB into memory we need to delete the readable
* handler , otherwise it will get called recursively since
* rdbLoad ( ) will call the event loop to process events from time to
* time for non blocking loading . */
aeDeleteFileEvent ( server . el , server . repl_transfer_s , AE_READABLE ) ;
2011-12-21 06:22:13 -05:00
if ( rdbLoad ( server . rdb_filename ) ! = REDIS_OK ) {
2010-11-04 12:29:53 -04:00
redisLog ( REDIS_WARNING , " Failed trying to load the MASTER synchronization DB from disk " ) ;
replicationAbortSyncTransfer ( ) ;
return ;
}
/* Final setup of the connected slave <- master link */
zfree ( server . repl_transfer_tmpfile ) ;
close ( server . repl_transfer_fd ) ;
server . master = createClient ( server . repl_transfer_s ) ;
server . master - > flags | = REDIS_MASTER ;
server . master - > authenticated = 1 ;
2011-12-21 06:23:18 -05:00
server . repl_state = REDIS_REPL_CONNECTED ;
2013-01-30 12:33:16 -05:00
server . master - > reploff = server . repl_master_initial_offset ;
memcpy ( server . master - > replrunid , server . repl_master_runid ,
sizeof ( server . repl_master_runid ) ) ;
2010-11-04 13:09:35 -04:00
redisLog ( REDIS_NOTICE , " MASTER <-> SLAVE sync: Finished with success " ) ;
2011-12-15 10:07:49 -05:00
/* Restart the AOF subsystem now that we finished the sync. This
* will trigger an AOF rewrite , and when done will start appending
* to the new file . */
2011-12-21 04:31:34 -05:00
if ( server . aof_state ! = REDIS_AOF_OFF ) {
2011-12-15 10:07:49 -05:00
int retry = 10 ;
stopAppendOnly ( ) ;
while ( retry - - & & startAppendOnly ( ) = = REDIS_ERR ) {
2013-01-16 12:00:20 -05:00
redisLog ( REDIS_WARNING , " Failed enabling the AOF after successful master synchronization! Trying it again in one second. " ) ;
2011-12-15 10:07:49 -05:00
sleep ( 1 ) ;
}
if ( ! retry ) {
redisLog ( REDIS_WARNING , " FATAL: this slave instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now. " ) ;
exit ( 1 ) ;
}
}
2010-11-04 12:29:53 -04:00
}
2011-05-22 06:41:24 -04:00
return ;
error :
replicationAbortSyncTransfer ( ) ;
return ;
2010-11-04 12:29:53 -04:00
}
2012-06-26 03:47:47 -04:00
/* Send a synchronous command to the master. Used to send AUTH and
2012-06-27 05:26:37 -04:00
* REPLCONF commands before starting the replication with SYNC .
2012-06-26 03:47:47 -04:00
*
2013-01-30 12:33:16 -05:00
* The command returns an sds string representing the result of the
* operation . On error the first byte is a " - " .
2012-06-26 03:47:47 -04:00
*/
char * sendSynchronousCommand ( int fd , . . . ) {
va_list ap ;
sds cmd = sdsempty ( ) ;
char * arg , buf [ 256 ] ;
/* Create the command to send to the master, we use simple inline
* protocol for simplicity as currently we only send simple strings . */
va_start ( ap , fd ) ;
while ( 1 ) {
arg = va_arg ( ap , char * ) ;
if ( arg = = NULL ) break ;
if ( sdslen ( cmd ) ! = 0 ) cmd = sdscatlen ( cmd , " " , 1 ) ;
cmd = sdscat ( cmd , arg ) ;
}
cmd = sdscatlen ( cmd , " \r \n " , 2 ) ;
/* Transfer command to the server. */
if ( syncWrite ( fd , cmd , sdslen ( cmd ) , server . repl_syncio_timeout * 1000 ) = = - 1 ) {
sdsfree ( cmd ) ;
2013-01-30 12:33:16 -05:00
return sdscatprintf ( sdsempty ( ) , " -Writing to master: %s " ,
2012-06-26 03:47:47 -04:00
strerror ( errno ) ) ;
}
sdsfree ( cmd ) ;
/* Read the reply from the server. */
if ( syncReadLine ( fd , buf , sizeof ( buf ) , server . repl_syncio_timeout * 1000 ) = = - 1 )
{
2013-01-30 12:33:16 -05:00
return sdscatprintf ( sdsempty ( ) , " -Reading from master: %s " ,
2012-06-26 03:47:47 -04:00
strerror ( errno ) ) ;
}
2013-01-30 12:33:16 -05:00
return sdsnew ( buf ) ;
}
/* Try a partial resynchronization with the master if we are about to reconnect.
 * Without a cached master structure we still send "PSYNC ? -1". This forces
 * a full resync through PSYNC, and in return we learn the master run id and
 * the master replication offset, so that the next reconnection can attempt a
 * partial resync.
 *
 * Assumptions (the function is meant to be called from syncWithMaster()):
 *
 * 1) "fd" is an already connected socket.
 * 2) "fd" is never closed here. On a successful partial resync it becomes
 *    the file descriptor of the server.master client.
 *
 * Return values:
 *
 * PSYNC_CONTINUE: the PSYNC command succeeded and replication continues.
 * PSYNC_FULLRESYNC: PSYNC is supported but a full resync is needed. The
 *                   master run_id and global replication offset are saved.
 * PSYNC_NOT_SUPPORTED: the master does not understand PSYNC at all and the
 *                      caller should fall back to SYNC.
 */

#define PSYNC_CONTINUE 0
#define PSYNC_FULLRESYNC 1
#define PSYNC_NOT_SUPPORTED 2
int slaveTryPartialResynchronization(int fd) {
    char *req_runid;
    char req_offset[32];
    sds reply;

    /* Invalidate any previously stored master run_id/offset. The offset is
     * set again below only if a PSYNC-based FULL resync tells us the right
     * value. That value is later propagated to server.master. */
    server.repl_master_initial_offset = -1;

    if (server.cached_master == NULL) {
        redisLog(REDIS_NOTICE,"Partial resynchronization not possible (no cached master)");
        req_runid = "?";
        memcpy(req_offset,"-1",3);
    } else {
        req_runid = server.cached_master->replrunid;
        snprintf(req_offset,sizeof(req_offset),"%lld",
            server.cached_master->reploff+1);
        redisLog(REDIS_NOTICE,"Trying a partial resynchronization (request %s:%s).", req_runid, req_offset);
    }

    reply = sendSynchronousCommand(fd,"PSYNC",req_runid,req_offset,NULL);

    if (strncmp(reply,"+FULLRESYNC",11) == 0) {
        /* Expected shape: "+FULLRESYNC <runid> <offset>". */
        char *runid = NULL, *offset = NULL;
        char *space = strchr(reply,' ');

        if (space != NULL) {
            runid = space + 1;
            space = strchr(runid,' ');
            if (space != NULL) offset = space + 1;
        }

        if (runid != NULL && offset != NULL &&
            (offset-runid-1) == REDIS_RUN_ID_SIZE)
        {
            memcpy(server.repl_master_runid, runid, offset-runid-1);
            server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0';
            server.repl_master_initial_offset = strtoll(offset,NULL,10);
            redisLog(REDIS_NOTICE,"Full resync from master: %s:%lld",
                server.repl_master_runid,
                server.repl_master_initial_offset);
        } else {
            redisLog(REDIS_WARNING,
                "Master replied with wrong +FULLRESYNC syntax.");
            /* The master speaks PSYNC but the reply is malformed: blank the
             * stored run id so that following PSYNC attempts fail. */
            memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1);
        }

        /* A full resync is coming: the cached master is useless now. */
        replicationDiscardCachedMaster();
        sdsfree(reply);
        return PSYNC_FULLRESYNC;
    }

    if (strncmp(reply,"+CONTINUE",9) == 0) {
        /* Partial resync accepted: reuse the cached master on this socket. */
        redisLog(REDIS_NOTICE,
            "Successful partial resynchronization with master.");
        sdsfree(reply);
        replicationResurrectCachedMaster(fd);
        return PSYNC_CONTINUE;
    }

    /* Anything else is either an error (the master does not understand
     * PSYNC) or an unexpected reply. Both are reported as
     * PSYNC_NOT_SUPPORTED. */
    if (strncmp(reply,"-ERR",4) == 0) {
        redisLog(REDIS_NOTICE,
            "Master does not support PSYNC or is in "
            "error state (reply: %s)", reply);
    } else {
        redisLog(REDIS_WARNING,
            "Unexpected reply to PSYNC from master: %s", reply);
    }
    sdsfree(reply);
    replicationDiscardCachedMaster();
    return PSYNC_NOT_SUPPORTED;
}
2011-05-19 12:53:06 -04:00
/* Event handler that drives the slave side of the replication handshake on
 * the non blocking socket created by connectWithMaster().
 *
 *  - REDIS_REPL_CONNECTING: the connect completed. Send a PING, switch to
 *    REDIS_REPL_RECEIVE_PONG and wait for the readable event.
 *  - REDIS_REPL_RECEIVE_PONG: read the PING reply, AUTH if masterauth is
 *    set, and announce our listening port with REPLCONF. Then try PSYNC,
 *    falling back to SYNC. Unless a partial resync was accepted, open a temp
 *    file and install readSyncBulkPayload() to receive the RDB payload.
 *
 * On any error, the handler unregisters the socket events and closes the
 * socket, together with the temp file if it was already created (the file is
 * also unlinked). repl_state then goes back to REDIS_REPL_CONNECT so that
 * replicationCron() retries. */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    char tmpfile[256], *err;
    int dfd = -1, maxtries = 5;
    int sockerr = 0, psync_result;
    socklen_t errlen = sizeof(sockerr);
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(privdata);
    REDIS_NOTUSED(mask);

    /* If this event fired after the user turned the instance into a master
     * with SLAVEOF NO ONE we must just return ASAP. The handler is still
     * registered (it just fired), so unregister it before closing the fd.
     * Otherwise the event loop keeps a stale registration for that fd
     * number. */
    if (server.repl_state == REDIS_REPL_NONE) {
        aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
        close(fd);
        return;
    }

    /* Check for errors in the socket. */
    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
        sockerr = errno;
    if (sockerr) {
        aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
        redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s",
            strerror(sockerr));
        goto error;
    }

    /* If we were connecting, it's time to send a non blocking PING. We want
     * to make sure the master is able to reply before going into the actual
     * replication process, where we have long timeouts in the order of
     * seconds (in the meantime the slave would block). */
    if (server.repl_state == REDIS_REPL_CONNECTING) {
        redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
        /* Delete the writable event so that the readable event remains
         * registered and we can wait for the PONG reply. */
        aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
        server.repl_state = REDIS_REPL_RECEIVE_PONG;
        /* Send the PING, don't check for errors at all, we have the timeout
         * that will take care about this. */
        syncWrite(fd,"PING\r\n",6,100);
        return;
    }

    /* Receive the PONG command. */
    if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
        char buf[1024];

        /* Delete the readable event, we no longer need it now that there is
         * the PING reply to read. */
        aeDeleteFileEvent(server.el,fd,AE_READABLE);

        /* Read the reply with explicit timeout. */
        buf[0] = '\0';
        if (syncReadLine(fd,buf,sizeof(buf),
            server.repl_syncio_timeout*1000) == -1)
        {
            redisLog(REDIS_WARNING,
                "I/O error reading PING reply from master: %s",
                strerror(errno));
            goto error;
        }

        /* We accept only two replies as valid: a positive +PONG reply
         * (we just check for "+") or an authentication error.
         * Older versions of Redis replied with "operation not permitted"
         * instead of a proper error code, so we test both. */
        if (buf[0] != '+' &&
            strncmp(buf,"-NOAUTH",7) != 0 &&
            strncmp(buf,"-ERR operation not permitted",28) != 0)
        {
            redisLog(REDIS_WARNING,"Error reply to PING from master: '%s'",buf);
            goto error;
        } else {
            redisLog(REDIS_NOTICE,
                "Master replied to PING, replication can continue...");
        }
    }

    /* AUTH with the master if required. */
    if (server.masterauth) {
        err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);
        if (err[0] == '-') {
            redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err);
            sdsfree(err);
            goto error;
        }
        sdsfree(err);
    }

    /* Set the slave port, so that Master's INFO command can list the
     * slave listening port correctly. */
    {
        sds port = sdsfromlonglong(server.port);
        err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port,
                                         NULL);
        sdsfree(port);
        /* Ignore the error if any, not all the Redis versions support
         * REPLCONF listening-port. */
        if (err[0] == '-') {
            redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF listening-port: %s", err);
        }
        sdsfree(err);
    }

    /* Try a partial resynchronization. If we don't have a cached master,
     * slaveTryPartialResynchronization() will at least try to use PSYNC
     * to start a full resynchronization. This gives us the master run id
     * and the global offset, so a partial resync can be attempted at the
     * next reconnection. */
    psync_result = slaveTryPartialResynchronization(fd);
    if (psync_result == PSYNC_CONTINUE) {
        redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
        return;
    }

    /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
     * and server.repl_master_runid and repl_master_initial_offset are
     * already populated. */
    if (psync_result == PSYNC_NOT_SUPPORTED) {
        redisLog(REDIS_NOTICE,"Retrying with SYNC...");
        if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
            redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
                strerror(errno));
            goto error;
        }
    }

    /* Prepare a suitable temp file for bulk transfer. The name embeds the
     * current time. Use time(NULL) instead of the cached server.unixtime,
     * which does not advance while we sleep() inside this handler: with the
     * cached value every retry would try exactly the same file name. */
    while(maxtries--) {
        snprintf(tmpfile,sizeof(tmpfile),
            "temp-%d.%ld.rdb",(int)time(NULL),(long int)getpid());
        dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
        if (dfd != -1) break;
        sleep(1);
    }
    if (dfd == -1) {
        redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
        goto error;
    }

    /* Setup the non blocking download of the bulk file. */
    if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
            == AE_ERR)
    {
        redisLog(REDIS_WARNING,
            "Can't create readable event for SYNC: %s (fd=%d)",
            strerror(errno),fd);
        goto error;
    }

    server.repl_state = REDIS_REPL_TRANSFER;
    server.repl_transfer_size = -1;
    server.repl_transfer_read = 0;
    server.repl_transfer_last_fsync_off = 0;
    server.repl_transfer_fd = dfd;
    server.repl_transfer_lastio = server.unixtime;
    server.repl_transfer_tmpfile = zstrdup(tmpfile);
    return;

error:
    /* Unregistering events that are not registered is harmless, so this is
     * safe from every error path above. */
    aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
    /* If the temp file was already created, don't leak its descriptor and
     * don't leave the empty file on disk. */
    if (dfd != -1) {
        close(dfd);
        unlink(tmpfile);
    }
    close(fd);
    server.repl_transfer_s = -1;
    server.repl_state = REDIS_REPL_CONNECT;
    return;
}
int connectWithMaster ( void ) {
int fd ;
fd = anetTcpNonBlockConnect ( NULL , server . masterhost , server . masterport ) ;
if ( fd = = - 1 ) {
redisLog ( REDIS_WARNING , " Unable to connect to MASTER: %s " ,
strerror ( errno ) ) ;
return REDIS_ERR ;
}
2011-06-09 09:35:07 -04:00
if ( aeCreateFileEvent ( server . el , fd , AE_READABLE | AE_WRITABLE , syncWithMaster , NULL ) = =
2011-05-22 06:41:24 -04:00
AE_ERR )
2011-05-19 12:53:06 -04:00
{
close ( fd ) ;
redisLog ( REDIS_WARNING , " Can't create readable event for SYNC " ) ;
return REDIS_ERR ;
}
2012-03-27 11:39:58 -04:00
server . repl_transfer_lastio = server . unixtime ;
2011-05-19 12:53:06 -04:00
server . repl_transfer_s = fd ;
2011-12-21 06:23:18 -05:00
server . repl_state = REDIS_REPL_CONNECTING ;
2010-06-21 18:07:48 -04:00
return REDIS_OK ;
}
2011-11-30 09:35:16 -05:00
/* This function can be called when a non blocking connection is currently
* in progress to undo it . */
void undoConnectWithMaster ( void ) {
int fd = server . repl_transfer_s ;
2012-08-31 09:32:57 -04:00
redisAssert ( server . repl_state = = REDIS_REPL_CONNECTING | |
server . repl_state = = REDIS_REPL_RECEIVE_PONG ) ;
2011-11-30 09:35:16 -05:00
aeDeleteFileEvent ( server . el , fd , AE_READABLE | AE_WRITABLE ) ;
close ( fd ) ;
server . repl_transfer_s = - 1 ;
2011-12-21 06:23:18 -05:00
server . repl_state = REDIS_REPL_CONNECT ;
2011-11-30 09:35:16 -05:00
}
2013-01-14 05:39:54 -05:00
/* This function aborts a non blocking replication attempt if there is one
* in progress , by canceling the non - blocking connect attempt or
* the initial bulk transfer .
*
* If there was a replication handshake in progress 1 is returned and
* the replication state ( server . repl_state ) set to REDIS_REPL_CONNECT .
*
* Otherwise zero is returned and no operation is perforemd at all . */
int cancelReplicationHandshake ( void ) {
if ( server . repl_state = = REDIS_REPL_TRANSFER ) {
replicationAbortSyncTransfer ( ) ;
} else if ( server . repl_state = = REDIS_REPL_CONNECTING | |
server . repl_state = = REDIS_REPL_RECEIVE_PONG )
{
undoConnectWithMaster ( ) ;
} else {
return 0 ;
}
return 1 ;
}
2013-03-04 07:22:21 -05:00
/* Set replication to the specified master address and port. */
void replicationSetMaster ( char * ip , int port ) {
sdsfree ( server . masterhost ) ;
2013-03-04 09:39:43 -05:00
server . masterhost = sdsnew ( ip ) ;
2013-03-04 07:22:21 -05:00
server . masterport = port ;
if ( server . master ) freeClient ( server . master ) ;
disconnectSlaves ( ) ; /* Force our slaves to resync with us as well. */
replicationDiscardCachedMaster ( ) ; /* Don't try a PSYNC. */
freeReplicationBacklog ( ) ; /* Don't allow our chained slaves to PSYNC. */
cancelReplicationHandshake ( ) ;
server . repl_state = REDIS_REPL_CONNECT ;
}
/* Cancel replication, setting the instance as a master itself.
 *
 * No-op when no master is configured. Otherwise the master address is
 * forgotten, and the master client, the cached master and any handshake in
 * progress are released. repl_state ends at REDIS_REPL_NONE, so
 * replicationCron() will not try to reconnect. */
void replicationUnsetMaster(void) {
    if (!server.masterhost) return; /* Nothing to do. */

    sdsfree(server.masterhost);
    server.masterhost = NULL;

    if (server.master != NULL) freeClient(server.master);
    replicationDiscardCachedMaster();
    cancelReplicationHandshake();

    server.repl_state = REDIS_REPL_NONE;
}
2010-06-21 18:07:48 -04:00
/* SLAVEOF <host> <port> | SLAVEOF NO ONE
 *
 * "NO ONE" turns the instance into a master. Any other host/port pair makes
 * it replicate from that master, unless it is already replicating from that
 * exact host and port. The command is refused in cluster mode. */
void slaveofCommand(redisClient *c) {
    long port;

    /* SLAVEOF is not allowed in cluster mode as replication is automatically
     * configured using the current address of the master node. */
    if (server.cluster_enabled) {
        addReplyError(c,"SLAVEOF not allowed in cluster mode.");
        return;
    }

    /* SLAVEOF NO ONE: become a master (nothing to do if we already are). */
    if (!strcasecmp(c->argv[1]->ptr,"no") &&
        !strcasecmp(c->argv[2]->ptr,"one"))
    {
        if (server.masterhost != NULL) {
            replicationUnsetMaster();
            redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
        }
        addReply(c,shared.ok);
        return;
    }

    /* SLAVEOF <host> <port>: the port must parse as an integer. On failure
     * we just return; the helper is expected to have replied already. */
    if (getLongFromObjectOrReply(c,c->argv[2],&port,NULL) != REDIS_OK)
        return;

    /* Already attached to the specified master: nothing to do. */
    if (server.masterhost != NULL &&
        !strcasecmp(server.masterhost,c->argv[1]->ptr) &&
        server.masterport == port)
    {
        redisLog(REDIS_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.");
        addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
        return;
    }

    /* No previous master, or a different one: switch to the new master. */
    replicationSetMaster(c->argv[1]->ptr, port);
    redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
        server.masterhost, server.masterport);
    addReply(c,shared.ok);
}
2010-11-04 12:29:53 -04:00
2013-05-27 04:41:53 -04:00
/* Send a REPLCONF ACK command to the master to inform it about the current
* processed offset . If we are not connected with a master , the command has
* no effects . */
void replicationSendAck ( void ) {
redisClient * c = server . master ;
if ( c ! = NULL ) {
c - > flags | = REDIS_MASTER_FORCE_REPLY ;
addReplyMultiBulkLen ( c , 3 ) ;
addReplyBulkCString ( c , " REPLCONF " ) ;
addReplyBulkCString ( c , " ACK " ) ;
addReplyBulkLongLong ( c , c - > reploff ) ;
c - > flags & = ~ REDIS_MASTER_FORCE_REPLY ;
}
}
2013-01-30 12:33:16 -05:00
/* ---------------------- MASTER CACHING FOR PSYNC -------------------------- */

/* In order to implement partial synchronization we need to be able to cache
 * our master's client structure after a transient disconnection.
 * It is cached into server.cached_master and flushed away using the following
 * functions. */

/* Called by freeClient() in order to cache the master client structure
 * instead of destroying it. freeClient() returns ASAP after this function
 * returns, so everything needed to keep a truly "suspended" client safe has
 * to be done here.
 *
 * Companion functions dealing with the cached master:
 *
 * replicationDiscardCachedMaster() kills the cached client when, for some
 * reason, we no longer want to use it.
 *
 * replicationResurrectCachedMaster() reactivates the cached master after a
 * successful PSYNC handshake.
 */
void replicationCacheMaster(redisClient *c) {
    listNode *node;

    redisAssert(server.master != NULL && server.cached_master == NULL);
    redisLog(REDIS_NOTICE,"Caching the disconnected master state.");

    /* Take the client out of server.clients, so that CLIENT LIST and batch
     * operations over the clients never see it. */
    node = listSearchKey(server.clients,c);
    redisAssert(node != NULL);
    listDelNode(server.clients,node);

    /* Keep the master around. server.master itself is cleared later by
     * replicationHandleMasterDisconnection(). */
    server.cached_master = server.master;

    /* Drop the event handlers and close the socket; the socket of the new
     * connection will be reused during PSYNC. fd is set to -1 so that a
     * later freeClient(c) is safe. */
    aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
    aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    close(c->fd);
    c->fd = -1;

    /* This runs instead of the real freeClient() work, so the replication
     * state is adjusted here. The call also sets server.master to NULL. */
    replicationHandleMasterDisconnection();
}
/* Free a cached master, called when there are no longer the conditions for
* a partial resync on reconnection . */
void replicationDiscardCachedMaster ( void ) {
if ( server . cached_master = = NULL ) return ;
redisLog ( REDIS_NOTICE , " Discarding previously cached master state. " ) ;
server . cached_master - > flags & = ~ REDIS_MASTER ;
freeClient ( server . cached_master ) ;
server . cached_master = NULL ;
}
/* Turn the cached master into the current master, using the file descriptor
* passed as argument as the socket for the new master .
*
* This funciton is called when successfully setup a partial resynchronization
* so the stream of data that we ' ll receive will start from were this
* master left . */
void replicationResurrectCachedMaster ( int newfd ) {
server . master = server . cached_master ;
server . cached_master = NULL ;
server . master - > fd = newfd ;
server . master - > flags & = ~ ( REDIS_CLOSE_AFTER_REPLY | REDIS_CLOSE_ASAP ) ;
server . master - > authenticated = 1 ;
server . master - > lastinteraction = server . unixtime ;
server . repl_state = REDIS_REPL_CONNECTED ;
/* Re-add to the list of clients. */
listAddNodeTail ( server . clients , server . master ) ;
if ( aeCreateFileEvent ( server . el , newfd , AE_READABLE ,
readQueryFromClient , server . master ) ) {
redisLog ( REDIS_WARNING , " Error resurrecting the cached master, impossible to add the readable handler: %s " , strerror ( errno ) ) ;
freeClientAsync ( server . master ) ; /* Close ASAP. */
}
}
2010-11-04 12:29:53 -04:00
/* --------------------------- REPLICATION CRON ---------------------------- */
2013-05-27 04:41:53 -04:00
/* Replication cron funciton, called 1 time per second. */
2010-11-04 12:29:53 -04:00
void replicationCron ( void ) {
2011-11-30 09:35:16 -05:00
/* Non blocking connection timeout? */
2012-08-31 09:32:57 -04:00
if ( server . masterhost & &
( server . repl_state = = REDIS_REPL_CONNECTING | |
server . repl_state = = REDIS_REPL_RECEIVE_PONG ) & &
2011-11-30 09:35:16 -05:00
( time ( NULL ) - server . repl_transfer_lastio ) > server . repl_timeout )
{
redisLog ( REDIS_WARNING , " Timeout connecting to the MASTER... " ) ;
undoConnectWithMaster ( ) ;
}
2010-11-04 12:29:53 -04:00
/* Bulk transfer I/O timeout? */
2011-12-21 06:23:18 -05:00
if ( server . masterhost & & server . repl_state = = REDIS_REPL_TRANSFER & &
2011-10-31 06:13:28 -04:00
( time ( NULL ) - server . repl_transfer_lastio ) > server . repl_timeout )
2010-11-04 12:29:53 -04:00
{
2012-10-04 05:49:17 -04:00
redisLog ( REDIS_WARNING , " Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value. " ) ;
2010-11-04 12:29:53 -04:00
replicationAbortSyncTransfer ( ) ;
}
2011-01-20 07:18:23 -05:00
/* Timed out master when we are an already connected slave? */
2011-12-21 06:23:18 -05:00
if ( server . masterhost & & server . repl_state = = REDIS_REPL_CONNECTED & &
2011-10-31 06:13:28 -04:00
( time ( NULL ) - server . master - > lastinteraction ) > server . repl_timeout )
2011-01-20 07:18:23 -05:00
{
redisLog ( REDIS_WARNING , " MASTER time out: no data nor PING received... " ) ;
freeClient ( server . master ) ;
}
2010-11-04 12:29:53 -04:00
/* Check if we should connect to a MASTER */
2011-12-21 06:23:18 -05:00
if ( server . repl_state = = REDIS_REPL_CONNECT ) {
2010-11-04 12:29:53 -04:00
redisLog ( REDIS_NOTICE , " Connecting to MASTER... " ) ;
2011-05-19 12:53:06 -04:00
if ( connectWithMaster ( ) = = REDIS_OK ) {
redisLog ( REDIS_NOTICE , " MASTER <-> SLAVE sync started " ) ;
2010-11-04 12:29:53 -04:00
}
}
2013-05-27 04:45:37 -04:00
/* Send ACK to master from time to time. */
if ( server . masterhost & & server . master )
replicationSendAck ( ) ;
2011-01-20 07:18:23 -05:00
/* If we have attached slaves, PING them from time to time.
* So slaves can implement an explicit timeout to masters , and will
* be able to detect a link disconnection even if the TCP connection
* will not actually go down . */
2012-12-14 11:10:40 -05:00
if ( ! ( server . cronloops % ( server . repl_ping_slave_period * server . hz ) ) ) {
2011-01-20 07:18:23 -05:00
listIter li ;
listNode * ln ;
2012-11-03 06:56:28 -04:00
robj * ping_argv [ 1 ] ;
2011-01-20 07:18:23 -05:00
2012-11-03 06:56:28 -04:00
/* First, send PING */
ping_argv [ 0 ] = createStringObject ( " PING " , 4 ) ;
replicationFeedSlaves ( server . slaves , server . slaveseldb , ping_argv , 1 ) ;
decrRefCount ( ping_argv [ 0 ] ) ;
2013-01-30 12:33:16 -05:00
/* Second, send a newline to all the slaves in pre-synchronization
* stage , that is , slaves waiting for the master to create the RDB file .
2012-11-03 06:56:28 -04:00
* The newline will be ignored by the slave but will refresh the
* last - io timer preventing a timeout . */
2011-01-20 07:18:23 -05:00
listRewind ( server . slaves , & li ) ;
while ( ( ln = listNext ( & li ) ) ) {
redisClient * slave = ln - > value ;
2012-11-03 06:56:28 -04:00
if ( slave - > replstate = = REDIS_REPL_WAIT_BGSAVE_START | |
slave - > replstate = = REDIS_REPL_WAIT_BGSAVE_END ) {
2011-02-21 11:50:54 -05:00
if ( write ( slave - > fd , " \n " , 1 ) = = - 1 ) {
/* Don't worry, it's just a ping. */
}
2011-01-20 07:18:23 -05:00
}
}
}
2013-01-30 12:33:16 -05:00
2013-05-27 05:17:17 -04:00
/* Disconnect timedout slaves. */
if ( listLength ( server . slaves ) ) {
listIter li ;
listNode * ln ;
listRewind ( server . slaves , & li ) ;
while ( ( ln = listNext ( & li ) ) ) {
redisClient * slave = ln - > value ;
if ( slave - > replstate ! = REDIS_REPL_ONLINE ) continue ;
if ( ( server . unixtime - slave - > repl_ack_time ) > server . repl_timeout )
{
char ip [ 32 ] ;
int port ;
if ( anetPeerToString ( slave - > fd , ip , & port ) ! = - 1 ) {
redisLog ( REDIS_WARNING ,
" Disconnecting timedout slave: %s:%d " ,
ip , slave - > slave_listening_port ) ;
}
freeClient ( slave ) ;
}
}
}
2013-01-30 12:33:16 -05:00
/* If we have no attached slaves and there is a replication backlog
* using memory , free it after some ( configured ) time . */
if ( listLength ( server . slaves ) = = 0 & & server . repl_backlog_time_limit & &
server . repl_backlog )
{
time_t idle = server . unixtime - server . repl_no_slaves_since ;
if ( idle > server . repl_backlog_time_limit ) {
freeReplicationBacklog ( ) ;
redisLog ( REDIS_NOTICE ,
" Replication backlog freed after %d seconds "
2013-02-27 06:27:15 -05:00
" without connected slaves. " ,
( int ) server . repl_backlog_time_limit ) ;
2013-01-30 12:33:16 -05:00
}
}
2010-11-04 12:29:53 -04:00
}