mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-23 00:28:26 -05:00
Cluster: introduce data_received field.
We want to send pings and pongs at specific intervals, since our packets also contain information about the configuration of the cluster and are used for gossip. However, since our cluster bus is used in a mixed way for both data (such as Pub/Sub or modules cluster messages) and metadata, a very busy channel may sometimes delay the reception of pong packets. So, after discussing it in #7216, this commit introduces a new field that is not exposed in the cluster and is only internal information about the last time we received any data from a given node: we use this field to avoid falsely detecting failures, treating the reception of new data from the node as proof of liveness.
This commit is contained in:
parent
e17f9311c8
commit
00a3bc4359
@ -749,6 +749,7 @@ clusterNode *createClusterNode(char *nodename, int flags) {
|
|||||||
node->slaves = NULL;
|
node->slaves = NULL;
|
||||||
node->slaveof = NULL;
|
node->slaveof = NULL;
|
||||||
node->ping_sent = node->pong_received = 0;
|
node->ping_sent = node->pong_received = 0;
|
||||||
|
node->data_received = 0;
|
||||||
node->fail_time = 0;
|
node->fail_time = 0;
|
||||||
node->link = NULL;
|
node->link = NULL;
|
||||||
memset(node->ip,0,sizeof(node->ip));
|
memset(node->ip,0,sizeof(node->ip));
|
||||||
@ -1678,6 +1679,7 @@ int clusterProcessPacket(clusterLink *link) {
|
|||||||
clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
|
clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
|
||||||
uint32_t totlen = ntohl(hdr->totlen);
|
uint32_t totlen = ntohl(hdr->totlen);
|
||||||
uint16_t type = ntohs(hdr->type);
|
uint16_t type = ntohs(hdr->type);
|
||||||
|
mstime_t now = mstime();
|
||||||
|
|
||||||
if (type < CLUSTERMSG_TYPE_COUNT)
|
if (type < CLUSTERMSG_TYPE_COUNT)
|
||||||
server.cluster->stats_bus_messages_received[type]++;
|
server.cluster->stats_bus_messages_received[type]++;
|
||||||
@ -1741,6 +1743,13 @@ int clusterProcessPacket(clusterLink *link) {
|
|||||||
|
|
||||||
/* Check if the sender is a known node. */
|
/* Check if the sender is a known node. */
|
||||||
sender = clusterLookupNode(hdr->sender);
|
sender = clusterLookupNode(hdr->sender);
|
||||||
|
|
||||||
|
/* Update the last time we saw any data from this node. We
|
||||||
|
* use this in order to avoid detecting a timeout from a node that
|
||||||
|
* is just sending a lot of data in the cluster bus, for instance
|
||||||
|
* because of Pub/Sub. */
|
||||||
|
if (sender) sender->data_received = now;
|
||||||
|
|
||||||
if (sender && !nodeInHandshake(sender)) {
|
if (sender && !nodeInHandshake(sender)) {
|
||||||
/* Update our curretEpoch if we see a newer epoch in the cluster. */
|
/* Update our curretEpoch if we see a newer epoch in the cluster. */
|
||||||
senderCurrentEpoch = ntohu64(hdr->currentEpoch);
|
senderCurrentEpoch = ntohu64(hdr->currentEpoch);
|
||||||
@ -1755,7 +1764,7 @@ int clusterProcessPacket(clusterLink *link) {
|
|||||||
}
|
}
|
||||||
/* Update the replication offset info for this node. */
|
/* Update the replication offset info for this node. */
|
||||||
sender->repl_offset = ntohu64(hdr->offset);
|
sender->repl_offset = ntohu64(hdr->offset);
|
||||||
sender->repl_offset_time = mstime();
|
sender->repl_offset_time = now;
|
||||||
/* If we are a slave performing a manual failover and our master
|
/* If we are a slave performing a manual failover and our master
|
||||||
* sent its offset while already paused, populate the MF state. */
|
* sent its offset while already paused, populate the MF state. */
|
||||||
if (server.cluster->mf_end &&
|
if (server.cluster->mf_end &&
|
||||||
@ -1869,7 +1878,7 @@ int clusterProcessPacket(clusterLink *link) {
|
|||||||
* address. */
|
* address. */
|
||||||
serverLog(LL_DEBUG,"PONG contains mismatching sender ID. About node %.40s added %d ms ago, having flags %d",
|
serverLog(LL_DEBUG,"PONG contains mismatching sender ID. About node %.40s added %d ms ago, having flags %d",
|
||||||
link->node->name,
|
link->node->name,
|
||||||
(int)(mstime()-(link->node->ctime)),
|
(int)(now-(link->node->ctime)),
|
||||||
link->node->flags);
|
link->node->flags);
|
||||||
link->node->flags |= CLUSTER_NODE_NOADDR;
|
link->node->flags |= CLUSTER_NODE_NOADDR;
|
||||||
link->node->ip[0] = '\0';
|
link->node->ip[0] = '\0';
|
||||||
@ -1904,7 +1913,7 @@ int clusterProcessPacket(clusterLink *link) {
|
|||||||
|
|
||||||
/* Update our info about the node */
|
/* Update our info about the node */
|
||||||
if (link->node && type == CLUSTERMSG_TYPE_PONG) {
|
if (link->node && type == CLUSTERMSG_TYPE_PONG) {
|
||||||
link->node->pong_received = mstime();
|
link->node->pong_received = now;
|
||||||
link->node->ping_sent = 0;
|
link->node->ping_sent = 0;
|
||||||
|
|
||||||
/* The PFAIL condition can be reversed without external
|
/* The PFAIL condition can be reversed without external
|
||||||
@ -2051,7 +2060,7 @@ int clusterProcessPacket(clusterLink *link) {
|
|||||||
"FAIL message received from %.40s about %.40s",
|
"FAIL message received from %.40s about %.40s",
|
||||||
hdr->sender, hdr->data.fail.about.nodename);
|
hdr->sender, hdr->data.fail.about.nodename);
|
||||||
failing->flags |= CLUSTER_NODE_FAIL;
|
failing->flags |= CLUSTER_NODE_FAIL;
|
||||||
failing->fail_time = mstime();
|
failing->fail_time = now;
|
||||||
failing->flags &= ~CLUSTER_NODE_PFAIL;
|
failing->flags &= ~CLUSTER_NODE_PFAIL;
|
||||||
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
|
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
|
||||||
CLUSTER_TODO_UPDATE_STATE);
|
CLUSTER_TODO_UPDATE_STATE);
|
||||||
@ -2104,9 +2113,9 @@ int clusterProcessPacket(clusterLink *link) {
|
|||||||
/* Manual failover requested from slaves. Initialize the state
|
/* Manual failover requested from slaves. Initialize the state
|
||||||
* accordingly. */
|
* accordingly. */
|
||||||
resetManualFailover();
|
resetManualFailover();
|
||||||
server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT;
|
server.cluster->mf_end = now + CLUSTER_MF_TIMEOUT;
|
||||||
server.cluster->mf_slave = sender;
|
server.cluster->mf_slave = sender;
|
||||||
pauseClients(mstime()+(CLUSTER_MF_TIMEOUT*CLUSTER_MF_PAUSE_MULT));
|
pauseClients(now+(CLUSTER_MF_TIMEOUT*CLUSTER_MF_PAUSE_MULT));
|
||||||
serverLog(LL_WARNING,"Manual failover requested by replica %.40s.",
|
serverLog(LL_WARNING,"Manual failover requested by replica %.40s.",
|
||||||
sender->name);
|
sender->name);
|
||||||
} else if (type == CLUSTERMSG_TYPE_UPDATE) {
|
} else if (type == CLUSTERMSG_TYPE_UPDATE) {
|
||||||
@ -3529,7 +3538,6 @@ void clusterCron(void) {
|
|||||||
while((de = dictNext(di)) != NULL) {
|
while((de = dictNext(di)) != NULL) {
|
||||||
clusterNode *node = dictGetVal(de);
|
clusterNode *node = dictGetVal(de);
|
||||||
now = mstime(); /* Use an updated time at every iteration. */
|
now = mstime(); /* Use an updated time at every iteration. */
|
||||||
mstime_t delay;
|
|
||||||
|
|
||||||
if (node->flags &
|
if (node->flags &
|
||||||
(CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
|
(CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
|
||||||
@ -3553,7 +3561,7 @@ void clusterCron(void) {
|
|||||||
this_slaves = okslaves;
|
this_slaves = okslaves;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* If we are waiting for the PONG more than half the cluster
|
/* If we are not receiving any data for more than half the cluster
|
||||||
* timeout, reconnect the link: maybe there is a connection
|
* timeout, reconnect the link: maybe there is a connection
|
||||||
* issue even if the node is alive. */
|
* issue even if the node is alive. */
|
||||||
if (node->link && /* is connected */
|
if (node->link && /* is connected */
|
||||||
@ -3562,7 +3570,9 @@ void clusterCron(void) {
|
|||||||
node->ping_sent && /* we already sent a ping */
|
node->ping_sent && /* we already sent a ping */
|
||||||
node->pong_received < node->ping_sent && /* still waiting pong */
|
node->pong_received < node->ping_sent && /* still waiting pong */
|
||||||
/* and we are waiting for the pong more than timeout/2 */
|
/* and we are waiting for the pong more than timeout/2 */
|
||||||
now - node->ping_sent > server.cluster_node_timeout/2)
|
now - node->ping_sent > server.cluster_node_timeout/2 &&
|
||||||
|
/* and in such interval we are not seeing any traffic at all. */
|
||||||
|
now - node->data_received > server.cluster_node_timeout/2)
|
||||||
{
|
{
|
||||||
/* Disconnect the link, it will be reconnected automatically. */
|
/* Disconnect the link, it will be reconnected automatically. */
|
||||||
freeClusterLink(node->link);
|
freeClusterLink(node->link);
|
||||||
@ -3597,7 +3607,13 @@ void clusterCron(void) {
|
|||||||
/* Compute the delay of the PONG. Note that if we already received
|
/* Compute the delay of the PONG. Note that if we already received
|
||||||
* the PONG, then node->ping_sent is zero, so can't reach this
|
* the PONG, then node->ping_sent is zero, so can't reach this
|
||||||
* code at all. */
|
* code at all. */
|
||||||
delay = now - node->ping_sent;
|
mstime_t delay = now - node->ping_sent;
|
||||||
|
|
||||||
|
/* We consider every incoming data as proof of liveness, since
|
||||||
|
* our cluster bus link is also used for data: under heavy data
|
||||||
|
* load pong delays are possible. */
|
||||||
|
mstime_t data_delay = now - node->data_received;
|
||||||
|
if (data_delay < delay) delay = data_delay;
|
||||||
|
|
||||||
if (delay > server.cluster_node_timeout) {
|
if (delay > server.cluster_node_timeout) {
|
||||||
/* Timeout reached. Set the node as possibly failing if it is
|
/* Timeout reached. Set the node as possibly failing if it is
|
||||||
|
@ -124,6 +124,7 @@ typedef struct clusterNode {
|
|||||||
tables. */
|
tables. */
|
||||||
mstime_t ping_sent; /* Unix time we sent latest ping */
|
mstime_t ping_sent; /* Unix time we sent latest ping */
|
||||||
mstime_t pong_received; /* Unix time we received the pong */
|
mstime_t pong_received; /* Unix time we received the pong */
|
||||||
|
mstime_t data_received; /* Unix time we received any data */
|
||||||
mstime_t fail_time; /* Unix time when FAIL flag was set */
|
mstime_t fail_time; /* Unix time when FAIL flag was set */
|
||||||
mstime_t voted_time; /* Last time we voted for a slave of this master */
|
mstime_t voted_time; /* Last time we voted for a slave of this master */
|
||||||
mstime_t repl_offset_time; /* Unix time we received offset for this node */
|
mstime_t repl_offset_time; /* Unix time we received offset for this node */
|
||||||
|
Loading…
Reference in New Issue
Block a user