2012-11-08 12:25:23 -05:00
/* Asynchronous replication implementation.
*
* Copyright ( c ) 2009 - 2012 , Salvatore Sanfilippo < antirez at gmail dot com >
* All rights reserved .
*
* Redistribution and use in source and binary forms , with or without
* modification , are permitted provided that the following conditions are met :
*
* * Redistributions of source code must retain the above copyright notice ,
* this list of conditions and the following disclaimer .
* * Redistributions in binary form must reproduce the above copyright
* notice , this list of conditions and the following disclaimer in the
* documentation and / or other materials provided with the distribution .
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission .
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS " AS IS "
* AND ANY EXPRESS OR IMPLIED WARRANTIES , INCLUDING , BUT NOT LIMITED TO , THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED . IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT , INDIRECT , INCIDENTAL , SPECIAL , EXEMPLARY , OR
* CONSEQUENTIAL DAMAGES ( INCLUDING , BUT NOT LIMITED TO , PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES ; LOSS OF USE , DATA , OR PROFITS ; OR BUSINESS
* INTERRUPTION ) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY , WHETHER IN
* CONTRACT , STRICT LIABILITY , OR TORT ( INCLUDING NEGLIGENCE OR OTHERWISE )
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE , EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE .
*/
2010-06-21 18:07:48 -04:00
# include "redis.h"
# include <sys/time.h>
# include <unistd.h>
# include <fcntl.h>
2012-09-17 06:45:57 -04:00
# include <sys/socket.h>
2010-06-21 18:07:48 -04:00
# include <sys/stat.h>
2013-01-30 12:33:16 -05:00
void replicationDiscardCachedMaster ( void ) ;
void replicationResurrectCachedMaster ( int newfd ) ;
2010-11-04 12:29:53 -04:00
/* ---------------------------------- MASTER -------------------------------- */
2013-01-30 12:33:16 -05:00
/* Allocate the replication backlog buffer and reset its bookkeeping.
 *
 * Must only be called when no backlog exists (asserted). The buffer is
 * sized from server.repl_backlog_size. */
void createReplicationBacklog(void) {
    redisAssert(server.repl_backlog == NULL);

    /* The replication offset is not advanced while there is neither a
     * backlog nor an attached slave, so bump it by one here: this makes
     * sure no previous slave can PSYNC against the fresh backlog. */
    server.master_repl_offset += 1;

    /* The buffer holds no data yet: virtually, the first byte we have is
     * the next byte the replication stream is going to produce. */
    server.repl_backlog_off = server.master_repl_offset + 1;
    server.repl_backlog_idx = 0;
    server.repl_backlog_histlen = 0;

    server.repl_backlog = zmalloc(server.repl_backlog_size);
}
/* This function is called when the user modifies the replication backlog
* size at runtime . It is up to the function to both update the
* server . repl_backlog_size and to resize the buffer and setup it so that
* it contains the same data as the previous one ( possibly less data , but
* the most recent bytes , or the same data and more free space in case the
* buffer is enlarged ) . */
void resizeReplicationBacklog ( long long newsize ) {
if ( newsize < REDIS_REPL_BACKLOG_MIN_SIZE )
newsize = REDIS_REPL_BACKLOG_MIN_SIZE ;
if ( server . repl_backlog_size = = newsize ) return ;
server . repl_backlog_size = newsize ;
if ( server . repl_backlog ! = NULL ) {
/* What we actually do is to flush the old buffer and realloc a new
* empty one . It will refill with new data incrementally .
* The reason is that copying a few gigabytes adds latency and even
* worse often we need to alloc additional space before freeing the
* old buffer . */
zfree ( server . repl_backlog ) ;
server . repl_backlog = zmalloc ( server . repl_backlog_size ) ;
server . repl_backlog_histlen = 0 ;
server . repl_backlog_idx = 0 ;
/* Next byte we have is... the next since the buffer is emtpy. */
server . repl_backlog_off = server . master_repl_offset + 1 ;
}
}
/* Release the replication backlog buffer and mark it as absent.
 *
 * It is an error (asserted) to call this while slaves are attached. */
void freeReplicationBacklog(void) {
    redisAssert(listLength(server.slaves) == 0);

    zfree(server.repl_backlog);
    server.repl_backlog = NULL;
}
/* Append 'len' bytes starting at 'ptr' to the replication backlog.
 *
 * The global replication offset (server.master_repl_offset) is advanced
 * by 'len' as well, since the backlog is never fed without also moving
 * the offset forward. */
void feedReplicationBacklog(void *ptr, size_t len) {
    unsigned char *src = ptr;
    size_t remaining = len;

    server.master_repl_offset += len;

    /* The backlog is a circular buffer: on every pass copy as much as fits
     * before the end of the buffer, then wrap the write index to zero. */
    while (remaining > 0) {
        size_t chunk = server.repl_backlog_size - server.repl_backlog_idx;

        if (chunk > remaining) chunk = remaining;
        memcpy(server.repl_backlog + server.repl_backlog_idx, src, chunk);

        server.repl_backlog_idx += chunk;
        if (server.repl_backlog_idx == server.repl_backlog_size)
            server.repl_backlog_idx = 0;

        remaining -= chunk;
        src += chunk;
        server.repl_backlog_histlen += chunk;
    }

    /* The history can never be longer than the buffer itself. */
    if (server.repl_backlog_histlen > server.repl_backlog_size)
        server.repl_backlog_histlen = server.repl_backlog_size;

    /* Recompute the replication offset of the oldest byte still stored. */
    server.repl_backlog_off =
        server.master_repl_offset - server.repl_backlog_histlen + 1;
}
/* Feed the replication backlog with the content of the string object 'o'.
 *
 * Objects with REDIS_ENCODING_INT are first rendered as a decimal string;
 * any other encoding is fed using o->ptr as an sds string. */
void feedReplicationBacklogWithObject(robj *o) {
    char digits[REDIS_LONGSTR_SIZE];
    size_t nbytes;

    if (o->encoding != REDIS_ENCODING_INT) {
        feedReplicationBacklog(o->ptr, sdslen(o->ptr));
        return;
    }

    /* Integer encoded: the value is stored in the pointer field itself. */
    nbytes = ll2string(digits, sizeof(digits), (long)o->ptr);
    feedReplicationBacklog(digits, nbytes);
}
2010-06-21 18:07:48 -04:00
/* Propagate a command to the replication backlog and to the slaves.
 *
 * 'slaves' is the list of slave clients, 'dictid' the database the command
 * targets, 'argv'/'argc' the command arguments. When 'dictid' differs from
 * server.slaveseldb a SELECT command is emitted first. Slaves still in
 * REDIS_REPL_WAIT_BGSAVE_START state do not receive the command itself. */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
    listNode *node;
    listIter iter;
    int i, n;
    char numbuf[REDIS_LONGSTR_SIZE];

    /* No slaves and no backlog to populate: nothing to do. */
    if (server.repl_backlog == NULL && listLength(slaves) == 0) return;

    /* Attached slaves imply that a backlog exists. */
    redisAssert(listLength(slaves) == 0 || server.repl_backlog != NULL);

    /* Emit SELECT when the target DB is not the one last selected. */
    if (server.slaveseldb != dictid) {
        /* For the first few DBs a pre-built SELECT object is available. */
        int is_shared = (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS);
        robj *selectcmd;

        if (is_shared) {
            selectcmd = shared.select[dictid];
        } else {
            int dictid_len = ll2string(numbuf, sizeof(numbuf), dictid);

            selectcmd = createObject(REDIS_STRING,
                sdscatprintf(sdsempty(),
                " *2 \r \n $6 \r \n SELECT \r \n $%d \r \n %s \r \n ",
                dictid_len, numbuf));
        }

        /* SELECT goes into the backlog first... */
        if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);

        /* ...and then to every slave in the list. */
        listRewind(slaves, &iter);
        while ((node = listNext(&iter))) {
            redisClient *slave = node->value;

            addReply(slave, selectcmd);
        }

        /* Only the object created above is ours to release. */
        if (!is_shared) decrRefCount(selectcmd);
    }
    server.slaveseldb = dictid;

    /* Serialize the command into the backlog, if there is one. */
    if (server.repl_backlog) {
        char aux[REDIS_LONGSTR_SIZE+3];
        char *crlf;

        /* Multi bulk length header. */
        aux[0] = ' * ';
        n = ll2string(aux+1, sizeof(aux)-1, argc);
        aux[n+1] = ' \r ';
        aux[n+2] = ' \n ';
        feedReplicationBacklog(aux, n+3);

        for (i = 0; i < argc; i++) {
            long objlen = stringObjectLen(argv[i]);

            /* Each argument is stored as a bulk: a length header, the
             * payload, and the two terminator bytes, which are reused
             * from the tail of the header just built. */
            aux[0] = ' $ ';
            n = ll2string(aux+1, sizeof(aux)-1, objlen);
            crlf = aux + n + 1;
            crlf[0] = ' \r ';
            crlf[1] = ' \n ';
            feedReplicationBacklog(aux, n+3);
            feedReplicationBacklogWithObject(argv[i]);
            feedReplicationBacklog(crlf, 2);
        }
    }

    /* Finally queue the command on every eligible slave. Slaves waiting
     * for the initial SYNC keep it in their output buffer until the SYNC
     * completes; slaves still waiting for a BGSAVE to start are skipped. */
    listRewind(slaves, &iter);
    while ((node = listNext(&iter))) {
        redisClient *slave = node->value;

        if (slave->replstate != REDIS_REPL_WAIT_BGSAVE_START) {
            addReplyMultiBulkLen(slave, argc);
            for (i = 0; i < argc; i++)
                addReplyBulk(slave, argv[i]);
        }
    }
}
2012-03-07 06:12:15 -05:00
/* Send a textual representation of a command to every client in the
 * 'monitors' list.
 *
 * The line is made of the current time, a bracketed section with the DB
 * id and the origin of client 'c' (lua, unix socket path, or peer id),
 * and the command arguments. */
