mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 08:08:53 -05:00
Cluster: function clusterGetSlaveRank() added.
Return the number of slaves for the same master having a better replication offset of the current slave, that is, the slave "rank" used to pick a delay before the request for election.
This commit is contained in:
parent
40cd38f0c4
commit
6f54032080
@ -1635,16 +1635,10 @@ void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
|
||||
hdr->configEpoch = htonu64(master->configEpoch);
|
||||
|
||||
/* Set the replication offset. */
|
||||
if (nodeIsSlave(myself)) {
|
||||
if (server.master)
|
||||
offset = server.master->reploff;
|
||||
else if (server.cached_master)
|
||||
offset = server.cached_master->reploff;
|
||||
else
|
||||
offset = 0;
|
||||
} else {
|
||||
if (nodeIsSlave(myself))
|
||||
offset = replicationGetSlaveOffset();
|
||||
else
|
||||
offset = server.master_repl_offset;
|
||||
}
|
||||
hdr->offset = htonu64(offset);
|
||||
|
||||
/* Compute the message length for certain messages. For other messages
|
||||
@ -1927,6 +1921,34 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
|
||||
node->slaveof->voted_time = mstime();
|
||||
}
|
||||
|
||||
/* This function returns the "rank" of this instance, a slave, in the context
|
||||
* of its master-slaves ring. The rank of the slave is given by the number of
|
||||
* other slaves for the same master that have a better replication offset
|
||||
* compared to the local one (better means, greater, so they claim more data).
|
||||
*
|
||||
* A slave with rank 0 is the one with the greatest (most up to date)
|
||||
* replication offset, and so forth. Note that because how the rank is computed
|
||||
* multiple slaves may have the same rank, in case they have the same offset.
|
||||
*
|
||||
* The slave rank is used to add a delay to start an election in order to
|
||||
* get voted and replace a failing master. Slaves with better replication
|
||||
* offsets are more likely to win. */
|
||||
int clusterGetSlaveRank(void) {
|
||||
long long myoffset;
|
||||
int j, rank = 0;
|
||||
clusterNode *master;
|
||||
|
||||
redisAssert(nodeIsSlave(myself));
|
||||
master = myself->slaveof;
|
||||
if (master == NULL) return 0; /* Never called by slaves without master. */
|
||||
|
||||
myoffset = replicationGetSlaveOffset();
|
||||
for (j = 0; j < master->numslaves; j++)
|
||||
if (master->slaves[j] != myself &&
|
||||
master->slaves[j]->repl_offset > myoffset) rank++;
|
||||
return rank;
|
||||
}
|
||||
|
||||
/* This function is called if we are a slave node and our master serving
|
||||
* a non-zero amount of hash slots is in FAIL state.
|
||||
*
|
||||
|
@ -1073,6 +1073,7 @@ void processClientsWaitingReplicas(void);
|
||||
void unblockClientWaitingReplicas(redisClient *c);
|
||||
int replicationCountAcksByOffset(long long offset);
|
||||
void replicationSendNewlineToMaster(void);
|
||||
long long replicationGetSlaveOffset(void);
|
||||
|
||||
/* Generic persistence functions */
|
||||
void startLoading(FILE *fp);
|
||||
|
@ -1672,6 +1672,26 @@ void processClientsWaitingReplicas(void) {
|
||||
}
|
||||
}
|
||||
|
||||
/* Return the slave replication offset for this instance, that is
|
||||
* the offset for which we already processed the master replication stream. */
|
||||
long long replicationGetSlaveOffset(void) {
|
||||
long long offset = 0;
|
||||
|
||||
if (server.masterhost != NULL) {
|
||||
if (server.master) {
|
||||
offset = server.master->reploff;
|
||||
} else if (server.cached_master) {
|
||||
offset = server.cached_master->reploff;
|
||||
}
|
||||
}
|
||||
/* offset may be -1 when the master does not support it at all, however
|
||||
* this function is designed to return an offset that can express the
|
||||
* amount of data processed by the master, so we return a positive
|
||||
* integer. */
|
||||
if (offset < 0) offset = 0;
|
||||
return offset;
|
||||
}
|
||||
|
||||
/* --------------------------- REPLICATION CRON ---------------------------- */
|
||||
|
||||
/* Replication cron funciton, called 1 time per second. */
|
||||
|
Loading…
Reference in New Issue
Block a user