propagate PUBLISH messages using the redis cluster nodes bus. Still need to process the incoming packets of that type. Work in progress.

This commit is contained in:
antirez 2011-10-07 15:37:34 +02:00
parent 623131d408
commit c563ce463b
3 changed files with 82 additions and 12 deletions

View File

@ -666,7 +666,7 @@ int clusterProcessPacket(clusterLink *link) {
clusterSaveConfigOrDie(); clusterSaveConfigOrDie();
} }
} else { } else {
redisLog(REDIS_NOTICE,"Received unknown packet type: %d", type); redisLog(REDIS_WARNING,"Received unknown packet type: %d", type);
} }
return 1; return 1;
} }
@ -759,6 +759,22 @@ void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
link->sndbuf = sdscatlen(link->sndbuf, msg, msglen); link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
} }
/* Queue the same raw packet on the link of every eligible node.
 *
 * A node is eligible when it currently has a link and is flagged neither as
 * REDIS_NODE_MYSELF nor as REDIS_NODE_NOADDR. 'buf' points to the packet and
 * 'len' is its length in bytes; both are handed unchanged to
 * clusterSendMessage() for each eligible node. */
void clusterBroadcastMessage(void *buf, size_t len) {
    dictIterator *iter = dictGetIterator(server.cluster.nodes);
    dictEntry *entry;

    for (entry = dictNext(iter); entry != NULL; entry = dictNext(iter)) {
        clusterNode *peer = dictGetEntryVal(entry);

        /* Only nodes that have a link and are neither ourselves nor
         * address-less receive the packet. */
        if (peer->link &&
            !(peer->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)))
        {
            clusterSendMessage(peer->link,buf,len);
        }
    }
    dictReleaseIterator(iter);
}
/* Build the message header */ /* Build the message header */
void clusterBuildMessageHdr(clusterMsg *hdr, int type) { void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
int totlen; int totlen;
@ -843,20 +859,48 @@ void clusterSendPing(clusterLink *link, int type) {
clusterSendMessage(link,buf,totlen); clusterSendMessage(link,buf,totlen);
} }
/* Send a PUBLISH message.
 *
 * If link is NULL, then the message is broadcasted to the whole cluster,
 * otherwise it is queued on the given link only.
 *
 * Packet layout: the common message header, then channel_len and
 * message_len (both converted with htonl()), then the raw channel bytes
 * immediately followed by the raw message bytes. The total length stored
 * in the header covers exactly those bytes. */
void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
    unsigned char buf[4096], *payload;
    clusterMsg *hdr = (clusterMsg*) buf;
    uint32_t fixedlen, totlen;
    uint32_t channel_len, message_len;

    /* The objects obtained here are released with decrRefCount() before
     * returning. */
    channel = getDecodedObject(channel);
    message = getDecodedObject(message);
    channel_len = sdslen(channel->ptr);
    message_len = sdslen(message->ptr);

    clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_PUBLISH);

    /* Size of everything that precedes the bulk data: the common header
     * plus the two length fields. The bulk_data[] placeholder declared in
     * the struct is NOT counted, otherwise that many uninitialized bytes
     * would be transmitted after the message. */
    fixedlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    fixedlen += sizeof(clusterMsgDataPublish) -
                sizeof(hdr->data.publish.msg.bulk_data);
    totlen = fixedlen + channel_len + message_len;

    hdr->data.publish.msg.channel_len = htonl(channel_len);
    hdr->data.publish.msg.message_len = htonl(message_len);
    hdr->totlen = htonl(totlen);

    /* Try to use the local buffer if possible */
    if (totlen < sizeof(buf)) {
        payload = buf;
    } else {
        payload = zmalloc(totlen);
        /* Copy the already built fixed part while hdr still points to the
         * stack buffer, and only then switch hdr to the heap copy. */
        memcpy(payload,hdr,fixedlen);
        hdr = (clusterMsg*) payload;
    }
    memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,channel_len);
    memcpy(hdr->data.publish.msg.bulk_data+channel_len,
           message->ptr,message_len);

    if (link)
        clusterSendMessage(link,payload,totlen);
    else
        clusterBroadcastMessage(payload,totlen);

    decrRefCount(channel);
    decrRefCount(message);
    if (payload != buf) zfree(payload);
}
/* Send a FAIL message to all the nodes we are able to contact. /* Send a FAIL message to all the nodes we are able to contact.
@ -873,6 +917,17 @@ void clusterSendFail(char *nodename) {
clusterBroadcastMessage(buf,ntohl(hdr->totlen)); clusterBroadcastMessage(buf,ntohl(hdr->totlen));
} }
/* -----------------------------------------------------------------------------
 * CLUSTER Pub/Sub support
 *
 * For now we do very little: PUBLISH messages are simply propagated across
 * the whole cluster. In the future we'll try to get smarter and avoid
 * propagating those messages to hosts without receivers for a given channel.
 * -------------------------------------------------------------------------- */

