RDB: try to make error handling code more readable.

This commit is contained in:
antirez 2019-07-17 17:30:02 +02:00
parent 6b72b04a37
commit 42b6305964
3 changed files with 52 additions and 47 deletions

View File

@ -101,12 +101,13 @@ int rdbLoadType(rio *rdb) {
/* This is only used to load old databases stored with the RDB_OPCODE_EXPIRETIME /* This is only used to load old databases stored with the RDB_OPCODE_EXPIRETIME
* opcode. New versions of Redis store using the RDB_OPCODE_EXPIRETIME_MS * opcode. New versions of Redis store using the RDB_OPCODE_EXPIRETIME_MS
* opcode. */ * opcode. On error -1 is returned, however this could be a valid time, so
int rdbLoadTime(rio *rdb, time_t *t) { * to check for loading errors the caller should call rioGetReadError() after
* calling this function. */
time_t rdbLoadTime(rio *rdb) {
int32_t t32; int32_t t32;
if (rioRead(rdb,&t32,4) == 0) return C_ERR; if (rioRead(rdb,&t32,4) == 0) return -1;
*t = (time_t)t32; return (time_t)t32;
return C_OK;;
} }
int rdbSaveMillisecondTime(rio *rdb, long long t) { int rdbSaveMillisecondTime(rio *rdb, long long t) {
@ -125,14 +126,17 @@ int rdbSaveMillisecondTime(rio *rdb, long long t) {
* after upgrading to Redis version 5 they will no longer be able to load their * after upgrading to Redis version 5 they will no longer be able to load their
* own old RDB files. Because of that, we instead fix the function only for new * own old RDB files. Because of that, we instead fix the function only for new
* RDB versions, and load older RDB versions as we used to do in the past, * RDB versions, and load older RDB versions as we used to do in the past,
* allowing big endian systems to load their own old RDB files. */ * allowing big endian systems to load their own old RDB files.
int rdbLoadMillisecondTime(rio *rdb, long long *t, int rdbver) { *
* On I/O error the function returns LLONG_MAX, however if this is also a
* valid stored value, the caller should use rioGetReadError() to check for
* errors after calling this function. */
long long rdbLoadMillisecondTime(rio *rdb, int rdbver) {
int64_t t64; int64_t t64;
if (rioRead(rdb,&t64,8) == 0) return C_ERR; if (rioRead(rdb,&t64,8) == 0) return LLONG_MAX;
if (rdbver >= 9) /* Check the top comment of this function. */ if (rdbver >= 9) /* Check the top comment of this function. */
memrev64ifbe(&t64); /* Convert in big endian if the system is BE. */ memrev64ifbe(&t64); /* Convert in big endian if the system is BE. */
*t = (long long)t64; return (long long)t64;
return C_OK;
} }
/* Saves an encoded length. The first two bits in the first byte are used to /* Saves an encoded length. The first two bits in the first byte are used to
@ -1692,15 +1696,14 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
rdbExitReportCorruptRDB("Listpack re-added with existing key"); rdbExitReportCorruptRDB("Listpack re-added with existing key");
} }
/* Load total number of items inside the stream. */ /* Load total number of items inside the stream. */
if (rdbLoadLenByRef(rdb,NULL,&s->length)) { s->length = rdbLoadLen(rdb,NULL);
rdbReportReadError("Stream item count loading failed.");
decrRefCount(o);
return NULL;
}
/* Load the last entry ID. */ /* Load the last entry ID. */
if (rdbLoadLenByRef(rdb,NULL,&s->last_id.ms) || s->last_id.ms = rdbLoadLen(rdb,NULL);
rdbLoadLenByRef(rdb,NULL,&s->last_id.seq)) { s->last_id.seq = rdbLoadLen(rdb,NULL);
rdbReportReadError("Stream last entry ID loading failed.");
if (rioGetReadError(rdb)) {
rdbReportReadError("Stream object metadata loading failed.");
decrRefCount(o); decrRefCount(o);
return NULL; return NULL;
} }
@ -1724,13 +1727,16 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
decrRefCount(o); decrRefCount(o);
return NULL; return NULL;
} }
if (rdbLoadLenByRef(rdb,NULL,&cg_id.ms) ||
rdbLoadLenByRef(rdb,NULL,&cg_id.seq)) { cg_id.ms = rdbLoadLen(rdb,NULL);
cg_id.seq = rdbLoadLen(rdb,NULL);
if (rioGetReadError(rdb)) {
rdbReportReadError("Stream cgroup ID loading failed."); rdbReportReadError("Stream cgroup ID loading failed.");
sdsfree(cgname); sdsfree(cgname);
decrRefCount(o); decrRefCount(o);
return NULL; return NULL;
} }
streamCG *cgroup = streamCreateCG(s,cgname,sdslen(cgname),&cg_id); streamCG *cgroup = streamCreateCG(s,cgname,sdslen(cgname),&cg_id);
if (cgroup == NULL) if (cgroup == NULL)
rdbExitReportCorruptRDB("Duplicated consumer group name %s", rdbExitReportCorruptRDB("Duplicated consumer group name %s",
@ -1756,14 +1762,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
return NULL; return NULL;
} }
streamNACK *nack = streamCreateNACK(NULL); streamNACK *nack = streamCreateNACK(NULL);
if (rdbLoadMillisecondTime(rdb, &nack->delivery_time,RDB_VERSION) == C_ERR) { nack->delivery_time = rdbLoadMillisecondTime(rdb,RDB_VERSION);
rdbReportReadError("Stream PEL nack loading failed."); nack->delivery_count = rdbLoadLen(rdb,NULL);
decrRefCount(o); if (rioGetReadError(rdb)) {
streamFreeNACK(nack); rdbReportReadError("Stream PEL NACK loading failed.");
return NULL;
}
if ((nack->delivery_count = rdbLoadLen(rdb,NULL)) == RDB_LENERR) {
rdbReportReadError("Stream nack deliveries loading failed.");
decrRefCount(o); decrRefCount(o);
streamFreeNACK(nack); streamFreeNACK(nack);
return NULL; return NULL;
@ -1792,7 +1794,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
streamConsumer *consumer = streamLookupConsumer(cgroup,cname, streamConsumer *consumer = streamLookupConsumer(cgroup,cname,
1); 1);
sdsfree(cname); sdsfree(cname);
if (rdbLoadMillisecondTime(rdb, &consumer->seen_time,RDB_VERSION) == C_ERR) { consumer->seen_time = rdbLoadMillisecondTime(rdb,RDB_VERSION);
if (rioGetReadError(rdb)) {
rdbReportReadError("Stream short read reading seen time."); rdbReportReadError("Stream short read reading seen time.");
decrRefCount(o); decrRefCount(o);
return NULL; return NULL;
@ -1802,14 +1805,16 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
* consumer. */ * consumer. */
pel_size = rdbLoadLen(rdb,NULL); pel_size = rdbLoadLen(rdb,NULL);
if (pel_size == RDB_LENERR) { if (pel_size == RDB_LENERR) {
rdbReportReadError("Stream consumer PEL num loading failed."); rdbReportReadError(
"Stream consumer PEL num loading failed.");
decrRefCount(o); decrRefCount(o);
return NULL; return NULL;
} }
while(pel_size--) { while(pel_size--) {
unsigned char rawid[sizeof(streamID)]; unsigned char rawid[sizeof(streamID)];
if (rioRead(rdb,rawid,sizeof(rawid)) == 0) { if (rioRead(rdb,rawid,sizeof(rawid)) == 0) {
rdbReportReadError("Stream short read reading PEL streamID."); rdbReportReadError(
"Stream short read reading PEL streamID.");
decrRefCount(o); decrRefCount(o);
return NULL; return NULL;
} }
@ -1830,10 +1835,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
} }
} }
} else if (rdbtype == RDB_TYPE_MODULE || rdbtype == RDB_TYPE_MODULE_2) { } else if (rdbtype == RDB_TYPE_MODULE || rdbtype == RDB_TYPE_MODULE_2) {
uint64_t moduleid; uint64_t moduleid = rdbLoadLen(rdb,NULL);
if (rdbLoadLenByRef(rdb,NULL, &moduleid)) { if (rioGetReadError(rdb)) return NULL;
return NULL;
}
moduleType *mt = moduleTypeLookupModuleByID(moduleid); moduleType *mt = moduleTypeLookupModuleByID(moduleid);
char name[10]; char name[10];
@ -1977,14 +1980,15 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
/* EXPIRETIME: load an expire associated with the next key /* EXPIRETIME: load an expire associated with the next key
* to load. Note that after loading an expire we need to * to load. Note that after loading an expire we need to
* load the actual type, and continue. */ * load the actual type, and continue. */
time_t t; expiretime = rdbLoadTime(rdb);
if (rdbLoadTime(rdb, &t) == C_ERR) goto eoferr; expiretime *= 1000;
expiretime = t * 1000; if (rioGetReadError(rdb)) goto eoferr;
continue; /* Read next opcode. */ continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_EXPIRETIME_MS) { } else if (type == RDB_OPCODE_EXPIRETIME_MS) {
/* EXPIRETIME_MS: milliseconds precision expire times introduced /* EXPIRETIME_MS: milliseconds precision expire times introduced
* with RDB v3. Like EXPIRETIME but no with more precision. */ * with RDB v3. Like EXPIRETIME but no with more precision. */
if (rdbLoadMillisecondTime(rdb, &expiretime, rdbver) == C_ERR) goto eoferr; expiretime = rdbLoadMillisecondTime(rdb,rdbver);
if (rioGetReadError(rdb)) goto eoferr;
continue; /* Read next opcode. */ continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_FREQ) { } else if (type == RDB_OPCODE_FREQ) {
/* FREQ: LFU frequency. */ /* FREQ: LFU frequency. */
@ -2093,8 +2097,8 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
* we have the ability to read a MODULE_AUX opcode followed by an * we have the ability to read a MODULE_AUX opcode followed by an
* identifier of the module, and a serialized value in "MODULE V2" * identifier of the module, and a serialized value in "MODULE V2"
* format. */ * format. */
uint64_t moduleid; uint64_t moduleid = rdbLoadLen(rdb,NULL);
if (rdbLoadLenByRef(rdb,NULL,&moduleid)) goto eoferr; if (rioGetReadError(rdb)) goto eoferr;
moduleType *mt = moduleTypeLookupModuleByID(moduleid); moduleType *mt = moduleTypeLookupModuleByID(moduleid);
char name[10]; char name[10];
moduleTypeNameByID(name,moduleid); moduleTypeNameByID(name,moduleid);

View File

@ -127,10 +127,10 @@
int rdbSaveType(rio *rdb, unsigned char type); int rdbSaveType(rio *rdb, unsigned char type);
int rdbLoadType(rio *rdb); int rdbLoadType(rio *rdb);
int rdbSaveTime(rio *rdb, time_t t); int rdbSaveTime(rio *rdb, time_t t);
int rdbLoadTime(rio *rdb, time_t *t); time_t rdbLoadTime(rio *rdb);
int rdbSaveLen(rio *rdb, uint64_t len); int rdbSaveLen(rio *rdb, uint64_t len);
int rdbSaveMillisecondTime(rio *rdb, long long t); int rdbSaveMillisecondTime(rio *rdb, long long t);
int rdbLoadMillisecondTime(rio *rdb, long long *t, int rdbver); long long rdbLoadMillisecondTime(rio *rdb, int rdbver);
uint64_t rdbLoadLen(rio *rdb, int *isencoded); uint64_t rdbLoadLen(rio *rdb, int *isencoded);
int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr); int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr);
int rdbSaveObjectType(rio *rdb, robj *o); int rdbSaveObjectType(rio *rdb, robj *o);

View File

@ -212,19 +212,20 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) {
/* Handle special types. */ /* Handle special types. */
if (type == RDB_OPCODE_EXPIRETIME) { if (type == RDB_OPCODE_EXPIRETIME) {
time_t t;
rdbstate.doing = RDB_CHECK_DOING_READ_EXPIRE; rdbstate.doing = RDB_CHECK_DOING_READ_EXPIRE;
/* EXPIRETIME: load an expire associated with the next key /* EXPIRETIME: load an expire associated with the next key
* to load. Note that after loading an expire we need to * to load. Note that after loading an expire we need to
* load the actual type, and continue. */ * load the actual type, and continue. */
if (rdbLoadTime(&rdb, &t) == C_ERR) goto eoferr; expiretime = rdbLoadTime(&rdb);
expiretime = t * 1000; expiretime *= 1000;
if (rioGetReadError(&rdb)) goto eoferr;
continue; /* Read next opcode. */ continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_EXPIRETIME_MS) { } else if (type == RDB_OPCODE_EXPIRETIME_MS) {
/* EXPIRETIME_MS: milliseconds precision expire times introduced /* EXPIRETIME_MS: milliseconds precision expire times introduced
* with RDB v3. Like EXPIRETIME but no with more precision. */ * with RDB v3. Like EXPIRETIME but no with more precision. */
rdbstate.doing = RDB_CHECK_DOING_READ_EXPIRE; rdbstate.doing = RDB_CHECK_DOING_READ_EXPIRE;
if (rdbLoadMillisecondTime(&rdb, &expiretime, rdbver) == C_ERR) goto eoferr; expiretime = rdbLoadMillisecondTime(&rdb, rdbver);
if (rioGetReadError(&rdb)) goto eoferr;
continue; /* Read next opcode. */ continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_FREQ) { } else if (type == RDB_OPCODE_FREQ) {
/* FREQ: LFU frequency. */ /* FREQ: LFU frequency. */