Shared reusable client for RM_Call() (#8516)

A single client pointer is added in the server struct. This is
initialized by the first RM_Call() and reused for every subsequent
RM_Call() except if it's already in use, which means that it's not
used for (recursive) module calls to modules. For these, a new
"fake" client is created each time.

Other changes:
* Avoid allocating a dict iterator in pubsubUnsubscribeAllChannels
  when not needed
This commit is contained in:
Viktor Söderqvist 2021-02-28 13:11:18 +01:00 committed by GitHub
parent 4a474843fb
commit 6122f1c450
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 43 additions and 11 deletions

View File

@ -3969,16 +3969,26 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
RedisModuleCallReply *reply = NULL; RedisModuleCallReply *reply = NULL;
int replicate = 0; /* Replicate this command? */ int replicate = 0; /* Replicate this command? */
/* Create the client and dispatch the command. */ /* Handle arguments. */
va_start(ap, fmt); va_start(ap, fmt);
c = createClient(NULL);
c->user = NULL; /* Root user. */
argv = moduleCreateArgvFromUserFormat(cmdname,fmt,&argc,&flags,ap); argv = moduleCreateArgvFromUserFormat(cmdname,fmt,&argc,&flags,ap);
replicate = flags & REDISMODULE_ARGV_REPLICATE; replicate = flags & REDISMODULE_ARGV_REPLICATE;
va_end(ap); va_end(ap);
/* Setup our fake client for command execution. */ /* Setup our fake client for command execution. */
c->flags |= CLIENT_MODULE; if (server.module_client == NULL) {
/* This is the first RM_Call() ever. Create reusable client. */
c = server.module_client = createClient(NULL);
} else if (server.module_client->argv == NULL) {
/* The reusable module client is not busy with a command. Use it. */
c = server.module_client;
} else {
/* The reusable module client is busy. (It is probably used in a
* recursive call to this module.) */
c = createClient(NULL);
}
c->user = NULL; /* Root user. */
c->flags = CLIENT_MODULE;
/* We do not want to allow block, the module do not expect it */ /* We do not want to allow block, the module do not expect it */
c->flags |= CLIENT_DENY_BLOCKING; c->flags |= CLIENT_DENY_BLOCKING;
@ -4066,7 +4076,18 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
cleanup: cleanup:
if (ctx->module) ctx->module->in_call--; if (ctx->module) ctx->module->in_call--;
freeClient(c); if (c == server.module_client) {
/* reset shared client so it can be reused */
discardTransaction(c);
pubsubUnsubscribeAllChannels(c,0);
pubsubUnsubscribeAllPatterns(c,0);
resetClient(c); /* frees the contents of argv */
zfree(c->argv);
c->argv = NULL;
c->resp = 2;
} else {
freeClient(c); /* temporary client */
}
return reply; return reply;
} }
@ -8269,6 +8290,9 @@ void moduleInitModulesSystem(void) {
/* Set up filter list */ /* Set up filter list */
moduleCommandFilters = listCreate(); moduleCommandFilters = listCreate();
/* Reusable client for RM_Call() is created on first use */
server.module_client = NULL;
moduleRegisterCoreAPI(); moduleRegisterCoreAPI();
if (pipe(server.module_blocked_pipe) == -1) { if (pipe(server.module_blocked_pipe) == -1) {
serverLog(LL_WARNING, serverLog(LL_WARNING,

View File

@ -250,18 +250,20 @@ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {
/* Unsubscribe from all the channels. Return the number of channels the /* Unsubscribe from all the channels. Return the number of channels the
* client was subscribed to. */ * client was subscribed to. */
int pubsubUnsubscribeAllChannels(client *c, int notify) { int pubsubUnsubscribeAllChannels(client *c, int notify) {
dictIterator *di = dictGetSafeIterator(c->pubsub_channels);
dictEntry *de;
int count = 0; int count = 0;
if (dictSize(c->pubsub_channels) > 0) {
dictIterator *di = dictGetSafeIterator(c->pubsub_channels);
dictEntry *de;
while((de = dictNext(di)) != NULL) { while((de = dictNext(di)) != NULL) {
robj *channel = dictGetKey(de); robj *channel = dictGetKey(de);
count += pubsubUnsubscribeChannel(c,channel,notify); count += pubsubUnsubscribeChannel(c,channel,notify);
}
dictReleaseIterator(di);
} }
/* We were subscribed to nothing? Still reply to the client. */ /* We were subscribed to nothing? Still reply to the client. */
if (notify && count == 0) addReplyPubsubUnsubscribed(c,NULL); if (notify && count == 0) addReplyPubsubUnsubscribed(c,NULL);
dictReleaseIterator(di);
return count; return count;
} }

View File

@ -1195,6 +1195,7 @@ struct redisServer {
to be processed. */ to be processed. */
pid_t child_pid; /* PID of current child */ pid_t child_pid; /* PID of current child */
int child_type; /* Type of current child */ int child_type; /* Type of current child */
client *module_client; /* "Fake" client to call Redis from modules */
/* Networking */ /* Networking */
int port; /* TCP listening port */ int port; /* TCP listening port */
int tls_port; /* TLS listening port */ int tls_port; /* TLS listening port */

View File

@ -16,6 +16,11 @@ start_server {tags {"modules"}} {
assert { [string match "*cmdstat_module*" $info] } assert { [string match "*cmdstat_module*" $info] }
} }
test {test RM_Call recursive} {
set info [r test.call_generic test.call_generic info commandstats]
assert { [string match "*cmdstat_module*" $info] }
}
test {test redis version} { test {test redis version} {
set version [s redis_version] set version [s redis_version]
assert_equal $version [r test.redisversion] assert_equal $version [r test.redisversion]