improve malloc efficiency for cluster slots_info_pairs (#10488)

This commit improve malloc efficiency of the slots_info_pairs mechanism in cluster.c
by changing adlist into an array being realloced with greedy growth mechanism

Recently the cluster tests are consistently failing when executed with ASAN in the CI.
I tried to track down the commit that started it, and it appears to be #10293.
Looking at the commit, i realize it didn't affect this test / flow, other than the
replacement of the slots_info_pairs from sds to list.

I concluded that what could be happening is that the slot range is very fragmented,
and that results in many allocations.
with sds, it results in one allocation and also, we have a greedy growth mechanism,
but with adlist, we just have many many small allocations.
this probably causes stress on ASAN, and causes it to be slow at termination.
This commit is contained in:
Oran Agra 2022-03-29 10:05:06 +03:00 committed by GitHub
parent 14b198868f
commit 3b1e65a32b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 36 additions and 41 deletions

View File

@ -74,7 +74,8 @@ void clusterCloseAllSlots(void);
void clusterSetNodeAsMaster(clusterNode *n);
void clusterDelNode(clusterNode *delnode);
sds representClusterNodeFlags(sds ci, uint16_t flags);
sds representSlotInfo(sds ci, list *slot_info_pairs);
sds representSlotInfo(sds ci, uint16_t *slot_info_pairs, int slot_info_pairs_count);
void clusterFreeNodesSlotsInfo(clusterNode *n);
uint64_t clusterGetMaxEpoch(void);
int clusterBumpConfigEpochWithoutConsensus(void);
void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len);
@ -944,6 +945,8 @@ clusterNode *createClusterNode(char *nodename, int flags) {
node->flags = flags;
memset(node->slots,0,sizeof(node->slots));
node->slot_info_pairs = NULL;
node->slot_info_pairs_count = 0;
node->slot_info_pairs_alloc = 0;
node->numslots = 0;
node->numslaves = 0;
node->slaves = NULL;
@ -4565,16 +4568,10 @@ sds representClusterNodeFlags(sds ci, uint16_t flags) {
/* Concatenate the slot ownership information to the given SDS string 'ci'.
* If the slot ownership is in a contiguous block, it's represented as start-end pair,
* else each slot is added separately. */
sds representSlotInfo(sds ci, list *slot_info_pairs) {
listIter li;
listNode *ln;
listRewind(slot_info_pairs, &li);
while((ln = listNext(&li))) {
unsigned long start = (unsigned long)ln->value;
ln = listNext(&li);
/* List should have even number of elements */
serverAssert(ln != NULL);
unsigned long end = (unsigned long)ln->value;
sds representSlotInfo(sds ci, uint16_t *slot_info_pairs, int slot_info_pairs_count) {
for (int i = 0; i< slot_info_pairs_count; i+=2) {
unsigned long start = slot_info_pairs[i];
unsigned long end = slot_info_pairs[i+1];
if (start == end) {
ci = sdscatfmt(ci, " %i", start);
} else {
@ -4633,7 +4630,7 @@ sds clusterGenNodeDescription(clusterNode *node, int use_pport) {
/* Slots served by this instance. If we already have slots info,
* append it directly, otherwise, generate slots only if it has. */
if (node->slot_info_pairs) {
ci = representSlotInfo(ci, node->slot_info_pairs);
ci = representSlotInfo(ci, node->slot_info_pairs, node->slot_info_pairs_count);
} else if (node->numslots > 0) {
start = -1;
for (j = 0; j < CLUSTER_SLOTS; j++) {
@ -4693,11 +4690,15 @@ void clusterGenNodesSlotsInfo(int filter) {
* or end of slot. */
if (i == CLUSTER_SLOTS || n != server.cluster->slots[i]) {
if (!(n->flags & filter)) {
if (n->slot_info_pairs == NULL) {
n->slot_info_pairs = listCreate();
if (n->slot_info_pairs_count+2 > n->slot_info_pairs_alloc) {
if (n->slot_info_pairs_alloc == 0)
n->slot_info_pairs_alloc = 8;
else
n->slot_info_pairs_alloc *= 2;
n->slot_info_pairs = zrealloc(n->slot_info_pairs, n->slot_info_pairs_alloc * sizeof(uint16_t));
}
listAddNodeTail(n->slot_info_pairs, (void *)(unsigned long)start);
listAddNodeTail(n->slot_info_pairs, (void *)(unsigned long)(i-1));
n->slot_info_pairs[n->slot_info_pairs_count++] = start;
n->slot_info_pairs[n->slot_info_pairs_count++] = i-1;
}
if (i == CLUSTER_SLOTS) break;
n = server.cluster->slots[i];
@ -4706,6 +4707,13 @@ void clusterGenNodesSlotsInfo(int filter) {
}
}
void clusterFreeNodesSlotsInfo(clusterNode *n) {
zfree(n->slot_info_pairs);
n->slot_info_pairs = NULL;
n->slot_info_pairs_count = 0;
n->slot_info_pairs_alloc = 0;
}
/* Generate a csv-alike representation of the nodes we are aware of,
* including the "myself" node, and return an SDS string containing the
* representation (it is up to the caller to free it).
@ -4740,10 +4748,7 @@ sds clusterGenNodesDescription(int filter, int use_pport) {
ci = sdscatlen(ci,"\n",1);
/* Release slots info. */
if (node->slot_info_pairs != NULL) {
listRelease(node->slot_info_pairs);
node->slot_info_pairs = NULL;
}
clusterFreeNodesSlotsInfo(node);
}
dictReleaseIterator(di);
return ci;
@ -5033,22 +5038,14 @@ void addNodeDetailsToShardReply(client *c, clusterNode *node) {
}
/* Add the shard reply of a single shard based off the given primary node. */
void addShardReplyForClusterShards(client *c, clusterNode *node, list *slot_info_pairs) {
void addShardReplyForClusterShards(client *c, clusterNode *node, uint16_t *slot_info_pairs, int slot_pairs_count) {
addReplyMapLen(c, 2);
addReplyBulkCString(c, "slots");
uint16_t slot_pair_count = 0;
if (slot_info_pairs) {
slot_pair_count = listLength(slot_info_pairs);
serverAssert((slot_pair_count % 2) == 0);
addReplyArrayLen(c, slot_pair_count);
listIter li;
listRewind(slot_info_pairs, &li);
listNode *ln;
while((ln = listNext(&li))) {
addReplyBulkLongLong(c, (unsigned long)listNodeValue(ln));
ln = listNext(&li);
addReplyBulkLongLong(c, (unsigned long)listNodeValue(ln));
}
serverAssert((slot_pairs_count % 2) == 0);
addReplyArrayLen(c, slot_pairs_count);
for (int i = 0; i < slot_pairs_count; i++)
addReplyBulkLongLong(c, (unsigned long)slot_info_pairs[i]);
} else {
/* If no slot info pair is provided, the node owns no slots */
addReplyArrayLen(c, 0);
@ -5090,17 +5087,13 @@ void clusterReplyShards(client *c) {
if (nodeIsSlave(n)) {
/* You can force a replica to own slots, even though it'll get reverted,
* so freeing the slot pair here just in case. */
if (n->slot_info_pairs) listRelease(n->slot_info_pairs);
n->slot_info_pairs = NULL;
clusterFreeNodesSlotsInfo(n);
continue;
}
shard_count++;
/* n->slot_info_pairs is set to NULL when the the node owns no slots. */
addShardReplyForClusterShards(c, n, n->slot_info_pairs);
if (n->slot_info_pairs) {
listRelease(n->slot_info_pairs);
n->slot_info_pairs = NULL;
}
addShardReplyForClusterShards(c, n, n->slot_info_pairs, n->slot_info_pairs_count);
clusterFreeNodesSlotsInfo(n);
}
dictReleaseIterator(di);
setDeferredArrayLen(c, shard_replylen, shard_count);

View File

@ -118,7 +118,9 @@ typedef struct clusterNode {
int flags; /* CLUSTER_NODE_... */
uint64_t configEpoch; /* Last configEpoch observed for this node */
unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */
list *slot_info_pairs; /* Slots info represented as (start/end) pair (consecutive index). */
uint16_t *slot_info_pairs; /* Slots info represented as (start/end) pair (consecutive index). */
int slot_info_pairs_count; /* Used number of slots in slot_info_pairs */
int slot_info_pairs_alloc; /* Allocated number of slots in slot_info_pairs */
int numslots; /* Number of slots handled by this node */
int numslaves; /* Number of slave nodes, if this is a master */
struct clusterNode **slaves; /* pointers to slave nodes */