From 596264aee935bf916d53d89df9ca8fef6b44491f Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 2 Mar 2018 13:44:40 +0100 Subject: [PATCH] CG: implement XCLAIM FORCE option. --- src/t_stream.c | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/src/t_stream.c b/src/t_stream.c index 46228e74e..726883d21 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -459,7 +459,7 @@ int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) { * streamIterator myiterator; * streamIteratorStart(&myiterator,...); * int64_t numfields; - * while(streamIteratorGetID(&myitereator,&ID,&numfields)) { + * while(streamIteratorGetID(&myiterator,&ID,&numfields)) { * while(numfields--) { * unsigned char *key, *value; * size_t key_len, value_len; @@ -1750,6 +1750,28 @@ void xclaimCommand(client *c) { /* Lookup the ID in the group PEL. */ streamNACK *nack = raxFind(group->pel,buf,sizeof(buf)); + + /* If FORCE is passed, let's check if at least the entry + * exists in the Stream. In such case, we'll crate a new + * entry in the PEL from scratch, so that XCLAIM can also + * be used to create entries in the PEL. Useful for AOF + * and replication of consumer groups. */ + if (force && nack == raxNotFound) { + streamIterator myiterator; + streamIteratorStart(&myiterator,o->ptr,&id,&id,0); + int64_t numfields; + int found = 0; + streamID item_id; + if (streamIteratorGetID(&myiterator,&item_id,&numfields)) found = 1; + streamIteratorStop(&myiterator); + + /* Item must exist for us to create a NACK for it. */ + if (!found) continue; + + /* Create the NACK. */ + nack = streamCreateNACK(NULL); + } + if (nack != raxNotFound) { /* We need to check if the minimum idle time requested * by the caller is satisfied by this entry. */ @@ -1757,8 +1779,11 @@ void xclaimCommand(client *c) { mstime_t this_idle = now - nack->delivery_time; if (this_idle < minidle) continue; } - /* Remove the entry from the old consumer. */ - raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); + /* Remove the entry from the old consumer. + * Note that nack->consumer is NULL if we created the + * NACK above because of the FORCE option. */ + if (nack->consumer) + raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); /* Update the consumer and idle time. */ nack->consumer = consumer; nack->delivery_time = deliverytime;