mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 16:18:28 -05:00
Cluster: react faster when a slave wins an election.
This commit is contained in:
parent
42fa46e49a
commit
7c4b8f29e7
@ -53,6 +53,7 @@ int clusterDelSlot(int slot);
|
|||||||
int clusterDelNodeSlots(clusterNode *node);
|
int clusterDelNodeSlots(clusterNode *node);
|
||||||
int clusterNodeSetSlotBit(clusterNode *n, int slot);
|
int clusterNodeSetSlotBit(clusterNode *n, int slot);
|
||||||
void clusterSetMaster(clusterNode *n);
|
void clusterSetMaster(clusterNode *n);
|
||||||
|
void clusterHandleSlaveFailover(void);
|
||||||
int bitmapTestBit(unsigned char *bitmap, int pos);
|
int bitmapTestBit(unsigned char *bitmap, int pos);
|
||||||
|
|
||||||
/* -----------------------------------------------------------------------------
|
/* -----------------------------------------------------------------------------
|
||||||
@ -1191,6 +1192,9 @@ int clusterProcessPacket(clusterLink *link) {
|
|||||||
senderCurrentEpoch == server.cluster->currentEpoch)
|
senderCurrentEpoch == server.cluster->currentEpoch)
|
||||||
{
|
{
|
||||||
server.cluster->failover_auth_count++;
|
server.cluster->failover_auth_count++;
|
||||||
|
/* Maybe we reached a quorum here, set a flag to make sure
|
||||||
|
* we check ASAP. */
|
||||||
|
server.cluster->handle_slave_failover_asap++;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
redisLog(REDIS_WARNING,"Received unknown packet type: %d", type);
|
redisLog(REDIS_WARNING,"Received unknown packet type: %d", type);
|
||||||
@ -1291,7 +1295,11 @@ void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Put stuff into the send buffer. */
|
/* Put stuff into the send buffer.
|
||||||
|
*
|
||||||
|
* It is guaranteed that this function will never have as a side effect
|
||||||
|
* the link to be invalidated, so it is safe to call this function
|
||||||
|
* from event handlers that will do stuff with the same link later. */
|
||||||
void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
|
void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
|
||||||
if (sdslen(link->sndbuf) == 0 && msglen != 0)
|
if (sdslen(link->sndbuf) == 0 && msglen != 0)
|
||||||
aeCreateFileEvent(server.el,link->fd,AE_WRITABLE,
|
aeCreateFileEvent(server.el,link->fd,AE_WRITABLE,
|
||||||
@ -1301,7 +1309,11 @@ void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Send a message to all the nodes that are part of the cluster having
|
/* Send a message to all the nodes that are part of the cluster having
|
||||||
* a connected link. */
|
* a connected link.
|
||||||
|
*
|
||||||
|
* It is guaranteed that this function will never have as a side effect
|
||||||
|
* some node->link to be invalidated, so it is safe to call this function
|
||||||
|
* from event handlers that will do stuff with node links later. */
|
||||||
void clusterBroadcastMessage(void *buf, size_t len) {
|
void clusterBroadcastMessage(void *buf, size_t len) {
|
||||||
dictIterator *di;
|
dictIterator *di;
|
||||||
dictEntry *de;
|
dictEntry *de;
|
||||||
@ -1416,10 +1428,11 @@ void clusterSendPing(clusterLink *link, int type) {
|
|||||||
clusterSendMessage(link,buf,totlen);
|
clusterSendMessage(link,buf,totlen);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Send a PONG packet to every connected node that's not in handshake state.
|
/* Send a PONG packet to every connected node that's not in handshake state
|
||||||
|
* and for which we have a valid link.
|
||||||
*
|
*
|
||||||
* In Redis Cluster pings are not just used for failure detection, but also
|
* In Redis Cluster pongs are not used just for failure detection, but also
|
||||||
* to carry important configuration informations. So broadcasting a pong is
|
* to carry important configuration information. So broadcasting a pong is
|
||||||
* useful when something changes in the configuration and we want to make
|
* useful when something changes in the configuration and we want to make
|
||||||
* the cluster aware ASAP (for instance after a slave promotion). */
|
* the cluster aware ASAP (for instance after a slave promotion). */
|
||||||
void clusterBroadcastPong(void) {
|
void clusterBroadcastPong(void) {
|
||||||
@ -1430,6 +1443,7 @@ void clusterBroadcastPong(void) {
|
|||||||
while((de = dictNext(di)) != NULL) {
|
while((de = dictNext(di)) != NULL) {
|
||||||
clusterNode *node = dictGetVal(de);
|
clusterNode *node = dictGetVal(de);
|
||||||
|
|
||||||
|
if (!node->link) continue;
|
||||||
if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE)) continue;
|
if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE)) continue;
|
||||||
clusterSendPing(node->link,CLUSTERMSG_TYPE_PONG);
|
clusterSendPing(node->link,CLUSTERMSG_TYPE_PONG);
|
||||||
}
|
}
|
||||||
@ -1591,6 +1605,15 @@ void clusterHandleSlaveFailover(void) {
|
|||||||
int needed_quorum = (server.cluster->size / 2) + 1;
|
int needed_quorum = (server.cluster->size / 2) + 1;
|
||||||
int j;
|
int j;
|
||||||
|
|
||||||
|
/* Pre conditions to run the function:
|
||||||
|
* 1) We are a slave.
|
||||||
|
* 2) Our master is flagged as FAIL.
|
||||||
|
* 3) It is serving slots. */
|
||||||
|
if (!(server.cluster->myself->flags & REDIS_NODE_SLAVE) ||
|
||||||
|
server.cluster->myself->slaveof == NULL ||
|
||||||
|
!(server.cluster->myself->slaveof->flags & REDIS_NODE_FAIL) ||
|
||||||
|
server.cluster->myself->slaveof->numslots == 0) return;
|
||||||
|
|
||||||
/* Remove the node timeout from the data age as it is fine that we are
|
/* Remove the node timeout from the data age as it is fine that we are
|
||||||
* disconnected from our master at least for the time it was down to be
|
* disconnected from our master at least for the time it was down to be
|
||||||
* flagged as FAIL, that's the baseline. */
|
* flagged as FAIL, that's the baseline. */
|
||||||
@ -1834,19 +1857,21 @@ void clusterCron(void) {
|
|||||||
server.cluster->myself->slaveof->port);
|
server.cluster->myself->slaveof->port);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* If we are a slave and our master is down, but is serving slots,
|
clusterHandleSlaveFailover();
|
||||||
* call the function that handles the failover. */
|
|
||||||
if (server.cluster->myself->flags & REDIS_NODE_SLAVE &&
|
|
||||||
server.cluster->myself->slaveof &&
|
|
||||||
server.cluster->myself->slaveof->flags & REDIS_NODE_FAIL &&
|
|
||||||
server.cluster->myself->slaveof->numslots != 0)
|
|
||||||
{
|
|
||||||
clusterHandleSlaveFailover();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (update_state) clusterUpdateState();
|
if (update_state) clusterUpdateState();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* This function is called before the event handler returns to sleep for
|
||||||
|
* events. It is useful to perform operations that must be done ASAP in
|
||||||
|
* reaction to events fired but that are not safe to perform inside event
|
||||||
|
* handlers. */
|
||||||
|
void clusterBeforeSleep(void) {
|
||||||
|
if (server.cluster->handle_slave_failover_asap) {
|
||||||
|
clusterHandleSlaveFailover();
|
||||||
|
server.cluster->handle_slave_failover_asap = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* -----------------------------------------------------------------------------
|
/* -----------------------------------------------------------------------------
|
||||||
* Slots management
|
* Slots management
|
||||||
* -------------------------------------------------------------------------- */
|
* -------------------------------------------------------------------------- */
|
||||||
|
@ -1203,6 +1203,9 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
|||||||
|
|
||||||
/* Write the AOF buffer on disk */
|
/* Write the AOF buffer on disk */
|
||||||
flushAppendOnlyFile(0);
|
flushAppendOnlyFile(0);
|
||||||
|
|
||||||
|
/* Call the Redis Cluster before sleep function. */
|
||||||
|
if (server.cluster_enabled) clusterBeforeSleep();
|
||||||
}
|
}
|
||||||
|
|
||||||
/* =========================== Server initialization ======================== */
|
/* =========================== Server initialization ======================== */
|
||||||
|
@ -647,12 +647,13 @@ typedef struct {
|
|||||||
clusterNode *slots[REDIS_CLUSTER_SLOTS];
|
clusterNode *slots[REDIS_CLUSTER_SLOTS];
|
||||||
zskiplist *slots_to_keys;
|
zskiplist *slots_to_keys;
|
||||||
/* The following fields are used to take the slave state on elections. */
|
/* The following fields are used to take the slave state on elections. */
|
||||||
mstime_t failover_auth_time;/* Time at which we'll try to get elected in ms. */
|
mstime_t failover_auth_time;/* Time at which we'll try to get elected in ms*/
|
||||||
int failover_auth_count; /* Number of votes received so far. */
|
int failover_auth_count; /* Number of votes received so far. */
|
||||||
int failover_auth_sent; /* True if we already asked for votes. */
|
int failover_auth_sent; /* True if we already asked for votes. */
|
||||||
uint64_t failover_auth_epoch; /* Epoch of the current election. */
|
uint64_t failover_auth_epoch; /* Epoch of the current election. */
|
||||||
/* The followign fields are uesd by masters to take state on elections. */
|
/* The followign fields are uesd by masters to take state on elections. */
|
||||||
uint64_t last_vote_epoch; /* Epoch of the last vote granted. */
|
uint64_t last_vote_epoch; /* Epoch of the last vote granted. */
|
||||||
|
int handle_slave_failover_asap; /* Call clusterHandleSlaveFailover() ASAP. */
|
||||||
} clusterState;
|
} clusterState;
|
||||||
|
|
||||||
/* Redis cluster messages header */
|
/* Redis cluster messages header */
|
||||||
@ -1380,6 +1381,7 @@ void clusterCron(void);
|
|||||||
clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask);
|
clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask);
|
||||||
void clusterPropagatePublish(robj *channel, robj *message);
|
void clusterPropagatePublish(robj *channel, robj *message);
|
||||||
void migrateCloseTimedoutSockets(void);
|
void migrateCloseTimedoutSockets(void);
|
||||||
|
void clusterBeforeSleep(void);
|
||||||
|
|
||||||
/* Sentinel */
|
/* Sentinel */
|
||||||
void initSentinelConfig(void);
|
void initSentinelConfig(void);
|
||||||
|
Loading…
Reference in New Issue
Block a user