mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-23 00:28:26 -05:00
Cluster Manager: compare key values after BUSYKEY error (migration).
If a key already exists in the target node during a migration (BUSYKEY), the value of the key on both nodes (source and target) will be compared. If the key has the same value on both nodes, the migration will be automatically retried with the REPLACE argument in order to override the target's key. If the key has different values, the behaviour depends on the command: - In case of the 'fix' command, the migration will stop and the user will be warned to manually check the key(s). - In other cases (i.e. reshard), if the user launched the command with the --cluster-replace option, the migration will be retried with the REPLACE argument; otherwise the migration will stop and the user will be warned.
This commit is contained in:
parent
d935cfcb89
commit
143bfa1e6e
133
src/redis-cli.c
133
src/redis-cli.c
@ -2934,6 +2934,68 @@ static int clusterManagerSetSlotOwner(clusterManagerNode *owner,
|
|||||||
return success;
|
return success;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Get the hash for the values of the specified keys in *keys_reply for the
|
||||||
|
* specified nodes *n1 and *n2, by calling DEBUG DIGEST-VALUE redis command
|
||||||
|
* on both nodes. Every key with same name on both nodes but having different
|
||||||
|
* values will be added to the *diffs list. Return 0 in case of reply
|
||||||
|
* error. */
|
||||||
|
static int clusterManagerCompareKeysValues(clusterManagerNode *n1,
|
||||||
|
clusterManagerNode *n2,
|
||||||
|
redisReply *keys_reply,
|
||||||
|
list *diffs)
|
||||||
|
{
|
||||||
|
size_t i, argc = keys_reply->elements + 2;
|
||||||
|
static const char *hash_zero = "0000000000000000000000000000000000000000";
|
||||||
|
char **argv = zcalloc(argc * sizeof(char *));
|
||||||
|
size_t *argv_len = zcalloc(argc * sizeof(size_t));
|
||||||
|
argv[0] = "DEBUG";
|
||||||
|
argv_len[0] = 5;
|
||||||
|
argv[1] = "DIGEST-VALUE";
|
||||||
|
argv_len[1] = 12;
|
||||||
|
for (i = 0; i < keys_reply->elements; i++) {
|
||||||
|
redisReply *entry = keys_reply->element[i];
|
||||||
|
int idx = i + 2;
|
||||||
|
argv[idx] = entry->str;
|
||||||
|
argv_len[idx] = entry->len;
|
||||||
|
}
|
||||||
|
int success = 0;
|
||||||
|
void *_reply1 = NULL, *_reply2 = NULL;
|
||||||
|
redisReply *r1 = NULL, *r2 = NULL;
|
||||||
|
redisAppendCommandArgv(n1->context,argc, (const char**)argv,argv_len);
|
||||||
|
success = (redisGetReply(n1->context, &_reply1) == REDIS_OK);
|
||||||
|
if (!success) goto cleanup;
|
||||||
|
r1 = (redisReply *) _reply1;
|
||||||
|
redisAppendCommandArgv(n2->context,argc, (const char**)argv,argv_len);
|
||||||
|
success = (redisGetReply(n2->context, &_reply2) == REDIS_OK);
|
||||||
|
if (!success) goto cleanup;
|
||||||
|
r2 = (redisReply *) _reply2;
|
||||||
|
success = (r1->type != REDIS_REPLY_ERROR && r2->type != REDIS_REPLY_ERROR);
|
||||||
|
if (r1->type == REDIS_REPLY_ERROR) {
|
||||||
|
CLUSTER_MANAGER_PRINT_REPLY_ERROR(n1, r1->str);
|
||||||
|
success = 0;
|
||||||
|
}
|
||||||
|
if (r2->type == REDIS_REPLY_ERROR) {
|
||||||
|
CLUSTER_MANAGER_PRINT_REPLY_ERROR(n2, r2->str);
|
||||||
|
success = 0;
|
||||||
|
}
|
||||||
|
if (!success) goto cleanup;
|
||||||
|
assert(keys_reply->elements == r1->elements &&
|
||||||
|
keys_reply->elements == r2->elements);
|
||||||
|
for (i = 0; i < keys_reply->elements; i++) {
|
||||||
|
char *key = keys_reply->element[i]->str;
|
||||||
|
char *hash1 = r1->element[i]->str;
|
||||||
|
char *hash2 = r2->element[i]->str;
|
||||||
|
/* Ignore keys that don't exist in both nodes. */
|
||||||
|
if (strcmp(hash1, hash_zero) == 0 || strcmp(hash2, hash_zero) == 0)
|
||||||
|
continue;
|
||||||
|
if (strcmp(hash1, hash2) != 0) listAddNodeTail(diffs, key);
|
||||||
|
}
|
||||||
|
cleanup:
|
||||||
|
if (r1) freeReplyObject(r1);
|
||||||
|
if (r2) freeReplyObject(r2);
|
||||||
|
return success;
|
||||||
|
}
|
||||||
|
|
||||||
/* Migrate keys taken from reply->elements. It returns the reply from the
|
/* Migrate keys taken from reply->elements. It returns the reply from the
|
||||||
* MIGRATE command, or NULL if something goes wrong. If the argument 'dots'
|
* MIGRATE command, or NULL if something goes wrong. If the argument 'dots'
|
||||||
* is not NULL, a dot will be printed for every migrated key. */
|
* is not NULL, a dot will be printed for every migrated key. */
|
||||||
@ -3014,8 +3076,10 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
|
|||||||
char **err)
|
char **err)
|
||||||
{
|
{
|
||||||
int success = 1;
|
int success = 1;
|
||||||
int retry = (config.cluster_manager_command.flags &
|
int do_fix = config.cluster_manager_command.flags &
|
||||||
(CLUSTER_MANAGER_CMD_FLAG_FIX | CLUSTER_MANAGER_CMD_FLAG_REPLACE));
|
CLUSTER_MANAGER_CMD_FLAG_FIX;
|
||||||
|
int do_replace = config.cluster_manager_command.flags &
|
||||||
|
CLUSTER_MANAGER_CMD_FLAG_REPLACE;
|
||||||
while (1) {
|
while (1) {
|
||||||
char *dots = NULL;
|
char *dots = NULL;
|
||||||
redisReply *reply = NULL, *migrate_reply = NULL;
|
redisReply *reply = NULL, *migrate_reply = NULL;
|
||||||
@ -3049,6 +3113,8 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
|
|||||||
int is_busy = strstr(migrate_reply->str, "BUSYKEY") != NULL;
|
int is_busy = strstr(migrate_reply->str, "BUSYKEY") != NULL;
|
||||||
int not_served = 0;
|
int not_served = 0;
|
||||||
if (!is_busy) {
|
if (!is_busy) {
|
||||||
|
/* Check if the slot is unassigned (not served) in the
|
||||||
|
* source node's configuration. */
|
||||||
char *get_owner_err = NULL;
|
char *get_owner_err = NULL;
|
||||||
clusterManagerNode *served_by =
|
clusterManagerNode *served_by =
|
||||||
clusterManagerGetSlotOwner(source, slot, &get_owner_err);
|
clusterManagerGetSlotOwner(source, slot, &get_owner_err);
|
||||||
@ -3061,20 +3127,69 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (retry && (is_busy || not_served)) {
|
/* Try to handle errors. */
|
||||||
/* If the key already exists, try to migrate keys
|
if (is_busy || not_served) {
|
||||||
* adding REPLACE option.
|
/* If the key's slot is not served, try to assign slot
|
||||||
* If the key's slot is not served, try to assign slot
|
|
||||||
* to the target node. */
|
* to the target node. */
|
||||||
if (not_served) {
|
if (do_fix && not_served) {
|
||||||
clusterManagerLogWarn("*** Slot was not served, setting "
|
clusterManagerLogWarn("*** Slot was not served, setting "
|
||||||
"owner to node %s:%d.\n",
|
"owner to node %s:%d.\n",
|
||||||
target->ip, target->port);
|
target->ip, target->port);
|
||||||
clusterManagerSetSlot(source, target, slot, "node", NULL);
|
clusterManagerSetSlot(source, target, slot, "node", NULL);
|
||||||
}
|
}
|
||||||
|
/* If the key already exists in the target node (BUSYKEY),
|
||||||
|
* check whether its value is the same in both nodes.
|
||||||
|
* In case of equal values, retry migration with the
|
||||||
|
* REPLACE option.
|
||||||
|
* In case of different values:
|
||||||
|
* - If the migration is requested by the fix command, stop
|
||||||
|
* and warn the user.
|
||||||
|
* - In other cases (ie. reshard), proceed only if the user
|
||||||
|
* launched the command with the --cluster-replace option.*/
|
||||||
if (is_busy) {
|
if (is_busy) {
|
||||||
clusterManagerLogWarn("*** Target key exists. "
|
clusterManagerLogWarn("\n*** Target key exists, "
|
||||||
"Replacing it for FIX.\n");
|
"checking values...\n");
|
||||||
|
list *diffs = listCreate();
|
||||||
|
success = clusterManagerCompareKeysValues(source,
|
||||||
|
target, reply, diffs);
|
||||||
|
if (!success && (do_fix || !do_replace)) {
|
||||||
|
listRelease(diffs);
|
||||||
|
clusterManagerLogErr("*** Value check failed!\n");
|
||||||
|
goto next;
|
||||||
|
}
|
||||||
|
if (listLength(diffs) > 0 && (do_fix || !do_replace)) {
|
||||||
|
success = 0;
|
||||||
|
clusterManagerLogErr(
|
||||||
|
"*** Found %d key(s) in both source node and "
|
||||||
|
"target node having different values.\n"
|
||||||
|
" Source node: %s:%d\n"
|
||||||
|
" Target node: %s:%d\n"
|
||||||
|
" Keys(s):\n",
|
||||||
|
listLength(diffs),
|
||||||
|
source->ip, source->port,
|
||||||
|
target->ip, target->port);
|
||||||
|
listIter dli;
|
||||||
|
listNode *dln;
|
||||||
|
listRewind(diffs, &dli);
|
||||||
|
while((dln = listNext(&dli)) != NULL) {
|
||||||
|
char *k = dln->value;
|
||||||
|
clusterManagerLogErr(" - %s\n", k);
|
||||||
|
}
|
||||||
|
clusterManagerLogErr("Please fix the above key(s) "
|
||||||
|
"manually ");
|
||||||
|
if (do_fix)
|
||||||
|
clusterManagerLogErr("and try again!\n");
|
||||||
|
else {
|
||||||
|
clusterManagerLogErr("or relaunch the command "
|
||||||
|
"with --cluster-replace "
|
||||||
|
"option to force key "
|
||||||
|
"overriding.\n");
|
||||||
|
}
|
||||||
|
listRelease(diffs);
|
||||||
|
goto next;
|
||||||
|
}
|
||||||
|
listRelease(diffs);
|
||||||
|
clusterManagerLogWarn("*** Replacing target keys...\n");
|
||||||
}
|
}
|
||||||
freeReplyObject(migrate_reply);
|
freeReplyObject(migrate_reply);
|
||||||
migrate_reply = clusterManagerMigrateKeysInReply(source,
|
migrate_reply = clusterManagerMigrateKeysInReply(source,
|
||||||
|
Loading…
Reference in New Issue
Block a user