diff --git a/src/blocked.c b/src/blocked.c index acd3b9485..74dab0c19 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -225,8 +225,7 @@ void handleClientsBlockedOnKeys(void) { * we can safely call signalKeyAsReady() against this key. */ dictDelete(rl->db->ready_keys,rl->key); - /* If the key exists and it's a list, serve blocked clients - * with data. */ + /* Serve clients blocked on list key. */ robj *o = lookupKeyWrite(rl->db,rl->key); if (o != NULL && o->type == OBJ_LIST) { dictEntry *de; @@ -241,6 +240,8 @@ void handleClientsBlockedOnKeys(void) { while(numclients--) { listNode *clientnode = listFirst(clients); client *receiver = clientnode->value; + if (receiver->btype != BLOCKED_LIST) continue; + robj *dstkey = receiver->bpop.target; int where = (receiver->lastcmd && receiver->lastcmd->proc == blpopCommand) ? @@ -287,7 +288,8 @@ void handleClientsBlockedOnKeys(void) { } } -/* This is how the current blocking POP works, we use BLPOP as example: +/* This is how the current blocking lists/streams work, we use BLPOP as + * example, but the concept is the same for other list ops and XREAD. * - If the user calls BLPOP and the key exists and contains a non empty list * then LPOP is called instead. So BLPOP is semantically the same as LPOP * if blocking is not required. @@ -304,9 +306,15 @@ void handleClientsBlockedOnKeys(void) { * to the number of elements we have in the ready list. */ -/* Set a client in blocking mode for the specified key, with the specified - * timeout */ -void blockForKeys(client *c, robj **keys, int numkeys, mstime_t timeout, robj *target) { +/* Set a client in blocking mode for the specified key (list or stream), with + * the specified timeout. The 'type' argument is BLOCKED_LIST or BLOCKED_STREAM + * depending on the kind of operation we are waiting for an empty key in + * order to awake the client. The client is blocked for all the 'numkeys' + * keys as in the 'keys' argument. When we block for stream keys, we also + * provide an array of streamID structures: clients will be unblocked only + * when items with an ID greater or equal to the specified one is appended + * to the stream. */ +void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids) { dictEntry *de; list *l; int j; @@ -317,8 +325,16 @@ void blockForKeys(client *c, robj **keys, int numkeys, mstime_t timeout, robj *t if (target != NULL) incrRefCount(target); for (j = 0; j < numkeys; j++) { - /* If the key already exists in the dict ignore it. */ - if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue; + /* The value associated with the key name in the bpop.keys dictionary + * is NULL for lists, or the stream ID for streams. */ + void *key_data = NULL; + if (btype == BLOCKED_STREAM) { + key_data = zmalloc(sizeof(streamID)); + memcpy(key_data,ids+j,sizeof(streamID)); + } + + /* If the key already exists in the dictionary ignore it. */ + if (dictAdd(c->bpop.keys,keys[j],key_data) != DICT_OK) continue; incrRefCount(keys[j]); /* And in the other "side", to map keys -> clients */ @@ -336,7 +352,7 @@ void blockForKeys(client *c, robj **keys, int numkeys, mstime_t timeout, robj *t } listAddNodeTail(l,c); } - blockClient(c,BLOCKED_LIST); + blockClient(c,btype); } /* Unblock a client that's waiting in a blocking operation such as BLPOP. diff --git a/src/server.h b/src/server.h index 8e50d030e..2c69a94cd 100644 --- a/src/server.h +++ b/src/server.h @@ -1800,7 +1800,7 @@ int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int void disconnectAllBlockedClients(void); void handleClientsBlockedOnKeys(void); void signalKeyAsReady(redisDb *db, robj *key); -void blockForKeys(client *c, robj **keys, int numkeys, mstime_t timeout, robj *target); +void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids); /* expire.c -- Handling of expired keys */ void activeExpireCycle(int type); diff --git a/src/t_list.c b/src/t_list.c index c7eacb0ee..c7e6aac00 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -726,7 +726,7 @@ void blockingPopGenericCommand(client *c, int where) { } /* If the list is empty or the key does not exists we must block */ - blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL); + blockForKeys(c,BLOCKED_LIST,c->argv + 1,c->argc - 2,timeout,NULL,NULL); } void blpopCommand(client *c) { @@ -752,7 +752,7 @@ void brpoplpushCommand(client *c) { addReply(c, shared.nullbulk); } else { /* The list is empty and the client blocks. */ - blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]); + blockForKeys(c,BLOCKED_LIST,c->argv + 1,1,timeout,c->argv[2],NULL); } } else { if (key->type != OBJ_LIST) {