CG: RDB loading first implementation.

This commit is contained in:
antirez 2018-02-14 16:37:24 +01:00
parent db7a5f23b4
commit f4e1a4de25
3 changed files with 88 additions and 9 deletions

View File

@ -75,6 +75,18 @@ static int rdbWriteRaw(rio *rdb, void *p, size_t len) {
return len; return len;
} }
/* This is just a wrapper for the low level function rioRead() that will
* automatically abort if it is not possible to read the specified amount
* of bytes. */
void rdbLoadRaw(rio *rdb, void *buf, uint64_t len) {
if (rioRead(rdb,buf,len) == 0) {
rdbExitReportCorruptRDB(
"Impossible to read %llu bytes in rdbLoadRaw()",
(unsigned long long) len);
return; /* Not reached. */
}
}
int rdbSaveType(rio *rdb, unsigned char type) { int rdbSaveType(rio *rdb, unsigned char type) {
return rdbWriteRaw(rdb,&type,1); return rdbWriteRaw(rdb,&type,1);
} }
@ -90,7 +102,7 @@ int rdbLoadType(rio *rdb) {
time_t rdbLoadTime(rio *rdb) { time_t rdbLoadTime(rio *rdb) {
int32_t t32; int32_t t32;
if (rioRead(rdb,&t32,4) == 0) return -1; rdbLoadRaw(rdb,&t32,4);
return (time_t)t32; return (time_t)t32;
} }
@ -101,7 +113,7 @@ int rdbSaveMillisecondTime(rio *rdb, long long t) {
long long rdbLoadMillisecondTime(rio *rdb) { long long rdbLoadMillisecondTime(rio *rdb) {
int64_t t64; int64_t t64;
if (rioRead(rdb,&t64,8) == 0) return -1; rdbLoadRaw(rdb,&t64,8);
return (long long)t64; return (long long)t64;
} }
@ -906,7 +918,7 @@ ssize_t rdbSaveObject(rio *rdb, robj *o) {
nwritten += n; nwritten += n;
/* Save the consumers of this group. */ /* Save the consumers of this group. */
if ((n = rdbSaveStreamPEL(rdb,cg->pel,1)) == -1) return -1; if ((n = rdbSaveStreamConsumers(rdb,cg)) == -1) return -1;
nwritten += n; nwritten += n;
} }
raxStop(&ri); raxStop(&ri);
@ -1599,6 +1611,72 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) {
/* Load the last entry ID. */ /* Load the last entry ID. */
s->last_id.ms = rdbLoadLen(rdb,NULL); s->last_id.ms = rdbLoadLen(rdb,NULL);
s->last_id.seq = rdbLoadLen(rdb,NULL); s->last_id.seq = rdbLoadLen(rdb,NULL);
/* Consumer groups loading */
size_t cgroups_count = rdbLoadLen(rdb,NULL);
while(cgroups_count--) {
/* Get the consumer group name and ID. We can then create the
* consumer group ASAP and populate its structure as
* we read more data. */
streamID cg_id;
sds cgname = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
cg_id.ms = rdbLoadLen(rdb,NULL);
cg_id.seq = rdbLoadLen(rdb,NULL);
streamCG *cgroup = streamCreateCG(s,cgname,sdslen(cgname),&cg_id);
if (cgroup == NULL)
rdbExitReportCorruptRDB("Duplicated consumer group name %s",
cgname);
sdsfree(cgname);
/* Load the global PEL for this consumer group, however we'll
* not yet populate the NACK structures with the message
* owner, since consumers for this group and their messages will
* be read as a next step. So for now leave them not resolved
* and later populate it. */
size_t pel_size = rdbLoadLen(rdb,NULL);
while(pel_size--) {
unsigned char rawid[sizeof(streamID)];
rdbLoadRaw(rdb,rawid,sizeof(rawid));
streamNACK *nack = streamCreateNACK(NULL);
nack->delivery_time = rdbLoadMillisecondTime(rdb);
nack->delivery_count = rdbLoadLen(rdb,NULL);
if (!raxInsert(cgroup->pel,rawid,sizeof(rawid),nack,NULL))
rdbExitReportCorruptRDB("Duplicated gobal PEL entry "
"loading stream consumer group");
}
/* Now that we loaded our global PEL, we need to load the
* consumers and their local PELs. */
size_t consumers_num = rdbLoadLen(rdb,NULL);
while(consumers_num--) {
sds cname = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
streamConsumer *consumer = streamLookupConsumer(cgroup,cname,
1);
sdsfree(cname);
consumer->seen_time = rdbLoadMillisecondTime(rdb);
/* Load the PEL about entries owned by this specific
* consumer. */
pel_size = rdbLoadLen(rdb,NULL);
while(pel_size--) {
unsigned char rawid[sizeof(streamID)];
rdbLoadRaw(rdb,rawid,sizeof(rawid));
streamNACK *nack = raxFind(cgroup->pel,rawid,sizeof(rawid));
if (nack == NULL)
rdbExitReportCorruptRDB("Consumer entry not found in "
"group global PEL");
/* Set the NACK consumer, that was left to NULL when
* loading the global PEL. Then set the same shared
* NACK structure also in the consumer-specific PEL. */
nack->consumer = consumer;
if (raxInsert(consumer->pel,rawid,sizeof(rawid),nack,NULL))
rdbExitReportCorruptRDB("Duplicated consumer PEL entry "
" loading a stream consumer "
"group");
}
}
}
} else if (rdbtype == RDB_TYPE_MODULE || rdbtype == RDB_TYPE_MODULE_2) { } else if (rdbtype == RDB_TYPE_MODULE || rdbtype == RDB_TYPE_MODULE_2) {
uint64_t moduleid = rdbLoadLen(rdb,NULL); uint64_t moduleid = rdbLoadLen(rdb,NULL);
moduleType *mt = moduleTypeLookupModuleByID(moduleid); moduleType *mt = moduleTypeLookupModuleByID(moduleid);
@ -1728,7 +1806,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi) {
/* EXPIRETIME: load an expire associated with the next key /* EXPIRETIME: load an expire associated with the next key
* to load. Note that after loading an expire we need to * to load. Note that after loading an expire we need to
* load the actual type, and continue. */ * load the actual type, and continue. */
if ((expiretime = rdbLoadTime(rdb)) == -1) goto eoferr; expiretime = rdbLoadTime(rdb);
/* We read the time so we need to read the object type again. */ /* We read the time so we need to read the object type again. */
if ((type = rdbLoadType(rdb)) == -1) goto eoferr; if ((type = rdbLoadType(rdb)) == -1) goto eoferr;
/* the EXPIRETIME opcode specifies time in seconds, so convert /* the EXPIRETIME opcode specifies time in seconds, so convert
@ -1737,7 +1815,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi) {
} else if (type == RDB_OPCODE_EXPIRETIME_MS) { } else if (type == RDB_OPCODE_EXPIRETIME_MS) {
/* EXPIRETIME_MS: milliseconds precision expire times introduced /* EXPIRETIME_MS: milliseconds precision expire times introduced
* with RDB v3. Like EXPIRETIME but no with more precision. */ * with RDB v3. Like EXPIRETIME but no with more precision. */
if ((expiretime = rdbLoadMillisecondTime(rdb)) == -1) goto eoferr; expiretime = rdbLoadMillisecondTime(rdb);
/* We read the time so we need to read the object type again. */ /* We read the time so we need to read the object type again. */
if ((type = rdbLoadType(rdb)) == -1) goto eoferr; if ((type = rdbLoadType(rdb)) == -1) goto eoferr;
} else if (type == RDB_OPCODE_EOF) { } else if (type == RDB_OPCODE_EOF) {

View File

@ -96,5 +96,7 @@ void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsign
void streamIteratorStop(streamIterator *si); void streamIteratorStop(streamIterator *si);
streamCG *streamLookupCG(stream *s, sds groupname); streamCG *streamLookupCG(stream *s, sds groupname);
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create); streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create);
streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id);
streamNACK *streamCreateNACK(streamConsumer *consumer);
#endif #endif

