redict/tests/modules/eventloop.c

283 lines
10 KiB
C
Raw Normal View History

2024-03-21 09:30:47 -04:00
// SPDX-FileCopyrightText: 2024 Redict Contributors
// SPDX-FileCopyrightText: 2024 Salvatore Sanfilippo <antirez at gmail dot com>
//
// SPDX-License-Identifier: BSD-3-Clause
// SPDX-License-Identifier: GPL-3.0-only
Add event loop support to the module API (#10001) Modules can now register sockets/pipe to the Redis main thread event loop and do network operations asynchronously. Previously, modules had to maintain an event loop and another thread for asynchronous network operations. Also, if a module is calling API functions after doing some network operations, it had to synchronize its event loop thread's access with Redis main thread by locking the GIL, causing contention on the lock. After this commit, no synchronization is needed as module can operate in Redis main thread context. So, this commit may improve the performance for some use cases. Added three functions to the module API: * RedisModule_EventLoopAdd(int fd, int mask, RedisModuleEventLoopFunc func, void *user_data) * RedisModule_EventLoopDel(int fd, int mask) * RedisModule_EventLoopAddOneShot(RedisModuleEventLoopOneShotFunc func, void *user_data) - This function can be called from other threads to trigger callback on Redis main thread. Callback will be triggered only once. If Redis main thread is sleeping, this call will wake up the Redis main thread. Event loop callbacks are called by Redis main thread after locking the GIL. Inside callbacks, modules can operate as if they are holding the GIL. Added REDISMODULE_EVENT_EVENTLOOP event with two subevents: * REDISMODULE_SUBEVENT_EVENTLOOP_BEFORE_SLEEP * REDISMODULE_SUBEVENT_EVENTLOOP_AFTER_SLEEP These events are for modules that want to participate in the before and after sleep action. e.g It might be useful to implement batching : Read data from the network, write all to a file in one go on BEFORE_SLEEP event.
2022-01-18 06:10:07 -05:00
/* This module contains four tests :
* 1- test.sanity : Basic tests for argument validation mostly.
* 2- test.sendbytes : Creates a pipe and registers its fds to the event loop,
* one end of the pipe for read events and the other end for
* the write events. On writable event, data is written. On
* readable event data is read. Repeated until all data is
* received.
* 3- test.iteration : A test for BEFORE_SLEEP and AFTER_SLEEP callbacks.
* Counters are incremented each time these events are
* fired. They should be equal and increment monotonically.
* 4- test.oneshot : Test for oneshot API
*/
2024-03-21 05:49:18 -04:00
#include "redictmodule.h"
Add event loop support to the module API (#10001) Modules can now register sockets/pipe to the Redis main thread event loop and do network operations asynchronously. Previously, modules had to maintain an event loop and another thread for asynchronous network operations. Also, if a module is calling API functions after doing some network operations, it had to synchronize its event loop thread's access with Redis main thread by locking the GIL, causing contention on the lock. After this commit, no synchronization is needed as module can operate in Redis main thread context. So, this commit may improve the performance for some use cases. Added three functions to the module API: * RedisModule_EventLoopAdd(int fd, int mask, RedisModuleEventLoopFunc func, void *user_data) * RedisModule_EventLoopDel(int fd, int mask) * RedisModule_EventLoopAddOneShot(RedisModuleEventLoopOneShotFunc func, void *user_data) - This function can be called from other threads to trigger callback on Redis main thread. Callback will be triggered only once. If Redis main thread is sleeping, this call will wake up the Redis main thread. Event loop callbacks are called by Redis main thread after locking the GIL. Inside callbacks, modules can operate as if they are holding the GIL. Added REDISMODULE_EVENT_EVENTLOOP event with two subevents: * REDISMODULE_SUBEVENT_EVENTLOOP_BEFORE_SLEEP * REDISMODULE_SUBEVENT_EVENTLOOP_AFTER_SLEEP These events are for modules that want to participate in the before and after sleep action. e.g It might be useful to implement batching : Read data from the network, write all to a file in one go on BEFORE_SLEEP event.
2022-01-18 06:10:07 -05:00
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <memory.h>
#include <errno.h>
int fds[2];
long long buf_size;
char *src;
long long src_offset;
char *dst;
long long dst_offset;
RedisModuleBlockedClient *bc;
RedisModuleCtx *reply_ctx;
void onReadable(int fd, void *user_data, int mask) {
REDISMODULE_NOT_USED(mask);
RedisModule_Assert(strcmp(user_data, "userdataread") == 0);
while (1) {
int rd = read(fd, dst + dst_offset, buf_size - dst_offset);
if (rd <= 0)
return;
dst_offset += rd;
/* Received all bytes */
if (dst_offset == buf_size) {
if (memcmp(src, dst, buf_size) == 0)
RedisModule_ReplyWithSimpleString(reply_ctx, "OK");
else
RedisModule_ReplyWithError(reply_ctx, "ERR bytes mismatch");
RedisModule_EventLoopDel(fds[0], REDISMODULE_EVENTLOOP_READABLE);
RedisModule_EventLoopDel(fds[1], REDISMODULE_EVENTLOOP_WRITABLE);
RedisModule_Free(src);
RedisModule_Free(dst);
close(fds[0]);
close(fds[1]);
RedisModule_FreeThreadSafeContext(reply_ctx);
RedisModule_UnblockClient(bc, NULL);
return;
}
};
}
void onWritable(int fd, void *user_data, int mask) {
REDISMODULE_NOT_USED(user_data);
REDISMODULE_NOT_USED(mask);
RedisModule_Assert(strcmp(user_data, "userdatawrite") == 0);
while (1) {
/* Check if we sent all data */
if (src_offset >= buf_size)
return;
int written = write(fd, src + src_offset, buf_size - src_offset);
if (written <= 0) {
return;
}
src_offset += written;
};
}
/* Create a pipe(), register pipe fds to the event loop and send/receive data
* using them. */
int sendbytes(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (argc != 2) {
RedisModule_WrongArity(ctx);
return REDISMODULE_OK;
}
if (RedisModule_StringToLongLong(argv[1], &buf_size) != REDISMODULE_OK ||
buf_size == 0) {
RedisModule_ReplyWithError(ctx, "Invalid integer value");
return REDISMODULE_OK;
}
bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, 0);
reply_ctx = RedisModule_GetThreadSafeContext(bc);
/* Allocate source buffer and write some random data */
src = RedisModule_Calloc(1,buf_size);
src_offset = 0;
memset(src, rand() % 0xFF, buf_size);
memcpy(src, "randomtestdata", strlen("randomtestdata"));
dst = RedisModule_Calloc(1,buf_size);
dst_offset = 0;
/* Create a pipe and register it to the event loop. */
if (pipe(fds) < 0) return REDISMODULE_ERR;
if (fcntl(fds[0], F_SETFL, O_NONBLOCK) < 0) return REDISMODULE_ERR;
if (fcntl(fds[1], F_SETFL, O_NONBLOCK) < 0) return REDISMODULE_ERR;
if (RedisModule_EventLoopAdd(fds[0], REDISMODULE_EVENTLOOP_READABLE,
onReadable, "userdataread") != REDISMODULE_OK) return REDISMODULE_ERR;
if (RedisModule_EventLoopAdd(fds[1], REDISMODULE_EVENTLOOP_WRITABLE,
onWritable, "userdatawrite") != REDISMODULE_OK) return REDISMODULE_ERR;
return REDISMODULE_OK;
}
int sanity(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
if (pipe(fds) < 0) return REDISMODULE_ERR;
if (RedisModule_EventLoopAdd(fds[0], 9999999, onReadable, NULL)
== REDISMODULE_OK || errno != EINVAL) {
RedisModule_ReplyWithError(ctx, "ERR non-existing event type should fail");
goto out;
}
if (RedisModule_EventLoopAdd(-1, REDISMODULE_EVENTLOOP_READABLE, onReadable, NULL)
== REDISMODULE_OK || errno != ERANGE) {
RedisModule_ReplyWithError(ctx, "ERR out of range fd should fail");
goto out;
}
if (RedisModule_EventLoopAdd(99999999, REDISMODULE_EVENTLOOP_READABLE, onReadable, NULL)
== REDISMODULE_OK || errno != ERANGE) {
RedisModule_ReplyWithError(ctx, "ERR out of range fd should fail");
goto out;
}
if (RedisModule_EventLoopAdd(fds[0], REDISMODULE_EVENTLOOP_READABLE, NULL, NULL)
== REDISMODULE_OK || errno != EINVAL) {
RedisModule_ReplyWithError(ctx, "ERR null callback should fail");
goto out;
}
if (RedisModule_EventLoopAdd(fds[0], 9999999, onReadable, NULL)
== REDISMODULE_OK || errno != EINVAL) {
RedisModule_ReplyWithError(ctx, "ERR non-existing event type should fail");
goto out;
}
if (RedisModule_EventLoopDel(fds[0], REDISMODULE_EVENTLOOP_READABLE)
!= REDISMODULE_OK || errno != 0) {
RedisModule_ReplyWithError(ctx, "ERR del on non-registered fd should not fail");
goto out;
}
if (RedisModule_EventLoopDel(fds[0], 9999999) == REDISMODULE_OK ||
errno != EINVAL) {
RedisModule_ReplyWithError(ctx, "ERR non-existing event type should fail");
goto out;
}
if (RedisModule_EventLoopDel(-1, REDISMODULE_EVENTLOOP_READABLE)
== REDISMODULE_OK || errno != ERANGE) {
RedisModule_ReplyWithError(ctx, "ERR out of range fd should fail");
goto out;
}
if (RedisModule_EventLoopDel(99999999, REDISMODULE_EVENTLOOP_READABLE)
== REDISMODULE_OK || errno != ERANGE) {
RedisModule_ReplyWithError(ctx, "ERR out of range fd should fail");
goto out;
}
if (RedisModule_EventLoopAdd(fds[0], REDISMODULE_EVENTLOOP_READABLE, onReadable, NULL)
!= REDISMODULE_OK || errno != 0) {
RedisModule_ReplyWithError(ctx, "ERR Add failed");
goto out;
}
if (RedisModule_EventLoopAdd(fds[0], REDISMODULE_EVENTLOOP_READABLE, onReadable, NULL)
!= REDISMODULE_OK || errno != 0) {
RedisModule_ReplyWithError(ctx, "ERR Adding same fd twice failed");
goto out;
}
if (RedisModule_EventLoopDel(fds[0], REDISMODULE_EVENTLOOP_READABLE)
!= REDISMODULE_OK || errno != 0) {
RedisModule_ReplyWithError(ctx, "ERR Del failed");
goto out;
}
if (RedisModule_EventLoopAddOneShot(NULL, NULL) == REDISMODULE_OK || errno != EINVAL) {
RedisModule_ReplyWithError(ctx, "ERR null callback should fail");
goto out;
}
RedisModule_ReplyWithSimpleString(ctx, "OK");
out:
close(fds[0]);
close(fds[1]);
return REDISMODULE_OK;
}
static long long beforeSleepCount;
static long long afterSleepCount;
int iteration(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
/* On each event loop iteration, eventloopCallback() is called. We increment
* beforeSleepCount and afterSleepCount, so these two should be equal.
* We reply with iteration count, caller can test if iteration count
* increments monotonically */
RedisModule_Assert(beforeSleepCount == afterSleepCount);
RedisModule_ReplyWithLongLong(ctx, beforeSleepCount);
return REDISMODULE_OK;
}
void oneshotCallback(void* arg)
{
RedisModule_Assert(strcmp(arg, "userdata") == 0);
RedisModule_ReplyWithSimpleString(reply_ctx, "OK");
RedisModule_FreeThreadSafeContext(reply_ctx);
RedisModule_UnblockClient(bc, NULL);
}
int oneshot(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, 0);
reply_ctx = RedisModule_GetThreadSafeContext(bc);
if (RedisModule_EventLoopAddOneShot(oneshotCallback, "userdata") != REDISMODULE_OK) {
RedisModule_ReplyWithError(ctx, "ERR oneshot failed");
RedisModule_FreeThreadSafeContext(reply_ctx);
RedisModule_UnblockClient(bc, NULL);
}
return REDISMODULE_OK;
}
void eventloopCallback(struct RedisModuleCtx *ctx, RedisModuleEvent eid, uint64_t subevent, void *data) {
REDISMODULE_NOT_USED(ctx);
REDISMODULE_NOT_USED(eid);
REDISMODULE_NOT_USED(subevent);
REDISMODULE_NOT_USED(data);
RedisModule_Assert(eid.id == REDISMODULE_EVENT_EVENTLOOP);
if (subevent == REDISMODULE_SUBEVENT_EVENTLOOP_BEFORE_SLEEP)
beforeSleepCount++;
else if (subevent == REDISMODULE_SUBEVENT_EVENTLOOP_AFTER_SLEEP)
afterSleepCount++;
}
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
if (RedisModule_Init(ctx,"eventloop",1,REDISMODULE_APIVER_1)
== REDISMODULE_ERR) return REDISMODULE_ERR;
/* Test basics. */
if (RedisModule_CreateCommand(ctx, "test.sanity", sanity, "", 0, 0, 0)
== REDISMODULE_ERR) return REDISMODULE_ERR;
/* Register a command to create a pipe() and send data through it by using
* event loop API. */
if (RedisModule_CreateCommand(ctx, "test.sendbytes", sendbytes, "", 0, 0, 0)
== REDISMODULE_ERR) return REDISMODULE_ERR;
/* Register a command to return event loop iteration count. */
if (RedisModule_CreateCommand(ctx, "test.iteration", iteration, "", 0, 0, 0)
== REDISMODULE_ERR) return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx, "test.oneshot", oneshot, "", 0, 0, 0)
== REDISMODULE_ERR) return REDISMODULE_ERR;
if (RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_EventLoop,
eventloopCallback) != REDISMODULE_OK) return REDISMODULE_ERR;
return REDISMODULE_OK;
}