diff --git a/src/rax.c b/src/rax.c index b4f5ae05d..3ead27ed7 100644 --- a/src/rax.c +++ b/src/rax.c @@ -1655,6 +1655,11 @@ int raxEOF(raxIterator *it) { return it->flags & RAX_ITER_EOF; } +/* Return the number of elements inside the radix tree. */ +uint64_t raxSize(rax *rax) { + return rax->numele; +} + /* ----------------------------- Introspection ------------------------------ */ /* This function is mostly used for debugging and learning purposes. diff --git a/src/rax.h b/src/rax.h index f6985c373..e22b6e699 100644 --- a/src/rax.h +++ b/src/rax.h @@ -157,5 +157,6 @@ int raxCompare(raxIterator *iter, const char *op, unsigned char *key, size_t key void raxStop(raxIterator *it); int raxEOF(raxIterator *it); void raxShow(rax *rax); +uint64_t raxSize(rax *rax); #endif diff --git a/src/rdb.c b/src/rdb.c index 19ba59ab8..c79bfa8d4 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -31,6 +31,7 @@ #include "lzf.h" /* LZF compression library */ #include "zipmap.h" #include "endianconv.h" +#include "stream.h" #include #include @@ -622,6 +623,8 @@ int rdbSaveObjectType(rio *rdb, robj *o) { return rdbSaveType(rdb,RDB_TYPE_HASH); else serverPanic("Unknown hash encoding"); + case OBJ_STREAM: + return rdbSaveType(rdb,RDB_TYPE_STREAM_LISTPACKS); case OBJ_MODULE: return rdbSaveType(rdb,RDB_TYPE_MODULE_2); default: @@ -762,7 +765,26 @@ ssize_t rdbSaveObject(rio *rdb, robj *o) { } else { serverPanic("Unknown hash encoding"); } + } else if (o->type == OBJ_STREAM) { + /* Store how many listpacks we have inside the radix tree. */ + stream *s = o->ptr; + rax *rax = s->rax; + if ((n = rdbSaveLen(rdb,raxSize(rax))) == -1) return -1; + nwritten += n; + /* Serialize all the listpacks inside the radix tree as they are, + * when loading back, we'll use the first entry of each listpack + * to insert it back into the radix tree. */ + raxIterator ri; + raxStart(&ri,rax); + raxSeek(&ri,"^",NULL,0); + while (raxNext(&ri)) { + unsigned char *lp = ri.data; + size_t lp_bytes = lpBytes(lp); + if ((n = rdbSaveRawString(rdb,lp,lp_bytes)) == -1) return -1; + nwritten += n; + } + raxStop(&ri); } else if (o->type == OBJ_MODULE) { /* Save a module-specific value. */ RedisModuleIO io; diff --git a/src/stream.h b/src/stream.h index 065c328eb..e78af5bc5 100644 --- a/src/stream.h +++ b/src/stream.h @@ -2,6 +2,7 @@ #define STREAM_H #include "rax.h" +#include "listpack.h" /* Stream item ID: a 128 bit number composed of a milliseconds time and * a sequence counter. IDs generated in the same millisecond (or in a past diff --git a/src/t_stream.c b/src/t_stream.c index dcf9fccee..9ca001d71 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -32,7 +32,6 @@ */ #include "server.h" -#include "listpack.h" #include "endianconv.h" #include "stream.h" @@ -169,7 +168,6 @@ void streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id) s->length++; s->last_id = id; if (added_id) *added_id = id; - raxShow(s->rax); } /* Send the specified range to the client 'c'. The range the client will