/* Forward a published 'message' on 'channel' to the rest of the cluster. */
void clusterPropagatePublish(robj *channel, robj *message) {
    /* A NULL link asks clusterSendPublish() to broadcast. */
    clusterLink *every_node = NULL;

    clusterSendPublish(every_node, channel, message);
}
/* ----------------------------------------------------------------------------- /* -----------------------------------------------------------------------------
* CLUSTER cron job * CLUSTER cron job
* -------------------------------------------------------------------------- */ * -------------------------------------------------------------------------- */

View File

@ -263,5 +263,6 @@ void punsubscribeCommand(redisClient *c) {
/* PUBLISH command implementation.
 *
 * argv[1] is the channel and argv[2] the message. The message is handed to
 * pubsubPublishMessage(); when cluster support is enabled it is also passed
 * to clusterPropagatePublish(). The reply is the integer returned by
 * pubsubPublishMessage(). */
void publishCommand(redisClient *c) {
    int receivers;

    receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
    if (server.cluster_enabled) {
        clusterPropagatePublish(c->argv[1],c->argv[2]);
    }
    addReplyLongLong(c,receivers);
}

View File

@ -445,6 +445,7 @@ typedef struct {
#define CLUSTERMSG_TYPE_PONG 1 /* Pong (reply to Ping) */ #define CLUSTERMSG_TYPE_PONG 1 /* Pong (reply to Ping) */
#define CLUSTERMSG_TYPE_MEET 2 /* Meet "let's join" message */ #define CLUSTERMSG_TYPE_MEET 2 /* Meet "let's join" message */
#define CLUSTERMSG_TYPE_FAIL 3 /* Mark node xxx as failing */ #define CLUSTERMSG_TYPE_FAIL 3 /* Mark node xxx as failing */
#define CLUSTERMSG_TYPE_PUBLISH 4 /* Pub/Sub Publish propagation */
/* Initially we don't know our "name", but we'll find it once we connect /* Initially we don't know our "name", but we'll find it once we connect
* to the first node, using the getsockname() function. Then we'll use this * to the first node, using the getsockname() function. Then we'll use this
@ -463,16 +464,28 @@ typedef struct {
char nodename[REDIS_CLUSTER_NAMELEN]; char nodename[REDIS_CLUSTER_NAMELEN];
} clusterMsgDataFail; } clusterMsgDataFail;
/* Payload of a CLUSTERMSG_TYPE_PUBLISH packet.
 *
 * The sender (clusterSendPublish) stores both length fields after converting
 * them with htonl(), then writes the channel bytes into bulk_data immediately
 * followed by the message bytes. */
typedef struct {
/* Length in bytes of the channel name at the start of bulk_data. */
uint32_t channel_len;
/* Length in bytes of the message that follows the channel name. */
uint32_t message_len;
/* Start of the variable-length area; the real payload is
 * channel_len + message_len bytes, not the 8 declared here. */
unsigned char bulk_data[8]; /* defined as 8 just for alignment concerns. */
} clusterMsgDataPublish;
/* Type-specific body of a cluster bus message. Exactly one member is
 * meaningful for a given packet, selected by the message type. */
union clusterMsgData {
    /* PING, MEET and PONG */
    struct {
        /* Array of N clusterMsgDataGossip structures */
        clusterMsgDataGossip gossip[1];
    } ping;

    /* FAIL */
    struct {
        clusterMsgDataFail about;
    } fail;

    /* PUBLISH */
    struct {
        clusterMsgDataPublish msg;
    } publish;
};
typedef struct { typedef struct {
@ -976,6 +989,7 @@ clusterNode *createClusterNode(char *nodename, int flags);
int clusterAddNode(clusterNode *node); int clusterAddNode(clusterNode *node);
void clusterCron(void); void clusterCron(void);
clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask); clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask);
void clusterPropagatePublish(robj *channel, robj *message);
/* Scripting */ /* Scripting */
void scriptingInit(void); void scriptingInit(void);