2017-08-30 06:40:27 -04:00
|
|
|
#ifndef STREAM_H
|
|
|
|
#define STREAM_H
|
|
|
|
|
|
|
|
#include "rax.h"
|
2017-09-05 07:14:13 -04:00
|
|
|
#include "listpack.h"
|
2017-08-30 06:40:27 -04:00
|
|
|
|
|
|
|
/* Stream item ID: a 128 bit number composed of a milliseconds time and
 * a sequence counter (two 64 bit halves). IDs generated in the same
 * millisecond (or in a past millisecond if the clock jumped backward) will
 * use the millisecond time of the latest generated ID and an incremented
 * sequence. */
typedef struct streamID {
    uint64_t ms;        /* Unix time in milliseconds. */
    uint64_t seq;       /* Sequence number. */
} streamID;
|
|
|
|
|
|
|
|
typedef struct stream {
|
|
|
|
rax *rax; /* The radix tree holding the stream. */
|
|
|
|
uint64_t length; /* Number of elements inside this stream. */
|
|
|
|
streamID last_id; /* Zero if there are yet no items. */
|
|
|
|
} stream;
|
|
|
|
|
2017-09-15 06:17:25 -04:00
|
|
|
/* We define an iterator to iterate stream items in an abstract way, without
|
|
|
|
* caring about the radix tree + listpack representation. Technically speaking
|
|
|
|
* the iterator is only used inside streamReplyWithRange(), so could just
|
|
|
|
* be implemented inside the function, but practically there is the AOF
|
|
|
|
* rewriting code that also needs to iterate the stream to emit the XADD
|
|
|
|
* commands. */
|
|
|
|
typedef struct streamIterator {
|
|
|
|
uint64_t start_key[2]; /* Start key as 128 bit big endian. */
|
|
|
|
uint64_t end_key[2]; /* End key as 128 bit big endian. */
|
|
|
|
raxIterator ri; /* Rax iterator. */
|
|
|
|
unsigned char *lp; /* Current listpack. */
|
|
|
|
unsigned char *lp_ele; /* Current listpack cursor. */
|
|
|
|
/* Buffers used to hold the string of lpGet() when the element is
|
|
|
|
* integer encoded, so that there is no string representation of the
|
|
|
|
* element inside the listpack itself. */
|
|
|
|
unsigned char field_buf[LP_INTBUF_SIZE];
|
|
|
|
unsigned char value_buf[LP_INTBUF_SIZE];
|
|
|
|
} streamIterator;
|
|
|
|
|
|
|
|
/* Prototypes of exported APIs. */
|
|
|
|
|
|
|
|
struct client;
|
|
|
|
|
|
|
|
stream *streamNew(void);
|
|
|
|
void freeStream(stream *s);
|
|
|
|
size_t streamReplyWithRange(struct client *c, stream *s, streamID *start, streamID *end, size_t count);
|
|
|
|
void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end);
|
|
|
|
int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
|
|
|
|
void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);
|
|
|
|
void streamIteratorStop(streamIterator *si);
|
|
|
|
|
2017-08-30 06:40:27 -04:00
|
|
|
#endif
|