mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-23 08:38:27 -05:00
2440 lines
91 KiB
C
2440 lines
91 KiB
C
|
/* Redis Sentinel implementation
|
||
|
* -----------------------------
|
||
|
*
|
||
|
* Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
|
||
|
* All rights reserved.
|
||
|
*
|
||
|
* Redistribution and use in source and binary forms, with or without
|
||
|
* modification, are permitted provided that the following conditions are met:
|
||
|
*
|
||
|
* * Redistributions of source code must retain the above copyright notice,
|
||
|
* this list of conditions and the following disclaimer.
|
||
|
* * Redistributions in binary form must reproduce the above copyright
|
||
|
* notice, this list of conditions and the following disclaimer in the
|
||
|
* documentation and/or other materials provided with the distribution.
|
||
|
* * Neither the name of Redis nor the names of its contributors may be used
|
||
|
* to endorse or promote products derived from this software without
|
||
|
* specific prior written permission.
|
||
|
*
|
||
|
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||
|
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||
|
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
||
|
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
||
|
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
||
|
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
||
|
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||
|
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
||
|
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
||
|
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
||
|
* POSSIBILITY OF SUCH DAMAGE.
|
||
|
*/
|
||
|
|
||
|
#include "redis.h"
|
||
|
#include "hiredis.h"
|
||
|
#include "async.h"
|
||
|
|
||
|
#include <ctype.h>
|
||
|
#include <arpa/inet.h>
|
||
|
#include <sys/socket.h>
|
||
|
|
||
|
#define REDIS_SENTINEL_PORT 26379
|
||
|
|
||
|
/* ======================== Sentinel global state =========================== */
|
||
|
|
||
|
typedef long long mstime_t; /* millisecond time type. */
|
||
|
|
||
|
/* Address object, used to describe an ip:port pair. */
|
||
|
typedef struct sentinelAddr {
    char *ip;   /* Resolved IP as an sds string, owned by this struct
                   (created in createSentinelAddr(), freed by
                   releaseSentinelAddr()). */
    int port;   /* TCP port, validated to be in 1..65535 by
                   createSentinelAddr(). */
} sentinelAddr;
|
||
|
|
||
|
/* A Sentinel Redis Instance object is monitoring. */
|
||
|
#define SRI_MASTER (1<<0)
|
||
|
#define SRI_SLAVE (1<<1)
|
||
|
#define SRI_SENTINEL (1<<2)
|
||
|
#define SRI_DISCONNECTED (1<<3)
|
||
|
#define SRI_S_DOWN (1<<4) /* Subjectively down (no quorum). */
|
||
|
#define SRI_O_DOWN (1<<5) /* Objectively down (quorum reached). */
|
||
|
#define SRI_MASTER_DOWN (1<<6) /* A Sentinel with this flag set thinks that
|
||
|
its master is down. */
|
||
|
/* SRI_CAN_FAILOVER when set in an SRI_MASTER instance means that we are
|
||
|
* allowed to perform the failover for this master.
|
||
|
* When set in a SRI_SENTINEL instance means that sentinel is allowed to
|
||
|
* perform the failover on its master. */
|
||
|
#define SRI_CAN_FAILOVER (1<<7)
|
||
|
#define SRI_FAILOVER_IN_PROGRESS (1<<8) /* Failover is in progress for
|
||
|
this master. */
|
||
|
#define SRI_I_AM_THE_LEADER (1<<9) /* We are the leader for this master. */
|
||
|
#define SRI_PROMOTED (1<<10) /* Slave selected for promotion. */
|
||
|
#define SRI_RECONF_SENT (1<<11) /* SLAVEOF <newmaster> sent. */
|
||
|
#define SRI_RECONF_INPROG (1<<12) /* Slave synchronization in progress. */
|
||
|
#define SRI_RECONF_DONE (1<<13) /* Slave synchronized with new master. */
|
||
|
|
||
|
#define SENTINEL_INFO_PERIOD 10000
|
||
|
#define SENTINEL_PING_PERIOD 1000
|
||
|
#define SENTINEL_ASK_PERIOD 1000
|
||
|
#define SENTINEL_PUBLISH_PERIOD 5000
|
||
|
#define SENTINEL_DOWN_AFTER_PERIOD 30000
|
||
|
#define SENTINEL_HELLO_CHANNEL "__sentinel__:hello"
|
||
|
#define SENTINEL_TILT_TRIGGER 2000
|
||
|
#define SENTINEL_TILT_PERIOD (SENTINEL_PING_PERIOD*30)
|
||
|
#define SENTINEL_DEFAULT_SLAVE_PRIORITY 100
|
||
|
#define SENTINEL_PROMOTION_RETRY_PERIOD 30000
|
||
|
#define SENTINEL_SLAVE_RECONF_RETRY_PERIOD 10000
|
||
|
#define SENTINEL_DEFAULT_PARALLEL_SYNCS 1
|
||
|
#define SENTINEL_MIN_LINK_RECONNECT_PERIOD 15000
|
||
|
#define SENTINEL_DEFAULT_FAILOVER_TIMEOUT (60*15*1000)
|
||
|
#define SENTINEL_MAX_PENDING_COMMANDS 100
|
||
|
#define SENTINEL_EXTENDED_SDOWN_MULTIPLIER 10
|
||
|
|
||
|
/* How many milliseconds is an information valid? This applies for instance
|
||
|
* to the reply to SENTINEL IS-MASTER-DOWN-BY-ADDR replies. */
|
||
|
#define SENTINEL_INFO_VALIDITY_TIME 5000
|
||
|
#define SENTINEL_FAILOVER_FIXED_DELAY 5000
|
||
|
#define SENTINEL_FAILOVER_MAX_RANDOM_DELAY 10000
|
||
|
|
||
|
/* Failover machine different states. */
|
||
|
#define SENTINEL_FAILOVER_STATE_NONE 0 /* No failover in progress. */
|
||
|
#define SENTINEL_FAILOVER_STATE_WAIT_START 1 /* Wait for failover_start_time*/
|
||
|
#define SENTINEL_FAILOVER_STATE_SELECT_SLAVE 2 /* Select slave to promote */
|
||
|
#define SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE 3 /* Slave -> Master */
|
||
|
#define SENTINEL_FAILOVER_STATE_WAIT_PROMOTION 4 /* Wait slave to change role */
|
||
|
#define SENTINEL_FAILOVER_STATE_RECONF_SLAVES 5 /* SLAVEOF newmaster */
|
||
|
#define SENTINEL_FAILOVER_STATE_WAIT_NEXT_SLAVE 6 /* wait replication */
|
||
|
#define SENTINEL_FAILOVER_STATE_ALERT_CLIENTS 7 /* Run user script. */
|
||
|
#define SENTINEL_FAILOVER_STATE_WAIT_ALERT_SCRIPT 8 /* Wait script exec. */
|
||
|
#define SENTINEL_FAILOVER_STATE_DETECT_END 9 /* Check for failover end. */
|
||
|
#define SENTINEL_FAILOVER_STATE_UPDATE_CONFIG 10 /* Monitor promoted slave. */
|
||
|
|
||
|
#define SENTINEL_MASTER_LINK_STATUS_UP 0
|
||
|
#define SENTINEL_MASTER_LINK_STATUS_DOWN 1
|
||
|
|
||
|
/* State a Sentinel keeps for every monitored instance: a master, one of its
 * slaves, or another Sentinel monitoring the same master (see SRI_* flags).
 * Only the field group matching the instance type is meaningful. */
