redict/src/stream.h
Itamar Haber c81c7f51c3
Add stream consumer group lag tracking and reporting (#9127)
Adds the ability to track the lag of a consumer group (CG), that is, the number
of entries yet-to-be-delivered from the stream.

The proposed constant-time solution is in the spirit of "best-effort."

Partially addresses #8737.

## Description of approach

We add a new "entries_added" property to the stream. This starts at 0 for a new
stream and is incremented by 1 with every `XADD`.  It is essentially an all-time
counter of the entries added to the stream.

Given the stream's length and this counter value, we can trivially find the logical
"entries_added" counter of the first ID if and only if the stream is contiguous.
A fragmented stream contains one or more tombstones generated by `XDEL`s.
The new "xdel_max_id" stream property tracks the latest tombstone.

The CG also tracks its last delivered ID's as an "entries_read" counter and
increments it independently when delivering new messages, unless the this
read counter is invalid (-1 means invalid offset). When the CG's counter is
available, the reported lag is the difference between added and read counters.

Lastly, this also adds a "first_id" field to the stream structure in order to make
looking it up cheaper in most cases.

## Limitations

There are two cases in which the mechanism isn't able to track the lag.
In these cases, `XINFO` replies with `null` in the "lag" field.

The first case is when a CG is created with an arbitrary last delivered ID,
that isn't "0-0", nor the first or the last entries of the stream. In this case,
it is impossible to obtain a valid read counter (short of an O(N) operation).
The second case is when there are one or more tombstones fragmenting
the stream's entries range.

In both cases, given enough time and assuming that the consumers are
active (reading and lacking) and advancing, the CG should be able to
catch up with the tip of the stream and report zero lag.
Once that's achieved, lag tracking would resume as normal (until the
next tombstone is set).

## API changes

* `XGROUP CREATE` added with the optional named argument `[ENTRIESREAD entries-read]`
  for explicitly specifying the new CG's counter.
* `XGROUP SETID` added with an optional positional argument `[ENTRIESREAD entries-read]`
  for specifying the CG's counter.
* `XINFO` reports the maximal tombstone ID, the recorded first entry ID, and total
  number of entries added to the stream.
* `XINFO` reports the current lag and logical read counter of CGs.
* `XSETID` is an internal command that's used in replication/aof. It has been added with
  the optional positional arguments `[ENTRIESADDED entries-added] [MAXDELETEDID max-deleted-entry-id]`
  for propagating the CG's offset and maximal tombstone ID of the stream.

## The generic unsolved problem

