Cluster Manager: better fix subcommand.

This commit is contained in:
artix 2018-10-23 16:14:45 +02:00
parent be3a9dbb6f
commit 2e9859cbfc

View File

@ -2726,8 +2726,7 @@ static int clusterManagerSetSlot(clusterManagerNode *node1,
if (err != NULL) {
*err = zmalloc((reply->len + 1) * sizeof(char));
strcpy(*err, reply->str);
CLUSTER_MANAGER_PRINT_REPLY_ERROR(node1, err);
}
} else CLUSTER_MANAGER_PRINT_REPLY_ERROR(node1, reply->str);
goto cleanup;
}
cleanup:
@ -2848,14 +2847,21 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
if (migrate_reply == NULL) goto next;
if (migrate_reply->type == REDIS_REPLY_ERROR) {
if (do_fix && strstr(migrate_reply->str, "BUSYKEY")) {
/* If the key already exists, try to migrate keys
* adding REPLACE option.
* If the key's slot is not served, try to assign slot
* to the target node. */
int is_busy = (strstr(migrate_reply->str, "BUSYKEY") != NULL);
if (strstr(migrate_reply->str, "slot not served") != NULL)
clusterManagerSetSlot(source, target, slot, "node", NULL);
clusterManagerLogWarn("*** Target key exists. "
"Replacing it for FIX.\n");
freeReplyObject(migrate_reply);
/* Try to migrate keys adding REPLACE option. */
migrate_reply = clusterManagerMigrateKeysInReply(source,
target,
reply,
1, timeout,
is_busy,
timeout,
NULL);
success = (migrate_reply != NULL &&
migrate_reply->type != REDIS_REPLY_ERROR);
@ -3670,14 +3676,22 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
while ((nln = listNext(&nli)) != NULL) {
clusterManagerNode *src = nln->value;
if (src == target) continue;
/* Assign the slot to target node in the source node. */
redisReply *r = CLUSTER_MANAGER_COMMAND(src,
"CLUSTER SETSLOT %s %s %s", slot,
"NODE", target->name);
if (!clusterManagerCheckRedisReply(src, r, NULL))
fixed = -1;
if (r) freeReplyObject(r);
if (fixed < 0) goto cleanup;
/* Set the source node in 'importing' state
* (even if we will actually migrate keys away)
* in order to avoid receiving redirections
* for MIGRATE. */
redisReply *r = CLUSTER_MANAGER_COMMAND(src,
r = CLUSTER_MANAGER_COMMAND(src,
"CLUSTER SETSLOT %s %s %s", slot,
"IMPORTING", target->name);
if (!clusterManagerCheckRedisReply(target, r, NULL))
if (!clusterManagerCheckRedisReply(src, r, NULL))
fixed = -1;
if (r) freeReplyObject(r);
if (fixed < 0) goto cleanup;
@ -3687,6 +3701,13 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
fixed = -1;
goto cleanup;
}
r = CLUSTER_MANAGER_COMMAND(src,
"CLUSTER SETSLOT %s %s", slot,
"STABLE");
if (!clusterManagerCheckRedisReply(src, r, NULL))
fixed = -1;
if (r) freeReplyObject(r);
if (fixed < 0) goto cleanup;
}
fixed++;
}
@ -3707,7 +3728,7 @@ static int clusterManagerFixOpenSlot(int slot) {
clusterManagerLogInfo(">>> Fixing open slot %d\n", slot);
/* Try to obtain the current slot owner, according to the current
* nodes configuration. */
int success = 1;
int success = 1, keys_in_multiple_nodes = 0;
list *owners = listCreate();
list *migrating = listCreate();
list *importing = listCreate();
@ -3720,11 +3741,23 @@ static int clusterManagerFixOpenSlot(int slot) {
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
if (n->slots[slot]) {
if (owner == NULL) owner = n;
listAddNodeTail(owners, n);
if (n->slots[slot]) listAddNodeTail(owners, n);
else {
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
"CLUSTER COUNTKEYSINSLOT %d", slot);
success = clusterManagerCheckRedisReply(n, r, NULL);
if (success && r->integer > 0) {
clusterManagerLogWarn("*** Found keys about slot %d "
"in non-owner node %s:%d!\n", slot,
n->ip, n->port);
listAddNodeTail(owners, n);
keys_in_multiple_nodes = 1;
}
if (r) freeReplyObject(r);
if (!success) goto cleanup;
}
}
if (listLength(owners) == 1) owner = listFirst(owners)->value;
listRewind(cluster_manager.nodes, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
@ -3767,6 +3800,9 @@ static int clusterManagerFixOpenSlot(int slot) {
clusterManagerLogWarn("*** Found keys about slot %d "
"in node %s:%d!\n", slot, n->ip,
n->port);
char *sep = (listLength(importing) == 0 ? "" : ",");
importing_str = sdscatfmt(importing_str, "%s%S:%u",
sep, n->ip, n->port);
listAddNodeTail(importing, n);
}
if (r) freeReplyObject(r);
@ -3799,6 +3835,15 @@ static int clusterManagerFixOpenSlot(int slot) {
success = clusterManagerCheckRedisReply(owner, reply, NULL);
if (reply) freeReplyObject(reply);
if (!success) goto cleanup;
/* Ensure that the slot is unassigned before assigning it to the
* owner. */
reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER DELSLOTS %d", slot);
success = clusterManagerCheckRedisReply(owner, reply, NULL);
/* Ignore "already unassigned" error. */
if (!success && reply && reply->type == REDIS_REPLY_ERROR &&
strstr(reply->str, "already unassigned") != NULL) success = 1;
if (reply) freeReplyObject(reply);
if (!success) goto cleanup;
reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER ADDSLOTS %d", slot);
success = clusterManagerCheckRedisReply(owner, reply, NULL);
if (reply) freeReplyObject(reply);
@ -3835,12 +3880,22 @@ static int clusterManagerFixOpenSlot(int slot) {
if (n == owner) continue;
reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER DELSLOTS %d", slot);
success = clusterManagerCheckRedisReply(n, reply, NULL);
/* Ignore "already unassigned" error. */
if (!success && reply && reply->type == REDIS_REPLY_ERROR &&
strstr(reply->str, "already unassigned") != NULL) success = 1;
if (reply) freeReplyObject(reply);
if (!success) goto cleanup;
n->slots[slot] = 0;
/* Assign the slot to the owner in the node 'n' configuration.' */
success = clusterManagerSetSlot(n, owner, slot, "node", NULL);
if (!success) goto cleanup;
success = clusterManagerSetSlot(n, owner, slot, "importing", NULL);
if (!success) goto cleanup;
clusterManagerRemoveNodeFromList(importing, n); //Avoid duplicates
/* Avoid duplicates. */
clusterManagerRemoveNodeFromList(importing, n);
listAddNodeTail(importing, n);
/* Ensure that the node is not in the migrating list. */
clusterManagerRemoveNodeFromList(migrating, n);
}
reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER BUMPEPOCH");
success = clusterManagerCheckRedisReply(owner, reply, NULL);
@ -3883,18 +3938,21 @@ static int clusterManagerFixOpenSlot(int slot) {
listLength(migrating) == 1);
if (try_to_close_slot) {
clusterManagerNode *n = listFirst(migrating)->value;
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
"CLUSTER GETKEYSINSLOT %d %d", slot, 10);
success = clusterManagerCheckRedisReply(n, r, NULL);
if (r) {
if (success) try_to_close_slot = (r->elements == 0);
freeReplyObject(r);
if (!owner || owner != n) {
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
"CLUSTER GETKEYSINSLOT %d %d", slot, 10);
success = clusterManagerCheckRedisReply(n, r, NULL);
if (r) {
if (success) try_to_close_slot = (r->elements == 0);
freeReplyObject(r);
}
if (!success) goto cleanup;
}
if (!success) goto cleanup;
}
/* Case 3: There are no slots claiming to be in importing state, but
* there is a migrating node that actually don't have any key. We
* can just close the slot, probably a reshard interrupted in the middle. */
* there is a migrating node that actually don't have any key or is the
* slot owner. We can just close the slot, probably a reshard interrupted
* in the middle. */
if (try_to_close_slot) {
clusterManagerNode *n = listFirst(migrating)->value;
redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s",