typedef struct sentinelRedisInstance {
    int flags;      /* See SRI_... defines */
    char *name;     /* Master name from the point of view of this sentinel. */
    char *runid;    /* run ID of this instance (NULL until INFO received). */
    sentinelAddr *addr; /* Master host. */
    redisAsyncContext *cc; /* Hiredis context for commands. */
    redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
    int pending_commands;   /* Number of commands sent waiting for a reply. */
    mstime_t cc_conn_time; /* cc connection time. */
    mstime_t pc_conn_time; /* pc connection time. */
    mstime_t pc_last_activity; /* Last time we received any message. */
    mstime_t last_avail_time; /* Last time the instance replied to ping with
                                 a reply we consider valid. */
    mstime_t last_pong_time;  /* Last time the instance replied to ping,
                                 whatever the reply was. That's used to check
                                 if the link is idle and must be reconnected. */
    mstime_t last_pub_time;   /* Last time we sent hello via Pub/Sub. */
    mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time
                                 we received an hello from this Sentinel
                                 via Pub/Sub. */
    mstime_t last_master_down_reply_time; /* Time of last reply to
                                             SENTINEL is-master-down command. */
    mstime_t s_down_since_time; /* Subjectively down since time. */
    mstime_t o_down_since_time; /* Objectively down since time. */
    mstime_t down_after_period; /* Consider it down after that period. */
    mstime_t info_refresh;  /* Time at which we received INFO output from it.
                               0 means INFO was never received. */

    /* Master specific. */
    dict *sentinels;    /* Other sentinels monitoring the same master. */
    dict *slaves;       /* Slaves for this master instance. */
    int quorum;         /* Number of sentinels that need to agree on failure. */
    int parallel_syncs; /* How many slaves to reconfigure at same time. */

    /* Slave specific. */
    mstime_t master_link_down_time; /* Slave replication link down time. */
    int slave_priority; /* Slave priority according to its INFO output. */
    mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVE OF <new> */
    struct sentinelRedisInstance *master; /* Master instance if SRI_SLAVE is set. */
    char *slave_master_host;    /* Master host as reported by INFO */
    int slave_master_port;      /* Master port as reported by INFO */
    int slave_master_link_status; /* Master link status as reported by INFO,
                                     one of SENTINEL_MASTER_LINK_STATUS_*. */
    /* Failover */
    char *leader;       /* If this is a master instance, this is the runid of
                           the Sentinel that should perform the failover. If
                           this is a Sentinel, this is the runid of the Sentinel
                           that this other Sentinel is voting as leader.
                           This field is valid only if SRI_MASTER_DOWN is
                           set on the Sentinel instance. */
    int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
    mstime_t failover_state_change_time; /* Time of last failover state change. */
    mstime_t failover_start_time;   /* When to start to failover if leader. */
    mstime_t failover_timeout;      /* Max time to refresh failover state. */
    struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */
    /* Scripts executed to notify admin or reconfigure clients: when they
     * are set to NULL no script is executed. */
    char *notify_script;
    char *client_reconfig_script;
} sentinelRedisInstance;
|
||
|
|
||
|
/* Main state. */
|
||
|
struct sentinelState {
    dict *masters;      /* Dictionary of master sentinelRedisInstances.
                           Key is the instance name, value is the
                           sentinelRedisInstance structure pointer. */
    int tilt;           /* Are we in TILT mode? */
    mstime_t tilt_start_time;   /* When TILT started. */
    mstime_t previous_time;     /* Time last time we ran the time handler. */
} sentinel;
|
||
|
|
||
|
/* ======================= hiredis ae.c adapters =============================
|
||
|
* Note: this implementation is taken from hiredis/adapters/ae.h, however
|
||
|
* we have our modified copy for Sentinel in order to use our allocator
|
||
|
* and to have full control over how the adapter works. */
|
||
|
|
||
|
/* Glue state binding one hiredis async connection to the ae event loop. */
typedef struct redisAeEvents {
    redisAsyncContext *context; /* The attached hiredis async context. */
    aeEventLoop *loop;          /* Event loop the fd is registered with. */
    int fd;                     /* Socket fd of the hiredis connection. */
    int reading, writing;       /* 1 when AE_READABLE / AE_WRITABLE is
                                   currently installed; used to avoid
                                   double registration/removal. */
} redisAeEvents;
|
||
|
|
||
|
/* ae readable callback: hand the event over to the hiredis async engine. */
static void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisAeEvents *events = (redisAeEvents*)privdata;

    ((void)el); ((void)fd); ((void)mask);
    redisAsyncHandleRead(events->context);
}
|
||
|
|
||
|
/* ae writable callback: hand the event over to the hiredis async engine. */
static void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisAeEvents *events = (redisAeEvents*)privdata;

    ((void)el); ((void)fd); ((void)mask);
    redisAsyncHandleWrite(events->context);
}
|
||
|
|
||
|
static void redisAeAddRead(void *privdata) {
|
||
|
redisAeEvents *e = (redisAeEvents*)privdata;
|
||
|
aeEventLoop *loop = e->loop;
|
||
|
if (!e->reading) {
|
||
|
e->reading = 1;
|
||
|
aeCreateFileEvent(loop,e->fd,AE_READABLE,redisAeReadEvent,e);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
static void redisAeDelRead(void *privdata) {
|
||
|
redisAeEvents *e = (redisAeEvents*)privdata;
|
||
|
aeEventLoop *loop = e->loop;
|
||
|
if (e->reading) {
|
||
|
e->reading = 0;
|
||
|
aeDeleteFileEvent(loop,e->fd,AE_READABLE);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
static void redisAeAddWrite(void *privdata) {
|
||
|
redisAeEvents *e = (redisAeEvents*)privdata;
|
||
|
aeEventLoop *loop = e->loop;
|
||
|
if (!e->writing) {
|
||
|
e->writing = 1;
|
||
|
aeCreateFileEvent(loop,e->fd,AE_WRITABLE,redisAeWriteEvent,e);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
static void redisAeDelWrite(void *privdata) {
|
||
|
redisAeEvents *e = (redisAeEvents*)privdata;
|
||
|
aeEventLoop *loop = e->loop;
|
||
|
if (e->writing) {
|
||
|
e->writing = 0;
|
||
|
aeDeleteFileEvent(loop,e->fd,AE_WRITABLE);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
static void redisAeCleanup(void *privdata) {
|
||
|
redisAeEvents *e = (redisAeEvents*)privdata;
|
||
|
redisAeDelRead(privdata);
|
||
|
redisAeDelWrite(privdata);
|
||
|
zfree(e);
|
||
|
}
|
||
|
|
||
|
/* Attach the async context 'ac' to the ae event loop 'loop'.
 * Returns REDIS_ERR if the context is already attached, REDIS_OK
 * otherwise. */
static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) {
    redisContext *c = &(ac->c);
    redisAeEvents *events;

    /* Refuse a double attach: something is already wired up. */
    if (ac->ev.data != NULL) return REDIS_ERR;

    /* Allocate and populate the adapter container. */
    events = (redisAeEvents*)zmalloc(sizeof(*events));
    events->context = ac;
    events->loop = loop;
    events->fd = c->fd;
    events->reading = 0;
    events->writing = 0;

    /* Register functions to start/stop listening for events. */
    ac->ev.addRead = redisAeAddRead;
    ac->ev.delRead = redisAeDelRead;
    ac->ev.addWrite = redisAeAddWrite;
    ac->ev.delWrite = redisAeDelWrite;
    ac->ev.cleanup = redisAeCleanup;
    ac->ev.data = events;

    return REDIS_OK;
}
|
||
|
|
||
|
/* ============================= Prototypes ================================= */
|
||
|
|
||
|
void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status);
|
||
|
void sentinelDisconnectCallback(const redisAsyncContext *c, int status);
|
||
|
void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata);
|
||
|
sentinelRedisInstance *sentinelGetMasterByName(char *name);
|
||
|
char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master);
|
||
|
char *sentinelGetObjectiveLeader(sentinelRedisInstance *master);
|
||
|
int yesnotoi(char *s);
|
||
|
void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c);
|
||
|
const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri);
|
||
|
|
||
|
/* ========================= Dictionary types =============================== */
|
||
|
|
||
|
unsigned int dictSdsHash(const void *key);
|
||
|
int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
|
||
|
void releaseSentinelRedisInstance(sentinelRedisInstance *ri);
|
||
|
|
||
|
/* dict value destructor for instance tables: frees the sentinelRedisInstance
 * stored as the dictionary value. */
void dictInstancesValDestructor (void *privdata, void *obj) {
    ((void)privdata); /* Unused, required by the dictType interface. */
    releaseSentinelRedisInstance(obj);
}
|
||
|
|
||
|
/* Instance name (sds) -> instance (sentinelRedisInstance pointer)
|
||
|
*
|
||
|
* also used for: sentinelRedisInstance->sentinels dictionary that maps
|
||
|
* sentinels ip:port to last seen time in Pub/Sub hello message. */
|
||
|
/* Note: deleting an entry from a dict of this type frees the stored
 * sentinelRedisInstance via dictInstancesValDestructor; keys (sds names)
 * are NOT duplicated nor freed by the dict. */
dictType instancesDictType = {
    dictSdsHash,               /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictSdsKeyCompare,         /* key compare */
    NULL,                      /* key destructor */
    dictInstancesValDestructor /* val destructor */
};
|
||
|
|
||
|
/* Instance runid (sds) -> votes (long casted to void*)
|
||
|
*
|
||
|
* This is useful into sentinelGetObjectiveLeader() function in order to
|
||
|
* count the votes and understand who is the leader. */
|
||
|
/* Values are plain integers cast to void*, so no destructor is needed;
 * keys (runid sds strings) are owned by the caller. */
dictType leaderVotesDictType = {
    dictSdsHash,               /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictSdsKeyCompare,         /* key compare */
    NULL,                      /* key destructor */
    NULL                       /* val destructor */
};
|
||
|
|
||
|
/* =========================== Initialization =============================== */
|
||
|
|
||
|
void sentinelCommand(redisClient *c);
|
||
|
|
||
|
/* The only commands a Sentinel serves: PING, SENTINEL, and the Pub/Sub
 * subscribe family. initSentinel() empties the normal command table and
 * installs exactly these. */
struct redisCommand sentinelcmds[] = {
    {"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
    {"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
    {"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
    {"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
    {"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
    {"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0}
};
|
||
|
|
||
|
/* This function overwrites a few normal Redis config default with Sentinel
|
||
|
* specific defaults. */
|
||
|
void initSentinelConfig(void) {
    /* Sentinel listens on its own well-known port (26379) by default. */
    server.port = REDIS_SENTINEL_PORT;
}
|
||
|
|
||
|
/* Perform the Sentinel mode initialization. */
|
||
|
void initSentinel(void) {
|
||
|
int j;
|
||
|
|
||
|
/* Remove usual Redis commands from the command table, then just add
|
||
|
* the SENTINEL command. */
|
||
|
dictEmpty(server.commands);
|
||
|
for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) {
|
||
|
int retval;
|
||
|
struct redisCommand *cmd = sentinelcmds+j;
|
||
|
|
||
|
retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);
|
||
|
redisAssert(retval == DICT_OK);
|
||
|
}
|
||
|
|
||
|
/* Initialize various data structures. */
|
||
|
sentinel.masters = dictCreate(&instancesDictType,NULL);
|
||
|
sentinel.tilt = 0;
|
||
|
sentinel.tilt_start_time = mstime();
|
||
|
sentinel.previous_time = mstime();
|
||
|
}
|
||
|
|
||
|
/* ============================== sentinelAddr ============================== */
|
||
|
|
||
|
/* Create a sentinelAddr object and return it on success.
|
||
|
* On error NULL is returned and errno is set to:
|
||
|
* ENOENT: Can't resolve the hostname.
|
||
|
* EINVAL: Invalid port number.
|
||
|
*/
|
||
|
sentinelAddr *createSentinelAddr(char *hostname, int port) {
|
||
|
char buf[32];
|
||
|
sentinelAddr *sa;
|
||
|
|
||
|
if (port <= 0 || port > 65535) {
|
||
|
errno = EINVAL;
|
||
|
return NULL;
|
||
|
}
|
||
|
if (anetResolve(NULL,hostname,buf) == ANET_ERR) {
|
||
|
errno = ENOENT;
|
||
|
return NULL;
|
||
|
}
|
||
|
sa = zmalloc(sizeof(*sa));
|
||
|
sa->ip = sdsnew(buf);
|
||
|
sa->port = port;
|
||
|
return sa;
|
||
|
}
|
||
|
|
||
|
/* Free a Sentinel address. Can't fail. */
|
||
|
void releaseSentinelAddr(sentinelAddr *sa) {
    sdsfree(sa->ip); /* ip is an sds string created by createSentinelAddr(). */
    zfree(sa);
}
|
||
|
|
||
|
/* =========================== Events notification ========================== */
|
||
|
|
||
|
/* Execute the user notification script 'scriptpath' with the event type
 * and message as arguments. Currently a stub. */
void sentinelCallNotificationScript(char *scriptpath, char *type, char *msg) {
    /* Silence unused-parameter warnings until the script execution is
     * implemented. */
    ((void)scriptpath); ((void)type); ((void)msg);
    /* TODO: implement it. */
}
|
||
|
|
||
|
/* Send an event to log, pub/sub, user notification script.
|
||
|
*
|
||
|
* 'level' is the log level for logging. Only REDIS_WARNING events will trigger
|
||
|
* the execution of the user notification script.
|
||
|
*
|
||
|
* 'type' is the message type, also used as a pub/sub channel name.
|
||
|
*
|
||
|
* 'ri', is the redis instance target of this event if applicable, and is
|
||
|
* used to obtain the path of the notification script to execute.
|
||
|
*
|
||
|
* The remaining arguments are printf-alike.
|
||
|
* If the format specifier starts with the two characters "%@" then ri is
|
||
|
* not NULL, and the message is prefixed with an instance identifier in the
|
||
|
* following format:
|
||
|
*
|
||
|
* <instance type> <instance name> <ip> <port>
|
||
|
*
|
||
|
* If the instance type is not master, than the additional string is
|
||
|
* added to specify the originating master:
|
||
|
*
|
||
|
* @ <master name> <master ip> <master port>
|
||
|
*
|
||
|
* Any other specifier after "%@" is processed by printf itself.
|
||
|
*/
|
||
|
void sentinelEvent(int level, char *type, sentinelRedisInstance *ri,
                   const char *fmt, ...) {
    va_list ap;
    char msg[REDIS_MAX_LOGMSG_LEN];
    robj *channel, *payload;

    /* Handle %@: expand the instance identifier prefix. NOTE(review):
     * 'ri' is dereferenced without a NULL check here, so "%@" formats
     * require ri != NULL as documented above — confirm all callers. */
    if (fmt[0] == '%' && fmt[1] == '@') {
        /* For non-masters also report the originating master. */
        sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
                                         NULL : ri->master;

        if (master) {
            snprintf(msg, sizeof(msg), "%s %s %s %d @ %s %s %d",
                sentinelRedisInstanceTypeStr(ri),
                ri->name, ri->addr->ip, ri->addr->port,
                master->name, master->addr->ip, master->addr->port);
        } else {
            snprintf(msg, sizeof(msg), "%s %s %s %d",
                sentinelRedisInstanceTypeStr(ri),
                ri->name, ri->addr->ip, ri->addr->port);
        }
        fmt += 2; /* Skip the "%@" so the tail is plain printf format. */
    } else {
        msg[0] = '\0';
    }

    /* Use vsprintf for the rest of the formatting if any, appending after
     * the prefix (vsnprintf bounds the write to the remaining space). */
    if (fmt[0] != '\0') {
        va_start(ap, fmt);
        vsnprintf(msg+strlen(msg), sizeof(msg)-strlen(msg), fmt, ap);
        va_end(ap);
    }

    /* Log the message if the log level allows it to be logged. */
    if (level >= server.verbosity)
        redisLog(level,"%s %s",type,msg);

    /* Publish the message via Pub/Sub if it's not a debugging one.
     * The event type doubles as the channel name. */
    if (level != REDIS_DEBUG) {
        channel = createStringObject(type,strlen(type));
        payload = createStringObject(msg,strlen(msg));
        pubsubPublishMessage(channel,payload);
        decrRefCount(channel);
        decrRefCount(payload);
    }

    /* Call the notification script if applicable: only WARNING-level
     * events with an associated instance trigger the script, using the
     * script configured on the (related) master. */
    if (level == REDIS_WARNING && ri != NULL) {
        sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
                                         ri : ri->master;
        if (master->notify_script) {
            sentinelCallNotificationScript(master->notify_script,type,msg);
        }
    }
}
|
||
|
|
||
|
/* ========================== sentinelRedisInstance ========================= */
|
||
|
|
||
|
/* Create a redis instance, the following fields must be populated by the
|
||
|
* caller if needed:
|
||
|
* runid: set to NULL but will be populated once INFO output is received.
|
||
|
* info_refresh: is set to 0 to mean that we never received INFO so far.
|
||
|
*
|
||
|
* If SRI_MASTER is set into initial flags the instance is added to
|
||
|
* sentinel.masters table.
|
||
|
*
|
||
|
* if SRI_SLAVE or SRI_SENTINEL is set then 'master' must be not NULL and the
|
||
|
* instance is added into master->slaves or master->sentinels table.
|
||
|
*
|
||
|
* If the instance is a slave or sentinel, the name parameter is ignored and
|
||
|
* is created automatically as hostname:port.
|
||
|
*
|
||
|
* The function fails if hostname can't be resolved or port is out of range.
|
||
|
* When this happens NULL is returned and errno is set accordingly to the
|
||
|
* createSentinelAddr() function.
|
||
|
*
|
||
|
* The function may also fail and return NULL with errno set to EBUSY if
|
||
|
* a master or slave with the same name already exists. */
|
||
|
/* Create a redis instance, the following fields must be populated by the
 * caller if needed:
 * runid: set to NULL but will be populated once INFO output is received.
 * info_refresh: is set to 0 to mean that we never received INFO so far.
 *
 * If SRI_MASTER is set into initial flags the instance is added to
 * sentinel.masters table.
 *
 * if SRI_SLAVE or SRI_SENTINEL is set then 'master' must be not NULL and the
 * instance is added into master->slaves or master->sentinels table.
 *
 * If the instance is a slave or sentinel, the name parameter is ignored and
 * is created automatically as hostname:port.
 *
 * The function fails if hostname can't be resolved or port is out of range.
 * When this happens NULL is returned and errno is set accordingly to the
 * createSentinelAddr() function.
 *
 * The function may also fail and return NULL with errno set to EBUSY if
 * a master or slave with the same name already exists. */
sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
    sentinelRedisInstance *ri;
    sentinelAddr *addr;
    dict *table;
    char slavename[128], *sdsname;

    redisAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL));
    redisAssert((flags & SRI_MASTER) || master != NULL);

    /* Check address validity. */
    addr = createSentinelAddr(hostname,port);
    if (addr == NULL) return NULL;

    /* For slaves and sentinel we use ip:port as name. */
    if (flags & (SRI_SLAVE|SRI_SENTINEL)) {
        snprintf(slavename,sizeof(slavename),"%s:%d",hostname,port);
        name = slavename;
    }

    /* Make sure the entry is not duplicated. This may happen when the same
     * name for a master is used multiple times inside the configuration or
     * if we try to add multiple times a slave or sentinel with same ip/port
     * to a master. */
    if (flags & SRI_MASTER) table = sentinel.masters;
    else if (flags & SRI_SLAVE) table = master->slaves;
    else table = master->sentinels; /* plain else: the assert above
                                       guarantees one flag is set, and this
                                       keeps 'table' provably initialized. */
    sdsname = sdsnew(name);
    if (dictFind(table,sdsname)) {
        /* Fix: release the address object too — it was leaked on this
         * error path. */
        releaseSentinelAddr(addr);
        sdsfree(sdsname);
        errno = EBUSY;
        return NULL;
    }

    /* Create the instance object. */
    ri = zmalloc(sizeof(*ri));
    /* Note that all the instances are started in the disconnected state,
     * the event loop will take care of connecting them. */
    ri->flags = flags | SRI_DISCONNECTED;
    ri->name = sdsname;
    ri->runid = NULL;
    ri->addr = addr;
    ri->cc = NULL;
    ri->pc = NULL;
    ri->pending_commands = 0;
    ri->cc_conn_time = 0;
    ri->pc_conn_time = 0;
    ri->pc_last_activity = 0;
    ri->last_avail_time = mstime();
    ri->last_pong_time = mstime();
    ri->last_pub_time = mstime();
    ri->last_hello_time = mstime();
    ri->last_master_down_reply_time = mstime();
    ri->s_down_since_time = 0;
    ri->o_down_since_time = 0;
    /* Slaves/sentinels inherit the master's down-after period. */
    ri->down_after_period = master ? master->down_after_period :
                            SENTINEL_DOWN_AFTER_PERIOD;
    ri->master_link_down_time = 0;
    ri->slave_priority = SENTINEL_DEFAULT_SLAVE_PRIORITY;
    ri->slave_reconf_sent_time = 0;
    ri->slave_master_host = NULL;
    ri->slave_master_port = 0;
    ri->slave_master_link_status = SENTINEL_MASTER_LINK_STATUS_DOWN;
    ri->sentinels = dictCreate(&instancesDictType,NULL);
    ri->quorum = quorum;
    ri->parallel_syncs = SENTINEL_DEFAULT_PARALLEL_SYNCS;
    ri->master = master;
    ri->slaves = dictCreate(&instancesDictType,NULL);
    ri->info_refresh = 0;

    /* Failover state. */
    ri->leader = NULL;
    ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
    ri->failover_state_change_time = 0;
    ri->failover_start_time = 0;
    ri->failover_timeout = SENTINEL_DEFAULT_FAILOVER_TIMEOUT;
    ri->promoted_slave = NULL;
    ri->notify_script = NULL;
    ri->client_reconfig_script = NULL;

    /* Add into the right table. The dict takes ownership of ri (freed by
     * the val destructor); ri->name is used directly as the key. */
    dictAdd(table, ri->name, ri);
    return ri;
}
|
||
|
|
||
|
/* Release this instance and all its slaves, sentinels, hiredis connections.
|
||
|
* This function also takes care of unlinking the instance from the main
|
||
|
* masters table (if it is a master) or from its master sentinels/slaves table
|
||
|
* if it is a slave or sentinel. */
|
||
|
void releaseSentinelRedisInstance(sentinelRedisInstance *ri) {
    /* Release all its slaves or sentinels if any. The instancesDictType
     * val destructor recursively frees the contained instances. */
    dictRelease(ri->sentinels);
    dictRelease(ri->slaves);

    /* Release hiredis connections. Note that redisAsyncFree() will call
     * the disconnection callback. */
    if (ri->cc) {
        redisAsyncFree(ri->cc);
        ri->cc = NULL;
    }
    if (ri->pc) {
        redisAsyncFree(ri->pc);
        ri->pc = NULL;
    }

    /* Free other resources. These fields are sds strings or NULL
     * (presumably sdsfree() accepts NULL for unset fields — it is called
     * on fields like runid that start as NULL). */
    sdsfree(ri->name);
    sdsfree(ri->runid);
    sdsfree(ri->notify_script);
    sdsfree(ri->client_reconfig_script);
    sdsfree(ri->slave_master_host);
    sdsfree(ri->leader);
    releaseSentinelAddr(ri->addr);

    /* Clear state into the master if needed: a promoted slave must not
     * leave a dangling pointer behind in its master. */
    if ((ri->flags & SRI_SLAVE) && (ri->flags & SRI_PROMOTED) && ri->master)
        ri->master->promoted_slave = NULL;

    zfree(ri);
}
|
||
|
|
||
|
/* Lookup a slave in a master Redis instance, by ip and port. */
|
||
|
/* Lookup a slave in a master Redis instance, by ip and port.
 * Returns the slave instance or NULL if not found. */
sentinelRedisInstance *sentinelRedisInstanceLookupSlave(
                sentinelRedisInstance *ri, char *ip, int port)
{
    sds addrkey;
    sentinelRedisInstance *found;

    redisAssert(ri->flags & SRI_MASTER);
    /* Slaves are keyed as "ip:port" in the master's slaves dict. */
    addrkey = sdscatprintf(sdsempty(),"%s:%d",ip,port);
    found = dictFetchValue(ri->slaves,addrkey);
    sdsfree(addrkey);
    return found;
}
|
||
|
|
||
|
/* Return the name of the type of the instance as a string. */
|
||
|
/* Return the name of the type of the instance as a string. */
const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri) {
    if (ri->flags & SRI_MASTER) return "master";
    if (ri->flags & SRI_SLAVE) return "slave";
    if (ri->flags & SRI_SENTINEL) return "sentinel";
    return "unknown";
}
|
||
|
|
||
|
/* This function removes all the instances found in the dictionary of instances
|
||
|
* 'd', having either:
|
||
|
*
|
||
|
* 1) The same ip/port as specified.
|
||
|
* 2) The same runid.
|
||
|
*
|
||
|
* "1" and "2" don't need to verify at the same time, just one is enough.
|
||
|
* If "runid" is NULL it is not checked.
|
||
|
* Similarly if "ip" is NULL it is not checked.
|
||
|
*
|
||
|
* This function is useful because every time we add a new Sentinel into
|
||
|
* a master's Sentinels dictionary, we want to be very sure about not
|
||
|
* having duplicated instances for any reason. This is so important because
|
||
|
* we use those other sentinels in order to run our quorum protocol to
|
||
|
* understand if it's time to proceeed with the fail over.
|
||
|
*
|
||
|
* Making sure no duplication is possible we greately improve the robustness
|
||
|
* of the quorum (otherwise we may end counting the same instance multiple
|
||
|
* times for some reason).
|
||
|
*
|
||
|
* The function returns the number of Sentinels removed. */
|
||
|
int removeMatchingSentinelsFromMaster(sentinelRedisInstance *master, char *ip, int port, char *runid) {
    dictIterator *di;
    dictEntry *de;
    int removed = 0;

    /* A *safe* iterator is required because we delete entries while
     * iterating the dictionary. */
    di = dictGetSafeIterator(master->sentinels);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);

        /* Match by runid (only when both sides have one) OR by ip:port
         * (only when an ip was given). */
        if ((ri->runid && runid && strcmp(ri->runid,runid) == 0) ||
            (ip && strcmp(ri->addr->ip,ip) == 0 && port == ri->addr->port))
        {
            /* dictDelete triggers the val destructor, freeing 'ri'. */
            dictDelete(master->sentinels,ri->name);
            removed++;
        }
    }
    dictReleaseIterator(di);
    return removed;
}
|
||
|
|
||
|
/* Search an instance with the same runid, ip and port into a dictionary
|
||
|
* of instances. Return NULL if not found, otherwise return the instance
|
||
|
* pointer.
|
||
|
*
|
||
|
* runid or ip can be NULL. In such a case the search is performed only
|
||
|
* by the non-NULL field. */
|
||
|
/* Search an instance with the same runid, ip and port into a dictionary
 * of instances. Return NULL if not found, otherwise return the instance
 * pointer.
 *
 * runid or ip can be NULL. In such a case the search is performed only
 * by the non-NULL field. */
sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, char *ip, int port, char *runid) {
    dictIterator *it;
    dictEntry *entry;
    sentinelRedisInstance *match = NULL;

    redisAssert(ip || runid); /* User must pass at least one search param. */
    it = dictGetIterator(instances);
    while((entry = dictNext(it)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(entry);

        /* Reject on the runid criterion when requested. Instances whose
         * runid is still unknown never match by runid. */
        if (runid) {
            if (ri->runid == NULL) continue;
            if (strcmp(ri->runid, runid) != 0) continue;
        }
        /* Reject on the address criterion when requested. */
        if (ip) {
            if (strcmp(ri->addr->ip, ip) != 0) continue;
            if (ri->addr->port != port) continue;
        }
        match = ri;
        break;
    }
    dictReleaseIterator(it);
    return match;
}
|
||
|
|
||
|
/* Simple master lookup by name */
|
||
|
sentinelRedisInstance *sentinelGetMasterByName(char *name) {
|
||
|
sentinelRedisInstance *ri;
|
||
|
sds sdsname = sdsnew(name);
|
||
|
|
||
|
ri = dictFetchValue(sentinel.masters,sdsname);
|
||
|
sdsfree(sdsname);
|
||
|
return ri;
|
||
|
}
|
||
|
|
||
|
/* Add the specified flags to all the instances in the specified dictionary. */
|
||
|
/* Add the specified flags to all the instances in the specified dictionary. */
void sentinelAddFlagsToDictOfRedisInstances(dict *instances, int flags) {
    dictIterator *it = dictGetIterator(instances);
    dictEntry *entry;

    while((entry = dictNext(it)) != NULL) {
        sentinelRedisInstance *inst = dictGetVal(entry);

        inst->flags |= flags;
    }
    dictReleaseIterator(it);
}
|
||
|
|
||
|
/* Remove the specified flags to all the instances in the specified
|
||
|
* dictionary. */
|
||
|
/* Remove the specified flags from all the instances in the specified
 * dictionary. */
void sentinelDelFlagsToDictOfRedisInstances(dict *instances, int flags) {
    dictIterator *it = dictGetIterator(instances);
    dictEntry *entry;

    while((entry = dictNext(it)) != NULL) {
        sentinelRedisInstance *inst = dictGetVal(entry);

        inst->flags &= ~flags;
    }
    dictReleaseIterator(it);
}
|
||
|
|
||
|
/* Reset the state of a monitored master:
 * 1) Remove all slaves.
 * 2) Remove all sentinels.
 * 3) Remove most of the flags resulting from runtime operations.
 * 4) Reset timers to their default value.
 * 5) In the process of doing this undo the failover if in progress.
 * 6) Disconnect the connections with the master (will reconnect automatically).
 */
void sentinelResetMaster(sentinelRedisInstance *ri) {
    redisAssert(ri->flags & SRI_MASTER);
    /* Dropping the dictionaries releases every slave/sentinel instance
     * attached to this master (per instancesDictType destructors —
     * TODO confirm the value destructor frees the instances). */
    dictRelease(ri->slaves);
    dictRelease(ri->sentinels);
    ri->slaves = dictCreate(&instancesDictType,NULL);
    ri->sentinels = dictCreate(&instancesDictType,NULL);
    /* Freeing the async contexts invokes the disconnect callback, which
     * clears ri->cc / ri->pc and sets SRI_DISCONNECTED on ri. */
    if (ri->cc) redisAsyncFree(ri->cc);
    if (ri->pc) redisAsyncFree(ri->pc);
    /* Keep only the flags that describe what the instance *is*, plus the
     * disconnection state; drop all runtime flags (S_DOWN, O_DOWN,
     * failover related, ...). */
    ri->flags &= SRI_MASTER|SRI_CAN_FAILOVER|SRI_DISCONNECTED;
    if (ri->leader) {
        sdsfree(ri->leader);
        ri->leader = NULL;
    }
    /* Abort any failover in progress and reset the related timers. */
    ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
    ri->failover_state_change_time = 0;
    ri->failover_start_time = 0;
    ri->promoted_slave = NULL;
    sentinelEvent(REDIS_WARNING,"+reset-master",ri,"%@");
}
|
||
|
|
||
|
/* Call sentinelResetMaster() on every master with a name matching the specified
|
||
|
* pattern. */
|
||
|
int sentinelResetMastersByPattern(char *pattern) {
|
||
|
dictIterator *di;
|
||
|
dictEntry *de;
|
||
|
int reset = 0;
|
||
|
|
||
|
di = dictGetIterator(sentinel.masters);
|
||
|
while((de = dictNext(di)) != NULL) {
|
||
|
sentinelRedisInstance *ri = dictGetVal(de);
|
||
|
|
||
|
if (ri->name) {
|
||
|
if (stringmatch(pattern,ri->name,0)) {
|
||
|
sentinelResetMaster(ri);
|
||
|
reset++;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
dictReleaseIterator(di);
|
||
|
return reset;
|
||
|
}
|
||
|
|
||
|
/* ============================ Config handling ============================= */
|
||
|
/* Handle a single "sentinel" configuration directive already split into
 * argv/argc form. Returns NULL on success, or a static error string
 * describing why the directive was rejected. */
char *sentinelHandleConfiguration(char **argv, int argc) {
    sentinelRedisInstance *ri;

    if (!strcasecmp(argv[0],"monitor") && argc == 5) {
        /* monitor <name> <host> <port> <quorum> */
        int quorum = atoi(argv[4]);

        if (quorum <= 0) return "Quorum must be 1 or greater.";
        if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],
                                        atoi(argv[3]),quorum,NULL) == NULL)
        {
            /* createSentinelRedisInstance() communicates the failure
             * reason via errno. */
            switch(errno) {
            case EBUSY: return "Duplicated master name.";
            case ENOENT: return "Can't resolve master instance hostname.";
            case EINVAL: return "Invalid port number";
            }
        }
    } else if (!strcasecmp(argv[0],"down-after-milliseconds") && argc == 3) {
        /* down-after-milliseconds <name> <milliseconds> */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        ri->down_after_period = atoi(argv[2]);
        if (ri->down_after_period <= 0)
            return "negative or zero time parameter.";
    } else if (!strcasecmp(argv[0],"failover-timeout") && argc == 3) {
        /* failover-timeout <name> <milliseconds> */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        ri->failover_timeout = atoi(argv[2]);
        if (ri->failover_timeout <= 0)
            return "negative or zero time parameter.";
    } else if (!strcasecmp(argv[0],"can-failover") && argc == 3) {
        /* can-failover <name> <yes/no> */
        int yesno = yesnotoi(argv[2]);

        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        if (yesno == -1) return "Argument must be either yes or no.";
        if (yesno)
            ri->flags |= SRI_CAN_FAILOVER;
        else
            ri->flags &= ~SRI_CAN_FAILOVER;
    } else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) {
        /* parallel-syncs <name> <milliseconds> */
        /* NOTE(review): value is a slave count, not milliseconds — the
         * inline comment above mirrors the original; no range check is
         * performed on the value. */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        ri->parallel_syncs = atoi(argv[2]);
    } else {
        return "Unrecognized sentinel configuration statement.";
    }
    return NULL;
}
|
||
|
|
||
|
/* ====================== hiredis connection handling ======================= */
|
||
|
|
||
|
/* This function takes an hiredis context that is in an error condition
|
||
|
* and make sure to mark the instance as disconnected performing the
|
||
|
* cleanup needed.
|
||
|
*
|
||
|
* Note: we don't free the hiredis context as hiredis will do it for us
|
||
|
* for async conenctions. */
|
||
|
void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c) {
|
||
|
sentinelRedisInstance *ri = c->data;
|
||
|
int pubsub = (ri->pc == c);
|
||
|
|
||
|
sentinelEvent(REDIS_DEBUG, pubsub ? "-pubsub-link" : "-cmd-link", ri,
|
||
|
"%@ #%s", c->errstr);
|
||
|
if (pubsub)
|
||
|
ri->pc = NULL;
|
||
|
else
|
||
|
ri->cc = NULL;
|
||
|
ri->flags |= SRI_DISCONNECTED;
|
||
|
}
|
||
|
|
||
|
void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status) {
|
||
|
if (status != REDIS_OK) {
|
||
|
sentinelDisconnectInstanceFromContext(c);
|
||
|
} else {
|
||
|
sentinelRedisInstance *ri = c->data;
|
||
|
int pubsub = (ri->pc == c);
|
||
|
|
||
|
sentinelEvent(REDIS_DEBUG, pubsub ? "+pubsub-link" : "+cmd-link", ri,
|
||
|
"%@");
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/* hiredis disconnect callback: just perform the common disconnection
 * cleanup regardless of the status code. */
void sentinelDisconnectCallback(const redisAsyncContext *c, int status) {
    (void) status; /* Unused: cleanup is identical for every status. */
    sentinelDisconnectInstanceFromContext(c);
}
|
||
|
|
||
|
/* Create the async connections for the specified instance if the instance
 * is disconnected. Note that the SRI_DISCONNECTED flag is set even if just
 * one of the two links (commands and pub/sub) is missing. */
void sentinelReconnectInstance(sentinelRedisInstance *ri) {
    if (!(ri->flags & SRI_DISCONNECTED)) return;

    /* Commands connection. */
    if (ri->cc == NULL) {
        ri->cc = redisAsyncConnect(ri->addr->ip,ri->addr->port);
        if (ri->cc->err) {
            /* Immediate connection error: log it and retry at the next
             * call of this function. */
            sentinelEvent(REDIS_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
                ri->cc->errstr);
            redisAsyncFree(ri->cc);
            ri->cc = NULL;
        } else {
            ri->cc_conn_time = mstime();
            /* Store the instance in the context so callbacks can find it. */
            ri->cc->data = ri;
            redisAeAttach(server.el,ri->cc);
            redisAsyncSetConnectCallback(ri->cc,
                                            sentinelLinkEstablishedCallback);
            redisAsyncSetDisconnectCallback(ri->cc,
                                            sentinelDisconnectCallback);
        }
    }
    /* Pub / Sub (only for masters; see the SRI_MASTER check). */
    if ((ri->flags & SRI_MASTER) && ri->pc == NULL) {
        ri->pc = redisAsyncConnect(ri->addr->ip,ri->addr->port);
        if (ri->pc->err) {
            sentinelEvent(REDIS_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
                ri->pc->errstr);
            redisAsyncFree(ri->pc);
            ri->pc = NULL;
        } else {
            int retval;

            ri->pc_conn_time = mstime();
            ri->pc->data = ri;
            redisAeAttach(server.el,ri->pc);
            redisAsyncSetConnectCallback(ri->pc,
                                            sentinelLinkEstablishedCallback);
            redisAsyncSetDisconnectCallback(ri->pc,
                                            sentinelDisconnectCallback);
            /* Now we subscribe to the Sentinels "Hello" channel. */
            retval = redisAsyncCommand(ri->pc,
                sentinelReceiveHelloMessages, NULL, "SUBSCRIBE %s",
                    SENTINEL_HELLO_CHANNEL);
            if (retval != REDIS_OK) {
                /* If we can't subscribe, the Pub/Sub connection is useless
                 * and we can simply disconnect it and try again. */
                redisAsyncFree(ri->pc);
                ri->pc = NULL;
                return;
            }
        }
    }
    /* Clear the DISCONNECTED flags only if we have both the connections
     * (or just the commands connection if this is a slave or a
     * sentinel instance). */
    if (ri->cc && (ri->flags & (SRI_SLAVE|SRI_SENTINEL) || ri->pc))
        ri->flags &= ~SRI_DISCONNECTED;
}
|
||
|
|
||
|
/* ======================== Redis instances pinging ======================== */
|
||
|
|
||
|
/* Process the INFO output from masters. */
|
||
|
void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
|
||
|
sds *lines;
|
||
|
int numlines, j;
|
||
|
int role = 0;
|
||
|
|
||
|
|
||
|
/* The following fields must be reset to a given value in the case they
|
||
|
* are not found at all in the INFO output. */
|
||
|
ri->master_link_down_time = 0;
|
||
|
|
||
|
/* Process line by line. */
|
||
|
lines = sdssplitlen(info,strlen(info),"\r\n",2,&numlines);
|
||
|
for (j = 0; j < numlines; j++) {
|
||
|
sentinelRedisInstance *slave;
|
||
|
sds l = lines[j];
|
||
|
|
||
|
/* run_id:<40 hex chars>*/
|
||
|
if (sdslen(l) >= 47 && !memcmp(l,"run_id:",7)) {
|
||
|
if (ri->runid == NULL) {
|
||
|
ri->runid = sdsnewlen(l+7,40);
|
||
|
} else {
|
||
|
/* TODO: check if run_id has changed. This means the
|
||
|
* instance has been restarted, we want to set a flag
|
||
|
* and notify this event. */
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/* slave0:<ip>,<port>,<state> */
|
||
|
if ((ri->flags & SRI_MASTER) &&
|
||
|
sdslen(l) >= 7 &&
|
||
|
!memcmp(l,"slave",5) && isdigit(l[5]))
|
||
|
{
|
||
|
char *ip, *port, *end;
|
||
|
|
||
|
ip = strchr(l,':'); if (!ip) continue;
|
||
|
ip++; /* Now ip points to start of ip address. */
|
||
|
port = strchr(ip,','); if (!port) continue;
|
||
|
*port = '\0'; /* nul term for easy access. */
|
||
|
port++; /* Now port points to start of port number. */
|
||
|
end = strchr(port,','); if (!end) continue;
|
||
|
*end = '\0'; /* nul term for easy access. */
|
||
|
|
||
|
/* Check if we already have this slave into our table,
|
||
|
* otherwise add it. */
|
||
|
if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
|
||
|
if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,
|
||
|
atoi(port), ri->quorum,ri)) != NULL)
|
||
|
{
|
||
|
sentinelEvent(REDIS_NOTICE,"+slave",slave,"%@");
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/* master_link_down_since_seconds:<seconds> */
|
||
|
if (sdslen(l) >= 32 &&
|
||
|
!memcmp(l,"master_link_down_since_seconds",30))
|
||
|
{
|
||
|
ri->master_link_down_time = strtoll(l+31,NULL,10)*1000;
|
||
|
}
|
||
|
|
||
|
/* role:<role> */
|
||
|
if (!memcmp(l,"role:master",11)) role = SRI_MASTER;
|
||
|
else if (!memcmp(l,"role:slave",10)) role = SRI_SLAVE;
|
||
|
|
||
|
if (role == SRI_SLAVE) {
|
||
|
/* master_host:<host> */
|
||
|
if (sdslen(l) >= 12 && !memcmp(l,"master_host:",12)) {
|
||
|
sdsfree(ri->slave_master_host);
|
||
|
ri->slave_master_host = sdsnew(l+12);
|
||
|
}
|
||
|
|
||
|
/* master_port:<port> */
|
||
|
if (sdslen(l) >= 12 && !memcmp(l,"master_port:",12))
|
||
|
ri->slave_master_port = atoi(l+12);
|
||
|
|
||
|
/* master_link_status:<status> */
|
||
|
if (sdslen(l) >= 19 && !memcmp(l,"master_link_status:",19)) {
|
||
|
ri->slave_master_link_status =
|
||
|
(strcasecmp(l+19,"up") == 0) ?
|
||
|
SENTINEL_MASTER_LINK_STATUS_UP :
|
||
|
SENTINEL_MASTER_LINK_STATUS_DOWN;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
ri->info_refresh = mstime();
|
||
|
sdsfreesplitres(lines,numlines);
|
||
|
|
||
|
if (sentinel.tilt) return;
|
||
|
|
||
|
/* Act if a slave turned into a master. */
|
||
|
if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
|
||
|
if (ri->flags & SRI_PROMOTED) {
|
||
|
/* If this is a promoted slave we can change state to the
|
||
|
* failover state machine. */
|
||
|
if (ri->master &&
|
||
|
(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
|
||
|
(ri->master->flags & SRI_I_AM_THE_LEADER) &&
|
||
|
(ri->master->failover_state ==
|
||
|
SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
|
||
|
{
|
||
|
ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
|
||
|
ri->master->failover_state_change_time = mstime();
|
||
|
sentinelEvent(REDIS_WARNING,"+promoted-slave",ri,"%@");
|
||
|
sentinelEvent(REDIS_WARNING,"+failover-state-reconf-slaves",
|
||
|
ri->master,"%@");
|
||
|
}
|
||
|
} else {
|
||
|
/* Otherwise we interpret this as the start of the failover. */
|
||
|
if (ri->master &&
|
||
|
(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) == 0)
|
||
|
{
|
||
|
ri->master->flags |= SRI_FAILOVER_IN_PROGRESS;
|
||
|
sentinelEvent(REDIS_WARNING,"failover-detected",ri->master,"%@");
|
||
|
ri->master->failover_state = SENTINEL_FAILOVER_STATE_DETECT_END;
|
||
|
ri->master->failover_state_change_time = mstime();
|
||
|
ri->master->promoted_slave = ri;
|
||
|
ri->flags |= SRI_PROMOTED;
|
||
|
/* We are an observer, so we can only assume that the leader
|
||
|
* is reconfiguring the slave instances. For this reason we
|
||
|
* set all the instances as RECONF_SENT waiting for progresses
|
||
|
* on this side. */
|
||
|
sentinelAddFlagsToDictOfRedisInstances(ri->master->slaves,
|
||
|
SRI_RECONF_SENT);
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/* Detect if the slave that is in the process of being reconfigured
|
||
|
* changed state. */
|
||
|
if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE &&
|
||
|
(ri->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)))
|
||
|
{
|
||
|
/* SRI_RECONF_SENT -> SRI_RECONF_INPROG. */
|
||
|
if ((ri->flags & SRI_RECONF_SENT) &&
|
||
|
ri->slave_master_host &&
|
||
|
strcmp(ri->slave_master_host,
|
||
|
ri->master->promoted_slave->addr->ip) == 0 &&
|
||
|
ri->slave_master_port == ri->master->promoted_slave->addr->port)
|
||
|
{
|
||
|
ri->flags &= ~SRI_RECONF_SENT;
|
||
|
ri->flags |= SRI_RECONF_INPROG;
|
||
|
sentinelEvent(REDIS_NOTICE,"+slave-reconf-inprog",ri,"%@");
|
||
|
}
|
||
|
|
||
|
/* SRI_RECONF_INPROG -> SRI_RECONF_DONE */
|
||
|
if ((ri->flags & SRI_RECONF_INPROG) &&
|
||
|
ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP)
|
||
|
{
|
||
|
ri->flags &= ~SRI_RECONF_INPROG;
|
||
|
ri->flags |= SRI_RECONF_DONE;
|
||
|
sentinelEvent(REDIS_NOTICE,"+slave-reconf-done",ri,"%@");
|
||
|
/* If we are moving forward (a new slave is now configured)
|
||
|
* we update the change_time as we are conceptually passing
|
||
|
* to the next slave. */
|
||
|
ri->failover_state_change_time = mstime();
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/* Callback for the INFO command: feed the reply payload into
 * sentinelRefreshInstanceInfo() when it is a bulk string. */
void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
    sentinelRedisInstance *ri = c->data;

    ri->pending_commands--;
    if (reply == NULL) return;

    redisReply *r = reply;
    if (r->type == REDIS_REPLY_STRING)
        sentinelRefreshInstanceInfo(ri,r->str);
}
|
||
|
|
||
|
/* Just discard the reply. We use this when we are not monitoring the return
|
||
|
* value of the command but its effects directly. */
|
||
|
void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
|
||
|
sentinelRedisInstance *ri = c->data;
|
||
|
|
||
|
ri->pending_commands--;
|
||
|
}
|
||
|
|
||
|
/* Callback for the periodic PING. Any reply refreshes last_pong_time;
 * only "acceptable" replies (PONG, or the LOADING / MASTERDOWN errors
 * that still prove the instance is alive) refresh last_avail_time. */
void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
    sentinelRedisInstance *ri = c->data;

    ri->pending_commands--;
    if (reply == NULL) return;

    redisReply *r = reply;
    if (r->type == REDIS_REPLY_STATUS || r->type == REDIS_REPLY_ERROR) {
        /* Update the "instance available" field only if this is an
         * acceptable reply. */
        int acceptable = strncmp(r->str,"PONG",4) == 0 ||
                         strncmp(r->str,"LOADING",7) == 0 ||
                         strncmp(r->str,"MASTERDOWN",10) == 0;

        if (acceptable) ri->last_avail_time = mstime();
    }
    ri->last_pong_time = mstime();
}
|
||
|
|
||
|
/* This is called when we get the reply about the PUBLISH command we send
|
||
|
* to the master to advertise this sentinel. */
|
||
|
void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
|
||
|
sentinelRedisInstance *ri = c->data;
|
||
|
redisReply *r;
|
||
|
|
||
|
ri->pending_commands--;
|
||
|
if (!reply) return;
|
||
|
r = reply;
|
||
|
|
||
|
/* Only update pub_time if we actually published our message. Otherwise
|
||
|
* we'll retry against in 100 milliseconds. */
|
||
|
if (r->type != REDIS_REPLY_ERROR)
|
||
|
ri->last_pub_time = mstime();
|
||
|
}
|
||
|
|
||
|
/* This is our Pub/Sub callback for the Hello channel. It's useful in order
 * to discover other sentinels attached at the same master.
 *
 * The expected payload (element[2]) is "ip:port:runid:canfailover". */
void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) {
    sentinelRedisInstance *ri = c->data;
    redisReply *r;

    if (!reply) return;
    r = reply;

    /* Update the last activity in the pubsub channel. Note that since we
     * receive our messages as well this timestamp can be used to detect
     * if the link is probably disconnected even if it seems otherwise. */
    ri->pc_last_activity = mstime();

    /* Sanity check in the reply we expect, so that the code that follows
     * can avoid to check for details. */
    if (r->type != REDIS_REPLY_ARRAY ||
        r->elements != 3 ||
        r->element[0]->type != REDIS_REPLY_STRING ||
        r->element[1]->type != REDIS_REPLY_STRING ||
        r->element[2]->type != REDIS_REPLY_STRING ||
        strcmp(r->element[0]->str,"message") != 0) return;

    /* We are not interested in meeting ourselves */
    if (strstr(r->element[2]->str,server.runid) != NULL) return;

    {
        int numtokens, port, removed, canfailover;
        char **token = sdssplitlen(r->element[2]->str,
                                   r->element[2]->len,
                                   ":",1,&numtokens);
        sentinelRedisInstance *sentinel;

        if (numtokens == 4) {
            /* First, try to see if we already have this sentinel. */
            port = atoi(token[1]);
            canfailover = atoi(token[3]);
            sentinel = getSentinelRedisInstanceByAddrAndRunID(
                            ri->sentinels,token[0],port,token[2]);

            if (!sentinel) {
                /* If not, remove all the sentinels that have the same runid
                 * OR the same ip/port, because it's either a restart or a
                 * network topology change. */
                removed = removeMatchingSentinelsFromMaster(ri,token[0],port,
                                token[2]);
                if (removed) {
                    sentinelEvent(REDIS_NOTICE,"-dup-sentinel",ri,
                        "%@ #duplicate of %s:%d or %s",
                        token[0],port,token[2]);
                }

                /* Add the new sentinel. */
                sentinel = createSentinelRedisInstance(NULL,SRI_SENTINEL,
                                token[0],port,ri->quorum,ri);
                if (sentinel) {
                    sentinelEvent(REDIS_NOTICE,"+sentinel",sentinel,"%@");
                    /* The runid is NULL after a new instance creation and
                     * for Sentinels we don't have a later chance to fill it,
                     * so do it now. */
                    sentinel->runid = sdsnew(token[2]);
                }
            }

            /* Update the state of the Sentinel. */
            if (sentinel) {
                sentinel->last_hello_time = mstime();
                if (canfailover)
                    sentinel->flags |= SRI_CAN_FAILOVER;
                else
                    sentinel->flags &= ~SRI_CAN_FAILOVER;
            }
        }
        sdsfreesplitres(token,numtokens);
    }
}
|
||
|
|
||
|
/* Send periodic commands to the instance: INFO (masters/slaves only),
 * PING (everybody) and PUBLISH of the hello message (masters only),
 * in that priority order — at most one command per call. */
void sentinelPingInstance(sentinelRedisInstance *ri) {
    mstime_t now = mstime();
    mstime_t info_period;
    int retval;

    /* Return ASAP if we have already a PING or INFO already pending, or
     * in the case the instance is not properly connected. */
    if (ri->flags & SRI_DISCONNECTED) return;

    /* For INFO, PING, PUBLISH that are not critical commands to send we
     * also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't
     * want to use a lot of memory just because a link is not working
     * properly (note that anyway there is a redundant protection about this,
     * that is, the link will be disconnected and reconnected if a long
     * timeout condition is detected. */
    if (ri->pending_commands >= SENTINEL_MAX_PENDING_COMMANDS) return;

    /* If this is a slave of a master in O_DOWN condition we start sending
     * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
     * period. In this state we want to closely monitor slaves in case they
     * are turned into masters by another Sentinel, or by the sysadmin. */
    if ((ri->flags & SRI_SLAVE) &&
        (ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) {
        info_period = 1000;
    } else {
        info_period = SENTINEL_INFO_PERIOD;
    }

    if ((ri->flags & SRI_SENTINEL) == 0 &&
        (ri->info_refresh == 0 ||
        (now - ri->info_refresh) > info_period))
    {
        /* Send INFO to masters and slaves, not sentinels. */
        retval = redisAsyncCommand(ri->cc,
            sentinelInfoReplyCallback, NULL, "INFO");
        if (retval != REDIS_OK) return;
        ri->pending_commands++;
    } else if ((now - ri->last_pong_time) > SENTINEL_PING_PERIOD) {
        /* Send PING to all the three kinds of instances. */
        retval = redisAsyncCommand(ri->cc,
            sentinelPingReplyCallback, NULL, "PING");
        if (retval != REDIS_OK) return;
        ri->pending_commands++;
    } else if ((ri->flags & SRI_MASTER) &&
               (now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD)
    {
        /* PUBLISH hello messages only to masters. */
        struct sockaddr_in sa;
        socklen_t salen = sizeof(sa);

        /* Use the local address of the commands socket as our advertised
         * ip, so other Sentinels learn an address they can reach us at.
         * (IPv4 only: sockaddr_in / inet_ntoa.) */
        if (getsockname(ri->cc->c.fd,(struct sockaddr*)&sa,&salen) != -1) {
            char myaddr[128];

            /* Payload format: ip:port:runid:canfailover. */
            snprintf(myaddr,sizeof(myaddr),"%s:%d:%s:%d",
                inet_ntoa(sa.sin_addr), server.port, server.runid,
                (ri->flags & SRI_CAN_FAILOVER) != 0);
            retval = redisAsyncCommand(ri->cc,
                sentinelPublishReplyCallback, NULL, "PUBLISH %s %s",
                SENTINEL_HELLO_CHANNEL,myaddr);
            if (retval != REDIS_OK) return;
            ri->pending_commands++;
        }
    }
}
|
||
|
|
||
|
/* =========================== SENTINEL command ============================= */
|
||
|
|
||
|
const char *sentinelFailoverStateStr(int state) {
|
||
|
switch(state) {
|
||
|
case SENTINEL_FAILOVER_STATE_NONE: return "none";
|
||
|
case SENTINEL_FAILOVER_STATE_WAIT_START: return "wait_start";
|
||
|
case SENTINEL_FAILOVER_STATE_SELECT_SLAVE: return "select_slave";
|
||
|
case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE: return "send_slaveof_noone";
|
||
|
case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: return "wait_promotion";
|
||
|
case SENTINEL_FAILOVER_STATE_RECONF_SLAVES: return "reconf_slaves";
|
||
|
case SENTINEL_FAILOVER_STATE_ALERT_CLIENTS: return "alert_clients";
|
||
|
case SENTINEL_FAILOVER_STATE_DETECT_END: return "detect_end";
|
||
|
case SENTINEL_FAILOVER_STATE_UPDATE_CONFIG: return "update_config";
|
||
|
default: return "unknown";
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/* Redis instance to Redis protocol representation.
 *
 * Emits the instance as a flat multi bulk of field-name/value pairs.
 * The reply length is deferred because the number of fields depends on
 * the instance type and flags; 'fields' counts the pairs emitted. */
void addReplySentinelRedisInstance(redisClient *c, sentinelRedisInstance *ri) {
    char *flags = sdsempty();  /* sds accumulator for the "flags" field. */
    void *mbl;
    int fields = 0;

    mbl = addDeferredMultiBulkLength(c);

    addReplyBulkCString(c,"name");
    addReplyBulkCString(c,ri->name);
    fields++;

    addReplyBulkCString(c,"ip");
    addReplyBulkCString(c,ri->addr->ip);
    fields++;

    addReplyBulkCString(c,"port");
    addReplyBulkLongLong(c,ri->addr->port);
    fields++;

    /* runid may still be NULL if no INFO reply was received yet. */
    addReplyBulkCString(c,"runid");
    addReplyBulkCString(c,ri->runid ? ri->runid : "");
    fields++;

    /* Build a comma separated list of the active flags. */
    addReplyBulkCString(c,"flags");
    if (ri->flags & SRI_S_DOWN) flags = sdscat(flags,"s_down,");
    if (ri->flags & SRI_O_DOWN) flags = sdscat(flags,"o_down,");
    if (ri->flags & SRI_MASTER) flags = sdscat(flags,"master,");
    if (ri->flags & SRI_SLAVE) flags = sdscat(flags,"slave,");
    if (ri->flags & SRI_SENTINEL) flags = sdscat(flags,"sentinel,");
    if (ri->flags & SRI_DISCONNECTED) flags = sdscat(flags,"disconnected,");
    if (ri->flags & SRI_MASTER_DOWN) flags = sdscat(flags,"master_down,");
    if (ri->flags & SRI_FAILOVER_IN_PROGRESS)
        flags = sdscat(flags,"failover_in_progress,");
    if (ri->flags & SRI_I_AM_THE_LEADER)
        flags = sdscat(flags,"i_am_the_leader,");
    if (ri->flags & SRI_PROMOTED) flags = sdscat(flags,"promoted,");
    if (ri->flags & SRI_RECONF_SENT) flags = sdscat(flags,"reconf_sent,");
    if (ri->flags & SRI_RECONF_INPROG) flags = sdscat(flags,"reconf_inprog,");
    if (ri->flags & SRI_RECONF_DONE) flags = sdscat(flags,"reconf_done,");

    if (sdslen(flags) != 0) flags = sdsrange(flags,0,-2); /* remove last "," */
    addReplyBulkCString(c,flags);
    sdsfree(flags);
    fields++;

    addReplyBulkCString(c,"pending-commands");
    addReplyBulkLongLong(c,ri->pending_commands);
    fields++;

    if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
        addReplyBulkCString(c,"failover-state");
        addReplyBulkCString(c,(char*)sentinelFailoverStateStr(ri->failover_state));
        fields++;
    }

    /* Time fields are reported as milliseconds elapsed since the event. */
    addReplyBulkCString(c,"last-ok-ping-reply");
    addReplyBulkLongLong(c,mstime() - ri->last_avail_time);
    fields++;

    addReplyBulkCString(c,"last-ping-reply");
    addReplyBulkLongLong(c,mstime() - ri->last_pong_time);
    fields++;

    if (ri->flags & SRI_S_DOWN) {
        addReplyBulkCString(c,"s-down-time");
        addReplyBulkLongLong(c,mstime()-ri->s_down_since_time);
        fields++;
    }

    if (ri->flags & SRI_O_DOWN) {
        addReplyBulkCString(c,"o-down-time");
        addReplyBulkLongLong(c,mstime()-ri->o_down_since_time);
        fields++;
    }

    /* Masters and Slaves */
    if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
        addReplyBulkCString(c,"info-refresh");
        addReplyBulkLongLong(c,mstime() - ri->info_refresh);
        fields++;
    }

    /* Only masters */
    if (ri->flags & SRI_MASTER) {
        addReplyBulkCString(c,"num-slaves");
        addReplyBulkLongLong(c,dictSize(ri->slaves));
        fields++;

        addReplyBulkCString(c,"num-other-sentinels");
        addReplyBulkLongLong(c,dictSize(ri->sentinels));
        fields++;

        addReplyBulkCString(c,"quorum");
        addReplyBulkLongLong(c,ri->quorum);
        fields++;
    }

    /* Only slaves */
    if (ri->flags & SRI_SLAVE) {
        addReplyBulkCString(c,"master-link-down-time");
        addReplyBulkLongLong(c,ri->master_link_down_time);
        fields++;

        addReplyBulkCString(c,"master-link-status");
        addReplyBulkCString(c,
            (ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP) ?
            "ok" : "err");
        fields++;

        addReplyBulkCString(c,"master-host");
        addReplyBulkCString(c,
            ri->slave_master_host ? ri->slave_master_host : "?");
        fields++;

        addReplyBulkCString(c,"master-port");
        addReplyBulkLongLong(c,ri->slave_master_port);
        fields++;
    }

    /* Only sentinels */
    if (ri->flags & SRI_SENTINEL) {
        addReplyBulkCString(c,"last-hello-message");
        addReplyBulkLongLong(c,mstime() - ri->last_hello_time);
        fields++;

        addReplyBulkCString(c,"can-failover-its-master");
        addReplyBulkLongLong(c,(ri->flags & SRI_CAN_FAILOVER) != 0);
        fields++;

        if (ri->flags & SRI_MASTER_DOWN) {
            addReplyBulkCString(c,"subjective-leader");
            addReplyBulkCString(c,ri->leader ? ri->leader : "?");
            fields++;
        }
    }

    /* Each field produced two bulks: name and value. */
    setDeferredMultiBulkLength(c,mbl,fields*2);
}
|
||
|
|
||
|
/* Output a number of instances contanined inside a dictionary as
|
||
|
* Redis protocol. */
|
||
|
void addReplyDictOfRedisInstances(redisClient *c, dict *instances) {
|
||
|
dictIterator *di;
|
||
|
dictEntry *de;
|
||
|
|
||
|
di = dictGetIterator(instances);
|
||
|
addReplyMultiBulkLen(c,dictSize(instances));
|
||
|
while((de = dictNext(di)) != NULL) {
|
||
|
sentinelRedisInstance *ri = dictGetVal(de);
|
||
|
|
||
|
addReplySentinelRedisInstance(c,ri);
|
||
|
}
|
||
|
dictReleaseIterator(di);
|
||
|
}
|
||
|
|
||
|
/* Lookup the named master into sentinel.masters.
|
||
|
* If the master is not found reply to the client with an error and returns
|
||
|
* NULL. */
|
||
|
sentinelRedisInstance *sentinelGetMasterByNameOrReplyError(redisClient *c,
|
||
|
robj *name)
|
||
|
{
|
||
|
sentinelRedisInstance *ri;
|
||
|
|
||
|
ri = dictFetchValue(sentinel.masters,c->argv[2]->ptr);
|
||
|
if (!ri) {
|
||
|
addReplyError(c,"No such master with that name");
|
||
|
return NULL;
|
||
|
}
|
||
|
return ri;
|
||
|
}
|
||
|
|
||
|
/* SENTINEL command implementation. Dispatches on the subcommand name in
 * argv[1]; unknown subcommands and wrong arities produce an error reply. */
void sentinelCommand(redisClient *c) {
    if (!strcasecmp(c->argv[1]->ptr,"masters")) {
        /* SENTINEL MASTERS */
        if (c->argc != 2) goto numargserr;

        addReplyDictOfRedisInstances(c,sentinel.masters);
    } else if (!strcasecmp(c->argv[1]->ptr,"slaves")) {
        /* SENTINEL SLAVES <master-name> */
        sentinelRedisInstance *ri;

        if (c->argc != 3) goto numargserr;
        if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
            return;
        addReplyDictOfRedisInstances(c,ri->slaves);
    } else if (!strcasecmp(c->argv[1]->ptr,"sentinels")) {
        /* SENTINEL SENTINELS <master-name> */
        sentinelRedisInstance *ri;

        if (c->argc != 3) goto numargserr;
        if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
            return;
        addReplyDictOfRedisInstances(c,ri->sentinels);
    } else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {
        /* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> */
        sentinelRedisInstance *ri;
        char *leader = NULL;
        long port;
        int isdown = 0;

        if (c->argc != 4) goto numargserr;
        if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != REDIS_OK)
            return;
        ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
            c->argv[2]->ptr,port,NULL);

        /* It exists? Is actually a master? Is subjectively down? It's down.
         * Note: if we are in tilt mode we always reply with "0". */
        if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
                                    (ri->flags & SRI_MASTER))
            isdown = 1;
        if (ri) leader = sentinelGetSubjectiveLeader(ri);

        /* Reply with a two-elements multi-bulk reply: down state, leader. */
        addReplyMultiBulkLen(c,2);
        addReply(c, isdown ? shared.cone : shared.czero);
        addReplyBulkCString(c, leader ? leader : "?");
        if (leader) sdsfree(leader);
    } else if (!strcasecmp(c->argv[1]->ptr,"reset")) {
        /* SENTINEL RESET <pattern> */
        if (c->argc != 3) goto numargserr;
        addReplyLongLong(c,sentinelResetMastersByPattern(c->argv[2]->ptr));
    } else if (!strcasecmp(c->argv[1]->ptr,"get-master-addr-by-name")) {
        /* SENTINEL GET-MASTER-ADDR-BY-NAME <master-name> */
        sentinelRedisInstance *ri;

        if (c->argc != 3) goto numargserr;
        ri = sentinelGetMasterByName(c->argv[2]->ptr);
        if (ri == NULL) {
            addReply(c,shared.nullmultibulk);
        } else {
            sentinelAddr *addr = ri->addr;

            /* During an ongoing failover report the promoted slave address
             * so clients are redirected to the new master early. */
            if ((ri->flags & SRI_FAILOVER_IN_PROGRESS) && ri->promoted_slave)
                addr = ri->promoted_slave->addr;
            addReplyMultiBulkLen(c,2);
            addReplyBulkCString(c,addr->ip);
            addReplyBulkLongLong(c,addr->port);
        }
    } else {
        addReplyErrorFormat(c,"Unknown sentinel subcommand '%s'",
                               (char*)c->argv[1]->ptr);
    }
    return;

numargserr:
    /* Fix: the arity error previously read "Wrong number of commands",
     * but it is the argument count that is wrong. */
    addReplyErrorFormat(c,"Wrong number of arguments for 'sentinel %s'",
                          (char*)c->argv[1]->ptr);
}
|
||
|
|
||
|
/* ===================== SENTINEL availability checks ======================= */
|
||
|
|
||
|
/* Is this instance down from our point of view?
 *
 * Updates the SRI_S_DOWN flag on 'ri' based on the time elapsed since
 * the last acceptable PING reply, and proactively drops links that look
 * stale so they get reconnected. */
void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
    mstime_t elapsed = mstime() - ri->last_avail_time;

    /* Check if we are in need for a reconnection of one of the
     * links, because we are detecting low activity.
     *
     * 1) Check if the command link seems connected, was connected not less
     *    than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have an
     *    idle time that is greater than down_after_period / 2 seconds. */
    if (ri->cc &&
        (mstime() - ri->cc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
        (mstime() - ri->last_pong_time) > (ri->down_after_period/2))
    {
        redisAsyncFree(ri->cc); /* will call the disconnection callback */
    }

    /* 2) Check if the pubsub link seems connected, was connected not less
     *    than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have no
     *    activity in the Pub/Sub channel for more than
     *    SENTINEL_PUBLISH_PERIOD * 3.
     */
    if (ri->pc &&
        (mstime() - ri->pc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
        (mstime() - ri->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
    {
        redisAsyncFree(ri->pc); /* will call the disconnection callback */
    }

    /* Update the subjectively down flag, emitting the matching +sdown /
     * -sdown events only on state transitions. */
    if (elapsed > ri->down_after_period) {
        /* Is subjectively down */
        if ((ri->flags & SRI_S_DOWN) == 0) {
            sentinelEvent(REDIS_WARNING,"+sdown",ri,"%@");
            ri->s_down_since_time = mstime();
            ri->flags |= SRI_S_DOWN;
        }
    } else {
        /* Is subjectively up */
        if (ri->flags & SRI_S_DOWN) {
            sentinelEvent(REDIS_WARNING,"-sdown",ri,"%@");
            ri->flags &= ~SRI_S_DOWN;
        }
    }
}
|
||
|
|
||
|
/* Is this instance down accordingly to the configured quorum?
 *
 * A master is objectively down (ODOWN) only if it is subjectively down
 * for this Sentinel AND at least master->quorum Sentinels (counting
 * ourselves) report it as down. Emits +odown/-odown on transitions. */
void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
    dictIterator *di;
    dictEntry *de;
    int quorum = 0, odown = 0;

    if (master->flags & SRI_S_DOWN) {
        /* Is down for enough sentinels? */
        quorum = 1; /* the current sentinel. */
        /* Count all the other sentinels that reported SRI_MASTER_DOWN
         * via SENTINEL is-master-down-by-addr. */
        di = dictGetIterator(master->sentinels);
        while((de = dictNext(di)) != NULL) {
            sentinelRedisInstance *ri = dictGetVal(de);

            if (ri->flags & SRI_MASTER_DOWN) quorum++;
        }
        dictReleaseIterator(di);
        if (quorum >= master->quorum) odown = 1;
    }

    /* Set the flag accordingly to the outcome. */
    if (odown) {
        if ((master->flags & SRI_O_DOWN) == 0) {
            sentinelEvent(REDIS_WARNING,"+odown",master,"%@ #quorum %d/%d",
                quorum, master->quorum);
            master->flags |= SRI_O_DOWN;
            master->o_down_since_time = mstime();
        }
    } else {
        if (master->flags & SRI_O_DOWN) {
            sentinelEvent(REDIS_WARNING,"-odown",master,"%@");
            master->flags &= ~SRI_O_DOWN;
        }
    }
}
|
||
|
|
||
|
/* Receive the SENTINEL is-master-down-by-addr reply, see the
 * sentinelAskMasterStateToOtherSentinels() function for more information.
 *
 * 'c->data' is the sentinelRedisInstance representing the Sentinel we
 * asked; 'reply' is NULL if the connection dropped before an answer
 * arrived (hiredis still invokes the callback in that case, which is why
 * pending_commands is decremented unconditionally). */
void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
    sentinelRedisInstance *ri = c->data;
    redisReply *r;

    ri->pending_commands--;
    if (!reply) return;
    r = reply;

    /* Ignore every error or unexpected reply.
     * Note that if the command returns an error for any reason we'll
     * end clearing the SRI_MASTER_DOWN flag for timeout anyway. */
    if (r->type == REDIS_REPLY_ARRAY && r->elements == 2 &&
        r->element[0]->type == REDIS_REPLY_INTEGER &&
        r->element[1]->type == REDIS_REPLY_STRING)
    {
        ri->last_master_down_reply_time = mstime();
        if (r->element[0]->integer == 1) {
            ri->flags |= SRI_MASTER_DOWN;
        } else {
            ri->flags &= ~SRI_MASTER_DOWN;
        }
        /* Record this Sentinel's subjective leader vote (a runid, or
         * "?" when it has none); old value is released first. */
        sdsfree(ri->leader);
        ri->leader = sdsnew(r->element[1]->str);
    }
}
|
||
|
|
||
|
/* If we think (subjectively) the master is down, we start sending
 * SENTINEL IS-MASTER-DOWN-BY-ADDR requests to other sentinels
 * in order to get the replies that allow to reach the quorum and
 * possibly also mark the master as objectively down. */
void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master) {
    dictIterator *di;
    dictEntry *de;

    di = dictGetIterator(master->sentinels);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
        mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
        char port[32];
        int retval;

        /* If the master state from other sentinel is too old, we clear it. */
        if (elapsed > SENTINEL_INFO_VALIDITY_TIME) {
            ri->flags &= ~SRI_MASTER_DOWN;
            sdsfree(ri->leader);
            ri->leader = NULL;
        }

        /* Only ask if master is down to other sentinels if:
         *
         * 1) We believe it is down, or there is a failover in progress.
         * 2) Sentinel is connected.
         * 3) We did not received the info within SENTINEL_ASK_PERIOD ms. */
        if ((master->flags & (SRI_S_DOWN|SRI_FAILOVER_IN_PROGRESS)) == 0)
            continue;
        if (ri->flags & SRI_DISCONNECTED) continue;
        if (mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
            continue;

        /* Ask. The reply is processed by sentinelReceiveIsMasterDownReply,
         * which locates 'ri' via the async context's data pointer. */
        ll2string(port,sizeof(port),master->addr->port);
        retval = redisAsyncCommand(ri->cc,
                    sentinelReceiveIsMasterDownReply, NULL,
                    "SENTINEL is-master-down-by-addr %s %s",
                    master->addr->ip, port);
        if (retval == REDIS_OK) ri->pending_commands++;
    }
    dictReleaseIterator(di);
}
|
||
|
|
||
|
/* =============================== FAILOVER ================================= */
|
||
|
|
||
|
/* Given a master get the "subjective leader", that is, among all the sentinels
|
||
|
* with given characteristics, the one with the lexicographically smaller
|
||
|
* runid. The characteristics required are:
|
||
|
*
|
||
|
* 1) Has SRI_CAN_FAILOVER flag.
|
||
|
* 2) Is not disconnected.
|
||
|
* 3) Recently answered to our ping (no longer than
|
||
|
* SENTINEL_INFO_VALIDITY_TIME milliseconds ago).
|
||
|
*
|
||
|
* The function returns a pointer to an sds string representing the runid of the
|
||
|
* leader sentinel instance (from our point of view). Otherwise NULL is
|
||
|
* returned if there are no suitable sentinels.
|
||
|
*/
|
||
|
|
||
|
/* qsort() comparator ordering an array of (char *) runids in
 * case-insensitive lexicographical order. */
int compareRunID(const void *a, const void *b) {
    const char *runid_a = *(char**)a;
    const char *runid_b = *(char**)b;

    return strcasecmp(runid_a, runid_b);
}
|
||
|
|
||
|
/* See the block comment above for the full contract: returns a new sds
 * string (caller frees with sdsfree()) holding the runid of the subjective
 * leader, or NULL when no Sentinel qualifies. */
char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master) {
    dictIterator *di;
    dictEntry *de;
    /* Worst case every attached Sentinel qualifies, plus ourselves. */
    char **instance =
        zmalloc(sizeof(char*)*(dictSize(master->sentinels)+1));
    int instances = 0;
    char *leader = NULL;

    if (master->flags & SRI_CAN_FAILOVER) {
        /* Add myself if I'm a Sentinel that can failover this master. */
        instance[instances++] = server.runid;
    }

    di = dictGetIterator(master->sentinels);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
        mstime_t lag = mstime() - ri->last_avail_time;

        /* Skip Sentinels that are stale, cannot failover, are
         * disconnected, or whose runid is still unknown. */
        if (lag > SENTINEL_INFO_VALIDITY_TIME ||
            !(ri->flags & SRI_CAN_FAILOVER) ||
            (ri->flags & SRI_DISCONNECTED) ||
            ri->runid == NULL)
            continue;
        instance[instances++] = ri->runid;
    }
    dictReleaseIterator(di);

    /* If we have at least one instance passing our checks, order the array
     * by runid: the leader is the candidate with the smallest runid. */
    if (instances) {
        qsort(instance,instances,sizeof(char*),compareRunID);
        leader = sdsnew(instance[0]);
    }
    zfree(instance);
    return leader;
}
|
||
|
|
||
|
/* Pair of a Sentinel runid and the number of votes it received.
 * NOTE(review): this struct is not referenced by the visible vote-counting
 * code, which stores counts directly in the dict entry's unsigned integer
 * value — possibly dead; confirm against the rest of the file before
 * removing. */
struct sentinelLeader {
    char *runid;            /* Candidate Sentinel runid. */
    unsigned long votes;    /* Votes accumulated for this runid. */
};
|
||
|
|
||
|
/* Helper function for sentinelGetObjectiveLeader, increment the counter
|
||
|
* relative to the specified runid. */
|
||
|
void sentinelObjectiveLeaderIncr(dict *counters, char *runid) {
|
||
|
dictEntry *de = dictFind(counters,runid);
|
||
|
uint64_t oldval;
|
||
|
|
||
|
if (de) {
|
||
|
oldval = dictGetUnsignedIntegerVal(de);
|
||
|
dictSetUnsignedIntegerVal(de,oldval+1);
|
||
|
} else {
|
||
|
de = dictAddRaw(counters,runid);
|
||
|
redisAssert(de != NULL);
|
||
|
dictSetUnsignedIntegerVal(de,1);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/* Scan all the Sentinels attached to this master to check what is the
 * most voted leader among Sentinels.
 *
 * Returns a newly allocated sds string with the winning runid (caller
 * releases it with sdsfree()), or NULL when no candidate reaches both
 * required thresholds. */
char *sentinelGetObjectiveLeader(sentinelRedisInstance *master) {
    dict *counters;
    dictIterator *di;
    dictEntry *de;
    unsigned int voters = 0, voters_quorum;
    char *myvote;
    char *winner = NULL;

    /* Only meaningful while the master is ODOWN or a failover is running. */
    redisAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS));
    counters = dictCreate(&leaderVotesDictType,NULL);

    /* Count my vote. */
    myvote = sentinelGetSubjectiveLeader(master);
    if (myvote) {
        sentinelObjectiveLeaderIncr(counters,myvote);
        voters++;
    }

    /* Count other sentinels votes */
    di = dictGetIterator(master->sentinels);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
        if (ri->leader == NULL) continue;
        /* If the failover is not already in progress we are only interested
         * in Sentinels that believe the master is down. Otherwise the leader
         * selection is useful for the "failover-takedown" when the original
         * leader fails. In that case we consider all the voters. */
        if (!(master->flags & SRI_FAILOVER_IN_PROGRESS) &&
            !(ri->flags & SRI_MASTER_DOWN)) continue;
        sentinelObjectiveLeaderIncr(counters,ri->leader);
        voters++;
    }
    dictReleaseIterator(di);
    voters_quorum = voters/2+1;

    /* Check what's the winner. For the winner to win, it needs two conditions:
     * 1) Absolute majority between voters (50% + 1).
     * 2) And anyway at least master->quorum votes. */
    {
        uint64_t max_votes = 0; /* Max votes so far. */

        di = dictGetIterator(counters);
        while((de = dictNext(di)) != NULL) {
            uint64_t votes = dictGetUnsignedIntegerVal(de);

            if (max_votes < votes) {
                max_votes = votes;
                winner = dictGetKey(de);
            }
        }
        dictReleaseIterator(di);
        if (winner && (max_votes < voters_quorum || max_votes < master->quorum))
            winner = NULL;
    }
    /* Duplicate the winner before releasing 'counters': 'winner' points
     * to a key owned by that dict until this copy is made. */
    winner = winner ? sdsnew(winner) : NULL;
    sdsfree(myvote);
    dictRelease(counters);
    return winner;
}
|
||
|
|
||
|
/* This function checks if there are the conditions to start the failover,
 * that is:
 *
 * 1) Enough time has passed since O_DOWN.
 * 2) The master is marked as SRI_CAN_FAILOVER, so we can failover it.
 * 3) We are the objectively leader for this master.
 *
 * If the conditions are met we flag the master as SRI_FAILOVER_IN_PROGRESS
 * and SRI_I_AM_THE_LEADER.
 */
void sentinelStartFailover(sentinelRedisInstance *master) {
    char *leader;
    int isleader;

    /* We can't failover if the master is not in O_DOWN state or if
     * there is not already a failover in progress (to perform the
     * takedown if the leader died) or if this Sentinel is not allowed
     * to start a failover. */
    if (!(master->flags & SRI_CAN_FAILOVER) ||
        !(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) return;

    /* Am I the most-voted leader for this master? */
    leader = sentinelGetObjectiveLeader(master);
    isleader = leader && strcasecmp(leader,server.runid) == 0;
    sdsfree(leader);

    /* If I'm not the leader, I can't failover for sure. */
    if (!isleader) return;

    /* If the failover is already in progress there are two options... */
    if (master->flags & SRI_FAILOVER_IN_PROGRESS) {
        if (master->flags & SRI_I_AM_THE_LEADER) {
            /* 1) I'm flagged as leader so I already started the failover.
             *    Just return. */
            return;
        } else {
            mstime_t elapsed = mstime() - master->failover_state_change_time;

            /* 2) I'm the new leader, but I'm not flagged as leader in the
             *    master: I did not started the failover, but the original
             *    leader has no longer the leadership.
             *
             *    In this case if the failover appears to be lagging
             *    for at least 25% of the configured failover timeout,
             *    I can assume I can take control. Otherwise
             *    it's better to return and wait more. */
            if (elapsed < (master->failover_timeout/4)) return;
            sentinelEvent(REDIS_WARNING,"+failover-takedown",master,"%@");
            /* We have already an elected slave if we are in
             * FAILOVER_IN_PROGRESS state, that is, the slave that we
             * observed turning into a master. */
            master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
            /* As an observer we flagged all the slaves as RECONF_SENT but
             * now we are in charge of actually sending the reconfiguration
             * command so let's clear this flag for all the instances. */
            sentinelDelFlagsToDictOfRedisInstances(master->slaves,
                SRI_RECONF_SENT);
        }
    } else {
        /* Brand new failover as SRI_FAILOVER_IN_PROGRESS was not set. */
        master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
    }

    master->flags |= SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER;
    sentinelEvent(REDIS_WARNING,"+failover-triggered",master,"%@");

    /* Pick a random delay if it's a fresh failover (WAIT_START), and not
     * a recovery of a failover started by another sentinel. The random
     * component de-synchronizes Sentinels that decided to act at once. */
    if (master->failover_state == SENTINEL_FAILOVER_STATE_WAIT_START) {
        master->failover_start_time = mstime() +
            SENTINEL_FAILOVER_FIXED_DELAY +
            (rand() % SENTINEL_FAILOVER_MAX_RANDOM_DELAY);
        sentinelEvent(REDIS_WARNING,"+failover-state-wait-start",master,
            "%@ #starting in %lld milliseconds",
            master->failover_start_time-mstime());
    }
    master->failover_state_change_time = mstime();
}
|
||
|
|
||
|
/* Select a suitable slave to promote. The current algorithm only uses
|
||
|
* the following parameters:
|
||
|
*
|
||
|
* 1) None of the following conditions: S_DOWN, O_DOWN, DISCONNECTED.
|
||
|
* 2) last_avail_time more recent than SENTINEL_INFO_VALIDITY_TIME.
|
||
|
* 3) info_refresh more recent than SENTINEL_INFO_VALIDITY_TIME.
|
||
|
* 4) master_link_down_time no more than:
|
||
|
* (now - master->s_down_since_time) + (master->down_after_period * 10).
|
||
|
*
|
||
|
* Among all the slaves matching the above conditions we select the slave
|
||
|
* with lower slave_priority. If priority is the same we select the slave
|
||
|
* with lexicographically smaller runid.
|
||
|
*
|
||
|
* The function returns the pointer to the selected slave, otherwise
|
||
|
* NULL if no suitable slave was found.
|
||
|
*/
|
||
|
|
||
|
int compareSlavesForPromotion(const void *a, const void *b) {
|
||
|
sentinelRedisInstance **sa = (sentinelRedisInstance **)a,
|
||
|
**sb = (sentinelRedisInstance **)b;
|
||
|
if ((*sa)->slave_priority != (*sb)->slave_priority)
|
||
|
return (*sa)->slave_priority - (*sb)->slave_priority;
|
||
|
return strcasecmp((*sa)->runid,(*sb)->runid);
|
||
|
}
|
||
|
|
||
|
/* See the selection criteria documented in the comment block above:
 * collect every slave passing the health checks, then sort with
 * compareSlavesForPromotion() and pick the first. Returns NULL when no
 * slave qualifies. */
sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
    sentinelRedisInstance **instance =
        zmalloc(sizeof(instance[0])*dictSize(master->slaves));
    sentinelRedisInstance *selected = NULL;
    int instances = 0;
    dictIterator *di;
    dictEntry *de;
    mstime_t max_master_down_time;

    /* Maximum tolerated master-link downtime for a candidate slave. */
    max_master_down_time = (mstime() - master->s_down_since_time) +
                           (master->down_after_period * 10);

    di = dictGetIterator(master->slaves);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(de);
        mstime_t info_validity_time = mstime()-SENTINEL_INFO_VALIDITY_TIME;

        /* Skip slaves that are down/disconnected, whose state is stale,
         * or that were detached from the master for too long. */
        if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN|SRI_DISCONNECTED)) continue;
        if (slave->last_avail_time < info_validity_time) continue;
        if (slave->info_refresh < info_validity_time) continue;
        if (slave->master_link_down_time > max_master_down_time) continue;
        instance[instances++] = slave;
    }
    dictReleaseIterator(di);
    if (instances) {
        qsort(instance,instances,sizeof(sentinelRedisInstance*),
            compareSlavesForPromotion);
        selected = instance[0];
    }
    zfree(instance);
    return selected;
}
|
||
|
|
||
|
/* ---------------- Failover state machine implementation ------------------- */
|
||
|
/* WAIT_START state handler: hold position until the scheduled failover
 * start time is reached, then advance to SELECT_SLAVE. */
