From f4e1a4de258b75a2ad19805a595a9fbb25e9edbf Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 14 Feb 2018 16:37:24 +0100 Subject: [PATCH] CG: RDB loading first implementation. --- src/rdb.c | 88 +++++++++++++++++++++++++++++++++++++++++++++++--- src/stream.h | 2 ++ src/t_stream.c | 7 ++-- 3 files changed, 88 insertions(+), 9 deletions(-) diff --git a/src/rdb.c b/src/rdb.c index a28601b4f..c9114110e 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -75,6 +75,18 @@ static int rdbWriteRaw(rio *rdb, void *p, size_t len) { return len; } +/* This is just a wrapper for the low level function rioRead() that will + * automatically abort if it is not possible to read the specified amount + * of bytes. */ +void rdbLoadRaw(rio *rdb, void *buf, uint64_t len) { + if (rioRead(rdb,buf,len) == 0) { + rdbExitReportCorruptRDB( + "Impossible to read %llu bytes in rdbLoadRaw()", + (unsigned long long) len); + return; /* Not reached. */ + } +} + int rdbSaveType(rio *rdb, unsigned char type) { return rdbWriteRaw(rdb,&type,1); } @@ -90,7 +102,7 @@ int rdbLoadType(rio *rdb) { time_t rdbLoadTime(rio *rdb) { int32_t t32; - if (rioRead(rdb,&t32,4) == 0) return -1; + rdbLoadRaw(rdb,&t32,4); return (time_t)t32; } @@ -101,7 +113,7 @@ int rdbSaveMillisecondTime(rio *rdb, long long t) { long long rdbLoadMillisecondTime(rio *rdb) { int64_t t64; - if (rioRead(rdb,&t64,8) == 0) return -1; + rdbLoadRaw(rdb,&t64,8); return (long long)t64; } @@ -906,7 +918,7 @@ ssize_t rdbSaveObject(rio *rdb, robj *o) { nwritten += n; /* Save the consumers of this group. */ - if ((n = rdbSaveStreamPEL(rdb,cg->pel,1)) == -1) return -1; + if ((n = rdbSaveStreamConsumers(rdb,cg)) == -1) return -1; nwritten += n; } raxStop(&ri); @@ -1599,6 +1611,72 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) { /* Load the last entry ID. */ s->last_id.ms = rdbLoadLen(rdb,NULL); s->last_id.seq = rdbLoadLen(rdb,NULL); + + /* Consumer groups loading */ + size_t cgroups_count = rdbLoadLen(rdb,NULL); + while(cgroups_count--) { + /* Get the consumer group name and ID. We can then create the + * consumer group ASAP and populate its structure as + * we read more data. */ + streamID cg_id; + sds cgname = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL); + cg_id.ms = rdbLoadLen(rdb,NULL); + cg_id.seq = rdbLoadLen(rdb,NULL); + streamCG *cgroup = streamCreateCG(s,cgname,sdslen(cgname),&cg_id); + if (cgroup == NULL) + rdbExitReportCorruptRDB("Duplicated consumer group name %s", + cgname); + sdsfree(cgname); + + /* Load the global PEL for this consumer group, however we'll + * not yet populate the NACK structures with the message + * owner, since consumers for this group and their messages will + * be read as a next step. So for now leave them not resolved + * and later populate it. */ + size_t pel_size = rdbLoadLen(rdb,NULL); + while(pel_size--) { + unsigned char rawid[sizeof(streamID)]; + rdbLoadRaw(rdb,rawid,sizeof(rawid)); + streamNACK *nack = streamCreateNACK(NULL); + nack->delivery_time = rdbLoadMillisecondTime(rdb); + nack->delivery_count = rdbLoadLen(rdb,NULL); + if (!raxInsert(cgroup->pel,rawid,sizeof(rawid),nack,NULL)) + rdbExitReportCorruptRDB("Duplicated gobal PEL entry " + "loading stream consumer group"); + } + + /* Now that we loaded our global PEL, we need to load the + * consumers and their local PELs. */ + size_t consumers_num = rdbLoadLen(rdb,NULL); + while(consumers_num--) { + sds cname = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL); + streamConsumer *consumer = streamLookupConsumer(cgroup,cname, + 1); + sdsfree(cname); + consumer->seen_time = rdbLoadMillisecondTime(rdb); + + /* Load the PEL about entries owned by this specific + * consumer. */ + pel_size = rdbLoadLen(rdb,NULL); + while(pel_size--) { + unsigned char rawid[sizeof(streamID)]; + rdbLoadRaw(rdb,rawid,sizeof(rawid)); + streamNACK *nack = raxFind(cgroup->pel,rawid,sizeof(rawid)); + if (nack == NULL) + rdbExitReportCorruptRDB("Consumer entry not found in " + "group global PEL"); + + /* Set the NACK consumer, that was left to NULL when + * loading the global PEL. Then set the same shared + * NACK structure also in the consumer-specific PEL. */ + nack->consumer = consumer; + if (raxInsert(consumer->pel,rawid,sizeof(rawid),nack,NULL)) + rdbExitReportCorruptRDB("Duplicated consumer PEL entry " + " loading a stream consumer " + "group"); + } + } + } } else if (rdbtype == RDB_TYPE_MODULE || rdbtype == RDB_TYPE_MODULE_2) { uint64_t moduleid = rdbLoadLen(rdb,NULL); moduleType *mt = moduleTypeLookupModuleByID(moduleid); @@ -1728,7 +1806,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi) { /* EXPIRETIME: load an expire associated with the next key * to load. Note that after loading an expire we need to * load the actual type, and continue. */ - if ((expiretime = rdbLoadTime(rdb)) == -1) goto eoferr; + expiretime = rdbLoadTime(rdb); /* We read the time so we need to read the object type again. */ if ((type = rdbLoadType(rdb)) == -1) goto eoferr; /* the EXPIRETIME opcode specifies time in seconds, so convert @@ -1737,7 +1815,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi) { } else if (type == RDB_OPCODE_EXPIRETIME_MS) { /* EXPIRETIME_MS: milliseconds precision expire times introduced * with RDB v3. Like EXPIRETIME but no with more precision. */ - if ((expiretime = rdbLoadMillisecondTime(rdb)) == -1) goto eoferr; + expiretime = rdbLoadMillisecondTime(rdb); /* We read the time so we need to read the object type again. */ if ((type = rdbLoadType(rdb)) == -1) goto eoferr; } else if (type == RDB_OPCODE_EOF) { diff --git a/src/stream.h b/src/stream.h index bd999d77c..a759056ea 100644 --- a/src/stream.h +++ b/src/stream.h @@ -96,5 +96,7 @@ void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsign void streamIteratorStop(streamIterator *si); streamCG *streamLookupCG(stream *s, sds groupname); streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create); +streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id); +streamNACK *streamCreateNACK(streamConsumer *consumer); #endif diff --git a/src/t_stream.c b/src/t_stream.c index 95691157a..c071c3dc1 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -41,7 +41,6 @@ #define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */ void streamFreeCG(streamCG *cg); -streamNACK *streamCreateNACK(streamConsumer *consumer); size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamCG *group, streamConsumer *consumer); /* ----------------------------------------------------------------------- @@ -1304,7 +1303,7 @@ void streamFreeConsumer(streamConsumer *sc) { * specified name and last server ID. If a consumer group with the same name * already existed NULL is returned, otherwise the pointer to the consumer * group is returned. */ -streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID id) { +streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id) { if (s->cgroups == NULL) s->cgroups = raxNew(); if (raxFind(s->cgroups,(unsigned char*)name,namelen) != raxNotFound) return NULL; @@ -1312,7 +1311,7 @@ streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID id) { streamCG *cg = zmalloc(sizeof(*cg)); cg->pel = raxNew(); cg->consumers = raxNew(); - cg->last_id = id; + cg->last_id = *id; raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL); return cg; } @@ -1388,7 +1387,7 @@ NULL } else if (streamParseIDOrReply(c,c->argv[4],&id,0) != C_OK) { return; } - streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),id); + streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id); if (cg) { addReply(c,shared.ok); } else {