Add event loop support to the module API (#10001)

Modules can now register sockets/pipe to the Redis main thread event loop and do network operations asynchronously. Previously, modules had to maintain an event loop and another thread for asynchronous network operations.

Also, if a module is calling API functions after doing some network operations, it had to synchronize its event loop thread's access with Redis main thread by locking the GIL, causing contention on the lock. After this commit, no synchronization is needed as module can operate in Redis main thread context. So, this commit may improve the performance for some use cases.

Added three functions to the module API:

* RedisModule_EventLoopAdd(int fd, int mask, RedisModuleEventLoopFunc func, void *user_data)
* RedisModule_EventLoopDel(int fd, int mask)
* RedisModule_EventLoopAddOneShot(RedisModuleEventLoopOneShotFunc func, void *user_data) - This function can be called from other threads to trigger callback on Redis main thread. Callback will be triggered only once. If Redis main thread is sleeping, this call will wake up the Redis main thread.
Event loop callbacks are called by Redis main thread after locking the GIL. Inside callbacks, modules can operate as if they are holding the GIL.

Added REDISMODULE_EVENT_EVENTLOOP event with two subevents:

* REDISMODULE_SUBEVENT_EVENTLOOP_BEFORE_SLEEP
* REDISMODULE_SUBEVENT_EVENTLOOP_AFTER_SLEEP

These events are for modules that want to participate in the before and after sleep action. e.g It might be useful to implement batching : Read data from the network, write all to a file in one go on BEFORE_SLEEP event.
This commit is contained in:
Ozan Tezcan 2022-01-18 14:10:07 +03:00 committed by GitHub
parent 25e6d4d459
commit 99ab4236af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 591 additions and 35 deletions

View File

@ -44,5 +44,6 @@ $TCLSH tests/test_helper.tcl \
--single unit/moduleapi/aclcheck \
--single unit/moduleapi/subcommands \
--single unit/moduleapi/reply \
--single unit/moduleapi/eventloop \
"${@}"

View File

@ -197,6 +197,14 @@ void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
}
}
void *aeGetFileClientData(aeEventLoop *eventLoop, int fd) {
if (fd >= eventLoop->setsize) return NULL;
aeFileEvent *fe = &eventLoop->events[fd];
if (fe->mask == AE_NONE) return NULL;
return fe->clientData;
}
int aeGetFileEvents(aeEventLoop *eventLoop, int fd) {
if (fd >= eventLoop->setsize) return 0;
aeFileEvent *fe = &eventLoop->events[fd];

View File

@ -118,6 +118,7 @@ int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData);
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
int aeGetFileEvents(aeEventLoop *eventLoop, int fd);
void *aeGetFileClientData(aeEventLoop *eventLoop, int fd);
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc);

View File

