#ifndef STREAM_H #define STREAM_H #include "rax.h" #include "listpack.h" /* Stream item ID: a 128 bit number composed of a milliseconds time and * a sequence counter. IDs generated in the same millisecond (or in a past * millisecond if the clock jumped backward) will use the millisecond time * of the latest generated ID and an incremented sequence. */ typedef struct streamID { uint64_t ms; /* Unix time in milliseconds. */ uint64_t seq; /* Sequence number. */ } streamID; typedef struct stream { rax *rax; /* The radix tree holding the stream. */ uint64_t length; /* Number of elements inside this stream. */ streamID last_id; /* Zero if there are yet no items. */ } stream; /* We define an iterator to iterate stream items in an abstract way, without * caring about the radix tree + listpack representation. Technically speaking * the iterator is only used inside streamReplyWithRange(), so could just * be implemented inside the function, but practically there is the AOF * rewriting code that also needs to iterate the stream to emit the XADD * commands. */ typedef struct streamIterator { streamID master_id; /* ID of the master entry at listpack head. */ uint64_t master_fields_count; /* Master entries # of fields. */ unsigned char *master_fields_start; /* Master entries start in listpack. */ unsigned char *master_fields_ptr; /* Master field to emit next. */ int entry_flags; /* Flags of entry we are emitting. */ int rev; /* True if iterating end to start (reverse). */ uint64_t start_key[2]; /* Start key as 128 bit big endian. */ uint64_t end_key[2]; /* End key as 128 bit big endian. */ raxIterator ri; /* Rax iterator. */ unsigned char *lp; /* Current listpack. */ unsigned char *lp_ele; /* Current listpack cursor. */ /* Buffers used to hold the string of lpGet() when the element is * integer encoded, so that there is no string representation of the * element inside the listpack itself. */ unsigned char field_buf[LP_INTBUF_SIZE]; unsigned char value_buf[LP_INTBUF_SIZE]; } streamIterator; /* Prototypes of exported APIs. */ struct client; stream *streamNew(void); void freeStream(stream *s); size_t streamReplyWithRange(struct client *c, stream *s, streamID *start, streamID *end, size_t count, int rev); void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev); int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields); void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen); void streamIteratorStop(streamIterator *si); #endif