Avoid diskelss-load if modules did not declare they handle read errors

This commit is contained in:
Oran Agra 2019-07-30 15:11:57 +03:00
parent d7d028a7a7
commit 4339706e07
4 changed files with 45 additions and 21 deletions

View File

@ -51,6 +51,7 @@ struct RedisModule {
list *using; /* List of modules we use some APIs of. */ list *using; /* List of modules we use some APIs of. */
list *filters; /* List of filters the module has registered. */ list *filters; /* List of filters the module has registered. */
int in_call; /* RM_Call() nesting level */ int in_call; /* RM_Call() nesting level */
int options; /* Moduile options and capabilities. */
}; };
typedef struct RedisModule RedisModule; typedef struct RedisModule RedisModule;
@ -771,6 +772,19 @@ long long RM_Milliseconds(void) {
return mstime(); return mstime();
} }
/* Set flags defining capabilities or behavior bit flags.
*
* REDISMODULE_OPTIONS_HANDLE_IO_ERRORS:
* Generally, modules don't need to bother with this, as the process will just
* terminate if a read error happens, however, setting this flag would allow
* repl-diskless-load to work if enabled.
* The module should use RedisModule_IsIOError after reads, before using the
* data that was read, and in case of error, propagate it upwards, and also be
* able to release the partially populated value and all it's allocations. */
void RM_SetModuleOptions(RedisModuleCtx *ctx, int options) {
ctx->module->options = options;
}
/* -------------------------------------------------------------------------- /* --------------------------------------------------------------------------
* Automatic memory management for modules * Automatic memory management for modules
* -------------------------------------------------------------------------- */ * -------------------------------------------------------------------------- */
@ -3143,7 +3157,7 @@ void *RM_ModuleTypeGetValue(RedisModuleKey *key) {
* modules this cannot be recovered, but if the module declared capability * modules this cannot be recovered, but if the module declared capability
* to handle errors, we'll raise a flag rather than exiting. */ * to handle errors, we'll raise a flag rather than exiting. */
void moduleRDBLoadError(RedisModuleIO *io) { void moduleRDBLoadError(RedisModuleIO *io) {
if (io->flags & REDISMODULE_HANDLE_IO_ERRORS) { if (io->ctx->module->options & REDISMODULE_OPTIONS_HANDLE_IO_ERRORS) {
io->error = 1; io->error = 1;
return; return;
} }
@ -3157,19 +3171,29 @@ void moduleRDBLoadError(RedisModuleIO *io) {
exit(1); exit(1);
} }
/* Set flags defining capabilities or behavior */ /* Returns 0 if there's at least one registered data type that did not declare
void RM_SetIOFlags(RedisModuleIO *io, int flags) { * REDISMODULE_OPTIONS_HANDLE_IO_ERRORS, in which case diskless loading should
io->flags = flags; * be avoided since it could cause data loss. */
} int moduleAllDatatypesHandleErrors() {
dictIterator *di = dictGetIterator(modules);
dictEntry *de;
/* Get flags which were set by RedisModule_SetIOFlags */ while ((de = dictNext(di)) != NULL) {
int RM_GetIOFlags(RedisModuleIO *io) { struct RedisModule *module = dictGetVal(de);
return io->flags; if (listLength(module->types) &&
!(module->options & REDISMODULE_OPTIONS_HANDLE_IO_ERRORS))
{
dictReleaseIterator(di);
return 0;
}
}
dictReleaseIterator(di);
return 1;
} }
/* Returns true if any previous IO API failed. /* Returns true if any previous IO API failed.
* for Load* APIs the REDISMODULE_HANDLE_IO_ERRORS flag must be set with * for Load* APIs the REDISMODULE_OPTIONS_HANDLE_IO_ERRORS flag must be set with
* RediModule_SetIOFlags first. */ * RediModule_SetModuleOptions first. */
int RM_IsIOError(RedisModuleIO *io) { int RM_IsIOError(RedisModuleIO *io) {
return io->error; return io->error;
} }
@ -5434,9 +5458,8 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(ModuleTypeSetValue); REGISTER_API(ModuleTypeSetValue);
REGISTER_API(ModuleTypeGetType); REGISTER_API(ModuleTypeGetType);
REGISTER_API(ModuleTypeGetValue); REGISTER_API(ModuleTypeGetValue);
REGISTER_API(GetIOFlags);
REGISTER_API(SetIOFlags);
REGISTER_API(IsIOError); REGISTER_API(IsIOError);
REGISTER_API(SetModuleOptions);
REGISTER_API(SaveUnsigned); REGISTER_API(SaveUnsigned);
REGISTER_API(LoadUnsigned); REGISTER_API(LoadUnsigned);
REGISTER_API(SaveSigned); REGISTER_API(SaveSigned);

View File

@ -140,8 +140,8 @@ typedef uint64_t RedisModuleTimerID;
/* Do filter RedisModule_Call() commands initiated by module itself. */ /* Do filter RedisModule_Call() commands initiated by module itself. */
#define REDISMODULE_CMDFILTER_NOSELF (1<<0) #define REDISMODULE_CMDFILTER_NOSELF (1<<0)
/* Declare that the module can handle errors with RedisModule_SetIOFlags */ /* Declare that the module can handle errors with RedisModule_SetModuleOptions. */
#define REDISMODULE_HANDLE_IO_ERRORS (1<<0) #define REDISMODULE_OPTIONS_HANDLE_IO_ERRORS (1<<0)
/* ------------------------- End of common defines ------------------------ */ /* ------------------------- End of common defines ------------------------ */
@ -274,9 +274,8 @@ RedisModuleType *REDISMODULE_API_FUNC(RedisModule_CreateDataType)(RedisModuleCtx
int REDISMODULE_API_FUNC(RedisModule_ModuleTypeSetValue)(RedisModuleKey *key, RedisModuleType *mt, void *value); int REDISMODULE_API_FUNC(RedisModule_ModuleTypeSetValue)(RedisModuleKey *key, RedisModuleType *mt, void *value);
RedisModuleType *REDISMODULE_API_FUNC(RedisModule_ModuleTypeGetType)(RedisModuleKey *key); RedisModuleType *REDISMODULE_API_FUNC(RedisModule_ModuleTypeGetType)(RedisModuleKey *key);
void *REDISMODULE_API_FUNC(RedisModule_ModuleTypeGetValue)(RedisModuleKey *key); void *REDISMODULE_API_FUNC(RedisModule_ModuleTypeGetValue)(RedisModuleKey *key);
void REDISMODULE_API_FUNC(RedisModule_SetIOFlags)(RedisModuleIO *io, int flags);
int REDISMODULE_API_FUNC(RedisModule_GetIOFlags)(RedisModuleIO *io);
int REDISMODULE_API_FUNC(RedisModule_IsIOError)(RedisModuleIO *io); int REDISMODULE_API_FUNC(RedisModule_IsIOError)(RedisModuleIO *io);
void REDISMODULE_API_FUNC(RedisModule_SetModuleOptions)(RedisModuleCtx *ctx, int options);
void REDISMODULE_API_FUNC(RedisModule_SaveUnsigned)(RedisModuleIO *io, uint64_t value); void REDISMODULE_API_FUNC(RedisModule_SaveUnsigned)(RedisModuleIO *io, uint64_t value);
uint64_t REDISMODULE_API_FUNC(RedisModule_LoadUnsigned)(RedisModuleIO *io); uint64_t REDISMODULE_API_FUNC(RedisModule_LoadUnsigned)(RedisModuleIO *io);
void REDISMODULE_API_FUNC(RedisModule_SaveSigned)(RedisModuleIO *io, int64_t value); void REDISMODULE_API_FUNC(RedisModule_SaveSigned)(RedisModuleIO *io, int64_t value);
@ -450,9 +449,8 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(ModuleTypeSetValue); REDISMODULE_GET_API(ModuleTypeSetValue);
REDISMODULE_GET_API(ModuleTypeGetType); REDISMODULE_GET_API(ModuleTypeGetType);
REDISMODULE_GET_API(ModuleTypeGetValue); REDISMODULE_GET_API(ModuleTypeGetValue);
REDISMODULE_GET_API(SetIOFlags);
REDISMODULE_GET_API(GetIOFlags);
REDISMODULE_GET_API(IsIOError); REDISMODULE_GET_API(IsIOError);
REDISMODULE_GET_API(SetModuleOptions);
REDISMODULE_GET_API(SaveUnsigned); REDISMODULE_GET_API(SaveUnsigned);
REDISMODULE_GET_API(LoadUnsigned); REDISMODULE_GET_API(LoadUnsigned);
REDISMODULE_GET_API(SaveSigned); REDISMODULE_GET_API(SaveSigned);

View File

@ -1115,8 +1115,12 @@ void restartAOFAfterSYNC() {
static int useDisklessLoad() { static int useDisklessLoad() {
/* compute boolean decision to use diskless load */ /* compute boolean decision to use diskless load */
return server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB || int enabled = server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB ||
(server.repl_diskless_load == REPL_DISKLESS_LOAD_WHEN_DB_EMPTY && dbTotalServerKeyCount()==0); (server.repl_diskless_load == REPL_DISKLESS_LOAD_WHEN_DB_EMPTY && dbTotalServerKeyCount()==0);
/* Check all modules handle read errors, otherwise it's not safe to use diskless load. */
if (enabled && !moduleAllDatatypesHandleErrors())
enabled = 0;
return enabled;
} }
/* Helper function for readSyncBulkPayload() to make backups of the current /* Helper function for readSyncBulkPayload() to make backups of the current

View File

@ -599,7 +599,6 @@ typedef struct RedisModuleIO {
* 2 (current version with opcodes annotation). */ * 2 (current version with opcodes annotation). */
struct RedisModuleCtx *ctx; /* Optional context, see RM_GetContextFromIO()*/ struct RedisModuleCtx *ctx; /* Optional context, see RM_GetContextFromIO()*/
struct redisObject *key; /* Optional name of key processed */ struct redisObject *key; /* Optional name of key processed */
int flags; /* flags declaring capabilities or behavior */
} RedisModuleIO; } RedisModuleIO;
/* Macro to initialize an IO context. Note that the 'ver' field is populated /* Macro to initialize an IO context. Note that the 'ver' field is populated
@ -612,7 +611,6 @@ typedef struct RedisModuleIO {
iovar.ver = 0; \ iovar.ver = 0; \
iovar.key = keyptr; \ iovar.key = keyptr; \
iovar.ctx = NULL; \ iovar.ctx = NULL; \
iovar.flags = 0; \
} while(0); } while(0);
/* This is a structure used to export DEBUG DIGEST capabilities to Redis /* This is a structure used to export DEBUG DIGEST capabilities to Redis
@ -1530,6 +1528,7 @@ void moduleAcquireGIL(void);
void moduleReleaseGIL(void); void moduleReleaseGIL(void);
void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid); void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid);
void moduleCallCommandFilters(client *c); void moduleCallCommandFilters(client *c);
int moduleAllDatatypesHandleErrors();
/* Utils */ /* Utils */
long long ustime(void); long long ustime(void);