void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
    if (mstime() < ri->failover_start_time) return;

    ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
    ri->failover_state_change_time = mstime();
    sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
}
|
||
|
|
||
|
/* SELECT_SLAVE state handler: pick the slave to promote, or reschedule
 * the whole failover if no slave currently qualifies. */
void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
    sentinelRedisInstance *slave = sentinelSelectSlave(ri);

    if (slave == NULL) {
        /* No suitable slave right now: fall back to WAIT_START and retry
         * after the fixed plus maximum random delay. */
        sentinelEvent(REDIS_WARNING,"-no-good-slave",ri,
            "%@ #retrying in %d seconds",
            (SENTINEL_FAILOVER_FIXED_DELAY+
             SENTINEL_FAILOVER_MAX_RANDOM_DELAY)/1000);
        ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
        ri->failover_start_time = mstime() + SENTINEL_FAILOVER_FIXED_DELAY +
                                  SENTINEL_FAILOVER_MAX_RANDOM_DELAY;
    } else {
        /* Mark the chosen slave and move on to SEND_SLAVEOF_NOONE. */
        sentinelEvent(REDIS_WARNING,"+selected-slave",slave,"%@");
        slave->flags |= SRI_PROMOTED;
        ri->promoted_slave = slave;
        ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
        ri->failover_state_change_time = mstime();
        sentinelEvent(REDIS_NOTICE,"+failover-state-send-slaveof-noone",
            slave, "%@");
    }
}
|
||
|
|
||
|
/* SEND_SLAVEOF_NOONE state handler: ask the promoted slave to turn
 * itself into a master. Silently retried on a later timer tick if the
 * slave is disconnected or the async send fails. */
