mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 08:08:53 -05:00
cdd60793d5
Non-API impacting renames. Signed-off-by: Drew DeVault <sir@cmpwn.com>
727 lines
26 KiB
C
727 lines
26 KiB
C
// SPDX-FileCopyrightText: 2024 Redict Contributors
|
|
// SPDX-FileCopyrightText: 2024 Salvatore Sanfilippo <antirez at gmail dot com>
|
|
//
|
|
// SPDX-License-Identifier: BSD-3-Clause
|
|
// SPDX-License-Identifier: LGPL-3.0-only
|
|
|
|
/* define macros for having usleep */
|
|
#define _BSD_SOURCE
|
|
#define _DEFAULT_SOURCE
|
|
#include <unistd.h>
|
|
|
|
#include "redictmodule.h"
|
|
#include <assert.h>
|
|
#include <stdio.h>
|
|
#include <pthread.h>
|
|
#include <strings.h>
|
|
|
|
/* Explicitly discard a value so that intentionally unused parameters do not
 * produce compiler warnings. The argument is parenthesized so that any
 * expression, not only a plain identifier, can be passed. */
#define UNUSED(V) ((void)(V))
|
|
|
|
/* Flags used to test processing events during a slow background operation. */

/* Written by set_slow_bg_operation; while it is non-zero, bg_call_worker
 * keeps yielding instead of issuing its call. */
static volatile int g_slow_bg_operation = 0;

/* Set to 1 by bg_call_worker while it is inside its slow loop and back to 0
 * when the loop ends; reported by is_in_slow_bg_operation. */
static volatile int g_is_in_slow_bg_operation = 0;
|
|
|
|
void *sub_worker(void *arg) {
|
|
// Get Redict module context
|
|
RedictModuleCtx *ctx = (RedictModuleCtx *)arg;
|
|
|
|
// Try acquiring GIL
|
|
int res = RedictModule_ThreadSafeContextTryLock(ctx);
|
|
|
|
// GIL is already taken by the calling thread expecting to fail.
|
|
assert(res != REDICTMODULE_OK);
|
|
|
|
return NULL;
|
|
}
|
|
|
|
void *worker(void *arg) {
|
|
// Retrieve blocked client
|
|
RedictModuleBlockedClient *bc = (RedictModuleBlockedClient *)arg;
|
|
|
|
// Get Redict module context
|
|
RedictModuleCtx *ctx = RedictModule_GetThreadSafeContext(bc);
|
|
|
|
// Acquire GIL
|
|
RedictModule_ThreadSafeContextLock(ctx);
|
|
|
|
// Create another thread which will try to acquire the GIL
|
|
pthread_t tid;
|
|
int res = pthread_create(&tid, NULL, sub_worker, ctx);
|
|
assert(res == 0);
|
|
|
|
// Wait for thread
|
|
pthread_join(tid, NULL);
|
|
|
|
// Release GIL
|
|
RedictModule_ThreadSafeContextUnlock(ctx);
|
|
|
|
// Reply to client
|
|
RedictModule_ReplyWithSimpleString(ctx, "OK");
|
|
|
|
// Unblock client
|
|
RedictModule_UnblockClient(bc, NULL);
|
|
|
|
// Free the Redict module context
|
|
RedictModule_FreeThreadSafeContext(ctx);
|
|
|
|
return NULL;
|
|
}
|
|
|
|
/* Command handler for "acquire_gil".
 *
 * Blocks the client and starts a thread running worker(). The GIL is then
 * requested twice: once in the worker thread using
 * "RedictModule_ThreadSafeContextLock", and a second time in the sub-worker
 * thread using "RedictModule_ThreadSafeContextTryLock" while it is already
 * locked.
 *
 * If the context flags report MULTI or DENY_BLOCKING the client is not
 * blocked and an explanatory simple string is sent instead.
 * Always returns REDICTMODULE_OK. */
