From eda827f8b71b81a1170f5524c52bea7db249eb58 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 5 May 2011 11:13:21 +0200 Subject: [PATCH] cluster import/export of hash slots implemented in the query redirection engine --- src/cluster.c | 71 ++++++++++++++++++++++++++++++++++++++------------- src/redis.c | 8 +++--- src/redis.h | 2 +- 3 files changed, 59 insertions(+), 22 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index 958892012..b77a61b06 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1605,12 +1605,19 @@ file_rd_err: /* Return the pointer to the cluster node that is able to serve the query * as all the keys belong to hash slots for which the node is in charge. * - * If keys in query spawn multiple nodes NULL is returned. */ -clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot) { + * If the returned node should be used only for this request, the *ask + * integer is set to '1', otherwise to '0'. This is used in order to + * let the caller know if we should reply with -MOVED or with -ASK. + * + * If the request contains more than a single key NULL is returned, + * however a request with more then a key argument where the key is always + * the same is valid, like in: RPOPLPUSH mylist mylist.*/ +clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask) { clusterNode *n = NULL; + robj *firstkey = NULL; multiState *ms, _ms; multiCmd mc; - int i; + int i, slot = 0; /* We handle all the cases as if they were EXEC commands, so we have * a common code path for everything */ @@ -1620,7 +1627,9 @@ clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **arg if (!(c->flags & REDIS_MULTI)) return server.cluster.myself; ms = &c->mstate; } else { - /* Create a fake Multi State structure, with just one command */ + /* In order to have a single codepath create a fake Multi State + * structure if the client is not in MULTI/EXEC state, this way + * we have a single codepath below. */ ms = &_ms; _ms.commands = &mc; _ms.count = 1; @@ -1629,6 +1638,8 @@ clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **arg mc.cmd = cmd; } + /* Check that all the keys are the same key, and get the slot and + * node for this key. */ for (i = 0; i < ms->count; i++) { struct redisCommand *mcmd; robj **margv; @@ -1641,24 +1652,48 @@ clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **arg keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys, REDIS_GETKEYS_PRELOAD); for (j = 0; j < numkeys; j++) { - int slot = keyHashSlot((char*)margv[keyindex[j]]->ptr, - sdslen(margv[keyindex[j]]->ptr)); - struct clusterNode *slotnode; + if (firstkey == NULL) { + /* This is the first key we see. Check what is the slot + * and node. */ + firstkey = margv[keyindex[j]]; - slotnode = server.cluster.slots[slot]; - if (hashslot) *hashslot = slot; - /* Node not assigned? (Should never happen actually - * if we reached this function). - * Different node than the previous one? - * Return NULL, the cluster can't serve multi-node requests */ - if (slotnode == NULL || (n && slotnode != n)) { - getKeysFreeResult(keyindex); - return NULL; + slot = keyHashSlot((char*)firstkey->ptr, sdslen(firstkey->ptr)); + n = server.cluster.slots[slot]; + redisAssert(n != NULL); } else { - n = slotnode; + /* If it is not the first key, make sure it is exactly + * the same key as the first we saw. */ + if (!equalStringObjects(firstkey,margv[keyindex[j]])) { + decrRefCount(firstkey); + getKeysFreeResult(keyindex); + return NULL; + } } } getKeysFreeResult(keyindex); } - return (n == NULL) ? server.cluster.myself : n; + if (ask) *ask = 0; /* This is the default. Set to 1 if needed later. */ + /* No key at all in command? then we can serve the request + * without redirections. */ + if (n == NULL) return server.cluster.myself; + if (hashslot) *hashslot = slot; + /* This request is about a slot we are migrating into another instance? + * Then we need to check if we have the key. If we have it we can reply. + * If instead is a new key, we pass the request to the node that is + * receiving the slot. */ + if (n == server.cluster.myself && + server.cluster.migrating_slots_to[slot] != NULL) + { + if (lookupKeyRead(&server.db[0],firstkey) == NULL) { + if (ask) *ask = 1; + return server.cluster.migrating_slots_to[slot]; + } + } + /* Handle the case in which we are receiving this hash slot from + * another instance, so we'll accept the query even if in the table + * it is assigned to a different node. */ + if (server.cluster.importing_slots_from[slot] != NULL) + return server.cluster.myself; + /* It's not a -ASK case. Base case: just return the right node. */ + return n; } diff --git a/src/redis.c b/src/redis.c index 96786df95..5ddbb177c 100644 --- a/src/redis.c +++ b/src/redis.c @@ -1096,13 +1096,15 @@ int processCommand(redisClient *c) { addReplyError(c,"The cluster is down. Check with CLUSTER INFO for more information"); return REDIS_OK; } else { - clusterNode *n = getNodeByQuery(c,cmd,c->argv,c->argc,&hashslot); + int ask; + clusterNode *n = getNodeByQuery(c,cmd,c->argv,c->argc,&hashslot,&ask); if (n == NULL) { - addReplyError(c,"Invalid cross-node request"); + addReplyError(c,"Multi keys request invalid in cluster"); return REDIS_OK; } else if (n != server.cluster.myself) { addReplySds(c,sdscatprintf(sdsempty(), - "-MOVED %d %s:%d\r\n",hashslot,n->ip,n->port)); + "-%s %d %s:%d\r\n", ask ? "ASK" : "MOVED", + hashslot,n->ip,n->port)); return REDIS_OK; } } diff --git a/src/redis.h b/src/redis.h index b6955805c..8098a8de7 100644 --- a/src/redis.h +++ b/src/redis.h @@ -1083,7 +1083,7 @@ unsigned int keyHashSlot(char *key, int keylen); clusterNode *createClusterNode(char *nodename, int flags); int clusterAddNode(clusterNode *node); void clusterCron(void); -clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot); +clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask); /* Git SHA1 */ char *redisGitSHA1(void);