Cluster: better handling of stolen slots.

The previous code handling a lost slot (by another master with an higher
configuration for the slot) was defensive, considering it an error and
putting the cluster in an odd state requiring redis-cli fix.

This was changed, because actually this only happens either in a
legitimate way, with failovers, or when the admin messed with the config
in order to reconfigure the cluster. So the new code instead will try to
make sure that the keys stored match the new slots map, by removing all
the keys in the slots we lost ownership from.

The function that deletes the keys from the lost slots is called only
if the node does not lose all its slots (resulting in a reconfiguration
as a slave of the node that got ownership). This is an optimization
since the replication code will anyway flush all the instance data in
a faster way.
This commit is contained in:
antirez 2014-05-14 10:46:37 +02:00
parent 27ca133d35
commit 6baac558d8
3 changed files with 47 additions and 16 deletions

View File

@ -1212,6 +1212,15 @@ void clusterSetNodeAsMaster(clusterNode *n) {
void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoch, unsigned char *slots) {
int j;
clusterNode *curmaster, *newmaster = NULL;
/* The dirty slots list is a list of slots for which we lose the ownership
* while having still keys inside. This usually happens after a failover
* or after a manual cluster reconfiguration operated by the admin.
*
* If the update message is not able to demote a master to slave (in this
* case we'll resync with the master updating the whole key space), we
* need to delete all the keys in the slots we lost ownership. */
uint16_t dirty_slots[REDIS_CLUSTER_SLOTS];
int dirty_slots_count = 0;
/* Here we set curmaster to this node or the node this node
* replicates to if it's a slave. In the for loop we are
@ -1241,25 +1250,14 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
if (server.cluster->slots[j] == NULL ||
server.cluster->slots[j]->configEpoch < senderConfigEpoch)
{
/* Was this slot mine, and still contains keys? Something
* odd happened, put the slot in importing state so that
* redis-trib fix can detect the condition (and no further
* updates will be processed before the slot gets fixed). */
/* Was this slot mine, and still contains keys? Mark it as
* a dirty slot. */
if (server.cluster->slots[j] == myself &&
countKeysInSlot(j) &&
sender != myself)
{
redisLog(REDIS_WARNING,
"I received an update for slot %d. "
"%.40s claims it with config %llu, "
"I've it assigned to myself with config %llu. "
"I've still keys about this slot! "
"Putting the slot in IMPORTING state. "
"Please run the 'redis-trib fix' command.",
j, sender->name,
(unsigned long long) senderConfigEpoch,
(unsigned long long) myself->configEpoch);
server.cluster->importing_slots_from[j] = sender;
dirty_slots[dirty_slots_count] = j;
dirty_slots_count++;
}
if (server.cluster->slots[j] == curmaster)
@ -1288,6 +1286,16 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE|
CLUSTER_TODO_FSYNC_CONFIG);
} else if (dirty_slots_count) {
/* If we are here, we received an update message which removed
* ownership for certain slots we still have keys about, but still
* we are serving some slots, so this master node was not demoted to
* a slave.
*
* In order to maintain a consistent state between keys and slots
* we need to remove all the keys from the slots we lost. */
for (j = 0; j < dirty_slots_count; j++)
delKeysInSlot(dirty_slots[j]);
}
}

View File

@ -1152,6 +1152,28 @@ unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int coun
return j;
}
/* Remove all the keys in the specified hash slot.
* The number of removed items is returned. */
unsigned int delKeysInSlot(unsigned int hashslot) {
zskiplistNode *n;
zrangespec range;
int j = 0;
range.min = range.max = hashslot;
range.minex = range.maxex = 0;
n = zslFirstInRange(server.cluster->slots_to_keys, &range);
while(n && n->score == hashslot) {
robj *key = n->obj;
n = n->level[0].forward; /* Go to the next item before freeing it. */
incrRefCount(key); /* Protect the object while freeing it. */
dbDelete(&server.db[0],key);
decrRefCount(key);
j++;
}
return j;
}
unsigned int countKeysInSlot(unsigned int hashslot) {
zskiplist *zsl = server.cluster->slots_to_keys;
zskiplistNode *zn;

View File

@ -1271,6 +1271,7 @@ void signalModifiedKey(redisDb *db, robj *key);
void signalFlushedDb(int dbid);
unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count);
unsigned int countKeysInSlot(unsigned int hashslot);
unsigned int delKeysInSlot(unsigned int hashslot);
int verifyClusterConfigWithData(void);
void scanGenericCommand(redisClient *c, robj *o, unsigned long cursor);
int parseScanCursorOrReply(redisClient *c, robj *o, unsigned long *cursor);