int acquire_gil(RedictModuleCtx *ctx, RedictModuleString **argv, int argc)
{
    UNUSED(argv);
    UNUSED(argc);

    int current = RedictModule_GetContextFlags(ctx);
    int supported = RedictModule_GetContextFlagsAll();

    /* Each flag is looked up both in the value returned by
     * RedictModule_GetContextFlagsAll and in the current context flags. */
    int inside_multi = (supported & REDICTMODULE_CTX_FLAGS_MULTI) &&
                       (current & REDICTMODULE_CTX_FLAGS_MULTI);
    if (inside_multi) {
        RedictModule_ReplyWithSimpleString(ctx, "Blocked client is not supported inside multi");
        return REDICTMODULE_OK;
    }

    int blocking_denied = (supported & REDICTMODULE_CTX_FLAGS_DENY_BLOCKING) &&
                          (current & REDICTMODULE_CTX_FLAGS_DENY_BLOCKING);
    if (blocking_denied) {
        RedictModule_ReplyWithSimpleString(ctx, "Blocked client is not allowed");
        return REDICTMODULE_OK;
    }

    RedictModuleBlockedClient *blocked = RedictModule_BlockClient(ctx, NULL, NULL, NULL, 0);

    /* The worker thread replies to and unblocks the client. */
    pthread_t worker_tid;
    int rc = pthread_create(&worker_tid, NULL, worker, blocked);
    assert(rc == 0);

    return REDICTMODULE_OK;
}
|
|
|
|
typedef struct {
|
|
RedictModuleString **argv;
|
|
int argc;
|
|
RedictModuleBlockedClient *bc;
|
|
} bg_call_data;
|
|
|
|
void *bg_call_worker(void *arg) {
|
|
bg_call_data *bg = arg;
|
|
RedictModuleBlockedClient *bc = bg->bc;
|
|
|
|
// Get Redict module context
|
|
RedictModuleCtx *ctx = RedictModule_GetThreadSafeContext(bg->bc);
|
|
|
|
// Acquire GIL
|
|
RedictModule_ThreadSafeContextLock(ctx);
|
|
|
|
// Test slow operation yielding
|
|
if (g_slow_bg_operation) {
|
|
g_is_in_slow_bg_operation = 1;
|
|
while (g_slow_bg_operation) {
|
|
RedictModule_Yield(ctx, REDICTMODULE_YIELD_FLAG_CLIENTS, "Slow module operation");
|
|
usleep(1000);
|
|
}
|
|
g_is_in_slow_bg_operation = 0;
|
|
}
|
|
|
|
// Call the command
|
|
const char *module_cmd = RedictModule_StringPtrLen(bg->argv[0], NULL);
|
|
int cmd_pos = 1;
|
|
RedictModuleString *format_redict_str = RedictModule_CreateString(NULL, "v", 1);
|
|
if (!strcasecmp(module_cmd, "do_bg_rm_call_format")) {
|
|
cmd_pos = 2;
|
|
size_t format_len;
|
|
const char *format = RedictModule_StringPtrLen(bg->argv[1], &format_len);
|
|
RedictModule_StringAppendBuffer(NULL, format_redict_str, format, format_len);
|
|
RedictModule_StringAppendBuffer(NULL, format_redict_str, "E", 1);
|
|
}
|
|
const char *format = RedictModule_StringPtrLen(format_redict_str, NULL);
|
|
const char *cmd = RedictModule_StringPtrLen(bg->argv[cmd_pos], NULL);
|
|
RedictModuleCallReply *rep = RedictModule_Call(ctx, cmd, format, bg->argv + cmd_pos + 1, bg->argc - cmd_pos - 1);
|
|
RedictModule_FreeString(NULL, format_redict_str);
|
|
|
|
/* Free the arguments within GIL to prevent simultaneous freeing in main thread. */
|
|
for (int i=0; i<bg->argc; i++)
|
|
RedictModule_FreeString(ctx, bg->argv[i]);
|
|
RedictModule_Free(bg->argv);
|
|
RedictModule_Free(bg);
|
|
|
|
// Release GIL
|
|
RedictModule_ThreadSafeContextUnlock(ctx);
|
|
|
|
// Reply to client
|
|
if (!rep) {
|
|
RedictModule_ReplyWithError(ctx, "NULL reply returned");
|
|
} else {
|
|
RedictModule_ReplyWithCallReply(ctx, rep);
|
|
RedictModule_FreeCallReply(rep);
|
|
}
|
|
|
|
// Unblock client
|
|
RedictModule_UnblockClient(bc, NULL);
|
|
|
|
// Free the Redict module context
|
|
RedictModule_FreeThreadSafeContext(ctx);
|
|
|
|
return NULL;
|
|
}
|
|
|
|
/* Command handler for "do_bg_rm_call" and "do_bg_rm_call_format".
 *
 *   do_bg_rm_call <cmd> [arg ...]
 *   do_bg_rm_call_format <format> <cmd> [arg ...]
 *
 * Copies the arguments, blocks the client and starts a bg_call_worker thread
 * that performs the call and replies to the client.
 *
 * Replies with a wrong-arity error when the command name (and, for the
 * _format variant, the format) is missing. If the context flags report MULTI
 * or DENY_BLOCKING the client is not blocked and an explanatory simple
 * string is sent instead. Returns REDICTMODULE_OK except on the arity error
 * path, which returns the result of RedictModule_WrongArity. */
