diff --git a/src/stream.h b/src/stream.h index fa6947482..917392076 100644 --- a/src/stream.h +++ b/src/stream.h @@ -89,7 +89,7 @@ struct client; stream *streamNew(void); void freeStream(stream *s); -size_t streamReplyWithRange(struct client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int noack); +size_t streamReplyWithRange(struct client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags); void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev); int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields); void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen); diff --git a/src/t_stream.c b/src/t_stream.c index 1d1dd8943..0277f72a9 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -44,6 +44,7 @@ void streamFreeCG(streamCG *cg); streamCG *streamLookupCG(stream *s, sds groupname); streamConsumer *streamLookupConsumer(streamCG *cg, sds name); streamNACK *streamCreateNACK(streamConsumer *consumer); +size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamCG *group, streamConsumer *consumer); /* ----------------------------------------------------------------------- * Low level stream encoding: a radix tree of listpacks. @@ -666,16 +667,12 @@ void streamIteratorStop(streamIterator *si) { raxStop(&si->ri); } -/* This is an helper function for streamReplyWithRange() when called with - * group and consumer arguments, but with a range that is referring to already - * delivered messages. In this case we just emit messages that are already - * in the history of the conusmer, fetching the IDs from its PEL. - * - * Note that this function does not have a 'rev' argument because it's not - * possible to iterate in reverse using a group. Basically this function - * is only called as a result of the XREADGROUP command. */ -size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamCG *group, streamConsumer *consumer) { - /* TODO: update the last time delivery and delivery count. */ +/* Emit a reply in the client output buffer by formatting a Stream ID + * in the standard - format, using the simple string protocol + * of REPL. */ +void addReplyStreamID(client *c, streamID *id) { + sds replyid = sdscatfmt(sdsempty(),"+%U-%U\r\n",id->ms,id->seq); + addReplySds(c,replyid); } /* Send the specified range to the client 'c'. The range the client will @@ -690,11 +687,30 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start * 2. If the requested IDs are already assigned to some other consumer, the * function will not return it to the client. * 3. An entry in the pending list will be created for every entry delivered - * for the first time to this consumer. This is only performed if - * 'noack' is non-zero. + * for the first time to this consumer. + * + * The behavior may be modified passing non-zero flags: + * + * STREAM_RWR_NOACK: Do not craete PEL entries, that is, the point "3" above + * is not performed. + * STREAM_RWR_RAWENTRIES: Do not emit array boundaries, but just the entries, + * and return the number of entries emitted as usually. + * This is used when the function is just used in order + * to emit data and there is some higher level logic. + * + * Note that this function is recursive in certian cases. When it's called + * with a non NULL group and consumer argument, it may call + * streamReplyWithRangeFromConsumerPEL() in order to get entries from the + * consumer pending entires list. However such a function will then call + * streamReplyWithRange() in order to emit single entries (found in the + * PEL by ID) to the client. This is the use case for the STREAM_RWR_RAWENTRIES + * flag. */ -size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int noack) { - void *arraylen_ptr; +#define STREAM_RWR_NOACK (1<<0) /* Do not create entries in the PEL. */ +#define STREAM_RWR_RAWENTRIES (1<<1) /* Do not emit protocol for array + boundaries, just the entries. */ +size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags) { + void *arraylen_ptr = NULL; size_t arraylen = 0; streamIterator si; int64_t numfields; @@ -713,7 +729,8 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end group,consumer); } - arraylen_ptr = addDeferredMultiBulkLength(c); + if (!(flags & STREAM_RWR_RAWENTRIES)) + arraylen_ptr = addDeferredMultiBulkLength(c); streamIteratorStart(&si,s,start,end,rev); while(streamIteratorGetID(&si,&id,&numfields)) { /* Update the group last_id if needed. */ @@ -722,9 +739,8 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end /* Emit a two elements array for each item. The first is * the ID, the second is an array of field-value pairs. */ - sds replyid = sdscatfmt(sdsempty(),"+%U-%U\r\n",id.ms,id.seq); addReplyMultiBulkLen(c,2); - addReplySds(c,replyid); + addReplyStreamID(c,&id); addReplyMultiBulkLen(c,numfields*2); /* Emit the field-value pairs. */ @@ -744,7 +760,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end * associated with some other consumer, because if we reached this * loop the IDs the user is requesting are greater than any message * delivered for this group. */ - if (group && !noack) { + if (group && !(flags & STREAM_RWR_NOACK)) { unsigned char buf[sizeof(streamID)]; streamEncodeID(buf,&id); streamNACK *nack = streamCreateNACK(consumer); @@ -755,10 +771,54 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end } } streamIteratorStop(&si); - setDeferredMultiBulkLength(c,arraylen_ptr,arraylen); + if (arraylen_ptr) setDeferredMultiBulkLength(c,arraylen_ptr,arraylen); return arraylen; } +/* This is an helper function for streamReplyWithRange() when called with + * group and consumer arguments, but with a range that is referring to already + * delivered messages. In this case we just emit messages that are already + * in the history of the conusmer, fetching the IDs from its PEL. + * + * Note that this function does not have a 'rev' argument because it's not + * possible to iterate in reverse using a group. Basically this function + * is only called as a result of the XREADGROUP command. + * + * This function is more expensive because it needs to inspect the PEL and then + * seek into the radix tree of the messages in order to emit the full message + * to the client. However clients only reach this code path when they are + * fetching the history of already retrieved messages, which is rare. */ +size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamCG *group, streamConsumer *consumer) { + raxIterator ri; + unsigned char startkey[sizeof(streamID)]; + unsigned char endkey[sizeof(streamID)]; + streamEncodeID(startkey,start); + if (end) streamEncodeID(endkey,start); + + size_t arraylen = 0; + void *arraylen_ptr = addDeferredMultiBulkLength(c); + raxStart(&ri,consumer->pel); + raxSeek(&ri,">=",startkey,sizeof(startkey)); + while(raxNext(&ri)) { + if (end && memcmp(end,ri.key,ri.key_len) > 0) break; + if (streamReplyWithRange(c,s,start,end,1,0,NULL,NULL, + STREAM_RWR_RAWENTRIES) == 0) + { + /* Note that we may have a not acknowledged entry in the PEL + * about a message that's no longer here because was removed + * by the user by other means. In that case we signal it emitting + * the ID but then a NULL entry for the fields. */ + addReplyMultiBulkLen(c,2); + streamID id; + streamDecodeID(ri.key,&id); + addReplyStreamID(c,&id); + addReply(c,shared.nullmultibulk); + } + } + raxStop(&ri); + setDeferredMultiBulkLength(c,arraylen_ptr,arraylen); +} + /* ----------------------------------------------------------------------- * Stream commands implementation * ----------------------------------------------------------------------- */ @@ -904,8 +964,7 @@ void xaddCommand(client *c) { "target stream top item"); return; } - sds reply = sdscatfmt(sdsempty(),"+%U-%U\r\n",id.ms,id.seq); - addReplySds(c,reply); + addReplyStreamID(c,&id); signalModifiedKey(c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id);