Streams: When XREAD blocks without COUNT, set a default one.

A client may lose a lot of time between invocations of blocking XREAD,
for example because it is processing the messages or for any other
cause. When it returns back, it may provide a low enough message ID that
the server will block to send an unreasonable number of messages in a
single call. For this reason we set a COUNT when the client is blocked
with XREAD calls, even if no COUNT is given. This is arbitrarily set to
1000 because it's enough to avoid slowing down the reception of many
messages, but low enough to avoid to block.
This commit is contained in:
antirez 2017-09-11 11:20:36 +02:00
parent c128190026
commit db89f7474d

View File

@ -399,6 +399,7 @@ void xlenCommand(client *c) {
/* XREAD [BLOCK <milliseconds>] [COUNT <count>] [GROUP <groupname> <ttl>]
* [RETRY <milliseconds> <ttl>] STREAMS key_1 key_2 ... key_N
* ID_1 ID_2 ... ID_N */
#define XREAD_BLOCKED_DEFAULT_COUNT 1000
void xreadCommand(client *c) {
long long timeout = -1; /* -1 means, no BLOCK argument given. */
long long count = 0;
@ -510,6 +511,11 @@ void xreadCommand(client *c) {
}
blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count,
timeout, NULL, ids);
/* If no COUNT is given and we block, set a relatively small count:
* in case the ID provided is too low, we do not want the server to
* block just to serve this client a huge stream of messages. */
c->bpop.xread_count = count ? count : XREAD_BLOCKED_DEFAULT_COUNT;
c->bpop.xread_group = NULL; /* Not used for now. */
goto cleanup;
}