CG: RDB saving part 1, metadata and PEL.

This commit is contained in:
antirez 2018-01-31 12:05:04 +01:00
parent e76fb4ab25
commit 8fb6048ed0

View File

@ -642,7 +642,48 @@ int rdbLoadObjectType(rio *rdb) {
return type;
}
/* Save a Redis object. Returns -1 on error, number of bytes written on success. */
/* This helper function serializes a consumer group Pending Entries List (PEL)
* into the RDB file. The 'nacks' argument tells the function if also persist
* the informations about the not acknowledged message, or if to persist
* just the IDs: this is useful because for the global consumer group PEL
* we serialized the NACKs as well, but when serializing the local consumer
* PELs we just add the ID, that will be resolved inside the global PEL to
* put a reference to the same structure. */
ssize_t rdbSaveStreamPEL(rio *rdb, rax *pel, int nacks) {
ssize_t n, nwritten = 0;
/* Number of entries in the PEL. */
if ((n = rdbSaveLen(rdb,raxSize(pel))) == -1) return -1;
nwritten += n;
/* Save each entry. */
raxIterator ri;
raxStart(&ri,pel);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
/* We store IDs in raw form as 128 big big endian numbers, like
* they are inside the radix tree key. */
if ((n = rdbWriteRaw(rdb,ri.key,sizeof(streamID))) == -1) return -1;
nwritten += n;
if (nacks) {
streamNACK *nack = ri.data;
if ((n = rdbSaveMillisecondTime(rdb,nack->delivery_time)) == -1)
return -1;
nwritten += n;
if ((n = rdbSaveLen(rdb,nack->delivery_count)) == -1) return -1;
nwritten += n;
/* We don't save the consumer name: we'll save the pending IDs
* for each consumer in the consumer PEL, and resolve the consumer
* at loading time. */
}
}
raxStop(&ri);
return nwritten;
}
/* Save a Redis object.
* Returns -1 on error, number of bytes written on success. */
ssize_t rdbSaveObject(rio *rdb, robj *o) {
ssize_t n = 0, nwritten = 0;
@ -798,6 +839,37 @@ ssize_t rdbSaveObject(rio *rdb, robj *o) {
nwritten += n;
if ((n = rdbSaveLen(rdb,s->last_id.seq)) == -1) return -1;
nwritten += n;
/* The consumer groups and their clients are part of the stream
* type, so serialize every consumer group. */
/* Save the number of groups. */
if ((n = rdbSaveLen(rdb,raxSize(s->cgroups))) == -1) return -1;
nwritten += n;
/* Serialize each consumer group. */
raxStart(&ri,s->cgroups);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
streamCG *cg = ri.data;
/* Save the consumer group name. */
if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) return -1;
nwritten += n;
/* Last ID. */
if ((n = rdbSaveLen(rdb,cg->last_id.ms)) == -1) return -1;
nwritten += n;
if ((n = rdbSaveLen(rdb,cg->last_id.seq)) == -1) return -1;
nwritten += n;
/* Save the global PEL. */
if ((n = rdbSaveStreamPEL(rdb,cg->pel,1)) == -1) return -1;
nwritten += n;
/* Save the consumers of this group. */
}
raxStop(&ri);
} else if (o->type == OBJ_MODULE) {
/* Save a module-specific value. */
RedisModuleIO io;