mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-23 00:28:26 -05:00
Cluster: manual failover initial implementation.
This commit is contained in:
parent
4919a13f50
commit
4cf0cd5719
165
src/cluster.c
165
src/cluster.c
@ -67,6 +67,7 @@ void clusterHandleSlaveMigration(int max_slaves);
|
||||
int bitmapTestBit(unsigned char *bitmap, int pos);
|
||||
void clusterDoBeforeSleep(int flags);
|
||||
void clusterSendUpdate(clusterLink *link, clusterNode *node);
|
||||
void resetManualFailover(void);
|
||||
|
||||
/* -----------------------------------------------------------------------------
|
||||
* Initialization
|
||||
@ -344,6 +345,7 @@ void clusterInit(void) {
|
||||
|
||||
/* The slots -> keys map is a sorted set. Init it. */
|
||||
server.cluster->slots_to_keys = zslCreate();
|
||||
resetManualFailover();
|
||||
}
|
||||
|
||||
/* -----------------------------------------------------------------------------
|
||||
@ -1167,7 +1169,9 @@ int clusterProcessPacket(clusterLink *link) {
|
||||
ntohl(hdr->data.publish.msg.message_len);
|
||||
if (totlen != explen) return 1;
|
||||
} else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST ||
|
||||
type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) {
|
||||
type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK ||
|
||||
type == CLUSTERMSG_TYPE_MFSTART)
|
||||
{
|
||||
uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
|
||||
|
||||
if (totlen != explen) return 1;
|
||||
@ -1194,6 +1198,17 @@ int clusterProcessPacket(clusterLink *link) {
|
||||
/* Update the replication offset info for this node. */
|
||||
sender->repl_offset = ntohu64(hdr->offset);
|
||||
sender->repl_offset_time = mstime();
|
||||
/* If we are a slave performing a manual failover and our master
|
||||
* sent its offset while already paused, populate the MF state. */
|
||||
if (server.cluster->mf_end &&
|
||||
nodeIsSlave(myself) &&
|
||||
myself->slaveof == sender &&
|
||||
hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED &&
|
||||
server.cluster->mf_master_offset == 0)
|
||||
{
|
||||
server.cluster->mf_master_offset = sender->repl_offset;
|
||||
redisLog(REDIS_WARNING,"Received replication offset for paused master manual failover: %lld", server.cluster->mf_master_offset);
|
||||
}
|
||||
}
|
||||
|
||||
/* Process packets by type. */
|
||||
@ -1464,6 +1479,18 @@ int clusterProcessPacket(clusterLink *link) {
|
||||
* we check ASAP. */
|
||||
clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
|
||||
}
|
||||
} else if (type == CLUSTERMSG_TYPE_MFSTART) {
|
||||
/* This message is acceptable only if I'm a master and the sender
|
||||
* is one of my slaves. */
|
||||
if (!sender || sender->slaveof != myself) return 1;
|
||||
/* Manual failover requested from slaves. Initialize the state
|
||||
* accordingly. */
|
||||
resetManualFailover();
|
||||
server.cluster->mf_end = mstime() + REDIS_CLUSTER_MF_TIMEOUT;
|
||||
server.cluster->mf_slave = sender;
|
||||
pauseClients(mstime()+(REDIS_CLUSTER_MF_TIMEOUT*2));
|
||||
redisLog(REDIS_WARNING,"Manual failover requested by slave %.40s.",
|
||||
sender->name);
|
||||
} else if (type == CLUSTERMSG_TYPE_UPDATE) {
|
||||
clusterNode *n; /* The node the update is about. */
|
||||
uint64_t reportedConfigEpoch = ntohu64(hdr->data.update.nodecfg.configEpoch);
|
||||
@ -1651,6 +1678,10 @@ void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
|
||||
offset = server.master_repl_offset;
|
||||
hdr->offset = htonu64(offset);
|
||||
|
||||
/* Set the message flags. */
|
||||
if (nodeIsMaster(myself) && server.cluster->mf_end)
|
||||
hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED;
|
||||
|
||||
/* Compute the message length for certain messages. For other messages
|
||||
* this is up to the caller. */
|
||||
if (type == CLUSTERMSG_TYPE_FAIL) {
|
||||
@ -1883,6 +1914,19 @@ void clusterSendFailoverAuth(clusterNode *node) {
|
||||
clusterSendMessage(node->link,buf,totlen);
|
||||
}
|
||||
|
||||
/* Send a MFSTART message to the specified node. */
|
||||
void clusterSendMFStart(clusterNode *node) {
|
||||
unsigned char buf[sizeof(clusterMsg)];
|
||||
clusterMsg *hdr = (clusterMsg*) buf;
|
||||
uint32_t totlen;
|
||||
|
||||
if (!node->link) return;
|
||||
clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MFSTART);
|
||||
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
|
||||
hdr->totlen = htonl(totlen);
|
||||
clusterSendMessage(node->link,buf,totlen);
|
||||
}
|
||||
|
||||
/* Vote for the node asking for our vote if there are the conditions. */
|
||||
void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
|
||||
clusterNode *master = node->slaveof;
|
||||
@ -1975,11 +2019,11 @@ void clusterHandleSlaveFailover(void) {
|
||||
|
||||
/* Pre conditions to run the function:
|
||||
* 1) We are a slave.
|
||||
* 2) Our master is flagged as FAIL.
|
||||
* 2) Our master is flagged as FAIL, or this is a manual failover.
|
||||
* 3) It is serving slots. */
|
||||
if (nodeIsMaster(myself) ||
|
||||
myself->slaveof == NULL ||
|
||||
!nodeFailed(myself->slaveof) ||
|
||||
(!nodeFailed(myself->slaveof) && server.cluster->mf_end == 0) ||
|
||||
myself->slaveof->numslots == 0) return;
|
||||
|
||||
/* Set data_age to the number of seconds we are disconnected from
|
||||
@ -2019,6 +2063,11 @@ void clusterHandleSlaveFailover(void) {
|
||||
* less updated replication offset, are penalized. */
|
||||
server.cluster->failover_auth_time +=
|
||||
server.cluster->failover_auth_rank * 1000;
|
||||
/* However if this is a manual failover, no delay is needed. */
|
||||
if (server.cluster->mf_end) {
|
||||
server.cluster->failover_auth_time = mstime();
|
||||
server.cluster->failover_auth_rank = 0;
|
||||
}
|
||||
redisLog(REDIS_WARNING,
|
||||
"Start of election delayed for %lld milliseconds "
|
||||
"(rank #%d, offset %lld).",
|
||||
@ -2035,7 +2084,9 @@ void clusterHandleSlaveFailover(void) {
|
||||
/* It is possible that we received more updated offsets from other
|
||||
* slaves for the same master since we computed our election delay.
|
||||
* Update the delay if our rank changed. */
|
||||
if (server.cluster->failover_auth_sent == 0) {
|
||||
if (server.cluster->failover_auth_sent == 0 &&
|
||||
server.cluster->mf_end == 0)
|
||||
{
|
||||
int newrank = clusterGetSlaveRank();
|
||||
if (newrank > server.cluster->failover_auth_rank) {
|
||||
long long added_delay =
|
||||
@ -2102,6 +2153,9 @@ void clusterHandleSlaveFailover(void) {
|
||||
/* 5) Pong all the other nodes so that they can update the state
|
||||
* accordingly and detect that we switched to master role. */
|
||||
clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
|
||||
|
||||
/* 6) If there was a manual failover in progress, clear the state. */
|
||||
resetManualFailover();
|
||||
}
|
||||
}
|
||||
|
||||
@ -2190,6 +2244,79 @@ void clusterHandleSlaveMigration(int max_slaves) {
|
||||
}
|
||||
}
|
||||
|
||||
/* -----------------------------------------------------------------------------
|
||||
* CLUSTER manual failover
|
||||
*
|
||||
* This are the important steps performed by slaves during a manual failover:
|
||||
* 1) User send CLUSTER FAILOVER command. The failover state is initialized
|
||||
* setting mf_end to the millisecond unix time at which we'll abort the
|
||||
* attempt.
|
||||
* 2) Slave sends a MFSTART message to the master requesting to pause clients
|
||||
* for two times the manual failover timeout REDIS_CLUSTER_MF_TIMEOUT.
|
||||
* When master is paused for manual failover, it also starts to flag
|
||||
* packets with CLUSTERMSG_FLAG0_PAUSED.
|
||||
* 3) Slave waits for master to send its replication offset flagged as PAUSED.
|
||||
* 4) If slave received the offset from the master, and its offset matches,
|
||||
* mf_can_start is set to 1, and clusterHandleSlaveFailover() will perform
|
||||
* the failover as usually, with the difference that the vote request
|
||||
* will be modified to force masters to vote for a slave that has a
|
||||
* working master.
|
||||
*
|
||||
* From the point of view of the master things are simpler: when a
|
||||
* PAUSE_CLIENTS packet is received the master sets mf_end as well and
|
||||
* the sender in mf_slave. During the time limit for the manual failover
|
||||
* the master will just send PINGs more often to this slave, flagged with
|
||||
* the PAUSED flag, so that the slave will set mf_master_offset when receiving
|
||||
* a packet from the master with this flag set.
|
||||
*
|
||||
* The gaol of the manual failover is to perform a fast failover without
|
||||
* data loss due to the asynchronous master-slave replication.
|
||||
* -------------------------------------------------------------------------- */
|
||||
|
||||
/* Reset the manual failover state. This works for both masters and slavesa
|
||||
* as all the state about manual failover is cleared.
|
||||
*
|
||||
* The function can be used both to initialize the manual failover state at
|
||||
* startup or to abort a manual failover in progress. */
|
||||
void resetManualFailover(void) {
|
||||
if (server.cluster->mf_end && clientsArePaused()) {
|
||||
server.clients_pause_end_time = 0;
|
||||
clientsArePaused(); /* Just use the side effect of the function. */
|
||||
}
|
||||
server.cluster->mf_end = 0; /* No manual failover in progress. */
|
||||
server.cluster->mf_can_start = 0;
|
||||
server.cluster->mf_slave = NULL;
|
||||
server.cluster->mf_master_offset = 0;
|
||||
}
|
||||
|
||||
/* If a manual failover timed out, abort it. */
|
||||
void manualFailoverCheckTimeout(void) {
|
||||
if (server.cluster->mf_end < mstime()) {
|
||||
redisLog(REDIS_WARNING,"Manual failover timed out.");
|
||||
resetManualFailover();
|
||||
}
|
||||
}
|
||||
|
||||
/* This function is called from the cluster cron function in order to go
|
||||
* forward with a manual failover state machine. */
|
||||
void clusterHandleManualFailover(void) {
|
||||
/* Return ASAP if no manual failover is in progress. */
|
||||
if (server.cluster->mf_end == 0) return;
|
||||
|
||||
/* If mf_can_start is non-zero, the failover was alrady triggered so the
|
||||
* next steps are performed by clusterHandleSlaveFailover(). */
|
||||
if (server.cluster->mf_can_start) return;
|
||||
|
||||
if (server.cluster->mf_master_offset == 0) return; /* Wait for offset... */
|
||||
|
||||
if (server.cluster->mf_master_offset == replicationGetSlaveOffset()) {
|
||||
/* Our replication offset matches the master replication offset
|
||||
* announced after clients were paused. We can start the failover. */
|
||||
server.cluster->mf_can_start = 1;
|
||||
redisLog(REDIS_WARNING,"All master replication stream processed, manual failover can start.");
|
||||
}
|
||||
}
|
||||
|
||||
/* -----------------------------------------------------------------------------
|
||||
* CLUSTER cron job
|
||||
* -------------------------------------------------------------------------- */
|
||||
@ -2351,6 +2478,17 @@ void clusterCron(void) {
|
||||
continue;
|
||||
}
|
||||
|
||||
/* If we are a master and one of the slaves requested a manual
|
||||
* failover, ping it continuously. */
|
||||
if (server.cluster->mf_end &&
|
||||
nodeIsMaster(myself) &&
|
||||
server.cluster->mf_slave == node &&
|
||||
node->link)
|
||||
{
|
||||
clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
|
||||
continue;
|
||||
}
|
||||
|
||||
/* Check only if we have an active ping for this instance. */
|
||||
if (node->ping_sent == 0) continue;
|
||||
|
||||
@ -2383,7 +2521,11 @@ void clusterCron(void) {
|
||||
replicationSetMaster(myself->slaveof->ip, myself->slaveof->port);
|
||||
}
|
||||
|
||||
/* Abourt a manual failover if the timeout is reached. */
|
||||
manualFailoverCheckTimeout();
|
||||
|
||||
if (nodeIsSlave(myself)) {
|
||||
clusterHandleManualFailover();
|
||||
clusterHandleSlaveFailover();
|
||||
/* If there are orphaned slaves, and we are a slave among the masters
|
||||
* with the max number of non-failing slaves, consider migrating to
|
||||
@ -2700,6 +2842,7 @@ void clusterSetMaster(clusterNode *n) {
|
||||
myself->slaveof = n;
|
||||
clusterNodeAddSlave(n,myself);
|
||||
replicationSetMaster(n->ip, n->port);
|
||||
resetManualFailover();
|
||||
}
|
||||
|
||||
/* -----------------------------------------------------------------------------
|
||||
@ -3152,6 +3295,20 @@ void clusterCommand(redisClient *c) {
|
||||
addReplyBulkCString(c,ni);
|
||||
sdsfree(ni);
|
||||
}
|
||||
} else if (!strcasecmp(c->argv[1]->ptr,"failover") && c->argc == 2) {
|
||||
if (nodeIsMaster(myself)) {
|
||||
addReplyError(c,"You should send CLUSTER FAILOVER to a slave");
|
||||
return;
|
||||
} else if (myself->slaveof == NULL || nodeFailed(myself->slaveof) ||
|
||||
myself->slaveof->link == NULL)
|
||||
{
|
||||
addReplyError(c,"Master is down or failed, "
|
||||
"please use CLUSTER FAILOVER FORCE");
|
||||
return;
|
||||
}
|
||||
resetManualFailover();
|
||||
server.cluster->mf_end = mstime() + REDIS_CLUSTER_MF_TIMEOUT;
|
||||
clusterSendMFStart(myself->slaveof);
|
||||
} else {
|
||||
addReplyError(c,"Wrong CLUSTER subcommand or number of arguments");
|
||||
}
|
||||
|
@ -22,6 +22,8 @@
|
||||
#define REDIS_CLUSTER_FAILOVER_AUTH_RETRY_MULT 4 /* Auth request retry time. */
|
||||
#define REDIS_CLUSTER_FAILOVER_DELAY 5 /* Seconds */
|
||||
#define REDIS_CLUSTER_DEFAULT_MIGRATION_BARRIER 1
|
||||
#define REDIS_CLUSTER_MF_TIMEOUT 5000 /* Milliseconds to do a manual failover. */
|
||||
#define REDIS_CLUSTER_MF_PAUSE_MULT 2 /* Master pause manual failover mult. */
|
||||
|
||||
struct clusterNode;
|
||||
|
||||
@ -100,6 +102,16 @@ typedef struct clusterState {
|
||||
int failover_auth_sent; /* True if we already asked for votes. */
|
||||
int failover_auth_rank; /* This slave rank for current auth request. */
|
||||
uint64_t failover_auth_epoch; /* Epoch of the current election. */
|
||||
/* Manual failover state in common. */
|
||||
mstime_t mf_end; /* Manual failover time limit (ms unixtime).
|
||||
It is zero if there is no MF in progress. */
|
||||
/* Manual failover state of master. */
|
||||
clusterNode *mf_slave; /* Slave performing the manual failover. */
|
||||
/* Manual failover state of slave. */
|
||||
long long mf_master_offset; /* Master offset the slave needs to start MF
|
||||
or zero if stil not received. */
|
||||
int mf_can_start; /* If non-zero signal that the manual failover
|
||||
can start requesting masters vote. */
|
||||
/* The followign fields are uesd by masters to take state on elections. */
|
||||
uint64_t last_vote_epoch; /* Epoch of the last vote granted. */
|
||||
int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */
|
||||
@ -127,6 +139,7 @@ typedef struct clusterState {
|
||||
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 5 /* May I failover? */
|
||||
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK 6 /* Yes, you have my vote */
|
||||
#define CLUSTERMSG_TYPE_UPDATE 7 /* Another node slots configuration */
|
||||
#define CLUSTERMSG_TYPE_MFSTART 8 /* Pause clients for manual failover */
|
||||
|
||||
/* Initially we don't know our "name", but we'll find it once we connect
|
||||
* to the first node, using the getsockname() function. Then we'll use this
|
||||
@ -180,6 +193,7 @@ union clusterMsgData {
|
||||
} update;
|
||||
};
|
||||
|
||||
|
||||
typedef struct {
|
||||
uint32_t totlen; /* Total length of this message */
|
||||
uint16_t type; /* Message type */
|
||||
@ -197,12 +211,16 @@ typedef struct {
|
||||
uint16_t port; /* Sender TCP base port */
|
||||
uint16_t flags; /* Sender node flags */
|
||||
unsigned char state; /* Cluster state from the POV of the sender */
|
||||
unsigned char notused2[3]; /* Reserved for future use. For alignment. */
|
||||
unsigned char mflags[3]; /* Message flags: CLUSTERMSG_FLAG[012]_... */
|
||||
union clusterMsgData data;
|
||||
} clusterMsg;
|
||||
|
||||
#define CLUSTERMSG_MIN_LEN (sizeof(clusterMsg)-sizeof(union clusterMsgData))
|
||||
|
||||
/* Message flags better specify the packet content or are used to
|
||||
* provide some information about the node state. */
|
||||
#define CLUSTERMSG_FLAG0_PAUSED (1<<0) /* Master paused for manual failover. */
|
||||
|
||||
/* ---------------------- API exported outside cluster.c -------------------- */
|
||||
clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user