void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc) {
    listNode *node;
    listIter iter;
    int i;
    sds line = sdsnew(" + ");
    robj *lineobj;
    char peerid[REDIS_PEER_ID_LEN];
    struct timeval now;

    /* Timestamp: seconds and microseconds. */
    gettimeofday(&now, NULL);
    line = sdscatprintf(line, " %ld.%06ld ", (long)now.tv_sec, (long)now.tv_usec);

    /* Origin of the command. */
    if (c->flags & REDIS_LUA_CLIENT) {
        line = sdscatprintf(line, " [%d lua] ", dictid);
    } else if (c->flags & REDIS_UNIX_SOCKET) {
        line = sdscatprintf(line, " [%d unix:%s] ", dictid, server.unixsocket);
    } else {
        getClientPeerId(c, peerid, sizeof(peerid));
        line = sdscatprintf(line, " [%d %s] ", dictid, peerid);
    }

    /* Arguments, with a one byte separator between consecutive ones. */
    for (i = 0; i < argc; i++) {
        robj *arg = argv[i];

        if (i > 0)
            line = sdscatlen(line, " ", 1);
        if (arg->encoding != REDIS_ENCODING_INT) {
            line = sdscatrepr(line, (char*)arg->ptr, sdslen(arg->ptr));
        } else {
            /* Integer encoded: the value lives in the pointer field. */
            line = sdscatprintf(line, " \" %ld \" ", (long)arg->ptr);
        }
    }
    line = sdscatlen(line, " \r \n ", 2);

    /* Wrap the line in an object so one instance serves all monitors. */
    lineobj = createObject(REDIS_STRING, line);
    listRewind(monitors, &iter);
    while ((node = listNext(&iter))) {
        redisClient *monitor = node->value;

        addReply(monitor, lineobj);
    }
    decrRefCount(lineobj);
}
2013-01-30 12:33:16 -05:00
/* Queue on client 'c' the backlog content from replication offset
 * 'offset' up to the end of the backlog.
 *
 * Returns the number of bytes queued, or 0 when the backlog holds no
 * history at all. */
long long addReplyReplicationBacklog(redisClient *c, long long offset) {
    long long pos, skip, remaining;

    redisLog(REDIS_DEBUG, " [PSYNC] Slave request offset: %lld ", offset);

    if (server.repl_backlog_histlen == 0) {
        redisLog(REDIS_DEBUG, " [PSYNC] Backlog history len is zero ");
        return 0;
    }

    redisLog(REDIS_DEBUG, " [PSYNC] Backlog size: %lld ",
             server.repl_backlog_size);
    redisLog(REDIS_DEBUG, " [PSYNC] First byte: %lld ",
             server.repl_backlog_off);
    redisLog(REDIS_DEBUG, " [PSYNC] History len: %lld ",
             server.repl_backlog_histlen);
    redisLog(REDIS_DEBUG, " [PSYNC] Current index: %lld ",
             server.repl_backlog_idx);

    /* Bytes between the oldest stored byte and the requested offset. */
    skip = offset - server.repl_backlog_off;
    redisLog(REDIS_DEBUG, " [PSYNC] Skipping: %lld ", skip);

    /* Index inside the circular buffer of the oldest byte, i.e. the byte
     * whose replication offset is server.repl_backlog_off. */
    pos = (server.repl_backlog_idx +
           (server.repl_backlog_size - server.repl_backlog_histlen)) %
          server.repl_backlog_size;
    redisLog(REDIS_DEBUG, " [PSYNC] Index of first byte: %lld ", pos);

    /* Seek forward to the requested offset. */
    pos = (pos + skip) % server.repl_backlog_size;

    /* Emit the data. The range may cross the end of the circular buffer,
     * in which case it is sent as two pieces. */
    remaining = server.repl_backlog_histlen - skip;
    redisLog(REDIS_DEBUG, " [PSYNC] Reply total length: %lld ", remaining);
    while (remaining) {
        long long chunk = server.repl_backlog_size - pos;

        if (chunk >= remaining) chunk = remaining;
        redisLog(REDIS_DEBUG, " [PSYNC] addReply() length: %lld ", chunk);
        addReplySds(c, sdsnewlen(server.repl_backlog + pos, chunk));
        remaining -= chunk;
        pos = 0; /* Whatever is left starts at the beginning. */
    }
    return server.repl_backlog_histlen - skip;
}
/* Master side handling of PSYNC: try to continue replication with the
 * requesting client 'c' from the offset it asks for.
 *
 * c->argv[1] is the run id the client expects, c->argv[2] the offset.
 *
 * Returns REDIS_OK when the caller has nothing else to do: either the
 * partial resync was accepted, or writing the reply line to the socket
 * failed and the client was scheduled to be freed. Returns REDIS_ERR when
 * a full resync is needed; in that case the +FULLRESYNC line was already
 * written to the client. */
