Bind source address for cluster communication

The first address specified as a bind parameter
(server.bindaddr[0]) gets used as the source IP
for cluster communication.

If no bind address is specified by the user, the
behavior is unchanged.

This patch allows multiple Redis Cluster instances
to communicate when running on the same interface
of the same host.
This commit is contained in:
Matt Stancliff 2014-03-03 10:57:27 -05:00
parent 8d011492a0
commit e5b1e7be64
3 changed files with 37 additions and 7 deletions

View File

@ -234,11 +234,12 @@ static int anetCreateSocket(char *err, int domain) {
#define ANET_CONNECT_NONE 0 #define ANET_CONNECT_NONE 0
#define ANET_CONNECT_NONBLOCK 1 #define ANET_CONNECT_NONBLOCK 1
static int anetTcpGenericConnect(char *err, char *addr, int port, int flags) static int anetTcpGenericConnect(char *err, char *addr, int port,
char *source_addr, int flags)
{ {
int s = ANET_ERR, rv; int s = ANET_ERR, rv;
char portstr[6]; /* strlen("65535") + 1; */ char portstr[6]; /* strlen("65535") + 1; */
struct addrinfo hints, *servinfo, *p; struct addrinfo hints, *servinfo, *bservinfo, *p, *b;
snprintf(portstr,sizeof(portstr),"%d",port); snprintf(portstr,sizeof(portstr),"%d",port);
memset(&hints,0,sizeof(hints)); memset(&hints,0,sizeof(hints));
@ -258,6 +259,24 @@ static int anetTcpGenericConnect(char *err, char *addr, int port, int flags)
if (anetSetReuseAddr(err,s) == ANET_ERR) goto error; if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;
if (flags & ANET_CONNECT_NONBLOCK && anetNonBlock(err,s) != ANET_OK) if (flags & ANET_CONNECT_NONBLOCK && anetNonBlock(err,s) != ANET_OK)
goto error; goto error;
if (source_addr) {
int bound = 0;
/* Using getaddrinfo saves us from self-determining IPv4 vs IPv6 */
if ((rv = getaddrinfo(source_addr, NULL, &hints, &bservinfo)) != 0) {
anetSetError(err, "%s", gai_strerror(rv));
goto end;
}
for (b = bservinfo; b != NULL; b = b->ai_next) {
if (bind(s,b->ai_addr,b->ai_addrlen) != -1) {
bound = 1;
break;
}
}
if (!bound) {
anetSetError(err, "bind: %s", strerror(errno));
goto end;
}
}
if (connect(s,p->ai_addr,p->ai_addrlen) == -1) { if (connect(s,p->ai_addr,p->ai_addrlen) == -1) {
/* If the socket is non-blocking, it is ok for connect() to /* If the socket is non-blocking, it is ok for connect() to
* return an EINPROGRESS error here. */ * return an EINPROGRESS error here. */
@ -287,12 +306,17 @@ end:
int anetTcpConnect(char *err, char *addr, int port) int anetTcpConnect(char *err, char *addr, int port)
{ {
return anetTcpGenericConnect(err,addr,port,ANET_CONNECT_NONE); return anetTcpGenericConnect(err,addr,port,NULL,ANET_CONNECT_NONE);
} }
int anetTcpNonBlockConnect(char *err, char *addr, int port) int anetTcpNonBlockConnect(char *err, char *addr, int port)
{ {
return anetTcpGenericConnect(err,addr,port,ANET_CONNECT_NONBLOCK); return anetTcpGenericConnect(err,addr,port,NULL,ANET_CONNECT_NONBLOCK);
}
int anetTcpNonBlockBindConnect(char *err, char *addr, int port, char *source_addr)
{
return anetTcpGenericConnect(err,addr,port,source_addr,ANET_CONNECT_NONBLOCK);
} }
int anetUnixGenericConnect(char *err, char *path, int flags) int anetUnixGenericConnect(char *err, char *path, int flags)

View File

@ -45,6 +45,7 @@
int anetTcpConnect(char *err, char *addr, int port); int anetTcpConnect(char *err, char *addr, int port);
int anetTcpNonBlockConnect(char *err, char *addr, int port); int anetTcpNonBlockConnect(char *err, char *addr, int port);
int anetTcpNonBlockBindConnect(char *err, char *addr, int port, char *source_addr);
int anetUnixConnect(char *err, char *path); int anetUnixConnect(char *err, char *path);
int anetUnixNonBlockConnect(char *err, char *path); int anetUnixNonBlockConnect(char *err, char *path);
int anetRead(int fd, char *buf, int count); int anetRead(int fd, char *buf, int count);

View File

@ -2411,9 +2411,14 @@ void clusterCron(void) {
mstime_t old_ping_sent; mstime_t old_ping_sent;
clusterLink *link; clusterLink *link;
fd = anetTcpNonBlockConnect(server.neterr, node->ip, fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
node->port+REDIS_CLUSTER_PORT_INCR); node->port+REDIS_CLUSTER_PORT_INCR, server.bindaddr[0]);
if (fd == -1) continue; if (fd == -1) {
redisLog(REDIS_DEBUG, "Unable to connect to "
"Cluster Client [%s]:%d", node->ip,
node->port+REDIS_CLUSTER_PORT_INCR);
continue;
}
link = createClusterLink(node); link = createClusterLink(node);
link->fd = fd; link->fd = fd;
node->link = link; node->link = link;