mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 16:18:28 -05:00
REPLCONF internal command introduced.
The REPLCONF command is an internal command (not designed to be directly used by normal clients) that allows a slave to set some replication related state in the master before issuing SYNC to start the replication. The initial motivation for this command, and the only reason currently it is used by the implementation, is to let the slave instance communicate its listening port to the slave, so that the master can show all the slaves with their listening ports in the "replication" section of the INFO output. This allows clients to auto discover and query all the slaves attached into a master. Currently only a single option of the REPLCONF command is supported, and it is called "listening-port", so the slave now starts the replication process with something like the following chat: REPLCONF listening-prot 6380 SYNC Note that this works even if the master is an older version of Redis and does not understand REPLCONF, because the slave ignores the REPLCONF error. In the future REPLCONF can be used for partial replication and other replication related features where there is the need to exchange information between master and slave. NOTE: This commit also fixes a bug: the INFO outout already carried information about slaves, but the port was broken, and was obtained with getpeername(2), so it was actually just the ephemeral port used by the slave to connect to the master as a client.
This commit is contained in:
parent
5410168c6e
commit
3a32897856
@ -55,6 +55,7 @@ redisClient *createClient(int fd) {
|
|||||||
c->ctime = c->lastinteraction = server.unixtime;
|
c->ctime = c->lastinteraction = server.unixtime;
|
||||||
c->authenticated = 0;
|
c->authenticated = 0;
|
||||||
c->replstate = REDIS_REPL_NONE;
|
c->replstate = REDIS_REPL_NONE;
|
||||||
|
c->slave_listening_port = 0;
|
||||||
c->reply = listCreate();
|
c->reply = listCreate();
|
||||||
c->reply_bytes = 0;
|
c->reply_bytes = 0;
|
||||||
c->obuf_soft_limit_reached_time = 0;
|
c->obuf_soft_limit_reached_time = 0;
|
||||||
|
@ -215,6 +215,7 @@ struct redisCommand redisCommandTable[] = {
|
|||||||
{"exec",execCommand,1,"s",0,NULL,0,0,0,0,0},
|
{"exec",execCommand,1,"s",0,NULL,0,0,0,0,0},
|
||||||
{"discard",discardCommand,1,"rs",0,NULL,0,0,0,0,0},
|
{"discard",discardCommand,1,"rs",0,NULL,0,0,0,0,0},
|
||||||
{"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0},
|
{"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0},
|
||||||
|
{"replconf",replconfCommand,-1,"ars",0,NULL,0,0,0,0,0},
|
||||||
{"flushdb",flushdbCommand,1,"w",0,NULL,0,0,0,0,0},
|
{"flushdb",flushdbCommand,1,"w",0,NULL,0,0,0,0,0},
|
||||||
{"flushall",flushallCommand,1,"w",0,NULL,0,0,0,0,0},
|
{"flushall",flushallCommand,1,"w",0,NULL,0,0,0,0,0},
|
||||||
{"sort",sortCommand,-2,"wmS",0,NULL,1,1,1,0,0},
|
{"sort",sortCommand,-2,"wmS",0,NULL,1,1,1,0,0},
|
||||||
@ -2119,7 +2120,7 @@ sds genRedisInfoString(char *section) {
|
|||||||
}
|
}
|
||||||
if (state == NULL) continue;
|
if (state == NULL) continue;
|
||||||
info = sdscatprintf(info,"slave%d:%s,%d,%s\r\n",
|
info = sdscatprintf(info,"slave%d:%s,%d,%s\r\n",
|
||||||
slaveid,ip,port,state);
|
slaveid,ip,slave->slave_listening_port,state);
|
||||||
slaveid++;
|
slaveid++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -346,6 +346,7 @@ typedef struct redisClient {
|
|||||||
int repldbfd; /* replication DB file descriptor */
|
int repldbfd; /* replication DB file descriptor */
|
||||||
long repldboff; /* replication DB file offset */
|
long repldboff; /* replication DB file offset */
|
||||||
off_t repldbsize; /* replication DB file size */
|
off_t repldbsize; /* replication DB file size */
|
||||||
|
int slave_listening_port; /* As configured with: SLAVECONF listening-port */
|
||||||
multiState mstate; /* MULTI/EXEC state */
|
multiState mstate; /* MULTI/EXEC state */
|
||||||
blockingState bpop; /* blocking state */
|
blockingState bpop; /* blocking state */
|
||||||
list *io_keys; /* Keys this client is waiting to be loaded from the
|
list *io_keys; /* Keys this client is waiting to be loaded from the
|
||||||
@ -1256,6 +1257,7 @@ void scriptCommand(redisClient *c);
|
|||||||
void timeCommand(redisClient *c);
|
void timeCommand(redisClient *c);
|
||||||
void bitopCommand(redisClient *c);
|
void bitopCommand(redisClient *c);
|
||||||
void bitcountCommand(redisClient *c);
|
void bitcountCommand(redisClient *c);
|
||||||
|
void replconfCommand(redisClient *c);
|
||||||
|
|
||||||
#if defined(__GNUC__)
|
#if defined(__GNUC__)
|
||||||
void *calloc(size_t count, size_t size) __attribute__ ((deprecated));
|
void *calloc(size_t count, size_t size) __attribute__ ((deprecated));
|
||||||
|
@ -145,6 +145,46 @@ void syncCommand(redisClient *c) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* REPLCONF <option> <value> <option> <value> ...
|
||||||
|
* This command is used by a slave in order to configure the replication
|
||||||
|
* process before starting it with the SYNC command.
|
||||||
|
*
|
||||||
|
* Currently the only use of this command is to communicate to the master
|
||||||
|
* what is the listening port of the Slave redis instance, so that the
|
||||||
|
* master can accurately list slaves and their listening ports in
|
||||||
|
* the INFO output.
|
||||||
|
*
|
||||||
|
* In the future the same command can be used in order to configure
|
||||||
|
* the replication to initiate an incremental replication instead of a
|
||||||
|
* full resync. */
|
||||||
|
void replconfCommand(redisClient *c) {
|
||||||
|
int j;
|
||||||
|
|
||||||
|
if ((c->argc % 2) == 0) {
|
||||||
|
/* Number of arguments must be odd to make sure that every
|
||||||
|
* option has a corresponding value. */
|
||||||
|
addReply(c,shared.syntaxerr);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Process every option-value pair. */
|
||||||
|
for (j = 1; j < c->argc; j+=2) {
|
||||||
|
if (!strcasecmp(c->argv[j]->ptr,"listening-port")) {
|
||||||
|
long port;
|
||||||
|
|
||||||
|
if ((getLongFromObjectOrReply(c,c->argv[j+1],
|
||||||
|
&port,NULL) != REDIS_OK))
|
||||||
|
return;
|
||||||
|
c->slave_listening_port = port;
|
||||||
|
} else {
|
||||||
|
addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
|
||||||
|
(char*)c->argv[j]->ptr);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
addReply(c,shared.ok);
|
||||||
|
}
|
||||||
|
|
||||||
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
|
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||||
redisClient *slave = privdata;
|
redisClient *slave = privdata;
|
||||||
REDIS_NOTUSED(el);
|
REDIS_NOTUSED(el);
|
||||||
@ -378,8 +418,54 @@ error:
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Send a synchronous command to the master. Used to send AUTH and
|
||||||
|
* REPLCONF commadns before starting the replication with SYNC.
|
||||||
|
*
|
||||||
|
* On success NULL is returned.
|
||||||
|
* On error an sds string describing the error is returned.
|
||||||
|
*/
|
||||||
|
char *sendSynchronousCommand(int fd, ...) {
|
||||||
|
va_list ap;
|
||||||
|
sds cmd = sdsempty();
|
||||||
|
char *arg, buf[256];
|
||||||
|
|
||||||
|
/* Create the command to send to the master, we use simple inline
|
||||||
|
* protocol for simplicity as currently we only send simple strings. */
|
||||||
|
va_start(ap,fd);
|
||||||
|
while(1) {
|
||||||
|
arg = va_arg(ap, char*);
|
||||||
|
if (arg == NULL) break;
|
||||||
|
|
||||||
|
if (sdslen(cmd) != 0) cmd = sdscatlen(cmd," ",1);
|
||||||
|
cmd = sdscat(cmd,arg);
|
||||||
|
}
|
||||||
|
cmd = sdscatlen(cmd,"\r\n",2);
|
||||||
|
|
||||||
|
/* Transfer command to the server. */
|
||||||
|
if (syncWrite(fd,cmd,sdslen(cmd),server.repl_syncio_timeout*1000) == -1) {
|
||||||
|
sdsfree(cmd);
|
||||||
|
return sdscatprintf(sdsempty(),"Writing to master: %s",
|
||||||
|
strerror(errno));
|
||||||
|
}
|
||||||
|
sdsfree(cmd);
|
||||||
|
|
||||||
|
/* Read the reply from the server. */
|
||||||
|
if (syncReadLine(fd,buf,sizeof(buf),server.repl_syncio_timeout*1000) == -1)
|
||||||
|
{
|
||||||
|
return sdscatprintf(sdsempty(),"Reading from master: %s",
|
||||||
|
strerror(errno));
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Check for errors from the server. */
|
||||||
|
if (buf[0] != '+') {
|
||||||
|
return sdscatprintf(sdsempty(),"Error from master: %s", buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL; /* No errors. */
|
||||||
|
}
|
||||||
|
|
||||||
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
|
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||||
char buf[1024], tmpfile[256];
|
char tmpfile[256], *err;
|
||||||
int dfd, maxtries = 5;
|
int dfd, maxtries = 5;
|
||||||
REDIS_NOTUSED(el);
|
REDIS_NOTUSED(el);
|
||||||
REDIS_NOTUSED(privdata);
|
REDIS_NOTUSED(privdata);
|
||||||
@ -400,24 +486,26 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
|
|
||||||
/* AUTH with the master if required. */
|
/* AUTH with the master if required. */
|
||||||
if(server.masterauth) {
|
if(server.masterauth) {
|
||||||
char authcmd[1024];
|
err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);
|
||||||
size_t authlen;
|
if (err) {
|
||||||
|
redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err);
|
||||||
|
sdsfree(err);
|
||||||
|
goto error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
authlen = snprintf(authcmd,sizeof(authcmd),"AUTH %s\r\n",server.masterauth);
|
/* Set the slave port, so that Master's INFO command can list the
|
||||||
if (syncWrite(fd,authcmd,authlen,server.repl_syncio_timeout*1000) == -1) {
|
* slave listening port correctly. */
|
||||||
redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
|
{
|
||||||
strerror(errno));
|
sds port = sdsfromlonglong(server.port);
|
||||||
goto error;
|
err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port,
|
||||||
}
|
NULL);
|
||||||
/* Read the AUTH result. */
|
sdsfree(port);
|
||||||
if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
|
/* Ignore the error if any, not all the Redis versions support
|
||||||
redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
|
* REPLCONF listening-port. */
|
||||||
strerror(errno));
|
if (err) {
|
||||||
goto error;
|
redisLog(REDIS_NOTICE,"(non critical): Master does not understand REPLCONF listening-port: %s", err);
|
||||||
}
|
sdsfree(err);
|
||||||
if (buf[0] != '+') {
|
|
||||||
redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
|
|
||||||
goto error;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user