mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-21 23:58:51 -05:00
50ee0f5be8
Based on feedback from interested parties
154 lines
8.2 KiB
C
154 lines
8.2 KiB
C
// SPDX-FileCopyrightText: 2024 Redict Contributors
|
|
// SPDX-FileCopyrightText: 2024 Salvatore Sanfilippo <antirez at gmail dot com>
|
|
//
|
|
// SPDX-License-Identifier: BSD-3-Clause
|
|
// SPDX-License-Identifier: LGPL-3.0-only
|
|
|
|
#ifndef STREAM_H
|
|
#define STREAM_H
|
|
|
|
#include "rax.h"
|
|
#include "listpack.h"
|
|
|
|
/* Stream item ID: a 128 bit number composed of a milliseconds time and
|
|
* a sequence counter. IDs generated in the same millisecond (or in a past
|
|
* millisecond if the clock jumped backward) will use the millisecond time
|
|
* of the latest generated ID and an incremented sequence. */
|
|
typedef struct streamID {
|
|
uint64_t ms; /* Unix time in milliseconds. */
|
|
uint64_t seq; /* Sequence number. */
|
|
} streamID;
|
|
|
|
typedef struct stream {
|
|
rax *rax; /* The radix tree holding the stream. */
|
|
uint64_t length; /* Current number of elements inside this stream. */
|
|
streamID last_id; /* Zero if there are yet no items. */
|
|
streamID first_id; /* The first non-tombstone entry, zero if empty. */
|
|
streamID max_deleted_entry_id; /* The maximal ID that was deleted. */
|
|
uint64_t entries_added; /* All time count of elements added. */
|
|
rax *cgroups; /* Consumer groups dictionary: name -> streamCG */
|
|
} stream;
|
|
|
|
/* We define an iterator to iterate stream items in an abstract way, without
|
|
* caring about the radix tree + listpack representation. Technically speaking
|
|
* the iterator is only used inside streamReplyWithRange(), so could just
|
|
* be implemented inside the function, but practically there is the AOF
|
|
* rewriting code that also needs to iterate the stream to emit the XADD
|
|
* commands. */
|
|
typedef struct streamIterator {
|
|
stream *stream; /* The stream we are iterating. */
|
|
streamID master_id; /* ID of the master entry at listpack head. */
|
|
uint64_t master_fields_count; /* Master entries # of fields. */
|
|
unsigned char *master_fields_start; /* Master entries start in listpack. */
|
|
unsigned char *master_fields_ptr; /* Master field to emit next. */
|
|
int entry_flags; /* Flags of entry we are emitting. */
|
|
int rev; /* True if iterating end to start (reverse). */
|
|
int skip_tombstones; /* True if not emitting tombstone entries. */
|
|
uint64_t start_key[2]; /* Start key as 128 bit big endian. */
|
|
uint64_t end_key[2]; /* End key as 128 bit big endian. */
|
|
raxIterator ri; /* Rax iterator. */
|
|
unsigned char *lp; /* Current listpack. */
|
|
unsigned char *lp_ele; /* Current listpack cursor. */
|
|
unsigned char *lp_flags; /* Current entry flags pointer. */
|
|
/* Buffers used to hold the string of lpGet() when the element is
|
|
* integer encoded, so that there is no string representation of the
|
|
* element inside the listpack itself. */
|
|
unsigned char field_buf[LP_INTBUF_SIZE];
|
|
unsigned char value_buf[LP_INTBUF_SIZE];
|
|
} streamIterator;
|
|
|
|
/* Consumer group. */
|
|
typedef struct streamCG {
|
|
streamID last_id; /* Last delivered (not acknowledged) ID for this
|
|
group. Consumers that will just ask for more
|
|
messages will served with IDs > than this. */
|
|
long long entries_read; /* In a perfect world (CG starts at 0-0, no dels, no
|
|
XGROUP SETID, ...), this is the total number of
|
|
group reads. In the real world, the reasoning behind
|
|
this value is detailed at the top comment of
|
|
streamEstimateDistanceFromFirstEverEntry(). */
|
|
rax *pel; /* Pending entries list. This is a radix tree that
|
|
has every message delivered to consumers (without
|
|
the NOACK option) that was yet not acknowledged
|
|
as processed. The key of the radix tree is the
|
|
ID as a 64 bit big endian number, while the
|
|
associated value is a streamNACK structure.*/
|
|
rax *consumers; /* A radix tree representing the consumers by name
|
|
and their associated representation in the form
|
|
of streamConsumer structures. */
|
|
} streamCG;
|
|
|
|
/* A specific consumer in a consumer group. */
|
|
typedef struct streamConsumer {
|
|
mstime_t seen_time; /* Last time this consumer tried to perform an action (attempted reading/claiming). */
|
|
mstime_t active_time; /* Last time this consumer was active (successful reading/claiming). */
|
|
sds name; /* Consumer name. This is how the consumer
|
|
will be identified in the consumer group
|
|
protocol. Case sensitive. */
|
|
rax *pel; /* Consumer specific pending entries list: all
|
|
the pending messages delivered to this
|
|
consumer not yet acknowledged. Keys are
|
|
big endian message IDs, while values are
|
|
the same streamNACK structure referenced
|
|
in the "pel" of the consumer group structure
|
|
itself, so the value is shared. */
|
|
} streamConsumer;
|
|
|
|
/* Pending (yet not acknowledged) message in a consumer group. */
|
|
typedef struct streamNACK {
|
|
mstime_t delivery_time; /* Last time this message was delivered. */
|
|
uint64_t delivery_count; /* Number of times this message was delivered.*/
|
|
streamConsumer *consumer; /* The consumer this message was delivered to
|
|
in the last delivery. */
|
|
} streamNACK;
|
|
|
|
/* Stream propagation information, passed to functions in order to propagate
|
|
* XCLAIM commands to AOF and slaves. */
|
|
typedef struct streamPropInfo {
|
|
robj *keyname;
|
|
robj *groupname;
|
|
} streamPropInfo;
|
|
|
|
/* Prototypes of exported APIs. */
|
|
struct client;
|
|
|
|
/* Flags for streamCreateConsumer */
|
|
#define SCC_DEFAULT 0
|
|
#define SCC_NO_NOTIFY (1<<0) /* Do not notify key space if consumer created */
|
|
#define SCC_NO_DIRTIFY (1<<1) /* Do not dirty++ if consumer created */
|
|
|
|
#define SCG_INVALID_ENTRIES_READ -1
|
|
|
|
stream *streamNew(void);
|
|
void freeStream(stream *s);
|
|
unsigned long streamLength(const robj *subject);
|
|
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi);
|
|
void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev);
|
|
int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
|
|
void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);
|
|
void streamIteratorRemoveEntry(streamIterator *si, streamID *current);
|
|
void streamIteratorStop(streamIterator *si);
|
|
streamCG *streamLookupCG(stream *s, sds groupname);
|
|
streamConsumer *streamLookupConsumer(streamCG *cg, sds name);
|
|
streamConsumer *streamCreateConsumer(streamCG *cg, sds name, robj *key, int dbid, int flags);
|
|
streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, long long entries_read);
|
|
streamNACK *streamCreateNACK(streamConsumer *consumer);
|
|
void streamDecodeID(void *buf, streamID *id);
|
|
int streamCompareID(streamID *a, streamID *b);
|
|
void streamFreeNACK(streamNACK *na);
|
|
int streamIncrID(streamID *id);
|
|
int streamDecrID(streamID *id);
|
|
void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername);
|
|
robj *streamDup(robj *o);
|
|
int streamValidateListpackIntegrity(unsigned char *lp, size_t size, int deep);
|
|
int streamParseID(const robj *o, streamID *id);
|
|
robj *createObjectFromStreamID(streamID *id);
|
|
int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id, int seq_given);
|
|
int streamDeleteItem(stream *s, streamID *id);
|
|
void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_id);
|
|
long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id);
|
|
int64_t streamTrimByLength(stream *s, long long maxlen, int approx);
|
|
int64_t streamTrimByID(stream *s, streamID minid, int approx);
|
|
|
|
#endif
|