Fix modules blocking commands awake delay.

If a thread unblocks a client blocked in a module command, by using the
RedisMdoule_UnblockClient() API, the event loop may not be awaken until
the next timeout of the multiplexing API or the next unrelated I/O
operation on other clients. We actually want the client to be served
ASAP, so a mechanism is needed in order for the unblocking API to inform
Redis that there is a client to serve ASAP.

This commit fixes the issue using the old trick of the pipe: when a
client needs to be unblocked, a byte is written in a pipe. When we run
the list of clients blocked in modules, we consume all the bytes
written in the pipe. Writes and reads are performed inside the context
of the mutex, so no race is possible in which we consume the bytes that
are actually related to an awake request for a client that should still
be put into the list of clients to unblock.

It was verified that after the fix the server handles the blocked
clients with the expected short delay.

Thanks to @dvirsky for understanding there was such a problem and
reporting it.
This commit is contained in:
antirez 2017-04-10 09:33:21 +02:00
parent 91999fce40
commit ffefc9f92d
3 changed files with 42 additions and 0 deletions

View File

@ -3108,6 +3108,17 @@ void RM_LogIOError(RedisModuleIO *io, const char *levelstr, const char *fmt, ...
* Blocking clients from modules * Blocking clients from modules
* -------------------------------------------------------------------------- */ * -------------------------------------------------------------------------- */
/* Readable handler for the awake pipe. We do nothing here, the awake bytes
* will be actually read in a more appropriate place in the
* moduleHandleBlockedClients() function that is where clients are actually
* served. */
void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) {
UNUSED(el);
UNUSED(fd);
UNUSED(mask);
UNUSED(privdata);
}
/* This is called from blocked.c in order to unblock a client: may be called /* This is called from blocked.c in order to unblock a client: may be called
* for multiple reasons while the client is in the middle of being blocked * for multiple reasons while the client is in the middle of being blocked
* because the client is terminated, but is also called for cleanup when a * because the client is terminated, but is also called for cleanup when a
@ -3171,6 +3182,9 @@ int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) {
pthread_mutex_lock(&moduleUnblockedClientsMutex); pthread_mutex_lock(&moduleUnblockedClientsMutex);
bc->privdata = privdata; bc->privdata = privdata;
listAddNodeTail(moduleUnblockedClients,bc); listAddNodeTail(moduleUnblockedClients,bc);
if (write(server.module_blocked_pipe[1],"A",1) != 1) {
/* Ignore the error, this is best-effort. */
}
pthread_mutex_unlock(&moduleUnblockedClientsMutex); pthread_mutex_unlock(&moduleUnblockedClientsMutex);
return REDISMODULE_OK; return REDISMODULE_OK;
} }
@ -3195,6 +3209,10 @@ void moduleHandleBlockedClients(void) {
RedisModuleBlockedClient *bc; RedisModuleBlockedClient *bc;
pthread_mutex_lock(&moduleUnblockedClientsMutex); pthread_mutex_lock(&moduleUnblockedClientsMutex);
/* Here we unblock all the pending clients blocked in modules operations
* so we can read every pending "awake byte" in the pipe. */
char buf[1];
while (read(server.module_blocked_pipe[0],buf,1) == 1);
while (listLength(moduleUnblockedClients)) { while (listLength(moduleUnblockedClients)) {
ln = listFirst(moduleUnblockedClients); ln = listFirst(moduleUnblockedClients);
bc = ln->value; bc = ln->value;
@ -3298,6 +3316,16 @@ void moduleInitModulesSystem(void) {
server.loadmodule_queue = listCreate(); server.loadmodule_queue = listCreate();
modules = dictCreate(&modulesDictType,NULL); modules = dictCreate(&modulesDictType,NULL);
moduleRegisterCoreAPI(); moduleRegisterCoreAPI();
if (pipe(server.module_blocked_pipe) == -1) {
serverLog(LL_WARNING,
"Can't create the pipe for module blocking commands: %s",
strerror(errno));
exit(1);
}
/* Make the pipe non blocking. This is just a best effort aware mechanism
* and we do not want to block not in the read nor in the write half. */
anetNonBlock(NULL,server.module_blocked_pipe[0]);
anetNonBlock(NULL,server.module_blocked_pipe[1]);
} }
/* Load all the modules in the server.loadmodule_queue list, which is /* Load all the modules in the server.loadmodule_queue list, which is

View File

@ -1870,6 +1870,16 @@ void initServer(void) {
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE, if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event."); acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
/* Register a readable event for the pipe used to awake the event loop
* when a blocked client in a module needs attention. */
if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,
moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
serverPanic(
"Error registering the readable event for the module "
"blocked clients subsystem.");
}
/* Open the AOF file if needed. */ /* Open the AOF file if needed. */
if (server.aof_state == AOF_ON) { if (server.aof_state == AOF_ON) {
server.aof_fd = open(server.aof_filename, server.aof_fd = open(server.aof_filename,

View File

@ -877,6 +877,9 @@ struct redisServer {
/* Modules */ /* Modules */
dict *moduleapi; /* Exported APIs dictionary for modules. */ dict *moduleapi; /* Exported APIs dictionary for modules. */
list *loadmodule_queue; /* List of modules to load at startup. */ list *loadmodule_queue; /* List of modules to load at startup. */
int module_blocked_pipe[2]; /* Pipe used to awake the event loop if a
client blocked on a module command needs
to be processed. */
/* Networking */ /* Networking */
int port; /* TCP listening port */ int port; /* TCP listening port */
int tcp_backlog; /* TCP listen() backlog */ int tcp_backlog; /* TCP listen() backlog */
@ -1286,6 +1289,7 @@ void moduleFreeContext(struct RedisModuleCtx *ctx);
void unblockClientFromModule(client *c); void unblockClientFromModule(client *c);
void moduleHandleBlockedClients(void); void moduleHandleBlockedClients(void);
void moduleBlockedClientTimedOut(client *c); void moduleBlockedClientTimedOut(client *c);
void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask);
/* Utils */ /* Utils */
long long ustime(void); long long ustime(void);