CG: RDB saving part 2, consumers.

This commit is contained in:
antirez 2018-01-31 17:06:32 +01:00
parent 8fb6048ed0
commit db7a5f23b4

View File

@ -682,6 +682,44 @@ ssize_t rdbSaveStreamPEL(rio *rdb, rax *pel, int nacks) {
return nwritten;
}
/* Serialize the consumers of a stream consumer group into the RDB. Helper
* function for the stream data type serialization. What we do here is to
* persist the consumer metadata, and it's PEL, for each consumer. */
size_t rdbSaveStreamConsumers(rio *rdb, streamCG *cg) {
ssize_t n, nwritten = 0;
/* Number of consumers in this consumer group. */
if ((n = rdbSaveLen(rdb,raxSize(cg->consumers))) == -1) return -1;
nwritten += n;
/* Save each consumer. */
raxIterator ri;
raxStart(&ri,cg->consumers);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
streamConsumer *consumer = ri.data;
/* Consumer name. */
if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) return -1;
nwritten += n;
/* Last seen time. */
if ((n = rdbSaveMillisecondTime(rdb,consumer->seen_time)) == -1)
return -1;
nwritten += n;
/* Consumer PEL, without the ACKs (see last parameter of the function
* passed with value of 0), at loading time we'll lookup the ID
* in the consumer group global PEL and will put a reference in the
* consumer local PEL. */
if ((n = rdbSaveStreamPEL(rdb,consumer->pel,0)) == -1)
return -1;
nwritten += n;
}
raxStop(&ri);
return nwritten;
}
/* Save a Redis object.
* Returns -1 on error, number of bytes written on success. */
ssize_t rdbSaveObject(rio *rdb, robj *o) {
@ -853,7 +891,7 @@ ssize_t rdbSaveObject(rio *rdb, robj *o) {
while(raxNext(&ri)) {
streamCG *cg = ri.data;
/* Save the consumer group name. */
/* Save the group name. */
if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) return -1;
nwritten += n;
@ -868,6 +906,8 @@ ssize_t rdbSaveObject(rio *rdb, robj *o) {
nwritten += n;
/* Save the consumers of this group. */
if ((n = rdbSaveStreamPEL(rdb,cg->pel,1)) == -1) return -1;
nwritten += n;
}
raxStop(&ri);
} else if (o->type == OBJ_MODULE) {