mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-23 08:38:27 -05:00
CG: first draft of streamReplyWithRangeFromConsumerPEL().
This commit is contained in:
parent
bbec4569a5
commit
aa808394f6
@ -89,7 +89,7 @@ struct client;
|
|||||||
|
|
||||||
stream *streamNew(void);
|
stream *streamNew(void);
|
||||||
void freeStream(stream *s);
|
void freeStream(stream *s);
|
||||||
size_t streamReplyWithRange(struct client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int noack);
|
size_t streamReplyWithRange(struct client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags);
|
||||||
void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev);
|
void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev);
|
||||||
int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
|
int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
|
||||||
void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);
|
void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);
|
||||||
|
101
src/t_stream.c
101
src/t_stream.c
@ -44,6 +44,7 @@ void streamFreeCG(streamCG *cg);
|
|||||||
streamCG *streamLookupCG(stream *s, sds groupname);
|
streamCG *streamLookupCG(stream *s, sds groupname);
|
||||||
streamConsumer *streamLookupConsumer(streamCG *cg, sds name);
|
streamConsumer *streamLookupConsumer(streamCG *cg, sds name);
|
||||||
streamNACK *streamCreateNACK(streamConsumer *consumer);
|
streamNACK *streamCreateNACK(streamConsumer *consumer);
|
||||||
|
size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamCG *group, streamConsumer *consumer);
|
||||||
|
|
||||||
/* -----------------------------------------------------------------------
|
/* -----------------------------------------------------------------------
|
||||||
* Low level stream encoding: a radix tree of listpacks.
|
* Low level stream encoding: a radix tree of listpacks.
|
||||||
@ -666,16 +667,12 @@ void streamIteratorStop(streamIterator *si) {
|
|||||||
raxStop(&si->ri);
|
raxStop(&si->ri);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This is an helper function for streamReplyWithRange() when called with
|
/* Emit a reply in the client output buffer by formatting a Stream ID
|
||||||
* group and consumer arguments, but with a range that is referring to already
|
* in the standard <ms>-<seq> format, using the simple string protocol
|
||||||
* delivered messages. In this case we just emit messages that are already
|
* of REPL. */
|
||||||
* in the history of the conusmer, fetching the IDs from its PEL.
|
void addReplyStreamID(client *c, streamID *id) {
|
||||||
*
|
sds replyid = sdscatfmt(sdsempty(),"+%U-%U\r\n",id->ms,id->seq);
|
||||||
* Note that this function does not have a 'rev' argument because it's not
|
addReplySds(c,replyid);
|
||||||
* possible to iterate in reverse using a group. Basically this function
|
|
||||||
* is only called as a result of the XREADGROUP command. */
|
|
||||||
size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamCG *group, streamConsumer *consumer) {
|
|
||||||
/* TODO: update the last time delivery and delivery count. */
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Send the specified range to the client 'c'. The range the client will
|
/* Send the specified range to the client 'c'. The range the client will
|
||||||
@ -690,11 +687,30 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start
|
|||||||
* 2. If the requested IDs are already assigned to some other consumer, the
|
* 2. If the requested IDs are already assigned to some other consumer, the
|
||||||
* function will not return it to the client.
|
* function will not return it to the client.
|
||||||
* 3. An entry in the pending list will be created for every entry delivered
|
* 3. An entry in the pending list will be created for every entry delivered
|
||||||
* for the first time to this consumer. This is only performed if
|
* for the first time to this consumer.
|
||||||
* 'noack' is non-zero.
|
*
|
||||||
|
* The behavior may be modified passing non-zero flags:
|
||||||
|
*
|
||||||
|
* STREAM_RWR_NOACK: Do not craete PEL entries, that is, the point "3" above
|
||||||
|
* is not performed.
|
||||||
|
* STREAM_RWR_RAWENTRIES: Do not emit array boundaries, but just the entries,
|
||||||
|
* and return the number of entries emitted as usually.
|
||||||
|
* This is used when the function is just used in order
|
||||||
|
* to emit data and there is some higher level logic.
|
||||||
|
*
|
||||||
|
* Note that this function is recursive in certian cases. When it's called
|
||||||
|
* with a non NULL group and consumer argument, it may call
|
||||||
|
* streamReplyWithRangeFromConsumerPEL() in order to get entries from the
|
||||||
|
* consumer pending entires list. However such a function will then call
|
||||||
|
* streamReplyWithRange() in order to emit single entries (found in the
|
||||||
|
* PEL by ID) to the client. This is the use case for the STREAM_RWR_RAWENTRIES
|
||||||
|
* flag.
|
||||||
*/
|
*/
|
||||||
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int noack) {
|
#define STREAM_RWR_NOACK (1<<0) /* Do not create entries in the PEL. */
|
||||||
void *arraylen_ptr;
|
#define STREAM_RWR_RAWENTRIES (1<<1) /* Do not emit protocol for array
|
||||||
|
boundaries, just the entries. */
|
||||||
|
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags) {
|
||||||
|
void *arraylen_ptr = NULL;
|
||||||
size_t arraylen = 0;
|
size_t arraylen = 0;
|
||||||
streamIterator si;
|
streamIterator si;
|
||||||
int64_t numfields;
|
int64_t numfields;
|
||||||
@ -713,7 +729,8 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
|
|||||||
group,consumer);
|
group,consumer);
|
||||||
}
|
}
|
||||||
|
|
||||||
arraylen_ptr = addDeferredMultiBulkLength(c);
|
if (!(flags & STREAM_RWR_RAWENTRIES))
|
||||||
|
arraylen_ptr = addDeferredMultiBulkLength(c);
|
||||||
streamIteratorStart(&si,s,start,end,rev);
|
streamIteratorStart(&si,s,start,end,rev);
|
||||||
while(streamIteratorGetID(&si,&id,&numfields)) {
|
while(streamIteratorGetID(&si,&id,&numfields)) {
|
||||||
/* Update the group last_id if needed. */
|
/* Update the group last_id if needed. */
|
||||||
@ -722,9 +739,8 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
|
|||||||
|
|
||||||
/* Emit a two elements array for each item. The first is
|
/* Emit a two elements array for each item. The first is
|
||||||
* the ID, the second is an array of field-value pairs. */
|
* the ID, the second is an array of field-value pairs. */
|
||||||
sds replyid = sdscatfmt(sdsempty(),"+%U-%U\r\n",id.ms,id.seq);
|
|
||||||
addReplyMultiBulkLen(c,2);
|
addReplyMultiBulkLen(c,2);
|
||||||
addReplySds(c,replyid);
|
addReplyStreamID(c,&id);
|
||||||
addReplyMultiBulkLen(c,numfields*2);
|
addReplyMultiBulkLen(c,numfields*2);
|
||||||
|
|
||||||
/* Emit the field-value pairs. */
|
/* Emit the field-value pairs. */
|
||||||
@ -744,7 +760,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
|
|||||||
* associated with some other consumer, because if we reached this
|
* associated with some other consumer, because if we reached this
|
||||||
* loop the IDs the user is requesting are greater than any message
|
* loop the IDs the user is requesting are greater than any message
|
||||||
* delivered for this group. */
|
* delivered for this group. */
|
||||||
if (group && !noack) {
|
if (group && !(flags & STREAM_RWR_NOACK)) {
|
||||||
unsigned char buf[sizeof(streamID)];
|
unsigned char buf[sizeof(streamID)];
|
||||||
streamEncodeID(buf,&id);
|
streamEncodeID(buf,&id);
|
||||||
streamNACK *nack = streamCreateNACK(consumer);
|
streamNACK *nack = streamCreateNACK(consumer);
|
||||||
@ -755,10 +771,54 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
streamIteratorStop(&si);
|
streamIteratorStop(&si);
|
||||||
setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
|
if (arraylen_ptr) setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
|
||||||
return arraylen;
|
return arraylen;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* This is an helper function for streamReplyWithRange() when called with
|
||||||
|
* group and consumer arguments, but with a range that is referring to already
|
||||||
|
* delivered messages. In this case we just emit messages that are already
|
||||||
|
* in the history of the conusmer, fetching the IDs from its PEL.
|
||||||
|
*
|
||||||
|
* Note that this function does not have a 'rev' argument because it's not
|
||||||
|
* possible to iterate in reverse using a group. Basically this function
|
||||||
|
* is only called as a result of the XREADGROUP command.
|
||||||
|
*
|
||||||
|
* This function is more expensive because it needs to inspect the PEL and then
|
||||||
|
* seek into the radix tree of the messages in order to emit the full message
|
||||||
|
* to the client. However clients only reach this code path when they are
|
||||||
|
* fetching the history of already retrieved messages, which is rare. */
|
||||||
|
size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamCG *group, streamConsumer *consumer) {
|
||||||
|
raxIterator ri;
|
||||||
|
unsigned char startkey[sizeof(streamID)];
|
||||||
|
unsigned char endkey[sizeof(streamID)];
|
||||||
|
streamEncodeID(startkey,start);
|
||||||
|
if (end) streamEncodeID(endkey,start);
|
||||||
|
|
||||||
|
size_t arraylen = 0;
|
||||||
|
void *arraylen_ptr = addDeferredMultiBulkLength(c);
|
||||||
|
raxStart(&ri,consumer->pel);
|
||||||
|
raxSeek(&ri,">=",startkey,sizeof(startkey));
|
||||||
|
while(raxNext(&ri)) {
|
||||||
|
if (end && memcmp(end,ri.key,ri.key_len) > 0) break;
|
||||||
|
if (streamReplyWithRange(c,s,start,end,1,0,NULL,NULL,
|
||||||
|
STREAM_RWR_RAWENTRIES) == 0)
|
||||||
|
{
|
||||||
|
/* Note that we may have a not acknowledged entry in the PEL
|
||||||
|
* about a message that's no longer here because was removed
|
||||||
|
* by the user by other means. In that case we signal it emitting
|
||||||
|
* the ID but then a NULL entry for the fields. */
|
||||||
|
addReplyMultiBulkLen(c,2);
|
||||||
|
streamID id;
|
||||||
|
streamDecodeID(ri.key,&id);
|
||||||
|
addReplyStreamID(c,&id);
|
||||||
|
addReply(c,shared.nullmultibulk);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
raxStop(&ri);
|
||||||
|
setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
|
||||||
|
}
|
||||||
|
|
||||||
/* -----------------------------------------------------------------------
|
/* -----------------------------------------------------------------------
|
||||||
* Stream commands implementation
|
* Stream commands implementation
|
||||||
* ----------------------------------------------------------------------- */
|
* ----------------------------------------------------------------------- */
|
||||||
@ -904,8 +964,7 @@ void xaddCommand(client *c) {
|
|||||||
"target stream top item");
|
"target stream top item");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
sds reply = sdscatfmt(sdsempty(),"+%U-%U\r\n",id.ms,id.seq);
|
addReplyStreamID(c,&id);
|
||||||
addReplySds(c,reply);
|
|
||||||
|
|
||||||
signalModifiedKey(c->db,c->argv[1]);
|
signalModifiedKey(c->db,c->argv[1]);
|
||||||
notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id);
|
notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id);
|
||||||
|
Loading…
Reference in New Issue
Block a user