int masterTryPartialResynchronization(redisClient *c) {
    long long psync_offset, psync_len;
    char *wanted_runid = c->argv[1]->ptr;
    char line[128];
    int linelen;
    int servable;

    /* A different run id means this master is a different instance than
     * the one the slave was replicating from: there is no way to continue. */
    if (strcasecmp(wanted_runid, server.runid) != 0) {
        if (wanted_runid[0] == ' ? ') {
            /* This run id is what slaves send to force a full resync. */
            redisLog(REDIS_NOTICE, " Full resync requested by slave. ");
        } else {
            redisLog(REDIS_NOTICE, " Partial resynchronization not accepted: "
                " Runid mismatch (Client asked for runid '%s', my runid is '%s') ",
                wanted_runid, server.runid);
        }
        goto need_full_resync;
    }

    /* Is the requested offset still covered by our backlog? */
    if (getLongLongFromObjectOrReply(c, c->argv[2], &psync_offset, NULL) !=
        REDIS_OK) goto need_full_resync;

    servable = server.repl_backlog &&
               psync_offset >= server.repl_backlog_off &&
               psync_offset <= (server.repl_backlog_off +
                                server.repl_backlog_histlen);
    if (!servable) {
        redisLog(REDIS_NOTICE,
            " Unable to partial resync with the slave for lack of backlog (Slave request was: %lld). ", psync_offset);
        if (psync_offset > server.master_repl_offset) {
            redisLog(REDIS_WARNING,
                " Warning: slave tried to PSYNC with an offset that is greater than the master replication offset. ");
        }
        goto need_full_resync;
    }

    /* Partial resync is possible:
     * 1) Turn the client into an online slave.
     * 2) Tell it replication continues.
     * 3) Queue the backlog from the requested offset to the end. */
    c->flags |= REDIS_SLAVE;
    c->replstate = REDIS_REPL_ONLINE;
    c->repl_ack_time = server.unixtime;
    listAddNodeTail(server.slaves, c);

    /* The reply line is written straight to the socket rather than going
     * through the connection buffers, which at this stage are used to
     * accumulate new commands. */
    linelen = snprintf(line, sizeof(line), " +CONTINUE \r \n ");
    if (write(c->fd, line, linelen) != linelen) {
        freeClientAsync(c);
        return REDIS_OK;
    }
    psync_len = addReplyReplicationBacklog(c, psync_offset);
    redisLog(REDIS_NOTICE,
        " Partial resynchronization request accepted. Sending %lld bytes of backlog starting from offset %lld. ", psync_len, psync_offset);
    /* server.slaveseldb is left untouched: there is no need to force a
     * new SELECT since the slave still has that state from its previous
     * connection with the master. */
    refreshGoodSlavesCount();
    return REDIS_OK; /* The caller can return, no full resync needed. */

need_full_resync:
    /* Tell the client that a full resync is going to happen. */
    psync_offset = server.master_repl_offset;
    /* Without a backlog, account in advance for the +1 that is applied to
     * the offset when the backlog gets created later. */
    if (server.repl_backlog == NULL) psync_offset++;
    /* Same as above: bypass the connection buffers. */
    linelen = snprintf(line, sizeof(line), " +FULLRESYNC %s %lld \r \n ",
                       server.runid, psync_offset);
    if (write(c->fd, line, linelen) != linelen) {
        freeClientAsync(c);
        return REDIS_OK;
    }
    return REDIS_ERR;
}
/* SYNC ad PSYNC command implemenation. */
2010-06-21 18:07:48 -04:00
void syncCommand ( redisClient * c ) {
2013-01-16 12:00:20 -05:00
/* ignore SYNC if already slave or in monitor mode */
2010-06-21 18:07:48 -04:00
if ( c - > flags & REDIS_SLAVE ) return ;
2010-08-24 10:04:13 -04:00
/* Refuse SYNC requests if we are a slave but the link with our master
* is not ok . . . */
2011-12-21 06:23:18 -05:00
if ( server . masterhost & & server . repl_state ! = REDIS_REPL_CONNECTED ) {
2010-09-02 13:52:24 -04:00
addReplyError ( c , " Can't SYNC while not connected with my master " ) ;
2010-08-24 10:04:13 -04:00
return ;
}
2010-06-21 18:07:48 -04:00
/* SYNC can't be issued when the server has pending data to send to
* the client about already issued commands . We need a fresh reply
* buffer registering the differences between the BGSAVE and the current
* dataset , so that we can copy to other slaves if needed . */
2013-02-01 09:05:43 -05:00
if ( listLength ( c - > reply ) ! = 0 | | c - > bufpos ! = 0 ) {
addReplyError ( c , " SYNC and PSYNC are invalid with pending output " ) ;
2010-06-21 18:07:48 -04:00
return ;
}
2013-01-30 12:33:16 -05:00
redisLog ( REDIS_NOTICE , " Slave asks for synchronization " ) ;
/* Try a partial resynchronization if this is a PSYNC command.
* If it fails , we continue with usual full resynchronization , however
* when this happens masterTryPartialResynchronization ( ) already
* replied with :
*
* + FULLRESYNC < runid > < offset >
*
* So the slave knows the new runid and offset to try a PSYNC later
* if the connection with the master is lost . */
2013-02-12 09:24:25 -05:00
if ( ! strcasecmp ( c - > argv [ 0 ] - > ptr , " psync " ) ) {
if ( masterTryPartialResynchronization ( c ) = = REDIS_OK ) {
server . stat_sync_partial_ok + + ;
return ; /* No full resync needed, return. */
} else {
char * master_runid = c - > argv [ 1 ] - > ptr ;
/* Increment stats for failed PSYNCs, but only if the
* runid is not " ? " , as this is used by slaves to force a full
* resync on purpose when they are not albe to partially
* resync . */
if ( master_runid [ 0 ] ! = ' ? ' ) server . stat_sync_partial_err + + ;
}
2013-06-26 04:11:20 -04:00
} else {
/* If a slave uses SYNC, we are dealing with an old implementation
* of the replication protocol ( like redis - cli - - slave ) . Flag the client
* so that we don ' t expect to receive REPLCONF ACK feedbacks . */
c - > flags | = REDIS_PRE_PSYNC_SLAVE ;
2013-02-12 09:24:25 -05:00
}
/* Full resynchronization. */
server . stat_sync_full + + ;
2013-01-30 12:33:16 -05:00
2010-06-21 18:07:48 -04:00
/* Here we need to check if there is a background saving operation
* in progress , or if it is required to start one */
2011-12-21 06:22:13 -05:00
if ( server . rdb_child_pid ! = - 1 ) {
2010-06-21 18:07:48 -04:00
/* Ok a background save is in progress. Let's check if it is a good
* one for replication , i . e . if there is another slave that is
* registering differences since the server forked to save */
redisClient * slave ;
listNode * ln ;
listIter li ;
listRewind ( server . slaves , & li ) ;
while ( ( ln = listNext ( & li ) ) ) {
slave = ln - > value ;
if ( slave - > replstate = = REDIS_REPL_WAIT_BGSAVE_END ) break ;
}
if ( ln ) {
/* Perfect, the server is already registering differences for
* another slave . Set the right state , and copy the buffer . */
2011-12-30 13:34:40 -05:00
copyClientOutputBuffer ( c , slave ) ;
2010-06-21 18:07:48 -04:00
c - > replstate = REDIS_REPL_WAIT_BGSAVE_END ;
redisLog ( REDIS_NOTICE , " Waiting for end of BGSAVE for SYNC " ) ;
} else {
/* No way, we need to wait for the next BGSAVE in order to
* register differences */
c - > replstate = REDIS_REPL_WAIT_BGSAVE_START ;
redisLog ( REDIS_NOTICE , " Waiting for next BGSAVE for SYNC " ) ;
}
} else {
/* Ok we don't have a BGSAVE in progress, let's start one */
redisLog ( REDIS_NOTICE , " Starting BGSAVE for SYNC " ) ;
2011-12-21 06:22:13 -05:00
if ( rdbSaveBackground ( server . rdb_filename ) ! = REDIS_OK ) {
2010-06-21 18:07:48 -04:00
redisLog ( REDIS_NOTICE , " Replication failed, can't BGSAVE " ) ;
2010-09-02 13:52:24 -04:00
addReplyError ( c , " Unable to perform background save " ) ;
2010-06-21 18:07:48 -04:00
return ;
}
c - > replstate = REDIS_REPL_WAIT_BGSAVE_END ;
2013-06-24 12:57:31 -04:00
/* Flush the script cache for the new slave. */
replicationScriptCacheFlush ( ) ;
2010-06-21 18:07:48 -04:00
}
2013-01-31 05:14:15 -05:00
if ( server . repl_disable_tcp_nodelay )
anetDisableTcpNoDelay ( NULL , c - > fd ) ; /* Non critical if it fails. */
2010-06-21 18:07:48 -04:00
c - > repldbfd = - 1 ;
c - > flags | = REDIS_SLAVE ;
2012-11-02 11:31:28 -04:00
server . slaveseldb = - 1 ; /* Force to re-emit the SELECT command. */
2010-06-21 18:07:48 -04:00
listAddNodeTail ( server . slaves , c ) ;
2013-01-30 12:33:16 -05:00
if ( listLength ( server . slaves ) = = 1 & & server . repl_backlog = = NULL )
createReplicationBacklog ( ) ;
2010-06-21 18:07:48 -04:00
return ;
}
2012-06-26 03:47:47 -04:00
/* REPLCONF <option> <value> <option> <value> ...
 *
 * Options handled here:
 *
 * - listening-port: stored in the client as its slave listening port.
 * - ack: replication offset processed so far by a slave. Honoured only
 *   for clients flagged as slaves, never replied to, and it terminates
 *   the processing of the command.
 *
 * Any other option is answered with an error. When all the pairs are
 * processed without an early return the reply is OK. */
