mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-23 16:48:27 -05:00
Cluster: replica migration implementation.
This feature allows slaves to migrate to orphaned masters (masters without working slaves), as long as a set of conditions are met, including the fact that the migrating slave needs to be in a master-slaves ring with at least another slave working.
This commit is contained in:
parent
5b4020fb42
commit
c2507b0ff6
133
src/cluster.c
133
src/cluster.c
@ -63,6 +63,7 @@ int clusterDelNodeSlots(clusterNode *node);
|
|||||||
int clusterNodeSetSlotBit(clusterNode *n, int slot);
|
int clusterNodeSetSlotBit(clusterNode *n, int slot);
|
||||||
void clusterSetMaster(clusterNode *n);
|
void clusterSetMaster(clusterNode *n);
|
||||||
void clusterHandleSlaveFailover(void);
|
void clusterHandleSlaveFailover(void);
|
||||||
|
void clusterHandleSlaveMigration(int max_slaves);
|
||||||
int bitmapTestBit(unsigned char *bitmap, int pos);
|
int bitmapTestBit(unsigned char *bitmap, int pos);
|
||||||
void clusterDoBeforeSleep(int flags);
|
void clusterDoBeforeSleep(int flags);
|
||||||
void clusterSendUpdate(clusterLink *link, clusterNode *node);
|
void clusterSendUpdate(clusterLink *link, clusterNode *node);
|
||||||
@ -579,6 +580,14 @@ void clusterNodeResetSlaves(clusterNode *n) {
|
|||||||
n->slaves = NULL;
|
n->slaves = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int clusterCountNonFailingSlaves(clusterNode *n) {
|
||||||
|
int j, okslaves = 0;
|
||||||
|
|
||||||
|
for (j = 0; j < n->numslaves; j++)
|
||||||
|
if (!nodeFailed(n->slaves[j])) okslaves++;
|
||||||
|
return okslaves;
|
||||||
|
}
|
||||||
|
|
||||||
void freeClusterNode(clusterNode *n) {
|
void freeClusterNode(clusterNode *n) {
|
||||||
sds nodename;
|
sds nodename;
|
||||||
|
|
||||||
@ -2096,6 +2105,90 @@ void clusterHandleSlaveFailover(void) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* -----------------------------------------------------------------------------
|
||||||
|
* CLUSTER slave migration
|
||||||
|
*
|
||||||
|
* Slave migration is the process that allows a slave of a master that is
|
||||||
|
* already covered by at least another slave, to "migrate" to a master that
|
||||||
|
* is orpaned, that is, left with no working slaves.
|
||||||
|
* -------------------------------------------------------------------------- */
|
||||||
|
|
||||||
|
/* This function is responsible to decide if this replica should be migrated
|
||||||
|
* to a different (orphaned) master. It is called by the clusterCron() function
|
||||||
|
* only if:
|
||||||
|
*
|
||||||
|
* 1) We are a slave node.
|
||||||
|
* 2) It was detected that there is at least one orphaned master in
|
||||||
|
* the cluster.
|
||||||
|
* 3) We are a slave of one of the masters with the greatest number of
|
||||||
|
* slaves.
|
||||||
|
*
|
||||||
|
* This checks are performed by the caller since it requires to iterate
|
||||||
|
* the nodes anyway, so we spend time into clusterHandleSlaveMigration()
|
||||||
|
* if definitely needed.
|
||||||
|
*
|
||||||
|
* The fuction is called with a pre-computed max_slaves, that is the max
|
||||||
|
* number of working (not in FAIL state) slaves for a single master.
|
||||||
|
*
|
||||||
|
* Additional conditions for migration are examined inside the function.
|
||||||
|
*/
|
||||||
|
void clusterHandleSlaveMigration(int max_slaves) {
|
||||||
|
int j, okslaves = 0;
|
||||||
|
clusterNode *mymaster = myself->slaveof, *target = NULL, *candidate = NULL;
|
||||||
|
dictIterator *di;
|
||||||
|
dictEntry *de;
|
||||||
|
|
||||||
|
/* Step 1: Don't migrate if the cluster state is not ok. */
|
||||||
|
if (server.cluster->state != REDIS_CLUSTER_OK) return;
|
||||||
|
|
||||||
|
/* Step 2: Don't migrate if my master has just me as working slave. */
|
||||||
|
if (mymaster == NULL) return;
|
||||||
|
for (j = 0; j < mymaster->numslaves; j++)
|
||||||
|
if (!nodeFailed(mymaster->slaves[j]) &&
|
||||||
|
!nodeTimedOut(mymaster->slaves[j])) okslaves++;
|
||||||
|
if (okslaves == 1) return;
|
||||||
|
|
||||||
|
/* Step 3: Idenitfy a candidate for migration, and check if among the
|
||||||
|
* masters with the greatest number of ok slaves, I'm the one with the
|
||||||
|
* smaller node ID.
|
||||||
|
*
|
||||||
|
* Note that this means that eventually a replica migration will occurr
|
||||||
|
* since slaves that are reachable again always have their FAIL flag
|
||||||
|
* cleared. At the same time this does not mean that there are no
|
||||||
|
* race conditions possible (two slaves migrating at the same time), but
|
||||||
|
* this is extremely unlikely to happen, and harmless. */
|
||||||
|
candidate = myself;
|
||||||
|
di = dictGetSafeIterator(server.cluster->nodes);
|
||||||
|
while((de = dictNext(di)) != NULL) {
|
||||||
|
clusterNode *node = dictGetVal(de);
|
||||||
|
int okslaves;
|
||||||
|
|
||||||
|
/* Only iterate over working masters. */
|
||||||
|
if (nodeIsSlave(node) || nodeFailed(node)) continue;
|
||||||
|
okslaves = clusterCountNonFailingSlaves(node);
|
||||||
|
|
||||||
|
if (okslaves == 0 && target == NULL) target = node;
|
||||||
|
if (okslaves == max_slaves) {
|
||||||
|
for (j = 0; j < node->numslaves; j++) {
|
||||||
|
if (memcmp(node->slaves[j]->name,
|
||||||
|
candidate->name,
|
||||||
|
REDIS_CLUSTER_NAMELEN) < 0)
|
||||||
|
{
|
||||||
|
candidate = node->slaves[j];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Step 4: perform the migration if there is a target, and if I'm the
|
||||||
|
* candidate. */
|
||||||
|
if (target && candidate == myself) {
|
||||||
|
redisLog(REDIS_WARNING,"Migrating to orphaned master %.40s",
|
||||||
|
target->name);
|
||||||
|
clusterSetMaster(target);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* -----------------------------------------------------------------------------
|
/* -----------------------------------------------------------------------------
|
||||||
* CLUSTER cron job
|
* CLUSTER cron job
|
||||||
* -------------------------------------------------------------------------- */
|
* -------------------------------------------------------------------------- */
|
||||||
@ -2104,7 +2197,10 @@ void clusterHandleSlaveFailover(void) {
|
|||||||
void clusterCron(void) {
|
void clusterCron(void) {
|
||||||
dictIterator *di;
|
dictIterator *di;
|
||||||
dictEntry *de;
|
dictEntry *de;
|
||||||
int j, update_state = 0;
|
int update_state = 0;
|
||||||
|
int orphaned_masters; /* How many masters there are without ok slaves. */
|
||||||
|
int max_slaves; /* Max number of ok slaves for a single master. */
|
||||||
|
int this_slaves; /* Number of ok slaves for our master (if we are slave). */
|
||||||
mstime_t min_pong = 0, now = mstime();
|
mstime_t min_pong = 0, now = mstime();
|
||||||
clusterNode *min_pong_node = NULL;
|
clusterNode *min_pong_node = NULL;
|
||||||
static unsigned long long iteration = 0;
|
static unsigned long long iteration = 0;
|
||||||
@ -2175,6 +2271,8 @@ void clusterCron(void) {
|
|||||||
/* Ping some random node 1 time every 10 iterations, so that we usually ping
|
/* Ping some random node 1 time every 10 iterations, so that we usually ping
|
||||||
* one random node every second. */
|
* one random node every second. */
|
||||||
if (!(iteration % 10)) {
|
if (!(iteration % 10)) {
|
||||||
|
int j;
|
||||||
|
|
||||||
/* Check a few random nodes and ping the one with the oldest
|
/* Check a few random nodes and ping the one with the oldest
|
||||||
* pong_received time. */
|
* pong_received time. */
|
||||||
for (j = 0; j < 5; j++) {
|
for (j = 0; j < 5; j++) {
|
||||||
@ -2195,7 +2293,15 @@ void clusterCron(void) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Iterate nodes to check if we need to flag something as failing */
|
/* Iterate nodes to check if we need to flag something as failing.
|
||||||
|
* This loop is also responsible to:
|
||||||
|
* 1) Check if there are orphaned masters (masters without non failing
|
||||||
|
* slaves).
|
||||||
|
* 2) Count the max number of non failing slaves for a single master.
|
||||||
|
* 3) Count the number of slaves for our master, if we are a slave. */
|
||||||
|
orphaned_masters = 0;
|
||||||
|
max_slaves = 0;
|
||||||
|
this_slaves = 0;
|
||||||
di = dictGetSafeIterator(server.cluster->nodes);
|
di = dictGetSafeIterator(server.cluster->nodes);
|
||||||
while((de = dictNext(di)) != NULL) {
|
while((de = dictNext(di)) != NULL) {
|
||||||
clusterNode *node = dictGetVal(de);
|
clusterNode *node = dictGetVal(de);
|
||||||
@ -2251,6 +2357,17 @@ void clusterCron(void) {
|
|||||||
update_state = 1;
|
update_state = 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Orphaned master check, useful only if the current instance
|
||||||
|
* is a slave that may migrate to another master. */
|
||||||
|
if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {
|
||||||
|
int okslaves = clusterCountNonFailingSlaves(node);
|
||||||
|
|
||||||
|
if (okslaves == 0) orphaned_masters++;
|
||||||
|
if (okslaves > max_slaves) max_slaves = okslaves;
|
||||||
|
if (nodeIsSlave(myself) && myself->slaveof == node)
|
||||||
|
this_slaves = okslaves;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
dictReleaseIterator(di);
|
dictReleaseIterator(di);
|
||||||
|
|
||||||
@ -2265,7 +2382,17 @@ void clusterCron(void) {
|
|||||||
replicationSetMaster(myself->slaveof->ip, myself->slaveof->port);
|
replicationSetMaster(myself->slaveof->ip, myself->slaveof->port);
|
||||||
}
|
}
|
||||||
|
|
||||||
clusterHandleSlaveFailover();
|
if (nodeIsSlave(myself)) {
|
||||||
|
clusterHandleSlaveFailover();
|
||||||
|
/* If there are orphaned slaves, and we are a slave among the masters
|
||||||
|
* with the max number of non-failing slaves, consider migrating to
|
||||||
|
* the orphaned masters. Note that it does not make sense to try
|
||||||
|
* a migration if there is no master with at least *two* working
|
||||||
|
* slaves. */
|
||||||
|
if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)
|
||||||
|
clusterHandleSlaveMigration(max_slaves);
|
||||||
|
}
|
||||||
|
|
||||||
if (update_state || server.cluster->state == REDIS_CLUSTER_FAIL)
|
if (update_state || server.cluster->state == REDIS_CLUSTER_FAIL)
|
||||||
clusterUpdateState();
|
clusterUpdateState();
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user