Cluster: update configEpoch after manually messing with slots.

This commit is contained in:
antirez 2014-02-10 18:01:58 +01:00
parent be0bb19fd3
commit 6fc53e16ad

View File

@ -1115,8 +1115,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
* 2) The new node claims it with a greater configEpoch. */ * 2) The new node claims it with a greater configEpoch. */
if (server.cluster->slots[j] == sender) continue; if (server.cluster->slots[j] == sender) continue;
if (server.cluster->slots[j] == NULL || if (server.cluster->slots[j] == NULL ||
server.cluster->slots[j]->configEpoch < server.cluster->slots[j]->configEpoch < senderConfigEpoch)
senderConfigEpoch)
{ {
if (server.cluster->slots[j] == curmaster) if (server.cluster->slots[j] == curmaster)
newmaster = sender; newmaster = sender;
@ -3164,14 +3163,26 @@ void clusterCommand(redisClient *c) {
* itself also clears the importing status. */ * itself also clears the importing status. */
if (n == myself && if (n == myself &&
server.cluster->importing_slots_from[slot]) server.cluster->importing_slots_from[slot])
{
clusterNode *old_owner =
server.cluster->importing_slots_from[slot];
/* This slot was manually migrated, set this node configEpoch
* at least to the value of the configEpoch of the old owner
* so that its old replicas, or some of its old message pending
* on the cluster bus, can't claim our slot. */
if (old_owner->configEpoch > myself->configEpoch)
myself->configEpoch = old_owner->configEpoch;
server.cluster->importing_slots_from[slot] = NULL; server.cluster->importing_slots_from[slot] = NULL;
}
clusterDelSlot(slot); clusterDelSlot(slot);
clusterAddSlot(n,slot); clusterAddSlot(n,slot);
} else { } else {
addReplyError(c,"Invalid CLUSTER SETSLOT action or number of arguments"); addReplyError(c,"Invalid CLUSTER SETSLOT action or number of arguments");
return; return;
} }
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE|
CLUSTER_TODO_FSYNC_CONFIG);
addReply(c,shared.ok); addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) { } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
/* CLUSTER INFO */ /* CLUSTER INFO */