void replconfCommand(redisClient *c) {
    int j;

    /* Options come in pairs, so together with the command name the
     * number of arguments must be odd. */
    if ((c->argc % 2) == 0) {
        addReply(c, shared.syntaxerr);
        return;
    }

    for (j = 1; j < c->argc; j += 2) {
        char *option = c->argv[j]->ptr;
        robj *value = c->argv[j+1];

        if (!strcasecmp(option, " listening-port ")) {
            long port;

            if (getLongFromObjectOrReply(c, value, &port, NULL) != REDIS_OK)
                return;
            c->slave_listening_port = port;
        } else if (!strcasecmp(option, " ack ")) {
            /* Internal only option: normal clients should never use it. */
            long long offset;

            if (!(c->flags & REDIS_SLAVE)) return;
            if (getLongLongFromObject(value, &offset) != REDIS_OK)
                return;
            /* The acknowledged offset only moves forward. */
            if (offset > c->repl_ack_off)
                c->repl_ack_off = offset;
            c->repl_ack_time = server.unixtime;
            /* Note: this command does not reply anything! */
            return;
        } else {
            addReplyErrorFormat(c, " Unrecognized REPLCONF option: %s ",
                                option);
            return;
        }
    }
    addReply(c, shared.ok);
}
2010-06-21 18:07:48 -04:00
/* Writable-event handler that transfers the RDB file to a slave.
 *
 * 'privdata' is the slave client and 'fd' the descriptor to write to;
 * 'el' and 'mask' are unused. Every call first sends what is left of the
 * preamble (slave->replpreamble), then a chunk of at most REDIS_IOBUF_LEN
 * bytes of the file open at slave->repldbfd, starting at
 * slave->repldboff. When the whole file has been sent the slave is put
 * online and this handler is replaced by sendReplyToClient.
 *
 * A write() failing with EAGAIN is not an error: the handler returns and
 * retries on the next writable event. Any other I/O failure, including a
 * failed lseek(), frees the slave. */
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *slave = privdata;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    char buf[REDIS_IOBUF_LEN];
    ssize_t nwritten, buflen;

    /* Before sending the RDB file, send the preamble as configured by the
     * replication process. */
    if (slave->replpreamble) {
        nwritten = write(fd, slave->replpreamble, sdslen(slave->replpreamble));
        if (nwritten == -1) {
            /* Nothing could be written right now: retry later instead of
             * dropping the slave. */
            if (errno == EAGAIN) return;
            redisLog(REDIS_VERBOSE, " Write error sending RDB preamble to slave: %s ",
                strerror(errno));
            freeClient(slave);
            return;
        }
        /* Keep only the part that still has to be sent. */
        sdsrange(slave->replpreamble, nwritten, -1);
        if (sdslen(slave->replpreamble) != 0) return;
        sdsfree(slave->replpreamble);
        slave->replpreamble = NULL;
        /* Fall through sending data. */
    }

    /* The preamble was fully transferred: send the RDB bulk data. A
     * failed seek would make us read from the wrong position, so it is
     * handled like a read error. */
    if (lseek(slave->repldbfd, slave->repldboff, SEEK_SET) == -1) {
        redisLog(REDIS_WARNING, " Read error sending DB to slave: %s ",
            strerror(errno));
        freeClient(slave);
        return;
    }
    buflen = read(slave->repldbfd, buf, REDIS_IOBUF_LEN);
    if (buflen <= 0) {
        redisLog(REDIS_WARNING, " Read error sending DB to slave: %s ",
            (buflen == 0) ? " premature EOF " : strerror(errno));
        freeClient(slave);
        return;
    }
    if ((nwritten = write(fd, buf, buflen)) == -1) {
        /* repldboff was not advanced, so the same chunk is read and sent
         * again on the next writable event. */
        if (errno == EAGAIN) return;
        redisLog(REDIS_VERBOSE, " Write error sending DB to slave: %s ",
            strerror(errno));
        freeClient(slave);
        return;
    }

    /* Only the bytes actually written are accounted for. */
    slave->repldboff += nwritten;
    if (slave->repldboff == slave->repldbsize) {
        /* Transfer complete: release the file and switch the slave to the
         * normal reply handler. */
        close(slave->repldbfd);
        slave->repldbfd = -1;
        aeDeleteFileEvent(server.el, slave->fd, AE_WRITABLE);
        slave->replstate = REDIS_REPL_ONLINE;
        slave->repl_ack_time = server.unixtime;
        if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
            sendReplyToClient, slave) == AE_ERR) {
            freeClient(slave);
            return;
        }
        refreshGoodSlavesCount();
        redisLog(REDIS_NOTICE, " Synchronization with slave succeeded ");
    }
}
2013-01-16 12:00:20 -05:00
/* This function is called at the end of every background saving.
2010-06-21 18:07:48 -04:00
* The argument bgsaveerr is REDIS_OK if the background saving succeeded
* otherwise REDIS_ERR is passed to the function .
*
* The goal of this function is to handle slaves waiting for a successful
* background saving in order to perform non - blocking synchronization . */
void updateSlavesWaitingBgsave ( int bgsaveerr ) {
listNode * ln ;
int startbgsave = 0 ;
listIter li ;
listRewind ( server . slaves , & li ) ;
while ( ( ln = listNext ( & li ) ) ) {
redisClient * slave = ln - > value ;
if ( slave - > replstate = = REDIS_REPL_WAIT_BGSAVE_START ) {
startbgsave = 1 ;
slave - > replstate = REDIS_REPL_WAIT_BGSAVE_END ;
} else if ( slave - > replstate = = REDIS_REPL_WAIT_BGSAVE_END ) {
struct redis_stat buf ;
if ( bgsaveerr ! = REDIS_OK ) {
freeClient ( slave ) ;
redisLog ( REDIS_WARNING , " SYNC failed. BGSAVE child returned an error " ) ;
continue ;
}
2011-12-21 06:22:13 -05:00
if ( ( slave - > repldbfd = open ( server . rdb_filename , O_RDONLY ) ) = = - 1 | |
2010-06-21 18:07:48 -04:00
redis_fstat ( slave - > repldbfd , & buf ) = = - 1 ) {
freeClient ( slave ) ;
redisLog ( REDIS_WARNING , " SYNC failed. Can't open/stat DB after BGSAVE: %s " , strerror ( errno ) ) ;
continue ;
}
slave - > repldboff = 0 ;
slave - > repldbsize = buf . st_size ;
slave - > replstate = REDIS_REPL_SEND_BULK ;
2013-08-12 04:29:14 -04:00
slave - > replpreamble = sdscatprintf ( sdsempty ( ) , " $%lld \r \n " ,
( unsigned long long ) slave - > repldbsize ) ;
2010-06-21 18:07:48 -04:00
aeDeleteFileEvent ( server . el , slave - > fd , AE_WRITABLE ) ;
if ( aeCreateFileEvent ( server . el , slave - > fd , AE_WRITABLE , sendBulkToSlave , slave ) = = AE_ERR ) {
freeClient ( slave ) ;
continue ;
}
}
}
if ( startbgsave ) {
2013-06-24 12:57:31 -04:00
/* Since we are starting a new background save for one or more slaves,
* we flush the Replication Script Cache to use EVAL to propagate every
* new EVALSHA for the first time , since all the new slaves don ' t know
* about previous scripts . */
replicationScriptCacheFlush ( ) ;
2011-12-21 06:22:13 -05:00
if ( rdbSaveBackground ( server . rdb_filename ) ! = REDIS_OK ) {
2010-06-21 18:07:48 -04:00
listIter li ;
listRewind ( server . slaves , & li ) ;
redisLog ( REDIS_WARNING , " SYNC failed. BGSAVE failed " ) ;
while ( ( ln = listNext ( & li ) ) ) {
redisClient * slave = ln - > value ;
if ( slave - > replstate = = REDIS_REPL_WAIT_BGSAVE_START )
freeClient ( slave ) ;
}
}
}
}
2010-11-04 12:29:53 -04:00
/* ----------------------------------- SLAVE -------------------------------- */
/* Abort the async download of the bulk dataset while SYNC-ing with master */
void replicationAbortSyncTransfer ( void ) {
2011-12-21 06:23:18 -05:00
redisAssert ( server . repl_state = = REDIS_REPL_TRANSFER ) ;
2010-11-04 12:29:53 -04:00
aeDeleteFileEvent ( server . el , server . repl_transfer_s , AE_READABLE ) ;
close ( server . repl_transfer_s ) ;
close ( server . repl_transfer_fd ) ;
unlink ( server . repl_transfer_tmpfile ) ;
zfree ( server . repl_transfer_tmpfile ) ;
2011-12-21 06:23:18 -05:00
server . repl_state = REDIS_REPL_CONNECT ;
2010-11-04 12:29:53 -04:00
}
/* Asynchronously read the SYNC payload we receive from a master */
2012-08-24 13:28:44 -04:00
# define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
2010-11-04 12:29:53 -04:00
void readSyncBulkPayload ( aeEventLoop * el , int fd , void * privdata , int mask ) {
2010-11-04 13:09:35 -04:00
char buf [ 4096 ] ;
2010-11-04 12:35:03 -04:00
ssize_t nread , readlen ;
2012-08-24 13:28:44 -04:00
off_t left ;
2010-11-04 12:35:03 -04:00
REDIS_NOTUSED ( el ) ;
REDIS_NOTUSED ( privdata ) ;
REDIS_NOTUSED ( mask ) ;
2010-11-04 12:29:53 -04:00
2012-08-24 13:28:44 -04:00
/* If repl_transfer_size == -1 we still have to read the bulk length
2010-11-04 13:09:35 -04:00
* from the master reply . */
2012-08-24 13:28:44 -04:00
if ( server . repl_transfer_size = = - 1 ) {
2012-03-31 05:23:30 -04:00
if ( syncReadLine ( fd , buf , 1024 , server . repl_syncio_timeout * 1000 ) = = - 1 ) {
2010-11-04 13:09:35 -04:00
redisLog ( REDIS_WARNING ,
" I/O error reading bulk count from MASTER: %s " ,
strerror ( errno ) ) ;
2011-05-22 06:41:24 -04:00
goto error ;
2010-11-04 13:09:35 -04:00
}
2011-05-22 06:41:24 -04:00
2010-11-04 13:09:35 -04:00
if ( buf [ 0 ] = = ' - ' ) {
redisLog ( REDIS_WARNING ,
" MASTER aborted replication with an error: %s " ,
buf + 1 ) ;
2011-05-22 06:41:24 -04:00
goto error ;
2011-01-20 07:18:23 -05:00
} else if ( buf [ 0 ] = = ' \0 ' ) {
/* At this stage just a newline works as a PING in order to take
* the connection live . So we refresh our last interaction
* timestamp . */
2012-03-27 11:39:58 -04:00
server . repl_transfer_lastio = server . unixtime ;
2011-01-20 07:18:23 -05:00
return ;
2010-11-04 13:09:35 -04:00
} else if ( buf [ 0 ] ! = ' $ ' ) {
2013-02-01 07:01:01 -05:00
redisLog ( REDIS_WARNING , " Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right? " , buf ) ;
2011-05-22 06:41:24 -04:00
goto error ;
2010-11-04 13:09:35 -04:00
}
2012-08-24 13:28:44 -04:00
server . repl_transfer_size = strtol ( buf + 1 , NULL , 10 ) ;
2010-11-04 13:09:35 -04:00
redisLog ( REDIS_NOTICE ,
2013-02-27 06:27:15 -05:00
" MASTER <-> SLAVE sync: receiving %lld bytes from master " ,
( long long ) server . repl_transfer_size ) ;
2010-11-04 13:09:35 -04:00
return ;
}
/* Read bulk data */
2012-08-24 13:28:44 -04:00
left = server . repl_transfer_size - server . repl_transfer_read ;
readlen = ( left < ( signed ) sizeof ( buf ) ) ? left : ( signed ) sizeof ( buf ) ;
2010-11-04 12:29:53 -04:00
nread = read ( fd , buf , readlen ) ;
if ( nread < = 0 ) {
redisLog ( REDIS_WARNING , " I/O error trying to sync with MASTER: %s " ,
( nread = = - 1 ) ? strerror ( errno ) : " connection lost " ) ;
replicationAbortSyncTransfer ( ) ;
return ;
}
2012-03-27 11:39:58 -04:00
server . repl_transfer_lastio = server . unixtime ;
2010-11-04 12:29:53 -04:00
if ( write ( server . repl_transfer_fd , buf , nread ) ! = nread ) {
2012-04-25 15:21:56 -04:00
redisLog ( REDIS_WARNING , " Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s " , strerror ( errno ) ) ;
2011-05-22 06:41:24 -04:00
goto error ;
2010-11-04 12:29:53 -04:00
}
2012-08-24 13:28:44 -04:00
server . repl_transfer_read + = nread ;
/* Sync data on disk from time to time, otherwise at the end of the transfer
* we may suffer a big delay as the memory buffers are copied into the
* actual disk . */
if ( server . repl_transfer_read > =
server . repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC )
{
off_t sync_size = server . repl_transfer_read -
server . repl_transfer_last_fsync_off ;
rdb_fsync_range ( server . repl_transfer_fd ,
server . repl_transfer_last_fsync_off , sync_size ) ;
server . repl_transfer_last_fsync_off + = sync_size ;
}
2010-11-04 12:29:53 -04:00
/* Check if the transfer is now complete */
2012-08-24 13:28:44 -04:00
if ( server . repl_transfer_read = = server . repl_transfer_size ) {
2011-12-21 06:22:13 -05:00
if ( rename ( server . repl_transfer_tmpfile , server . rdb_filename ) = = - 1 ) {
2010-11-04 12:29:53 -04:00
redisLog ( REDIS_WARNING , " Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s " , strerror ( errno ) ) ;
replicationAbortSyncTransfer ( ) ;
return ;
}
2010-11-04 13:14:20 -04:00
redisLog ( REDIS_NOTICE , " MASTER <-> SLAVE sync: Loading DB in memory " ) ;
2013-02-08 04:26:19 -05:00
signalFlushedDb ( - 1 ) ;
2010-11-04 12:29:53 -04:00
emptyDb ( ) ;
2010-11-12 14:02:20 -05:00
/* Before loading the DB into memory we need to delete the readable
* handler , otherwise it will get called recursively since
* rdbLoad ( ) will call the event loop to process events from time to
* time for non blocking loading . */
aeDeleteFileEvent ( server . el , server . repl_transfer_s , AE_READABLE ) ;
2011-12-21 06:22:13 -05:00
if ( rdbLoad ( server . rdb_filename ) ! = REDIS_OK ) {
2010-11-04 12:29:53 -04:00
redisLog ( REDIS_WARNING , " Failed trying to load the MASTER synchronization DB from disk " ) ;
replicationAbortSyncTransfer ( ) ;
return ;
}
/* Final setup of the connected slave <- master link */
zfree ( server . repl_transfer_tmpfile ) ;
close ( server . repl_transfer_fd ) ;
server . master = createClient ( server . repl_transfer_s ) ;
server . master - > flags | = REDIS_MASTER ;
server . master - > authenticated = 1 ;
2011-12-21 06:23:18 -05:00
server . repl_state = REDIS_REPL_CONNECTED ;
2013-01-30 12:33:16 -05:00
server . master - > reploff = server . repl_master_initial_offset ;
memcpy ( server . master - > replrunid , server . repl_master_runid ,
sizeof ( server . repl_master_runid ) ) ;
2010-11-04 13:09:35 -04:00
redisLog ( REDIS_NOTICE , " MASTER <-> SLAVE sync: Finished with success " ) ;
2011-12-15 10:07:49 -05:00
/* Restart the AOF subsystem now that we finished the sync. This
* will trigger an AOF rewrite , and when done will start appending
* to the new file . */
2011-12-21 04:31:34 -05:00
if ( server . aof_state ! = REDIS_AOF_OFF ) {
2011-12-15 10:07:49 -05:00
int retry = 10 ;
stopAppendOnly ( ) ;
while ( retry - - & & startAppendOnly ( ) = = REDIS_ERR ) {
2013-01-16 12:00:20 -05:00
redisLog ( REDIS_WARNING , " Failed enabling the AOF after successful master synchronization! Trying it again in one second. " ) ;
2011-12-15 10:07:49 -05:00
sleep ( 1 ) ;
}
if ( ! retry ) {
redisLog ( REDIS_WARNING , " FATAL: this slave instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now. " ) ;
exit ( 1 ) ;
}
}
2010-11-04 12:29:53 -04:00
}
2011-05-22 06:41:24 -04:00
return ;
error :
replicationAbortSyncTransfer ( ) ;
return ;
2010-11-04 12:29:53 -04:00
}
2012-06-26 03:47:47 -04:00
/* Send a synchronous command to the master. Used to send AUTH and
2012-06-27 05:26:37 -04:00
* REPLCONF commands before starting the replication with SYNC .
2012-06-26 03:47:47 -04:00
*
2013-01-30 12:33:16 -05:00
* The command returns an sds string representing the result of the
* operation . On error the first byte is a " - " .
2012-06-26 03:47:47 -04:00
*/
char * sendSynchronousCommand ( int fd , . . . ) {
va_list ap ;
sds cmd = sdsempty ( ) ;
char * arg , buf [ 256 ] ;
/* Create the command to send to the master, we use simple inline
* protocol for simplicity as currently we only send simple strings . */
va_start ( ap , fd ) ;
while ( 1 ) {
arg = va_arg ( ap , char * ) ;
if ( arg = = NULL ) break ;
if ( sdslen ( cmd ) ! = 0 ) cmd = sdscatlen ( cmd , " " , 1 ) ;
cmd = sdscat ( cmd , arg ) ;
}
cmd = sdscatlen ( cmd , " \r \n " , 2 ) ;
/* Transfer command to the server. */
if ( syncWrite ( fd , cmd , sdslen ( cmd ) , server . repl_syncio_timeout * 1000 ) = = - 1 ) {
sdsfree ( cmd ) ;
2013-01-30 12:33:16 -05:00
return sdscatprintf ( sdsempty ( ) , " -Writing to master: %s " ,
2012-06-26 03:47:47 -04:00
strerror ( errno ) ) ;
}
sdsfree ( cmd ) ;
/* Read the reply from the server. */
if ( syncReadLine ( fd , buf , sizeof ( buf ) , server . repl_syncio_timeout * 1000 ) = = - 1 )
{
2013-01-30 12:33:16 -05:00
return sdscatprintf ( sdsempty ( ) , " -Reading from master: %s " ,
2012-06-26 03:47:47 -04:00
strerror ( errno ) ) ;
}
2013-01-30 12:33:16 -05:00
return sdsnew ( buf ) ;
}
/* Try a partial resynchronization with the master if we are about to reconnect.
* If there is no cached master structure , at least try to issue a
* " PSYNC ? -1 " command in order to trigger a full resync using the PSYNC
* command in order to obtain the master run id and the master replication
* global offset .
*
* This function is designed to be called from syncWithMaster ( ) , so the
* following assumptions are made :
*
* 1 ) We pass the function an already connected socket " fd " .
* 2 ) This function does not close the file descriptor " fd " . However in case
* of successful partial resynchronization , the function will reuse
* ' fd ' as file descriptor of the server . master client structure .
*
* The function returns :
*
 * PSYNC_CONTINUE: If the PSYNC command succeeded and we can continue.
* PSYNC_FULLRESYNC : If PSYNC is supported but a full resync is needed .
* In this case the master run_id and global replication
* offset is saved .
* PSYNC_NOT_SUPPORTED : If the server does not understand PSYNC at all and
* the caller should fall back to SYNC .
*/
# define PSYNC_CONTINUE 0
# define PSYNC_FULLRESYNC 1
# define PSYNC_NOT_SUPPORTED 2
int slaveTryPartialResynchronization ( int fd ) {
char * psync_runid ;
char psync_offset [ 32 ] ;
sds reply ;
/* Initially set repl_master_initial_offset to -1 to mark the current
* master run_id and offset as not valid . Later if we ' ll be able to do
* a FULL resync using the PSYNC command we ' ll set the offset at the
* right value , so that this information will be propagated to the
* client structure representing the master into server . master . */
server . repl_master_initial_offset = - 1 ;
if ( server . cached_master ) {
psync_runid = server . cached_master - > replrunid ;
snprintf ( psync_offset , sizeof ( psync_offset ) , " %lld " , server . cached_master - > reploff + 1 ) ;
redisLog ( REDIS_NOTICE , " Trying a partial resynchronization (request %s:%s). " , psync_runid , psync_offset ) ;
} else {
redisLog ( REDIS_NOTICE , " Partial resynchronization not possible (no cached master) " ) ;
psync_runid = " ? " ;
memcpy ( psync_offset , " -1 " , 3 ) ;
}
/* Issue the PSYNC command */
reply = sendSynchronousCommand ( fd , " PSYNC " , psync_runid , psync_offset , NULL ) ;
if ( ! strncmp ( reply , " +FULLRESYNC " , 11 ) ) {
2013-02-01 09:28:02 -05:00
char * runid = NULL , * offset = NULL ;
2013-01-30 12:33:16 -05:00
/* FULL RESYNC, parse the reply in order to extract the run id
* and the replication offset . */
runid = strchr ( reply , ' ' ) ;
if ( runid ) {
runid + + ;
offset = strchr ( runid , ' ' ) ;
if ( offset ) offset + + ;
}
if ( ! runid | | ! offset | | ( offset - runid - 1 ) ! = REDIS_RUN_ID_SIZE ) {
redisLog ( REDIS_WARNING ,
" Master replied with wrong +FULLRESYNC syntax. " ) ;
2013-02-13 12:33:33 -05:00
/* This is an unexpected condition, actually the +FULLRESYNC
* reply means that the master supports PSYNC , but the reply
* format seems wrong . To stay safe we blank the master
2013-02-13 12:43:40 -05:00
* runid to make sure next PSYNCs will fail . */
2013-02-13 12:33:33 -05:00
memset ( server . repl_master_runid , 0 , REDIS_RUN_ID_SIZE + 1 ) ;
2013-01-30 12:33:16 -05:00
} else {
memcpy ( server . repl_master_runid , runid , offset - runid - 1 ) ;
server . repl_master_runid [ REDIS_RUN_ID_SIZE ] = ' \0 ' ;
server . repl_master_initial_offset = strtoll ( offset , NULL , 10 ) ;
redisLog ( REDIS_NOTICE , " Full resync from master: %s:%lld " ,
server . repl_master_runid ,
server . repl_master_initial_offset ) ;
}
/* We are going to full resync, discard the cached master structure. */
replicationDiscardCachedMaster ( ) ;
sdsfree ( reply ) ;
return PSYNC_FULLRESYNC ;
}
2012-06-26 03:47:47 -04:00
2013-01-30 12:33:16 -05:00
if ( ! strncmp ( reply , " +CONTINUE " , 9 ) ) {
/* Partial resync was accepted, set the replication state accordingly */
redisLog ( REDIS_NOTICE ,
" Successful partial resynchronization with master. " ) ;
sdsfree ( reply ) ;
replicationResurrectCachedMaster ( fd ) ;
return PSYNC_CONTINUE ;
2012-06-26 03:47:47 -04:00
}
2013-01-30 12:33:16 -05:00
/* If we reach this point we receied either an error since the master does
* not understand PSYNC , or an unexpected reply from the master .
* Reply with PSYNC_NOT_SUPPORTED in both cases . */
if ( strncmp ( reply , " -ERR " , 4 ) ) {
/* If it's not an error, log the unexpected event. */
redisLog ( REDIS_WARNING ,
" Unexpected reply to PSYNC from master: %s " , reply ) ;
} else {
redisLog ( REDIS_NOTICE ,
" Master does not support PSYNC or is in "
" error state (reply: %s) " , reply ) ;
}
sdsfree ( reply ) ;
replicationDiscardCachedMaster ( ) ;
return PSYNC_NOT_SUPPORTED ;
2012-06-26 03:47:47 -04:00
}
2011-05-19 12:53:06 -04:00
/* File event handler driving the handshake with the master on socket 'fd'.
 * It is registered by connectWithMaster() for both readable and writable
 * events, and behaves according to server.repl_state:
 *
 * REDIS_REPL_NONE:         the socket is closed and nothing else is done.
 * REDIS_REPL_CONNECTING:   a PING is written and the state moves to
 *                          REDIS_REPL_RECEIVE_PONG, waiting for the reply.
 * REDIS_REPL_RECEIVE_PONG: the PING reply is read and validated, then the
 *                          function goes on (blocking, with timeouts) with
 *                          AUTH (if server.masterauth is set), REPLCONF
 *                          listening-port, PSYNC and, if PSYNC is not
 *                          supported, SYNC. Unless a partial resync was
 *                          accepted, a temp file is created, and
 *                          readSyncBulkPayload() is installed as readable
 *                          handler with the state set to REDIS_REPL_TRANSFER.
 *
 * On error the socket is closed, the temp file (if already created) is
 * closed and removed, and the state is set back to REDIS_REPL_CONNECT.
 * The 'el', 'privdata' and 'mask' arguments are not used. */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    char tmpfile[256], *err;
    int dfd = -1, maxtries = 5;
    int sockerr = 0, psync_result;
    socklen_t errlen = sizeof(sockerr);
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(privdata);
    REDIS_NOTUSED(mask);

    /* If this event fired after the user turned the instance into a master
     * with SLAVEOF NO ONE we must just return ASAP. */
    if (server.repl_state == REDIS_REPL_NONE) {
        close(fd);
        return;
    }

    /* Check for errors in the socket. */
    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
        sockerr = errno;
    if (sockerr) {
        aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
        redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s",
            strerror(sockerr));
        goto error;
    }

    /* If we were connecting, it's time to send a non blocking PING, we want to
     * make sure the master is able to reply before going into the actual
     * replication process where we have long timeouts in the order of
     * seconds (in the meantime the slave would block). */
    if (server.repl_state == REDIS_REPL_CONNECTING) {
        redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
        /* Delete the writable event so that the readable event remains
         * registered and we can wait for the PONG reply. */
        aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
        server.repl_state = REDIS_REPL_RECEIVE_PONG;
        /* Send the PING, don't check for errors at all, we have the timeout
         * that will take care about this. */
        syncWrite(fd,"PING\r\n",6,100);
        return;
    }

    /* Receive the PONG command. */
    if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
        char buf[1024];

        /* Delete the readable event, we no longer need it now that there is
         * the PING reply to read. */
        aeDeleteFileEvent(server.el,fd,AE_READABLE);

        /* Read the reply with explicit timeout. */
        buf[0] = '\0';
        if (syncReadLine(fd,buf,sizeof(buf),
            server.repl_syncio_timeout*1000) == -1)
        {
            redisLog(REDIS_WARNING,
                "I/O error reading PING reply from master: %s",
                strerror(errno));
            goto error;
        }

        /* We accept only two replies as valid, a positive +PONG reply
         * (we just check for "+") or an authentication error.
         * Note that older versions of Redis replied with "operation not
         * permitted" instead of using a proper error code, so we test
         * both. */
        if (buf[0] != '+' &&
            strncmp(buf,"-NOAUTH",7) != 0 &&
            strncmp(buf,"-ERR operation not permitted",28) != 0)
        {
            redisLog(REDIS_WARNING,"Error reply to PING from master: '%s'",buf);
            goto error;
        } else {
            redisLog(REDIS_NOTICE,
                "Master replied to PING, replication can continue...");
        }
    }

    /* AUTH with the master if required. */
    if (server.masterauth) {
        err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);
        if (err[0] == '-') {
            redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err);
            sdsfree(err);
            goto error;
        }
        sdsfree(err);
    }

    /* Set the slave port, so that Master's INFO command can list the
     * slave listening port correctly. */
    {
        sds port = sdsfromlonglong(server.port);
        err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port,
                                         NULL);
        sdsfree(port);
        /* Ignore the error if any, not all the Redis versions support
         * REPLCONF listening-port. */
        if (err[0] == '-') {
            redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF listening-port: %s", err);
        }
        sdsfree(err);
    }

    /* Try a partial resynchronization. If we don't have a cached master
     * slaveTryPartialResynchronization() will at least try to use PSYNC
     * to start a full resynchronization so that we get the master run id
     * and the global offset, to try a partial resync at the next
     * reconnection attempt. */
    psync_result = slaveTryPartialResynchronization(fd);
    if (psync_result == PSYNC_CONTINUE) {
        redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
        return;
    }

    /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
     * and the server.repl_master_runid and repl_master_initial_offset are
     * already populated. */
    if (psync_result == PSYNC_NOT_SUPPORTED) {
        redisLog(REDIS_NOTICE,"Retrying with SYNC...");
        if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
            redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
                strerror(errno));
            goto error;
        }
    }

    /* Prepare a suitable temp file for bulk transfer */
    while(maxtries--) {
        snprintf(tmpfile,256,
            "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
        dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
        if (dfd != -1) break;
        sleep(1);
    }
    if (dfd == -1) {
        redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
        goto error;
    }

    /* Setup the non blocking download of the bulk file. */
    if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
            == AE_ERR)
    {
        redisLog(REDIS_WARNING,
            "Can't create readable event for SYNC: %s (fd=%d)",
            strerror(errno),fd);
        goto error;
    }

    server.repl_state = REDIS_REPL_TRANSFER;
    server.repl_transfer_size = -1;
    server.repl_transfer_read = 0;
    server.repl_transfer_last_fsync_off = 0;
    server.repl_transfer_fd = dfd;
    server.repl_transfer_lastio = server.unixtime;
    server.repl_transfer_tmpfile = zstrdup(tmpfile);
    return;

