ClusterManager: added replicas count to clusterManagerNode

This commit is contained in:
artix 2018-02-07 12:02:56 +01:00
parent c002b95d89
commit 2f48d62423

View File

@ -223,9 +223,11 @@ typedef struct clusterManagerNode {
time_t ping_recv; time_t ping_recv;
int flags; int flags;
sds replicate; sds replicate;
list replicas;
int dirty; int dirty;
uint8_t slots[CLUSTER_MANAGER_SLOTS]; uint8_t slots[CLUSTER_MANAGER_SLOTS];
int slots_count; int slots_count;
int replicas_count;
list *friends; list *friends;
sds *migrating; sds *migrating;
sds *importing; sds *importing;
@ -250,6 +252,7 @@ static dictType clusterManagerDictType = {
}; };
static clusterManagerNode *clusterManagerNewNode(char *ip, int port); static clusterManagerNode *clusterManagerNewNode(char *ip, int port);
static clusterManagerNode *clusterManagerNodeByName(const char *name);
static int clusterManagerNodeIsCluster(clusterManagerNode *node, char **err); static int clusterManagerNodeIsCluster(clusterManagerNode *node, char **err);
static int clusterManagerNodeLoadInfo(clusterManagerNode *node, int opts, static int clusterManagerNodeLoadInfo(clusterManagerNode *node, int opts,
char **err); char **err);
@ -265,6 +268,7 @@ static void clusterManagerShowInfo(void);
static int clusterManagerFlushNodeConfig(clusterManagerNode *node, char **err); static int clusterManagerFlushNodeConfig(clusterManagerNode *node, char **err);
static void clusterManagerWaitForClusterJoin(void); static void clusterManagerWaitForClusterJoin(void);
static void clusterManagerCheckCluster(int quiet); static void clusterManagerCheckCluster(int quiet);
typedef int clusterManagerCommandProc(int argc, char **argv); typedef int clusterManagerCommandProc(int argc, char **argv);
typedef struct clusterManagerCommandDef { typedef struct clusterManagerCommandDef {
char *name; char *name;
@ -1890,10 +1894,31 @@ static clusterManagerNode *clusterManagerNewNode(char *ip, int port) {
node->importing = NULL; node->importing = NULL;
node->migrating_count = 0; node->migrating_count = 0;
node->importing_count = 0; node->importing_count = 0;
node->replicas_count = 0;
CLUSTER_MANAGER_RESET_SLOTS(node); CLUSTER_MANAGER_RESET_SLOTS(node);
return node; return node;
} }
static clusterManagerNode *clusterManagerNodeByName(const char *name) {
if (cluster_manager.nodes == NULL) return NULL;
clusterManagerNode *found = NULL;
sds lcname = sdsempty();
lcname = sdscpy(lcname, name);
sdstolower(lcname);
listIter li;
listNode *ln;
listRewind(cluster_manager.nodes, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
if (n->name && !sdscmp(n->name, lcname)) {
found = n;
break;
}
}
sdsfree(lcname);
return found;
}
static int clusterManagerNodeIsCluster(clusterManagerNode *node, char **err) { static int clusterManagerNodeIsCluster(clusterManagerNode *node, char **err) {
redisReply *info = CLUSTER_MANAGER_NODE_INFO(node); redisReply *info = CLUSTER_MANAGER_NODE_INFO(node);
int is_err = 0; int is_err = 0;
@ -2119,7 +2144,9 @@ static sds clusterManagerNodeInfo(clusterManagerNode *node) {
} }
if (node->replicate != NULL) if (node->replicate != NULL)
info = sdscatfmt(info, "\n replicates %S", node->replicate); info = sdscatfmt(info, "\n replicates %S", node->replicate);
//else if () {} //TODO: add replicas info else if (node->replicas_count)
info = sdscatfmt(info, "\n %U additional replica(s)",
node->replicas_count);
return info; return info;
} }
@ -2485,6 +2512,18 @@ invalid_friend:
listRelease(node->friends); listRelease(node->friends);
node->friends = NULL; node->friends = NULL;
} }
// Count replicas for each node
listRewind(cluster_manager.nodes, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
if (n->replicate != NULL) {
clusterManagerNode *master = clusterManagerNodeByName(n->replicate);
if (master == NULL) {
printf("*** WARNING: %s:%d claims to be slave of unknown "
"node ID %s.\n", n->ip, n->port, n->replicate);
} else master->replicas_count++;
}
}
return 1; return 1;
} }