diff --git a/src/blocked.c b/src/blocked.c index fccce35d6..84d74f24b 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -240,7 +240,14 @@ void handleClientsBlockedOnKeys(void) { while(numclients--) { listNode *clientnode = listFirst(clients); client *receiver = clientnode->value; - if (receiver->btype != BLOCKED_LIST) continue; + + if (receiver->btype != BLOCKED_LIST) { + /* Put on the tail, so that at the next call + * we'll not run into it again. */ + listDelNode(clients,clientnode); + listAddNodeTail(clients,receiver); + continue; + } robj *dstkey = receiver->bpop.target; int where = (receiver->lastcmd && @@ -279,6 +286,47 @@ void handleClientsBlockedOnKeys(void) { * when an element was pushed on the list. */ } + /* Serve clients blocked on stream key. */ + else if (o != NULL && o->type == OBJ_STREAM) { + dictEntry *de = dictFind(rl->db->blocking_keys,rl->key); + stream *s = o->ptr; + + /* We need to provide the new data arrived on the stream + * to all the clients that are waiting for an offset smaller + * than the current top item. */ + if (de) { + list *clients = dictGetVal(de); + listNode *ln; + listIter li; + listRewind(clients,&li); + + while((ln = listNext(&li))) { + client *receiver = listNodeValue(ln); + if (receiver->btype != BLOCKED_STREAM) continue; + streamID *gt = dictFetchValue(receiver->bpop.keys, + rl->key); + if (s->last_id.ms > gt->ms || + (s->last_id.ms == gt->ms && + s->last_id.seq > gt->seq)) + { + unblockClient(receiver); + streamID start = *gt; + start.seq++; /* Can't overflow, it's an uint64_t */ + + /* Emit the two elements sub-array consisting of + * the name of the stream and the data we + * extracted from it. Wrapped in a single-item + * array, since we have just one key. */ + addReplyMultiBulkLen(receiver,1); + addReplyMultiBulkLen(receiver,2); + addReplyBulk(receiver,rl->key); + streamReplyWithRange(receiver,s,&start,NULL, + receiver->bpop.xread_count); + } + } + } + } + /* Free this item. */ decrRefCount(rl->key); zfree(rl); diff --git a/src/server.h b/src/server.h index 4b84486eb..8fa7380e4 100644 --- a/src/server.h +++ b/src/server.h @@ -1425,6 +1425,7 @@ void popGenericCommand(client *c, int where); /* Stream data type. */ stream *streamNew(void); void freeStream(stream *s); +size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count); /* MULTI/EXEC/WATCH... */ void unwatchAllKeys(client *c);