mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 08:08:53 -05:00
Ability of slave to announce arbitrary ip/port to master.
This feature is useful, especially in deployments using Sentinel in order to setup Redis HA, where the slave is executed with NAT or port forwarding, so that the auto-detected port/ip addresses, as listed in the "INFO replication" output of the master, or as provided by the "ROLE" command, don't match the real addresses at which the slave is reachable for connections.
This commit is contained in:
parent
356a6304ec
commit
55385f99de
29
redis.conf
29
redis.conf
@ -443,6 +443,35 @@ slave-priority 100
|
||||
# By default min-slaves-to-write is set to 0 (feature disabled) and
|
||||
# min-slaves-max-lag is set to 10.
|
||||
|
||||
# A Redis master is able to list the address and port of the attached
|
||||
# slaves in different ways. For example the "INFO replication" section
|
||||
# offers this information, which is used, among other tools, by
|
||||
# Redis Sentinel in order to discover slave instances.
|
||||
# Another place where this info is available is in the output of the
|
||||
# "ROLE" command of a masteer.
|
||||
#
|
||||
# The listed IP and address normally reported by a slave is obtained
|
||||
# in the following way:
|
||||
#
|
||||
# IP: The address is auto detected by checking the peer address
|
||||
# of the socket used by the slave to connect with the master.
|
||||
#
|
||||
# Port: The port is communicated by the slave during the replication
|
||||
# handshake, and is normally the port that the slave is using to
|
||||
# list for connections.
|
||||
#
|
||||
# However when port forwarding or Network Address Translation (NAT) is
|
||||
# used, the slave may be actually reachable via different IP and port
|
||||
# pairs. The following two options can be used by a slave in order to
|
||||
# report to its master a specific set of IP and port, so that both INFO
|
||||
# and ROLE will report those values.
|
||||
#
|
||||
# There is no need to use both the options if you need to override just
|
||||
# the port or the IP address.
|
||||
#
|
||||
# slave-announce-ip 5.5.5.5
|
||||
# slave-announce-port 1234
|
||||
|
||||
################################## SECURITY ###################################
|
||||
|
||||
# Require clients to issue AUTH <PASSWORD> before processing any other
|
||||
|
19
src/config.c
19
src/config.c
@ -633,6 +633,16 @@ void loadServerConfigFromString(char *config) {
|
||||
}
|
||||
} else if (!strcasecmp(argv[0],"slave-priority") && argc == 2) {
|
||||
server.slave_priority = atoi(argv[1]);
|
||||
} else if (!strcasecmp(argv[0],"slave-announce-ip") && argc == 2) {
|
||||
zfree(server.slave_announce_ip);
|
||||
server.slave_announce_ip = zstrdup(argv[1]);
|
||||
} else if (!strcasecmp(argv[0],"slave-announce-port") && argc == 2) {
|
||||
server.slave_announce_port = atoi(argv[1]);
|
||||
if (server.slave_announce_port < 0 ||
|
||||
server.slave_announce_port > 65535)
|
||||
{
|
||||
err = "Invalid port"; goto loaderr;
|
||||
}
|
||||
} else if (!strcasecmp(argv[0],"min-slaves-to-write") && argc == 2) {
|
||||
server.repl_min_slaves_to_write = atoi(argv[1]);
|
||||
if (server.repl_min_slaves_to_write < 0) {
|
||||
@ -925,6 +935,9 @@ void configSetCommand(client *c) {
|
||||
|
||||
if (flags == -1) goto badfmt;
|
||||
server.notify_keyspace_events = flags;
|
||||
} config_set_special_field("slave-announce-ip") {
|
||||
zfree(server.slave_announce_ip);
|
||||
server.slave_announce_ip = ((char*)o->ptr)[0] ? zstrdup(o->ptr) : NULL;
|
||||
|
||||
/* Boolean fields.
|
||||
* config_set_bool_field(name,var). */
|
||||
@ -1013,6 +1026,8 @@ void configSetCommand(client *c) {
|
||||
"repl-diskless-sync-delay",server.repl_diskless_sync_delay,0,LLONG_MAX) {
|
||||
} config_set_numerical_field(
|
||||
"slave-priority",server.slave_priority,0,LLONG_MAX) {
|
||||
} config_set_numerical_field(
|
||||
"slave-announce-port",server.slave_announce_port,0,65535) {
|
||||
} config_set_numerical_field(
|
||||
"min-slaves-to-write",server.repl_min_slaves_to_write,0,LLONG_MAX) {
|
||||
refreshGoodSlavesCount();
|
||||
@ -1133,6 +1148,7 @@ void configGetCommand(client *c) {
|
||||
config_get_string_field("unixsocket",server.unixsocket);
|
||||
config_get_string_field("logfile",server.logfile);
|
||||
config_get_string_field("pidfile",server.pidfile);
|
||||
config_get_string_field("slave-announce-ip",server.slave_announce_ip);
|
||||
|
||||
/* Numerical values */
|
||||
config_get_numerical_field("maxmemory",server.maxmemory);
|
||||
@ -1177,6 +1193,7 @@ void configGetCommand(client *c) {
|
||||
config_get_numerical_field("maxclients",server.maxclients);
|
||||
config_get_numerical_field("watchdog-period",server.watchdog_period);
|
||||
config_get_numerical_field("slave-priority",server.slave_priority);
|
||||
config_get_numerical_field("slave-announce-port",server.slave_announce_port);
|
||||
config_get_numerical_field("min-slaves-to-write",server.repl_min_slaves_to_write);
|
||||
config_get_numerical_field("min-slaves-max-lag",server.repl_min_slaves_max_lag);
|
||||
config_get_numerical_field("hz",server.hz);
|
||||
@ -1865,6 +1882,7 @@ int rewriteConfig(char *path) {
|
||||
rewriteConfigOctalOption(state,"unixsocketperm",server.unixsocketperm,CONFIG_DEFAULT_UNIX_SOCKET_PERM);
|
||||
rewriteConfigNumericalOption(state,"timeout",server.maxidletime,CONFIG_DEFAULT_CLIENT_TIMEOUT);
|
||||
rewriteConfigNumericalOption(state,"tcp-keepalive",server.tcpkeepalive,CONFIG_DEFAULT_TCP_KEEPALIVE);
|
||||
rewriteConfigNumericalOption(state,"slave-announce-port",server.slave_announce_port,CONFIG_DEFAULT_SLAVE_ANNOUNCE_PORT);
|
||||
rewriteConfigEnumOption(state,"loglevel",server.verbosity,loglevel_enum,CONFIG_DEFAULT_VERBOSITY);
|
||||
rewriteConfigStringOption(state,"logfile",server.logfile,CONFIG_DEFAULT_LOGFILE);
|
||||
rewriteConfigYesNoOption(state,"syslog-enabled",server.syslog_enabled,CONFIG_DEFAULT_SYSLOG_ENABLED);
|
||||
@ -1878,6 +1896,7 @@ int rewriteConfig(char *path) {
|
||||
rewriteConfigStringOption(state,"dbfilename",server.rdb_filename,CONFIG_DEFAULT_RDB_FILENAME);
|
||||
rewriteConfigDirOption(state);
|
||||
rewriteConfigSlaveofOption(state);
|
||||
rewriteConfigStringOption(state,"slave-announce-ip",server.slave_announce_ip,CONFIG_DEFAULT_SLAVE_ANNOUNCE_IP);
|
||||
rewriteConfigStringOption(state,"masterauth",server.masterauth,NULL);
|
||||
rewriteConfigStringOption(state,"cluster-announce-ip",server.cluster_announce_ip,NULL);
|
||||
rewriteConfigYesNoOption(state,"slave-serve-stale-data",server.repl_serve_stale_data,CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA);
|
||||
|
@ -109,6 +109,7 @@ client *createClient(int fd) {
|
||||
c->repl_ack_off = 0;
|
||||
c->repl_ack_time = 0;
|
||||
c->slave_listening_port = 0;
|
||||
c->slave_ip[0] = '\0';
|
||||
c->slave_capa = SLAVE_CAPA_NONE;
|
||||
c->reply = listCreate();
|
||||
c->reply_bytes = 0;
|
||||
|
@ -47,7 +47,7 @@ int cancelReplicationHandshake(void);
|
||||
|
||||
/* Return the pointer to a string representing the slave ip:listening_port
|
||||
* pair. Mostly useful for logging, since we want to log a slave using its
|
||||
* IP address and it's listening port which is more clear for the user, for
|
||||
* IP address and its listening port which is more clear for the user, for
|
||||
* example: "Closing connection with slave 10.1.2.3:6380". */
|
||||
char *replicationGetSlaveName(client *c) {
|
||||
static char buf[NET_PEER_ID_LEN];
|
||||
@ -55,7 +55,12 @@ char *replicationGetSlaveName(client *c) {
|
||||
|
||||
ip[0] = '\0';
|
||||
buf[0] = '\0';
|
||||
if (anetPeerToString(c->fd,ip,sizeof(ip),NULL) != -1) {
|
||||
if (c->slave_ip[0] != '\0' ||
|
||||
anetPeerToString(c->fd,ip,sizeof(ip),NULL) != -1)
|
||||
{
|
||||
/* Note that the 'ip' buffer is always larger than 'c->slave_ip' */
|
||||
if (c->slave_ip[0] != '\0') memcpy(ip,c->slave_ip,sizeof(c->slave_ip));
|
||||
|
||||
if (c->slave_listening_port)
|
||||
anetFormatAddr(buf,sizeof(buf),ip,c->slave_listening_port);
|
||||
else
|
||||
@ -717,6 +722,15 @@ void replconfCommand(client *c) {
|
||||
&port,NULL) != C_OK))
|
||||
return;
|
||||
c->slave_listening_port = port;
|
||||
} else if (!strcasecmp(c->argv[j]->ptr,"ip-address")) {
|
||||
sds ip = c->argv[j+1]->ptr;
|
||||
if (sdslen(ip) < sizeof(c->slave_ip)) {
|
||||
memcpy(c->slave_ip,ip,sdslen(ip)+1);
|
||||
} else {
|
||||
addReplyErrorFormat(c,"REPLCONF ip-address provided by "
|
||||
"slave instance is too long: %zd bytes", sdslen(ip));
|
||||
return;
|
||||
}
|
||||
} else if (!strcasecmp(c->argv[j]->ptr,"capa")) {
|
||||
/* Ignore capabilities not understood by this master. */
|
||||
if (!strcasecmp(c->argv[j+1]->ptr,"eof"))
|
||||
@ -1462,7 +1476,8 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
/* Set the slave port, so that Master's INFO command can list the
|
||||
* slave listening port correctly. */
|
||||
if (server.repl_state == REPL_STATE_SEND_PORT) {
|
||||
sds port = sdsfromlonglong(server.port);
|
||||
sds port = sdsfromlonglong(server.slave_announce_port ?
|
||||
server.slave_announce_port : server.port);
|
||||
err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
|
||||
"listening-port",port, NULL);
|
||||
sdsfree(port);
|
||||
@ -1482,6 +1497,37 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
"REPLCONF listening-port: %s", err);
|
||||
}
|
||||
sdsfree(err);
|
||||
server.repl_state = REPL_STATE_SEND_IP;
|
||||
}
|
||||
|
||||
/* Skip REPLCONF ip-address if there is no slave-announce-ip option set. */
|
||||
if (server.repl_state == REPL_STATE_SEND_IP &&
|
||||
server.slave_announce_ip == NULL)
|
||||
{
|
||||
server.repl_state = REPL_STATE_SEND_CAPA;
|
||||
}
|
||||
|
||||
/* Set the slave ip, so that Master's INFO command can list the
|
||||
* slave IP address port correctly in case of port forwarding or NAT. */
|
||||
if (server.repl_state == REPL_STATE_SEND_IP) {
|
||||
err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
|
||||
"ip-address",server.slave_announce_ip, NULL);
|
||||
if (err) goto write_error;
|
||||
sdsfree(err);
|
||||
server.repl_state = REPL_STATE_RECEIVE_IP;
|
||||
return;
|
||||
}
|
||||
|
||||
/* Receive REPLCONF ip-address reply. */
|
||||
if (server.repl_state == REPL_STATE_RECEIVE_IP) {
|
||||
err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
|
||||
/* Ignore the error if any, not all the Redis versions support
|
||||
* REPLCONF listening-port. */
|
||||
if (err[0] == '-') {
|
||||
serverLog(LL_NOTICE,"(Non critical) Master does not understand "
|
||||
"REPLCONF ip-address: %s", err);
|
||||
}
|
||||
sdsfree(err);
|
||||
server.repl_state = REPL_STATE_SEND_CAPA;
|
||||
}
|
||||
|
||||
@ -1787,12 +1833,16 @@ void roleCommand(client *c) {
|
||||
listRewind(server.slaves,&li);
|
||||
while((ln = listNext(&li))) {
|
||||
client *slave = ln->value;
|
||||
char ip[NET_IP_STR_LEN];
|
||||
char ip[NET_IP_STR_LEN], *slaveip = slave->slave_ip;
|
||||
|
||||
if (anetPeerToString(slave->fd,ip,sizeof(ip),NULL) == -1) continue;
|
||||
if (slaveip[0] == '\0') {
|
||||
if (anetPeerToString(slave->fd,ip,sizeof(ip),NULL) == -1)
|
||||
continue;
|
||||
slaveip = ip;
|
||||
}
|
||||
if (slave->replstate != SLAVE_STATE_ONLINE) continue;
|
||||
addReplyMultiBulkLen(c,3);
|
||||
addReplyBulkCString(c,ip);
|
||||
addReplyBulkCString(c,slaveip);
|
||||
addReplyBulkLongLong(c,slave->slave_listening_port);
|
||||
addReplyBulkLongLong(c,slave->repl_ack_off);
|
||||
slaves++;
|
||||
|
12
src/server.c
12
src/server.c
@ -1412,6 +1412,8 @@ void initServerConfig(void) {
|
||||
server.repl_min_slaves_to_write = CONFIG_DEFAULT_MIN_SLAVES_TO_WRITE;
|
||||
server.repl_min_slaves_max_lag = CONFIG_DEFAULT_MIN_SLAVES_MAX_LAG;
|
||||
server.slave_priority = CONFIG_DEFAULT_SLAVE_PRIORITY;
|
||||
server.slave_announce_ip = CONFIG_DEFAULT_SLAVE_ANNOUNCE_IP;
|
||||
server.slave_announce_port = CONFIG_DEFAULT_SLAVE_ANNOUNCE_PORT;
|
||||
server.master_repl_offset = 0;
|
||||
|
||||
/* Replication partial resync backlog */
|
||||
@ -3056,11 +3058,15 @@ sds genRedisInfoString(char *section) {
|
||||
while((ln = listNext(&li))) {
|
||||
client *slave = listNodeValue(ln);
|
||||
char *state = NULL;
|
||||
char ip[NET_IP_STR_LEN];
|
||||
char ip[NET_IP_STR_LEN], *slaveip = slave->slave_ip;
|
||||
int port;
|
||||
long lag = 0;
|
||||
|
||||
if (anetPeerToString(slave->fd,ip,sizeof(ip),&port) == -1) continue;
|
||||
if (slaveip[0] == '\0') {
|
||||
if (anetPeerToString(slave->fd,ip,sizeof(ip),&port) == -1)
|
||||
continue;
|
||||
slaveip = ip;
|
||||
}
|
||||
switch(slave->replstate) {
|
||||
case SLAVE_STATE_WAIT_BGSAVE_START:
|
||||
case SLAVE_STATE_WAIT_BGSAVE_END:
|
||||
@ -3080,7 +3086,7 @@ sds genRedisInfoString(char *section) {
|
||||
info = sdscatprintf(info,
|
||||
"slave%d:ip=%s,port=%d,state=%s,"
|
||||
"offset=%lld,lag=%ld\r\n",
|
||||
slaveid,ip,slave->slave_listening_port,state,
|
||||
slaveid,slaveip,slave->slave_listening_port,state,
|
||||
slave->repl_ack_off, lag);
|
||||
slaveid++;
|
||||
}
|
||||
|
23
src/server.h
23
src/server.h
@ -126,6 +126,8 @@ typedef long long mstime_t; /* millisecond time type. */
|
||||
#define CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY 5
|
||||
#define CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA 1
|
||||
#define CONFIG_DEFAULT_SLAVE_READ_ONLY 1
|
||||
#define CONFIG_DEFAULT_SLAVE_ANNOUNCE_IP NULL
|
||||
#define CONFIG_DEFAULT_SLAVE_ANNOUNCE_PORT 0
|
||||
#define CONFIG_DEFAULT_REPL_DISABLE_TCP_NODELAY 0
|
||||
#define CONFIG_DEFAULT_MAXMEMORY 0
|
||||
#define CONFIG_DEFAULT_MAXMEMORY_SAMPLES 5
|
||||
@ -267,13 +269,15 @@ typedef long long mstime_t; /* millisecond time type. */
|
||||
#define REPL_STATE_RECEIVE_AUTH 5 /* Wait for AUTH reply */
|
||||
#define REPL_STATE_SEND_PORT 6 /* Send REPLCONF listening-port */
|
||||
#define REPL_STATE_RECEIVE_PORT 7 /* Wait for REPLCONF reply */
|
||||
#define REPL_STATE_SEND_CAPA 8 /* Send REPLCONF capa */
|
||||
#define REPL_STATE_RECEIVE_CAPA 9 /* Wait for REPLCONF reply */
|
||||
#define REPL_STATE_SEND_PSYNC 10 /* Send PSYNC */
|
||||
#define REPL_STATE_RECEIVE_PSYNC 11 /* Wait for PSYNC reply */
|
||||
#define REPL_STATE_SEND_IP 8 /* Send REPLCONF ip-address */
|
||||
#define REPL_STATE_RECEIVE_IP 9 /* Wait for REPLCONF reply */
|
||||
#define REPL_STATE_SEND_CAPA 10 /* Send REPLCONF capa */
|
||||
#define REPL_STATE_RECEIVE_CAPA 11 /* Wait for REPLCONF reply */
|
||||
#define REPL_STATE_SEND_PSYNC 12 /* Send PSYNC */
|
||||
#define REPL_STATE_RECEIVE_PSYNC 13 /* Wait for PSYNC reply */
|
||||
/* --- End of handshake states --- */
|
||||
#define REPL_STATE_TRANSFER 12 /* Receiving .rdb from master */
|
||||
#define REPL_STATE_CONNECTED 13 /* Connected to master */
|
||||
#define REPL_STATE_TRANSFER 14 /* Receiving .rdb from master */
|
||||
#define REPL_STATE_CONNECTED 15 /* Connected to master */
|
||||
|
||||
/* State of slaves from the POV of the master. Used in client->replstate.
|
||||
* In SEND_BULK and ONLINE state the slave receives new updates
|
||||
@ -665,7 +669,8 @@ typedef struct client {
|
||||
copying this slave output buffer
|
||||
should use. */
|
||||
char replrunid[CONFIG_RUN_ID_SIZE+1]; /* Master run id if is a master. */
|
||||
int slave_listening_port; /* As configured with: SLAVECONF listening-port */
|
||||
int slave_listening_port; /* As configured with: REPLCONF listening-port */
|
||||
char slave_ip[NET_IP_STR_LEN]; /* Optionally given by REPLCONF ip-address */
|
||||
int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
|
||||
multiState mstate; /* MULTI/EXEC state */
|
||||
int btype; /* Type of blocking op if CLIENT_BLOCKED. */
|
||||
@ -971,7 +976,9 @@ struct redisServer {
|
||||
time_t repl_down_since; /* Unix time at which link with master went down */
|
||||
int repl_disable_tcp_nodelay; /* Disable TCP_NODELAY after SYNC? */
|
||||
int slave_priority; /* Reported in INFO and used by Sentinel. */
|
||||
char repl_master_runid[CONFIG_RUN_ID_SIZE+1]; /* Master run id for PSYNC. */
|
||||
int slave_announce_port; /* Give the master this listening port. */
|
||||
char *slave_announce_ip; /* Give the master this ip address. */
|
||||
char repl_master_runid[CONFIG_RUN_ID_SIZE+1]; /* Master run id for PSYNC.*/
|
||||
long long repl_master_initial_offset; /* Master PSYNC offset. */
|
||||
int repl_slave_lazy_flush; /* Lazy FLUSHALL before loading DB? */
|
||||
/* Replication script cache. */
|
||||
|
Loading…
Reference in New Issue
Block a user