From 0701cad3de615ef2fbdda27514dfaa6e52734e73 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 29 Mar 2018 15:13:31 +0200 Subject: [PATCH] Modules Cluster API: message bus implementation. --- src/cluster.c | 72 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/cluster.h | 20 ++++++++++---- src/module.c | 23 ++++++++++++++++ src/server.h | 1 + 4 files changed, 111 insertions(+), 5 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index d0f19bff4..0faa987e5 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -75,6 +75,7 @@ void clusterDelNode(clusterNode *delnode); sds representClusterNodeFlags(sds ci, uint16_t flags); uint64_t clusterGetMaxEpoch(void); int clusterBumpConfigEpochWithoutConsensus(void); +void moduleCallClusterReceivers(char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len); /* ----------------------------------------------------------------------------- * Initialization @@ -1682,6 +1683,12 @@ int clusterProcessPacket(clusterLink *link) { explen += sizeof(clusterMsgDataUpdate); if (totlen != explen) return 1; + } else if (type == CLUSTERMSG_TYPE_MODULE) { + uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData); + + explen += sizeof(clusterMsgDataPublish) - + 3 + ntohl(hdr->data.module.msg.len); + if (totlen != explen) return 1; } /* Check if the sender is a known node. */ @@ -2076,6 +2083,15 @@ int clusterProcessPacket(clusterLink *link) { * config accordingly. */ clusterUpdateSlotsConfigWith(n,reportedConfigEpoch, hdr->data.update.nodecfg.slots); + } else if (type == CLUSTERMSG_TYPE_MODULE) { + if (!sender) return 1; /* Protect the module from unknown nodes. */ + /* We need to route this message back to the right module subscribed + * for the right message type. */ + uint64_t module_id = hdr->data.module.msg.module_id; /* Endian-safe ID */ + uint32_t len = ntohl(hdr->data.module.msg.len); + uint8_t type = hdr->data.module.msg.type; + unsigned char *payload = hdr->data.module.msg.bulk_data; + moduleCallClusterReceivers(sender->name,module_id,type,payload,len); } else { serverLog(LL_WARNING,"Received unknown packet type: %d", type); } @@ -2563,6 +2579,61 @@ void clusterSendUpdate(clusterLink *link, clusterNode *node) { clusterSendMessage(link,buf,ntohl(hdr->totlen)); } +/* Send a MODULE message. + * + * If link is NULL, then the message is broadcasted to the whole cluster. */ +void clusterSendModule(clusterLink *link, uint64_t module_id, uint8_t type, + unsigned char *payload, uint32_t len) { + unsigned char buf[sizeof(clusterMsg)], *heapbuf; + clusterMsg *hdr = (clusterMsg*) buf; + uint32_t totlen; + + clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MODULE); + totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); + totlen += sizeof(clusterMsgModule) - 3 + len; + + hdr->data.module.msg.module_id = module_id; /* Already endian adjusted. */ + hdr->data.module.msg.type = type; + hdr->data.module.msg.len = htonl(len); + hdr->totlen = htonl(totlen); + + /* Try to use the local buffer if possible */ + if (totlen < sizeof(buf)) { + heapbuf = buf; + } else { + heapbuf = zmalloc(totlen); + memcpy(heapbuf,hdr,sizeof(*hdr)); + hdr = (clusterMsg*) heapbuf; + } + memcpy(hdr->data.module.msg.bulk_data,payload,len); + + if (link) + clusterSendMessage(link,heapbuf,totlen); + else + clusterBroadcastMessage(heapbuf,totlen); + + if (heapbuf != buf) zfree(heapbuf); +} + +/* This function gets a cluster node ID string as target, the same way the nodes + * addresses are represented in the modules side, resolves the node, and sends + * the message. If the target is NULL the message is broadcasted. + * + * The function returns C_OK if the target is valid, otherwise C_ERR is + * returned. */ +int clusterSendModuleMessageToTarget(char *target, uint64_t module_id, uint8_t type, unsigned char *payload, uint32_t len) { + clusterNode *node = NULL; + + if (target != NULL) { + node = clusterLookupNode(target); + if (node == NULL || node->link == NULL) return C_ERR; + } + + clusterSendModule(target ? node->link : NULL, + module_id, type, payload, len); + return C_OK; +} + /* ----------------------------------------------------------------------------- * CLUSTER Pub/Sub support * @@ -4008,6 +4079,7 @@ const char *clusterGetMessageTypeString(int type) { case CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK: return "auth-ack"; case CLUSTERMSG_TYPE_UPDATE: return "update"; case CLUSTERMSG_TYPE_MFSTART: return "mfstart"; + case CLUSTERMSG_TYPE_MODULE: return "module"; } return "unknown"; } diff --git a/src/cluster.h b/src/cluster.h index f2b9a4ecf..4d4a4d60e 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -97,7 +97,8 @@ typedef struct clusterLink { #define CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK 6 /* Yes, you have my vote */ #define CLUSTERMSG_TYPE_UPDATE 7 /* Another node slots configuration */ #define CLUSTERMSG_TYPE_MFSTART 8 /* Pause clients for manual failover */ -#define CLUSTERMSG_TYPE_COUNT 9 /* Total number of message types. */ +#define CLUSTERMSG_TYPE_MODULE 9 /* Module cluster API message. */ +#define CLUSTERMSG_TYPE_COUNT 10 /* Total number of message types. */ /* This structure represent elements of node->fail_reports. */ typedef struct clusterNodeFailReport { @@ -195,10 +196,7 @@ typedef struct { typedef struct { uint32_t channel_len; uint32_t message_len; - /* We can't reclare bulk_data as bulk_data[] since this structure is - * nested. The 8 bytes are removed from the count during the message - * length computation. */ - unsigned char bulk_data[8]; + unsigned char bulk_data[8]; /* 8 bytes just as placeholder. */ } clusterMsgDataPublish; typedef struct { @@ -207,6 +205,13 @@ typedef struct { unsigned char slots[CLUSTER_SLOTS/8]; /* Slots bitmap. */ } clusterMsgDataUpdate; +typedef struct { + uint64_t module_id; /* ID of the sender module. */ + uint32_t len; /* ID of the sender module. */ + uint8_t type; /* Type from 0 to 255. */ + unsigned char bulk_data[3]; /* 3 bytes just as placeholder. */ +} clusterMsgModule; + union clusterMsgData { /* PING, MEET and PONG */ struct { @@ -228,6 +233,11 @@ union clusterMsgData { struct { clusterMsgDataUpdate nodecfg; } update; + + /* MODULE */ + struct { + clusterMsgModule msg; + } module; }; #define CLUSTER_PROTO_VER 1 /* Cluster bus protocol version. */ diff --git a/src/module.c b/src/module.c index e8af8e3ff..fc00e9f9e 100644 --- a/src/module.c +++ b/src/module.c @@ -3808,6 +3808,29 @@ void moduleUnsubscribeNotifications(RedisModule *module) { } } +/* -------------------------------------------------------------------------- + * Modules Cluster API + * -------------------------------------------------------------------------- */ + +typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, char *sender_id, uint8_t type, const unsigned char *payload, uint32_t len); + +/* This structure identifies a registered caller: it must match a given module + * ID, for a given message type. The callback function is just the function + * that was registered as receiver. */ +struct moduleClusterReceiver { + uint64_t module_id; + uint8_t msg_type; + RedisModuleClusterMessageReceiver callback; + struct moduleClusterReceiver *next; +}; + +/* We have an array of message types: each bucket is a linked list of + * configured receivers. */ +static struct moduleClusterReceiver *clusterReceivers[UINT8_MAX]; + +void moduleCallClusterReceivers(char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len) { +} + /* -------------------------------------------------------------------------- * Modules API internals * -------------------------------------------------------------------------- */ diff --git a/src/server.h b/src/server.h index 155937536..cca56bc35 100644 --- a/src/server.h +++ b/src/server.h @@ -1808,6 +1808,7 @@ void clusterCron(void); void clusterPropagatePublish(robj *channel, robj *message); void migrateCloseTimedoutSockets(void); void clusterBeforeSleep(void); +int clusterSendModuleMessageToTarget(char *target, uint64_t module_id, uint8_t type, unsigned char *payload, uint32_t len); /* Sentinel */ void initSentinelConfig(void);