mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 08:08:53 -05:00
BLPOP blocking code refactored to be generic & reusable.
This commit is contained in:
parent
2e027c48e5
commit
82b672f633
@ -103,7 +103,7 @@ endif
|
||||
|
||||
REDIS_SERVER_NAME=redis-server
|
||||
REDIS_SENTINEL_NAME=redis-sentinel
|
||||
REDIS_SERVER_OBJ=adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o
|
||||
REDIS_SERVER_OBJ=adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o
|
||||
REDIS_CLI_NAME=redis-cli
|
||||
REDIS_CLI_OBJ=anet.o sds.o adlist.o redis-cli.o zmalloc.o release.o anet.o ae.o crc64.o
|
||||
REDIS_BENCHMARK_NAME=redis-benchmark
|
||||
|
@ -449,6 +449,7 @@ struct redisClient *createFakeClient(void) {
|
||||
c->argv = NULL;
|
||||
c->bufpos = 0;
|
||||
c->flags = 0;
|
||||
c->btype = REDIS_BLOCKED_NONE;
|
||||
/* We set the fake client as a slave waiting for the synchronization
|
||||
* so that Redis will not try to send replies to this client. */
|
||||
c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
|
||||
|
121
src/blocked.c
Normal file
121
src/blocked.c
Normal file
@ -0,0 +1,121 @@
|
||||
/* blocked.c - generic support for blocking operations like BLPOP & WAIT.
|
||||
*
|
||||
* Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
|
||||
* All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modification, are permitted provided that the following conditions are met:
|
||||
*
|
||||
* * Redistributions of source code must retain the above copyright notice,
|
||||
* this list of conditions and the following disclaimer.
|
||||
* * Redistributions in binary form must reproduce the above copyright
|
||||
* notice, this list of conditions and the following disclaimer in the
|
||||
* documentation and/or other materials provided with the distribution.
|
||||
* * Neither the name of Redis nor the names of its contributors may be used
|
||||
* to endorse or promote products derived from this software without
|
||||
* specific prior written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
||||
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
||||
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
||||
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
||||
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||||
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
||||
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
||||
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
||||
* POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
|
||||
#include "redis.h"
|
||||
|
||||
/* Get a timeout value from an object and store it into 'timeout'.
|
||||
* The final timeout is always stored as milliseconds as a time where the
|
||||
* timeout will expire, however the parsing is performed according to
|
||||
* the 'unit' that can be seconds or milliseconds.
|
||||
*
|
||||
* Note that if the timeout is zero (usually from the point of view of
|
||||
* commands API this means no timeout) the value stored into 'timeout'
|
||||
* is zero. */
|
||||
int getTimeoutFromObjectOrReply(redisClient *c, robj *object, mstime_t *timeout, int unit) {
|
||||
long long tval;
|
||||
|
||||
if (getLongLongFromObjectOrReply(c,object,&tval,
|
||||
"timeout is not an integer or out of range") != REDIS_OK)
|
||||
return REDIS_ERR;
|
||||
|
||||
if (tval < 0) {
|
||||
addReplyError(c,"timeout is negative");
|
||||
return REDIS_ERR;
|
||||
}
|
||||
|
||||
if (tval > 0) {
|
||||
if (unit == UNIT_SECONDS) tval *= 1000;
|
||||
tval += mstime();
|
||||
}
|
||||
*timeout = tval;
|
||||
|
||||
return REDIS_OK;
|
||||
}
|
||||
|
||||
/* Block a client for the specific operation type. Once the REDIS_BLOCKED
|
||||
* flag is set client query buffer is not longer processed, but accumulated,
|
||||
* and will be processed when the client is unblocked. */
|
||||
void blockClient(redisClient *c, int btype) {
|
||||
c->flags |= REDIS_BLOCKED;
|
||||
c->btype = btype;
|
||||
server.bpop_blocked_clients++;
|
||||
}
|
||||
|
||||
/* This function is called in the beforeSleep() function of the event loop
|
||||
* in order to process the pending input buffer of clients that were
|
||||
* unblocked after a blocking operation. */
|
||||
void processUnblockedClients(void) {
|
||||
listNode *ln;
|
||||
redisClient *c;
|
||||
|
||||
while (listLength(server.unblocked_clients)) {
|
||||
ln = listFirst(server.unblocked_clients);
|
||||
redisAssert(ln != NULL);
|
||||
c = ln->value;
|
||||
listDelNode(server.unblocked_clients,ln);
|
||||
c->flags &= ~REDIS_UNBLOCKED;
|
||||
c->btype = REDIS_BLOCKED_NONE;
|
||||
|
||||
/* Process remaining data in the input buffer. */
|
||||
if (c->querybuf && sdslen(c->querybuf) > 0) {
|
||||
server.current_client = c;
|
||||
processInputBuffer(c);
|
||||
server.current_client = NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Unblock a client calling the right function depending on the kind
|
||||
* of operation the client is blocking for. */
|
||||
void unblockClient(redisClient *c) {
|
||||
if (c->btype == REDIS_BLOCKED_LIST) {
|
||||
unblockClientWaitingData(c);
|
||||
} else {
|
||||
redisPanic("Unknown btype in unblockClient().");
|
||||
}
|
||||
/* Clear the flags, and put the client in the unblocked list so that
|
||||
* we'll process new commands in its query buffer ASAP. */
|
||||
c->flags &= ~REDIS_BLOCKED;
|
||||
c->flags |= REDIS_UNBLOCKED;
|
||||
c->btype = REDIS_BLOCKED_NONE;
|
||||
server.bpop_blocked_clients--;
|
||||
listAddNodeTail(server.unblocked_clients,c);
|
||||
}
|
||||
|
||||
/* This function gets called when a blocked client timed out in order to
|
||||
* send it a reply of some kind. */
|
||||
void replyToBlockedClientTimedOut(redisClient *c) {
|
||||
if (c->btype == REDIS_BLOCKED_LIST) {
|
||||
addReply(c,shared.nullmultibulk);
|
||||
} else {
|
||||
redisPanic("Unknown btype in replyToBlockedClientTimedOut().");
|
||||
}
|
||||
}
|
||||
|
@ -108,9 +108,12 @@ redisClient *createClient(int fd) {
|
||||
c->obuf_soft_limit_reached_time = 0;
|
||||
listSetFreeMethod(c->reply,decrRefCountVoid);
|
||||
listSetDupMethod(c->reply,dupClientReplyValue);
|
||||
c->bpop.keys = dictCreate(&setDictType,NULL);
|
||||
c->btype = REDIS_BLOCKED_NONE;
|
||||
c->bpop.timeout = 0;
|
||||
c->bpop.keys = dictCreate(&setDictType,NULL);
|
||||
c->bpop.target = NULL;
|
||||
c->bpop.numreplicas = 0;
|
||||
c->bpop.reploffset = 0;
|
||||
c->watched_keys = listCreate();
|
||||
c->pubsub_channels = dictCreate(&setDictType,NULL);
|
||||
c->pubsub_patterns = listCreate();
|
||||
@ -666,8 +669,7 @@ void freeClient(redisClient *c) {
|
||||
c->querybuf = NULL;
|
||||
|
||||
/* Deallocate structures used to block on blocking ops. */
|
||||
if (c->flags & REDIS_BLOCKED)
|
||||
unblockClientWaitingData(c);
|
||||
if (c->flags & REDIS_BLOCKED) unblockClient(c);
|
||||
dictRelease(c->bpop.keys);
|
||||
|
||||
/* UNWATCH all the keys */
|
||||
|
23
src/redis.c
23
src/redis.c
@ -871,7 +871,7 @@ long long getOperationsPerSecond(void) {
|
||||
|
||||
/* Check for timeouts. Returns non-zero if the client was terminated */
|
||||
int clientsCronHandleTimeout(redisClient *c) {
|
||||
time_t now = server.unixtime;
|
||||
mstime_t now = mstime();
|
||||
|
||||
if (server.maxidletime &&
|
||||
!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
|
||||
@ -886,8 +886,8 @@ int clientsCronHandleTimeout(redisClient *c) {
|
||||
return 1;
|
||||
} else if (c->flags & REDIS_BLOCKED) {
|
||||
if (c->bpop.timeout != 0 && c->bpop.timeout < now) {
|
||||
addReply(c,shared.nullmultibulk);
|
||||
unblockClientWaitingData(c);
|
||||
replyToBlockedClientTimedOut(c);
|
||||
unblockClient(c);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
@ -1194,8 +1194,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
||||
* for ready file descriptors. */
|
||||
void beforeSleep(struct aeEventLoop *eventLoop) {
|
||||
REDIS_NOTUSED(eventLoop);
|
||||
listNode *ln;
|
||||
redisClient *c;
|
||||
|
||||
/* Run a fast expire cycle (the called function will return
|
||||
* ASAP if a fast cycle is not needed). */
|
||||
@ -1203,20 +1201,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
||||
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
|
||||
|
||||
/* Try to process pending commands for clients that were just unblocked. */
|
||||
while (listLength(server.unblocked_clients)) {
|
||||
ln = listFirst(server.unblocked_clients);
|
||||
redisAssert(ln != NULL);
|
||||
c = ln->value;
|
||||
listDelNode(server.unblocked_clients,ln);
|
||||
c->flags &= ~REDIS_UNBLOCKED;
|
||||
|
||||
/* Process remaining data in the input buffer. */
|
||||
if (c->querybuf && sdslen(c->querybuf) > 0) {
|
||||
server.current_client = c;
|
||||
processInputBuffer(c);
|
||||
server.current_client = NULL;
|
||||
}
|
||||
}
|
||||
processUnblockedClients();
|
||||
|
||||
/* Write the AOF buffer on disk */
|
||||
flushAppendOnlyFile(0);
|
||||
|
27
src/redis.h
27
src/redis.h
@ -232,6 +232,12 @@
|
||||
#define REDIS_FORCE_REPL (1<<15) /* Force replication of current cmd. */
|
||||
#define REDIS_PRE_PSYNC_SLAVE (1<<16) /* Slave don't understand PSYNC. */
|
||||
|
||||
/* Client block type (btype field in client structure)
|
||||
* if REDIS_BLOCKED flag is set. */
|
||||
#define REDIS_BLOCKED_NONE 0 /* Not blocked, no REDIS_BLOCKED flag set. */
|
||||
#define REDIS_BLOCKED_LIST 1 /* BLPOP & co. */
|
||||
#define REDIS_BLOCKED_WAIT 2 /* WAIT for synchronous replication. */
|
||||
|
||||
/* Client request types */
|
||||
#define REDIS_REQ_INLINE 1
|
||||
#define REDIS_REQ_MULTIBULK 2
|
||||
@ -419,13 +425,22 @@ typedef struct multiState {
|
||||
time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */
|
||||
} multiState;
|
||||
|
||||
/* This structure holds the blocking operation state for a client.
|
||||
* The fields used depend on client->btype. */
|
||||
typedef struct blockingState {
|
||||
/* Generic fields. */
|
||||
mstime_t timeout; /* Blocking operation timeout. If UNIX current time
|
||||
* is > timeout then the operation timed out. */
|
||||
|
||||
/* REDIS_BLOCK_LIST */
|
||||
dict *keys; /* The keys we are waiting to terminate a blocking
|
||||
* operation such as BLPOP. Otherwise NULL. */
|
||||
time_t timeout; /* Blocking operation timeout. If UNIX current time
|
||||
* is > timeout then the operation timed out. */
|
||||
robj *target; /* The key that should receive the element,
|
||||
* for BRPOPLPUSH. */
|
||||
|
||||
/* REDIS_BLOCK_WAIT */
|
||||
int numreplicas; /* Number of replicas we are waiting for ACK. */
|
||||
long long reploffset; /* Replication offset to reach. */
|
||||
} blockingState;
|
||||
|
||||
/* The following structure represents a node in the server.ready_keys list,
|
||||
@ -479,6 +494,7 @@ typedef struct redisClient {
|
||||
char replrunid[REDIS_RUN_ID_SIZE+1]; /* master run id if this is a master */
|
||||
int slave_listening_port; /* As configured with: SLAVECONF listening-port */
|
||||
multiState mstate; /* MULTI/EXEC state */
|
||||
int btype; /* Type of blocking op if REDIS_BLOCKED. */
|
||||
blockingState bpop; /* blocking state */
|
||||
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
|
||||
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
|
||||
@ -1227,6 +1243,13 @@ void sentinelIsRunning(void);
|
||||
/* Scripting */
|
||||
void scriptingInit(void);
|
||||
|
||||
/* Blocked clients */
|
||||
void processUnblockedClients(void);
|
||||
void blockClient(redisClient *c, int btype);
|
||||
void unblockClient(redisClient *c);
|
||||
void replyToBlockedClientTimedOut(redisClient *c);
|
||||
int getTimeoutFromObjectOrReply(redisClient *c, robj *object, mstime_t *timeout, int unit);
|
||||
|
||||
/* Git SHA1 */
|
||||
char *redisGitSHA1(void);
|
||||
char *redisGitDirty(void);
|
||||
|
48
src/t_list.c
48
src/t_list.c
@ -778,7 +778,7 @@ void rpoplpushCommand(redisClient *c) {
|
||||
|
||||
/* Set a client in blocking mode for the specified key, with the specified
|
||||
* timeout */
|
||||
void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) {
|
||||
void blockForKeys(redisClient *c, robj **keys, int numkeys, mstime_t timeout, robj *target) {
|
||||
dictEntry *de;
|
||||
list *l;
|
||||
int j;
|
||||
@ -808,13 +808,11 @@ void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj
|
||||
}
|
||||
listAddNodeTail(l,c);
|
||||
}
|
||||
|
||||
/* Mark the client as a blocked client */
|
||||
c->flags |= REDIS_BLOCKED;
|
||||
server.bpop_blocked_clients++;
|
||||
blockClient(c,REDIS_BLOCKED_LIST);
|
||||
}
|
||||
|
||||
/* Unblock a client that's waiting in a blocking operation such as BLPOP */
|
||||
/* Unblock a client that's waiting in a blocking operation such as BLPOP.
|
||||
* You should never call this function directly, but unblockClient() instead. */
|
||||
void unblockClientWaitingData(redisClient *c) {
|
||||
dictEntry *de;
|
||||
dictIterator *di;
|
||||
@ -842,10 +840,6 @@ void unblockClientWaitingData(redisClient *c) {
|
||||
decrRefCount(c->bpop.target);
|
||||
c->bpop.target = NULL;
|
||||
}
|
||||
c->flags &= ~REDIS_BLOCKED;
|
||||
c->flags |= REDIS_UNBLOCKED;
|
||||
server.bpop_blocked_clients--;
|
||||
listAddNodeTail(server.unblocked_clients,c);
|
||||
}
|
||||
|
||||
/* If the specified key has clients blocked waiting for list pushes, this
|
||||
@ -1000,10 +994,10 @@ void handleClientsBlockedOnLists(void) {
|
||||
|
||||
if (value) {
|
||||
/* Protect receiver->bpop.target, that will be
|
||||
* freed by the next unblockClientWaitingData()
|
||||
* freed by the next unblockClient()
|
||||
* call. */
|
||||
if (dstkey) incrRefCount(dstkey);
|
||||
unblockClientWaitingData(receiver);
|
||||
unblockClient(receiver);
|
||||
|
||||
if (serveClientBlockedOnList(receiver,
|
||||
rl->key,dstkey,rl->db,value,
|
||||
@ -1036,32 +1030,14 @@ void handleClientsBlockedOnLists(void) {
|
||||
}
|
||||
}
|
||||
|
||||
int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) {
|
||||
long tval;
|
||||
|
||||
if (getLongFromObjectOrReply(c,object,&tval,
|
||||
"timeout is not an integer or out of range") != REDIS_OK)
|
||||
return REDIS_ERR;
|
||||
|
||||
if (tval < 0) {
|
||||
addReplyError(c,"timeout is negative");
|
||||
return REDIS_ERR;
|
||||
}
|
||||
|
||||
if (tval > 0) tval += server.unixtime;
|
||||
*timeout = tval;
|
||||
|
||||
return REDIS_OK;
|
||||
}
|
||||
|
||||
/* Blocking RPOP/LPOP */
|
||||
void blockingPopGenericCommand(redisClient *c, int where) {
|
||||
robj *o;
|
||||
time_t timeout;
|
||||
mstime_t timeout;
|
||||
int j;
|
||||
|
||||
if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout) != REDIS_OK)
|
||||
return;
|
||||
if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS)
|
||||
!= REDIS_OK) return;
|
||||
|
||||
for (j = 1; j < c->argc-1; j++) {
|
||||
o = lookupKeyWrite(c->db,c->argv[j]);
|
||||
@ -1120,10 +1096,10 @@ void brpopCommand(redisClient *c) {
|
||||
}
|
||||
|
||||
void brpoplpushCommand(redisClient *c) {
|
||||
time_t timeout;
|
||||
mstime_t timeout;
|
||||
|
||||
if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout) != REDIS_OK)
|
||||
return;
|
||||
if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout,UNIT_SECONDS)
|
||||
!= REDIS_OK) return;
|
||||
|
||||
robj *key = lookupKeyWrite(c->db, c->argv[1]);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user