diff --git a/src/rdb.c b/src/rdb.c index 45c2b3acd..a28601b4f 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -682,6 +682,44 @@ ssize_t rdbSaveStreamPEL(rio *rdb, rax *pel, int nacks) { return nwritten; } +/* Serialize the consumers of a stream consumer group into the RDB. Helper + * function for the stream data type serialization. What we do here is to + * persist the consumer metadata, and it's PEL, for each consumer. */ +size_t rdbSaveStreamConsumers(rio *rdb, streamCG *cg) { + ssize_t n, nwritten = 0; + + /* Number of consumers in this consumer group. */ + if ((n = rdbSaveLen(rdb,raxSize(cg->consumers))) == -1) return -1; + nwritten += n; + + /* Save each consumer. */ + raxIterator ri; + raxStart(&ri,cg->consumers); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + streamConsumer *consumer = ri.data; + + /* Consumer name. */ + if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) return -1; + nwritten += n; + + /* Last seen time. */ + if ((n = rdbSaveMillisecondTime(rdb,consumer->seen_time)) == -1) + return -1; + nwritten += n; + + /* Consumer PEL, without the ACKs (see last parameter of the function + * passed with value of 0), at loading time we'll lookup the ID + * in the consumer group global PEL and will put a reference in the + * consumer local PEL. */ + if ((n = rdbSaveStreamPEL(rdb,consumer->pel,0)) == -1) + return -1; + nwritten += n; + } + raxStop(&ri); + return nwritten; +} + /* Save a Redis object. * Returns -1 on error, number of bytes written on success. */ ssize_t rdbSaveObject(rio *rdb, robj *o) { @@ -853,7 +891,7 @@ ssize_t rdbSaveObject(rio *rdb, robj *o) { while(raxNext(&ri)) { streamCG *cg = ri.data; - /* Save the consumer group name. */ + /* Save the group name. */ if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) return -1; nwritten += n; @@ -868,6 +906,8 @@ ssize_t rdbSaveObject(rio *rdb, robj *o) { nwritten += n; /* Save the consumers of this group. */ + if ((n = rdbSaveStreamPEL(rdb,cg->pel,1)) == -1) return -1; + nwritten += n; } raxStop(&ri); } else if (o->type == OBJ_MODULE) {