void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
    int retval;

    /* Can't talk to the promoted slave right now: retry later. */
    if (ri->promoted_slave->flags & SRI_DISCONNECTED) return;

    /* Send SLAVEOF NO ONE command to turn the slave into a master.
     * We actually register a generic callback for this command as we don't
     * really care about the reply. We check if it worked indirectly observing
     * if INFO returns a different role (master instead of slave). */
    retval = redisAsyncCommand(ri->promoted_slave->cc,
        sentinelDiscardReplyCallback, NULL, "SLAVEOF NO ONE");
    if (retval != REDIS_OK) return;
    ri->promoted_slave->pending_commands++;
    sentinelEvent(REDIS_NOTICE, "+failover-state-wait-promotion",
        ri->promoted_slave,"%@");
    ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
    ri->failover_state_change_time = mstime();
}
|
||
|
|
||
|
/* We actually wait for promotion indirectly checking with INFO when the
|
||
|
* slave turns into a master. */
|
||
|
void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
|
||
|
mstime_t elapsed = mstime() - ri->failover_state_change_time;
|
||
|
|
||
|
if (elapsed >= SENTINEL_PROMOTION_RETRY_PERIOD) {
|
||
|
sentinelEvent(REDIS_WARNING,"-promotion-timeout",ri->promoted_slave,
|
||
|
"%@");
|
||
|
sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
|
||
|
ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
|
||
|
ri->failover_state_change_time = mstime();
|
||
|
ri->promoted_slave->flags &= ~SRI_PROMOTED;
|
||
|
ri->promoted_slave = NULL;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/* Check whether the failover reached its end condition (every reachable
 * slave reconfigured, or the failover timeout expired). On timeout, if we
 * are the leader, also send best-effort SLAVEOF commands to stragglers. */
void sentinelFailoverDetectEnd(sentinelRedisInstance *master) {
    int not_reconfigured = 0, timeout = 0;
    dictIterator *di;
    dictEntry *de;
    mstime_t elapsed = mstime() - master->failover_state_change_time;

    /* We can't consider failover finished if the promoted slave is
     * not reachable. */
    if (master->promoted_slave == NULL ||
        master->promoted_slave->flags & SRI_S_DOWN) return;

    /* The failover terminates once all the reachable slaves are properly
     * configured. */
    di = dictGetIterator(master->slaves);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(de);

        if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
        if (slave->flags & SRI_S_DOWN) continue;
        not_reconfigured++;
    }
    dictReleaseIterator(di);

    /* Force end of failover on timeout. */
    if (elapsed > master->failover_timeout) {
        not_reconfigured = 0;
        timeout = 1;
        sentinelEvent(REDIS_WARNING,"+failover-end-for-timeout",master,"%@");
    }

    if (not_reconfigured == 0) {
        sentinelEvent(REDIS_WARNING,"+failover-end",master,"%@");
        master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG;
        master->failover_state_change_time = mstime();
    }

    /* If I'm the leader it is a good idea to send a best effort SLAVEOF
     * command to all the slaves still not reconfigured to replicate with
     * the new master. */
    if (timeout && (master->flags & SRI_I_AM_THE_LEADER)) {
        /* NOTE: these deliberately shadow the outer di/de, which are no
         * longer in use at this point. */
        dictIterator *di;
        dictEntry *de;
        char master_port[32];

        ll2string(master_port,sizeof(master_port),
            master->promoted_slave->addr->port);

        di = dictGetIterator(master->slaves);
        while((de = dictNext(di)) != NULL) {
            sentinelRedisInstance *slave = dictGetVal(de);
            int retval;

            if (slave->flags &
                (SRI_RECONF_DONE|SRI_RECONF_SENT|SRI_DISCONNECTED)) continue;

            retval = redisAsyncCommand(slave->cc,
                sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
                    master->promoted_slave->addr->ip,
                    master_port);
            if (retval == REDIS_OK) {
                sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent-be",slave,"%@");
                slave->flags |= SRI_RECONF_SENT;
            }
        }
        dictReleaseIterator(di);
    }
}
|
||
|
|
||
|
/* Send SLAVE OF <new master address> to all the remaining slaves that
 * still don't appear to have the configuration updated, never exceeding
 * master->parallel_syncs simultaneous reconfigurations. */
