Add last_dbid to migrateCachedSocket to avoid redundant SELECT

Avoid redundant SELECT calls when continuously migrating keys to
the same dbid within a target Redis instance.
This commit is contained in:
Tommy Wang 2015-02-25 12:29:06 -06:00 committed by antirez
parent cc0d339bd1
commit 7fda935ad3

View File

@ -4362,11 +4362,12 @@ void restoreCommand(redisClient *c) {
typedef struct migrateCachedSocket { typedef struct migrateCachedSocket {
int fd; int fd;
long last_dbid;
time_t last_use_time; time_t last_use_time;
} migrateCachedSocket; } migrateCachedSocket;
/* Return a TCP socket connected with the target instance, possibly returning /* Return a migrateCachedSocket containing a TCP socket connected with the
* a cached one. * target instance, possibly returning a cached one.
* *
* This function is responsible of sending errors to the client if a * This function is responsible of sending errors to the client if a
* connection can't be established. In this case -1 is returned. * connection can't be established. In this case -1 is returned.
@ -4376,7 +4377,7 @@ typedef struct migrateCachedSocket {
* If the caller detects an error while using the socket, migrateCloseSocket() * If the caller detects an error while using the socket, migrateCloseSocket()
* should be called so that the connection will be created from scratch * should be called so that the connection will be created from scratch
* the next time. */ * the next time. */
int migrateGetSocket(redisClient *c, robj *host, robj *port, long timeout) { migrateCachedSocket* migrateGetSocket(redisClient *c, robj *host, robj *port, long timeout) {
int fd; int fd;
sds name = sdsempty(); sds name = sdsempty();
migrateCachedSocket *cs; migrateCachedSocket *cs;
@ -4389,7 +4390,7 @@ int migrateGetSocket(redisClient *c, robj *host, robj *port, long timeout) {
if (cs) { if (cs) {
sdsfree(name); sdsfree(name);
cs->last_use_time = server.unixtime; cs->last_use_time = server.unixtime;
return cs->fd; return cs;
} }
/* No cached socket, create one. */ /* No cached socket, create one. */
@ -4409,7 +4410,7 @@ int migrateGetSocket(redisClient *c, robj *host, robj *port, long timeout) {
sdsfree(name); sdsfree(name);
addReplyErrorFormat(c,"Can't connect to target node: %s", addReplyErrorFormat(c,"Can't connect to target node: %s",
server.neterr); server.neterr);
return -1; return NULL;
} }
anetEnableTcpNoDelay(server.neterr,fd); anetEnableTcpNoDelay(server.neterr,fd);
@ -4419,15 +4420,16 @@ int migrateGetSocket(redisClient *c, robj *host, robj *port, long timeout) {
addReplySds(c, addReplySds(c,
sdsnew("-IOERR error or timeout connecting to the client\r\n")); sdsnew("-IOERR error or timeout connecting to the client\r\n"));
close(fd); close(fd);
return -1; return NULL;
} }
/* Add to the cache and return it to the caller. */ /* Add to the cache and return it to the caller. */
cs = zmalloc(sizeof(*cs)); cs = zmalloc(sizeof(*cs));
cs->fd = fd; cs->fd = fd;
cs->last_dbid = -1;
cs->last_use_time = server.unixtime; cs->last_use_time = server.unixtime;
dictAdd(server.migrate_cached_sockets,name,cs); dictAdd(server.migrate_cached_sockets,name,cs);
return fd; return cs;
} }
/* Free a migrate cached connection. */ /* Free a migrate cached connection. */
@ -4468,7 +4470,8 @@ void migrateCloseTimedoutSockets(void) {
/* MIGRATE host port key dbid timeout [COPY | REPLACE] */ /* MIGRATE host port key dbid timeout [COPY | REPLACE] */
void migrateCommand(redisClient *c) { void migrateCommand(redisClient *c) {
int fd, copy, replace, j; migrateCachedSocket *cs;
int copy, replace, j;
long timeout; long timeout;
long dbid; long dbid;
long long ttl, expireat; long long ttl, expireat;
@ -4478,6 +4481,7 @@ void migrateCommand(redisClient *c) {
try_again: try_again:
/* Initialization */ /* Initialization */
cs = NULL;
copy = 0; copy = 0;
replace = 0; replace = 0;
ttl = 0; ttl = 0;
@ -4510,14 +4514,17 @@ try_again:
} }
/* Connect */ /* Connect */
fd = migrateGetSocket(c,c->argv[1],c->argv[2],timeout); cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
if (fd == -1) return; /* error sent to the client by migrateGetSocket() */ if (cs == NULL) return; /* error sent to the client by migrateGetSocket() */
rioInitWithBuffer(&cmd,sdsempty());
/* Create RESTORE payload and generate the protocol to call the command. */ /* Create RESTORE payload and generate the protocol to call the command. */
rioInitWithBuffer(&cmd,sdsempty()); if (cs->last_dbid != dbid) {
redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2)); redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6)); redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid)); redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
}
expireat = getExpire(c->db,c->argv[3]); expireat = getExpire(c->db,c->argv[3]);
if (expireat != -1) { if (expireat != -1) {
@ -4556,7 +4563,7 @@ try_again:
while ((towrite = sdslen(buf)-pos) > 0) { while ((towrite = sdslen(buf)-pos) > 0) {
towrite = (towrite > (64*1024) ? (64*1024) : towrite); towrite = (towrite > (64*1024) ? (64*1024) : towrite);
nwritten = syncWrite(fd,buf+pos,towrite,timeout); nwritten = syncWrite(cs->fd,buf+pos,towrite,timeout);
if (nwritten != (signed)towrite) goto socket_wr_err; if (nwritten != (signed)towrite) goto socket_wr_err;
pos += nwritten; pos += nwritten;
} }
@ -4568,14 +4575,18 @@ try_again:
char buf2[1024]; char buf2[1024];
/* Read the two replies */ /* Read the two replies */
if (syncReadLine(fd, buf1, sizeof(buf1), timeout) <= 0) if (cs->last_dbid != dbid && syncReadLine(cs->fd, buf1, sizeof(buf1), timeout) <= 0)
goto socket_rd_err; goto socket_rd_err;
if (syncReadLine(fd, buf2, sizeof(buf2), timeout) <= 0) if (syncReadLine(cs->fd, buf2, sizeof(buf2), timeout) <= 0)
goto socket_rd_err; goto socket_rd_err;
if (buf1[0] == '-' || buf2[0] == '-') { if ((cs->last_dbid != dbid && buf1[0] == '-') || buf2[0] == '-') {
/* If we got an error at all, assume that the last_dbid is no longer valid */
cs->last_dbid = -1;
addReplyErrorFormat(c,"Target instance replied with error: %s", addReplyErrorFormat(c,"Target instance replied with error: %s",
(buf1[0] == '-') ? buf1+1 : buf2+1); (cs->last_dbid != dbid && buf1[0] == '-') ? buf1+1 : buf2+1);
} else { } else {
/* Update the last_dbid in migrateCachedSocket */
cs->last_dbid = dbid;
robj *aux; robj *aux;
if (!copy) { if (!copy) {
@ -4586,10 +4597,12 @@ try_again:
addReply(c,shared.ok); addReply(c,shared.ok);
server.dirty++; server.dirty++;
/* Translate MIGRATE as DEL for replication/AOF. */ if (!copy) {
aux = createStringObject("DEL",3); /* Translate MIGRATE as DEL for replication/AOF. */
rewriteClientCommandVector(c,2,aux,c->argv[3]); aux = createStringObject("DEL",3);
decrRefCount(aux); rewriteClientCommandVector(c,2,aux,c->argv[3]);
decrRefCount(aux);
}
} }
} }