@ -6093,17 +6093,6 @@ void RM_LatencyAddSample(const char *event, mstime_t latency) {
* https://redis.io/topics/modules-blocking-ops.
* -------------------------------------------------------------------------- */
/* Readable handler for the awake pipe. We do nothing here, the awake bytes
* will be actually read in a more appropriate place in the
* moduleHandleBlockedClients() function that is where clients are actually
* served. */
void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) {
UNUSED(el);
UNUSED(fd);
UNUSED(mask);
UNUSED(privdata);
}
/* This is called from blocked.c in order to unblock a client: may be called
* for multiple reasons while the client is in the middle of being blocked
* because the client is terminated, but is also called for cleanup when a
@ -6368,7 +6357,7 @@ int moduleUnblockClientByHandle(RedisModuleBlockedClient *bc, void *privdata) {
if (!bc->blocked_on_keys) bc->privdata = privdata;
bc->unblocked = 1;
if (listLength(moduleUnblockedClients) == 0) {
if (write(server.module_blocked_pipe[1],"A",1) != 1) {
if (write(server.module_pipe[1],"A",1) != 1) {
/* Ignore the error, this is best-effort. */
}
}
@ -6463,12 +6452,6 @@ void moduleHandleBlockedClients(void) {
RedisModuleBlockedClient *bc;
pthread_mutex_lock(&moduleUnblockedClientsMutex);
/* Here we unblock all the pending clients blocked in modules operations
* so we can read every pending "awake byte" in the pipe. */
if (listLength(moduleUnblockedClients) > 0) {
char buf[1];
while (read(server.module_blocked_pipe[0],buf,1) == 1);
}
while (listLength(moduleUnblockedClients)) {
ln = listFirst(moduleUnblockedClients);
bc = ln->value;
@ -7333,6 +7316,214 @@ int RM_GetTimerInfo(RedisModuleCtx *ctx, RedisModuleTimerID id, uint64_t *remain
return REDISMODULE_OK;
}
/* --------------------------------------------------------------------------
* ## Modules EventLoop API
* --------------------------------------------------------------------------*/
typedef struct EventLoopData {
RedisModuleEventLoopFunc rFunc;
RedisModuleEventLoopFunc wFunc;
void *user_data;
} EventLoopData;
typedef struct EventLoopOneShot {
RedisModuleEventLoopOneShotFunc func;
void *user_data;
} EventLoopOneShot;
list *moduleEventLoopOneShots;
static pthread_mutex_t moduleEventLoopMutex = PTHREAD_MUTEX_INITIALIZER;
static int eventLoopToAeMask(int mask) {
int aeMask = 0;
if (mask & REDISMODULE_EVENTLOOP_READABLE)
aeMask |= AE_READABLE;
if (mask & REDISMODULE_EVENTLOOP_WRITABLE)
aeMask |= AE_WRITABLE;
return aeMask;
}
static int eventLoopFromAeMask(int ae_mask) {
int mask = 0;
if (ae_mask & AE_READABLE)
mask |= REDISMODULE_EVENTLOOP_READABLE;
if (ae_mask & AE_WRITABLE)
mask |= REDISMODULE_EVENTLOOP_WRITABLE;
return mask;
}
static void eventLoopCbReadable(struct aeEventLoop *ae, int fd, void *user_data, int ae_mask) {
UNUSED(ae);
EventLoopData *data = user_data;
data->rFunc(fd, data->user_data, eventLoopFromAeMask(ae_mask));
}
static void eventLoopCbWritable(struct aeEventLoop *ae, int fd, void *user_data, int ae_mask) {
UNUSED(ae);
EventLoopData *data = user_data;
data->wFunc(fd, data->user_data, eventLoopFromAeMask(ae_mask));
}
/* Add a pipe / socket event to the event loop.
*
* * `mask` must be one of the following values:
*
* * `REDISMODULE_EVENTLOOP_READABLE`
* * `REDISMODULE_EVENTLOOP_WRITABLE`
* * `REDISMODULE_EVENTLOOP_READABLE | REDISMODULE_EVENTLOOP_WRITABLE`
*
* On success REDISMODULE_OK is returned, otherwise
* REDISMODULE_ERR is returned and errno is set to the following values:
*
* * ERANGE: `fd` is negative or higher than `maxclients` Redis config.
* * EINVAL: `callback` is NULL or `mask` value is invalid.
*
* `errno` might take other values in case of an internal error.
*
* Example:
*
* void onReadable(int fd, void *user_data, int mask) {
* char buf[32];
* int bytes = read(fd,buf,sizeof(buf));
* printf("Read %d bytes \n", bytes);
* }
* RM_EventLoopAdd(fd, REDISMODULE_EVENTLOOP_READABLE, onReadable, NULL);
*/
int RM_EventLoopAdd(int fd, int mask, RedisModuleEventLoopFunc func, void *user_data) {
if (fd < 0 || fd >= aeGetSetSize(server.el)) {
errno = ERANGE;
return REDISMODULE_ERR;
}
if (!func || mask & ~(REDISMODULE_EVENTLOOP_READABLE |
REDISMODULE_EVENTLOOP_WRITABLE)) {
errno = EINVAL;
return REDISMODULE_ERR;
}
/* We are going to register stub callbacks to 'ae' for two reasons:
*
* - "ae" callback signature is different from RedisModuleEventLoopCallback,
* that will be handled it in our stub callbacks.
* - We need to remap 'mask' value to provide binary compatibility.
*
* For the stub callbacks, saving user 'callback' and 'user_data' in an
* EventLoopData object and passing it to ae, later, we'll extract
* 'callback' and 'user_data' from that.
*/
EventLoopData *data = aeGetFileClientData(server.el, fd);
if (!data)
data = zcalloc(sizeof(*data));
aeFileProc *aeProc;
if (mask & REDISMODULE_EVENTLOOP_READABLE)
aeProc = eventLoopCbReadable;
else
aeProc = eventLoopCbWritable;
int aeMask = eventLoopToAeMask(mask);
if (aeCreateFileEvent(server.el, fd, aeMask, aeProc, data) != AE_OK) {
if (aeGetFileEvents(server.el, fd) == AE_NONE)
zfree(data);
return REDISMODULE_ERR;
}
data->user_data = user_data;
if (mask & REDISMODULE_EVENTLOOP_READABLE)
data->rFunc = func;
if (mask & REDISMODULE_EVENTLOOP_WRITABLE)
data->wFunc = func;
errno = 0;
return REDISMODULE_OK;
}
/* Delete a pipe / socket event from the event loop.
*
* * `mask` must be one of the following values:
*
* * `REDISMODULE_EVENTLOOP_READABLE`
* * `REDISMODULE_EVENTLOOP_WRITABLE`
* * `REDISMODULE_EVENTLOOP_READABLE | REDISMODULE_EVENTLOOP_WRITABLE`
*
* On success REDISMODULE_OK is returned, otherwise
* REDISMODULE_ERR is returned and errno is set to the following values:
*
* * ERANGE: `fd` is negative or higher than `maxclients` Redis config.
* * EINVAL: `mask` value is invalid.
*/
int RM_EventLoopDel(int fd, int mask) {
if (fd < 0 || fd >= aeGetSetSize(server.el)) {
errno = ERANGE;
return REDISMODULE_ERR;
}
if (mask & ~(REDISMODULE_EVENTLOOP_READABLE |
REDISMODULE_EVENTLOOP_WRITABLE)) {
errno = EINVAL;
return REDISMODULE_ERR;
}
/* After deleting the event, if fd does not have any registered event
* anymore, we can free the EventLoopData object. */
EventLoopData *data = aeGetFileClientData(server.el, fd);
aeDeleteFileEvent(server.el, fd, eventLoopToAeMask(mask));
if (aeGetFileEvents(server.el, fd) == AE_NONE)
zfree(data);
errno = 0;
return REDISMODULE_OK;
}
/* This function can be called from other threads to trigger callback on Redis
* main thread. On success REDISMODULE_OK is returned. If `func` is NULL
* REDISMODULE_ERR is returned and errno is set to EINVAL.
*/
int RM_EventLoopAddOneShot(RedisModuleEventLoopOneShotFunc func, void *user_data) {
if (!func) {
errno = EINVAL;
return REDISMODULE_ERR;
}
EventLoopOneShot *oneshot = zmalloc(sizeof(*oneshot));
oneshot->func = func;
oneshot->user_data = user_data;
pthread_mutex_lock(&moduleEventLoopMutex);
if (!moduleEventLoopOneShots) moduleEventLoopOneShots = listCreate();
listAddNodeTail(moduleEventLoopOneShots, oneshot);
pthread_mutex_unlock(&moduleEventLoopMutex);
if (write(server.module_pipe[1],"A",1) != 1) {
/* Pipe is non-blocking, write() may fail if it's full. */
}
errno = 0;
return REDISMODULE_OK;
}
/* This function will check the moduleEventLoopOneShots queue in order to
* call the callback for the registered oneshot events. */
static void eventLoopHandleOneShotEvents() {
pthread_mutex_lock(&moduleEventLoopMutex);
if (moduleEventLoopOneShots) {
while (listLength(moduleEventLoopOneShots)) {
listNode *ln = listFirst(moduleEventLoopOneShots);
EventLoopOneShot *oneshot = ln->value;
listDelNode(moduleEventLoopOneShots, ln);
/* Unlock mutex before the callback. Another oneshot event can be
* added in the callback, it will need to lock the mutex. */
pthread_mutex_unlock(&moduleEventLoopMutex);
oneshot->func(oneshot->user_data);
zfree(oneshot);
/* Lock again for the next iteration */
pthread_mutex_lock(&moduleEventLoopMutex);
}
}
pthread_mutex_unlock(&moduleEventLoopMutex);
}
/* --------------------------------------------------------------------------
* ## Modules ACL API
*
@ -8989,6 +9180,7 @@ static uint64_t moduleEventVersions[] = {
-1, /* REDISMODULE_EVENT_REPL_BACKUP */
-1, /* REDISMODULE_EVENT_FORK_CHILD */
-1, /* REDISMODULE_EVENT_REPL_ASYNC_LOAD */
-1, /* REDISMODULE_EVENT_EVENTLOOP */
};
/* Register to be notified, via a callback, when the specified server event
@ -9240,6 +9432,15 @@ static uint64_t moduleEventVersions[] = {
* * `REDISMODULE_SUBEVENT_FORK_CHILD_BORN`
* * `REDISMODULE_SUBEVENT_FORK_CHILD_DIED`
*
* * RedisModuleEvent_EventLoop
*
* Called on each event loop iteration, once just before the event loop goes
* to sleep or just after it wakes up.
* The following sub events are available:
*
* * `REDISMODULE_SUBEVENT_EVENTLOOP_BEFORE_SLEEP`
* * `REDISMODULE_SUBEVENT_EVENTLOOP_AFTER_SLEEP`
*
* The function returns REDISMODULE_OK if the module was successfully subscribed
* for the specified event. If the API is called from a wrong context or unsupported event
* is given then REDISMODULE_ERR is returned. */
@ -9315,6 +9516,8 @@ int RM_IsSubEventSupported(RedisModuleEvent event, int64_t subevent) {
return subevent < _REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_NEXT;
case REDISMODULE_EVENT_FORK_CHILD:
return subevent < _REDISMODULE_SUBEVENT_FORK_CHILD_NEXT;
case REDISMODULE_EVENT_EVENTLOOP:
return subevent < _REDISMODULE_SUBEVENT_EVENTLOOP_NEXT;
default:
break;
}
@ -9552,10 +9755,9 @@ void moduleInitModulesSystem(void) {
* and we do not want to block not in the read nor in the write half.
* Enable close-on-exec flag on pipes in case of the fork-exec system calls in
* sentinels or redis servers. */
if (anetPipe(server.module_blocked_pipe, O_CLOEXEC|O_NONBLOCK, O_CLOEXEC|O_NONBLOCK) == -1) {
if (anetPipe(server.module_pipe, O_CLOEXEC|O_NONBLOCK, O_CLOEXEC|O_NONBLOCK) == -1) {
serverLog(LL_WARNING,
"Can't create the pipe for module blocking commands: %s",
strerror(errno));
"Can't create the pipe for module threads: %s", strerror(errno));
exit(1);
}
@ -9829,6 +10031,19 @@ int moduleUnload(sds name) {
return C_OK;
}
void modulePipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) {
UNUSED(el);
UNUSED(fd);
UNUSED(mask);
UNUSED(privdata);
char buf[128];
while (read(fd, buf, sizeof(buf)) == sizeof(buf));
/* Handle event loop events if pipe was written from event loop API */
eventLoopHandleOneShotEvents();
}
/* Helper function for the MODULE and HELLO command: send the list of the
* loaded modules to the client. */
void addReplyLoadedModules(client *c) {
@ -10734,4 +10949,7 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(SetCommandKeySpecBeginSearchKeyword);
REGISTER_API(SetCommandKeySpecFindKeysRange);
REGISTER_API(SetCommandKeySpecFindKeysKeynum);
REGISTER_API(EventLoopAdd);
REGISTER_API(EventLoopDel);
REGISTER_API(EventLoopAddOneShot);
}

View File

@ -257,6 +257,12 @@ typedef enum {
#define REDISMODULE_CMD_ARG_MULTIPLE (1<<1) /* The argument may repeat itself (like key in DEL) */
#define REDISMODULE_CMD_ARG_MULTIPLE_TOKEN (1<<2) /* The argument may repeat itself, and so does its token (like `GET pattern` in SORT) */
/* Eventloop definitions. */
#define REDISMODULE_EVENTLOOP_READABLE 1
#define REDISMODULE_EVENTLOOP_WRITABLE 2
typedef void (*RedisModuleEventLoopFunc)(int fd, void *user_data, int mask);
typedef void (*RedisModuleEventLoopOneShotFunc)(void *user_data);
/* Server events definitions.
* Those flags should not be used directly by the module, instead
* the module should use RedisModuleEvent_* variables.
@ -276,7 +282,8 @@ typedef enum {
#define REDISMODULE_EVENT_REPL_BACKUP 12 /* Deprecated since Redis 7.0, not used anymore. */
#define REDISMODULE_EVENT_FORK_CHILD 13
#define REDISMODULE_EVENT_REPL_ASYNC_LOAD 14
#define _REDISMODULE_EVENT_NEXT 15 /* Next event flag, should be updated if a new event added. */
#define REDISMODULE_EVENT_EVENTLOOP 15
#define _REDISMODULE_EVENT_NEXT 16 /* Next event flag, should be updated if a new event added. */
typedef struct RedisModuleEvent {
uint64_t id; /* REDISMODULE_EVENT_... defines. */
@ -375,6 +382,10 @@ static const RedisModuleEvent
RedisModuleEvent_ForkChild = {
REDISMODULE_EVENT_FORK_CHILD,
1
},
RedisModuleEvent_EventLoop = {
REDISMODULE_EVENT_EVENTLOOP,
1
};
/* Those are values that are used for the 'subevent' callback argument. */
@ -436,6 +447,10 @@ static const RedisModuleEvent
#define REDISMODULE_SUBEVENT_FORK_CHILD_DIED 1
#define _REDISMODULE_SUBEVENT_FORK_CHILD_NEXT 2
#define REDISMODULE_SUBEVENT_EVENTLOOP_BEFORE_SLEEP 0
#define REDISMODULE_SUBEVENT_EVENTLOOP_AFTER_SLEEP 1
#define _REDISMODULE_SUBEVENT_EVENTLOOP_NEXT 2
#define _REDISMODULE_SUBEVENT_SHUTDOWN_NEXT 0
#define _REDISMODULE_SUBEVENT_CRON_LOOP_NEXT 0
#define _REDISMODULE_SUBEVENT_SWAPDB_NEXT 0
@ -976,6 +991,9 @@ REDISMODULE_API int (*RedisModule_DefragCursorSet)(RedisModuleDefragCtx *ctx, un
REDISMODULE_API int (*RedisModule_DefragCursorGet)(RedisModuleDefragCtx *ctx, unsigned long *cursor) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetDbIdFromDefragCtx)(RedisModuleDefragCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API const RedisModuleString * (*RedisModule_GetKeyNameFromDefragCtx)(RedisModuleDefragCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_EventLoopAdd)(int fd, int mask, RedisModuleEventLoopFunc func, void *user_data) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_EventLoopDel)(int fd, int mask) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_EventLoopAddOneShot)(RedisModuleEventLoopOneShotFunc func, void *user_data) REDISMODULE_ATTR;
#define RedisModule_IsAOFClient(id) ((id) == UINT64_MAX)
@ -1295,6 +1313,9 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(DefragCursorGet);
REDISMODULE_GET_API(GetKeyNameFromDefragCtx);
REDISMODULE_GET_API(GetDbIdFromDefragCtx);
REDISMODULE_GET_API(EventLoopAdd);
REDISMODULE_GET_API(EventLoopDel);
REDISMODULE_GET_API(EventLoopAddOneShot);
if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR;
RedisModule_SetModuleAttribs(ctx,name,ver,apiver);

View File

@ -1481,7 +1481,12 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
/* Check if there are clients unblocked by modules that implement
* blocking commands. */
if (moduleCount()) moduleHandleBlockedClients();
if (moduleCount()) {
moduleFireServerEvent(REDISMODULE_EVENT_EVENTLOOP,
REDISMODULE_SUBEVENT_EVENTLOOP_BEFORE_SLEEP,
NULL);
moduleHandleBlockedClients();
}
/* Try to process pending commands for clients that were just unblocked. */
if (listLength(server.unblocked_clients))
@ -1561,7 +1566,9 @@ void afterSleep(struct aeEventLoop *eventLoop) {
latencyStartMonitor(latency);
moduleAcquireGIL();
moduleFireServerEvent(REDISMODULE_EVENT_EVENTLOOP,
REDISMODULE_SUBEVENT_EVENTLOOP_AFTER_SLEEP,
NULL);
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("module-acquire-GIL",latency);
}
@ -2475,12 +2482,11 @@ void initServer(void) {
/* Register a readable event for the pipe used to awake the event loop
* when a blocked client in a module needs attention. */
if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,
moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
* from module threads. */
if (aeCreateFileEvent(server.el, server.module_pipe[0], AE_READABLE,
modulePipeReadable,NULL) == AE_ERR) {
serverPanic(
"Error registering the readable event for the module "
"blocked clients subsystem.");
"Error registering the readable event for the module pipe.");
}
/* Register before and after sleep handlers (note this needs to be done

View File

@ -1448,9 +1448,7 @@ struct redisServer {
dict *sharedapi; /* Like moduleapi but containing the APIs that
modules share with each other. */
list *loadmodule_queue; /* List of modules to load at startup. */
int module_blocked_pipe[2]; /* Pipe used to awake the event loop if a
client blocked on a module command needs
to be processed. */
int module_pipe[2]; /* Pipe used to awake the event loop by module threads. */
pid_t child_pid; /* PID of current child */
int child_type; /* Type of current child */
/* Networking */
@ -2314,7 +2312,7 @@ void moduleFreeContext(struct RedisModuleCtx *ctx);
void unblockClientFromModule(client *c);
void moduleHandleBlockedClients(void);
void moduleBlockedClientTimedOut(client *c);
void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask);
void modulePipeReadable(aeEventLoop *el, int fd, void *privdata, int mask);
size_t moduleCount(void);
void moduleAcquireGIL(void);
int moduleTryAcquireGIL(void);

View File

@ -50,7 +50,8 @@ TEST_MODULES = \
aclcheck.so \
list.so \
subcommands.so \
reply.so
reply.so \
eventloop.so
.PHONY: all

277
tests/modules/eventloop.c Normal file
View File

@ -0,0 +1,277 @@
/* This module contains four tests :
* 1- test.sanity : Basic tests for argument validation mostly.
* 2- test.sendbytes : Creates a pipe and registers its fds to the event loop,
* one end of the pipe for read events and the other end for
* the write events. On writable event, data is written. On
* readable event data is read. Repeated until all data is
* received.
* 3- test.iteration : A test for BEFORE_SLEEP and AFTER_SLEEP callbacks.
* Counters are incremented each time these events are
* fired. They should be equal and increment monotonically.
* 4- test.oneshot : Test for oneshot API
*/
#define REDISMODULE_EXPERIMENTAL_API
#include "redismodule.h"
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <memory.h>
#include <errno.h>
int fds[2];
long long buf_size;
char *src;
long long src_offset;
char *dst;
long long dst_offset;
RedisModuleBlockedClient *bc;
RedisModuleCtx *reply_ctx;
void onReadable(int fd, void *user_data, int mask) {
REDISMODULE_NOT_USED(mask);
RedisModule_Assert(strcmp(user_data, "userdataread") == 0);
while (1) {
int rd = read(fd, dst + dst_offset, buf_size - dst_offset);
if (rd <= 0)
return;
dst_offset += rd;
/* Received all bytes */
if (dst_offset == buf_size) {
if (memcmp(src, dst, buf_size) == 0)
RedisModule_ReplyWithSimpleString(reply_ctx, "OK");
else
RedisModule_ReplyWithError(reply_ctx, "ERR bytes mismatch");
RedisModule_EventLoopDel(fds[0], REDISMODULE_EVENTLOOP_READABLE);
RedisModule_EventLoopDel(fds[1], REDISMODULE_EVENTLOOP_WRITABLE);
RedisModule_Free(src);
RedisModule_Free(dst);
close(fds[0]);
close(fds[1]);
RedisModule_FreeThreadSafeContext(reply_ctx);
RedisModule_UnblockClient(bc, NULL);
return;
}
};
}
void onWritable(int fd, void *user_data, int mask) {
REDISMODULE_NOT_USED(user_data);
REDISMODULE_NOT_USED(mask);
RedisModule_Assert(strcmp(user_data, "userdatawrite") == 0);
while (1) {
/* Check if we sent all data */
if (src_offset >= buf_size)
return;
int written = write(fd, src + src_offset, buf_size - src_offset);
if (written <= 0) {
return;
}
src_offset += written;
};
}
/* Create a pipe(), register pipe fds to the event loop and send/receive data
* using them. */
int sendbytes(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (argc != 2) {
RedisModule_WrongArity(ctx);
return REDISMODULE_OK;
}
if (RedisModule_StringToLongLong(argv[1], &buf_size) != REDISMODULE_OK ||
buf_size == 0) {
RedisModule_ReplyWithError(ctx, "Invalid integer value");
return REDISMODULE_OK;
}
bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, 0);
reply_ctx = RedisModule_GetThreadSafeContext(bc);
/* Allocate source buffer and write some random data */
src = RedisModule_Calloc(1,buf_size);
src_offset = 0;
memset(src, rand() % 0xFF, buf_size);
memcpy(src, "randomtestdata", strlen("randomtestdata"));
dst = RedisModule_Calloc(1,buf_size);
dst_offset = 0;
/* Create a pipe and register it to the event loop. */
if (pipe(fds) < 0) return REDISMODULE_ERR;
if (fcntl(fds[0], F_SETFL, O_NONBLOCK) < 0) return REDISMODULE_ERR;
if (fcntl(fds[1], F_SETFL, O_NONBLOCK) < 0) return REDISMODULE_ERR;
if (RedisModule_EventLoopAdd(fds[0], REDISMODULE_EVENTLOOP_READABLE,
onReadable, "userdataread") != REDISMODULE_OK) return REDISMODULE_ERR;
if (RedisModule_EventLoopAdd(fds[1], REDISMODULE_EVENTLOOP_WRITABLE,
onWritable, "userdatawrite") != REDISMODULE_OK) return REDISMODULE_ERR;
return REDISMODULE_OK;
}
int sanity(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
if (pipe(fds) < 0) return REDISMODULE_ERR;
if (RedisModule_EventLoopAdd(fds[0], 9999999, onReadable, NULL)
== REDISMODULE_OK || errno != EINVAL) {
RedisModule_ReplyWithError(ctx, "ERR non-existing event type should fail");
goto out;
}
if (RedisModule_EventLoopAdd(-1, REDISMODULE_EVENTLOOP_READABLE, onReadable, NULL)
== REDISMODULE_OK || errno != ERANGE) {
RedisModule_ReplyWithError(ctx, "ERR out of range fd should fail");
goto out;
}
if (RedisModule_EventLoopAdd(99999999, REDISMODULE_EVENTLOOP_READABLE, onReadable, NULL)
== REDISMODULE_OK || errno != ERANGE) {
RedisModule_ReplyWithError(ctx, "ERR out of range fd should fail");
goto out;
}
if (RedisModule_EventLoopAdd(fds[0], REDISMODULE_EVENTLOOP_READABLE, NULL, NULL)
== REDISMODULE_OK || errno != EINVAL) {
RedisModule_ReplyWithError(ctx, "ERR null callback should fail");
goto out;
}
if (RedisModule_EventLoopAdd(fds[0], 9999999, onReadable, NULL)
== REDISMODULE_OK || errno != EINVAL) {
RedisModule_ReplyWithError(ctx, "ERR non-existing event type should fail");
goto out;
}
if (RedisModule_EventLoopDel(fds[0], REDISMODULE_EVENTLOOP_READABLE)
!= REDISMODULE_OK || errno != 0) {
RedisModule_ReplyWithError(ctx, "ERR del on non-registered fd should not fail");
goto out;
}
if (RedisModule_EventLoopDel(fds[0], 9999999) == REDISMODULE_OK ||
errno != EINVAL) {
RedisModule_ReplyWithError(ctx, "ERR non-existing event type should fail");
goto out;
}
if (RedisModule_EventLoopDel(-1, REDISMODULE_EVENTLOOP_READABLE)
== REDISMODULE_OK || errno != ERANGE) {
RedisModule_ReplyWithError(ctx, "ERR out of range fd should fail");
goto out;
}
if (RedisModule_EventLoopDel(99999999, REDISMODULE_EVENTLOOP_READABLE)
== REDISMODULE_OK || errno != ERANGE) {
RedisModule_ReplyWithError(ctx, "ERR out of range fd should fail");
goto out;
}
if (RedisModule_EventLoopAdd(fds[0], REDISMODULE_EVENTLOOP_READABLE, onReadable, NULL)
!= REDISMODULE_OK || errno != 0) {
RedisModule_ReplyWithError(ctx, "ERR Add failed");
goto out;
}
if (RedisModule_EventLoopAdd(fds[0], REDISMODULE_EVENTLOOP_READABLE, onReadable, NULL)
!= REDISMODULE_OK || errno != 0) {
RedisModule_ReplyWithError(ctx, "ERR Adding same fd twice failed");
goto out;
}
if (RedisModule_EventLoopDel(fds[0], REDISMODULE_EVENTLOOP_READABLE)
!= REDISMODULE_OK || errno != 0) {
RedisModule_ReplyWithError(ctx, "ERR Del failed");
goto out;
}
if (RedisModule_EventLoopAddOneShot(NULL, NULL) == REDISMODULE_OK || errno != EINVAL) {
RedisModule_ReplyWithError(ctx, "ERR null callback should fail");
goto out;
}
RedisModule_ReplyWithSimpleString(ctx, "OK");
out:
close(fds[0]);
close(fds[1]);
return REDISMODULE_OK;
}
static long long beforeSleepCount;
static long long afterSleepCount;
int iteration(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
/* On each event loop iteration, eventloopCallback() is called. We increment
* beforeSleepCount and afterSleepCount, so these two should be equal.
* We reply with iteration count, caller can test if iteration count
* increments monotonically */
RedisModule_Assert(beforeSleepCount == afterSleepCount);
RedisModule_ReplyWithLongLong(ctx, beforeSleepCount);
return REDISMODULE_OK;
}
void oneshotCallback(void* arg)
{
RedisModule_Assert(strcmp(arg, "userdata") == 0);
RedisModule_ReplyWithSimpleString(reply_ctx, "OK");
RedisModule_FreeThreadSafeContext(reply_ctx);
RedisModule_UnblockClient(bc, NULL);
}
int oneshot(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, 0);
reply_ctx = RedisModule_GetThreadSafeContext(bc);
if (RedisModule_EventLoopAddOneShot(oneshotCallback, "userdata") != REDISMODULE_OK) {
RedisModule_ReplyWithError(ctx, "ERR oneshot failed");
RedisModule_FreeThreadSafeContext(reply_ctx);
RedisModule_UnblockClient(bc, NULL);
}
return REDISMODULE_OK;
}
void eventloopCallback(struct RedisModuleCtx *ctx, RedisModuleEvent eid, uint64_t subevent, void *data) {
REDISMODULE_NOT_USED(ctx);
REDISMODULE_NOT_USED(eid);
REDISMODULE_NOT_USED(subevent);
REDISMODULE_NOT_USED(data);
RedisModule_Assert(eid.id == REDISMODULE_EVENT_EVENTLOOP);
if (subevent == REDISMODULE_SUBEVENT_EVENTLOOP_BEFORE_SLEEP)
beforeSleepCount++;
else if (subevent == REDISMODULE_SUBEVENT_EVENTLOOP_AFTER_SLEEP)
afterSleepCount++;
}
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
if (RedisModule_Init(ctx,"eventloop",1,REDISMODULE_APIVER_1)
== REDISMODULE_ERR) return REDISMODULE_ERR;
/* Test basics. */
if (RedisModule_CreateCommand(ctx, "test.sanity", sanity, "", 0, 0, 0)
== REDISMODULE_ERR) return REDISMODULE_ERR;
/* Register a command to create a pipe() and send data through it by using
* event loop API. */
if (RedisModule_CreateCommand(ctx, "test.sendbytes", sendbytes, "", 0, 0, 0)
== REDISMODULE_ERR) return REDISMODULE_ERR;
/* Register a command to return event loop iteration count. */
if (RedisModule_CreateCommand(ctx, "test.iteration", iteration, "", 0, 0, 0)
== REDISMODULE_ERR) return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx, "test.oneshot", oneshot, "", 0, 0, 0)
== REDISMODULE_ERR) return REDISMODULE_ERR;
if (RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_EventLoop,
eventloopCallback) != REDISMODULE_OK) return REDISMODULE_ERR;
return REDISMODULE_OK;
}

View File

@ -0,0 +1,25 @@
set testmodule [file normalize tests/modules/eventloop.so]
start_server {tags {"modules"}} {
r module load $testmodule
test "Module eventloop sendbytes" {
assert_match "OK" [r test.sendbytes 10000000]
assert_match "OK" [r test.sendbytes 2000000]
assert_match "OK" [r test.sendbytes 800000000]
}
test "Module eventloop iteration" {
set iteration [r test.iteration]
set next_iteration [r test.iteration]
assert {$next_iteration > $iteration}
}
test "Module eventloop sanity" {
r test.sanity
}
test "Module eventloop oneshot" {
r test.oneshot
}
}