Streams: RDB saving.

This commit is contained in:
antirez 2017-09-05 13:14:13 +02:00
parent 100d43c1ac
commit 485014cc74
5 changed files with 29 additions and 2 deletions

View File

@ -1655,6 +1655,11 @@ int raxEOF(raxIterator *it) {
return it->flags & RAX_ITER_EOF;
}
/* Return the number of elements inside the radix tree. */
uint64_t raxSize(rax *rax) {
return rax->numele;
}
/* ----------------------------- Introspection ------------------------------ */
/* This function is mostly used for debugging and learning purposes.

View File

@ -157,5 +157,6 @@ int raxCompare(raxIterator *iter, const char *op, unsigned char *key, size_t key
void raxStop(raxIterator *it);
int raxEOF(raxIterator *it);
void raxShow(rax *rax);
uint64_t raxSize(rax *rax);
#endif

View File

@ -31,6 +31,7 @@
#include "lzf.h" /* LZF compression library */
#include "zipmap.h"
#include "endianconv.h"
#include "stream.h"
#include <math.h>
#include <sys/types.h>
@ -622,6 +623,8 @@ int rdbSaveObjectType(rio *rdb, robj *o) {
return rdbSaveType(rdb,RDB_TYPE_HASH);
else
serverPanic("Unknown hash encoding");
case OBJ_STREAM:
return rdbSaveType(rdb,RDB_TYPE_STREAM_LISTPACKS);
case OBJ_MODULE:
return rdbSaveType(rdb,RDB_TYPE_MODULE_2);
default:
@ -762,7 +765,26 @@ ssize_t rdbSaveObject(rio *rdb, robj *o) {
} else {
serverPanic("Unknown hash encoding");
}
} else if (o->type == OBJ_STREAM) {
/* Store how many listpacks we have inside the radix tree. */
stream *s = o->ptr;
rax *rax = s->rax;
if ((n = rdbSaveLen(rdb,raxSize(rax))) == -1) return -1;
nwritten += n;
/* Serialize all the listpacks inside the radix tree as they are,
* when loading back, we'll use the first entry of each listpack
* to insert it back into the radix tree. */
raxIterator ri;
raxStart(&ri,rax);
raxSeek(&ri,"^",NULL,0);
while (raxNext(&ri)) {
unsigned char *lp = ri.data;
size_t lp_bytes = lpBytes(lp);
if ((n = rdbSaveRawString(rdb,lp,lp_bytes)) == -1) return -1;
nwritten += n;
}
raxStop(&ri);
} else if (o->type == OBJ_MODULE) {
/* Save a module-specific value. */
RedisModuleIO io;

View File

@ -2,6 +2,7 @@
#define STREAM_H
#include "rax.h"
#include "listpack.h"
/* Stream item ID: a 128 bit number composed of a milliseconds time and
* a sequence counter. IDs generated in the same millisecond (or in a past

View File

@ -32,7 +32,6 @@
*/
#include "server.h"
#include "listpack.h"
#include "endianconv.h"
#include "stream.h"
@ -169,7 +168,6 @@ void streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id)
s->length++;
s->last_id = id;
if (added_id) *added_id = id;
raxShow(s->rax);
}
/* Send the specified range to the client 'c'. The range the client will