diff --git a/src/module.c b/src/module.c index c81330b63..cbc6ff627 100644 --- a/src/module.c +++ b/src/module.c @@ -105,7 +105,7 @@ struct RedisModuleCtx { int flags; /* REDISMODULE_CTX_... flags. */ void **postponed_arrays; /* To set with RM_ReplySetArrayLength(). */ int postponed_arrays_count; /* Number of entries in postponed_arrays. */ - void *blocked_privdata; /* Privdata set when unblocking a clinet. */ + void *blocked_privdata; /* Privdata set when unblocking a client. */ /* Used if there is the REDISMODULE_CTX_KEYS_POS_REQUEST flag set. */ int *keys_pos; @@ -203,6 +203,10 @@ typedef struct RedisModuleBlockedClient { static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER; static list *moduleUnblockedClients; +/* We need a mutex that is unlocked / relocked in beforeSleep() in order to + * allow thread safe contexts to execute commands at a safe moment. */ +static pthread_mutex_t moduleGIL = PTHREAD_MUTEX_INITIALIZER; + /* -------------------------------------------------------------------------- * Prototypes * -------------------------------------------------------------------------- */ @@ -3278,6 +3282,24 @@ void *RM_GetBlockedClientPrivateData(RedisModuleCtx *ctx) { return ctx->blocked_privdata; } +/* -------------------------------------------------------------------------- + * Thread Safe Contexts + * -------------------------------------------------------------------------- */ + +/* Operations executed in thread safe contexts use a global lock in order to + * be ran at a safe time. This function unlocks and re-acquire the locks: + * hopefully with *any* sane implementation of pthreads, this will allow the + * modules to make progresses. + * + * This function is called in beforeSleep(). */ +void moduleCooperativeMultiTaskingCycle(void) { + if (dictSize(modules) == 0) return; /* No modules, no async ops. */ + pthread_mutex_unlock(&moduleGIL); + /* Here hopefully thread modules waiting to be executed at a safe time + * should be able to acquire the lock. */ + pthread_mutex_lock(&moduleGIL); +} + /* -------------------------------------------------------------------------- * Modules API internals * -------------------------------------------------------------------------- */ @@ -3329,6 +3351,10 @@ void moduleInitModulesSystem(void) { * and we do not want to block not in the read nor in the write half. */ anetNonBlock(NULL,server.module_blocked_pipe[0]); anetNonBlock(NULL,server.module_blocked_pipe[1]); + + /* Our thread-safe contexts GIL must start with already locked: + * it is just unlocked when it's safe. */ + pthread_mutex_lock(&moduleGIL); } /* Load all the modules in the server.loadmodule_queue list, which is diff --git a/src/server.c b/src/server.c index 72914c53b..e9013bf60 100644 --- a/src/server.c +++ b/src/server.c @@ -1172,6 +1172,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); + /* Give some run time to modules threads using thread safe contexts. */ + moduleCooperativeMultiTaskingCycle(); + /* Call the Redis Cluster before sleep function. Note that this function * may change the state of Redis Cluster (from ok to fail or vice versa), * so it's a good idea to call it before serving the unblocked clients diff --git a/src/server.h b/src/server.h index 8cc172149..956370296 100644 --- a/src/server.h +++ b/src/server.h @@ -1294,6 +1294,7 @@ void unblockClientFromModule(client *c); void moduleHandleBlockedClients(void); void moduleBlockedClientTimedOut(client *c); void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask); +void moduleCooperativeMultiTaskingCycle(void); /* Utils */ long long ustime(void);