Cluster Manager: import command

This commit is contained in:
artix 2018-04-10 16:25:25 +02:00
parent 8969254e66
commit eaaa3202e6
2 changed files with 195 additions and 23 deletions

View File

@ -146,7 +146,7 @@ REDIS_SERVER_NAME=redis-server
REDIS_SENTINEL_NAME=redis-sentinel REDIS_SENTINEL_NAME=redis-sentinel
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.c REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.c
REDIS_CLI_NAME=redis-cli REDIS_CLI_NAME=redis-cli
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o anet.o ae.o crc64.o siphash.o REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o anet.o ae.o crc64.o siphash.o crc16.o
REDIS_BENCHMARK_NAME=redis-benchmark REDIS_BENCHMARK_NAME=redis-benchmark
REDIS_BENCHMARK_OBJ=ae.o anet.o redis-benchmark.o adlist.o zmalloc.o redis-benchmark.o REDIS_BENCHMARK_OBJ=ae.o anet.o redis-benchmark.o adlist.o zmalloc.o redis-benchmark.o
REDIS_CHECK_RDB_NAME=redis-check-rdb REDIS_CHECK_RDB_NAME=redis-check-rdb

View File

@ -74,7 +74,7 @@
#define CLUSTER_MANAGER_REBALANCE_THRESHOLD 2 #define CLUSTER_MANAGER_REBALANCE_THRESHOLD 2
#define CLUSTER_MANAGER_INVALID_HOST_ARG \ #define CLUSTER_MANAGER_INVALID_HOST_ARG \
"Invalid arguments: you need to pass either a valid " \ "[ERR] Invalid arguments: you need to pass either a valid " \
"address (ie. 120.0.0.1:7000) or space separated IP " \ "address (ie. 120.0.0.1:7000) or space separated IP " \
"and port (ie. 120.0.0.1 7000)\n" "and port (ie. 120.0.0.1 7000)\n"
#define CLUSTER_MANAGER_MODE() (config.cluster_manager_command.name != NULL) #define CLUSTER_MANAGER_MODE() (config.cluster_manager_command.name != NULL)
@ -115,7 +115,9 @@
#define CLUSTER_MANAGER_CMD_FLAG_AUTOWEIGHTS 1 << 3 #define CLUSTER_MANAGER_CMD_FLAG_AUTOWEIGHTS 1 << 3
#define CLUSTER_MANAGER_CMD_FLAG_EMPTYMASTER 1 << 4 #define CLUSTER_MANAGER_CMD_FLAG_EMPTYMASTER 1 << 4
#define CLUSTER_MANAGER_CMD_FLAG_SIMULATE 1 << 5 #define CLUSTER_MANAGER_CMD_FLAG_SIMULATE 1 << 5
#define CLUSTER_MANAGER_CMD_FLAG_COLOR 1 << 7 #define CLUSTER_MANAGER_CMD_FLAG_REPLACE 1 << 6
#define CLUSTER_MANAGER_CMD_FLAG_COPY 1 << 7
#define CLUSTER_MANAGER_CMD_FLAG_COLOR 1 << 8
#define CLUSTER_MANAGER_OPT_GETFRIENDS 1 << 0 #define CLUSTER_MANAGER_OPT_GETFRIENDS 1 << 0
#define CLUSTER_MANAGER_OPT_COLD 1 << 1 #define CLUSTER_MANAGER_OPT_COLD 1 << 1
@ -237,6 +239,8 @@ static long getLongInfoField(char *info, char *field);
* Utility functions * Utility functions
*--------------------------------------------------------------------------- */ *--------------------------------------------------------------------------- */
uint16_t crc16(const char *buf, int len);
static long long ustime(void) { static long long ustime(void) {
struct timeval tv; struct timeval tv;
long long ust; long long ust;
@ -1325,6 +1329,12 @@ static int parseOptions(int argc, char **argv) {
} else if (!strcmp(argv[i],"--cluster-simulate")) { } else if (!strcmp(argv[i],"--cluster-simulate")) {
config.cluster_manager_command.flags |= config.cluster_manager_command.flags |=
CLUSTER_MANAGER_CMD_FLAG_SIMULATE; CLUSTER_MANAGER_CMD_FLAG_SIMULATE;
} else if (!strcmp(argv[i],"--cluster-replace")) {
config.cluster_manager_command.flags |=
CLUSTER_MANAGER_CMD_FLAG_REPLACE;
} else if (!strcmp(argv[i],"--cluster-copy")) {
config.cluster_manager_command.flags |=
CLUSTER_MANAGER_CMD_FLAG_COPY;
} else if (!strcmp(argv[i],"--cluster-use-empty-masters")) { } else if (!strcmp(argv[i],"--cluster-use-empty-masters")) {
config.cluster_manager_command.flags |= config.cluster_manager_command.flags |=
CLUSTER_MANAGER_CMD_FLAG_EMPTYMASTER; CLUSTER_MANAGER_CMD_FLAG_EMPTYMASTER;
@ -1870,6 +1880,7 @@ static int clusterManagerCommandCheck(int argc, char **argv);
static int clusterManagerCommandFix(int argc, char **argv); static int clusterManagerCommandFix(int argc, char **argv);
static int clusterManagerCommandReshard(int argc, char **argv); static int clusterManagerCommandReshard(int argc, char **argv);
static int clusterManagerCommandRebalance(int argc, char **argv); static int clusterManagerCommandRebalance(int argc, char **argv);
static int clusterManagerCommandImport(int argc, char **argv);
static int clusterManagerCommandCall(int argc, char **argv); static int clusterManagerCommandCall(int argc, char **argv);
static int clusterManagerCommandHelp(int argc, char **argv); static int clusterManagerCommandHelp(int argc, char **argv);
@ -1892,6 +1903,8 @@ clusterManagerCommandDef clusterManagerCommands[] = {
{"rebalance", clusterManagerCommandRebalance, -1, "host:port", {"rebalance", clusterManagerCommandRebalance, -1, "host:port",
"weight <node1=w1...nodeN=wN>,use-empty-masters," "weight <node1=w1...nodeN=wN>,use-empty-masters,"
"timeout <arg>,simulate,pipeline <arg>,threshold <arg>"}, "timeout <arg>,simulate,pipeline <arg>,threshold <arg>"},
{"import", clusterManagerCommandImport, 1, "host:port",
"from <arg>,copy,replace"},
{"call", clusterManagerCommandCall, -2, {"call", clusterManagerCommandCall, -2,
"host:port command arg arg .. arg", NULL}, "host:port command arg arg .. arg", NULL},
{"help", clusterManagerCommandHelp, 0, NULL, NULL} {"help", clusterManagerCommandHelp, 0, NULL, NULL}
@ -2383,6 +2396,37 @@ static sds clusterManagerNodeSlotsString(clusterManagerNode *node) {
return slots; return slots;
} }
/* -----------------------------------------------------------------------------
* Key space handling
* -------------------------------------------------------------------------- */
/* We have 16384 hash slots. The hash slot of a given key is obtained
* as the least significant 14 bits of the crc16 of the key.
*
* However if the key contains the {...} pattern, only the part between
* { and } is hashed. This may be useful in the future to force certain
* keys to be in the same node (assuming no resharding is in progress). */
static unsigned int keyHashSlot(char *key, int keylen) {
int s, e; /* start-end indexes of { and } */
for (s = 0; s < keylen; s++)
if (key[s] == '{') break;
/* No '{' ? Hash the whole key. This is the base case. */
if (s == keylen) return crc16(key,keylen) & 0x3FFF;
/* '{' found? Check if we have the corresponding '}'. */
for (e = s+1; e < keylen; e++)
if (key[e] == '}') break;
/* No '}' or nothing between {} ? Hash the whole key. */
if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF;
/* If we are here there is both a { and a } on its right. Hash
* what is in the middle between { and }. */
return crc16(key+s+1,e-s-1) & 0x3FFF;
}
static sds clusterManagerNodeInfo(clusterManagerNode *node, int indent) { static sds clusterManagerNodeInfo(clusterManagerNode *node, int indent) {
sds info = sdsempty(); sds info = sdsempty();
sds spaces = sdsempty(); sds spaces = sdsempty();
@ -3533,8 +3577,8 @@ static int clusterManagerFixOpenSlot(int slot) {
} }
// Use ADDSLOTS to assign the slot. // Use ADDSLOTS to assign the slot.
printf("*** Configuring %s:%d as the slot owner\n", owner->ip, clusterManagerLogWarn("*** Configuring %s:%d as the slot owner\n",
owner->port); owner->ip, owner->port);
redisReply *reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER " redisReply *reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER "
"SETSLOT %d %s", "SETSLOT %d %s",
slot, "STABLE"); slot, "STABLE");
@ -4527,7 +4571,7 @@ static int clusterManagerCommandRebalance(int argc, char **argv) {
if (over_threshold) threshold_reached = 1; if (over_threshold) threshold_reached = 1;
} }
if (!threshold_reached) { if (!threshold_reached) {
clusterManagerLogErr("*** No rebalancing needed! " clusterManagerLogWarn("*** No rebalancing needed! "
"All nodes are within the %.2f%% threshold.\n", "All nodes are within the %.2f%% threshold.\n",
config.cluster_manager_command.threshold); config.cluster_manager_command.threshold);
result = 0; result = 0;
@ -4586,7 +4630,7 @@ static int clusterManagerCommandRebalance(int argc, char **argv) {
listRelease(lsrc); listRelease(lsrc);
int table_len = (int) listLength(table); int table_len = (int) listLength(table);
if (!table || table_len != numslots) { if (!table || table_len != numslots) {
clusterManagerLogErr("*** Assertio failed: Reshard table " clusterManagerLogErr("*** Assertion failed: Reshard table "
"!= number of slots"); "!= number of slots");
result = 0; result = 0;
goto end_move; goto end_move;
@ -4629,23 +4673,148 @@ invalid_args:
return 0; return 0;
} }
static int clusterManagerCommandCall(int argc, char **argv) { static int clusterManagerCommandImport(int argc, char **argv) {
int port = 0; int success = 1;
char *ip = NULL; int port = 0, src_port = 0;
char *addr = argv[0]; char *ip = NULL, *src_ip = NULL;
char *c = strrchr(addr, '@'); char *invalid_args_msg = NULL;
int i; if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) {
if (c != NULL) *c = '\0'; invalid_args_msg = CLUSTER_MANAGER_INVALID_HOST_ARG;
c = strrchr(addr, ':'); goto invalid_args;
if (c != NULL) {
*c = '\0';
ip = addr;
port = atoi(++c);
} else {
fprintf(stderr,
"Invalid arguments: first agrumnt must be host:port.\n");
return 0;
} }
if (config.cluster_manager_command.from == NULL) {
invalid_args_msg = "[ERR] Option '--cluster-from' is required for "
"subcommand 'import'.\n";
goto invalid_args;
}
char *src_host[] = {config.cluster_manager_command.from};
if (!getClusterHostFromCmdArgs(1, src_host, &src_ip, &src_port)) {
invalid_args_msg = "[ERR] Invalid --cluster-from host. You need to "
"pass a valid address (ie. 120.0.0.1:7000).\n";
goto invalid_args;
}
clusterManagerLogInfo(">>> Importing data from %s:%d to cluster %s:%d\n",
src_ip, src_port, ip, port);
clusterManagerNode *refnode = clusterManagerNewNode(ip, port);
if (!clusterManagerLoadInfoFromNode(refnode, 0)) return 0;
char *reply_err = NULL;
redisReply *src_reply = NULL;
// Connect to the source node.
redisContext *src_ctx = redisConnect(src_ip, src_port);
if (src_ctx->err) {
success = 0;
fprintf(stderr,"Could not connect to Redis at %s:%d: %s.\n", src_ip,
src_port, src_ctx->errstr);
goto cleanup;
}
src_reply = reconnectingRedisCommand(src_ctx, "INFO");
if (!src_reply || src_reply->type == REDIS_REPLY_ERROR) {
if (src_reply && src_reply->str) reply_err = src_reply->str;
success = 0;
goto cleanup;
}
if (getLongInfoField(src_reply->str, "cluster_enabled")) {
clusterManagerLogErr("[ERR] The source node should not be a "
"cluster node.\n");
success = 0;
goto cleanup;
}
freeReplyObject(src_reply);
src_reply = reconnectingRedisCommand(src_ctx, "DBSIZE");
if (!src_reply || src_reply->type == REDIS_REPLY_ERROR) {
if (src_reply && src_reply->str) reply_err = src_reply->str;
success = 0;
goto cleanup;
}
int size = src_reply->integer, i;
clusterManagerLogWarn("*** Importing %d keys from DB 0\n", size);
// Build a slot -> node map
clusterManagerNode *slots_map[CLUSTER_MANAGER_SLOTS];
memset(slots_map, 0, sizeof(slots_map) / sizeof(clusterManagerNode *));
listIter li;
listNode *ln;
for (i = 0; i < CLUSTER_MANAGER_SLOTS; i++) {
listRewind(cluster_manager.nodes, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
if (n->slots_count == 0) continue;
if (n->slots[i]) {
slots_map[i] = n;
break;
}
}
}
char cmdfmt[50] = "MIGRATE %s %d %s %d %d";
if (config.cluster_manager_command.flags & CLUSTER_MANAGER_CMD_FLAG_COPY)
strcat(cmdfmt, " %s");
if (config.cluster_manager_command.flags & CLUSTER_MANAGER_CMD_FLAG_REPLACE)
strcat(cmdfmt, " %s");
/* Use SCAN to iterate over the keys, migrating to the
* right node as needed. */
int cursor = -999, timeout = config.cluster_manager_command.timeout;
while (cursor != 0) {
if (cursor < 0) cursor = 0;
freeReplyObject(src_reply);
src_reply = reconnectingRedisCommand(src_ctx, "SCAN %d COUNT %d",
cursor, 1000);
if (!src_reply || src_reply->type == REDIS_REPLY_ERROR) {
if (src_reply && src_reply->str) reply_err = src_reply->str;
success = 0;
goto cleanup;
}
assert(src_reply->type == REDIS_REPLY_ARRAY);
assert(src_reply->elements >= 2);
assert(src_reply->element[1]->type == REDIS_REPLY_ARRAY);
if (src_reply->element[0]->type == REDIS_REPLY_STRING)
cursor = atoi(src_reply->element[0]->str);
else if (src_reply->element[0]->type == REDIS_REPLY_INTEGER)
cursor = src_reply->element[0]->integer;
int keycount = src_reply->element[1]->elements;
for (i = 0; i < keycount; i++) {
redisReply *kr = src_reply->element[1]->element[i];
assert(kr->type == REDIS_REPLY_STRING);
char *key = kr->str;
uint16_t slot = keyHashSlot(key, kr->len);
clusterManagerNode *target = slots_map[slot];
printf("Migrating %s to %s:%d: ", key, target->ip, target->port);
redisReply *r = reconnectingRedisCommand(src_ctx, cmdfmt,
target->ip, target->port,
key, 0, timeout,
"COPY", "REPLACE");
if (!r || r->type == REDIS_REPLY_ERROR) {
if (r && r->str) {
clusterManagerLogErr("Source %s:%d replied with "
"error:\n%s\n", src_ip, src_port,
r->str);
}
success = 0;
}
freeReplyObject(r);
if (!success) goto cleanup;
clusterManagerLogOk("OK\n");
}
}
cleanup:
if (reply_err)
clusterManagerLogErr("Source %s:%d replied with error:\n%s\n",
src_ip, src_port, reply_err);
if (src_ctx) redisFree(src_ctx);
if (src_reply) freeReplyObject(src_reply);
return success;
invalid_args:
fprintf(stderr, "%s", invalid_args_msg);
return 0;
}
static int clusterManagerCommandCall(int argc, char **argv) {
int port = 0, i;
char *ip = NULL;
if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args;
clusterManagerNode *refnode = clusterManagerNewNode(ip, port); clusterManagerNode *refnode = clusterManagerNewNode(ip, port);
if (!clusterManagerLoadInfoFromNode(refnode, 0)) return 0; if (!clusterManagerLoadInfoFromNode(refnode, 0)) return 0;
argc--; argc--;
@ -4677,6 +4846,9 @@ static int clusterManagerCommandCall(int argc, char **argv) {
} }
zfree(argvlen); zfree(argvlen);
return 1; return 1;
invalid_args:
fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG);
return 0;
} }
static int clusterManagerCommandHelp(int argc, char **argv) { static int clusterManagerCommandHelp(int argc, char **argv) {