CG: implement XCLAIM FORCE option.

This commit is contained in:
antirez 2018-03-02 13:44:40 +01:00
parent b26f03bd69
commit 596264aee9

View File

@ -459,7 +459,7 @@ int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
* streamIterator myiterator; * streamIterator myiterator;
* streamIteratorStart(&myiterator,...); * streamIteratorStart(&myiterator,...);
* int64_t numfields; * int64_t numfields;
* while(streamIteratorGetID(&myitereator,&ID,&numfields)) { * while(streamIteratorGetID(&myiterator,&ID,&numfields)) {
* while(numfields--) { * while(numfields--) {
* unsigned char *key, *value; * unsigned char *key, *value;
* size_t key_len, value_len; * size_t key_len, value_len;
@ -1750,6 +1750,28 @@ void xclaimCommand(client *c) {
/* Lookup the ID in the group PEL. */ /* Lookup the ID in the group PEL. */
streamNACK *nack = raxFind(group->pel,buf,sizeof(buf)); streamNACK *nack = raxFind(group->pel,buf,sizeof(buf));
/* If FORCE is passed, let's check if at least the entry
* exists in the Stream. In such case, we'll crate a new
* entry in the PEL from scratch, so that XCLAIM can also
* be used to create entries in the PEL. Useful for AOF
* and replication of consumer groups. */
if (force && nack == raxNotFound) {
streamIterator myiterator;
streamIteratorStart(&myiterator,o->ptr,&id,&id,0);
int64_t numfields;
int found = 0;
streamID item_id;
if (streamIteratorGetID(&myiterator,&item_id,&numfields)) found = 1;
streamIteratorStop(&myiterator);
/* Item must exist for us to create a NACK for it. */
if (!found) continue;
/* Create the NACK. */
nack = streamCreateNACK(NULL);
}
if (nack != raxNotFound) { if (nack != raxNotFound) {
/* We need to check if the minimum idle time requested /* We need to check if the minimum idle time requested
* by the caller is satisfied by this entry. */ * by the caller is satisfied by this entry. */
@ -1757,7 +1779,10 @@ void xclaimCommand(client *c) {
mstime_t this_idle = now - nack->delivery_time; mstime_t this_idle = now - nack->delivery_time;
if (this_idle < minidle) continue; if (this_idle < minidle) continue;
} }
/* Remove the entry from the old consumer. */ /* Remove the entry from the old consumer.
* Note that nack->consumer is NULL if we created the
* NACK above because of the FORCE option. */
if (nack->consumer)
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
/* Update the consumer and idle time. */ /* Update the consumer and idle time. */
nack->consumer = consumer; nack->consumer = consumer;