mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 16:18:28 -05:00
Fix SLOWLOG for blocked commands (#8632)
* SLOWLOG didn't record anything for blocked commands because the client was reset and argv was already empty. there was a fix for this issue specifically for modules, now it works for all blocked clients. * The original command argv (before being re-written) was also reset before adding the slowlog on behalf of the blocked command. * Latency monitor is now updated regardless of the slowlog flags of the command or its execution (their purpose is to hide sensitive info from the slowlog, not hide the fact the latency happened). * Latency monitor now uses real_cmd rather than c->cmd (which may be different if the command got re-written, e.g. GEOADD) Changes: * Unify shared code between slowlog insertion in call() and updateStatsOnUnblock(), hopefully prevent future bugs from happening due to the later being overlooked. * Reset CLIENT_PREVENT_LOGGING in resetClient rather than after command processing. * Add a test for SLOWLOG and BLPOP Notes: - real_cmd == c->lastcmd, except inside MULTI and Lua. - blocked commands never happen in these cases (MULTI / Lua) - real_cmd == c->cmd, except for when the command is rewritten (e.g. GEOADD) - blocked commands (currently) are never rewritten - other than the command's CLIENT_PREVENT_LOGGING, and the execution flag CLIENT_PREVENT_LOGGING, other cases that we want to avoid slowlog are on AOF loading (specifically CMD_CALL_SLOWLOG will be off when executed from execCommand that runs from an AOF)
This commit is contained in:
parent
4a2c4477c9
commit
497351ad07
@ -108,18 +108,9 @@ void updateStatsOnUnblock(client *c, long blocked_us, long reply_us){
|
||||
c->lastcmd->microseconds += total_cmd_duration;
|
||||
|
||||
/* Log the command into the Slow log if needed. */
|
||||
if (!(c->lastcmd->flags & CMD_SKIP_SLOWLOG) &&
|
||||
!(c->flags & CLIENT_PREVENT_LOGGING))
|
||||
{
|
||||
slowlogPushEntryIfNeeded(c,c->argv,c->argc,total_cmd_duration);
|
||||
/* Log the reply duration event. */
|
||||
latencyAddSampleIfNeeded("command-unblocking",reply_us/1000);
|
||||
}
|
||||
|
||||
/* Always clear the prevent logging field now. */
|
||||
if (c->flags & CLIENT_PREVENT_LOGGING) {
|
||||
c->flags &= ~CLIENT_PREVENT_LOGGING;
|
||||
}
|
||||
slowlogPushCurrentCommand(c, c->lastcmd, total_cmd_duration);
|
||||
/* Log the reply duration event. */
|
||||
latencyAddSampleIfNeeded("command-unblocking",reply_us/1000);
|
||||
}
|
||||
|
||||
/* This function is called in the beforeSleep() function of the event loop
|
||||
@ -196,6 +187,16 @@ void unblockClient(client *c) {
|
||||
} else {
|
||||
serverPanic("Unknown btype in unblockClient().");
|
||||
}
|
||||
|
||||
/* Reset the client for a new query since, for blocking commands
|
||||
* we do not do it immediately after the command returns (when the
|
||||
* client got blocked) in order to be still able to access the argument
|
||||
* vector from module callbacks and updateStatsOnUnblock. */
|
||||
if (c->btype != BLOCKED_PAUSE) {
|
||||
freeClientOriginalArgv(c);
|
||||
resetClient(c);
|
||||
}
|
||||
|
||||
/* Clear the flags, and put the client in the unblocked list so that
|
||||
* we'll process new commands in its query buffer ASAP. */
|
||||
server.blocked_clients--;
|
||||
@ -287,7 +288,6 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
|
||||
* freed by the next unblockClient()
|
||||
* call. */
|
||||
if (dstkey) incrRefCount(dstkey);
|
||||
unblockClient(receiver);
|
||||
|
||||
monotime replyTimer;
|
||||
elapsedStart(&replyTimer);
|
||||
@ -300,6 +300,7 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
|
||||
listTypePush(o,value,wherefrom);
|
||||
}
|
||||
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
|
||||
unblockClient(receiver);
|
||||
|
||||
if (dstkey) decrRefCount(dstkey);
|
||||
decrRefCount(value);
|
||||
@ -343,11 +344,11 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) {
|
||||
int where = (receiver->lastcmd &&
|
||||
receiver->lastcmd->proc == bzpopminCommand)
|
||||
? ZSET_MIN : ZSET_MAX;
|
||||
unblockClient(receiver);
|
||||
monotime replyTimer;
|
||||
elapsedStart(&replyTimer);
|
||||
genericZpopCommand(receiver,&rl->key,1,where,1,NULL);
|
||||
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
|
||||
unblockClient(receiver);
|
||||
zcard--;
|
||||
|
||||
/* Replicate the command. */
|
||||
|
@ -5188,11 +5188,6 @@ void unblockClientFromModule(client *c) {
|
||||
moduleUnblockClient(c);
|
||||
|
||||
bc->client = NULL;
|
||||
/* Reset the client for a new query since, for blocking commands implemented
|
||||
* into modules, we do not it immediately after the command returns (and
|
||||
* the client blocks) in order to be still able to access the argument
|
||||
* vector from callbacks. */
|
||||
resetClient(c);
|
||||
}
|
||||
|
||||
/* Block a client in the context of a module: this function implements both
|
||||
|
@ -1645,6 +1645,9 @@ void resetClient(client *c) {
|
||||
c->flags |= CLIENT_REPLY_SKIP;
|
||||
c->flags &= ~CLIENT_REPLY_SKIP_NEXT;
|
||||
}
|
||||
|
||||
/* Always clear the prevent logging field. */
|
||||
c->flags &= ~CLIENT_PREVENT_LOGGING;
|
||||
}
|
||||
|
||||
/* This function is used when we want to re-enter the event loop but there
|
||||
@ -1954,13 +1957,10 @@ void commandProcessed(client *c) {
|
||||
c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
|
||||
}
|
||||
|
||||
/* Don't reset the client structure for clients blocked in a
|
||||
* module blocking command, so that the reply callback will
|
||||
* still be able to access the client argv and argc field.
|
||||
* The client will be reset in unblockClientFromModule(). */
|
||||
if (!(c->flags & CLIENT_BLOCKED) ||
|
||||
(c->btype != BLOCKED_MODULE && c->btype != BLOCKED_PAUSE))
|
||||
{
|
||||
/* Don't reset the client structure for blocked clients, so that the reply
|
||||
* callback will still be able to access the client argv and argc fields.
|
||||
* The client will be reset in unblockClient(). */
|
||||
if (!(c->flags & CLIENT_BLOCKED)) {
|
||||
resetClient(c);
|
||||
}
|
||||
|
||||
|
56
src/server.c
56
src/server.c
@ -3639,6 +3639,19 @@ void preventCommandReplication(client *c) {
|
||||
c->flags |= CLIENT_PREVENT_REPL_PROP;
|
||||
}
|
||||
|
||||
/* Log the last command a client executed into the slowlog. */
|
||||
void slowlogPushCurrentCommand(client *c, struct redisCommand *cmd, ustime_t duration) {
|
||||
/* Some commands may contain sensitive data that should not be available in the slowlog. */
|
||||
if ((c->flags & CLIENT_PREVENT_LOGGING) || (cmd->flags & CMD_SKIP_SLOWLOG))
|
||||
return;
|
||||
|
||||
/* If command argument vector was rewritten, use the original
|
||||
* arguments. */
|
||||
robj **argv = c->original_argv ? c->original_argv : c->argv;
|
||||
int argc = c->original_argv ? c->original_argc : c->argc;
|
||||
slowlogPushEntryIfNeeded(c,argv,argc,duration);
|
||||
}
|
||||
|
||||
/* Call() is the core of Redis execution of a command.
|
||||
*
|
||||
* The following flags can be passed:
|
||||
@ -3741,34 +3754,31 @@ void call(client *c, int flags) {
|
||||
server.lua_caller->flags |= CLIENT_FORCE_AOF;
|
||||
}
|
||||
|
||||
/* Some commands may contain sensitive data that should
|
||||
* not be available in the slowlog. */
|
||||
if ((c->flags & CLIENT_PREVENT_LOGGING) && !(c->flags & CLIENT_BLOCKED)) {
|
||||
c->flags &= ~CLIENT_PREVENT_LOGGING;
|
||||
flags &= ~CMD_CALL_SLOWLOG;
|
||||
}
|
||||
/* Note: the code below uses the real command that was executed
|
||||
* c->cmd and c->lastcmd may be different, in case of MULTI-EXEC or
|
||||
* re-written commands such as EXPIRE, GEOADD, etc. */
|
||||
|
||||
/* Log the command into the Slow log if needed, and populate the
|
||||
* per-command statistics that we show in INFO commandstats. */
|
||||
if (flags & CMD_CALL_SLOWLOG && !(c->cmd->flags & CMD_SKIP_SLOWLOG)) {
|
||||
char *latency_event = (c->cmd->flags & CMD_FAST) ?
|
||||
"fast-command" : "command";
|
||||
/* Record the latency this command induced on the main thread.
|
||||
* unless instructed by the caller not to log. (happens when processing
|
||||
* a MULTI-EXEC from inside an AOF). */
|
||||
if (flags & CMD_CALL_SLOWLOG) {
|
||||
char *latency_event = (real_cmd->flags & CMD_FAST) ?
|
||||
"fast-command" : "command";
|
||||
latencyAddSampleIfNeeded(latency_event,duration/1000);
|
||||
/* If command argument vector was rewritten, use the original
|
||||
* arguments. */
|
||||
robj **argv = c->original_argv ? c->original_argv : c->argv;
|
||||
int argc = c->original_argv ? c->original_argc : c->argc;
|
||||
/* If the client is blocked we will handle slowlog when it is unblocked . */
|
||||
if (!(c->flags & CLIENT_BLOCKED)) {
|
||||
slowlogPushEntryIfNeeded(c,argv,argc,duration);
|
||||
}
|
||||
}
|
||||
freeClientOriginalArgv(c);
|
||||
|
||||
/* Log the command into the Slow log if needed.
|
||||
* If the client is blocked we will handle slowlog when it is unblocked. */
|
||||
if ((flags & CMD_CALL_SLOWLOG) && !(c->flags & CLIENT_BLOCKED))
|
||||
slowlogPushCurrentCommand(c, real_cmd, duration);
|
||||
|
||||
/* Clear the original argv.
|
||||
* If the client is blocked we will handle slowlog when it is unblocked. */
|
||||
if (!(c->flags & CLIENT_BLOCKED))
|
||||
freeClientOriginalArgv(c);
|
||||
|
||||
/* populate the per-command statistics that we show in INFO commandstats. */
|
||||
if (flags & CMD_CALL_STATS) {
|
||||
/* use the real command that was executed (cmd and lastamc) may be
|
||||
* different, in case of MULTI-EXEC or re-written commands such as
|
||||
* EXPIRE, GEOADD, etc. */
|
||||
real_cmd->microseconds += duration;
|
||||
real_cmd->calls++;
|
||||
}
|
||||
|
@ -2208,6 +2208,7 @@ void preventCommandPropagation(client *c);
|
||||
void preventCommandLogging(client *c);
|
||||
void preventCommandAOF(client *c);
|
||||
void preventCommandReplication(client *c);
|
||||
void slowlogPushCurrentCommand(client *c, struct redisCommand *cmd, ustime_t duration);
|
||||
int prepareForShutdown(int flags);
|
||||
#ifdef __GNUC__
|
||||
void _serverLog(int level, const char *fmt, ...)
|
||||
|
@ -90,6 +90,22 @@ start_server {tags {"slowlog"} overrides {slowlog-log-slower-than 1000000}} {
|
||||
# INCRBYFLOAT is replicated as SET
|
||||
r INCRBYFLOAT A 1.0
|
||||
assert_equal {INCRBYFLOAT A 1.0} [lindex [lindex [r slowlog get] 0] 3]
|
||||
|
||||
# blocked BLPOP is replicated as LPOP
|
||||
set rd [redis_deferring_client]
|
||||
$rd blpop l 0
|
||||
wait_for_condition 50 100 {
|
||||
[s blocked_clients] eq {1}
|
||||
} else {
|
||||
fail "Clients are not blocked"
|
||||
}
|
||||
r multi
|
||||
r lpush l foo
|
||||
r slowlog reset
|
||||
r exec
|
||||
$rd read
|
||||
$rd close
|
||||
assert_equal {blpop l 0} [lindex [lindex [r slowlog get] 0] 3]
|
||||
}
|
||||
|
||||
test {SLOWLOG - commands with too many arguments are trimmed} {
|
||||
|
Loading…
Reference in New Issue
Block a user