diff --git a/src/blocked.c b/src/blocked.c index 2009c9a24..5f4fa9ec2 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -108,18 +108,9 @@ void updateStatsOnUnblock(client *c, long blocked_us, long reply_us){ c->lastcmd->microseconds += total_cmd_duration; /* Log the command into the Slow log if needed. */ - if (!(c->lastcmd->flags & CMD_SKIP_SLOWLOG) && - !(c->flags & CLIENT_PREVENT_LOGGING)) - { - slowlogPushEntryIfNeeded(c,c->argv,c->argc,total_cmd_duration); - /* Log the reply duration event. */ - latencyAddSampleIfNeeded("command-unblocking",reply_us/1000); - } - - /* Always clear the prevent logging field now. */ - if (c->flags & CLIENT_PREVENT_LOGGING) { - c->flags &= ~CLIENT_PREVENT_LOGGING; - } + slowlogPushCurrentCommand(c, c->lastcmd, total_cmd_duration); + /* Log the reply duration event. */ + latencyAddSampleIfNeeded("command-unblocking",reply_us/1000); } /* This function is called in the beforeSleep() function of the event loop @@ -196,6 +187,16 @@ void unblockClient(client *c) { } else { serverPanic("Unknown btype in unblockClient()."); } + + /* Reset the client for a new query since, for blocking commands + * we do not do it immediately after the command returns (when the + * client got blocked) in order to be still able to access the argument + * vector from module callbacks and updateStatsOnUnblock. */ + if (c->btype != BLOCKED_PAUSE) { + freeClientOriginalArgv(c); + resetClient(c); + } + /* Clear the flags, and put the client in the unblocked list so that * we'll process new commands in its query buffer ASAP. */ server.blocked_clients--; @@ -287,7 +288,6 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) { * freed by the next unblockClient() * call. */ if (dstkey) incrRefCount(dstkey); - unblockClient(receiver); monotime replyTimer; elapsedStart(&replyTimer); @@ -300,6 +300,7 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) { listTypePush(o,value,wherefrom); } updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer)); + unblockClient(receiver); if (dstkey) decrRefCount(dstkey); decrRefCount(value); @@ -343,11 +344,11 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) { int where = (receiver->lastcmd && receiver->lastcmd->proc == bzpopminCommand) ? ZSET_MIN : ZSET_MAX; - unblockClient(receiver); monotime replyTimer; elapsedStart(&replyTimer); genericZpopCommand(receiver,&rl->key,1,where,1,NULL); updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer)); + unblockClient(receiver); zcard--; /* Replicate the command. */ diff --git a/src/module.c b/src/module.c index 4d6bdd0d4..cf2305b56 100644 --- a/src/module.c +++ b/src/module.c @@ -5188,11 +5188,6 @@ void unblockClientFromModule(client *c) { moduleUnblockClient(c); bc->client = NULL; - /* Reset the client for a new query since, for blocking commands implemented - * into modules, we do not it immediately after the command returns (and - * the client blocks) in order to be still able to access the argument - * vector from callbacks. */ - resetClient(c); } /* Block a client in the context of a module: this function implements both diff --git a/src/networking.c b/src/networking.c index 7484a8940..bd0bcadb2 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1645,6 +1645,9 @@ void resetClient(client *c) { c->flags |= CLIENT_REPLY_SKIP; c->flags &= ~CLIENT_REPLY_SKIP_NEXT; } + + /* Always clear the prevent logging field. */ + c->flags &= ~CLIENT_PREVENT_LOGGING; } /* This function is used when we want to re-enter the event loop but there @@ -1954,13 +1957,10 @@ void commandProcessed(client *c) { c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; } - /* Don't reset the client structure for clients blocked in a - * module blocking command, so that the reply callback will - * still be able to access the client argv and argc field. - * The client will be reset in unblockClientFromModule(). */ - if (!(c->flags & CLIENT_BLOCKED) || - (c->btype != BLOCKED_MODULE && c->btype != BLOCKED_PAUSE)) - { + /* Don't reset the client structure for blocked clients, so that the reply + * callback will still be able to access the client argv and argc fields. + * The client will be reset in unblockClient(). */ + if (!(c->flags & CLIENT_BLOCKED)) { resetClient(c); } diff --git a/src/server.c b/src/server.c index 21519a18d..3df7a5a72 100644 --- a/src/server.c +++ b/src/server.c @@ -3639,6 +3639,19 @@ void preventCommandReplication(client *c) { c->flags |= CLIENT_PREVENT_REPL_PROP; } +/* Log the last command a client executed into the slowlog. */ +void slowlogPushCurrentCommand(client *c, struct redisCommand *cmd, ustime_t duration) { + /* Some commands may contain sensitive data that should not be available in the slowlog. */ + if ((c->flags & CLIENT_PREVENT_LOGGING) || (cmd->flags & CMD_SKIP_SLOWLOG)) + return; + + /* If command argument vector was rewritten, use the original + * arguments. */ + robj **argv = c->original_argv ? c->original_argv : c->argv; + int argc = c->original_argv ? c->original_argc : c->argc; + slowlogPushEntryIfNeeded(c,argv,argc,duration); +} + /* Call() is the core of Redis execution of a command. * * The following flags can be passed: @@ -3741,34 +3754,31 @@ void call(client *c, int flags) { server.lua_caller->flags |= CLIENT_FORCE_AOF; } - /* Some commands may contain sensitive data that should - * not be available in the slowlog. */ - if ((c->flags & CLIENT_PREVENT_LOGGING) && !(c->flags & CLIENT_BLOCKED)) { - c->flags &= ~CLIENT_PREVENT_LOGGING; - flags &= ~CMD_CALL_SLOWLOG; - } + /* Note: the code below uses the real command that was executed + * c->cmd and c->lastcmd may be different, in case of MULTI-EXEC or + * re-written commands such as EXPIRE, GEOADD, etc. */ - /* Log the command into the Slow log if needed, and populate the - * per-command statistics that we show in INFO commandstats. */ - if (flags & CMD_CALL_SLOWLOG && !(c->cmd->flags & CMD_SKIP_SLOWLOG)) { - char *latency_event = (c->cmd->flags & CMD_FAST) ? - "fast-command" : "command"; + /* Record the latency this command induced on the main thread. + * unless instructed by the caller not to log. (happens when processing + * a MULTI-EXEC from inside an AOF). */ + if (flags & CMD_CALL_SLOWLOG) { + char *latency_event = (real_cmd->flags & CMD_FAST) ? + "fast-command" : "command"; latencyAddSampleIfNeeded(latency_event,duration/1000); - /* If command argument vector was rewritten, use the original - * arguments. */ - robj **argv = c->original_argv ? c->original_argv : c->argv; - int argc = c->original_argv ? c->original_argc : c->argc; - /* If the client is blocked we will handle slowlog when it is unblocked . */ - if (!(c->flags & CLIENT_BLOCKED)) { - slowlogPushEntryIfNeeded(c,argv,argc,duration); - } } - freeClientOriginalArgv(c); + /* Log the command into the Slow log if needed. + * If the client is blocked we will handle slowlog when it is unblocked. */ + if ((flags & CMD_CALL_SLOWLOG) && !(c->flags & CLIENT_BLOCKED)) + slowlogPushCurrentCommand(c, real_cmd, duration); + + /* Clear the original argv. + * If the client is blocked we will handle slowlog when it is unblocked. */ + if (!(c->flags & CLIENT_BLOCKED)) + freeClientOriginalArgv(c); + + /* populate the per-command statistics that we show in INFO commandstats. */ if (flags & CMD_CALL_STATS) { - /* use the real command that was executed (cmd and lastamc) may be - * different, in case of MULTI-EXEC or re-written commands such as - * EXPIRE, GEOADD, etc. */ real_cmd->microseconds += duration; real_cmd->calls++; } diff --git a/src/server.h b/src/server.h index b462131a5..a9624cfc6 100644 --- a/src/server.h +++ b/src/server.h @@ -2208,6 +2208,7 @@ void preventCommandPropagation(client *c); void preventCommandLogging(client *c); void preventCommandAOF(client *c); void preventCommandReplication(client *c); +void slowlogPushCurrentCommand(client *c, struct redisCommand *cmd, ustime_t duration); int prepareForShutdown(int flags); #ifdef __GNUC__ void _serverLog(int level, const char *fmt, ...) diff --git a/tests/unit/slowlog.tcl b/tests/unit/slowlog.tcl index 4cbf6d8f0..eb9dfc65d 100644 --- a/tests/unit/slowlog.tcl +++ b/tests/unit/slowlog.tcl @@ -90,6 +90,22 @@ start_server {tags {"slowlog"} overrides {slowlog-log-slower-than 1000000}} { # INCRBYFLOAT is replicated as SET r INCRBYFLOAT A 1.0 assert_equal {INCRBYFLOAT A 1.0} [lindex [lindex [r slowlog get] 0] 3] + + # blocked BLPOP is replicated as LPOP + set rd [redis_deferring_client] + $rd blpop l 0 + wait_for_condition 50 100 { + [s blocked_clients] eq {1} + } else { + fail "Clients are not blocked" + } + r multi + r lpush l foo + r slowlog reset + r exec + $rd read + $rd close + assert_equal {blpop l 0} [lindex [lindex [r slowlog get] 0] 3] } test {SLOWLOG - commands with too many arguments are trimmed} {