mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 16:18:28 -05:00
cluster import/export of hash slots implemented in the query redirection engine
This commit is contained in:
parent
a5dce40726
commit
eda827f8b7
@ -1605,12 +1605,19 @@ file_rd_err:
|
|||||||
/* Return the pointer to the cluster node that is able to serve the query
|
/* Return the pointer to the cluster node that is able to serve the query
|
||||||
* as all the keys belong to hash slots for which the node is in charge.
|
* as all the keys belong to hash slots for which the node is in charge.
|
||||||
*
|
*
|
||||||
* If keys in query spawn multiple nodes NULL is returned. */
|
* If the returned node should be used only for this request, the *ask
|
||||||
clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot) {
|
* integer is set to '1', otherwise to '0'. This is used in order to
|
||||||
|
* let the caller know if we should reply with -MOVED or with -ASK.
|
||||||
|
*
|
||||||
|
* If the request contains more than a single key NULL is returned,
|
||||||
|
* however a request with more then a key argument where the key is always
|
||||||
|
* the same is valid, like in: RPOPLPUSH mylist mylist.*/
|
||||||
|
clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask) {
|
||||||
clusterNode *n = NULL;
|
clusterNode *n = NULL;
|
||||||
|
robj *firstkey = NULL;
|
||||||
multiState *ms, _ms;
|
multiState *ms, _ms;
|
||||||
multiCmd mc;
|
multiCmd mc;
|
||||||
int i;
|
int i, slot = 0;
|
||||||
|
|
||||||
/* We handle all the cases as if they were EXEC commands, so we have
|
/* We handle all the cases as if they were EXEC commands, so we have
|
||||||
* a common code path for everything */
|
* a common code path for everything */
|
||||||
@ -1620,7 +1627,9 @@ clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **arg
|
|||||||
if (!(c->flags & REDIS_MULTI)) return server.cluster.myself;
|
if (!(c->flags & REDIS_MULTI)) return server.cluster.myself;
|
||||||
ms = &c->mstate;
|
ms = &c->mstate;
|
||||||
} else {
|
} else {
|
||||||
/* Create a fake Multi State structure, with just one command */
|
/* In order to have a single codepath create a fake Multi State
|
||||||
|
* structure if the client is not in MULTI/EXEC state, this way
|
||||||
|
* we have a single codepath below. */
|
||||||
ms = &_ms;
|
ms = &_ms;
|
||||||
_ms.commands = &mc;
|
_ms.commands = &mc;
|
||||||
_ms.count = 1;
|
_ms.count = 1;
|
||||||
@ -1629,6 +1638,8 @@ clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **arg
|
|||||||
mc.cmd = cmd;
|
mc.cmd = cmd;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Check that all the keys are the same key, and get the slot and
|
||||||
|
* node for this key. */
|
||||||
for (i = 0; i < ms->count; i++) {
|
for (i = 0; i < ms->count; i++) {
|
||||||
struct redisCommand *mcmd;
|
struct redisCommand *mcmd;
|
||||||
robj **margv;
|
robj **margv;
|
||||||
@ -1641,24 +1652,48 @@ clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **arg
|
|||||||
keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys,
|
keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys,
|
||||||
REDIS_GETKEYS_PRELOAD);
|
REDIS_GETKEYS_PRELOAD);
|
||||||
for (j = 0; j < numkeys; j++) {
|
for (j = 0; j < numkeys; j++) {
|
||||||
int slot = keyHashSlot((char*)margv[keyindex[j]]->ptr,
|
if (firstkey == NULL) {
|
||||||
sdslen(margv[keyindex[j]]->ptr));
|
/* This is the first key we see. Check what is the slot
|
||||||
struct clusterNode *slotnode;
|
* and node. */
|
||||||
|
firstkey = margv[keyindex[j]];
|
||||||
|
|
||||||
slotnode = server.cluster.slots[slot];
|
slot = keyHashSlot((char*)firstkey->ptr, sdslen(firstkey->ptr));
|
||||||
if (hashslot) *hashslot = slot;
|
n = server.cluster.slots[slot];
|
||||||
/* Node not assigned? (Should never happen actually
|
redisAssert(n != NULL);
|
||||||
* if we reached this function).
|
|
||||||
* Different node than the previous one?
|
|
||||||
* Return NULL, the cluster can't serve multi-node requests */
|
|
||||||
if (slotnode == NULL || (n && slotnode != n)) {
|
|
||||||
getKeysFreeResult(keyindex);
|
|
||||||
return NULL;
|
|
||||||
} else {
|
} else {
|
||||||
n = slotnode;
|
/* If it is not the first key, make sure it is exactly
|
||||||
|
* the same key as the first we saw. */
|
||||||
|
if (!equalStringObjects(firstkey,margv[keyindex[j]])) {
|
||||||
|
decrRefCount(firstkey);
|
||||||
|
getKeysFreeResult(keyindex);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
getKeysFreeResult(keyindex);
|
getKeysFreeResult(keyindex);
|
||||||
}
|
}
|
||||||
return (n == NULL) ? server.cluster.myself : n;
|
if (ask) *ask = 0; /* This is the default. Set to 1 if needed later. */
|
||||||
|
/* No key at all in command? then we can serve the request
|
||||||
|
* without redirections. */
|
||||||
|
if (n == NULL) return server.cluster.myself;
|
||||||
|
if (hashslot) *hashslot = slot;
|
||||||
|
/* This request is about a slot we are migrating into another instance?
|
||||||
|
* Then we need to check if we have the key. If we have it we can reply.
|
||||||
|
* If instead is a new key, we pass the request to the node that is
|
||||||
|
* receiving the slot. */
|
||||||
|
if (n == server.cluster.myself &&
|
||||||
|
server.cluster.migrating_slots_to[slot] != NULL)
|
||||||
|
{
|
||||||
|
if (lookupKeyRead(&server.db[0],firstkey) == NULL) {
|
||||||
|
if (ask) *ask = 1;
|
||||||
|
return server.cluster.migrating_slots_to[slot];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/* Handle the case in which we are receiving this hash slot from
|
||||||
|
* another instance, so we'll accept the query even if in the table
|
||||||
|
* it is assigned to a different node. */
|
||||||
|
if (server.cluster.importing_slots_from[slot] != NULL)
|
||||||
|
return server.cluster.myself;
|
||||||
|
/* It's not a -ASK case. Base case: just return the right node. */
|
||||||
|
return n;
|
||||||
}
|
}
|
||||||
|
@ -1096,13 +1096,15 @@ int processCommand(redisClient *c) {
|
|||||||
addReplyError(c,"The cluster is down. Check with CLUSTER INFO for more information");
|
addReplyError(c,"The cluster is down. Check with CLUSTER INFO for more information");
|
||||||
return REDIS_OK;
|
return REDIS_OK;
|
||||||
} else {
|
} else {
|
||||||
clusterNode *n = getNodeByQuery(c,cmd,c->argv,c->argc,&hashslot);
|
int ask;
|
||||||
|
clusterNode *n = getNodeByQuery(c,cmd,c->argv,c->argc,&hashslot,&ask);
|
||||||
if (n == NULL) {
|
if (n == NULL) {
|
||||||
addReplyError(c,"Invalid cross-node request");
|
addReplyError(c,"Multi keys request invalid in cluster");
|
||||||
return REDIS_OK;
|
return REDIS_OK;
|
||||||
} else if (n != server.cluster.myself) {
|
} else if (n != server.cluster.myself) {
|
||||||
addReplySds(c,sdscatprintf(sdsempty(),
|
addReplySds(c,sdscatprintf(sdsempty(),
|
||||||
"-MOVED %d %s:%d\r\n",hashslot,n->ip,n->port));
|
"-%s %d %s:%d\r\n", ask ? "ASK" : "MOVED",
|
||||||
|
hashslot,n->ip,n->port));
|
||||||
return REDIS_OK;
|
return REDIS_OK;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1083,7 +1083,7 @@ unsigned int keyHashSlot(char *key, int keylen);
|
|||||||
clusterNode *createClusterNode(char *nodename, int flags);
|
clusterNode *createClusterNode(char *nodename, int flags);
|
||||||
int clusterAddNode(clusterNode *node);
|
int clusterAddNode(clusterNode *node);
|
||||||
void clusterCron(void);
|
void clusterCron(void);
|
||||||
clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot);
|
clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask);
|
||||||
|
|
||||||
/* Git SHA1 */
|
/* Git SHA1 */
|
||||||
char *redisGitSHA1(void);
|
char *redisGitSHA1(void);
|
||||||
|
Loading…
Reference in New Issue
Block a user