void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
    dictIterator *di;
    dictEntry *de;
    int in_progress = 0;

    /* First pass: count slaves already being reconfigured so we can
     * honor the parallel_syncs limit below. */
    di = dictGetIterator(master->slaves);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(de);

        if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG))
            in_progress++;
    }
    dictReleaseIterator(di);

    /* Second pass: start new reconfigurations while we are below the
     * parallel_syncs limit. */
    di = dictGetIterator(master->slaves);
    while(in_progress < master->parallel_syncs &&
          (de = dictNext(di)) != NULL)
    {
        sentinelRedisInstance *slave = dictGetVal(de);
        int retval;
        char master_port[32];

        /* Skip the promoted slave, and already configured slaves. */
        if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;

        /* Clear the SRI_RECONF_SENT flag if too much time elapsed without
         * the slave moving forward to the next state. */
        if ((slave->flags & SRI_RECONF_SENT) &&
            (mstime() - slave->slave_reconf_sent_time) >
            SENTINEL_SLAVE_RECONF_RETRY_PERIOD)
        {
            sentinelEvent(REDIS_NOTICE,"-slave-reconf-sent-timeout",slave,"%@");
            slave->flags &= ~SRI_RECONF_SENT;
        }

        /* Nothing to do for instances that are disconnected or already
         * in RECONF_SENT state. */
        if (slave->flags & (SRI_DISCONNECTED|SRI_RECONF_SENT|SRI_RECONF_INPROG))
            continue;

        /* Send SLAVEOF <new master>. */
        ll2string(master_port,sizeof(master_port),
            master->promoted_slave->addr->port);
        retval = redisAsyncCommand(slave->cc,
            sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
                master->promoted_slave->addr->ip,
                master_port);
        if (retval == REDIS_OK) {
            slave->flags |= SRI_RECONF_SENT;
            slave->pending_commands++;
            slave->slave_reconf_sent_time = mstime();
            sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent",slave,"%@");
            in_progress++;
        }
    }
    dictReleaseIterator(di);
    /* Check if all the slaves are reconfigured and the failover is over. */
    sentinelFailoverDetectEnd(master);
}
|
||
|
|
||
|
/* This function is called when the slave is in
 * SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state. In this state we need
 * to remove it from the master table and add the promoted slave instead.
 *
 * If there are no promoted slaves as this instance is unique, we remove
 * and re-add it with the same address to trigger a complete state
 * refresh. */
