Take a pointer to the relevant entry of the command table in the client structure. This is generally a more sounding design, simplifies a few functions prototype, and as a side effect fixes a bug related to the conversion of EXPIRE -1 to DEL: before of this fix Redis tried to convert it into an EXPIREAT in the AOF code, regardless of our rewrite of the command.

This commit is contained in:
antirez 2011-07-08 12:59:30 +02:00
parent 5521fa6a9f
commit 09e2d9eeba
5 changed files with 45 additions and 31 deletions

View File

@ -24,14 +24,14 @@ void freeClientMultiState(redisClient *c) {
} }
/* Add a new command into the MULTI commands queue */ /* Add a new command into the MULTI commands queue */
void queueMultiCommand(redisClient *c, struct redisCommand *cmd) { void queueMultiCommand(redisClient *c) {
multiCmd *mc; multiCmd *mc;
int j; int j;
c->mstate.commands = zrealloc(c->mstate.commands, c->mstate.commands = zrealloc(c->mstate.commands,
sizeof(multiCmd)*(c->mstate.count+1)); sizeof(multiCmd)*(c->mstate.count+1));
mc = c->mstate.commands+c->mstate.count; mc = c->mstate.commands+c->mstate.count;
mc->cmd = cmd; mc->cmd = c->cmd;
mc->argc = c->argc; mc->argc = c->argc;
mc->argv = zmalloc(sizeof(robj*)*c->argc); mc->argv = zmalloc(sizeof(robj*)*c->argc);
memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc); memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
@ -78,6 +78,7 @@ void execCommand(redisClient *c) {
int j; int j;
robj **orig_argv; robj **orig_argv;
int orig_argc; int orig_argc;
struct redisCommand *orig_cmd;
if (!(c->flags & REDIS_MULTI)) { if (!(c->flags & REDIS_MULTI)) {
addReplyError(c,"EXEC without MULTI"); addReplyError(c,"EXEC without MULTI");
@ -105,18 +106,22 @@ void execCommand(redisClient *c) {
unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */ unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
orig_argv = c->argv; orig_argv = c->argv;
orig_argc = c->argc; orig_argc = c->argc;
orig_cmd = c->cmd;
addReplyMultiBulkLen(c,c->mstate.count); addReplyMultiBulkLen(c,c->mstate.count);
for (j = 0; j < c->mstate.count; j++) { for (j = 0; j < c->mstate.count; j++) {
c->argc = c->mstate.commands[j].argc; c->argc = c->mstate.commands[j].argc;
c->argv = c->mstate.commands[j].argv; c->argv = c->mstate.commands[j].argv;
call(c,c->mstate.commands[j].cmd); c->cmd = c->mstate.commands[j].cmd;
call(c);
/* Commands may alter argc/argv, restore mstate. */ /* Commands may alter argc/argv, restore mstate. */
c->mstate.commands[j].argc = c->argc; c->mstate.commands[j].argc = c->argc;
c->mstate.commands[j].argv = c->argv; c->mstate.commands[j].argv = c->argv;
c->mstate.commands[j].cmd = c->cmd;
} }
c->argv = orig_argv; c->argv = orig_argv;
c->argc = orig_argc; c->argc = orig_argc;
c->cmd = orig_cmd;
freeClientMultiState(c); freeClientMultiState(c);
initClientMultiState(c); initClientMultiState(c);
c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS); c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS);

View File

@ -30,6 +30,7 @@ redisClient *createClient(int fd) {
c->reqtype = 0; c->reqtype = 0;
c->argc = 0; c->argc = 0;
c->argv = NULL; c->argv = NULL;
c->cmd = NULL;
c->multibulklen = 0; c->multibulklen = 0;
c->bulklen = -1; c->bulklen = -1;
c->sentlen = 0; c->sentlen = 0;
@ -447,6 +448,7 @@ static void freeClientArgv(redisClient *c) {
for (j = 0; j < c->argc; j++) for (j = 0; j < c->argc; j++)
decrRefCount(c->argv[j]); decrRefCount(c->argv[j]);
c->argc = 0; c->argc = 0;
c->cmd = NULL;
} }
void freeClient(redisClient *c) { void freeClient(redisClient *c) {
@ -947,5 +949,7 @@ void rewriteClientCommandVector(redisClient *c, int argc, ...) {
/* Replace argv and argc with our new versions. */ /* Replace argv and argc with our new versions. */
c->argv = argv; c->argv = argv;
c->argc = argc; c->argc = argc;
c->cmd = lookupCommand(c->argv[0]->ptr);
redisAssert(c->cmd != NULL);
va_end(ap); va_end(ap);
} }

View File

@ -1005,20 +1005,20 @@ struct redisCommand *lookupCommandByCString(char *s) {
} }
/* Call() is the core of Redis execution of a command */ /* Call() is the core of Redis execution of a command */
void call(redisClient *c, struct redisCommand *cmd) { void call(redisClient *c) {
long long dirty, start = ustime(), duration; long long dirty, start = ustime(), duration;
dirty = server.dirty; dirty = server.dirty;
cmd->proc(c); c->cmd->proc(c);
dirty = server.dirty-dirty; dirty = server.dirty-dirty;
duration = ustime()-start; duration = ustime()-start;
cmd->microseconds += duration; c->cmd->microseconds += duration;
slowlogPushEntryIfNeeded(c->argv,c->argc,duration); slowlogPushEntryIfNeeded(c->argv,c->argc,duration);
cmd->calls++; c->cmd->calls++;
if (server.appendonly && dirty) if (server.appendonly && dirty)
feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc); feedAppendOnlyFile(c->cmd,c->db->id,c->argv,c->argc);
if ((dirty || cmd->flags & REDIS_CMD_FORCE_REPLICATION) && if ((dirty || c->cmd->flags & REDIS_CMD_FORCE_REPLICATION) &&
listLength(server.slaves)) listLength(server.slaves))
replicationFeedSlaves(server.slaves,c->db->id,c->argv,c->argc); replicationFeedSlaves(server.slaves,c->db->id,c->argv,c->argc);
if (listLength(server.monitors)) if (listLength(server.monitors))
@ -1035,8 +1035,6 @@ void call(redisClient *c, struct redisCommand *cmd) {
* and other operations can be performed by the caller. Otherwise * and other operations can be performed by the caller. Otherwise
* if 0 is returned the client was destroied (i.e. after QUIT). */ * if 0 is returned the client was destroied (i.e. after QUIT). */
int processCommand(redisClient *c) { int processCommand(redisClient *c) {
struct redisCommand *cmd;
/* The QUIT command is handled separately. Normal command procs will /* The QUIT command is handled separately. Normal command procs will
* go through checking for replication and QUIT will cause trouble * go through checking for replication and QUIT will cause trouble
* when FORCE_REPLICATION is enabled and would be implemented in * when FORCE_REPLICATION is enabled and would be implemented in
@ -1048,28 +1046,29 @@ int processCommand(redisClient *c) {
} }
/* Now lookup the command and check ASAP about trivial error conditions /* Now lookup the command and check ASAP about trivial error conditions
* such wrong arity, bad command name and so forth. */ * such as wrong arity, bad command name and so forth. */
cmd = lookupCommand(c->argv[0]->ptr); c->cmd = lookupCommand(c->argv[0]->ptr);
if (!cmd) { if (!c->cmd) {
addReplyErrorFormat(c,"unknown command '%s'", addReplyErrorFormat(c,"unknown command '%s'",
(char*)c->argv[0]->ptr); (char*)c->argv[0]->ptr);
return REDIS_OK; return REDIS_OK;
} else if ((cmd->arity > 0 && cmd->arity != c->argc) || } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < -cmd->arity)) { (c->argc < -c->cmd->arity)) {
addReplyErrorFormat(c,"wrong number of arguments for '%s' command", addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
cmd->name); c->cmd->name);
return REDIS_OK; return REDIS_OK;
} }
/* Check if the user is authenticated */ /* Check if the user is authenticated */
if (server.requirepass && !c->authenticated && cmd->proc != authCommand) { if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
{
addReplyError(c,"operation not permitted"); addReplyError(c,"operation not permitted");
return REDIS_OK; return REDIS_OK;
} }
/* If cluster is enabled, redirect here */ /* If cluster is enabled, redirect here */
if (server.cluster_enabled && if (server.cluster_enabled &&
!(cmd->getkeys_proc == NULL && cmd->firstkey == 0)) { !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0)) {
int hashslot; int hashslot;
if (server.cluster.state != REDIS_CLUSTER_OK) { if (server.cluster.state != REDIS_CLUSTER_OK) {
@ -1077,7 +1076,7 @@ int processCommand(redisClient *c) {
return REDIS_OK; return REDIS_OK;
} else { } else {
int ask; int ask;
clusterNode *n = getNodeByQuery(c,cmd,c->argv,c->argc,&hashslot,&ask); clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&ask);
if (n == NULL) { if (n == NULL) {
addReplyError(c,"Multi keys request invalid in cluster"); addReplyError(c,"Multi keys request invalid in cluster");
return REDIS_OK; return REDIS_OK;
@ -1096,7 +1095,7 @@ int processCommand(redisClient *c) {
* keys in the dataset). If there are not the only thing we can do * keys in the dataset). If there are not the only thing we can do
* is returning an error. */ * is returning an error. */
if (server.maxmemory) freeMemoryIfNeeded(); if (server.maxmemory) freeMemoryIfNeeded();
if (server.maxmemory && (cmd->flags & REDIS_CMD_DENYOOM) && if (server.maxmemory && (c->cmd->flags & REDIS_CMD_DENYOOM) &&
zmalloc_used_memory() > server.maxmemory) zmalloc_used_memory() > server.maxmemory)
{ {
addReplyError(c,"command not allowed when used memory > 'maxmemory'"); addReplyError(c,"command not allowed when used memory > 'maxmemory'");
@ -1106,8 +1105,10 @@ int processCommand(redisClient *c) {
/* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */ /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
if ((dictSize(c->pubsub_channels) > 0 || listLength(c->pubsub_patterns) > 0) if ((dictSize(c->pubsub_channels) > 0 || listLength(c->pubsub_patterns) > 0)
&& &&
cmd->proc != subscribeCommand && cmd->proc != unsubscribeCommand && c->cmd->proc != subscribeCommand &&
cmd->proc != psubscribeCommand && cmd->proc != punsubscribeCommand) { c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand) {
addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context"); addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context");
return REDIS_OK; return REDIS_OK;
} }
@ -1116,7 +1117,7 @@ int processCommand(redisClient *c) {
* we are a slave with a broken link with master. */ * we are a slave with a broken link with master. */
if (server.masterhost && server.replstate != REDIS_REPL_CONNECTED && if (server.masterhost && server.replstate != REDIS_REPL_CONNECTED &&
server.repl_serve_stale_data == 0 && server.repl_serve_stale_data == 0 &&
cmd->proc != infoCommand && cmd->proc != slaveofCommand) c->cmd->proc != infoCommand && c->cmd->proc != slaveofCommand)
{ {
addReplyError(c, addReplyError(c,
"link with MASTER is down and slave-serve-stale-data is set to no"); "link with MASTER is down and slave-serve-stale-data is set to no");
@ -1124,20 +1125,20 @@ int processCommand(redisClient *c) {
} }
/* Loading DB? Return an error if the command is not INFO */ /* Loading DB? Return an error if the command is not INFO */
if (server.loading && cmd->proc != infoCommand) { if (server.loading && c->cmd->proc != infoCommand) {
addReply(c, shared.loadingerr); addReply(c, shared.loadingerr);
return REDIS_OK; return REDIS_OK;
} }
/* Exec the command */ /* Exec the command */
if (c->flags & REDIS_MULTI && if (c->flags & REDIS_MULTI &&
cmd->proc != execCommand && cmd->proc != discardCommand && c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
cmd->proc != multiCommand && cmd->proc != watchCommand) c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{ {
queueMultiCommand(c,cmd); queueMultiCommand(c);
addReply(c,shared.queued); addReply(c,shared.queued);
} else { } else {
call(c,cmd); call(c);
} }
return REDIS_OK; return REDIS_OK;
} }

View File

@ -309,6 +309,7 @@ typedef struct redisClient {
sds querybuf; sds querybuf;
int argc; int argc;
robj **argv; robj **argv;
struct redisCommand *cmd;
int reqtype; int reqtype;
int multibulklen; /* number of multi bulk arguments left to read */ int multibulklen; /* number of multi bulk arguments left to read */
long bulklen; /* length of bulk argument in multi bulk request */ long bulklen; /* length of bulk argument in multi bulk request */
@ -803,7 +804,7 @@ void popGenericCommand(redisClient *c, int where);
void unwatchAllKeys(redisClient *c); void unwatchAllKeys(redisClient *c);
void initClientMultiState(redisClient *c); void initClientMultiState(redisClient *c);
void freeClientMultiState(redisClient *c); void freeClientMultiState(redisClient *c);
void queueMultiCommand(redisClient *c, struct redisCommand *cmd); void queueMultiCommand(redisClient *c);
void touchWatchedKey(redisDb *db, robj *key); void touchWatchedKey(redisDb *db, robj *key);
void touchWatchedKeysOnFlush(int dbid); void touchWatchedKeysOnFlush(int dbid);
@ -914,7 +915,7 @@ int processCommand(redisClient *c);
void setupSignalHandlers(void); void setupSignalHandlers(void);
struct redisCommand *lookupCommand(sds name); struct redisCommand *lookupCommand(sds name);
struct redisCommand *lookupCommandByCString(char *s); struct redisCommand *lookupCommandByCString(char *s);
void call(redisClient *c, struct redisCommand *cmd); void call(redisClient *c);
int prepareForShutdown(); int prepareForShutdown();
void redisLog(int level, const char *fmt, ...); void redisLog(int level, const char *fmt, ...);
void usage(); void usage();

View File

@ -910,6 +910,7 @@ void blockingPopGenericCommand(redisClient *c, int where) {
if (listTypeLength(o) != 0) { if (listTypeLength(o) != 0) {
/* If the list contains elements fall back to the usual /* If the list contains elements fall back to the usual
* non-blocking POP operation */ * non-blocking POP operation */
struct redisCommand *orig_cmd;
robj *argv[2], **orig_argv; robj *argv[2], **orig_argv;
int orig_argc; int orig_argc;
@ -917,6 +918,7 @@ void blockingPopGenericCommand(redisClient *c, int where) {
* popGenericCommand() as the command takes a single key. */ * popGenericCommand() as the command takes a single key. */
orig_argv = c->argv; orig_argv = c->argv;
orig_argc = c->argc; orig_argc = c->argc;
orig_cmd = c->cmd;
argv[1] = c->argv[j]; argv[1] = c->argv[j];
c->argv = argv; c->argv = argv;
c->argc = 2; c->argc = 2;
@ -934,6 +936,7 @@ void blockingPopGenericCommand(redisClient *c, int where) {
/* Fix the client structure with the original stuff */ /* Fix the client structure with the original stuff */
c->argv = orig_argv; c->argv = orig_argv;
c->argc = orig_argc; c->argc = orig_argc;
c->cmd = orig_cmd;
return; return;
} }