mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 08:08:53 -05:00
timeout.c created: move client timeouts code there.
This commit is contained in:
parent
0e22cb2680
commit
dd7e61d77f
@ -206,7 +206,7 @@ endif
|
||||
|
||||
REDIS_SERVER_NAME=redis-server
|
||||
REDIS_SENTINEL_NAME=redis-sentinel
|
||||
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o gopher.o tracking.o connection.o tls.o sha256.o
|
||||
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o gopher.o tracking.o connection.o tls.o sha256.o timeout.o
|
||||
REDIS_CLI_NAME=redis-cli
|
||||
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o crc64.o siphash.o crc16.o
|
||||
REDIS_BENCHMARK_NAME=redis-benchmark
|
||||
|
@ -31,9 +31,6 @@
|
||||
*
|
||||
* API:
|
||||
*
|
||||
* getTimeoutFromObjectOrReply() is just an utility function to parse a
|
||||
* timeout argument since blocking operations usually require a timeout.
|
||||
*
|
||||
* blockClient() set the CLIENT_BLOCKED flag in the client, and set the
|
||||
* specified block type 'btype' filed to one of BLOCKED_* macros.
|
||||
*
|
||||
@ -67,42 +64,6 @@
|
||||
|
||||
int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where);
|
||||
|
||||
/* Get a timeout value from an object and store it into 'timeout'.
|
||||
* The final timeout is always stored as milliseconds as a time where the
|
||||
* timeout will expire, however the parsing is performed according to
|
||||
* the 'unit' that can be seconds or milliseconds.
|
||||
*
|
||||
* Note that if the timeout is zero (usually from the point of view of
|
||||
* commands API this means no timeout) the value stored into 'timeout'
|
||||
* is zero. */
|
||||
int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit) {
|
||||
long long tval;
|
||||
long double ftval;
|
||||
|
||||
if (unit == UNIT_SECONDS) {
|
||||
if (getLongDoubleFromObjectOrReply(c,object,&ftval,
|
||||
"timeout is not an float or out of range") != C_OK)
|
||||
return C_ERR;
|
||||
tval = (long long) (ftval * 1000.0);
|
||||
} else {
|
||||
if (getLongLongFromObjectOrReply(c,object,&tval,
|
||||
"timeout is not an integer or out of range") != C_OK)
|
||||
return C_ERR;
|
||||
}
|
||||
|
||||
if (tval < 0) {
|
||||
addReplyError(c,"timeout is negative");
|
||||
return C_ERR;
|
||||
}
|
||||
|
||||
if (tval > 0) {
|
||||
tval += mstime();
|
||||
}
|
||||
*timeout = tval;
|
||||
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
/* Block a client for the specific operation type. Once the CLIENT_BLOCKED
|
||||
* flag is set client query buffer is not longer processed, but accumulated,
|
||||
* and will be processed when the client is unblocked. */
|
||||
|
128
src/server.c
128
src/server.c
@ -1473,132 +1473,6 @@ int allPersistenceDisabled(void) {
|
||||
return server.saveparamslen == 0 && server.aof_state == AOF_OFF;
|
||||
}
|
||||
|
||||
/* ========================== Clients timeouts ============================= */
|
||||
|
||||
/* Check if this blocked client timedout (does nothing if the client is
|
||||
* not blocked right now). If so send a reply, unblock it, and return 1.
|
||||
* Otherwise 0 is returned and no operation is performed. */
|
||||
int checkBlockedClientTimeout(client *c, mstime_t now) {
|
||||
if (c->flags & CLIENT_BLOCKED &&
|
||||
c->bpop.timeout != 0
|
||||
&& c->bpop.timeout < now)
|
||||
{
|
||||
/* Handle blocking operation specific timeout. */
|
||||
replyToBlockedClientTimedOut(c);
|
||||
unblockClient(c);
|
||||
return 1;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
/* Check for timeouts. Returns non-zero if the client was terminated.
|
||||
* The function gets the current time in milliseconds as argument since
|
||||
* it gets called multiple times in a loop, so calling gettimeofday() for
|
||||
* each iteration would be costly without any actual gain. */
|
||||
int clientsCronHandleTimeout(client *c, mstime_t now_ms) {
|
||||
time_t now = now_ms/1000;
|
||||
|
||||
if (server.maxidletime &&
|
||||
/* This handles the idle clients connection timeout if set. */
|
||||
!(c->flags & CLIENT_SLAVE) && /* No timeout for slaves and monitors */
|
||||
!(c->flags & CLIENT_MASTER) && /* No timeout for masters */
|
||||
!(c->flags & CLIENT_BLOCKED) && /* No timeout for BLPOP */
|
||||
!(c->flags & CLIENT_PUBSUB) && /* No timeout for Pub/Sub clients */
|
||||
(now - c->lastinteraction > server.maxidletime))
|
||||
{
|
||||
serverLog(LL_VERBOSE,"Closing idle client");
|
||||
freeClient(c);
|
||||
return 1;
|
||||
} else if (c->flags & CLIENT_BLOCKED) {
|
||||
/* Cluster: handle unblock & redirect of clients blocked
|
||||
* into keys no longer served by this server. */
|
||||
if (server.cluster_enabled) {
|
||||
if (clusterRedirectBlockedClientIfNeeded(c))
|
||||
unblockClient(c);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* For blocked clients timeouts we populate a radix tree of 128 bit keys
|
||||
* composed as such:
|
||||
*
|
||||
* [8 byte big endian expire time]+[8 byte client ID]
|
||||
*
|
||||
* We don't do any cleanup in the Radix tree: when we run the clients that
|
||||
* reached the timeout already, if they are no longer existing or no longer
|
||||
* blocked with such timeout, we just go forward.
|
||||
*
|
||||
* Every time a client blocks with a timeout, we add the client in
|
||||
* the tree. In beforeSleep() we call clientsHandleTimeout() to run
|
||||
* the tree and unblock the clients. */
|
||||
|
||||
#define CLIENT_ST_KEYLEN 16 /* 8 bytes mstime + 8 bytes client ID. */
|
||||
|
||||
/* Given client ID and timeout, write the resulting radix tree key in buf. */
|
||||
void encodeTimeoutKey(unsigned char *buf, uint64_t timeout, uint64_t id) {
|
||||
timeout = htonu64(timeout);
|
||||
memcpy(buf,&timeout,sizeof(timeout));
|
||||
memcpy(buf+8,&id,sizeof(id));
|
||||
}
|
||||
|
||||
/* Given a key encoded with encodeTimeoutKey(), resolve the fields and write
|
||||
* the timeout into *toptr and the client ID into *idptr. */
|
||||
void decodeTimeoutKey(unsigned char *buf, uint64_t *toptr, uint64_t *idptr) {
|
||||
memcpy(toptr,buf,sizeof(*toptr));
|
||||
*toptr = ntohu64(*toptr);
|
||||
memcpy(idptr,buf+8,sizeof(*idptr));
|
||||
}
|
||||
|
||||
/* Add the specified client id / timeout as a key in the radix tree we use
|
||||
* to handle blocked clients timeouts. The client is not added to the list
|
||||
* if its timeout is zero (block forever). */
|
||||
void addClientToTimeoutTable(client *c) {
|
||||
if (c->bpop.timeout == 0) return;
|
||||
uint64_t timeout = c->bpop.timeout;
|
||||
uint64_t id = c->id;
|
||||
unsigned char buf[CLIENT_ST_KEYLEN];
|
||||
encodeTimeoutKey(buf,timeout,id);
|
||||
if (raxTryInsert(server.clients_timeout_table,buf,sizeof(buf),NULL,NULL))
|
||||
c->flags |= CLIENT_IN_TO_TABLE;
|
||||
}
|
||||
|
||||
/* Remove the client from the table when it is unblocked for reasons
|
||||
* different than timing out. */
|
||||
void removeClientFromTimeoutTable(client *c) {
|
||||
if (!(c->flags & CLIENT_IN_TO_TABLE)) return;
|
||||
c->flags &= ~CLIENT_IN_TO_TABLE;
|
||||
uint64_t timeout = c->bpop.timeout;
|
||||
uint64_t id = c->id;
|
||||
unsigned char buf[CLIENT_ST_KEYLEN];
|
||||
encodeTimeoutKey(buf,timeout,id);
|
||||
raxRemove(server.clients_timeout_table,buf,sizeof(buf),NULL);
|
||||
}
|
||||
|
||||
/* This function is called in beforeSleep() in order to unblock clients
|
||||
* that are waiting in blocking operations with a timeout set. */
|
||||
void clientsHandleTimeout(void) {
|
||||
if (raxSize(server.clients_timeout_table) == 0) return;
|
||||
uint64_t now = mstime();
|
||||
raxIterator ri;
|
||||
raxStart(&ri,server.clients_timeout_table);
|
||||
raxSeek(&ri,"^",NULL,0);
|
||||
|
||||
while(raxNext(&ri)) {
|
||||
uint64_t id, timeout;
|
||||
decodeTimeoutKey(ri.key,&timeout,&id);
|
||||
if (timeout >= now) break; /* All the timeouts are in the future. */
|
||||
client *c = lookupClientByID(id);
|
||||
if (c) {
|
||||
c->flags &= ~CLIENT_IN_TO_TABLE;
|
||||
checkBlockedClientTimeout(c,now);
|
||||
}
|
||||
raxRemove(server.clients_timeout_table,ri.key,ri.key_len,NULL);
|
||||
raxSeek(&ri,"^",NULL,0);
|
||||
}
|
||||
}
|
||||
|
||||
/* ======================= Cron: called every 100 ms ======================== */
|
||||
|
||||
/* Add a sample to the operations per second array of samples. */
|
||||
@ -2183,7 +2057,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
||||
UNUSED(eventLoop);
|
||||
|
||||
/* Handle precise timeouts of blocked clients. */
|
||||
clientsHandleTimeout();
|
||||
handleBlockedClientsTimeout();
|
||||
|
||||
/* We should handle pending reads clients ASAP after event loop. */
|
||||
handleClientsWithPendingReadsUsingThreads();
|
||||
|
@ -2138,8 +2138,12 @@ void disconnectAllBlockedClients(void);
|
||||
void handleClientsBlockedOnKeys(void);
|
||||
void signalKeyAsReady(redisDb *db, robj *key);
|
||||
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids);
|
||||
|
||||
/* timeout.c -- Blocked clients timeout and connections timeout. */
|
||||
void addClientToTimeoutTable(client *c);
|
||||
void removeClientFromTimeoutTable(client *c);
|
||||
void handleBlockedClientsTimeout(void);
|
||||
int clientsCronHandleTimeout(client *c, mstime_t now_ms);
|
||||
|
||||
/* expire.c -- Handling of expired keys */
|
||||
void activeExpireCycle(int type);
|
||||
|
192
src/timeout.c
Normal file
192
src/timeout.c
Normal file
@ -0,0 +1,192 @@
|
||||
/* Copyright (c) 2009-2020, Salvatore Sanfilippo <antirez at gmail dot com>
|
||||
* All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modification, are permitted provided that the following conditions are met:
|
||||
*
|
||||
* * Redistributions of source code must retain the above copyright notice,
|
||||
* this list of conditions and the following disclaimer.
|
||||
* * Redistributions in binary form must reproduce the above copyright
|
||||
* notice, this list of conditions and the following disclaimer in the
|
||||
* documentation and/or other materials provided with the distribution.
|
||||
* * Neither the name of Redis nor the names of its contributors may be used
|
||||
* to endorse or promote products derived from this software without
|
||||
* specific prior written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
||||
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
||||
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
||||
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
||||
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||||
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
||||
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
||||
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
||||
* POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
|
||||
#include "server.h"
|
||||
#include "cluster.h"
|
||||
|
||||
/* ========================== Clients timeouts ============================= */
|
||||
|
||||
/* Check if this blocked client timedout (does nothing if the client is
|
||||
* not blocked right now). If so send a reply, unblock it, and return 1.
|
||||
* Otherwise 0 is returned and no operation is performed. */
|
||||
int checkBlockedClientTimeout(client *c, mstime_t now) {
|
||||
if (c->flags & CLIENT_BLOCKED &&
|
||||
c->bpop.timeout != 0
|
||||
&& c->bpop.timeout < now)
|
||||
{
|
||||
/* Handle blocking operation specific timeout. */
|
||||
replyToBlockedClientTimedOut(c);
|
||||
unblockClient(c);
|
||||
return 1;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
/* Check for timeouts. Returns non-zero if the client was terminated.
|
||||
* The function gets the current time in milliseconds as argument since
|
||||
* it gets called multiple times in a loop, so calling gettimeofday() for
|
||||
* each iteration would be costly without any actual gain. */
|
||||
int clientsCronHandleTimeout(client *c, mstime_t now_ms) {
|
||||
time_t now = now_ms/1000;
|
||||
|
||||
if (server.maxidletime &&
|
||||
/* This handles the idle clients connection timeout if set. */
|
||||
!(c->flags & CLIENT_SLAVE) && /* No timeout for slaves and monitors */
|
||||
!(c->flags & CLIENT_MASTER) && /* No timeout for masters */
|
||||
!(c->flags & CLIENT_BLOCKED) && /* No timeout for BLPOP */
|
||||
!(c->flags & CLIENT_PUBSUB) && /* No timeout for Pub/Sub clients */
|
||||
(now - c->lastinteraction > server.maxidletime))
|
||||
{
|
||||
serverLog(LL_VERBOSE,"Closing idle client");
|
||||
freeClient(c);
|
||||
return 1;
|
||||
} else if (c->flags & CLIENT_BLOCKED) {
|
||||
/* Cluster: handle unblock & redirect of clients blocked
|
||||
* into keys no longer served by this server. */
|
||||
if (server.cluster_enabled) {
|
||||
if (clusterRedirectBlockedClientIfNeeded(c))
|
||||
unblockClient(c);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* For blocked clients timeouts we populate a radix tree of 128 bit keys
|
||||
* composed as such:
|
||||
*
|
||||
* [8 byte big endian expire time]+[8 byte client ID]
|
||||
*
|
||||
* We don't do any cleanup in the Radix tree: when we run the clients that
|
||||
* reached the timeout already, if they are no longer existing or no longer
|
||||
* blocked with such timeout, we just go forward.
|
||||
*
|
||||
* Every time a client blocks with a timeout, we add the client in
|
||||
* the tree. In beforeSleep() we call handleBlockedClientsTimeout() to run
|
||||
* the tree and unblock the clients. */
|
||||
|
||||
#define CLIENT_ST_KEYLEN 16 /* 8 bytes mstime + 8 bytes client ID. */
|
||||
|
||||
/* Given client ID and timeout, write the resulting radix tree key in buf. */
|
||||
void encodeTimeoutKey(unsigned char *buf, uint64_t timeout, uint64_t id) {
|
||||
timeout = htonu64(timeout);
|
||||
memcpy(buf,&timeout,sizeof(timeout));
|
||||
memcpy(buf+8,&id,sizeof(id));
|
||||
}
|
||||
|
||||
/* Given a key encoded with encodeTimeoutKey(), resolve the fields and write
|
||||
* the timeout into *toptr and the client ID into *idptr. */
|
||||
void decodeTimeoutKey(unsigned char *buf, uint64_t *toptr, uint64_t *idptr) {
|
||||
memcpy(toptr,buf,sizeof(*toptr));
|
||||
*toptr = ntohu64(*toptr);
|
||||
memcpy(idptr,buf+8,sizeof(*idptr));
|
||||
}
|
||||
|
||||
/* Add the specified client id / timeout as a key in the radix tree we use
|
||||
* to handle blocked clients timeouts. The client is not added to the list
|
||||
* if its timeout is zero (block forever). */
|
||||
void addClientToTimeoutTable(client *c) {
|
||||
if (c->bpop.timeout == 0) return;
|
||||
uint64_t timeout = c->bpop.timeout;
|
||||
uint64_t id = c->id;
|
||||
unsigned char buf[CLIENT_ST_KEYLEN];
|
||||
encodeTimeoutKey(buf,timeout,id);
|
||||
if (raxTryInsert(server.clients_timeout_table,buf,sizeof(buf),NULL,NULL))
|
||||
c->flags |= CLIENT_IN_TO_TABLE;
|
||||
}
|
||||
|
||||
/* Remove the client from the table when it is unblocked for reasons
|
||||
* different than timing out. */
|
||||
void removeClientFromTimeoutTable(client *c) {
|
||||
if (!(c->flags & CLIENT_IN_TO_TABLE)) return;
|
||||
c->flags &= ~CLIENT_IN_TO_TABLE;
|
||||
uint64_t timeout = c->bpop.timeout;
|
||||
uint64_t id = c->id;
|
||||
unsigned char buf[CLIENT_ST_KEYLEN];
|
||||
encodeTimeoutKey(buf,timeout,id);
|
||||
raxRemove(server.clients_timeout_table,buf,sizeof(buf),NULL);
|
||||
}
|
||||
|
||||
/* This function is called in beforeSleep() in order to unblock clients
|
||||
* that are waiting in blocking operations with a timeout set. */
|
||||
void handleBlockedClientsTimeout(void) {
|
||||
if (raxSize(server.clients_timeout_table) == 0) return;
|
||||
uint64_t now = mstime();
|
||||
raxIterator ri;
|
||||
raxStart(&ri,server.clients_timeout_table);
|
||||
raxSeek(&ri,"^",NULL,0);
|
||||
|
||||
while(raxNext(&ri)) {
|
||||
uint64_t id, timeout;
|
||||
decodeTimeoutKey(ri.key,&timeout,&id);
|
||||
if (timeout >= now) break; /* All the timeouts are in the future. */
|
||||
client *c = lookupClientByID(id);
|
||||
if (c) {
|
||||
c->flags &= ~CLIENT_IN_TO_TABLE;
|
||||
checkBlockedClientTimeout(c,now);
|
||||
}
|
||||
raxRemove(server.clients_timeout_table,ri.key,ri.key_len,NULL);
|
||||
raxSeek(&ri,"^",NULL,0);
|
||||
}
|
||||
}
|
||||
|
||||
/* Get a timeout value from an object and store it into 'timeout'.
|
||||
* The final timeout is always stored as milliseconds as a time where the
|
||||
* timeout will expire, however the parsing is performed according to
|
||||
* the 'unit' that can be seconds or milliseconds.
|
||||
*
|
||||
* Note that if the timeout is zero (usually from the point of view of
|
||||
* commands API this means no timeout) the value stored into 'timeout'
|
||||
* is zero. */
|
||||
int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit) {
|
||||
long long tval;
|
||||
long double ftval;
|
||||
|
||||
if (unit == UNIT_SECONDS) {
|
||||
if (getLongDoubleFromObjectOrReply(c,object,&ftval,
|
||||
"timeout is not an float or out of range") != C_OK)
|
||||
return C_ERR;
|
||||
tval = (long long) (ftval * 1000.0);
|
||||
} else {
|
||||
if (getLongLongFromObjectOrReply(c,object,&tval,
|
||||
"timeout is not an integer or out of range") != C_OK)
|
||||
return C_ERR;
|
||||
}
|
||||
|
||||
if (tval < 0) {
|
||||
addReplyError(c,"timeout is negative");
|
||||
return C_ERR;
|
||||
}
|
||||
|
||||
if (tval > 0) {
|
||||
tval += mstime();
|
||||
}
|
||||
*timeout = tval;
|
||||
|
||||
return C_OK;
|
||||
}
|
Loading…
Reference in New Issue
Block a user