syncWithMaster: sendSynchronousCommand split to send, and receive

This is just a refactoring commit.

This function was never actually used as a synchronous (do both send or
receive), it was always used only ine one of the two modes, which meant it
has to take extra arguments that are not relevant for the other.

Besides that, a tool that sends a synchronous command, it not something
we want in our toolbox (synchronous IO in single threaded app is evil).

sendSynchronousCommand was now refactored into separate sending and
receiving APIs, and the sending part has two variants, one taking vaargs,
and the other taking argc+argv (and an optional length array which means
you can use binary sds strings).
This commit is contained in:
Oran Agra 2020-12-18 00:02:57 +02:00
parent 35fc7fda7a
commit 9bd212cf24

View File

@ -1827,65 +1827,10 @@ error:
return;
}
/* Send a synchronous command to the master. Used to send AUTH and
* REPLCONF commands before starting the replication with SYNC.
*
* The command returns an sds string representing the result of the
* operation. On error the first byte is a "-".
*/
#define SYNC_CMD_READ (1<<0)
#define SYNC_CMD_WRITE (1<<1)
#define SYNC_CMD_WRITE_SDS (1<<2)
#define SYNC_CMD_FULL (SYNC_CMD_READ|SYNC_CMD_WRITE)
char *sendSynchronousCommand(int flags, connection *conn, ...) {
/* Create the command to send to the master, we use redis binary
* protocol to make sure correct arguments are sent. This function
* is not safe for all binary data. */
if (flags & SYNC_CMD_WRITE) {
char *arg;
va_list ap;
sds cmd = sdsempty();
sds cmdargs = sdsempty();
size_t argslen = 0;
va_start(ap,conn);
while(1) {
arg = va_arg(ap, char*);
if (arg == NULL) break;
if (flags & SYNC_CMD_WRITE_SDS) {
cmdargs = sdscatprintf(cmdargs,"$%zu\r\n", sdslen((sds)arg));
cmdargs = sdscatsds(cmdargs, (sds)arg);
cmdargs = sdscat(cmdargs, "\r\n");
} else {
cmdargs = sdscatprintf(cmdargs,"$%zu\r\n%s\r\n",strlen(arg),arg);
}
argslen++;
}
va_end(ap);
cmd = sdscatprintf(cmd,"*%zu\r\n",argslen);
cmd = sdscatsds(cmd,cmdargs);
sdsfree(cmdargs);
/* Transfer command to the server. */
if (connSyncWrite(conn,cmd,sdslen(cmd),server.repl_syncio_timeout*1000)
== -1)
{
sdsfree(cmd);
return sdscatprintf(sdsempty(),"-Writing to master: %s",
connGetLastError(conn));
}
sdsfree(cmd);
}
/* Read the reply from the server. */
if (flags & SYNC_CMD_READ) {
char *receiveSynchronousResponse(connection *conn) {
char buf[256];
if (connSyncReadLine(conn,buf,sizeof(buf),server.repl_syncio_timeout*1000)
== -1)
/* Read the reply from the server. */
if (connSyncReadLine(conn,buf,sizeof(buf),server.repl_syncio_timeout*1000) == -1)
{
return sdscatprintf(sdsempty(),"-Reading from master: %s",
strerror(errno));
@ -1893,6 +1838,83 @@ char *sendSynchronousCommand(int flags, connection *conn, ...) {
server.repl_transfer_lastio = server.unixtime;
return sdsnew(buf);
}
/* Send a pre-formatted multi-bulk command to the connection. */
char* sendCommandRaw(connection *conn, sds cmd) {
if (connSyncWrite(conn,cmd,sdslen(cmd),server.repl_syncio_timeout*1000) == -1) {
return sdscatprintf(sdsempty(),"-Writing to master: %s",
connGetLastError(conn));
}
return NULL;
}
/* Compose a multi-bulk command and send it to the connection.
* Used to send AUTH and REPLCONF commands to the master before starting the
* replication.
*
* Takes a list of char* arguments, terminated by a NULL argument.
*
* The command returns an sds string representing the result of the
* operation. On error the first byte is a "-".
*/
char *sendCommand(connection *conn, ...) {
va_list ap;
sds cmd = sdsempty();
sds cmdargs = sdsempty();
size_t argslen = 0;
char *arg;
/* Create the command to send to the master, we use redis binary
* protocol to make sure correct arguments are sent. This function
* is not safe for all binary data. */
va_start(ap,conn);
while(1) {
arg = va_arg(ap, char*);
if (arg == NULL) break;
cmdargs = sdscatprintf(cmdargs,"$%zu\r\n%s\r\n",strlen(arg),arg);
argslen++;
}
cmd = sdscatprintf(cmd,"*%zu\r\n",argslen);
cmd = sdscatsds(cmd,cmdargs);
sdsfree(cmdargs);
va_end(ap);
char* err = sendCommandRaw(conn, cmd);
sdsfree(cmd);
if(err)
return err;
return NULL;
}
/* Compose a multi-bulk command and send it to the connection.
* Used to send AUTH and REPLCONF commands to the master before starting the
* replication.
*
* argv_lens is optional, when NULL, strlen is used.
*
* The command returns an sds string representing the result of the
* operation. On error the first byte is a "-".
*/
char *sendCommandArgv(connection *conn, int argc, char **argv, size_t *argv_lens) {
sds cmd = sdsempty();
char *arg;
int i;
/* Create the command to send to the master. */
cmd = sdscatfmt(cmd,"*%i\r\n",argc);
for (i=0; i<argc; i++) {
int len;
arg = argv[i];
len = argv_lens ? argv_lens[i] : strlen(arg);
cmd = sdscatfmt(cmd,"$%i\r\n",len);
cmd = sdscatlen(cmd,arg,len);
cmd = sdscatlen(cmd,"\r\n",2);
}
char* err = sendCommandRaw(conn, cmd);
sdsfree(cmd);
if (err)
return err;
return NULL;
}
@ -1975,7 +1997,7 @@ int slaveTryPartialResynchronization(connection *conn, int read_reply) {
}
/* Issue the PSYNC command */
reply = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"PSYNC",psync_replid,psync_offset,NULL);
reply = sendCommand(conn,"PSYNC",psync_replid,psync_offset,NULL);
if (reply != NULL) {
serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
sdsfree(reply);
@ -1986,7 +2008,7 @@ int slaveTryPartialResynchronization(connection *conn, int read_reply) {
}
/* Reading half */
reply = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
reply = receiveSynchronousResponse(conn);
if (sdslen(reply) == 0) {
/* The master may send empty newlines after it receives PSYNC
* and before to reply, just to keep the connection alive. */
@ -2140,14 +2162,14 @@ void syncWithMaster(connection *conn) {
server.repl_state = REPL_STATE_RECEIVE_PONG;
/* Send the PING, don't check for errors at all, we have the timeout
* that will take care about this. */
err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"PING",NULL);
err = sendCommand(conn,"PING",NULL);
if (err) goto write_error;
return;
}
/* Receive the PONG command. */
if (server.repl_state == REPL_STATE_RECEIVE_PONG) {
err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
err = receiveSynchronousResponse(conn);
/* We accept only two replies as valid, a positive +PONG reply
* (we just check for "+") or an authentication error.
@ -2173,17 +2195,18 @@ void syncWithMaster(connection *conn) {
/* AUTH with the master if required. */
if (server.repl_state == REPL_STATE_SEND_AUTH) {
if (server.masterauth) {
sds auth = sdsnew("AUTH");
char *args[3] = {"AUTH",NULL,NULL};
size_t lens[3] = {4,0,0};
int argc = 1;
if (server.masteruser) {
sds masteruser = sdsnew(server.masteruser);
err = sendSynchronousCommand(SYNC_CMD_WRITE | SYNC_CMD_WRITE_SDS, conn, auth,
masteruser, server.masterauth, NULL);
sdsfree(masteruser);
} else {
err = sendSynchronousCommand(SYNC_CMD_WRITE | SYNC_CMD_WRITE_SDS, conn, auth,
server.masterauth, NULL);
args[argc] = server.masteruser;
lens[argc] = strlen(server.masteruser);
argc++;
}
sdsfree(auth);
args[argc] = server.masterauth;
lens[argc] = sdslen(server.masterauth);
argc++;
err = sendCommandArgv(conn, argc, args, lens);
if (err) goto write_error;
server.repl_state = REPL_STATE_RECEIVE_AUTH;
return;
@ -2194,7 +2217,7 @@ void syncWithMaster(connection *conn) {
/* Receive AUTH reply. */
if (server.repl_state == REPL_STATE_RECEIVE_AUTH) {
err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
err = receiveSynchronousResponse(conn);
if (err[0] == '-') {
serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s",err);
sdsfree(err);
@ -2212,7 +2235,7 @@ void syncWithMaster(connection *conn) {
else if (server.tls_replication && server.tls_port) port = server.tls_port;
else port = server.port;
sds portstr = sdsfromlonglong(port);
err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"REPLCONF",
err = sendCommand(conn,"REPLCONF",
"listening-port",portstr, NULL);
sdsfree(portstr);
if (err) goto write_error;
@ -2223,7 +2246,7 @@ void syncWithMaster(connection *conn) {
/* Receive REPLCONF listening-port reply. */
if (server.repl_state == REPL_STATE_RECEIVE_PORT) {
err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
err = receiveSynchronousResponse(conn);
/* Ignore the error if any, not all the Redis versions support
* REPLCONF listening-port. */
if (err[0] == '-') {
@ -2244,7 +2267,7 @@ void syncWithMaster(connection *conn) {
/* Set the slave ip, so that Master's INFO command can list the
* slave IP address port correctly in case of port forwarding or NAT. */
if (server.repl_state == REPL_STATE_SEND_IP) {
err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"REPLCONF",
err = sendCommand(conn,"REPLCONF",
"ip-address",server.slave_announce_ip, NULL);
if (err) goto write_error;
sdsfree(err);
@ -2254,7 +2277,7 @@ void syncWithMaster(connection *conn) {
/* Receive REPLCONF ip-address reply. */
if (server.repl_state == REPL_STATE_RECEIVE_IP) {
err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
err = receiveSynchronousResponse(conn);
/* Ignore the error if any, not all the Redis versions support
* REPLCONF listening-port. */
if (err[0] == '-') {
@ -2272,7 +2295,7 @@ void syncWithMaster(connection *conn) {
*
* The master will ignore capabilities it does not understand. */
if (server.repl_state == REPL_STATE_SEND_CAPA) {
err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"REPLCONF",
err = sendCommand(conn,"REPLCONF",
"capa","eof","capa","psync2",NULL);
if (err) goto write_error;
sdsfree(err);
@ -2282,7 +2305,7 @@ void syncWithMaster(connection *conn) {
/* Receive CAPA reply. */
if (server.repl_state == REPL_STATE_RECEIVE_CAPA) {
err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
err = receiveSynchronousResponse(conn);
/* Ignore the error if any, not all the Redis versions support
* REPLCONF capa. */
if (err[0] == '-') {
@ -2403,7 +2426,7 @@ error:
server.repl_state = REPL_STATE_CONNECT;
return;
write_error: /* Handle sendSynchronousCommand(SYNC_CMD_WRITE) errors. */
write_error: /* Handle sendCommand() errors. */
serverLog(LL_WARNING,"Sending command to master in replication handshake: %s", err);
sdsfree(err);
goto error;