BLPOP timeouts implemented

This commit is contained in:
antirez 2009-12-29 16:26:05 -05:00
parent 95242ab507
commit f86a74e944

19
redis.c
View File

@ -332,6 +332,7 @@ struct redisServer {
int replstate; int replstate;
unsigned int maxclients; unsigned int maxclients;
unsigned long maxmemory; unsigned long maxmemory;
unsigned int blockedclients;
/* Sort parameters - qsort_r() is only available under BSD so we /* Sort parameters - qsort_r() is only available under BSD so we
* have to take this state global, in order to pass it to sortCompare() */ * have to take this state global, in order to pass it to sortCompare() */
int sort_desc; int sort_desc;
@ -892,11 +893,18 @@ static void closeTimedoutClients(void) {
listRewind(server.clients); listRewind(server.clients);
while ((ln = listYield(server.clients)) != NULL) { while ((ln = listYield(server.clients)) != NULL) {
c = listNodeValue(ln); c = listNodeValue(ln);
if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */ if (server.maxidletime &&
!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
!(c->flags & REDIS_MASTER) && /* no timeout for masters */ !(c->flags & REDIS_MASTER) && /* no timeout for masters */
(now - c->lastinteraction > server.maxidletime)) { (now - c->lastinteraction > server.maxidletime))
{
redisLog(REDIS_DEBUG,"Closing idle client"); redisLog(REDIS_DEBUG,"Closing idle client");
freeClient(c); freeClient(c);
} else if (c->flags & REDIS_BLOCKED) {
if (c->blockingto < now) {
addReply(c,shared.nullbulk);
unblockClient(c);
}
} }
} }
} }
@ -1050,7 +1058,7 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD
} }
/* Close connections of timedout clients */ /* Close connections of timedout clients */
if (server.maxidletime && !(loops % 10)) if ((server.maxidletime && !(loops % 10)) || server.blockedclients)
closeTimedoutClients(); closeTimedoutClients();
/* Check if a background saving or AOF rewrite in progress terminated */ /* Check if a background saving or AOF rewrite in progress terminated */
@ -1196,6 +1204,7 @@ static void initServerConfig() {
server.rdbcompression = 1; server.rdbcompression = 1;
server.sharingpoolsize = 1024; server.sharingpoolsize = 1024;
server.maxclients = 0; server.maxclients = 0;
server.blockedclients = 0;
server.maxmemory = 0; server.maxmemory = 0;
resetServerSaveParams(); resetServerSaveParams();
@ -5206,6 +5215,7 @@ static sds genRedisInfoString(void) {
"uptime_in_days:%ld\r\n" "uptime_in_days:%ld\r\n"
"connected_clients:%d\r\n" "connected_clients:%d\r\n"
"connected_slaves:%d\r\n" "connected_slaves:%d\r\n"
"blocked_clients:%d\r\n"
"used_memory:%zu\r\n" "used_memory:%zu\r\n"
"changes_since_last_save:%lld\r\n" "changes_since_last_save:%lld\r\n"
"bgsave_in_progress:%d\r\n" "bgsave_in_progress:%d\r\n"
@ -5221,6 +5231,7 @@ static sds genRedisInfoString(void) {
uptime/(3600*24), uptime/(3600*24),
listLength(server.clients)-listLength(server.slaves), listLength(server.clients)-listLength(server.slaves),
listLength(server.slaves), listLength(server.slaves),
server.blockedclients,
server.usedmemory, server.usedmemory,
server.dirty, server.dirty,
server.bgsavechildpid != -1, server.bgsavechildpid != -1,
@ -5502,6 +5513,7 @@ static void blockForKey(redisClient *c, robj *key, time_t timeout) {
listAddNodeTail(l,c); listAddNodeTail(l,c);
c->flags |= REDIS_BLOCKED; c->flags |= REDIS_BLOCKED;
aeDeleteFileEvent(server.el,c->fd,AE_READABLE); aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
server.blockedclients++;
} }
/* Unblock a client that's waiting in a blocking operation such as BLPOP */ /* Unblock a client that's waiting in a blocking operation such as BLPOP */
@ -5522,6 +5534,7 @@ static void unblockClient(redisClient *c) {
decrRefCount(c->blockingkey); decrRefCount(c->blockingkey);
c->blockingkey = NULL; c->blockingkey = NULL;
c->flags &= (~REDIS_BLOCKED); c->flags &= (~REDIS_BLOCKED);
server.blockedclients--;
/* Ok now we are ready to get read events from socket, note that we /* Ok now we are ready to get read events from socket, note that we
* can't trap errors here as it's possible that unblockClients() is * can't trap errors here as it's possible that unblockClients() is
* called from freeClient() itself, and the only thing we can do * called from freeClient() itself, and the only thing we can do