int do_bg_rm_call(RedictModuleCtx *ctx, RedictModuleString **argv, int argc)
{
    /* bg_call_worker reads argv[1] (and argv[2] for the _format variant)
     * unconditionally, so reject short invocations here instead of letting
     * the thread read past the end of the copied argument array. */
    const char *invoked_as = RedictModule_StringPtrLen(argv[0], NULL);
    int min_argc = (strcasecmp(invoked_as, "do_bg_rm_call_format") == 0) ? 3 : 2;
    if (argc < min_argc) {
        return RedictModule_WrongArity(ctx);
    }

    /* Make sure we're not trying to block a client when we shouldn't */
    int flags = RedictModule_GetContextFlags(ctx);
    int allFlags = RedictModule_GetContextFlagsAll();
    if ((allFlags & REDICTMODULE_CTX_FLAGS_MULTI) &&
        (flags & REDICTMODULE_CTX_FLAGS_MULTI)) {
        RedictModule_ReplyWithSimpleString(ctx, "Blocked client is not supported inside multi");
        return REDICTMODULE_OK;
    }
    if ((allFlags & REDICTMODULE_CTX_FLAGS_DENY_BLOCKING) &&
        (flags & REDICTMODULE_CTX_FLAGS_DENY_BLOCKING)) {
        RedictModule_ReplyWithSimpleString(ctx, "Blocked client is not allowed");
        return REDICTMODULE_OK;
    }

    /* Make a copy of the arguments and pass them to the thread. The worker
     * frees the held strings, the array and the struct. */
    bg_call_data *bg = RedictModule_Alloc(sizeof(bg_call_data));
    bg->argv = RedictModule_Alloc(sizeof(RedictModuleString*)*argc);
    bg->argc = argc;
    for (int i = 0; i < argc; i++) {
        bg->argv[i] = RedictModule_HoldString(ctx, argv[i]);
    }

    /* Block the client */
    bg->bc = RedictModule_BlockClient(ctx, NULL, NULL, NULL, 0);

    /* Start a thread to handle the request */
    pthread_t tid;
    int res = pthread_create(&tid, NULL, bg_call_worker, bg);
    assert(res == 0);

    return REDICTMODULE_OK;
}
|
|
|
|
/* Command handler for "do_rm_call".
 *
 * argv[1] names the command to run; the remaining arguments are passed to
 * it through RedictModule_Call with the "Ev" format. The call reply is
 * forwarded to the client, or an error is sent if the call returned NULL.
 * Returns the result of RedictModule_WrongArity when fewer than two
 * arguments are given, REDICTMODULE_OK otherwise. */
int do_rm_call(RedictModuleCtx *ctx, RedictModuleString **argv, int argc){
    if (argc < 2) {
        return RedictModule_WrongArity(ctx);
    }

    const char *target = RedictModule_StringPtrLen(argv[1], NULL);
    RedictModuleCallReply *reply = RedictModule_Call(ctx, target, "Ev", argv + 2, argc - 2);

    if (reply == NULL) {
        RedictModule_ReplyWithError(ctx, "NULL reply returned");
        return REDICTMODULE_OK;
    }

    RedictModule_ReplyWithCallReply(ctx, reply);
    RedictModule_FreeCallReply(reply);
    return REDICTMODULE_OK;
}
|
|
|
|
/* Forward 'reply' to the client behind 'ctx' and then free the reply.
 * The reply must not be used by the caller afterwards. */
static void rm_call_async_send_reply(RedictModuleCtx *ctx, RedictModuleCallReply *reply) {
    RedictModule_ReplyWithCallReply(ctx, reply);

    /* The reply is released here. */
    RedictModule_FreeCallReply(reply);
}
|
|
|
|
/* Called when the command that was blocked on 'RM_Call' gets unblocked
|
|
* and send the reply to the blocked client. */
|
|
static void rm_call_async_on_unblocked(RedictModuleCtx *ctx, RedictModuleCallReply *reply, void *private_data) {
|
|
UNUSED(ctx);
|
|
RedictModuleBlockedClient *bc = private_data;
|
|
RedictModuleCtx *bctx = RedictModule_GetThreadSafeContext(bc);
|
|
rm_call_async_send_reply(bctx, reply);
|
|
RedictModule_FreeThreadSafeContext(bctx);
|
|
RedictModule_UnblockClient(bc, RedictModule_BlockClientGetPrivateData(bc));
|
|
}
|
|
|
|
/* Command handler for "do_rm_call_fire_and_forget".
 *
 * Runs argv[1] with the remaining arguments using the "!KEv" format. If the
 * call returns a promise the client immediately gets the simple string
 * "Blocked" and no unblock handler is installed; otherwise the call reply is
 * forwarded. The reply object is freed in both cases.
 * Returns the result of RedictModule_WrongArity when fewer than two
 * arguments are given, REDICTMODULE_OK otherwise. */
int do_rm_call_async_fire_and_forget(RedictModuleCtx *ctx, RedictModuleString **argv, int argc){
    if (argc < 2) {
        return RedictModule_WrongArity(ctx);
    }

    const char *target = RedictModule_StringPtrLen(argv[1], NULL);
    RedictModuleCallReply *reply = RedictModule_Call(ctx, target, "!KEv", argv + 2, argc - 2);

    if (RedictModule_CallReplyType(reply) == REDICTMODULE_REPLY_PROMISE) {
        RedictModule_ReplyWithSimpleString(ctx, "Blocked");
    } else {
        RedictModule_ReplyWithCallReply(ctx, reply);
    }
    RedictModule_FreeCallReply(reply);

    return REDICTMODULE_OK;
}
|
|
|
|
/* Free-private-data callback passed to RedictModule_BlockClient by
 * do_rm_call_async. 'pd' is the call reply stored as the blocked client's
 * private data; it is freed here. 'ctx' is not used. */