error:
    /* 'dfd' is different than -1 only if the temp file was created by us
     * (it is opened with O_EXCL), but the transfer could not be started:
     * release the descriptor and remove the file, otherwise both would be
     * leaked at every failed attempt. */
    if (dfd != -1) {
        close(dfd);
        unlink(tmpfile);
    }
    close(fd);
    server.repl_transfer_s = -1;
    server.repl_state = REDIS_REPL_CONNECT;
    return;
}
int connectWithMaster ( void ) {
int fd ;
fd = anetTcpNonBlockConnect ( NULL , server . masterhost , server . masterport ) ;
if ( fd = = - 1 ) {
redisLog ( REDIS_WARNING , " Unable to connect to MASTER: %s " ,
strerror ( errno ) ) ;
return REDIS_ERR ;
}
2011-06-09 09:35:07 -04:00
if ( aeCreateFileEvent ( server . el , fd , AE_READABLE | AE_WRITABLE , syncWithMaster , NULL ) = =
2011-05-22 06:41:24 -04:00
AE_ERR )
2011-05-19 12:53:06 -04:00
{
close ( fd ) ;
redisLog ( REDIS_WARNING , " Can't create readable event for SYNC " ) ;
return REDIS_ERR ;
}
2012-03-27 11:39:58 -04:00
server . repl_transfer_lastio = server . unixtime ;
2011-05-19 12:53:06 -04:00
server . repl_transfer_s = fd ;
2011-12-21 06:23:18 -05:00
server . repl_state = REDIS_REPL_CONNECTING ;
2010-06-21 18:07:48 -04:00
return REDIS_OK ;
}
2011-11-30 09:35:16 -05:00
/* This function can be called when a non blocking connection is currently
* in progress to undo it . */
void undoConnectWithMaster ( void ) {
int fd = server . repl_transfer_s ;
2012-08-31 09:32:57 -04:00
redisAssert ( server . repl_state = = REDIS_REPL_CONNECTING | |
server . repl_state = = REDIS_REPL_RECEIVE_PONG ) ;
2011-11-30 09:35:16 -05:00
aeDeleteFileEvent ( server . el , fd , AE_READABLE | AE_WRITABLE ) ;
close ( fd ) ;
server . repl_transfer_s = - 1 ;
2011-12-21 06:23:18 -05:00
server . repl_state = REDIS_REPL_CONNECT ;
2011-11-30 09:35:16 -05:00
}
2013-01-14 05:39:54 -05:00
/* This function aborts a non blocking replication attempt if there is one
* in progress , by canceling the non - blocking connect attempt or
* the initial bulk transfer .
*
* If there was a replication handshake in progress 1 is returned and
* the replication state ( server . repl_state ) set to REDIS_REPL_CONNECT .
*
* Otherwise zero is returned and no operation is perforemd at all . */
int cancelReplicationHandshake ( void ) {
if ( server . repl_state = = REDIS_REPL_TRANSFER ) {
replicationAbortSyncTransfer ( ) ;
} else if ( server . repl_state = = REDIS_REPL_CONNECTING | |
server . repl_state = = REDIS_REPL_RECEIVE_PONG )
{
undoConnectWithMaster ( ) ;
} else {
return 0 ;
}
return 1 ;
}
2013-03-04 07:22:21 -05:00
/* Set replication to the specified master address and port. */
void replicationSetMaster ( char * ip , int port ) {
sdsfree ( server . masterhost ) ;
2013-03-04 09:39:43 -05:00
server . masterhost = sdsnew ( ip ) ;
2013-03-04 07:22:21 -05:00
server . masterport = port ;
if ( server . master ) freeClient ( server . master ) ;
disconnectSlaves ( ) ; /* Force our slaves to resync with us as well. */
replicationDiscardCachedMaster ( ) ; /* Don't try a PSYNC. */
freeReplicationBacklog ( ) ; /* Don't allow our chained slaves to PSYNC. */
cancelReplicationHandshake ( ) ;
server . repl_state = REDIS_REPL_CONNECT ;
}
/* Stop replicating, turning the instance into a master itself.
 *
 * Nothing is done if no master is configured (server.masterhost is NULL).
 * Otherwise the master address is released, the master client and the
 * cached master are dropped, any handshake in progress is canceled, and the
 * replication state is set to REDIS_REPL_NONE. */
