EXEC always fails with EXECABORT and multi-state is cleared

In order to support the use of multi-exec in pipeline, it is important that
MULTI and EXEC are never rejected and it is easy for the client to know if the
connection is still in multi state.

It was easy to make sure MULTI and DISCARD never fail (done by previous
commits) since these only change the client state and don't do any actual
change in the server, but EXEC is a different story.

Since in the past, it was possible for clients to handle some EXEC errors and
retry the EXEC, we now can't affort to return any error on EXEC other than
EXECABORT, which now carries with it the real reason for the abort too.

Other fixes in this commit:
- Some checks that where performed at the time of queuing need to be re-
  validated when EXEC runs, for instance if the transaction contains writes
  commands, it needs to be aborted. there was one check that was already done
  in execCommand (-READONLY), but other checks where missing: -OOM, -MISCONF,
  -NOREPLICAS, -MASTERDOWN
- When a command is rejected by processCommand it was rejected with addReply,
  which was not recognized as an error in case the bad command came from the
  master. this will enable to count or MONITOR these errors in the future.
- make it easier for tests to create additional (non deferred) clients.
- add tests for the fixes of this commit.
This commit is contained in:
Oran Agra 2020-06-11 21:09:35 +03:00
parent 2ebcd63d6a
commit 65a3307bc9
6 changed files with 204 additions and 91 deletions

View File