void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) {
    /* 'ref' supplies the new master address: the promoted slave when
     * there is one, otherwise the master itself (full state refresh). */
    sentinelRedisInstance *new, *ref = master->promoted_slave ?
                                       master->promoted_slave : master;
    int quorum = ref->quorum, parallel_syncs = ref->parallel_syncs;
    char *name = sdsnew(master->name);
    char *ip = sdsnew(ref->addr->ip), *oldip = sdsnew(master->addr->ip);
    int port = ref->addr->port, oldport = master->addr->port;
    int retval, oldflags = master->flags;
    mstime_t old_down_after_period = master->down_after_period;
    mstime_t old_failover_timeout = master->failover_timeout;

    /* Deleting the entry releases the old instance, so every setting to
     * carry over was copied above; 'master' must not be touched after
     * this point. */
    retval = dictDelete(sentinel.masters,master->name);
    redisAssert(retval == DICT_OK);
    new = createSentinelRedisInstance(name,SRI_MASTER,ip,port,quorum,NULL);
    redisAssert(new != NULL);
    new->parallel_syncs = parallel_syncs;
    new->flags |= (oldflags & SRI_CAN_FAILOVER);
    new->down_after_period = old_down_after_period;
    new->failover_timeout = old_failover_timeout;
    /* TODO: ... set the scripts as well. */
    sentinelEvent(REDIS_WARNING,"+switch-master",new,"%s %s %d %s %d",
        name, oldip, oldport, ip, port);
    sdsfree(name);
    sdsfree(ip);
    sdsfree(oldip);
}
|
||
|
|
||
|
/* Drive one step of the failover state machine for a master with a
 * failover in progress, dispatching to the handler of the current state. */
