From db89f7474d3f4c784bfd8757b2bd3321e3efd9a1 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 11 Sep 2017 11:20:36 +0200 Subject: [PATCH] Streams: When XREAD blocks without COUNT, set a default one. A client may lose a lot of time between invocations of blocking XREAD, for example because it is processing the messages or for any other cause. When it returns back, it may provide a low enough message ID that the server will block to send an unreasonable number of messages in a single call. For this reason we set a COUNT when the client is blocked with XREAD calls, even if no COUNT is given. This is arbitrarily set to 1000 because it's enough to avoid slowing down the reception of many messages, but low enough to avoid to block. --- src/t_stream.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/t_stream.c b/src/t_stream.c index c47c5dde1..1836ae735 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -399,6 +399,7 @@ void xlenCommand(client *c) { /* XREAD [BLOCK ] [COUNT ] [GROUP ] * [RETRY ] STREAMS key_1 key_2 ... key_N * ID_1 ID_2 ... ID_N */ +#define XREAD_BLOCKED_DEFAULT_COUNT 1000 void xreadCommand(client *c) { long long timeout = -1; /* -1 means, no BLOCK argument given. */ long long count = 0; @@ -510,6 +511,11 @@ void xreadCommand(client *c) { } blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count, timeout, NULL, ids); + /* If no COUNT is given and we block, set a relatively small count: + * in case the ID provided is too low, we do not want the server to + * block just to serve this client a huge stream of messages. */ + c->bpop.xread_count = count ? count : XREAD_BLOCKED_DEFAULT_COUNT; + c->bpop.xread_group = NULL; /* Not used for now. */ goto cleanup; }