mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 16:18:28 -05:00
EXEC always fails with EXECABORT and multi-state is cleared
In order to support the use of multi-exec in pipeline, it is important that MULTI and EXEC are never rejected and it is easy for the client to know if the connection is still in multi state. It was easy to make sure MULTI and DISCARD never fail (done by previous commits) since these only change the client state and don't do any actual change in the server, but EXEC is a different story. Since in the past, it was possible for clients to handle some EXEC errors and retry the EXEC, we now can't affort to return any error on EXEC other than EXECABORT, which now carries with it the real reason for the abort too. Other fixes in this commit: - Some checks that where performed at the time of queuing need to be re- validated when EXEC runs, for instance if the transaction contains writes commands, it needs to be aborted. there was one check that was already done in execCommand (-READONLY), but other checks where missing: -OOM, -MISCONF, -NOREPLICAS, -MASTERDOWN - When a command is rejected by processCommand it was rejected with addReply, which was not recognized as an error in case the bad command came from the master. this will enable to count or MONITOR these errors in the future. - make it easier for tests to create additional (non deferred) clients. - add tests for the fixes of this commit.
This commit is contained in:
parent
2ebcd63d6a
commit
65a3307bc9
43
src/multi.c
43
src/multi.c
@ -36,6 +36,7 @@ void initClientMultiState(client *c) {
|
||||
c->mstate.commands = NULL;
|
||||
c->mstate.count = 0;
|
||||
c->mstate.cmd_flags = 0;
|
||||
c->mstate.cmd_inv_flags = 0;
|
||||
}
|
||||
|
||||
/* Release all the resources associated with MULTI/EXEC state */
|
||||
@ -76,6 +77,7 @@ void queueMultiCommand(client *c) {
|
||||
incrRefCount(mc->argv[j]);
|
||||
c->mstate.count++;
|
||||
c->mstate.cmd_flags |= c->cmd->flags;
|
||||
c->mstate.cmd_inv_flags |= ~c->cmd->flags;
|
||||
}
|
||||
|
||||
void discardTransaction(client *c) {
|
||||
@ -122,6 +124,23 @@ void execCommandPropagateExec(client *c) {
|
||||
PROPAGATE_AOF|PROPAGATE_REPL);
|
||||
}
|
||||
|
||||
/* Aborts a transaction, with a specific error message.
|
||||
* The transaction is always aboarted with -EXECABORT so that the client knows
|
||||
* the server exited the multi state, but the actual reason for the abort is
|
||||
* included too. */
|
||||
void execCommandAbort(client *c, sds error) {
|
||||
discardTransaction(c);
|
||||
|
||||
if (error[0] == '-') error++;
|
||||
addReplyErrorFormat(c, "-EXECABORT Transaction discarded because of: %s", error);
|
||||
|
||||
/* Send EXEC to clients waiting data from MONITOR. We did send a MULTI
|
||||
* already, and didn't send any of the queued commands, now we'll just send
|
||||
* EXEC so it is clear that the transaction is over. */
|
||||
if (listLength(server.monitors) && !server.loading)
|
||||
replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
|
||||
}
|
||||
|
||||
void execCommand(client *c) {
|
||||
int j;
|
||||
robj **orig_argv;
|
||||
@ -135,15 +154,6 @@ void execCommand(client *c) {
|
||||
return;
|
||||
}
|
||||
|
||||
/* If we are in -BUSY state, flag the transaction and return the
|
||||
* -BUSY error, like Redis <= 5. This is a temporary fix, may be changed
|
||||
* ASAP, see issue #7353 on Github. */
|
||||
if (server.lua_timedout) {
|
||||
flagTransaction(c);
|
||||
addReply(c, shared.slowscripterr);
|
||||
return;
|
||||
}
|
||||
|
||||
/* Check if we need to abort the EXEC because:
|
||||
* 1) Some WATCHed key was touched.
|
||||
* 2) There was a previous error while queueing commands.
|
||||
@ -157,21 +167,6 @@ void execCommand(client *c) {
|
||||
goto handle_monitor;
|
||||
}
|
||||
|
||||
/* If there are write commands inside the transaction, and this is a read
|
||||
* only slave, we want to send an error. This happens when the transaction
|
||||
* was initiated when the instance was a master or a writable replica and
|
||||
* then the configuration changed (for example instance was turned into
|
||||
* a replica). */
|
||||
if (!server.loading && server.masterhost && server.repl_slave_ro &&
|
||||
!(c->flags & CLIENT_MASTER) && c->mstate.cmd_flags & CMD_WRITE)
|
||||
{
|
||||
addReplyError(c,
|
||||
"Transaction contains write commands but instance "
|
||||
"is now a read-only replica. EXEC aborted.");
|
||||
discardTransaction(c);
|
||||
goto handle_monitor;
|
||||
}
|
||||
|
||||
/* Exec all the queued commands */
|
||||
unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
|
||||
orig_argv = c->argv;
|
||||
|
@ -406,19 +406,23 @@ void addReplyError(client *c, const char *err) {
|
||||
addReplyErrorLength(c,err,strlen(err));
|
||||
}
|
||||
|
||||
/* See addReplyErrorLength.
|
||||
* Makes sure there are no newlines in the string, otherwise invalid protocol
|
||||
* is emitted. */
|
||||
void addReplyErrorSafe(client *c, char *s, size_t len) {
|
||||
size_t j;
|
||||
for (j = 0; j < len; j++) {
|
||||
if (s[j] == '\r' || s[j] == '\n') s[j] = ' ';
|
||||
}
|
||||
addReplyErrorLength(c,s,sdslen(s));
|
||||
}
|
||||
|
||||
void addReplyErrorFormat(client *c, const char *fmt, ...) {
|
||||
size_t l, j;
|
||||
va_list ap;
|
||||
va_start(ap,fmt);
|
||||
sds s = sdscatvprintf(sdsempty(),fmt,ap);
|
||||
va_end(ap);
|
||||
/* Make sure there are no newlines in the string, otherwise invalid protocol
|
||||
* is emitted. */
|
||||
l = sdslen(s);
|
||||
for (j = 0; j < l; j++) {
|
||||
if (s[j] == '\r' || s[j] == '\n') s[j] = ' ';
|
||||
}
|
||||
addReplyErrorLength(c,s,sdslen(s));
|
||||
addReplyErrorSafe(c, s, sdslen(s));
|
||||
sdsfree(s);
|
||||
}
|
||||
|
||||
|
93
src/server.c
93
src/server.c
@ -3398,6 +3398,34 @@ void call(client *c, int flags) {
|
||||
server.stat_numcommands++;
|
||||
}
|
||||
|
||||
/* Used when a command that is ready for execution needs to be rejected, due to
|
||||
* varios pre-execution checks. it returns the appropriate error to the client.
|
||||
* If there's a transaction is flags it as dirty, and if the command is EXEC,
|
||||
* it aborts the transaction. */
|
||||
void rejectCommand(client *c, robj *reply) {
|
||||
flagTransaction(c);
|
||||
if (c->cmd && c->cmd->proc == execCommand) {
|
||||
execCommandAbort(c, reply->ptr);
|
||||
} else {
|
||||
/* using addReplyError* rather than addReply so that the error can be logged. */
|
||||
addReplyErrorSafe(c, reply->ptr, sdslen(reply->ptr));
|
||||
}
|
||||
}
|
||||
|
||||
void rejectCommandFormat(client *c, const char *fmt, ...) {
|
||||
flagTransaction(c);
|
||||
va_list ap;
|
||||
va_start(ap,fmt);
|
||||
sds s = sdscatvprintf(sdsempty(),fmt,ap);
|
||||
va_end(ap);
|
||||
if (c->cmd && c->cmd->proc == execCommand) {
|
||||
execCommandAbort(c, s);
|
||||
} else {
|
||||
addReplyErrorSafe(c, s, sdslen(s));
|
||||
}
|
||||
sdsfree(s);
|
||||
}
|
||||
|
||||
/* If this function gets called we already read a whole
|
||||
* command, arguments are in the client argv/argc fields.
|
||||
* processCommand() execute the command or prepare the
|
||||
@ -3423,23 +3451,30 @@ int processCommand(client *c) {
|
||||
* such as wrong arity, bad command name and so forth. */
|
||||
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
|
||||
if (!c->cmd) {
|
||||
flagTransaction(c);
|
||||
sds args = sdsempty();
|
||||
int i;
|
||||
for (i=1; i < c->argc && sdslen(args) < 128; i++)
|
||||
args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
|
||||
addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s",
|
||||
rejectCommandFormat(c,"unknown command `%s`, with args beginning with: %s",
|
||||
(char*)c->argv[0]->ptr, args);
|
||||
sdsfree(args);
|
||||
return C_OK;
|
||||
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
|
||||
(c->argc < -c->cmd->arity)) {
|
||||
flagTransaction(c);
|
||||
addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
|
||||
rejectCommandFormat(c,"wrong number of arguments for '%s' command",
|
||||
c->cmd->name);
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
int is_write_command = (c->cmd->flags & CMD_WRITE) ||
|
||||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
|
||||
int is_denyoom_command = (c->cmd->flags & CMD_DENYOOM) ||
|
||||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_DENYOOM));
|
||||
int is_denystale_command = !(c->cmd->flags & CMD_STALE) ||
|
||||
(c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_STALE));
|
||||
int is_denyloading_command = !(c->cmd->flags & CMD_LOADING) ||
|
||||
(c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_LOADING));
|
||||
|
||||
/* Check if the user is authenticated. This check is skipped in case
|
||||
* the default user is flagged as "nopass" and is active. */
|
||||
int auth_required = (!(DefaultUser->flags & USER_FLAG_NOPASS) ||
|
||||
@ -3449,8 +3484,7 @@ int processCommand(client *c) {
|
||||
/* AUTH and HELLO and no auth modules are valid even in
|
||||
* non-authenticated state. */
|
||||
if (!(c->cmd->flags & CMD_NO_AUTH)) {
|
||||
flagTransaction(c);
|
||||
addReply(c,shared.noautherr);
|
||||
rejectCommand(c,shared.noautherr);
|
||||
return C_OK;
|
||||
}
|
||||
}
|
||||
@ -3461,13 +3495,12 @@ int processCommand(client *c) {
|
||||
int acl_retval = ACLCheckCommandPerm(c,&acl_keypos);
|
||||
if (acl_retval != ACL_OK) {
|
||||
addACLLogEntry(c,acl_retval,acl_keypos,NULL);
|
||||
flagTransaction(c);
|
||||
if (acl_retval == ACL_DENIED_CMD)
|
||||
addReplyErrorFormat(c,
|
||||
rejectCommandFormat(c,
|
||||
"-NOPERM this user has no permissions to run "
|
||||
"the '%s' command or its subcommand", c->cmd->name);
|
||||
else
|
||||
addReplyErrorFormat(c,
|
||||
rejectCommandFormat(c,
|
||||
"-NOPERM this user has no permissions to access "
|
||||
"one of the keys used as arguments");
|
||||
return C_OK;
|
||||
@ -3515,13 +3548,11 @@ int processCommand(client *c) {
|
||||
* is trying to execute is denied during OOM conditions or the client
|
||||
* is in MULTI/EXEC context? Error. */
|
||||
if (out_of_memory &&
|
||||
(c->cmd->flags & CMD_DENYOOM ||
|
||||
(is_denyoom_command ||
|
||||
(c->flags & CLIENT_MULTI &&
|
||||
c->cmd->proc != execCommand &&
|
||||
c->cmd->proc != discardCommand)))
|
||||
{
|
||||
flagTransaction(c);
|
||||
addReply(c, shared.oomerr);
|
||||
rejectCommand(c, shared.oomerr);
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
@ -3542,17 +3573,14 @@ int processCommand(client *c) {
|
||||
int deny_write_type = writeCommandsDeniedByDiskError();
|
||||
if (deny_write_type != DISK_ERROR_TYPE_NONE &&
|
||||
server.masterhost == NULL &&
|
||||
(c->cmd->flags & CMD_WRITE ||
|
||||
c->cmd->proc == pingCommand))
|
||||
(is_write_command ||c->cmd->proc == pingCommand))
|
||||
{
|
||||
flagTransaction(c);
|
||||
if (deny_write_type == DISK_ERROR_TYPE_RDB)
|
||||
addReply(c, shared.bgsaveerr);
|
||||
rejectCommand(c, shared.bgsaveerr);
|
||||
else
|
||||
addReplySds(c,
|
||||
sdscatprintf(sdsempty(),
|
||||
rejectCommandFormat(c,
|
||||
"-MISCONF Errors writing to the AOF file: %s\r\n",
|
||||
strerror(server.aof_last_write_errno)));
|
||||
strerror(server.aof_last_write_errno));
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
@ -3561,11 +3589,10 @@ int processCommand(client *c) {
|
||||
if (server.masterhost == NULL &&
|
||||
server.repl_min_slaves_to_write &&
|
||||
server.repl_min_slaves_max_lag &&
|
||||
c->cmd->flags & CMD_WRITE &&
|
||||
is_write_command &&
|
||||
server.repl_good_slaves_count < server.repl_min_slaves_to_write)
|
||||
{
|
||||
flagTransaction(c);
|
||||
addReply(c, shared.noreplicaserr);
|
||||
rejectCommand(c, shared.noreplicaserr);
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
@ -3573,10 +3600,9 @@ int processCommand(client *c) {
|
||||
* accept write commands if this is our master. */
|
||||
if (server.masterhost && server.repl_slave_ro &&
|
||||
!(c->flags & CLIENT_MASTER) &&
|
||||
c->cmd->flags & CMD_WRITE)
|
||||
is_write_command)
|
||||
{
|
||||
flagTransaction(c);
|
||||
addReply(c, shared.roslaveerr);
|
||||
rejectCommand(c, shared.roslaveerr);
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
@ -3588,7 +3614,7 @@ int processCommand(client *c) {
|
||||
c->cmd->proc != unsubscribeCommand &&
|
||||
c->cmd->proc != psubscribeCommand &&
|
||||
c->cmd->proc != punsubscribeCommand) {
|
||||
addReplyErrorFormat(c,
|
||||
rejectCommandFormat(c,
|
||||
"Can't execute '%s': only (P)SUBSCRIBE / "
|
||||
"(P)UNSUBSCRIBE / PING / QUIT are allowed in this context",
|
||||
c->cmd->name);
|
||||
@ -3600,17 +3626,16 @@ int processCommand(client *c) {
|
||||
* link with master. */
|
||||
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
|
||||
server.repl_serve_stale_data == 0 &&
|
||||
!(c->cmd->flags & CMD_STALE))
|
||||
is_denystale_command)
|
||||
{
|
||||
flagTransaction(c);
|
||||
addReply(c, shared.masterdownerr);
|
||||
rejectCommand(c, shared.masterdownerr);
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
/* Loading DB? Return an error if the command has not the
|
||||
* CMD_LOADING flag. */
|
||||
if (server.loading && !(c->cmd->flags & CMD_LOADING)) {
|
||||
addReply(c, shared.loadingerr);
|
||||
if (server.loading && is_denyloading_command) {
|
||||
rejectCommand(c, shared.loadingerr);
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
@ -3625,7 +3650,6 @@ int processCommand(client *c) {
|
||||
c->cmd->proc != helloCommand &&
|
||||
c->cmd->proc != replconfCommand &&
|
||||
c->cmd->proc != multiCommand &&
|
||||
c->cmd->proc != execCommand &&
|
||||
c->cmd->proc != discardCommand &&
|
||||
c->cmd->proc != watchCommand &&
|
||||
c->cmd->proc != unwatchCommand &&
|
||||
@ -3636,8 +3660,7 @@ int processCommand(client *c) {
|
||||
c->argc == 2 &&
|
||||
tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
|
||||
{
|
||||
flagTransaction(c);
|
||||
addReply(c, shared.slowscripterr);
|
||||
rejectCommand(c, shared.slowscripterr);
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
|
@ -666,6 +666,9 @@ typedef struct multiState {
|
||||
int cmd_flags; /* The accumulated command flags OR-ed together.
|
||||
So if at least a command has a given flag, it
|
||||
will be set in this field. */
|
||||
int cmd_inv_flags; /* Same as cmd_flags, OR-ing the ~flags. so that it
|
||||
is possible to know if all the commands have a
|
||||
certain flag. */
|
||||
int minreplicas; /* MINREPLICAS for synchronous replication */
|
||||
time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */
|
||||
} multiState;
|
||||
@ -1626,6 +1629,7 @@ void addReplyBulkLongLong(client *c, long long ll);
|
||||
void addReply(client *c, robj *obj);
|
||||
void addReplySds(client *c, sds s);
|
||||
void addReplyBulkSds(client *c, sds s);
|
||||
void addReplyErrorSafe(client *c, char *s, size_t len);
|
||||
void addReplyError(client *c, const char *err);
|
||||
void addReplyStatus(client *c, const char *status);
|
||||
void addReplyDouble(client *c, double d);
|
||||
@ -1724,6 +1728,7 @@ void touchWatchedKey(redisDb *db, robj *key);
|
||||
void touchWatchedKeysOnFlush(int dbid);
|
||||
void discardTransaction(client *c);
|
||||
void flagTransaction(client *c);
|
||||
void execCommandAbort(client *c, sds error);
|
||||
void execCommandPropagateMulti(client *c);
|
||||
void execCommandPropagateExec(client *c);
|
||||
|
||||
|
@ -196,6 +196,21 @@ proc redis_deferring_client {args} {
|
||||
return $client
|
||||
}
|
||||
|
||||
proc redis_client {args} {
|
||||
set level 0
|
||||
if {[llength $args] > 0 && [string is integer [lindex $args 0]]} {
|
||||
set level [lindex $args 0]
|
||||
set args [lrange $args 1 end]
|
||||
}
|
||||
|
||||
# create client that defers reading reply
|
||||
set client [redis [srv $level "host"] [srv $level "port"] 0 $::tls]
|
||||
|
||||
# select the right db and read the response (OK)
|
||||
$client select 9
|
||||
return $client
|
||||
}
|
||||
|
||||
# Provide easy access to INFO properties. Same semantic as "proc r".
|
||||
proc s {args} {
|
||||
set level 0
|
||||
|
@ -325,74 +325,145 @@ start_server {tags {"multi"}} {
|
||||
# check that if MULTI arrives during timeout, it is either refused, or
|
||||
# allowed to pass, and we don't end up executing half of the transaction
|
||||
set rd1 [redis_deferring_client]
|
||||
set rd2 [redis_deferring_client]
|
||||
set r2 [redis_client]
|
||||
r config set lua-time-limit 10
|
||||
r set xx 1
|
||||
$rd1 eval {while true do end} 0
|
||||
after 200
|
||||
catch { $rd2 multi; $rd2 read } e
|
||||
catch { $rd2 incr xx; $rd2 read } e
|
||||
catch { $r2 multi; } e
|
||||
catch { $r2 incr xx; } e
|
||||
r script kill
|
||||
after 200 ; # Give some time to Lua to call the hook again...
|
||||
catch { $rd2 incr xx; $rd2 read } e
|
||||
catch { $rd2 exec; $rd2 read } e
|
||||
catch { $r2 incr xx; } e
|
||||
catch { $r2 exec; } e
|
||||
assert_match {EXECABORT*previous errors*} $e
|
||||
set xx [r get xx]
|
||||
# make sure that either the whole transcation passed or none of it (we actually expect none)
|
||||
assert { $xx == 1 || $xx == 3}
|
||||
# check that the connection is no longer in multi state
|
||||
$rd2 ping asdf
|
||||
set pong [$rd2 read]
|
||||
set pong [$r2 ping asdf]
|
||||
assert_equal $pong "asdf"
|
||||
$rd1 close; $r2 close
|
||||
}
|
||||
|
||||
test {EXEC and script timeout} {
|
||||
# check that if EXEC arrives during timeout, we don't end up executing
|
||||
# half of the transaction, and also that we exit the multi state
|
||||
set rd1 [redis_deferring_client]
|
||||
set rd2 [redis_deferring_client]
|
||||
set r2 [redis_client]
|
||||
r config set lua-time-limit 10
|
||||
r set xx 1
|
||||
catch { $rd2 multi; $rd2 read } e
|
||||
catch { $rd2 incr xx; $rd2 read } e
|
||||
catch { $r2 multi; } e
|
||||
catch { $r2 incr xx; } e
|
||||
$rd1 eval {while true do end} 0
|
||||
after 200
|
||||
catch { $rd2 incr xx; $rd2 read } e
|
||||
catch { $rd2 exec; $rd2 read } e
|
||||
catch { $r2 incr xx; } e
|
||||
catch { $r2 exec; } e
|
||||
assert_match {EXECABORT*BUSY*} $e
|
||||
r script kill
|
||||
after 200 ; # Give some time to Lua to call the hook again...
|
||||
set xx [r get xx]
|
||||
# make sure that either the whole transcation passed or none of it (we actually expect none)
|
||||
assert { $xx == 1 || $xx == 3}
|
||||
# Discard the transaction since EXEC likely got -BUSY error
|
||||
# so the client is still in MULTI state.
|
||||
catch { $rd2 discard ;$rd2 read } e
|
||||
# check that the connection is no longer in multi state
|
||||
$rd2 ping asdf
|
||||
set pong [$rd2 read]
|
||||
set pong [$r2 ping asdf]
|
||||
assert_equal $pong "asdf"
|
||||
$rd1 close; $r2 close
|
||||
}
|
||||
|
||||
test {MULTI-EXEC body and script timeout} {
|
||||
# check that we don't run an imcomplete transaction due to some commands
|
||||
# arriving during busy script
|
||||
set rd1 [redis_deferring_client]
|
||||
set rd2 [redis_deferring_client]
|
||||
set r2 [redis_client]
|
||||
r config set lua-time-limit 10
|
||||
r set xx 1
|
||||
catch { $rd2 multi; $rd2 read } e
|
||||
catch { $rd2 incr xx; $rd2 read } e
|
||||
catch { $r2 multi; } e
|
||||
catch { $r2 incr xx; } e
|
||||
$rd1 eval {while true do end} 0
|
||||
after 200
|
||||
catch { $rd2 incr xx; $rd2 read } e
|
||||
catch { $r2 incr xx; } e
|
||||
r script kill
|
||||
after 200 ; # Give some time to Lua to call the hook again...
|
||||
catch { $rd2 exec; $rd2 read } e
|
||||
catch { $r2 exec; } e
|
||||
assert_match {EXECABORT*previous errors*} $e
|
||||
set xx [r get xx]
|
||||
# make sure that either the whole transcation passed or none of it (we actually expect none)
|
||||
assert { $xx == 1 || $xx == 3}
|
||||
# check that the connection is no longer in multi state
|
||||
$rd2 ping asdf
|
||||
set pong [$rd2 read]
|
||||
set pong [$r2 ping asdf]
|
||||
assert_equal $pong "asdf"
|
||||
$rd1 close; $r2 close
|
||||
}
|
||||
|
||||
test {just EXEC and script timeout} {
|
||||
# check that if EXEC arrives during timeout, we don't end up executing
|
||||
# actual commands during busy script, and also that we exit the multi state
|
||||
set rd1 [redis_deferring_client]
|
||||
set r2 [redis_client]
|
||||
r config set lua-time-limit 10
|
||||
r set xx 1
|
||||
catch { $r2 multi; } e
|
||||
catch { $r2 incr xx; } e
|
||||
$rd1 eval {while true do end} 0
|
||||
after 200
|
||||
catch { $r2 exec; } e
|
||||
assert_match {EXECABORT*BUSY*} $e
|
||||
r script kill
|
||||
after 200 ; # Give some time to Lua to call the hook again...
|
||||
set xx [r get xx]
|
||||
# make we didn't execute the transaction
|
||||
assert { $xx == 1}
|
||||
# check that the connection is no longer in multi state
|
||||
set pong [$r2 ping asdf]
|
||||
assert_equal $pong "asdf"
|
||||
$rd1 close; $r2 close
|
||||
}
|
||||
|
||||
test {exec with write commands and state change} {
|
||||
# check that exec that contains write commands fails if server state changed since they were queued
|
||||
set r1 [redis_client]
|
||||
r set xx 1
|
||||
r multi
|
||||
r incr xx
|
||||
$r1 config set min-replicas-to-write 2
|
||||
catch {r exec} e
|
||||
assert_match {*EXECABORT*NOREPLICAS*} $e
|
||||
set xx [r get xx]
|
||||
# make sure that the INCR wasn't executed
|
||||
assert { $xx == 1}
|
||||
$r1 config set min-replicas-to-write 0
|
||||
$r1 close;
|
||||
}
|
||||
|
||||
test {exec with read commands and stale replica state change} {
|
||||
# check that exec that contains read commands fails if server state changed since they were queued
|
||||
r config set replica-serve-stale-data no
|
||||
set r1 [redis_client]
|
||||
r set xx 1
|
||||
|
||||
# check that GET is disallowed on stale replica, even if the replica becomes stale only after queuing.
|
||||
r multi
|
||||
r get xx
|
||||
$r1 replicaof localhsot 0
|
||||
catch {r exec} e
|
||||
assert_match {*EXECABORT*MASTERDOWN*} $e
|
||||
|
||||
# check that PING is allowed
|
||||
r multi
|
||||
r ping
|
||||
$r1 replicaof localhsot 0
|
||||
set pong [r exec]
|
||||
assert {$pong == "PONG"}
|
||||
|
||||
# check that when replica is not stale, GET is allowed
|
||||
# while we're at it, let's check that multi is allowed on stale replica too
|
||||
r multi
|
||||
$r1 replicaof no one
|
||||
r get xx
|
||||
set xx [r exec]
|
||||
# make sure that the INCR was executed
|
||||
assert { $xx == 1 }
|
||||
$r1 close;
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user