The current stream implementation doesn't provide an efficient way to obtain the
approximate/exact size of a range of entries. While it could've been nice to have
that ability (#5813) in general, let alone specifically in the context of CGs, the risk
and complexities involved in such implementation are in all likelihood prohibitive.

## A refactoring note

The `streamGetEdgeID` has been refactored to accommodate both the existing seek
of any entry as well as seeking non-deleted entries (the addition of the `skip_tombstones`
argument). Furthermore, this refactoring also migrated the seek logic to use the
`streamIterator` (rather than `raxIterator`) that was, in turn, extended with the
`skip_tombstones` Boolean struct field to control the emission of these.

Co-authored-by: Guy Benoish <guy.benoish@redislabs.com>
Co-authored-by: Oran Agra <oran@redislabs.com>
2022-02-23 22:34:58 +02:00

151 lines
7.9 KiB
C

#ifndef STREAM_H
#define STREAM_H
#include "rax.h"
#include "listpack.h"
/* Stream item ID: a 128 bit number composed of a milliseconds time and
* a sequence counter. IDs generated in the same millisecond (or in a past
* millisecond if the clock jumped backward) will use the millisecond time
* of the latest generated ID and an incremented sequence. */
typedef struct streamID {
uint64_t ms; /* Unix time in milliseconds. */
uint64_t seq; /* Sequence number. */
} streamID;
typedef struct stream {
rax *rax; /* The radix tree holding the stream. */
uint64_t length; /* Current number of elements inside this stream. */
streamID last_id; /* Zero if there are yet no items. */
streamID first_id; /* The first non-tombstone entry, zero if empty. */
streamID max_deleted_entry_id; /* The maximal ID that was deleted. */
uint64_t entries_added; /* All time count of elements added. */
rax *cgroups; /* Consumer groups dictionary: name -> streamCG */
} stream;
/* We define an iterator to iterate stream items in an abstract way, without
* caring about the radix tree + listpack representation. Technically speaking
* the iterator is only used inside streamReplyWithRange(), so could just
* be implemented inside the function, but practically there is the AOF
* rewriting code that also needs to iterate the stream to emit the XADD
* commands. */
typedef struct streamIterator {
stream *stream; /* The stream we are iterating. */
streamID master_id; /* ID of the master entry at listpack head. */
uint64_t master_fields_count; /* Master entries # of fields. */
unsigned char *master_fields_start; /* Master entries start in listpack. */
unsigned char *master_fields_ptr; /* Master field to emit next. */
int entry_flags; /* Flags of entry we are emitting. */
int rev; /* True if iterating end to start (reverse). */
int skip_tombstones; /* True if not emitting tombstone entries. */
uint64_t start_key[2]; /* Start key as 128 bit big endian. */
uint64_t end_key[2]; /* End key as 128 bit big endian. */
raxIterator ri; /* Rax iterator. */
unsigned char *lp; /* Current listpack. */
unsigned char *lp_ele; /* Current listpack cursor. */
unsigned char *lp_flags; /* Current entry flags pointer. */
/* Buffers used to hold the string of lpGet() when the element is
* integer encoded, so that there is no string representation of the
* element inside the listpack itself. */
unsigned char field_buf[LP_INTBUF_SIZE];
unsigned char value_buf[LP_INTBUF_SIZE];
} streamIterator;
/* Consumer group. */
typedef struct streamCG {
streamID last_id; /* Last delivered (not acknowledged) ID for this
group. Consumers that will just ask for more
messages will served with IDs > than this. */
long long entries_read; /* In a perfect world (CG starts at 0-0, no dels, no
XGROUP SETID, ...), this is the total number of
group reads. In the real world, the reasoning behind
this value is detailed at the top comment of
streamEstimateDistanceFromFirstEverEntry(). */
rax *pel; /* Pending entries list. This is a radix tree that
has every message delivered to consumers (without
the NOACK option) that was yet not acknowledged
as processed. The key of the radix tree is the
ID as a 64 bit big endian number, while the
associated value is a streamNACK structure.*/
rax *consumers; /* A radix tree representing the consumers by name
and their associated representation in the form
of streamConsumer structures. */
} streamCG;
/* A specific consumer in a consumer group. */
typedef struct streamConsumer {
mstime_t seen_time; /* Last time this consumer was active. */
sds name; /* Consumer name. This is how the consumer
will be identified in the consumer group
protocol. Case sensitive. */
rax *pel; /* Consumer specific pending entries list: all
the pending messages delivered to this
consumer not yet acknowledged. Keys are
big endian message IDs, while values are
the same streamNACK structure referenced
in the "pel" of the consumer group structure
itself, so the value is shared. */
} streamConsumer;
/* Pending (yet not acknowledged) message in a consumer group. */
typedef struct streamNACK {
mstime_t delivery_time; /* Last time this message was delivered. */
uint64_t delivery_count; /* Number of times this message was delivered.*/
streamConsumer *consumer; /* The consumer this message was delivered to
in the last delivery. */
} streamNACK;
/* Stream propagation information, passed to functions in order to propagate
* XCLAIM commands to AOF and slaves. */
typedef struct streamPropInfo {
robj *keyname;
robj *groupname;
} streamPropInfo;
/* Prototypes of exported APIs. */
struct client;
/* Flags for streamLookupConsumer */
#define SLC_DEFAULT 0
#define SLC_NO_REFRESH (1<<0) /* Do not update consumer's seen-time */
/* Flags for streamCreateConsumer */
#define SCC_DEFAULT 0
#define SCC_NO_NOTIFY (1<<0) /* Do not notify key space if consumer created */
#define SCC_NO_DIRTIFY (1<<1) /* Do not dirty++ if consumer created */
#define SCG_INVALID_ENTRIES_READ -1
stream *streamNew(void);
void freeStream(stream *s);
unsigned long streamLength(const robj *subject);
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi);
void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev);
int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);
void streamIteratorRemoveEntry(streamIterator *si, streamID *current);
void streamIteratorStop(streamIterator *si);
streamCG *streamLookupCG(stream *s, sds groupname);
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags);
streamConsumer *streamCreateConsumer(streamCG *cg, sds name, robj *key, int dbid, int flags);
streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, long long entries_read);
streamNACK *streamCreateNACK(streamConsumer *consumer);
void streamDecodeID(void *buf, streamID *id);
int streamCompareID(streamID *a, streamID *b);
void streamFreeNACK(streamNACK *na);
int streamIncrID(streamID *id);
int streamDecrID(streamID *id);
void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername);
robj *streamDup(robj *o);
int streamValidateListpackIntegrity(unsigned char *lp, size_t size, int deep);
int streamParseID(const robj *o, streamID *id);
robj *createObjectFromStreamID(streamID *id);
int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id, int seq_given);
int streamDeleteItem(stream *s, streamID *id);
void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_id);
long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id);
int64_t streamTrimByLength(stream *s, long long maxlen, int approx);
int64_t streamTrimByID(stream *s, streamID minid, int approx);
#endif