void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
    redisAssert(ri->flags & SRI_MASTER);

    if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;

    int state = ri->failover_state;

    if (state == SENTINEL_FAILOVER_STATE_WAIT_START) {
        sentinelFailoverWaitStart(ri);
    } else if (state == SENTINEL_FAILOVER_STATE_SELECT_SLAVE) {
        sentinelFailoverSelectSlave(ri);
    } else if (state == SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE) {
        sentinelFailoverSendSlaveOfNoOne(ri);
    } else if (state == SENTINEL_FAILOVER_STATE_WAIT_PROMOTION) {
        sentinelFailoverWaitPromotion(ri);
    } else if (state == SENTINEL_FAILOVER_STATE_RECONF_SLAVES) {
        sentinelFailoverReconfNextSlave(ri);
    } else if (state == SENTINEL_FAILOVER_STATE_DETECT_END) {
        sentinelFailoverDetectEnd(ri);
    }
}
|
||
|
|
||
|
/* The following is called only for master instances and will abort the
 * failover process if:
 *
 * 1) The failover is in progress.
 * 2) We already promoted a slave.
 * 3) The promoted slave is in extended SDOWN condition.
 */
void sentinelAbortFailoverIfNeeded(sentinelRedisInstance *ri) {
    dictIterator *di;
    dictEntry *de;

    /* Failover is in progress? Do we have a promoted slave? */
    if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS) || !ri->promoted_slave) return;

    /* Is the promoted slave into an extended SDOWN state? */
    if (!(ri->promoted_slave->flags & SRI_S_DOWN) ||
        (mstime() - ri->promoted_slave->s_down_since_time) <
        (ri->down_after_period * SENTINEL_EXTENDED_SDOWN_MULTIPLIER)) return;

    sentinelEvent(REDIS_WARNING,"-failover-abort-x-sdown",ri->promoted_slave,"%@");

    /* Clear failover related flags from slaves.
     * Also if we are the leader make sure to send SLAVEOF commands to all the
     * already reconfigured slaves in order to turn them back into slaves of
     * the original master. */

    di = dictGetIterator(ri->slaves);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(de);
        if (ri->flags & SRI_I_AM_THE_LEADER) {
            char master_port[32];
            int retval;

            /* Best effort: point the slave back at the original master. */
            ll2string(master_port,sizeof(master_port),ri->addr->port);
            retval = redisAsyncCommand(slave->cc,
                sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
                    ri->addr->ip,
                    master_port);
            if (retval == REDIS_OK)
                sentinelEvent(REDIS_NOTICE,"-slave-reconf-undo",slave,"%@");
        }
        slave->flags &= ~(SRI_RECONF_SENT|SRI_RECONF_INPROG|SRI_RECONF_DONE);
    }
    dictReleaseIterator(di);

    /* Reset the master's failover state and detach the promoted slave. */
    ri->flags &= ~(SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER);
    ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
    ri->failover_state_change_time = mstime();
    ri->promoted_slave->flags &= ~SRI_PROMOTED;
    ri->promoted_slave = NULL;
}
|
||
|
|
||
|
/* ======================== SENTINEL timer handler ==========================
|
||
|
* This is the "main" our Sentinel, being sentinel completely non blocking
|
||
|
* in design. The function is called every second.
|
||
|
* -------------------------------------------------------------------------- */
|
||
|
|
||
|
/* Perform scheduled operations for the specified Redis instance.
 *
 * The function is split in a monitoring half (always executed) and an
 * acting half (skipped while in TILT mode, since timing information is
 * not trustworthy there). */
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
    /* ========== MONITORING HALF ============ */
    /* Every kind of instance */
    sentinelReconnectInstance(ri);
    sentinelPingInstance(ri);

    /* Masters and slaves */
    if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
        /* Nothing so far. */
    }

    /* Only masters */
    if (ri->flags & SRI_MASTER) {
        sentinelAskMasterStateToOtherSentinels(ri);
    }

    /* ============== ACTING HALF ============= */
    /* We don't proceed with the acting half if we are in TILT mode.
     * TILT happens when we find something odd with the time, like a
     * sudden change in the clock. */
    if (sentinel.tilt) {
        if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
        /* TILT period elapsed: resume normal operations. */
        sentinel.tilt = 0;
        sentinelEvent(REDIS_WARNING,"-tilt",NULL,"#tilt mode exited");
    }

    /* Every kind of instance */
    sentinelCheckSubjectivelyDown(ri);

    /* Masters and slaves */
    if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
        /* Nothing so far. */
    }

    /* Only masters */
    if (ri->flags & SRI_MASTER) {
        sentinelCheckObjectivelyDown(ri);
        sentinelStartFailover(ri);
        sentinelFailoverStateMachine(ri);
        sentinelAbortFailoverIfNeeded(ri);
    }
}
|
||
|
|
||
|
/* Perform scheduled operations for all the instances in the dictionary.
|
||
|
* Recursively call the function against dictionaries of slaves. */
|
||
|
void sentinelHandleDictOfRedisInstances(dict *instances) {
|
||
|
dictIterator *di;
|
||
|
dictEntry *de;
|
||
|
sentinelRedisInstance *switch_to_promoted = NULL;
|
||
|
|
||
|
/* There are a number of things we need to perform against every master. */
|
||
|
di = dictGetIterator(instances);
|
||
|
while((de = dictNext(di)) != NULL) {
|
||
|
sentinelRedisInstance *ri = dictGetVal(de);
|
||
|
|
||
|
sentinelHandleRedisInstance(ri);
|
||
|
if (ri->flags & SRI_MASTER) {
|
||
|
sentinelHandleDictOfRedisInstances(ri->slaves);
|
||
|
sentinelHandleDictOfRedisInstances(ri->sentinels);
|
||
|
if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
|
||
|
switch_to_promoted = ri;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
if (switch_to_promoted)
|
||
|
sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
|
||
|
dictReleaseIterator(di);
|
||
|
}
|
||
|
|
||
|
/* This function checks if we need to enter the TILT mode.
 *
 * The TILT mode is entered if we detect that between two invocations of the
 * timer interrupt, a negative amount of time, or too much time has passed.
 * Note that we expect that more or less just 100 milliseconds will pass
 * if everything is fine. However we'll see a negative number or a
 * difference bigger than SENTINEL_TILT_TRIGGER milliseconds if one of the
 * following conditions happen:
 *
 * 1) The Sentinel process is blocked for some time, for every kind of
 * random reason: the load is huge, the computer was frozen for some time
 * in I/O or alike, the process was stopped by a signal. Everything.
 * 2) The system clock was altered significantly.
 *
 * Under both these conditions we'll see everything as timed out and failing
 * without good reasons. Instead we enter the TILT mode and wait
 * for SENTINEL_TILT_PERIOD to elapse before starting to act again.
 *
 * During TILT time we still collect information, we just do not act. */
void sentinelCheckTiltCondition(void) {
    mstime_t now = mstime();
    mstime_t delta = now - sentinel.previous_time;

    /* Negative delta: clock jumped backward. Delta above the trigger:
     * either we were blocked or the clock jumped forward. */
    if (delta < 0 || delta > SENTINEL_TILT_TRIGGER) {
        sentinel.tilt = 1;
        sentinel.tilt_start_time = mstime();
        sentinelEvent(REDIS_WARNING,"+tilt",NULL,"#tilt mode entered");
    }
    sentinel.previous_time = mstime();
}
/* Sentinel timer entry point: first check the TILT condition, then run
 * the scheduled operations against every monitored master (and,
 * recursively, against its slaves and the other Sentinels).
 *
 * NOTE(review): the section banner above says this is called every
 * second, while the TILT comment expects ~100 ms between invocations —
 * confirm the actual call frequency against the caller. */
void sentinelTimer(void) {
    sentinelCheckTiltCondition();
    sentinelHandleDictOfRedisInstances(sentinel.masters);
}
|
||
|
|