Add redis-cli support for diskless replication (CAPA EOF)

when setting repl-diskless-sync yes, and sending SYNC.
redis-cli needs to be able to understand the EOF marker protocol
in order to be able to skip or download the rdb file
This commit is contained in:
Oran Agra 2018-04-02 18:36:17 +03:00
parent 8ac7af1c5d
commit d56f4b4122

View File

@ -1793,9 +1793,31 @@ static void latencyDistMode(void) {
* Slave mode * Slave mode
*--------------------------------------------------------------------------- */ *--------------------------------------------------------------------------- */
#define RDB_EOF_MARK_SIZE 40
void sendReplconf(const char* arg1, const char* arg2) {
printf("sending REPLCONF %s %s\n", arg1, arg2);
redisReply *reply = redisCommand(context, "REPLCONF %s %s", arg1, arg2);
/* Handle any error conditions */
if(reply == NULL) {
fprintf(stderr, "\nI/O error\n");
exit(1);
} else if(reply->type == REDIS_REPLY_ERROR) {
fprintf(stderr, "REPLCONF %s error: %s\n", arg1, reply->str);
/* non fatal, old versions may not support it */
}
freeReplyObject(reply);
}
void sendCapa() {
sendReplconf("capa", "eof");
}
/* Sends SYNC and reads the number of bytes in the payload. Used both by /* Sends SYNC and reads the number of bytes in the payload. Used both by
* slaveMode() and getRDB(). */ * slaveMode() and getRDB().
unsigned long long sendSync(int fd) { * returns 0 in case an EOF marker is used. */
unsigned long long sendSync(int fd, char *out_eof) {
/* To start we need to send the SYNC command and return the payload. /* To start we need to send the SYNC command and return the payload.
* The hiredis client lib does not understand this part of the protocol * The hiredis client lib does not understand this part of the protocol
* and we don't want to mess with its buffers, so everything is performed * and we don't want to mess with its buffers, so everything is performed
@ -1825,17 +1847,33 @@ unsigned long long sendSync(int fd) {
printf("SYNC with master failed: %s\n", buf); printf("SYNC with master failed: %s\n", buf);
exit(1); exit(1);
} }
if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= RDB_EOF_MARK_SIZE) {
memcpy(out_eof, buf+5, RDB_EOF_MARK_SIZE);
return 0;
}
return strtoull(buf+1,NULL,10); return strtoull(buf+1,NULL,10);
} }
static void slaveMode(void) { static void slaveMode(void) {
int fd = context->fd; int fd = context->fd;
unsigned long long payload = sendSync(fd); static char eofmark[RDB_EOF_MARK_SIZE];
static char lastbytes[RDB_EOF_MARK_SIZE];
static int usemark = 0;
unsigned long long payload = sendSync(fd, eofmark);
char buf[1024]; char buf[1024];
int original_output = config.output; int original_output = config.output;
fprintf(stderr,"SYNC with master, discarding %llu " if (payload == 0) {
"bytes of bulk transfer...\n", payload); payload = ULLONG_MAX;
memset(lastbytes,0,RDB_EOF_MARK_SIZE);
usemark = 1;
fprintf(stderr,"SYNC with master, discarding "
"bytes of bulk transfer until EOF marker...\n");
} else {
fprintf(stderr,"SYNC with master, discarding %llu "
"bytes of bulk transfer...\n", payload);
}
/* Discard the payload. */ /* Discard the payload. */
while(payload) { while(payload) {
@ -1847,8 +1885,29 @@ static void slaveMode(void) {
exit(1); exit(1);
} }
payload -= nread; payload -= nread;
if (usemark) {
/* Update the last bytes array, and check if it matches our delimiter.*/
if (nread >= RDB_EOF_MARK_SIZE) {
memcpy(lastbytes,buf+nread-RDB_EOF_MARK_SIZE,RDB_EOF_MARK_SIZE);
} else {
int rem = RDB_EOF_MARK_SIZE-nread;
memmove(lastbytes,lastbytes+nread,rem);
memcpy(lastbytes+rem,buf,nread);
}
if (memcmp(lastbytes,eofmark,RDB_EOF_MARK_SIZE) == 0)
break;
}
} }
fprintf(stderr,"SYNC done. Logging commands from master.\n");
if (usemark) {
unsigned long long offset = ULLONG_MAX - payload;
fprintf(stderr,"SYNC done after %llu bytes. Logging commands from master.\n", offset);
/* put the slave online */
sleep(1);
sendReplconf("ACK", "0");
} else
fprintf(stderr,"SYNC done. Logging commands from master.\n");
/* Now we can use hiredis to read the incoming protocol. */ /* Now we can use hiredis to read the incoming protocol. */
config.output = OUTPUT_CSV; config.output = OUTPUT_CSV;
@ -1865,11 +1924,22 @@ static void slaveMode(void) {
static void getRDB(void) { static void getRDB(void) {
int s = context->fd; int s = context->fd;
int fd; int fd;
unsigned long long payload = sendSync(s); static char eofmark[RDB_EOF_MARK_SIZE];
static char lastbytes[RDB_EOF_MARK_SIZE];
static int usemark = 0;
unsigned long long payload = sendSync(s, eofmark);
char buf[4096]; char buf[4096];
fprintf(stderr,"SYNC sent to master, writing %llu bytes to '%s'\n", if (payload == 0) {
payload, config.rdb_filename); payload = ULLONG_MAX;
memset(lastbytes,0,RDB_EOF_MARK_SIZE);
usemark = 1;
fprintf(stderr,"SYNC sent to master, writing bytes of bulk transfer until EOF marker to '%s'\n",
config.rdb_filename);
} else {
fprintf(stderr,"SYNC sent to master, writing %llu bytes to '%s'\n",
payload, config.rdb_filename);
}
/* Write to file. */ /* Write to file. */
if (!strcmp(config.rdb_filename,"-")) { if (!strcmp(config.rdb_filename,"-")) {
@ -1898,11 +1968,31 @@ static void getRDB(void) {
exit(1); exit(1);
} }
payload -= nread; payload -= nread;
if (usemark) {
/* Update the last bytes array, and check if it matches our delimiter.*/
if (nread >= RDB_EOF_MARK_SIZE) {
memcpy(lastbytes,buf+nread-RDB_EOF_MARK_SIZE,RDB_EOF_MARK_SIZE);
} else {
int rem = RDB_EOF_MARK_SIZE-nread;
memmove(lastbytes,lastbytes+nread,rem);
memcpy(lastbytes+rem,buf,nread);
}
if (memcmp(lastbytes,eofmark,RDB_EOF_MARK_SIZE) == 0)
break;
}
}
if (usemark) {
payload = ULLONG_MAX - payload - RDB_EOF_MARK_SIZE;
if (ftruncate(fd, payload) == -1)
fprintf(stderr,"ftruncate failed: %s.\n", strerror(errno));
fprintf(stderr,"Transfer finished with success after %llu bytes\n", payload);
} else {
fprintf(stderr,"Transfer finished with success.\n");
} }
close(s); /* Close the file descriptor ASAP as fsync() may take time. */ close(s); /* Close the file descriptor ASAP as fsync() may take time. */
fsync(fd); fsync(fd);
close(fd); close(fd);
fprintf(stderr,"Transfer finished with success.\n");
exit(0); exit(0);
} }
@ -2893,12 +2983,14 @@ int main(int argc, char **argv) {
/* Slave mode */ /* Slave mode */
if (config.slave_mode) { if (config.slave_mode) {
if (cliConnect(0) == REDIS_ERR) exit(1); if (cliConnect(0) == REDIS_ERR) exit(1);
sendCapa();
slaveMode(); slaveMode();
} }
/* Get RDB mode. */ /* Get RDB mode. */
if (config.getrdb_mode) { if (config.getrdb_mode) {
if (cliConnect(0) == REDIS_ERR) exit(1); if (cliConnect(0) == REDIS_ERR) exit(1);
sendCapa();
getRDB(); getRDB();
} }