static void do_rm_call_async_free_pd(RedictModuleCtx * ctx, void *pd) {
    UNUSED(ctx);

    RedictModuleCallReply *stored_reply = pd;
    RedictModule_FreeCallReply(stored_reply);
}
|
|
|
|
/* Disconnect callback installed by do_rm_call_async on the blocked client.
 *
 * Fetches the call reply stored as the client's private data, aborts the
 * promise, frees the reply and finally aborts the block. 'ctx' is not used. */
static void do_rm_call_async_disconnect(RedictModuleCtx *ctx, struct RedictModuleBlockedClient *bc) {
    UNUSED(ctx);

    RedictModuleCallReply *pending = RedictModule_BlockClientGetPrivateData(bc);

    RedictModule_CallReplyPromiseAbort(pending, NULL);
    RedictModule_FreeCallReply(pending);

    RedictModule_AbortBlock(bc);
}
|
|
|
|
/*
|
|
* Callback for do_rm_call_async / do_rm_call_async_script_mode
|
|
* Gets the command to invoke as the first argument to the command and runs it,
|
|
* passing the rest of the arguments to the command invocation.
|
|
* If the command got blocked, blocks the client and unblock it when the command gets unblocked,
|
|
* this allows check the K (allow blocking) argument to RM_Call.
|
|
*/
|
|
int do_rm_call_async(RedictModuleCtx *ctx, RedictModuleString **argv, int argc){
|
|
UNUSED(argv);
|
|
UNUSED(argc);
|
|
|
|
if(argc < 2){
|
|
return RedictModule_WrongArity(ctx);
|
|
}
|
|
|
|
size_t format_len = 0;
|
|
char format[6] = {0};
|
|
|
|
if (!(RedictModule_GetContextFlags(ctx) & REDICTMODULE_CTX_FLAGS_DENY_BLOCKING)) {
|
|
/* We are allowed to block the client so we can allow RM_Call to also block us */
|
|
format[format_len++] = 'K';
|
|
}
|
|
|
|
const char* invoked_cmd = RedictModule_StringPtrLen(argv[0], NULL);
|
|
if (strcasecmp(invoked_cmd, "do_rm_call_async_script_mode") == 0) {
|
|
format[format_len++] = 'S';
|
|
}
|
|
|
|
format[format_len++] = 'E';
|
|
format[format_len++] = 'v';
|
|
if (strcasecmp(invoked_cmd, "do_rm_call_async_no_replicate") != 0) {
|
|
/* Notice, without the '!' flag we will have inconsistency between master and replica.
|
|
* This is used only to check '!' flag correctness on blocked commands. */
|
|
format[format_len++] = '!';
|
|
}
|
|
|
|
const char* cmd = RedictModule_StringPtrLen(argv[1], NULL);
|
|
|
|
RedictModuleCallReply* rep = RedictModule_Call(ctx, cmd, format, argv + 2, argc - 2);
|
|
|
|
if(RedictModule_CallReplyType(rep) != REDICTMODULE_REPLY_PROMISE) {
|
|
rm_call_async_send_reply(ctx, rep);
|
|
} else {
|
|
RedictModuleBlockedClient *bc = RedictModule_BlockClient(ctx, NULL, NULL, do_rm_call_async_free_pd, 0);
|
|
RedictModule_SetDisconnectCallback(bc, do_rm_call_async_disconnect);
|
|
RedictModule_BlockClientSetPrivateData(bc, rep);
|
|
RedictModule_CallReplyPromiseSetUnblockHandler(rep, rm_call_async_on_unblocked, bc);
|
|
}
|
|
|
|
return REDICTMODULE_OK;
|
|
}
|
|
|
|
typedef struct ThreadedAsyncRMCallCtx{
|
|
RedictModuleBlockedClient *bc;
|
|
RedictModuleCallReply *reply;
|
|
} ThreadedAsyncRMCallCtx;
|
|
|
|
void *send_async_reply(void *arg) {
|
|
ThreadedAsyncRMCallCtx *ta_rm_call_ctx = arg;
|
|
rm_call_async_on_unblocked(NULL, ta_rm_call_ctx->reply, ta_rm_call_ctx->bc);
|
|
RedictModule_Free(ta_rm_call_ctx);
|
|
return NULL;
|
|
}
|
|
|
|
/* Called when the command that was blocked on 'RM_Call' gets unblocked
|
|
* and schedule a thread to send the reply to the blocked client. */
|
|
static void rm_call_async_reply_on_thread(RedictModuleCtx *ctx, RedictModuleCallReply *reply, void *private_data) {
|
|
UNUSED(ctx);
|
|
ThreadedAsyncRMCallCtx *ta_rm_call_ctx = RedictModule_Alloc(sizeof(*ta_rm_call_ctx));
|
|
ta_rm_call_ctx->bc = private_data;
|
|
ta_rm_call_ctx->reply = reply;
|
|
pthread_t tid;
|
|
int res = pthread_create(&tid, NULL, send_async_reply, ta_rm_call_ctx);
|
|
assert(res == 0);
|
|
}
|
|
|
|
/*
|
|
* Callback for do_rm_call_async_on_thread.
|
|
* Gets the command to invoke as the first argument to the command and runs it,
|
|
* passing the rest of the arguments to the command invocation.
|
|
* If the command got blocked, blocks the client and unblock on a background thread.
|
|
* this allows check the K (allow blocking) argument to RM_Call, and make sure that the reply
|
|
* that passes to unblock handler is owned by the handler and are not attached to any
|
|
* context that might be freed after the callback ends.
|
|
*/
|
|
int do_rm_call_async_on_thread(RedictModuleCtx *ctx, RedictModuleString **argv, int argc){
|
|
UNUSED(argv);
|
|
UNUSED(argc);
|
|
|
|
if(argc < 2){
|
|
return RedictModule_WrongArity(ctx);
|
|
}
|
|
|
|
const char* cmd = RedictModule_StringPtrLen(argv[1], NULL);
|
|
|
|
RedictModuleCallReply* rep = RedictModule_Call(ctx, cmd, "KEv", argv + 2, argc - 2);
|
|
|
|
if(RedictModule_CallReplyType(rep) != REDICTMODULE_REPLY_PROMISE) {
|
|
rm_call_async_send_reply(ctx, rep);
|
|
} else {
|
|
RedictModuleBlockedClient *bc = RedictModule_BlockClient(ctx, NULL, NULL, NULL, 0);
|
|
RedictModule_CallReplyPromiseSetUnblockHandler(rep, rm_call_async_reply_on_thread, bc);
|
|
RedictModule_FreeCallReply(rep);
|
|
}
|
|
|
|
return REDICTMODULE_OK;
|
|
}
|
|
|
|
/* Private data for wait_and_do_rm_call_async that holds information about:
|
|
* 1. the block client, to unblock when done.
|
|
* 2. the arguments, contains the command to run using RM_Call */
|
|
typedef struct WaitAndDoRMCallCtx {
|
|
RedictModuleBlockedClient *bc;
|
|
RedictModuleString **argv;
|
|
int argc;
|
|
} WaitAndDoRMCallCtx;
|
|
|
|
/*
|
|
* This callback will be called when the 'wait' command invoke on 'wait_and_do_rm_call_async' will finish.
|
|
* This callback will continue the execution flow just like 'do_rm_call_async' command.
|
|
*/
|
|
static void wait_and_do_rm_call_async_on_unblocked(RedictModuleCtx *ctx, RedictModuleCallReply *reply, void *private_data) {
|
|
WaitAndDoRMCallCtx *wctx = private_data;
|
|
if (RedictModule_CallReplyType(reply) != REDICTMODULE_REPLY_INTEGER) {
|
|
goto done;
|
|
}
|
|
|
|
if (RedictModule_CallReplyInteger(reply) != 1) {
|
|
goto done;
|
|
}
|
|
|
|
RedictModule_FreeCallReply(reply);
|
|
reply = NULL;
|
|
|
|
const char* cmd = RedictModule_StringPtrLen(wctx->argv[0], NULL);
|
|
reply = RedictModule_Call(ctx, cmd, "!EKv", wctx->argv + 1, wctx->argc - 1);
|
|
|
|
done:
|
|
if(RedictModule_CallReplyType(reply) != REDICTMODULE_REPLY_PROMISE) {
|
|
RedictModuleCtx *bctx = RedictModule_GetThreadSafeContext(wctx->bc);
|
|
rm_call_async_send_reply(bctx, reply);
|
|
RedictModule_FreeThreadSafeContext(bctx);
|
|
RedictModule_UnblockClient(wctx->bc, NULL);
|
|
} else {
|
|
RedictModule_CallReplyPromiseSetUnblockHandler(reply, rm_call_async_on_unblocked, wctx->bc);
|
|
RedictModule_FreeCallReply(reply);
|
|
}
|
|
for (int i = 0 ; i < wctx->argc ; ++i) {
|
|
RedictModule_FreeString(NULL, wctx->argv[i]);
|
|
}
|
|
RedictModule_Free(wctx->argv);
|
|
RedictModule_Free(wctx);
|
|
}
|
|
|
|
/*
|
|
* Callback for wait_and_do_rm_call
|
|
* Gets the command to invoke as the first argument, runs 'wait'
|
|
* command (using the K flag to RM_Call). Once the wait finished, runs the
|
|
* command that was given (just like 'do_rm_call_async').
|
|
*/
|
|
int wait_and_do_rm_call_async(RedictModuleCtx *ctx, RedictModuleString **argv, int argc) {
|
|
UNUSED(argv);
|
|
UNUSED(argc);
|
|
|
|
if(argc < 2){
|
|
return RedictModule_WrongArity(ctx);
|
|
}
|
|
|
|
int flags = RedictModule_GetContextFlags(ctx);
|
|
if (flags & REDICTMODULE_CTX_FLAGS_DENY_BLOCKING) {
|
|
return RedictModule_ReplyWithError(ctx, "Err can not run wait, blocking is not allowed.");
|
|
}
|
|
|
|
RedictModuleCallReply* rep = RedictModule_Call(ctx, "wait", "!EKcc", "1", "0");
|
|
if(RedictModule_CallReplyType(rep) != REDICTMODULE_REPLY_PROMISE) {
|
|
rm_call_async_send_reply(ctx, rep);
|
|
} else {
|
|
RedictModuleBlockedClient *bc = RedictModule_BlockClient(ctx, NULL, NULL, NULL, 0);
|
|
WaitAndDoRMCallCtx *wctx = RedictModule_Alloc(sizeof(*wctx));
|
|
*wctx = (WaitAndDoRMCallCtx){
|
|
.bc = bc,
|
|
.argv = RedictModule_Alloc((argc - 1) * sizeof(RedictModuleString*)),
|
|
.argc = argc - 1,
|
|
};
|
|
|
|
for (int i = 1 ; i < argc ; ++i) {
|
|
wctx->argv[i - 1] = RedictModule_HoldString(NULL, argv[i]);
|
|
}
|
|
RedictModule_CallReplyPromiseSetUnblockHandler(rep, wait_and_do_rm_call_async_on_unblocked, wctx);
|
|
RedictModule_FreeCallReply(rep);
|
|
}
|
|
|
|
return REDICTMODULE_OK;
|
|
}
|
|
|
|
/* Unblock handler installed by blpop_and_set_multiple_keys.
 *
 * 'private_data' is a WaitAndDoRMCallCtx whose argv holds key/value pairs.
 * The blpop reply is discarded, every pair is written with a "set" call
 * using the "!ss" format, "OK" is sent to the blocked client, the client is
 * unblocked, and the held strings, the argv array and the context struct
 * are freed. */
