From 57709c4bc663ddcb9313777c551a92dabf07095e Mon Sep 17 00:00:00 2001 From: Wang Yuan Date: Thu, 24 Sep 2020 21:01:41 +0800 Subject: [PATCH] Don't write replies if close the client ASAP (#7202) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Before this commit, we would have continued to add replies to the reply buffer even if client output buffer limit is reached, so the used memory would keep increasing over the configured limit. What's more, we shouldn’t write any reply to the client if it is set 'CLIENT_CLOSE_ASAP' flag because that doesn't conform to its definition and we will close all clients flagged with 'CLIENT_CLOSE_ASAP' in ‘beforeSleep’. Because of code execution order, before this, we may firstly write to part of the replies to the socket before disconnecting it, but in fact, we may can’t send the full replies to clients since OS socket buffer is limited. But this unexpected behavior makes some commands work well, for instance ACL DELUSER, if the client deletes the current user, we need to send reply to client and close the connection, but before, we close the client firstly and write the reply to reply buffer. secondly, we shouldn't do this despite the fact it works well in most cases. We add a flag 'CLIENT_CLOSE_AFTER_COMMAND' to mark clients, this flag means we will close the client after executing commands and send all entire replies, so that we can write replies to reply buffer during executing commands, send replies to clients, and close them later. We also fix some implicit problems. If client output buffer limit is enforced in 'multi/exec', all commands will be executed completely in redis and clients will not read any reply instead of partial replies. Even more, if the client executes 'ACL deluser' the using user in 'multi/exec', it will not read the replies after 'ACL deluser' just like before executing 'client kill' itself in 'multi/exec'. We added some tests for output buffer limit breach during multi-exec and using a pipeline of many small commands rather than one with big response. Co-authored-by: Oran Agra --- src/acl.c | 8 +++- src/module.c | 8 +++- src/networking.c | 14 ++++++ src/server.c | 7 +++ src/server.h | 2 + tests/unit/acl.tcl | 17 +++++++ tests/unit/obuf-limits.tcl | 90 ++++++++++++++++++++++++++++++++++++++ 7 files changed, 144 insertions(+), 2 deletions(-) diff --git a/src/acl.c b/src/acl.c index 7af3d6242..b4fb52c50 100644 --- a/src/acl.c +++ b/src/acl.c @@ -297,7 +297,13 @@ void ACLFreeUserAndKillClients(user *u) { * it in non authenticated mode. */ c->user = DefaultUser; c->authenticated = 0; - freeClientAsync(c); + /* We will write replies to this client later, so we can't + * close it directly even if async. */ + if (c == server.current_client) { + c->flags |= CLIENT_CLOSE_AFTER_COMMAND; + } else { + freeClientAsync(c); + } } } ACLFreeUser(u); diff --git a/src/module.c b/src/module.c index ddf8ee888..b76b57698 100644 --- a/src/module.c +++ b/src/module.c @@ -5538,7 +5538,13 @@ void revokeClientAuthentication(client *c) { c->user = DefaultUser; c->authenticated = 0; - freeClientAsync(c); + /* We will write replies to this client later, so we can't close it + * directly even if async. */ + if (c == server.current_client) { + c->flags |= CLIENT_CLOSE_AFTER_COMMAND; + } else { + freeClientAsync(c); + } } /* Cleanup all clients that have been authenticated with this module. This diff --git a/src/networking.c b/src/networking.c index 7ac8ed1c4..4cbddb660 100644 --- a/src/networking.c +++ b/src/networking.c @@ -224,6 +224,9 @@ int prepareClientToWrite(client *c) { * handler since there is no socket at all. */ if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK; + /* If CLIENT_CLOSE_ASAP flag is set, we need not write anything. */ + if (c->flags & CLIENT_CLOSE_ASAP) return C_ERR; + /* CLIENT REPLY OFF / SKIP handling: don't send replies. */ if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR; @@ -1471,6 +1474,9 @@ int handleClientsWithPendingWrites(void) { * that may trigger write error or recreate handler. */ if (c->flags & CLIENT_PROTECTED) continue; + /* Don't write to clients that are going to be closed anyway. */ + if (c->flags & CLIENT_CLOSE_ASAP) continue; + /* Try to write buffers to the client socket. */ if (writeToClient(c,0) == C_ERR) continue; @@ -3160,6 +3166,14 @@ int handleClientsWithPendingWritesUsingThreads(void) { while((ln = listNext(&li))) { client *c = listNodeValue(ln); c->flags &= ~CLIENT_PENDING_WRITE; + + /* Remove clients from the list of pending writes since + * they are going to be closed ASAP. */ + if (c->flags & CLIENT_CLOSE_ASAP) { + listDelNode(server.clients_pending_write, ln); + continue; + } + int target_id = item_id % server.io_threads_num; listAddNodeTail(io_threads_list[target_id],c); item_id++; diff --git a/src/server.c b/src/server.c index 6179d2f29..24238c595 100644 --- a/src/server.c +++ b/src/server.c @@ -3427,6 +3427,13 @@ void call(client *c, int flags) { dirty = server.dirty-dirty; if (dirty < 0) dirty = 0; + /* After executing command, we will close the client after writing entire + * reply if it is set 'CLIENT_CLOSE_AFTER_COMMAND' flag. */ + if (c->flags & CLIENT_CLOSE_AFTER_COMMAND) { + c->flags &= ~CLIENT_CLOSE_AFTER_COMMAND; + c->flags |= CLIENT_CLOSE_AFTER_REPLY; + } + /* When EVAL is called loading the AOF we don't want commands called * from Lua to go into the slowlog or to populate statistics. */ if (server.loading && c->flags & CLIENT_LUA) diff --git a/src/server.h b/src/server.h index 685f929f5..5c266f61d 100644 --- a/src/server.h +++ b/src/server.h @@ -265,6 +265,8 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; about writes performed by myself.*/ #define CLIENT_IN_TO_TABLE (1ULL<<38) /* This client is in the timeout table. */ #define CLIENT_PROTOCOL_ERROR (1ULL<<39) /* Protocol error chatting with it. */ +#define CLIENT_CLOSE_AFTER_COMMAND (1ULL<<40) /* Close after executing commands + * and writing entire reply. */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ diff --git a/tests/unit/acl.tcl b/tests/unit/acl.tcl index 381f2f95f..f015f75a0 100644 --- a/tests/unit/acl.tcl +++ b/tests/unit/acl.tcl @@ -260,6 +260,23 @@ start_server {tags {"acl"}} { catch {r ACL help xxx} e assert_match "*Unknown subcommand or wrong number of arguments*" $e } + + test {Delete a user that the client doesn't use} { + r ACL setuser not_used on >passwd + assert {[r ACL deluser not_used] == 1} + # The client is not closed + assert {[r ping] eq {PONG}} + } + + test {Delete a user that the client is using} { + r ACL setuser using on +acl >passwd + r AUTH using passwd + # The client will receive reply normally + assert {[r ACL deluser using] == 1} + # The client is closed + catch {[r ping]} e + assert_match "*I/O error*" $e + } } set server_path [tmpdir "server.acl"] diff --git a/tests/unit/obuf-limits.tcl b/tests/unit/obuf-limits.tcl index c45bf8e86..20ba32fd5 100644 --- a/tests/unit/obuf-limits.tcl +++ b/tests/unit/obuf-limits.tcl @@ -70,4 +70,94 @@ start_server {tags {"obuf-limits"}} { assert {$omem >= 100000 && $time_elapsed < 6} $rd1 close } + + test {No response for single command if client output buffer hard limit is enforced} { + r config set client-output-buffer-limit {normal 100000 0 0} + # Total size of all items must be more than 100k + set item [string repeat "x" 1000] + for {set i 0} {$i < 150} {incr i} { + r lpush mylist $item + } + set orig_mem [s used_memory] + # Set client name and get all items + set rd [redis_deferring_client] + $rd client setname mybiglist + assert {[$rd read] eq "OK"} + $rd lrange mylist 0 -1 + $rd flush + after 100 + + # Before we read reply, redis will close this client. + set clients [r client list] + assert_no_match "*name=mybiglist*" $clients + set cur_mem [s used_memory] + # 10k just is a deviation threshold + assert {$cur_mem < 10000 + $orig_mem} + + # Read nothing + set fd [$rd channel] + assert_equal {} [read $fd] + } + + test {No response for multi commands in pipeline if client output buffer limit is enforced} { + r config set client-output-buffer-limit {normal 100000 0 0} + set value [string repeat "x" 10000] + r set bigkey $value + set rd1 [redis_deferring_client] + set rd2 [redis_deferring_client] + $rd2 client setname multicommands + assert_equal "OK" [$rd2 read] + # Let redis sleep 2s firstly + $rd1 debug sleep 2 + $rd1 flush + after 100 + + # Total size should be less than OS socket buffer, redis can + # execute all commands in this pipeline when it wakes up. + for {set i 0} {$i < 15} {incr i} { + $rd2 set $i $i + $rd2 get $i + $rd2 del $i + # One bigkey is 10k, total response size must be more than 100k + $rd2 get bigkey + } + $rd2 flush + after 100 + + # Reds must wake up if it can send reply + assert_equal "PONG" [r ping] + set clients [r client list] + assert_no_match "*name=multicommands*" $clients + set fd [$rd2 channel] + assert_equal {} [read $fd] + } + + test {Execute transactions completely even if client output buffer limit is enforced} { + r config set client-output-buffer-limit {normal 100000 0 0} + # Total size of all items must be more than 100k + set item [string repeat "x" 1000] + for {set i 0} {$i < 150} {incr i} { + r lpush mylist2 $item + } + + # Output buffer limit is enforced during executing transaction + r client setname transactionclient + r set k1 v1 + r multi + r set k2 v2 + r get k2 + r lrange mylist2 0 -1 + r set k3 v3 + r del k1 + catch {[r exec]} e + assert_match "*I/O error*" $e + reconnect + set clients [r client list] + assert_no_match "*name=transactionclient*" $clients + + # Transactions should be executed completely + assert_equal {} [r get k1] + assert_equal "v2" [r get k2] + assert_equal "v3" [r get k3] + } }