void replicationUnsetMaster(void) {
    if (server.masterhost != NULL) {
        sdsfree(server.masterhost);
        server.masterhost = NULL;

        if (server.master != NULL) freeClient(server.master);
        replicationDiscardCachedMaster();
        cancelReplicationHandshake();

        server.repl_state = REDIS_REPL_NONE;
    }
}
2010-06-21 18:07:48 -04:00
/* SLAVEOF <host> <port>
 * SLAVEOF NO ONE
 *
 * With the special arguments "NO" "ONE" (case insensitive) the instance is
 * turned into a master, if it was a slave. Otherwise the instance is set up
 * as a slave of the specified address, unless it is already configured to
 * replicate from that same address, in which case nothing is done.
 *
 * The command is refused in cluster mode, and when the port is not an
 * integer in the 0-65535 range. */
void slaveofCommand(redisClient *c) {
    /* SLAVEOF is not allowed in cluster mode as replication is automatically
     * configured using the current address of the master node. */
    if (server.cluster_enabled) {
        addReplyError(c,"SLAVEOF not allowed in cluster mode.");
        return;
    }

    /* The special host/port combination "NO" "ONE" turns the instance
     * into a master. Otherwise the new master address is set. */
    if (!strcasecmp(c->argv[1]->ptr,"no") &&
        !strcasecmp(c->argv[2]->ptr,"one")) {
        if (server.masterhost) {
            replicationUnsetMaster();
            redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
        }
    } else {
        long port;

        if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != REDIS_OK))
            return;

        /* The port is handled as an 'int' from now on: refuse values that
         * are not valid TCP ports instead of silently truncating them. */
        if (port < 0 || port > 65535) {
            addReplyError(c,"Invalid master port");
            return;
        }

        /* Check if we are already attached to the specified master */
        if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
            && server.masterport == port) {
            redisLog(REDIS_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.");
            addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
            return;
        }
        /* There was no previous master or the user specified a different one,
         * we can continue. */
        replicationSetMaster(c->argv[1]->ptr, (int)port);
        redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
            server.masterhost, server.masterport);
    }
    addReply(c,shared.ok);
}
2010-11-04 12:29:53 -04:00
2013-05-27 04:41:53 -04:00
/* Send a REPLCONF ACK command to the master to inform it about the current
* processed offset . If we are not connected with a master , the command has
* no effects . */
void replicationSendAck ( void ) {
redisClient * c = server . master ;
if ( c ! = NULL ) {
c - > flags | = REDIS_MASTER_FORCE_REPLY ;
addReplyMultiBulkLen ( c , 3 ) ;
addReplyBulkCString ( c , " REPLCONF " ) ;
addReplyBulkCString ( c , " ACK " ) ;
addReplyBulkLongLong ( c , c - > reploff ) ;
c - > flags & = ~ REDIS_MASTER_FORCE_REPLY ;
}
}
2013-01-30 12:33:16 -05:00
/* ---------------------- MASTER CACHING FOR PSYNC -------------------------- */
/* In order to implement partial synchronization we need to be able to cache
* our master ' s client structure after a transient disconnection .
* It is cached into server . cached_master and flushed away using the following
* functions . */
/* This function is called by freeClient() in order to cache the master
 * client structure instead of destroying it. freeClient() will return
 * ASAP after this function returns, so every action needed to avoid problems
 * with a client that is really "suspended" has to be done by this function.
 *
 * Requires server.master to be set and no master to be already cached
 * (both conditions are asserted).
 *
 * The other functions dealing with the cached master are:
 *
 * replicationDiscardCachedMaster(), that kills the cached client when for
 * some reason we don't want to use it in the future.
 *
 * replicationResurrectCachedMaster(), that is used after a successful PSYNC
 * handshake in order to reactivate the cached master.
 */