static void blpop_and_set_multiple_keys_on_unblocked(RedictModuleCtx *ctx, RedictModuleCallReply *reply, void *private_data) {
    WaitAndDoRMCallCtx *pairs = private_data;

    /* ignore the reply */
    RedictModule_FreeCallReply(reply);

    /* argv is laid out as key, value, key, value, ... */
    for (int key_idx = 0; key_idx < pairs->argc; key_idx += 2) {
        RedictModuleString *key = pairs->argv[key_idx];
        RedictModuleString *value = pairs->argv[key_idx + 1];
        RedictModuleCallReply *set_reply = RedictModule_Call(ctx, "set", "!ss", key, value);
        RedictModule_FreeCallReply(set_reply);
    }

    RedictModuleCtx *reply_ctx = RedictModule_GetThreadSafeContext(pairs->bc);
    RedictModule_ReplyWithSimpleString(reply_ctx, "OK");
    RedictModule_FreeThreadSafeContext(reply_ctx);
    RedictModule_UnblockClient(pairs->bc, NULL);

    for (int idx = 0; idx < pairs->argc; idx++) {
        RedictModule_FreeString(NULL, pairs->argv[idx]);
    }
    RedictModule_Free(pairs->argv);
    RedictModule_Free(pairs);
}
|
|
|
|
/*
|
|
* Performs a blpop command on a given list and when unblocked set multiple string keys.
|
|
* This command allows checking that the unblock callback is performed as a unit
|
|
* and its effect are replicated to the replica and AOF wrapped with multi exec.
|
|
*/
|
|
int blpop_and_set_multiple_keys(RedictModuleCtx *ctx, RedictModuleString **argv, int argc) {
|
|
UNUSED(argv);
|
|
UNUSED(argc);
|
|
|
|
if(argc < 2 || argc % 2 != 0){
|
|
return RedictModule_WrongArity(ctx);
|
|
}
|
|
|
|
int flags = RedictModule_GetContextFlags(ctx);
|
|
if (flags & REDICTMODULE_CTX_FLAGS_DENY_BLOCKING) {
|
|
return RedictModule_ReplyWithError(ctx, "Err can not run wait, blocking is not allowed.");
|
|
}
|
|
|
|
RedictModuleCallReply* rep = RedictModule_Call(ctx, "blpop", "!EKsc", argv[1], "0");
|
|
if(RedictModule_CallReplyType(rep) != REDICTMODULE_REPLY_PROMISE) {
|
|
rm_call_async_send_reply(ctx, rep);
|
|
} else {
|
|
RedictModuleBlockedClient *bc = RedictModule_BlockClient(ctx, NULL, NULL, NULL, 0);
|
|
WaitAndDoRMCallCtx *wctx = RedictModule_Alloc(sizeof(*wctx));
|
|
*wctx = (WaitAndDoRMCallCtx){
|
|
.bc = bc,
|
|
.argv = RedictModule_Alloc((argc - 2) * sizeof(RedictModuleString*)),
|
|
.argc = argc - 2,
|
|
};
|
|
|
|
for (int i = 0 ; i < argc - 2 ; ++i) {
|
|
wctx->argv[i] = RedictModule_HoldString(NULL, argv[i + 2]);
|
|
}
|
|
RedictModule_CallReplyPromiseSetUnblockHandler(rep, blpop_and_set_multiple_keys_on_unblocked, wctx);
|
|
RedictModule_FreeCallReply(rep);
|
|
}
|
|
|
|
return REDICTMODULE_OK;
|
|
}
|
|
|
|
/* simulate a blocked client replying to a thread safe context without creating a thread */
|
|
int do_fake_bg_true(RedictModuleCtx *ctx, RedictModuleString **argv, int argc) {
|
|
UNUSED(argv);
|
|
UNUSED(argc);
|
|
|
|
RedictModuleBlockedClient *bc = RedictModule_BlockClient(ctx, NULL, NULL, NULL, 0);
|
|
RedictModuleCtx *bctx = RedictModule_GetThreadSafeContext(bc);
|
|
|
|
RedictModule_ReplyWithBool(bctx, 1);
|
|
|
|
RedictModule_FreeThreadSafeContext(bctx);
|
|
RedictModule_UnblockClient(bc, NULL);
|
|
|
|
return REDICTMODULE_OK;
|
|
}
|
|
|
|
|
|
/* This flag is used to work with busy commands that might take a while: it
 * gives a different command the ability to stop the busy work. It is set to
 * 1 by stop_slow_fg_command and polled, then reset to 0, by slow_fg_command. */
