From 6790d848c5cf76ad5833d155fc4debc50b9ef2c4 Mon Sep 17 00:00:00 2001 From: Ozan Tezcan Date: Tue, 11 Jan 2022 20:00:56 +0300 Subject: [PATCH] Reuse temporary client objects for blocked clients by module (#9940) Added a pool for temporary client objects to reuse in module operations. By reusing temporary clients, we are avoiding expensive createClient()/freeClient() calls and improving performance of RM_BlockClient() and RM_GetThreadSafeContext() calls. This commit contains two optimizations: 1 - RM_BlockClient() and RM_GetThreadSafeContext() calls create temporary clients and they are freed in RM_UnblockClient() and RM_FreeThreadSafeContext() calls respectively. Creating/destroying client object takes quite time. To avoid that, added a pool of temporary clients. Pool expands when more clients are needed. Also, added a cron function to shrink the pool and free unused clients after some time. Pool starts with zero clients in it. It does not have max size and can grow unbounded as we need it. We will keep minimum of 8 temporary clients in the pool once created. Keeping small amount of clients to avoid client allocation costs if temporary clients are required after some idle period. 2 - After unblocking a client (RM_UnblockClient()), one byte is written to pipe to wake up Redis main thread. If there are many clients that will be unblocked, each operation requires one write() call which is quite expensive. Changed code to avoid subsequent calls if possible. There are a few more places that need temporary client objects (e.g RM_Call()). These are now using the same temporary client pool to make things more centralized. --- src/module.c | 221 +++++++++++++++++++++++++++++------------------ src/networking.c | 74 +++++++++------- src/server.c | 4 + src/server.h | 3 +- 4 files changed, 186 insertions(+), 116 deletions(-) diff --git a/src/module.c b/src/module.c index 2db57c275..36c4202c5 100644 --- a/src/module.c +++ b/src/module.c @@ -168,6 +168,10 @@ typedef struct RedisModuleCtx RedisModuleCtx; #define REDISMODULE_CTX_BLOCKED_TIMEOUT (1<<3) #define REDISMODULE_CTX_THREAD_SAFE (1<<4) #define REDISMODULE_CTX_BLOCKED_DISCONNECTED (1<<5) +#define REDISMODULE_CTX_TEMP_CLIENT (1<<6) /* Return client object to the pool + when the context is destroyed */ +#define REDISMODULE_CTX_NEW_CLIENT (1<<7) /* Free client object when the + context is destroyed */ /* This represents a Redis key opened with RM_OpenKey(). */ struct RedisModuleKey { @@ -248,6 +252,8 @@ typedef struct RedisModuleBlockedClient { void *privdata; /* Module private data that may be used by the reply or timeout callback. It is set via the RedisModule_UnblockClient() API. */ + client *thread_safe_ctx_client; /* Fake client to be used for thread safe + context so that no lock is required. */ client *reply_client; /* Fake client used to accumulate replies in thread safe contexts. */ int dbid; /* Database number selected by the original client. */ @@ -261,6 +267,16 @@ typedef struct RedisModuleBlockedClient { static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER; static list *moduleUnblockedClients; +/* Pool for temporary client objects. Creating and destroying a client object is + * costly. We manage a pool of clients to avoid this cost. Pool expands when + * more clients are needed and shrinks when unused. Please see modulesCron() + * for more details. */ +static client **moduleTempClients; +static size_t moduleTempClientCap = 0; +static size_t moduleTempClientCount = 0; /* Client count in pool */ +static size_t moduleTempClientMinCount = 0; /* Min client count in pool since + the last cron. */ + /* We need a mutex that is unlocked / relocked in beforeSleep() in order to * allow thread safe contexts to execute commands at a safe moment. */ static pthread_mutex_t moduleGIL = PTHREAD_MUTEX_INITIALIZER; @@ -286,12 +302,6 @@ typedef struct RedisModuleKeyspaceSubscriber { /* The module keyspace notification subscribers list */ static list *moduleKeyspaceSubscribers; -/* Static client recycled for when we need to provide a context with a client - * in a situation where there is no client to provide. This avoids allocating - * a new client per round. For instance this is used in the keyspace - * notifications, timers and cluster messages callbacks. */ -static client *moduleFreeContextReusedClient; - /* Data structures related to the exported dictionary data structure. */ typedef struct RedisModuleDict { rax *rax; /* The radix tree. */ @@ -359,8 +369,6 @@ typedef struct RedisModuleEventListener { } RedisModuleEventListener; list *RedisModule_EventListeners; /* Global list of all the active events. */ -unsigned long long ModulesInHooks = 0; /* Total number of modules in hooks - callbacks right now. */ /* Data structures related to the redis module users */ @@ -497,6 +505,36 @@ void *RM_PoolAlloc(RedisModuleCtx *ctx, size_t bytes) { * Helpers for modules API implementation * -------------------------------------------------------------------------- */ +client *moduleAllocTempClient() { + client *c = NULL; + + if (moduleTempClientCount > 0) { + c = moduleTempClients[--moduleTempClientCount]; + if (moduleTempClientCount < moduleTempClientMinCount) + moduleTempClientMinCount = moduleTempClientCount; + } else { + c = createClient(NULL); + c->flags |= CLIENT_MODULE; + c->user = NULL; /* Root user */ + } + return c; +} + +void moduleReleaseTempClient(client *c) { + if (moduleTempClientCount == moduleTempClientCap) { + moduleTempClientCap = moduleTempClientCap ? moduleTempClientCap*2 : 32; + moduleTempClients = zrealloc(moduleTempClients, sizeof(c)*moduleTempClientCap); + } + clearClientConnectionState(c); + listEmpty(c->reply); + c->reply_bytes = 0; + resetClient(c); + c->bufpos = 0; + c->flags = CLIENT_MODULE; + c->user = NULL; /* Root user */ + moduleTempClients[moduleTempClientCount++] = c; +} + /* Create an empty key of the specified type. `key` must point to a key object * opened for writing where the `.value` member is set to NULL because the * key was found to be non existing. @@ -627,7 +665,14 @@ void moduleFreeContext(RedisModuleCtx *ctx) { "calls.", ctx->module->name); } - if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) freeClient(ctx->client); + /* If this context has a temp client, we return it back to the pool. + * If this context created a new client (e.g detached context), we free it. + * If the client is assigned manually, e.g ctx->client = someClientInstance, + * none of these flags will be set and we do not attempt to free it. */ + if (ctx->flags & REDISMODULE_CTX_TEMP_CLIENT) + moduleReleaseTempClient(ctx->client); + else if (ctx->flags & REDISMODULE_CTX_NEW_CLIENT) + freeClient(ctx->client); } /* Create a module ctx and keep track of the nesting level. @@ -641,6 +686,11 @@ void moduleCreateContext(RedisModuleCtx *out_ctx, RedisModule *module, int ctx_f out_ctx->getapifuncptr = (void*)(unsigned long)&RM_GetApi; out_ctx->module = module; out_ctx->flags = ctx_flags; + if (ctx_flags & REDISMODULE_CTX_TEMP_CLIENT) + out_ctx->client = moduleAllocTempClient(); + else if (ctx_flags & REDISMODULE_CTX_NEW_CLIENT) + out_ctx->client = createClient(NULL); + if (!(ctx_flags & REDISMODULE_CTX_THREAD_SAFE)) { server.module_ctx_nesting++; } @@ -4856,21 +4906,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch replicate = flags & REDISMODULE_ARGV_REPLICATE; va_end(ap); - /* Setup our fake client for command execution. */ - if (server.module_client == NULL) { - /* This is the first RM_Call() ever. Create reusable client. */ - c = server.module_client = createClient(NULL); - } else if (server.module_client->argv == NULL) { - /* The reusable module client is not busy with a command. Use it. */ - c = server.module_client; - } else { - /* The reusable module client is busy. (It is probably used in a - * recursive call to this module.) */ - c = createClient(NULL); - } - - c->user = NULL; /* Root user. */ - c->flags = CLIENT_MODULE; + c = moduleAllocTempClient(); /* We do not want to allow block, the module do not expect it */ c->flags |= CLIENT_DENY_BLOCKING; @@ -4994,18 +5030,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch cleanup: if (ctx->module) ctx->module->in_call--; - if (c == server.module_client) { - /* reset shared client so it can be reused */ - discardTransaction(c); - pubsubUnsubscribeAllChannels(c,0); - pubsubUnsubscribeAllPatterns(c,0); - resetClient(c); /* frees the contents of argv */ - zfree(c->argv); - c->argv = NULL; - c->resp = 2; - } else { - freeClient(c); /* temporary client */ - } + moduleReleaseTempClient(c); return reply; } @@ -6164,10 +6189,10 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF bc->disconnect_callback = NULL; /* Set by RM_SetDisconnectCallback() */ bc->free_privdata = free_privdata; bc->privdata = privdata; - bc->reply_client = createClient(NULL); + bc->reply_client = moduleAllocTempClient(); + bc->thread_safe_ctx_client = moduleAllocTempClient(); if (bc->client) bc->reply_client->resp = bc->client->resp; - bc->reply_client->flags |= CLIENT_MODULE; bc->dbid = c->db->id; bc->blocked_on_keys = keys != NULL; bc->unblocked = 0; @@ -6337,10 +6362,12 @@ int moduleUnblockClientByHandle(RedisModuleBlockedClient *bc, void *privdata) { pthread_mutex_lock(&moduleUnblockedClientsMutex); if (!bc->blocked_on_keys) bc->privdata = privdata; bc->unblocked = 1; - listAddNodeTail(moduleUnblockedClients,bc); - if (write(server.module_blocked_pipe[1],"A",1) != 1) { - /* Ignore the error, this is best-effort. */ + if (listLength(moduleUnblockedClients) == 0) { + if (write(server.module_blocked_pipe[1],"A",1) != 1) { + /* Ignore the error, this is best-effort. */ + } } + listAddNodeTail(moduleUnblockedClients,bc); pthread_mutex_unlock(&moduleUnblockedClientsMutex); return REDISMODULE_OK; } @@ -6433,8 +6460,10 @@ void moduleHandleBlockedClients(void) { pthread_mutex_lock(&moduleUnblockedClientsMutex); /* Here we unblock all the pending clients blocked in modules operations * so we can read every pending "awake byte" in the pipe. */ - char buf[1]; - while (read(server.module_blocked_pipe[0],buf,1) == 1); + if (listLength(moduleUnblockedClients) > 0) { + char buf[1]; + while (read(server.module_blocked_pipe[0],buf,1) == 1); + } while (listLength(moduleUnblockedClients)) { ln = listFirst(moduleUnblockedClients); bc = ln->value; @@ -6488,7 +6517,8 @@ void moduleHandleBlockedClients(void) { * We need to glue such replies to the client output buffer and * free the temporary client we just used for the replies. */ if (c) AddReplyFromClient(c, bc->reply_client); - freeClient(bc->reply_client); + moduleReleaseTempClient(bc->reply_client); + moduleReleaseTempClient(bc->thread_safe_ctx_client); if (c != NULL) { /* Before unblocking the client, set the disconnect callback @@ -6623,15 +6653,27 @@ int RM_BlockedClientDisconnected(RedisModuleCtx *ctx) { RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) { RedisModuleCtx *ctx = zmalloc(sizeof(*ctx)); RedisModule *module = bc ? bc->module : NULL; - moduleCreateContext(ctx, module, REDISMODULE_CTX_THREAD_SAFE); + int flags = REDISMODULE_CTX_THREAD_SAFE; + /* Creating a new client object is costly. To avoid that, we have an + * internal pool of client objects. In blockClient(), a client object is + * assigned to bc->thread_safe_ctx_client to be used for the thread safe + * context. + * For detached thread safe contexts, we create a new client object. + * Otherwise, as this function can be called from different threads, we + * would need to synchronize access to internal pool of client objects. + * Assuming creating detached context is rare and not that performance + * critical, we avoid synchronizing access to the client pool by creating + * a new client */ + if (!bc) flags |= REDISMODULE_CTX_NEW_CLIENT; + moduleCreateContext(ctx, module, flags); /* Even when the context is associated with a blocked client, we can't - * access it safely from another thread, so we create a fake client here + * access it safely from another thread, so we use a fake client here * in order to keep things like the currently selected database and similar * things. */ - ctx->client = createClient(NULL); if (bc) { ctx->blocked_client = bc; + ctx->client = bc->thread_safe_ctx_client; selectDb(ctx->client,bc->dbid); if (bc->client) { ctx->client->id = bc->client->id; @@ -6648,8 +6690,10 @@ RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) { * a long term, for purposes such as logging. */ RedisModuleCtx *RM_GetDetachedThreadSafeContext(RedisModuleCtx *ctx) { RedisModuleCtx *new_ctx = zmalloc(sizeof(*new_ctx)); - moduleCreateContext(new_ctx, ctx->module, REDISMODULE_CTX_THREAD_SAFE); - new_ctx->client = createClient(NULL); + /* We create a new client object for the detached context. + * See RM_GetThreadSafeContext() for more information */ + moduleCreateContext(new_ctx, ctx->module, + REDISMODULE_CTX_THREAD_SAFE|REDISMODULE_CTX_NEW_CLIENT); return new_ctx; } @@ -6828,12 +6872,11 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid) while((ln = listNext(&li))) { RedisModuleKeyspaceSubscriber *sub = ln->value; - /* Only notify subscribers on events matching they registration, + /* Only notify subscribers on events matching the registration, * and avoid subscribers triggering themselves */ if ((sub->event_mask & type) && sub->active == 0) { RedisModuleCtx ctx; - moduleCreateContext(&ctx, sub->module, REDISMODULE_CTX_NONE); - ctx.client = moduleFreeContextReusedClient; + moduleCreateContext(&ctx, sub->module, REDISMODULE_CTX_TEMP_CLIENT); selectDb(ctx.client, dbid); /* mark the handler as active to avoid reentrant loops. @@ -6895,8 +6938,7 @@ void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8 while(r) { if (r->module_id == module_id) { RedisModuleCtx ctx; - moduleCreateContext(&ctx, r->module, REDISMODULE_CTX_NONE); - ctx.client = moduleFreeContextReusedClient; + moduleCreateContext(&ctx, r->module, REDISMODULE_CTX_TEMP_CLIENT); selectDb(ctx.client, 0); r->callback(&ctx,sender_id,type,payload,len); moduleFreeContext(&ctx); @@ -7167,8 +7209,7 @@ int moduleTimerHandler(struct aeEventLoop *eventLoop, long long id, void *client if (now >= expiretime) { RedisModuleTimer *timer = ri.data; RedisModuleCtx ctx; - moduleCreateContext(&ctx, timer->module, REDISMODULE_CTX_NONE); - ctx.client = moduleFreeContextReusedClient; + moduleCreateContext(&ctx,timer->module,REDISMODULE_CTX_TEMP_CLIENT); selectDb(ctx.client, timer->dbid); timer->callback(&ctx,timer->data); moduleFreeContext(&ctx); @@ -9287,7 +9328,6 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) { * cheap if there are no registered modules. */ if (listLength(RedisModule_EventListeners) == 0) return; - int real_client_used = 0; listIter li; listNode *ln; listRewind(RedisModule_EventListeners,&li); @@ -9295,22 +9335,16 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) { RedisModuleEventListener *el = ln->value; if (el->event.id == eid) { RedisModuleCtx ctx; - moduleCreateContext(&ctx, el->module, REDISMODULE_CTX_NONE); - if (eid == REDISMODULE_EVENT_CLIENT_CHANGE) { /* In the case of client changes, we're pushing the real client * so the event handler can mutate it if needed. For example, * to change its authentication state in a way that does not * depend on specific commands executed later. */ + moduleCreateContext(&ctx,el->module,REDISMODULE_CTX_NONE); ctx.client = (client *) data; - real_client_used = 1; - } else if (ModulesInHooks == 0) { - ctx.client = moduleFreeContextReusedClient; } else { - ctx.client = createClient(NULL); - ctx.client->flags |= CLIENT_MODULE; - ctx.client->user = NULL; /* Root user. */ + moduleCreateContext(&ctx,el->module,REDISMODULE_CTX_TEMP_CLIENT); } void *moduledata = NULL; @@ -9337,8 +9371,10 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) { selectDb(ctx.client, fi->dbnum); } else if (eid == REDISMODULE_EVENT_MODULE_CHANGE) { RedisModule *m = data; - if (m == el->module) + if (m == el->module) { + moduleFreeContext(&ctx); continue; + } mcv1.version = REDISMODULE_MODULE_CHANGE_VERSION; mcv1.module_name = m->name; mcv1.module_version = m->ver; @@ -9351,13 +9387,10 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) { moduledata = data; } - ModulesInHooks++; el->module->in_hook++; el->callback(&ctx,el->event,subid,moduledata); el->module->in_hook--; - ModulesInHooks--; - if (ModulesInHooks != 0 && !real_client_used) freeClient(ctx.client); moduleFreeContext(&ctx); } } @@ -9488,14 +9521,11 @@ int moduleRegisterApi(const char *funcname, void *funcptr) { /* Global initialization at Redis startup. */ void moduleRegisterCoreAPI(void); -/* Some steps in module initialization need to be done last after server - * initialization. - * For example, selectDb() in createClient() requires that server.db has +/* Currently, this function is just a placeholder for the module system + * initialization steps that need to be run after server initialization. + * A previous issue, selectDb() in createClient() requires that server.db has * been initialized, see #7323. */ void moduleInitModulesSystemLast(void) { - moduleFreeContextReusedClient = createClient(NULL); - moduleFreeContextReusedClient->flags |= CLIENT_MODULE; - moduleFreeContextReusedClient->user = NULL; /* root user. */ } void moduleInitModulesSystem(void) { @@ -9509,9 +9539,6 @@ void moduleInitModulesSystem(void) { /* Set up filter list */ moduleCommandFilters = listCreate(); - /* Reusable client for RM_Call() is created on first use */ - server.module_client = NULL; - moduleRegisterCoreAPI(); /* Create a pipe for module threads to be able to wake up the redis main thread. @@ -9540,6 +9567,33 @@ void moduleInitModulesSystem(void) { pthread_mutex_lock(&moduleGIL); } +void modulesCron(void) { + /* Check number of temporary clients in the pool and free the unused ones + * since the last cron. moduleTempClientMinCount tracks minimum count of + * clients in the pool since the last cron. This is the number of clients + * that we didn't use for the last cron period. */ + + /* Limit the max client count to be freed at once to avoid latency spikes.*/ + int iteration = 50; + /* We are freeing clients if we have more than 8 unused clients. Keeping + * small amount of clients to avoid client allocation costs if temporary + * clients are required after some idle period. */ + const unsigned int min_client = 8; + while (iteration > 0 && moduleTempClientCount > 0 && moduleTempClientMinCount > min_client) { + client *c = moduleTempClients[--moduleTempClientCount]; + freeClient(c); + iteration--; + moduleTempClientMinCount--; + } + moduleTempClientMinCount = moduleTempClientCount; + + /* Shrink moduleTempClients array itself if it is wasting some space */ + if (moduleTempClientCap > 32 && moduleTempClientCap > moduleTempClientCount * 4) { + moduleTempClientCap /= 4; + moduleTempClients = zrealloc(moduleTempClients,sizeof(client*)*moduleTempClientCap); + } +} + void moduleLoadQueueEntryFree(struct moduleLoadQueueEntry *loadmod) { if (!loadmod) return; sdsfree(loadmod->path); @@ -9632,10 +9686,6 @@ void moduleUnregisterCommands(struct RedisModule *module) { int moduleLoad(const char *path, void **module_argv, int module_argc) { int (*onload)(void *, void **, int); void *handle; - RedisModuleCtx ctx; - moduleCreateContext(&ctx, NULL, REDISMODULE_CTX_NONE); /* We pass NULL since we don't have a module yet. */ - ctx.client = moduleFreeContextReusedClient; - selectDb(ctx.client, 0); struct stat st; if (stat(path, &st) == 0) @@ -9659,6 +9709,9 @@ int moduleLoad(const char *path, void **module_argv, int module_argc) { "symbol. Module not loaded.",path); return C_ERR; } + RedisModuleCtx ctx; + moduleCreateContext(&ctx, NULL, REDISMODULE_CTX_TEMP_CLIENT); /* We pass NULL since we don't have a module yet. */ + selectDb(ctx.client, 0); if (onload((void*)&ctx,module_argv,module_argc) == REDISMODULE_ERR) { if (ctx.module) { moduleUnregisterCommands(ctx.module); @@ -9666,6 +9719,7 @@ int moduleLoad(const char *path, void **module_argv, int module_argc) { moduleUnregisterUsedAPI(ctx.module); moduleFreeModuleStructure(ctx.module); } + moduleFreeContext(&ctx); dlclose(handle); serverLog(LL_WARNING, "Module %s initialization failed. Module not loaded",path); @@ -9726,8 +9780,7 @@ int moduleUnload(sds name) { onunload = (int (*)(void *))(unsigned long) dlsym(module->handle, "RedisModule_OnUnload"); if (onunload) { RedisModuleCtx ctx; - moduleCreateContext(&ctx, module, REDISMODULE_CTX_NONE); - ctx.client = moduleFreeContextReusedClient; + moduleCreateContext(&ctx, module, REDISMODULE_CTX_TEMP_CLIENT); int unload_status = onunload((void*)&ctx); moduleFreeContext(&ctx); diff --git a/src/networking.c b/src/networking.c index c545e2c3e..22d9c6812 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1374,6 +1374,45 @@ void unlinkClient(client *c) { if (c->flags & CLIENT_TRACKING) disableTracking(c); } +/* Clear the client state to resemble a newly connected client. */ +void clearClientConnectionState(client *c) { + listNode *ln; + + /* MONITOR clients are also marked with CLIENT_SLAVE, we need to + * distinguish between the two. + */ + if (c->flags & CLIENT_MONITOR) { + ln = listSearchKey(server.monitors,c); + serverAssert(ln != NULL); + listDelNode(server.monitors,ln); + + c->flags &= ~(CLIENT_MONITOR|CLIENT_SLAVE); + } + + serverAssert(!(c->flags &(CLIENT_SLAVE|CLIENT_MASTER))); + + if (c->flags & CLIENT_TRACKING) disableTracking(c); + selectDb(c,0); + c->resp = 2; + + clientSetDefaultAuth(c); + moduleNotifyUserChanged(c); + discardTransaction(c); + + pubsubUnsubscribeAllChannels(c,0); + pubsubUnsubscribeShardAllChannels(c, 0); + pubsubUnsubscribeAllPatterns(c,0); + + if (c->name) { + decrRefCount(c->name); + c->name = NULL; + } + + /* Selectively clear state flags not covered above */ + c->flags &= ~(CLIENT_ASKING|CLIENT_READONLY|CLIENT_PUBSUB| + CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP_NEXT); +} + void freeClient(client *c) { listNode *ln; @@ -2598,45 +2637,18 @@ int clientSetNameOrReply(client *c, robj *name) { /* Reset the client state to resemble a newly connected client. */ void resetCommand(client *c) { - listNode *ln; - /* MONITOR clients are also marked with CLIENT_SLAVE, we need to * distinguish between the two. */ - if (c->flags & CLIENT_MONITOR) { - ln = listSearchKey(server.monitors,c); - serverAssert(ln != NULL); - listDelNode(server.monitors,ln); + uint64_t flags = c->flags; + if (flags & CLIENT_MONITOR) flags &= ~(CLIENT_MONITOR|CLIENT_SLAVE); - c->flags &= ~(CLIENT_MONITOR|CLIENT_SLAVE); - } - - if (c->flags & (CLIENT_SLAVE|CLIENT_MASTER|CLIENT_MODULE)) { + if (flags & (CLIENT_SLAVE|CLIENT_MASTER|CLIENT_MODULE)) { addReplyError(c,"can only reset normal client connections"); return; } - if (c->flags & CLIENT_TRACKING) disableTracking(c); - selectDb(c,0); - c->resp = 2; - - clientSetDefaultAuth(c); - moduleNotifyUserChanged(c); - discardTransaction(c); - - pubsubUnsubscribeAllChannels(c,0); - pubsubUnsubscribeShardAllChannels(c, 0); - pubsubUnsubscribeAllPatterns(c,0); - - if (c->name) { - decrRefCount(c->name); - c->name = NULL; - } - - /* Selectively clear state flags not covered above */ - c->flags &= ~(CLIENT_ASKING|CLIENT_READONLY|CLIENT_PUBSUB| - CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP_NEXT); - + clearClientConnectionState(c); addReplyStatus(c,"RESET"); } diff --git a/src/server.c b/src/server.c index fb08d5a99..07b75b92e 100644 --- a/src/server.c +++ b/src/server.c @@ -1320,6 +1320,10 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { server.rdb_bgsave_scheduled = 0; } + run_with_period(100) { + if (moduleCount()) modulesCron(); + } + /* Fire the cron loop modules event. */ RedisModuleCronLoopV1 ei = {REDISMODULE_CRON_LOOP_VERSION,server.hz}; moduleFireServerEvent(REDISMODULE_EVENT_CRON_LOOP, diff --git a/src/server.h b/src/server.h index 975ebe373..f03579ff0 100644 --- a/src/server.h +++ b/src/server.h @@ -1453,7 +1453,6 @@ struct redisServer { to be processed. */ pid_t child_pid; /* PID of current child */ int child_type; /* Type of current child */ - client *module_client; /* "Fake" client to call Redis from modules */ /* Networking */ int port; /* TCP listening port */ int tls_port; /* TLS listening port */ @@ -2302,6 +2301,7 @@ void populateCommandLegacyRangeSpec(struct redisCommand *c); /* Modules */ void moduleInitModulesSystem(void); void moduleInitModulesSystemLast(void); +void modulesCron(void); int moduleLoad(const char *path, void **argv, int argc); void moduleLoadFromQueue(void); int moduleGetCommandKeysViaAPI(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); @@ -2361,6 +2361,7 @@ void freeClient(client *c); void freeClientAsync(client *c); void logInvalidUseAndFreeClientAsync(client *c, const char *fmt, ...); int beforeNextClient(client *c); +void clearClientConnectionState(client *c); void resetClient(client *c); void freeClientOriginalArgv(client *c); void sendReplyToClient(connection *conn);