void replicationCacheMaster(redisClient *c) {
    listNode *node;
    int sock;

    redisAssert(server.master != NULL && server.cached_master == NULL);
    redisLog(REDIS_NOTICE,"Caching the disconnected master state.");

    /* The client must disappear from the list of clients: we don't want it
     * to be listed by CLIENT LIST or processed in any way by batch
     * operations. */
    node = listSearchKey(server.clients,c);
    redisAssert(node != NULL);
    listDelNode(server.clients,node);

    /* Save the master. Server.master will be set to null later by
     * replicationHandleMasterDisconnection(). */
    server.cached_master = server.master;

    /* Remove the event handlers and close the socket. We'll later reuse
     * the socket of the new connection with the master during PSYNC. */
    sock = c->fd;
    aeDeleteFileEvent(server.el,sock,AE_READABLE);
    aeDeleteFileEvent(server.el,sock,AE_WRITABLE);
    close(sock);

    /* Set fd to -1 so that we can safely call freeClient(c) later. */
    c->fd = -1;

    /* Caching the master happens instead of the actual freeClient() call,
     * so make sure to adjust the replication state. This function will
     * also set server.master to NULL. */
    replicationHandleMasterDisconnection();
}
/* Free a cached master, called when there are no longer the conditions for
* a partial resync on reconnection . */
void replicationDiscardCachedMaster ( void ) {
if ( server . cached_master = = NULL ) return ;
redisLog ( REDIS_NOTICE , " Discarding previously cached master state. " ) ;
server . cached_master - > flags & = ~ REDIS_MASTER ;
freeClient ( server . cached_master ) ;
server . cached_master = NULL ;
}
/* Turn the cached master into the current master, using the file descriptor
* passed as argument as the socket for the new master .
*
* This funciton is called when successfully setup a partial resynchronization
* so the stream of data that we ' ll receive will start from were this
* master left . */
void replicationResurrectCachedMaster ( int newfd ) {
server . master = server . cached_master ;
server . cached_master = NULL ;
server . master - > fd = newfd ;
server . master - > flags & = ~ ( REDIS_CLOSE_AFTER_REPLY | REDIS_CLOSE_ASAP ) ;
server . master - > authenticated = 1 ;
server . master - > lastinteraction = server . unixtime ;
server . repl_state = REDIS_REPL_CONNECTED ;
/* Re-add to the list of clients. */
listAddNodeTail ( server . clients , server . master ) ;
if ( aeCreateFileEvent ( server . el , newfd , AE_READABLE ,
readQueryFromClient , server . master ) ) {
redisLog ( REDIS_WARNING , " Error resurrecting the cached master, impossible to add the readable handler: %s " , strerror ( errno ) ) ;
freeClientAsync ( server . master ) ; /* Close ASAP. */
}
}
2013-05-29 05:36:44 -04:00
/* ------------------------- MIN-SLAVES-TO-WRITE --------------------------- */
/* This function counts the number of slaves with lag <= min-slaves-max-lag.
* If the option is active , the server will prevent writes if there are not
* enough connected slaves with the specified lag ( or less ) . */
void refreshGoodSlavesCount ( void ) {
listIter li ;
listNode * ln ;
int good = 0 ;
if ( ! server . repl_min_slaves_to_write | |
! server . repl_min_slaves_max_lag ) return ;
listRewind ( server . slaves , & li ) ;
while ( ( ln = listNext ( & li ) ) ) {
redisClient * slave = ln - > value ;
time_t lag = server . unixtime - slave - > repl_ack_time ;
if ( slave - > replstate = = REDIS_REPL_ONLINE & &
lag < = server . repl_min_slaves_max_lag ) good + + ;
}
server . repl_good_slaves_count = good ;
}
2013-06-24 04:26:04 -04:00
/* ----------------------- REPLICATION SCRIPT CACHE --------------------------
* The goal of this code is to keep track of scripts already sent to every
* connected slave , in order to be able to replicate EVALSHA as it is without
* translating it to EVAL every time it is possible .
*
* We use a capped collection implemented by an hash table for fast lookup
* of scripts we can send as EVALSHA , plus a linked list that is used for
* eviction of the oldest entry when the max number of items is reached .
*
* We don ' t care about taking a different cache for every different slave
* since to fill the cache again is not very costly , the goal of this code
 * is to avoid that the same big script is transmitted a big number of times
* per second wasting bandwidth and processor speed , but it is not a problem
* if we need to rebuild the cache from scratch from time to time , every used
* script will need to be transmitted a single time to reappear in the cache .
*
* This is how the system works :
*
* 1 ) Every time a new slave connects , we flush the whole script cache .
* 2 ) We only send as EVALSHA what was sent to the master as EVALSHA , without
* trying to convert EVAL into EVALSHA specifically for slaves .
 * 3) Every time we transmit a script as EVAL to the slaves, we also add the
* corresponding SHA1 of the script into the cache as we are sure every
* slave knows about the script starting from now .
* 4 ) On SCRIPT FLUSH command , we replicate the command to all the slaves
* and at the same time flush the script cache .
* 5 ) When the last slave disconnects , flush the cache .
* 6 ) We handle SCRIPT LOAD as well since that ' s how scripts are loaded
* in the master sometimes .
*/
/* Set up the script cache: an empty dictionary plus an empty list used as
 * FIFO, with the max number of entries set to 10000. Only called at
 * startup. */
