mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 16:18:28 -05:00
Streams: more internal preparation for blocking XREAD.
This commit is contained in:
parent
4a377cecd8
commit
f80dfbf464
@ -225,8 +225,7 @@ void handleClientsBlockedOnKeys(void) {
|
|||||||
* we can safely call signalKeyAsReady() against this key. */
|
* we can safely call signalKeyAsReady() against this key. */
|
||||||
dictDelete(rl->db->ready_keys,rl->key);
|
dictDelete(rl->db->ready_keys,rl->key);
|
||||||
|
|
||||||
/* If the key exists and it's a list, serve blocked clients
|
/* Serve clients blocked on list key. */
|
||||||
* with data. */
|
|
||||||
robj *o = lookupKeyWrite(rl->db,rl->key);
|
robj *o = lookupKeyWrite(rl->db,rl->key);
|
||||||
if (o != NULL && o->type == OBJ_LIST) {
|
if (o != NULL && o->type == OBJ_LIST) {
|
||||||
dictEntry *de;
|
dictEntry *de;
|
||||||
@ -241,6 +240,8 @@ void handleClientsBlockedOnKeys(void) {
|
|||||||
while(numclients--) {
|
while(numclients--) {
|
||||||
listNode *clientnode = listFirst(clients);
|
listNode *clientnode = listFirst(clients);
|
||||||
client *receiver = clientnode->value;
|
client *receiver = clientnode->value;
|
||||||
|
if (receiver->btype != BLOCKED_LIST) continue;
|
||||||
|
|
||||||
robj *dstkey = receiver->bpop.target;
|
robj *dstkey = receiver->bpop.target;
|
||||||
int where = (receiver->lastcmd &&
|
int where = (receiver->lastcmd &&
|
||||||
receiver->lastcmd->proc == blpopCommand) ?
|
receiver->lastcmd->proc == blpopCommand) ?
|
||||||
@ -287,7 +288,8 @@ void handleClientsBlockedOnKeys(void) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This is how the current blocking POP works, we use BLPOP as example:
|
/* This is how the current blocking lists/streams work, we use BLPOP as
|
||||||
|
* example, but the concept is the same for other list ops and XREAD.
|
||||||
* - If the user calls BLPOP and the key exists and contains a non empty list
|
* - If the user calls BLPOP and the key exists and contains a non empty list
|
||||||
* then LPOP is called instead. So BLPOP is semantically the same as LPOP
|
* then LPOP is called instead. So BLPOP is semantically the same as LPOP
|
||||||
* if blocking is not required.
|
* if blocking is not required.
|
||||||
@ -304,9 +306,15 @@ void handleClientsBlockedOnKeys(void) {
|
|||||||
* to the number of elements we have in the ready list.
|
* to the number of elements we have in the ready list.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/* Set a client in blocking mode for the specified key, with the specified
|
/* Set a client in blocking mode for the specified key (list or stream), with
|
||||||
* timeout */
|
* the specified timeout. The 'type' argument is BLOCKED_LIST or BLOCKED_STREAM
|
||||||
void blockForKeys(client *c, robj **keys, int numkeys, mstime_t timeout, robj *target) {
|
* depending on the kind of operation we are waiting for an empty key in
|
||||||
|
* order to awake the client. The client is blocked for all the 'numkeys'
|
||||||
|
* keys as in the 'keys' argument. When we block for stream keys, we also
|
||||||
|
* provide an array of streamID structures: clients will be unblocked only
|
||||||
|
* when items with an ID greater or equal to the specified one is appended
|
||||||
|
* to the stream. */
|
||||||
|
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids) {
|
||||||
dictEntry *de;
|
dictEntry *de;
|
||||||
list *l;
|
list *l;
|
||||||
int j;
|
int j;
|
||||||
@ -317,8 +325,16 @@ void blockForKeys(client *c, robj **keys, int numkeys, mstime_t timeout, robj *t
|
|||||||
if (target != NULL) incrRefCount(target);
|
if (target != NULL) incrRefCount(target);
|
||||||
|
|
||||||
for (j = 0; j < numkeys; j++) {
|
for (j = 0; j < numkeys; j++) {
|
||||||
/* If the key already exists in the dict ignore it. */
|
/* The value associated with the key name in the bpop.keys dictionary
|
||||||
if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;
|
* is NULL for lists, or the stream ID for streams. */
|
||||||
|
void *key_data = NULL;
|
||||||
|
if (btype == BLOCKED_STREAM) {
|
||||||
|
key_data = zmalloc(sizeof(streamID));
|
||||||
|
memcpy(key_data,ids+j,sizeof(streamID));
|
||||||
|
}
|
||||||
|
|
||||||
|
/* If the key already exists in the dictionary ignore it. */
|
||||||
|
if (dictAdd(c->bpop.keys,keys[j],key_data) != DICT_OK) continue;
|
||||||
incrRefCount(keys[j]);
|
incrRefCount(keys[j]);
|
||||||
|
|
||||||
/* And in the other "side", to map keys -> clients */
|
/* And in the other "side", to map keys -> clients */
|
||||||
@ -336,7 +352,7 @@ void blockForKeys(client *c, robj **keys, int numkeys, mstime_t timeout, robj *t
|
|||||||
}
|
}
|
||||||
listAddNodeTail(l,c);
|
listAddNodeTail(l,c);
|
||||||
}
|
}
|
||||||
blockClient(c,BLOCKED_LIST);
|
blockClient(c,btype);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Unblock a client that's waiting in a blocking operation such as BLPOP.
|
/* Unblock a client that's waiting in a blocking operation such as BLPOP.
|
||||||
|
@ -1800,7 +1800,7 @@ int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int
|
|||||||
void disconnectAllBlockedClients(void);
|
void disconnectAllBlockedClients(void);
|
||||||
void handleClientsBlockedOnKeys(void);
|
void handleClientsBlockedOnKeys(void);
|
||||||
void signalKeyAsReady(redisDb *db, robj *key);
|
void signalKeyAsReady(redisDb *db, robj *key);
|
||||||
void blockForKeys(client *c, robj **keys, int numkeys, mstime_t timeout, robj *target);
|
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids);
|
||||||
|
|
||||||
/* expire.c -- Handling of expired keys */
|
/* expire.c -- Handling of expired keys */
|
||||||
void activeExpireCycle(int type);
|
void activeExpireCycle(int type);
|
||||||
|
@ -726,7 +726,7 @@ void blockingPopGenericCommand(client *c, int where) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* If the list is empty or the key does not exists we must block */
|
/* If the list is empty or the key does not exists we must block */
|
||||||
blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
|
blockForKeys(c,BLOCKED_LIST,c->argv + 1,c->argc - 2,timeout,NULL,NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
void blpopCommand(client *c) {
|
void blpopCommand(client *c) {
|
||||||
@ -752,7 +752,7 @@ void brpoplpushCommand(client *c) {
|
|||||||
addReply(c, shared.nullbulk);
|
addReply(c, shared.nullbulk);
|
||||||
} else {
|
} else {
|
||||||
/* The list is empty and the client blocks. */
|
/* The list is empty and the client blocks. */
|
||||||
blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]);
|
blockForKeys(c,BLOCKED_LIST,c->argv + 1,1,timeout,c->argv[2],NULL);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (key->type != OBJ_LIST) {
|
if (key->type != OBJ_LIST) {
|
||||||
|
Loading…
Reference in New Issue
Block a user