static volatile int abort_flag = 0;
|
|
|
|
/* Command handler for "slow_fg_command <block_time>".
 *
 * Loops in the foreground, yielding and sleeping 1000 microseconds per
 * iteration, until abort_flag becomes non-zero or, when block_time is
 * non-zero, until more than block_time microseconds (measured with
 * RedictModule_MonotonicMicroseconds) have elapsed. Afterwards abort_flag is
 * reset and the integer 1 is sent.
 *
 * A wrong argument count or a non-integer block_time produces an error
 * reply. Always returns REDICTMODULE_OK. */
int slow_fg_command(RedictModuleCtx *ctx, RedictModuleString **argv, int argc) {
    if (argc != 2) {
        RedictModule_WrongArity(ctx);
        return REDICTMODULE_OK;
    }

    long long block_time = 0;
    if (RedictModule_StringToLongLong(argv[1], &block_time) != REDICTMODULE_OK) {
        RedictModule_ReplyWithError(ctx, "Invalid integer value");
        return REDICTMODULE_OK;
    }

    uint64_t started_at = RedictModule_MonotonicMicroseconds();

    /* when not blocking indefinitely, we don't process client commands in this test. */
    int yield_flags;
    if (block_time) {
        yield_flags = REDICTMODULE_YIELD_FLAG_NONE;
    } else {
        yield_flags = REDICTMODULE_YIELD_FLAG_CLIENTS;
    }

    for (;;) {
        if (abort_flag) {
            break;
        }

        RedictModule_Yield(ctx, yield_flags, "Slow module operation");
        usleep(1000);

        /* A zero block_time means "run until aborted": skip the time check. */
        if (!block_time) {
            continue;
        }
        uint64_t elapsed = RedictModule_MonotonicMicroseconds() - started_at;
        if (elapsed > (uint64_t)block_time) {
            break;
        }
    }

    abort_flag = 0;
    RedictModule_ReplyWithLongLong(ctx, 1);
    return REDICTMODULE_OK;
}
|
|
|
|
/* Command handler for "stop_slow_fg_command".
 *
 * Sets abort_flag so that a running slow_fg_command leaves its loop, then
 * sends the integer 1. Arguments are ignored.
 * Always returns REDICTMODULE_OK. */