void replicationScriptCacheInit(void) {
    server.repl_scriptcache_fifo = listCreate();
    server.repl_scriptcache_dict = dictCreate(&replScriptCacheDictType,NULL);
    server.repl_scriptcache_size = 10000;
}
/* Empty the script cache. Should be called every time we are no longer sure
2013-06-24 12:57:31 -04:00
* that every slave knows about all the scripts in our set , or when the
* current AOF " context " is no longer aware of the script . In general we
* should flush the cache :
*
* 1 ) Every time a new slave reconnects to this master and performs a
* full SYNC ( PSYNC does not require flushing ) .
* 2 ) Every time an AOF rewrite is performed .
* 3 ) Every time we are left without slaves at all , and AOF is off , in order
* to reclaim otherwise unused memory .
*/
2013-06-24 04:26:04 -04:00
void replicationScriptCacheFlush ( void ) {
dictEmpty ( server . repl_scriptcache_dict ) ;
listRelease ( server . repl_scriptcache_fifo ) ;
server . repl_scriptcache_fifo = listCreate ( ) ;
}
/* Insert 'sha1' into the script cache. When the cache already holds the
 * maximum number of entries, the entry at the tail of the FIFO (the oldest
 * one) is removed first.
 *
 * The cache stores its own copy of 'sha1'; the caller's string is not
 * retained. Both the eviction and the insertion are asserted to succeed. */
void replicationScriptCacheAdd(sds sha1) {
    sds key = sdsdup(sha1);
    int retval;

    /* Cache full: make room by dropping the tail of the FIFO. */
    if (listLength(server.repl_scriptcache_fifo) == server.repl_scriptcache_size) {
        listNode *tail = listLast(server.repl_scriptcache_fifo);

        /* Remove the entry from the dictionary before unlinking the node
         * that references it.
         * NOTE(review): the string is presumably released by the
         * dictionary's key destructor -- confirm against the dict type. */
        retval = dictDelete(server.repl_scriptcache_dict, listNodeValue(tail));
        redisAssert(retval == DICT_OK);
        listDelNode(server.repl_scriptcache_fifo, tail);
    }

    /* Register the new entry: the newest element lives at the list head. */
    retval = dictAdd(server.repl_scriptcache_dict, key, NULL);
    listAddNodeHead(server.repl_scriptcache_fifo, key);
    redisAssert(retval == DICT_OK);
}
/* Tell whether 'sha1' is present in the script cache, that is, whether all
 * the slaves are aware of the script with this SHA1.
 *
 * Returns 1 when the entry is found, 0 otherwise. */
int replicationScriptCacheExists(sds sha1) {
    if (dictFind(server.repl_scriptcache_dict, sha1) == NULL) return 0;
    return 1;
}
/* --------------------------- REPLICATION CRON ----------------------------- */
2010-11-04 12:29:53 -04:00
2013-05-27 04:41:53 -04:00
/* Replication cron funciton, called 1 time per second. */
2010-11-04 12:29:53 -04:00
void replicationCron ( void ) {
2011-11-30 09:35:16 -05:00
/* Non blocking connection timeout? */
2012-08-31 09:32:57 -04:00
if ( server . masterhost & &
( server . repl_state = = REDIS_REPL_CONNECTING | |
server . repl_state = = REDIS_REPL_RECEIVE_PONG ) & &
2011-11-30 09:35:16 -05:00
( time ( NULL ) - server . repl_transfer_lastio ) > server . repl_timeout )
{
redisLog ( REDIS_WARNING , " Timeout connecting to the MASTER... " ) ;
undoConnectWithMaster ( ) ;
}
2010-11-04 12:29:53 -04:00
/* Bulk transfer I/O timeout? */
2011-12-21 06:23:18 -05:00
if ( server . masterhost & & server . repl_state = = REDIS_REPL_TRANSFER & &
2011-10-31 06:13:28 -04:00
( time ( NULL ) - server . repl_transfer_lastio ) > server . repl_timeout )
2010-11-04 12:29:53 -04:00
{
2012-10-04 05:49:17 -04:00
redisLog ( REDIS_WARNING , " Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value. " ) ;
2010-11-04 12:29:53 -04:00
replicationAbortSyncTransfer ( ) ;
}
2011-01-20 07:18:23 -05:00
/* Timed out master when we are an already connected slave? */
2011-12-21 06:23:18 -05:00
if ( server . masterhost & & server . repl_state = = REDIS_REPL_CONNECTED & &
2011-10-31 06:13:28 -04:00
( time ( NULL ) - server . master - > lastinteraction ) > server . repl_timeout )
2011-01-20 07:18:23 -05:00
{
2013-07-12 17:06:27 -04:00
redisLog ( REDIS_WARNING , " MASTER timeout: no data nor PING received... " ) ;
2011-01-20 07:18:23 -05:00
freeClient ( server . master ) ;
}
2010-11-04 12:29:53 -04:00
/* Check if we should connect to a MASTER */
2011-12-21 06:23:18 -05:00
if ( server . repl_state = = REDIS_REPL_CONNECT ) {
2010-11-04 12:29:53 -04:00
redisLog ( REDIS_NOTICE , " Connecting to MASTER... " ) ;
2011-05-19 12:53:06 -04:00
if ( connectWithMaster ( ) = = REDIS_OK ) {
redisLog ( REDIS_NOTICE , " MASTER <-> SLAVE sync started " ) ;
2010-11-04 12:29:53 -04:00
}
}
2013-05-27 04:45:37 -04:00
/* Send ACK to master from time to time. */
if ( server . masterhost & & server . master )
replicationSendAck ( ) ;
2011-01-20 07:18:23 -05:00
/* If we have attached slaves, PING them from time to time.
* So slaves can implement an explicit timeout to masters , and will
* be able to detect a link disconnection even if the TCP connection
* will not actually go down . */
2012-12-14 11:10:40 -05:00
if ( ! ( server . cronloops % ( server . repl_ping_slave_period * server . hz ) ) ) {
2011-01-20 07:18:23 -05:00
listIter li ;
listNode * ln ;
2012-11-03 06:56:28 -04:00
robj * ping_argv [ 1 ] ;
2011-01-20 07:18:23 -05:00
2012-11-03 06:56:28 -04:00
/* First, send PING */
ping_argv [ 0 ] = createStringObject ( " PING " , 4 ) ;
replicationFeedSlaves ( server . slaves , server . slaveseldb , ping_argv , 1 ) ;
decrRefCount ( ping_argv [ 0 ] ) ;
2013-01-30 12:33:16 -05:00
/* Second, send a newline to all the slaves in pre-synchronization
* stage , that is , slaves waiting for the master to create the RDB file .
2012-11-03 06:56:28 -04:00
* The newline will be ignored by the slave but will refresh the
* last - io timer preventing a timeout . */
2011-01-20 07:18:23 -05:00
listRewind ( server . slaves , & li ) ;
while ( ( ln = listNext ( & li ) ) ) {
redisClient * slave = ln - > value ;
2012-11-03 06:56:28 -04:00
if ( slave - > replstate = = REDIS_REPL_WAIT_BGSAVE_START | |
slave - > replstate = = REDIS_REPL_WAIT_BGSAVE_END ) {
2011-02-21 11:50:54 -05:00
if ( write ( slave - > fd , " \n " , 1 ) = = - 1 ) {
/* Don't worry, it's just a ping. */
}
2011-01-20 07:18:23 -05:00
}
}
}
2013-01-30 12:33:16 -05:00
2013-05-27 05:17:17 -04:00
/* Disconnect timedout slaves. */
if ( listLength ( server . slaves ) ) {
listIter li ;
listNode * ln ;
listRewind ( server . slaves , & li ) ;
while ( ( ln = listNext ( & li ) ) ) {
redisClient * slave = ln - > value ;
if ( slave - > replstate ! = REDIS_REPL_ONLINE ) continue ;
2013-06-26 04:11:20 -04:00
if ( slave - > flags & REDIS_PRE_PSYNC_SLAVE ) continue ;
2013-05-27 05:17:17 -04:00
if ( ( server . unixtime - slave - > repl_ack_time ) > server . repl_timeout )
{
2013-07-08 10:11:52 -04:00
char ip [ REDIS_IP_STR_LEN ] ;
2013-05-27 05:17:17 -04:00
int port ;
2013-07-08 10:11:52 -04:00
if ( anetPeerToString ( slave - > fd , ip , sizeof ( ip ) , & port ) ! = - 1 ) {
2013-05-27 05:17:17 -04:00
redisLog ( REDIS_WARNING ,
" Disconnecting timedout slave: %s:%d " ,
ip , slave - > slave_listening_port ) ;
}
freeClient ( slave ) ;
}
}
}
2013-01-30 12:33:16 -05:00
/* If we have no attached slaves and there is a replication backlog
* using memory , free it after some ( configured ) time . */
if ( listLength ( server . slaves ) = = 0 & & server . repl_backlog_time_limit & &
server . repl_backlog )
{
time_t idle = server . unixtime - server . repl_no_slaves_since ;
if ( idle > server . repl_backlog_time_limit ) {
freeReplicationBacklog ( ) ;
redisLog ( REDIS_NOTICE ,
" Replication backlog freed after %d seconds "
2013-02-27 06:27:15 -05:00
" without connected slaves. " ,
( int ) server . repl_backlog_time_limit ) ;
2013-01-30 12:33:16 -05:00
}
}
2013-05-29 05:36:44 -04:00
2013-06-24 12:57:31 -04:00
/* If AOF is disabled and we no longer have attached slaves, we can
* free our Replication Script Cache as there is no need to propagate
* EVALSHA at all . */
if ( listLength ( server . slaves ) = = 0 & &
server . aof_state = = REDIS_AOF_OFF & &
listLength ( server . repl_scriptcache_fifo ) ! = 0 )
{
replicationScriptCacheFlush ( ) ;
}
2013-05-29 05:36:44 -04:00
/* Refresh the number of slaves with lag <= min-slaves-max-lag. */
refreshGoodSlavesCount ( ) ;
2010-11-04 12:29:53 -04:00
}