View File

@ -41,7 +41,6 @@
#define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */ #define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */
void streamFreeCG(streamCG *cg); void streamFreeCG(streamCG *cg);
streamNACK *streamCreateNACK(streamConsumer *consumer);
size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamCG *group, streamConsumer *consumer); size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamCG *group, streamConsumer *consumer);
/* ----------------------------------------------------------------------- /* -----------------------------------------------------------------------
@ -1304,7 +1303,7 @@ void streamFreeConsumer(streamConsumer *sc) {
* specified name and last server ID. If a consumer group with the same name * specified name and last server ID. If a consumer group with the same name
* already existed NULL is returned, otherwise the pointer to the consumer * already existed NULL is returned, otherwise the pointer to the consumer
* group is returned. */ * group is returned. */
streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID id) { streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id) {
if (s->cgroups == NULL) s->cgroups = raxNew(); if (s->cgroups == NULL) s->cgroups = raxNew();
if (raxFind(s->cgroups,(unsigned char*)name,namelen) != raxNotFound) if (raxFind(s->cgroups,(unsigned char*)name,namelen) != raxNotFound)
return NULL; return NULL;
@ -1312,7 +1311,7 @@ streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID id) {
streamCG *cg = zmalloc(sizeof(*cg)); streamCG *cg = zmalloc(sizeof(*cg));
cg->pel = raxNew(); cg->pel = raxNew();
cg->consumers = raxNew(); cg->consumers = raxNew();
cg->last_id = id; cg->last_id = *id;
raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL); raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL);
return cg; return cg;
} }
@ -1388,7 +1387,7 @@ NULL
} else if (streamParseIDOrReply(c,c->argv[4],&id,0) != C_OK) { } else if (streamParseIDOrReply(c,c->argv[4],&id,0) != C_OK) {
return; return;
} }
streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),id); streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id);
if (cg) { if (cg) {
addReply(c,shared.ok); addReply(c,shared.ok);
} else { } else {