diff --git a/src/cluster.c b/src/cluster.c index d8f83f08d..78524bd34 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -749,6 +749,7 @@ clusterNode *createClusterNode(char *nodename, int flags) { node->slaves = NULL; node->slaveof = NULL; node->ping_sent = node->pong_received = 0; + node->data_received = 0; node->fail_time = 0; node->link = NULL; memset(node->ip,0,sizeof(node->ip)); @@ -1678,6 +1679,7 @@ int clusterProcessPacket(clusterLink *link) { clusterMsg *hdr = (clusterMsg*) link->rcvbuf; uint32_t totlen = ntohl(hdr->totlen); uint16_t type = ntohs(hdr->type); + mstime_t now = mstime(); if (type < CLUSTERMSG_TYPE_COUNT) server.cluster->stats_bus_messages_received[type]++; @@ -1741,6 +1743,13 @@ int clusterProcessPacket(clusterLink *link) { /* Check if the sender is a known node. */ sender = clusterLookupNode(hdr->sender); + + /* Update the last time we saw any data from this node. We + * use this in order to avoid detecting a timeout from a node that + * is just sending a lot of data in the cluster bus, for instance + * because of Pub/Sub. */ + if (sender) sender->data_received = now; + if (sender && !nodeInHandshake(sender)) { /* Update our curretEpoch if we see a newer epoch in the cluster. */ senderCurrentEpoch = ntohu64(hdr->currentEpoch); @@ -1755,7 +1764,7 @@ int clusterProcessPacket(clusterLink *link) { } /* Update the replication offset info for this node. */ sender->repl_offset = ntohu64(hdr->offset); - sender->repl_offset_time = mstime(); + sender->repl_offset_time = now; /* If we are a slave performing a manual failover and our master * sent its offset while already paused, populate the MF state. */ if (server.cluster->mf_end && @@ -1869,7 +1878,7 @@ int clusterProcessPacket(clusterLink *link) { * address. */ serverLog(LL_DEBUG,"PONG contains mismatching sender ID. About node %.40s added %d ms ago, having flags %d", link->node->name, - (int)(mstime()-(link->node->ctime)), + (int)(now-(link->node->ctime)), link->node->flags); link->node->flags |= CLUSTER_NODE_NOADDR; link->node->ip[0] = '\0'; @@ -1904,7 +1913,7 @@ int clusterProcessPacket(clusterLink *link) { /* Update our info about the node */ if (link->node && type == CLUSTERMSG_TYPE_PONG) { - link->node->pong_received = mstime(); + link->node->pong_received = now; link->node->ping_sent = 0; /* The PFAIL condition can be reversed without external @@ -2051,7 +2060,7 @@ int clusterProcessPacket(clusterLink *link) { "FAIL message received from %.40s about %.40s", hdr->sender, hdr->data.fail.about.nodename); failing->flags |= CLUSTER_NODE_FAIL; - failing->fail_time = mstime(); + failing->fail_time = now; failing->flags &= ~CLUSTER_NODE_PFAIL; clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| CLUSTER_TODO_UPDATE_STATE); @@ -2104,9 +2113,9 @@ int clusterProcessPacket(clusterLink *link) { /* Manual failover requested from slaves. Initialize the state * accordingly. */ resetManualFailover(); - server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT; + server.cluster->mf_end = now + CLUSTER_MF_TIMEOUT; server.cluster->mf_slave = sender; - pauseClients(mstime()+(CLUSTER_MF_TIMEOUT*CLUSTER_MF_PAUSE_MULT)); + pauseClients(now+(CLUSTER_MF_TIMEOUT*CLUSTER_MF_PAUSE_MULT)); serverLog(LL_WARNING,"Manual failover requested by replica %.40s.", sender->name); } else if (type == CLUSTERMSG_TYPE_UPDATE) { @@ -3529,7 +3538,6 @@ void clusterCron(void) { while((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de); now = mstime(); /* Use an updated time at every iteration. */ - mstime_t delay; if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE)) @@ -3553,7 +3561,7 @@ void clusterCron(void) { this_slaves = okslaves; } - /* If we are waiting for the PONG more than half the cluster + /* If we are not receiving any data for more than half the cluster * timeout, reconnect the link: maybe there is a connection * issue even if the node is alive. */ if (node->link && /* is connected */ @@ -3562,7 +3570,9 @@ void clusterCron(void) { node->ping_sent && /* we already sent a ping */ node->pong_received < node->ping_sent && /* still waiting pong */ /* and we are waiting for the pong more than timeout/2 */ - now - node->ping_sent > server.cluster_node_timeout/2) + now - node->ping_sent > server.cluster_node_timeout/2 && + /* and in such interval we are not seeing any traffic at all. */ + now - node->data_received > server.cluster_node_timeout/2) { /* Disconnect the link, it will be reconnected automatically. */ freeClusterLink(node->link); @@ -3597,7 +3607,13 @@ void clusterCron(void) { /* Compute the delay of the PONG. Note that if we already received * the PONG, then node->ping_sent is zero, so can't reach this * code at all. */ - delay = now - node->ping_sent; + mstime_t delay = now - node->ping_sent; + + /* We consider every incoming data as proof of liveness, since + * our cluster bus link is also used for data: under heavy data + * load pong delays are possible. */ + mstime_t data_delay = now - node->data_received; + if (data_delay < delay) delay = data_delay; if (delay > server.cluster_node_timeout) { /* Timeout reached. Set the node as possibly failing if it is diff --git a/src/cluster.h b/src/cluster.h index 35fc0cbfa..d3af4a355 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -124,6 +124,7 @@ typedef struct clusterNode { tables. */ mstime_t ping_sent; /* Unix time we sent latest ping */ mstime_t pong_received; /* Unix time we received the pong */ + mstime_t data_received; /* Unix time we received any data */ mstime_t fail_time; /* Unix time when FAIL flag was set */ mstime_t voted_time; /* Last time we voted for a slave of this master */ mstime_t repl_offset_time; /* Unix time we received offset for this node */