int stop_slow_fg_command(RedictModuleCtx *ctx, RedictModuleString **argv, int argc) {
    REDICTMODULE_NOT_USED(argv);
    REDICTMODULE_NOT_USED(argc);

    /* Raise the flag before replying. */
    abort_flag = 1;

    RedictModule_ReplyWithLongLong(ctx, 1);
    return REDICTMODULE_OK;
}
|
|
|
|
/* used to enable or disable slow operation in do_bg_rm_call */
|
|
static int set_slow_bg_operation(RedictModuleCtx *ctx, RedictModuleString **argv, int argc) {
|
|
if (argc != 2) {
|
|
RedictModule_WrongArity(ctx);
|
|
return REDICTMODULE_OK;
|
|
}
|
|
long long ll;
|
|
if (RedictModule_StringToLongLong(argv[1], &ll) != REDICTMODULE_OK) {
|
|
RedictModule_ReplyWithError(ctx, "Invalid integer value");
|
|
return REDICTMODULE_OK;
|
|
}
|
|
g_slow_bg_operation = ll;
|
|
RedictModule_ReplyWithSimpleString(ctx, "OK");
|
|
return REDICTMODULE_OK;
|
|
}
|
|
|
|
/* used to test if we reached the slow operation in do_bg_rm_call */
|
|
static int is_in_slow_bg_operation(RedictModuleCtx *ctx, RedictModuleString **argv, int argc) {
|
|
UNUSED(argv);
|
|
if (argc != 1) {
|
|
RedictModule_WrongArity(ctx);
|
|
return REDICTMODULE_OK;
|
|
}
|
|
|
|
RedictModule_ReplyWithLongLong(ctx, g_is_in_slow_bg_operation);
|
|
return REDICTMODULE_OK;
|
|
}
|
|
|
|
/* Timer callback created by unblock_by_timer.
 *
 * 'data' is the blocked client. The callback replies "OK" through a thread
 * safe context, unblocks the client and then frees the context. 'ctx' is
 * not used. */
