Reuse temporary client objects for blocked clients by module (#9940)

Added a pool for temporary client objects to reuse in module operations.
By reusing temporary clients, we are avoiding expensive createClient()/freeClient()
calls and improving performance of RM_BlockClient() and  RM_GetThreadSafeContext() calls. 

This commit contains two optimizations: 

1 - RM_BlockClient() and RM_GetThreadSafeContext() calls create temporary clients and they are freed in
RM_UnblockClient() and RM_FreeThreadSafeContext() calls respectively. Creating/destroying client object
takes quite time. To avoid that, added a pool of temporary clients. Pool expands when more clients are needed.
Also, added a cron function to shrink the pool and free unused clients after some time. Pool starts with zero
clients in it. It does not have max size and can grow unbounded as we need it. We will keep minimum of 8
temporary clients in the pool once created. Keeping small amount of clients to avoid client allocation costs
if temporary clients are required after some idle period.

2 - After unblocking a client (RM_UnblockClient()), one byte is written to pipe to wake up Redis main thread.
If there are many clients that will be unblocked, each operation requires one write() call which is quite expensive.
Changed code to avoid subsequent calls if possible. 

There are a few more places that need temporary client objects (e.g RM_Call()). These are now using the same
temporary client pool to make things more centralized.
This commit is contained in:
Ozan Tezcan 2022-01-11 20:00:56 +03:00 committed by GitHub
parent 3204a03574
commit 6790d848c5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 186 additions and 116 deletions

View File

@ -168,6 +168,10 @@ typedef struct RedisModuleCtx RedisModuleCtx;
#define REDISMODULE_CTX_BLOCKED_TIMEOUT (1<<3) #define REDISMODULE_CTX_BLOCKED_TIMEOUT (1<<3)
#define REDISMODULE_CTX_THREAD_SAFE (1<<4) #define REDISMODULE_CTX_THREAD_SAFE (1<<4)
#define REDISMODULE_CTX_BLOCKED_DISCONNECTED (1<<5) #define REDISMODULE_CTX_BLOCKED_DISCONNECTED (1<<5)
#define REDISMODULE_CTX_TEMP_CLIENT (1<<6) /* Return client object to the pool
when the context is destroyed */
#define REDISMODULE_CTX_NEW_CLIENT (1<<7) /* Free client object when the
context is destroyed */
/* This represents a Redis key opened with RM_OpenKey(). */ /* This represents a Redis key opened with RM_OpenKey(). */
struct RedisModuleKey { struct RedisModuleKey {
@ -248,6 +252,8 @@ typedef struct RedisModuleBlockedClient {
void *privdata; /* Module private data that may be used by the reply void *privdata; /* Module private data that may be used by the reply
or timeout callback. It is set via the or timeout callback. It is set via the
RedisModule_UnblockClient() API. */ RedisModule_UnblockClient() API. */
client *thread_safe_ctx_client; /* Fake client to be used for thread safe
context so that no lock is required. */
client *reply_client; /* Fake client used to accumulate replies client *reply_client; /* Fake client used to accumulate replies
in thread safe contexts. */ in thread safe contexts. */
int dbid; /* Database number selected by the original client. */ int dbid; /* Database number selected by the original client. */
@ -261,6 +267,16 @@ typedef struct RedisModuleBlockedClient {
static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER;
static list *moduleUnblockedClients; static list *moduleUnblockedClients;
/* Pool for temporary client objects. Creating and destroying a client object is
* costly. We manage a pool of clients to avoid this cost. Pool expands when
* more clients are needed and shrinks when unused. Please see modulesCron()
* for more details. */
static client **moduleTempClients;
static size_t moduleTempClientCap = 0;
static size_t moduleTempClientCount = 0; /* Client count in pool */
static size_t moduleTempClientMinCount = 0; /* Min client count in pool since
the last cron. */
/* We need a mutex that is unlocked / relocked in beforeSleep() in order to /* We need a mutex that is unlocked / relocked in beforeSleep() in order to
* allow thread safe contexts to execute commands at a safe moment. */ * allow thread safe contexts to execute commands at a safe moment. */
static pthread_mutex_t moduleGIL = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t moduleGIL = PTHREAD_MUTEX_INITIALIZER;
@ -286,12 +302,6 @@ typedef struct RedisModuleKeyspaceSubscriber {
/* The module keyspace notification subscribers list */ /* The module keyspace notification subscribers list */
static list *moduleKeyspaceSubscribers; static list *moduleKeyspaceSubscribers;
/* Static client recycled for when we need to provide a context with a client
* in a situation where there is no client to provide. This avoids allocating
* a new client per round. For instance this is used in the keyspace
* notifications, timers and cluster messages callbacks. */
static client *moduleFreeContextReusedClient;
/* Data structures related to the exported dictionary data structure. */ /* Data structures related to the exported dictionary data structure. */
typedef struct RedisModuleDict { typedef struct RedisModuleDict {
rax *rax; /* The radix tree. */ rax *rax; /* The radix tree. */
@ -359,8 +369,6 @@ typedef struct RedisModuleEventListener {
} RedisModuleEventListener; } RedisModuleEventListener;
list *RedisModule_EventListeners; /* Global list of all the active events. */ list *RedisModule_EventListeners; /* Global list of all the active events. */
unsigned long long ModulesInHooks = 0; /* Total number of modules in hooks
callbacks right now. */
/* Data structures related to the redis module users */ /* Data structures related to the redis module users */
@ -497,6 +505,36 @@ void *RM_PoolAlloc(RedisModuleCtx *ctx, size_t bytes) {
* Helpers for modules API implementation * Helpers for modules API implementation
* -------------------------------------------------------------------------- */ * -------------------------------------------------------------------------- */
client *moduleAllocTempClient() {
client *c = NULL;
if (moduleTempClientCount > 0) {
c = moduleTempClients[--moduleTempClientCount];
if (moduleTempClientCount < moduleTempClientMinCount)
moduleTempClientMinCount = moduleTempClientCount;
} else {
c = createClient(NULL);
c->flags |= CLIENT_MODULE;
c->user = NULL; /* Root user */
}
return c;
}
void moduleReleaseTempClient(client *c) {
if (moduleTempClientCount == moduleTempClientCap) {
moduleTempClientCap = moduleTempClientCap ? moduleTempClientCap*2 : 32;
moduleTempClients = zrealloc(moduleTempClients, sizeof(c)*moduleTempClientCap);
}
clearClientConnectionState(c);
listEmpty(c->reply);
c->reply_bytes = 0;
resetClient(c);
c->bufpos = 0;
c->flags = CLIENT_MODULE;
c->user = NULL; /* Root user */
moduleTempClients[moduleTempClientCount++] = c;
}
/* Create an empty key of the specified type. `key` must point to a key object /* Create an empty key of the specified type. `key` must point to a key object
* opened for writing where the `.value` member is set to NULL because the * opened for writing where the `.value` member is set to NULL because the
* key was found to be non existing. * key was found to be non existing.
@ -627,7 +665,14 @@ void moduleFreeContext(RedisModuleCtx *ctx) {
"calls.", "calls.",
ctx->module->name); ctx->module->name);
} }
if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) freeClient(ctx->client); /* If this context has a temp client, we return it back to the pool.
* If this context created a new client (e.g detached context), we free it.
* If the client is assigned manually, e.g ctx->client = someClientInstance,
* none of these flags will be set and we do not attempt to free it. */
if (ctx->flags & REDISMODULE_CTX_TEMP_CLIENT)
moduleReleaseTempClient(ctx->client);
else if (ctx->flags & REDISMODULE_CTX_NEW_CLIENT)
freeClient(ctx->client);
} }
/* Create a module ctx and keep track of the nesting level. /* Create a module ctx and keep track of the nesting level.
@ -641,6 +686,11 @@ void moduleCreateContext(RedisModuleCtx *out_ctx, RedisModule *module, int ctx_f
out_ctx->getapifuncptr = (void*)(unsigned long)&RM_GetApi; out_ctx->getapifuncptr = (void*)(unsigned long)&RM_GetApi;
out_ctx->module = module; out_ctx->module = module;
out_ctx->flags = ctx_flags; out_ctx->flags = ctx_flags;
if (ctx_flags & REDISMODULE_CTX_TEMP_CLIENT)
out_ctx->client = moduleAllocTempClient();
else if (ctx_flags & REDISMODULE_CTX_NEW_CLIENT)
out_ctx->client = createClient(NULL);
if (!(ctx_flags & REDISMODULE_CTX_THREAD_SAFE)) { if (!(ctx_flags & REDISMODULE_CTX_THREAD_SAFE)) {
server.module_ctx_nesting++; server.module_ctx_nesting++;
} }
@ -4856,21 +4906,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
replicate = flags & REDISMODULE_ARGV_REPLICATE; replicate = flags & REDISMODULE_ARGV_REPLICATE;
va_end(ap); va_end(ap);
/* Setup our fake client for command execution. */ c = moduleAllocTempClient();
if (server.module_client == NULL) {
/* This is the first RM_Call() ever. Create reusable client. */
c = server.module_client = createClient(NULL);
} else if (server.module_client->argv == NULL) {
/* The reusable module client is not busy with a command. Use it. */
c = server.module_client;
} else {
/* The reusable module client is busy. (It is probably used in a
* recursive call to this module.) */
c = createClient(NULL);
}
c->user = NULL; /* Root user. */
c->flags = CLIENT_MODULE;
/* We do not want to allow block, the module do not expect it */ /* We do not want to allow block, the module do not expect it */
c->flags |= CLIENT_DENY_BLOCKING; c->flags |= CLIENT_DENY_BLOCKING;
@ -4994,18 +5030,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
cleanup: cleanup:
if (ctx->module) ctx->module->in_call--; if (ctx->module) ctx->module->in_call--;
if (c == server.module_client) { moduleReleaseTempClient(c);
/* reset shared client so it can be reused */
discardTransaction(c);
pubsubUnsubscribeAllChannels(c,0);
pubsubUnsubscribeAllPatterns(c,0);
resetClient(c); /* frees the contents of argv */
zfree(c->argv);
c->argv = NULL;
c->resp = 2;
} else {
freeClient(c); /* temporary client */
}
return reply; return reply;
} }
@ -6164,10 +6189,10 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF
bc->disconnect_callback = NULL; /* Set by RM_SetDisconnectCallback() */ bc->disconnect_callback = NULL; /* Set by RM_SetDisconnectCallback() */
bc->free_privdata = free_privdata; bc->free_privdata = free_privdata;
bc->privdata = privdata; bc->privdata = privdata;
bc->reply_client = createClient(NULL); bc->reply_client = moduleAllocTempClient();
bc->thread_safe_ctx_client = moduleAllocTempClient();
if (bc->client) if (bc->client)
bc->reply_client->resp = bc->client->resp; bc->reply_client->resp = bc->client->resp;
bc->reply_client->flags |= CLIENT_MODULE;
bc->dbid = c->db->id; bc->dbid = c->db->id;
bc->blocked_on_keys = keys != NULL; bc->blocked_on_keys = keys != NULL;
bc->unblocked = 0; bc->unblocked = 0;
@ -6337,10 +6362,12 @@ int moduleUnblockClientByHandle(RedisModuleBlockedClient *bc, void *privdata) {
pthread_mutex_lock(&moduleUnblockedClientsMutex); pthread_mutex_lock(&moduleUnblockedClientsMutex);
if (!bc->blocked_on_keys) bc->privdata = privdata; if (!bc->blocked_on_keys) bc->privdata = privdata;
bc->unblocked = 1; bc->unblocked = 1;
listAddNodeTail(moduleUnblockedClients,bc); if (listLength(moduleUnblockedClients) == 0) {
if (write(server.module_blocked_pipe[1],"A",1) != 1) { if (write(server.module_blocked_pipe[1],"A",1) != 1) {
/* Ignore the error, this is best-effort. */ /* Ignore the error, this is best-effort. */
}
} }
listAddNodeTail(moduleUnblockedClients,bc);
pthread_mutex_unlock(&moduleUnblockedClientsMutex); pthread_mutex_unlock(&moduleUnblockedClientsMutex);
return REDISMODULE_OK; return REDISMODULE_OK;
} }
@ -6433,8 +6460,10 @@ void moduleHandleBlockedClients(void) {
pthread_mutex_lock(&moduleUnblockedClientsMutex); pthread_mutex_lock(&moduleUnblockedClientsMutex);
/* Here we unblock all the pending clients blocked in modules operations /* Here we unblock all the pending clients blocked in modules operations
* so we can read every pending "awake byte" in the pipe. */ * so we can read every pending "awake byte" in the pipe. */
char buf[1]; if (listLength(moduleUnblockedClients) > 0) {
while (read(server.module_blocked_pipe[0],buf,1) == 1); char buf[1];
while (read(server.module_blocked_pipe[0],buf,1) == 1);
}
while (listLength(moduleUnblockedClients)) { while (listLength(moduleUnblockedClients)) {
ln = listFirst(moduleUnblockedClients); ln = listFirst(moduleUnblockedClients);
bc = ln->value; bc = ln->value;
@ -6488,7 +6517,8 @@ void moduleHandleBlockedClients(void) {
* We need to glue such replies to the client output buffer and * We need to glue such replies to the client output buffer and
* free the temporary client we just used for the replies. */ * free the temporary client we just used for the replies. */
if (c) AddReplyFromClient(c, bc->reply_client); if (c) AddReplyFromClient(c, bc->reply_client);
freeClient(bc->reply_client); moduleReleaseTempClient(bc->reply_client);
moduleReleaseTempClient(bc->thread_safe_ctx_client);
if (c != NULL) { if (c != NULL) {
/* Before unblocking the client, set the disconnect callback /* Before unblocking the client, set the disconnect callback
@ -6623,15 +6653,27 @@ int RM_BlockedClientDisconnected(RedisModuleCtx *ctx) {
RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) { RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) {
RedisModuleCtx *ctx = zmalloc(sizeof(*ctx)); RedisModuleCtx *ctx = zmalloc(sizeof(*ctx));
RedisModule *module = bc ? bc->module : NULL; RedisModule *module = bc ? bc->module : NULL;
moduleCreateContext(ctx, module, REDISMODULE_CTX_THREAD_SAFE); int flags = REDISMODULE_CTX_THREAD_SAFE;
/* Creating a new client object is costly. To avoid that, we have an
* internal pool of client objects. In blockClient(), a client object is
* assigned to bc->thread_safe_ctx_client to be used for the thread safe
* context.
* For detached thread safe contexts, we create a new client object.
* Otherwise, as this function can be called from different threads, we
* would need to synchronize access to internal pool of client objects.
* Assuming creating detached context is rare and not that performance
* critical, we avoid synchronizing access to the client pool by creating
* a new client */
if (!bc) flags |= REDISMODULE_CTX_NEW_CLIENT;
moduleCreateContext(ctx, module, flags);
/* Even when the context is associated with a blocked client, we can't /* Even when the context is associated with a blocked client, we can't
* access it safely from another thread, so we create a fake client here * access it safely from another thread, so we use a fake client here
* in order to keep things like the currently selected database and similar * in order to keep things like the currently selected database and similar
* things. */ * things. */
ctx->client = createClient(NULL);
if (bc) { if (bc) {
ctx->blocked_client = bc; ctx->blocked_client = bc;
ctx->client = bc->thread_safe_ctx_client;
selectDb(ctx->client,bc->dbid); selectDb(ctx->client,bc->dbid);
if (bc->client) { if (bc->client) {
ctx->client->id = bc->client->id; ctx->client->id = bc->client->id;
@ -6648,8 +6690,10 @@ RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) {
* a long term, for purposes such as logging. */ * a long term, for purposes such as logging. */
RedisModuleCtx *RM_GetDetachedThreadSafeContext(RedisModuleCtx *ctx) { RedisModuleCtx *RM_GetDetachedThreadSafeContext(RedisModuleCtx *ctx) {
RedisModuleCtx *new_ctx = zmalloc(sizeof(*new_ctx)); RedisModuleCtx *new_ctx = zmalloc(sizeof(*new_ctx));
moduleCreateContext(new_ctx, ctx->module, REDISMODULE_CTX_THREAD_SAFE); /* We create a new client object for the detached context.
new_ctx->client = createClient(NULL); * See RM_GetThreadSafeContext() for more information */
moduleCreateContext(new_ctx, ctx->module,
REDISMODULE_CTX_THREAD_SAFE|REDISMODULE_CTX_NEW_CLIENT);
return new_ctx; return new_ctx;
} }
@ -6828,12 +6872,11 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid)
while((ln = listNext(&li))) { while((ln = listNext(&li))) {
RedisModuleKeyspaceSubscriber *sub = ln->value; RedisModuleKeyspaceSubscriber *sub = ln->value;
/* Only notify subscribers on events matching they registration, /* Only notify subscribers on events matching the registration,
* and avoid subscribers triggering themselves */ * and avoid subscribers triggering themselves */
if ((sub->event_mask & type) && sub->active == 0) { if ((sub->event_mask & type) && sub->active == 0) {
RedisModuleCtx ctx; RedisModuleCtx ctx;
moduleCreateContext(&ctx, sub->module, REDISMODULE_CTX_NONE); moduleCreateContext(&ctx, sub->module, REDISMODULE_CTX_TEMP_CLIENT);
ctx.client = moduleFreeContextReusedClient;
selectDb(ctx.client, dbid); selectDb(ctx.client, dbid);
/* mark the handler as active to avoid reentrant loops. /* mark the handler as active to avoid reentrant loops.
@ -6895,8 +6938,7 @@ void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8
while(r) { while(r) {
if (r->module_id == module_id) { if (r->module_id == module_id) {
RedisModuleCtx ctx; RedisModuleCtx ctx;
moduleCreateContext(&ctx, r->module, REDISMODULE_CTX_NONE); moduleCreateContext(&ctx, r->module, REDISMODULE_CTX_TEMP_CLIENT);
ctx.client = moduleFreeContextReusedClient;
selectDb(ctx.client, 0); selectDb(ctx.client, 0);
r->callback(&ctx,sender_id,type,payload,len); r->callback(&ctx,sender_id,type,payload,len);
moduleFreeContext(&ctx); moduleFreeContext(&ctx);
@ -7167,8 +7209,7 @@ int moduleTimerHandler(struct aeEventLoop *eventLoop, long long id, void *client
if (now >= expiretime) { if (now >= expiretime) {
RedisModuleTimer *timer = ri.data; RedisModuleTimer *timer = ri.data;
RedisModuleCtx ctx; RedisModuleCtx ctx;
moduleCreateContext(&ctx, timer->module, REDISMODULE_CTX_NONE); moduleCreateContext(&ctx,timer->module,REDISMODULE_CTX_TEMP_CLIENT);
ctx.client = moduleFreeContextReusedClient;
selectDb(ctx.client, timer->dbid); selectDb(ctx.client, timer->dbid);
timer->callback(&ctx,timer->data); timer->callback(&ctx,timer->data);
moduleFreeContext(&ctx); moduleFreeContext(&ctx);
@ -9287,7 +9328,6 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) {
* cheap if there are no registered modules. */ * cheap if there are no registered modules. */
if (listLength(RedisModule_EventListeners) == 0) return; if (listLength(RedisModule_EventListeners) == 0) return;
int real_client_used = 0;
listIter li; listIter li;
listNode *ln; listNode *ln;
listRewind(RedisModule_EventListeners,&li); listRewind(RedisModule_EventListeners,&li);
@ -9295,22 +9335,16 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) {
RedisModuleEventListener *el = ln->value; RedisModuleEventListener *el = ln->value;
if (el->event.id == eid) { if (el->event.id == eid) {
RedisModuleCtx ctx; RedisModuleCtx ctx;
moduleCreateContext(&ctx, el->module, REDISMODULE_CTX_NONE);
if (eid == REDISMODULE_EVENT_CLIENT_CHANGE) { if (eid == REDISMODULE_EVENT_CLIENT_CHANGE) {
/* In the case of client changes, we're pushing the real client /* In the case of client changes, we're pushing the real client
* so the event handler can mutate it if needed. For example, * so the event handler can mutate it if needed. For example,
* to change its authentication state in a way that does not * to change its authentication state in a way that does not
* depend on specific commands executed later. * depend on specific commands executed later.
*/ */
moduleCreateContext(&ctx,el->module,REDISMODULE_CTX_NONE);
ctx.client = (client *) data; ctx.client = (client *) data;
real_client_used = 1;
} else if (ModulesInHooks == 0) {
ctx.client = moduleFreeContextReusedClient;
} else { } else {
ctx.client = createClient(NULL); moduleCreateContext(&ctx,el->module,REDISMODULE_CTX_TEMP_CLIENT);
ctx.client->flags |= CLIENT_MODULE;
ctx.client->user = NULL; /* Root user. */
} }
void *moduledata = NULL; void *moduledata = NULL;
@ -9337,8 +9371,10 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) {
selectDb(ctx.client, fi->dbnum); selectDb(ctx.client, fi->dbnum);
} else if (eid == REDISMODULE_EVENT_MODULE_CHANGE) { } else if (eid == REDISMODULE_EVENT_MODULE_CHANGE) {
RedisModule *m = data; RedisModule *m = data;
if (m == el->module) if (m == el->module) {
moduleFreeContext(&ctx);
continue; continue;
}
mcv1.version = REDISMODULE_MODULE_CHANGE_VERSION; mcv1.version = REDISMODULE_MODULE_CHANGE_VERSION;
mcv1.module_name = m->name; mcv1.module_name = m->name;
mcv1.module_version = m->ver; mcv1.module_version = m->ver;
@ -9351,13 +9387,10 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) {
moduledata = data; moduledata = data;
} }
ModulesInHooks++;
el->module->in_hook++; el->module->in_hook++;
el->callback(&ctx,el->event,subid,moduledata); el->callback(&ctx,el->event,subid,moduledata);
el->module->in_hook--; el->module->in_hook--;
ModulesInHooks--;
if (ModulesInHooks != 0 && !real_client_used) freeClient(ctx.client);
moduleFreeContext(&ctx); moduleFreeContext(&ctx);
} }
} }
@ -9488,14 +9521,11 @@ int moduleRegisterApi(const char *funcname, void *funcptr) {
/* Global initialization at Redis startup. */ /* Global initialization at Redis startup. */
void moduleRegisterCoreAPI(void); void moduleRegisterCoreAPI(void);
/* Some steps in module initialization need to be done last after server /* Currently, this function is just a placeholder for the module system
* initialization. * initialization steps that need to be run after server initialization.
* For example, selectDb() in createClient() requires that server.db has * A previous issue, selectDb() in createClient() requires that server.db has
* been initialized, see #7323. */ * been initialized, see #7323. */
void moduleInitModulesSystemLast(void) { void moduleInitModulesSystemLast(void) {
moduleFreeContextReusedClient = createClient(NULL);
moduleFreeContextReusedClient->flags |= CLIENT_MODULE;
moduleFreeContextReusedClient->user = NULL; /* root user. */
} }
void moduleInitModulesSystem(void) { void moduleInitModulesSystem(void) {
@ -9509,9 +9539,6 @@ void moduleInitModulesSystem(void) {
/* Set up filter list */ /* Set up filter list */
moduleCommandFilters = listCreate(); moduleCommandFilters = listCreate();
/* Reusable client for RM_Call() is created on first use */
server.module_client = NULL;
moduleRegisterCoreAPI(); moduleRegisterCoreAPI();
/* Create a pipe for module threads to be able to wake up the redis main thread. /* Create a pipe for module threads to be able to wake up the redis main thread.
@ -9540,6 +9567,33 @@ void moduleInitModulesSystem(void) {
pthread_mutex_lock(&moduleGIL); pthread_mutex_lock(&moduleGIL);
} }
void modulesCron(void) {
/* Check number of temporary clients in the pool and free the unused ones
* since the last cron. moduleTempClientMinCount tracks minimum count of
* clients in the pool since the last cron. This is the number of clients
* that we didn't use for the last cron period. */
/* Limit the max client count to be freed at once to avoid latency spikes.*/
int iteration = 50;
/* We are freeing clients if we have more than 8 unused clients. Keeping
* small amount of clients to avoid client allocation costs if temporary
* clients are required after some idle period. */
const unsigned int min_client = 8;
while (iteration > 0 && moduleTempClientCount > 0 && moduleTempClientMinCount > min_client) {
client *c = moduleTempClients[--moduleTempClientCount];
freeClient(c);
iteration--;
moduleTempClientMinCount--;
}
moduleTempClientMinCount = moduleTempClientCount;
/* Shrink moduleTempClients array itself if it is wasting some space */
if (moduleTempClientCap > 32 && moduleTempClientCap > moduleTempClientCount * 4) {
moduleTempClientCap /= 4;
moduleTempClients = zrealloc(moduleTempClients,sizeof(client*)*moduleTempClientCap);
}
}
void moduleLoadQueueEntryFree(struct moduleLoadQueueEntry *loadmod) { void moduleLoadQueueEntryFree(struct moduleLoadQueueEntry *loadmod) {
if (!loadmod) return; if (!loadmod) return;
sdsfree(loadmod->path); sdsfree(loadmod->path);
@ -9632,10 +9686,6 @@ void moduleUnregisterCommands(struct RedisModule *module) {
int moduleLoad(const char *path, void **module_argv, int module_argc) { int moduleLoad(const char *path, void **module_argv, int module_argc) {
int (*onload)(void *, void **, int); int (*onload)(void *, void **, int);
void *handle; void *handle;
RedisModuleCtx ctx;
moduleCreateContext(&ctx, NULL, REDISMODULE_CTX_NONE); /* We pass NULL since we don't have a module yet. */
ctx.client = moduleFreeContextReusedClient;
selectDb(ctx.client, 0);
struct stat st; struct stat st;
if (stat(path, &st) == 0) if (stat(path, &st) == 0)
@ -9659,6 +9709,9 @@ int moduleLoad(const char *path, void **module_argv, int module_argc) {
"symbol. Module not loaded.",path); "symbol. Module not loaded.",path);
return C_ERR; return C_ERR;
} }
RedisModuleCtx ctx;
moduleCreateContext(&ctx, NULL, REDISMODULE_CTX_TEMP_CLIENT); /* We pass NULL since we don't have a module yet. */
selectDb(ctx.client, 0);
if (onload((void*)&ctx,module_argv,module_argc) == REDISMODULE_ERR) { if (onload((void*)&ctx,module_argv,module_argc) == REDISMODULE_ERR) {
if (ctx.module) { if (ctx.module) {
moduleUnregisterCommands(ctx.module); moduleUnregisterCommands(ctx.module);
@ -9666,6 +9719,7 @@ int moduleLoad(const char *path, void **module_argv, int module_argc) {
moduleUnregisterUsedAPI(ctx.module); moduleUnregisterUsedAPI(ctx.module);
moduleFreeModuleStructure(ctx.module); moduleFreeModuleStructure(ctx.module);
} }
moduleFreeContext(&ctx);
dlclose(handle); dlclose(handle);
serverLog(LL_WARNING, serverLog(LL_WARNING,
"Module %s initialization failed. Module not loaded",path); "Module %s initialization failed. Module not loaded",path);
@ -9726,8 +9780,7 @@ int moduleUnload(sds name) {
onunload = (int (*)(void *))(unsigned long) dlsym(module->handle, "RedisModule_OnUnload"); onunload = (int (*)(void *))(unsigned long) dlsym(module->handle, "RedisModule_OnUnload");
if (onunload) { if (onunload) {
RedisModuleCtx ctx; RedisModuleCtx ctx;
moduleCreateContext(&ctx, module, REDISMODULE_CTX_NONE); moduleCreateContext(&ctx, module, REDISMODULE_CTX_TEMP_CLIENT);
ctx.client = moduleFreeContextReusedClient;
int unload_status = onunload((void*)&ctx); int unload_status = onunload((void*)&ctx);
moduleFreeContext(&ctx); moduleFreeContext(&ctx);

View File

@ -1374,6 +1374,45 @@ void unlinkClient(client *c) {
if (c->flags & CLIENT_TRACKING) disableTracking(c); if (c->flags & CLIENT_TRACKING) disableTracking(c);
} }
/* Clear the client state to resemble a newly connected client. */
void clearClientConnectionState(client *c) {
listNode *ln;
/* MONITOR clients are also marked with CLIENT_SLAVE, we need to
* distinguish between the two.
*/
if (c->flags & CLIENT_MONITOR) {
ln = listSearchKey(server.monitors,c);
serverAssert(ln != NULL);
listDelNode(server.monitors,ln);
c->flags &= ~(CLIENT_MONITOR|CLIENT_SLAVE);
}
serverAssert(!(c->flags &(CLIENT_SLAVE|CLIENT_MASTER)));
if (c->flags & CLIENT_TRACKING) disableTracking(c);
selectDb(c,0);
c->resp = 2;
clientSetDefaultAuth(c);
moduleNotifyUserChanged(c);
discardTransaction(c);
pubsubUnsubscribeAllChannels(c,0);
pubsubUnsubscribeShardAllChannels(c, 0);
pubsubUnsubscribeAllPatterns(c,0);
if (c->name) {
decrRefCount(c->name);
c->name = NULL;
}
/* Selectively clear state flags not covered above */
c->flags &= ~(CLIENT_ASKING|CLIENT_READONLY|CLIENT_PUBSUB|
CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP_NEXT);
}
void freeClient(client *c) { void freeClient(client *c) {
listNode *ln; listNode *ln;
@ -2598,45 +2637,18 @@ int clientSetNameOrReply(client *c, robj *name) {
/* Reset the client state to resemble a newly connected client. /* Reset the client state to resemble a newly connected client.
*/ */
void resetCommand(client *c) { void resetCommand(client *c) {
listNode *ln;
/* MONITOR clients are also marked with CLIENT_SLAVE, we need to /* MONITOR clients are also marked with CLIENT_SLAVE, we need to
* distinguish between the two. * distinguish between the two.
*/ */
if (c->flags & CLIENT_MONITOR) { uint64_t flags = c->flags;
ln = listSearchKey(server.monitors,c); if (flags & CLIENT_MONITOR) flags &= ~(CLIENT_MONITOR|CLIENT_SLAVE);
serverAssert(ln != NULL);
listDelNode(server.monitors,ln);
c->flags &= ~(CLIENT_MONITOR|CLIENT_SLAVE); if (flags & (CLIENT_SLAVE|CLIENT_MASTER|CLIENT_MODULE)) {
}
if (c->flags & (CLIENT_SLAVE|CLIENT_MASTER|CLIENT_MODULE)) {
addReplyError(c,"can only reset normal client connections"); addReplyError(c,"can only reset normal client connections");
return; return;
} }
if (c->flags & CLIENT_TRACKING) disableTracking(c); clearClientConnectionState(c);
selectDb(c,0);
c->resp = 2;
clientSetDefaultAuth(c);
moduleNotifyUserChanged(c);
discardTransaction(c);
pubsubUnsubscribeAllChannels(c,0);
pubsubUnsubscribeShardAllChannels(c, 0);
pubsubUnsubscribeAllPatterns(c,0);
if (c->name) {
decrRefCount(c->name);
c->name = NULL;
}
/* Selectively clear state flags not covered above */
c->flags &= ~(CLIENT_ASKING|CLIENT_READONLY|CLIENT_PUBSUB|
CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP_NEXT);
addReplyStatus(c,"RESET"); addReplyStatus(c,"RESET");
} }

View File

@ -1320,6 +1320,10 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
server.rdb_bgsave_scheduled = 0; server.rdb_bgsave_scheduled = 0;
} }
run_with_period(100) {
if (moduleCount()) modulesCron();
}
/* Fire the cron loop modules event. */ /* Fire the cron loop modules event. */
RedisModuleCronLoopV1 ei = {REDISMODULE_CRON_LOOP_VERSION,server.hz}; RedisModuleCronLoopV1 ei = {REDISMODULE_CRON_LOOP_VERSION,server.hz};
moduleFireServerEvent(REDISMODULE_EVENT_CRON_LOOP, moduleFireServerEvent(REDISMODULE_EVENT_CRON_LOOP,

View File

@ -1453,7 +1453,6 @@ struct redisServer {
to be processed. */ to be processed. */
pid_t child_pid; /* PID of current child */ pid_t child_pid; /* PID of current child */
int child_type; /* Type of current child */ int child_type; /* Type of current child */
client *module_client; /* "Fake" client to call Redis from modules */
/* Networking */ /* Networking */
int port; /* TCP listening port */ int port; /* TCP listening port */
int tls_port; /* TLS listening port */ int tls_port; /* TLS listening port */
@ -2302,6 +2301,7 @@ void populateCommandLegacyRangeSpec(struct redisCommand *c);
/* Modules */ /* Modules */
void moduleInitModulesSystem(void); void moduleInitModulesSystem(void);
void moduleInitModulesSystemLast(void); void moduleInitModulesSystemLast(void);
void modulesCron(void);
int moduleLoad(const char *path, void **argv, int argc); int moduleLoad(const char *path, void **argv, int argc);
void moduleLoadFromQueue(void); void moduleLoadFromQueue(void);
int moduleGetCommandKeysViaAPI(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); int moduleGetCommandKeysViaAPI(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
@ -2361,6 +2361,7 @@ void freeClient(client *c);
void freeClientAsync(client *c); void freeClientAsync(client *c);
void logInvalidUseAndFreeClientAsync(client *c, const char *fmt, ...); void logInvalidUseAndFreeClientAsync(client *c, const char *fmt, ...);
int beforeNextClient(client *c); int beforeNextClient(client *c);
void clearClientConnectionState(client *c);
void resetClient(client *c); void resetClient(client *c);
void freeClientOriginalArgv(client *c); void freeClientOriginalArgv(client *c);
void sendReplyToClient(connection *conn); void sendReplyToClient(connection *conn);