@ -36,6 +36,7 @@ void initClientMultiState(client *c) {
c->mstate.commands = NULL;
c->mstate.count = 0;
c->mstate.cmd_flags = 0;
c->mstate.cmd_inv_flags = 0;
}
/* Release all the resources associated with MULTI/EXEC state */
@ -76,6 +77,7 @@ void queueMultiCommand(client *c) {
incrRefCount(mc->argv[j]);
c->mstate.count++;
c->mstate.cmd_flags |= c->cmd->flags;
c->mstate.cmd_inv_flags |= ~c->cmd->flags;
}
void discardTransaction(client *c) {
@ -122,6 +124,23 @@ void execCommandPropagateExec(client *c) {
PROPAGATE_AOF|PROPAGATE_REPL);
}
/* Aborts a transaction, with a specific error message.
* The transaction is always aboarted with -EXECABORT so that the client knows
* the server exited the multi state, but the actual reason for the abort is
* included too. */
void execCommandAbort(client *c, sds error) {
discardTransaction(c);
if (error[0] == '-') error++;
addReplyErrorFormat(c, "-EXECABORT Transaction discarded because of: %s", error);
/* Send EXEC to clients waiting data from MONITOR. We did send a MULTI
* already, and didn't send any of the queued commands, now we'll just send
* EXEC so it is clear that the transaction is over. */
if (listLength(server.monitors) && !server.loading)
replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}
void execCommand(client *c) {
int j;
robj **orig_argv;
@ -135,15 +154,6 @@ void execCommand(client *c) {
return;
}
/* If we are in -BUSY state, flag the transaction and return the
* -BUSY error, like Redis <= 5. This is a temporary fix, may be changed
* ASAP, see issue #7353 on Github. */
if (server.lua_timedout) {
flagTransaction(c);
addReply(c, shared.slowscripterr);
return;
}
/* Check if we need to abort the EXEC because:
* 1) Some WATCHed key was touched.
* 2) There was a previous error while queueing commands.
@ -157,21 +167,6 @@ void execCommand(client *c) {
goto handle_monitor;
}
/* If there are write commands inside the transaction, and this is a read
* only slave, we want to send an error. This happens when the transaction
* was initiated when the instance was a master or a writable replica and
* then the configuration changed (for example instance was turned into
* a replica). */
if (!server.loading && server.masterhost && server.repl_slave_ro &&
!(c->flags & CLIENT_MASTER) && c->mstate.cmd_flags & CMD_WRITE)
{
addReplyError(c,
"Transaction contains write commands but instance "
"is now a read-only replica. EXEC aborted.");
discardTransaction(c);
goto handle_monitor;
}
/* Exec all the queued commands */
unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
orig_argv = c->argv;

View File

@ -406,19 +406,23 @@ void addReplyError(client *c, const char *err) {
addReplyErrorLength(c,err,strlen(err));
}
/* See addReplyErrorLength.
* Makes sure there are no newlines in the string, otherwise invalid protocol
* is emitted. */
void addReplyErrorSafe(client *c, char *s, size_t len) {
size_t j;
for (j = 0; j < len; j++) {
if (s[j] == '\r' || s[j] == '\n') s[j] = ' ';
}
addReplyErrorLength(c,s,sdslen(s));
}
void addReplyErrorFormat(client *c, const char *fmt, ...) {
size_t l, j;
va_list ap;
va_start(ap,fmt);
sds s = sdscatvprintf(sdsempty(),fmt,ap);
va_end(ap);
/* Make sure there are no newlines in the string, otherwise invalid protocol
* is emitted. */
l = sdslen(s);
for (j = 0; j < l; j++) {
if (s[j] == '\r' || s[j] == '\n') s[j] = ' ';
}
addReplyErrorLength(c,s,sdslen(s));
addReplyErrorSafe(c, s, sdslen(s));
sdsfree(s);
}

View File

@ -3398,6 +3398,34 @@ void call(client *c, int flags) {
server.stat_numcommands++;
}
/* Used when a command that is ready for execution needs to be rejected, due to
* varios pre-execution checks. it returns the appropriate error to the client.
* If there's a transaction is flags it as dirty, and if the command is EXEC,
* it aborts the transaction. */
void rejectCommand(client *c, robj *reply) {
flagTransaction(c);
if (c->cmd && c->cmd->proc == execCommand) {
execCommandAbort(c, reply->ptr);
} else {
/* using addReplyError* rather than addReply so that the error can be logged. */
addReplyErrorSafe(c, reply->ptr, sdslen(reply->ptr));
}
}
void rejectCommandFormat(client *c, const char *fmt, ...) {
flagTransaction(c);
va_list ap;
va_start(ap,fmt);
sds s = sdscatvprintf(sdsempty(),fmt,ap);
va_end(ap);
if (c->cmd && c->cmd->proc == execCommand) {
execCommandAbort(c, s);
} else {
addReplyErrorSafe(c, s, sdslen(s));
}
sdsfree(s);
}
/* If this function gets called we already read a whole
* command, arguments are in the client argv/argc fields.
* processCommand() execute the command or prepare the
@ -3423,23 +3451,30 @@ int processCommand(client *c) {
* such as wrong arity, bad command name and so forth. */
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
if (!c->cmd) {
flagTransaction(c);
sds args = sdsempty();
int i;
for (i=1; i < c->argc && sdslen(args) < 128; i++)
args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s",
rejectCommandFormat(c,"unknown command `%s`, with args beginning with: %s",
(char*)c->argv[0]->ptr, args);
sdsfree(args);
return C_OK;
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < -c->cmd->arity)) {
flagTransaction(c);
addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
rejectCommandFormat(c,"wrong number of arguments for '%s' command",
c->cmd->name);
return C_OK;
}
int is_write_command = (c->cmd->flags & CMD_WRITE) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
int is_denyoom_command = (c->cmd->flags & CMD_DENYOOM) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_DENYOOM));
int is_denystale_command = !(c->cmd->flags & CMD_STALE) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_STALE));
int is_denyloading_command = !(c->cmd->flags & CMD_LOADING) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_LOADING));
/* Check if the user is authenticated. This check is skipped in case
* the default user is flagged as "nopass" and is active. */
int auth_required = (!(DefaultUser->flags & USER_FLAG_NOPASS) ||
@ -3449,8 +3484,7 @@ int processCommand(client *c) {
/* AUTH and HELLO and no auth modules are valid even in
* non-authenticated state. */
if (!(c->cmd->flags & CMD_NO_AUTH)) {
flagTransaction(c);
addReply(c,shared.noautherr);
rejectCommand(c,shared.noautherr);
return C_OK;
}
}
@ -3461,13 +3495,12 @@ int processCommand(client *c) {
int acl_retval = ACLCheckCommandPerm(c,&acl_keypos);
if (acl_retval != ACL_OK) {
addACLLogEntry(c,acl_retval,acl_keypos,NULL);
flagTransaction(c);
if (acl_retval == ACL_DENIED_CMD)
addReplyErrorFormat(c,
rejectCommandFormat(c,
"-NOPERM this user has no permissions to run "
"the '%s' command or its subcommand", c->cmd->name);
else
addReplyErrorFormat(c,
rejectCommandFormat(c,
"-NOPERM this user has no permissions to access "
"one of the keys used as arguments");
return C_OK;
@ -3515,13 +3548,11 @@ int processCommand(client *c) {
* is trying to execute is denied during OOM conditions or the client
* is in MULTI/EXEC context? Error. */
if (out_of_memory &&
(c->cmd->flags & CMD_DENYOOM ||
(is_denyoom_command ||
(c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand &&
c->cmd->proc != discardCommand)))
{
flagTransaction(c);
addReply(c, shared.oomerr);
rejectCommand(c, shared.oomerr);
return C_OK;
}
@ -3542,17 +3573,14 @@ int processCommand(client *c) {
int deny_write_type = writeCommandsDeniedByDiskError();
if (deny_write_type != DISK_ERROR_TYPE_NONE &&
server.masterhost == NULL &&
(c->cmd->flags & CMD_WRITE ||
c->cmd->proc == pingCommand))
(is_write_command ||c->cmd->proc == pingCommand))
{
flagTransaction(c);
if (deny_write_type == DISK_ERROR_TYPE_RDB)
addReply(c, shared.bgsaveerr);
rejectCommand(c, shared.bgsaveerr);
else
addReplySds(c,
sdscatprintf(sdsempty(),
rejectCommandFormat(c,
"-MISCONF Errors writing to the AOF file: %s\r\n",
strerror(server.aof_last_write_errno)));
strerror(server.aof_last_write_errno));
return C_OK;
}
@ -3561,11 +3589,10 @@ int processCommand(client *c) {
if (server.masterhost == NULL &&
server.repl_min_slaves_to_write &&
server.repl_min_slaves_max_lag &&
c->cmd->flags & CMD_WRITE &&
is_write_command &&
server.repl_good_slaves_count < server.repl_min_slaves_to_write)
{
flagTransaction(c);
addReply(c, shared.noreplicaserr);
rejectCommand(c, shared.noreplicaserr);
return C_OK;
}
@ -3573,10 +3600,9 @@ int processCommand(client *c) {
* accept write commands if this is our master. */
if (server.masterhost && server.repl_slave_ro &&
!(c->flags & CLIENT_MASTER) &&
c->cmd->flags & CMD_WRITE)
is_write_command)
{
flagTransaction(c);
addReply(c, shared.roslaveerr);
rejectCommand(c, shared.roslaveerr);
return C_OK;
}
@ -3588,7 +3614,7 @@ int processCommand(client *c) {
c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand) {
addReplyErrorFormat(c,
rejectCommandFormat(c,
"Can't execute '%s': only (P)SUBSCRIBE / "
"(P)UNSUBSCRIBE / PING / QUIT are allowed in this context",
c->cmd->name);
@ -3600,17 +3626,16 @@ int processCommand(client *c) {
* link with master. */
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
server.repl_serve_stale_data == 0 &&
!(c->cmd->flags & CMD_STALE))
is_denystale_command)
{
flagTransaction(c);
addReply(c, shared.masterdownerr);
rejectCommand(c, shared.masterdownerr);
return C_OK;
}
/* Loading DB? Return an error if the command has not the
* CMD_LOADING flag. */
if (server.loading && !(c->cmd->flags & CMD_LOADING)) {
addReply(c, shared.loadingerr);
if (server.loading && is_denyloading_command) {
rejectCommand(c, shared.loadingerr);
return C_OK;
}
@ -3625,7 +3650,6 @@ int processCommand(client *c) {
c->cmd->proc != helloCommand &&
c->cmd->proc != replconfCommand &&
c->cmd->proc != multiCommand &&
c->cmd->proc != execCommand &&
c->cmd->proc != discardCommand &&
c->cmd->proc != watchCommand &&
c->cmd->proc != unwatchCommand &&
@ -3636,8 +3660,7 @@ int processCommand(client *c) {
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
{
flagTransaction(c);
addReply(c, shared.slowscripterr);
rejectCommand(c, shared.slowscripterr);
return C_OK;
}

View File

@ -666,6 +666,9 @@ typedef struct multiState {
int cmd_flags; /* The accumulated command flags OR-ed together.
So if at least a command has a given flag, it
will be set in this field. */
int cmd_inv_flags; /* Same as cmd_flags, OR-ing the ~flags. so that it
is possible to know if all the commands have a
certain flag. */
int minreplicas; /* MINREPLICAS for synchronous replication */
time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */
} multiState;
@ -1626,6 +1629,7 @@ void addReplyBulkLongLong(client *c, long long ll);
void addReply(client *c, robj *obj);
void addReplySds(client *c, sds s);
void addReplyBulkSds(client *c, sds s);
void addReplyErrorSafe(client *c, char *s, size_t len);
void addReplyError(client *c, const char *err);
void addReplyStatus(client *c, const char *status);
void addReplyDouble(client *c, double d);
@ -1724,6 +1728,7 @@ void touchWatchedKey(redisDb *db, robj *key);
void touchWatchedKeysOnFlush(int dbid);
void discardTransaction(client *c);
void flagTransaction(client *c);
void execCommandAbort(client *c, sds error);
void execCommandPropagateMulti(client *c);
void execCommandPropagateExec(client *c);

View File

@ -196,6 +196,21 @@ proc redis_deferring_client {args} {
return $client
}
proc redis_client {args} {
set level 0
if {[llength $args] > 0 && [string is integer [lindex $args 0]]} {
set level [lindex $args 0]
set args [lrange $args 1 end]
}
# create client that defers reading reply
set client [redis [srv $level "host"] [srv $level "port"] 0 $::tls]
# select the right db and read the response (OK)
$client select 9
return $client
}
# Provide easy access to INFO properties. Same semantic as "proc r".
proc s {args} {
set level 0

View File

@ -325,74 +325,145 @@ start_server {tags {"multi"}} {
# check that if MULTI arrives during timeout, it is either refused, or
# allowed to pass, and we don't end up executing half of the transaction
set rd1 [redis_deferring_client]
set rd2 [redis_deferring_client]
set r2 [redis_client]
r config set lua-time-limit 10
r set xx 1
$rd1 eval {while true do end} 0
after 200
catch { $rd2 multi; $rd2 read } e
catch { $rd2 incr xx; $rd2 read } e
catch { $r2 multi; } e
catch { $r2 incr xx; } e
r script kill
after 200 ; # Give some time to Lua to call the hook again...
catch { $rd2 incr xx; $rd2 read } e
catch { $rd2 exec; $rd2 read } e
catch { $r2 incr xx; } e
catch { $r2 exec; } e
assert_match {EXECABORT*previous errors*} $e
set xx [r get xx]
# make sure that either the whole transcation passed or none of it (we actually expect none)
assert { $xx == 1 || $xx == 3}
# check that the connection is no longer in multi state
$rd2 ping asdf
set pong [$rd2 read]
set pong [$r2 ping asdf]
assert_equal $pong "asdf"
$rd1 close; $r2 close
}
test {EXEC and script timeout} {
# check that if EXEC arrives during timeout, we don't end up executing
# half of the transaction, and also that we exit the multi state
set rd1 [redis_deferring_client]
set rd2 [redis_deferring_client]
set r2 [redis_client]
r config set lua-time-limit 10
r set xx 1
catch { $rd2 multi; $rd2 read } e
catch { $rd2 incr xx; $rd2 read } e
catch { $r2 multi; } e
catch { $r2 incr xx; } e
$rd1 eval {while true do end} 0
after 200
catch { $rd2 incr xx; $rd2 read } e
catch { $rd2 exec; $rd2 read } e
catch { $r2 incr xx; } e
catch { $r2 exec; } e
assert_match {EXECABORT*BUSY*} $e
r script kill
after 200 ; # Give some time to Lua to call the hook again...
set xx [r get xx]
# make sure that either the whole transcation passed or none of it (we actually expect none)
assert { $xx == 1 || $xx == 3}
# Discard the transaction since EXEC likely got -BUSY error
# so the client is still in MULTI state.
catch { $rd2 discard ;$rd2 read } e
# check that the connection is no longer in multi state
$rd2 ping asdf
set pong [$rd2 read]
set pong [$r2 ping asdf]
assert_equal $pong "asdf"
$rd1 close; $r2 close
}
test {MULTI-EXEC body and script timeout} {
# check that we don't run an imcomplete transaction due to some commands
# arriving during busy script
set rd1 [redis_deferring_client]
set rd2 [redis_deferring_client]
set r2 [redis_client]
r config set lua-time-limit 10
r set xx 1
catch { $rd2 multi; $rd2 read } e
catch { $rd2 incr xx; $rd2 read } e
catch { $r2 multi; } e
catch { $r2 incr xx; } e
$rd1 eval {while true do end} 0
after 200
catch { $rd2 incr xx; $rd2 read } e
catch { $r2 incr xx; } e
r script kill
after 200 ; # Give some time to Lua to call the hook again...
catch { $rd2 exec; $rd2 read } e
catch { $r2 exec; } e
assert_match {EXECABORT*previous errors*} $e
set xx [r get xx]
# make sure that either the whole transcation passed or none of it (we actually expect none)
assert { $xx == 1 || $xx == 3}
# check that the connection is no longer in multi state
$rd2 ping asdf
set pong [$rd2 read]
set pong [$r2 ping asdf]
assert_equal $pong "asdf"
$rd1 close; $r2 close
}
test {just EXEC and script timeout} {
# check that if EXEC arrives during timeout, we don't end up executing
# actual commands during busy script, and also that we exit the multi state
set rd1 [redis_deferring_client]
set r2 [redis_client]
r config set lua-time-limit 10
r set xx 1
catch { $r2 multi; } e
catch { $r2 incr xx; } e
$rd1 eval {while true do end} 0
after 200
catch { $r2 exec; } e
assert_match {EXECABORT*BUSY*} $e
r script kill
after 200 ; # Give some time to Lua to call the hook again...
set xx [r get xx]
# make we didn't execute the transaction
assert { $xx == 1}
# check that the connection is no longer in multi state
set pong [$r2 ping asdf]
assert_equal $pong "asdf"
$rd1 close; $r2 close
}
test {exec with write commands and state change} {
# check that exec that contains write commands fails if server state changed since they were queued
set r1 [redis_client]
r set xx 1
r multi
r incr xx
$r1 config set min-replicas-to-write 2
catch {r exec} e
assert_match {*EXECABORT*NOREPLICAS*} $e
set xx [r get xx]
# make sure that the INCR wasn't executed
assert { $xx == 1}
$r1 config set min-replicas-to-write 0
$r1 close;
}
test {exec with read commands and stale replica state change} {
# check that exec that contains read commands fails if server state changed since they were queued
r config set replica-serve-stale-data no
set r1 [redis_client]
r set xx 1
# check that GET is disallowed on stale replica, even if the replica becomes stale only after queuing.
r multi
r get xx
$r1 replicaof localhsot 0
catch {r exec} e
assert_match {*EXECABORT*MASTERDOWN*} $e
# check that PING is allowed
r multi
r ping
$r1 replicaof localhsot 0
set pong [r exec]
assert {$pong == "PONG"}
# check that when replica is not stale, GET is allowed
# while we're at it, let's check that multi is allowed on stale replica too
r multi
$r1 replicaof no one
r get xx
set xx [r exec]
# make sure that the INCR was executed
assert { $xx == 1 }
$r1 close;
}
}