static void timer_callback(RedictModuleCtx *ctx, void *data)
{
    UNUSED(ctx);

    RedictModuleBlockedClient *blocked = data;
    RedictModuleCtx *reply_ctx = RedictModule_GetThreadSafeContext(blocked);

    RedictModule_ReplyWithSimpleString(reply_ctx, "OK");
    RedictModule_UnblockClient(blocked, NULL);

    /* The context is released only after the client has been unblocked. */
    RedictModule_FreeThreadSafeContext(reply_ctx);
}
|
|
|
|
/* unblock_by_timer <period_ms> <timeout_ms>
|
|
* period_ms is the period of the timer.
|
|
* timeout_ms is the blocking timeout. */
|
|
int unblock_by_timer(RedictModuleCtx *ctx, RedictModuleString **argv, int argc)
|
|
{
|
|
if (argc != 3)
|
|
return RedictModule_WrongArity(ctx);
|
|
|
|
long long period;
|
|
long long timeout;
|
|
if (RedictModule_StringToLongLong(argv[1],&period) != REDICTMODULE_OK)
|
|
return RedictModule_ReplyWithError(ctx,"ERR invalid period");
|
|
if (RedictModule_StringToLongLong(argv[2],&timeout) != REDICTMODULE_OK) {
|
|
return RedictModule_ReplyWithError(ctx,"ERR invalid timeout");
|
|
}
|
|
|
|
RedictModuleBlockedClient *bc = RedictModule_BlockClient(ctx, NULL, NULL, NULL, timeout);
|
|
RedictModule_CreateTimer(ctx, period, timer_callback, bc);
|
|
return REDICTMODULE_OK;
|
|
}
|
|
|
|
/* Module entry point.
 *
 * Initializes the module as "blockedclient" (version 1) and registers every
 * command of this file. Load arguments are ignored.
 * Returns REDICTMODULE_ERR as soon as initialization or any command
 * registration fails, REDICTMODULE_OK otherwise. */
int RedictModule_OnLoad(RedictModuleCtx *ctx, RedictModuleString **argv, int argc) {
    REDICTMODULE_NOT_USED(argv);
    REDICTMODULE_NOT_USED(argc);

    /* Commands in registration order: name, handler, command flags. All of
     * them are registered with first key, last key and key step set to 0. */
    static const struct {
        const char *name;
        int (*handler)(RedictModuleCtx *, RedictModuleString **, int);
        const char *flags;
    } commands[] = {
        {"acquire_gil", acquire_gil, ""},
        {"do_rm_call", do_rm_call, "write"},
        {"do_rm_call_async", do_rm_call_async, "write"},
        {"do_rm_call_async_on_thread", do_rm_call_async_on_thread, "write"},
        {"do_rm_call_async_script_mode", do_rm_call_async, "write"},
        {"do_rm_call_async_no_replicate", do_rm_call_async, "write"},
        {"do_rm_call_fire_and_forget", do_rm_call_async_fire_and_forget, "write"},
        {"wait_and_do_rm_call", wait_and_do_rm_call_async, "write"},
        {"blpop_and_set_multiple_keys", blpop_and_set_multiple_keys, "write"},
        {"do_bg_rm_call", do_bg_rm_call, ""},
        {"do_bg_rm_call_format", do_bg_rm_call, ""},
        {"do_fake_bg_true", do_fake_bg_true, ""},
        {"slow_fg_command", slow_fg_command, ""},
        {"stop_slow_fg_command", stop_slow_fg_command, "allow-busy"},
        {"set_slow_bg_operation", set_slow_bg_operation, "allow-busy"},
        {"is_in_slow_bg_operation", is_in_slow_bg_operation, "allow-busy"},
        {"unblock_by_timer", unblock_by_timer, ""},
    };

    if (RedictModule_Init(ctx, "blockedclient", 1, REDICTMODULE_APIVER_1) == REDICTMODULE_ERR) {
        return REDICTMODULE_ERR;
    }

    for (size_t i = 0; i < sizeof(commands) / sizeof(commands[0]); i++) {
        if (RedictModule_CreateCommand(ctx, commands[i].name, commands[i].handler,
                                       commands[i].flags, 0, 0, 0) == REDICTMODULE_ERR) {
            return REDICTMODULE_ERR;
        }
    }

    return REDICTMODULE_OK;
}
|