mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 08:08:53 -05:00
cdd60793d5
Non-API impacting renames. Signed-off-by: Drew DeVault <sir@cmpwn.com>
337 lines
13 KiB
C
337 lines
13 KiB
C
// SPDX-FileCopyrightText: 2024 Redict Contributors
|
|
// SPDX-FileCopyrightText: 2024 Salvatore Sanfilippo <antirez at gmail dot com>
|
|
//
|
|
// SPDX-License-Identifier: BSD-3-Clause
|
|
// SPDX-License-Identifier: LGPL-3.0-only
|
|
|
|
#define _XOPEN_SOURCE 700
|
|
#include "redictmodule.h"
|
|
#include <stdio.h>
|
|
#include <stdlib.h>
|
|
#include <pthread.h>
|
|
#include <time.h>
|
|
|
|
#define UNUSED(x) (void)(x)
|
|
|
|
typedef struct {
|
|
/* Mutex for protecting RedictModule_BlockedClientMeasureTime*() API from race
|
|
* conditions due to timeout callback triggered in the main thread. */
|
|
pthread_mutex_t measuretime_mutex;
|
|
int measuretime_completed; /* Indicates that time measure has ended and will not continue further */
|
|
int myint; /* Used for replying */
|
|
} BlockPrivdata;
|
|
|
|
void blockClientPrivdataInit(RedictModuleBlockedClient *bc) {
|
|
BlockPrivdata *block_privdata = RedictModule_Calloc(1, sizeof(*block_privdata));
|
|
block_privdata->measuretime_mutex = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
|
|
RedictModule_BlockClientSetPrivateData(bc, block_privdata);
|
|
}
|
|
|
|
void blockClientMeasureTimeStart(RedictModuleBlockedClient *bc, BlockPrivdata *block_privdata) {
|
|
pthread_mutex_lock(&block_privdata->measuretime_mutex);
|
|
RedictModule_BlockedClientMeasureTimeStart(bc);
|
|
pthread_mutex_unlock(&block_privdata->measuretime_mutex);
|
|
}
|
|
|
|
void blockClientMeasureTimeEnd(RedictModuleBlockedClient *bc, BlockPrivdata *block_privdata, int completed) {
|
|
pthread_mutex_lock(&block_privdata->measuretime_mutex);
|
|
if (!block_privdata->measuretime_completed) {
|
|
RedictModule_BlockedClientMeasureTimeEnd(bc);
|
|
if (completed) block_privdata->measuretime_completed = 1;
|
|
}
|
|
pthread_mutex_unlock(&block_privdata->measuretime_mutex);
|
|
}
|
|
|
|
/* Reply callback for blocking command BLOCK.DEBUG */
|
|
int HelloBlock_Reply(RedictModuleCtx *ctx, RedictModuleString **argv, int argc) {
|
|
UNUSED(argv);
|
|
UNUSED(argc);
|
|
BlockPrivdata *block_privdata = RedictModule_GetBlockedClientPrivateData(ctx);
|
|
return RedictModule_ReplyWithLongLong(ctx,block_privdata->myint);
|
|
}
|
|
|
|
/* Timeout callback for blocking command BLOCK.DEBUG */
|
|
int HelloBlock_Timeout(RedictModuleCtx *ctx, RedictModuleString **argv, int argc) {
|
|
UNUSED(argv);
|
|
UNUSED(argc);
|
|
RedictModuleBlockedClient *bc = RedictModule_GetBlockedClientHandle(ctx);
|
|
BlockPrivdata *block_privdata = RedictModule_GetBlockedClientPrivateData(ctx);
|
|
blockClientMeasureTimeEnd(bc, block_privdata, 1);
|
|
return RedictModule_ReplyWithSimpleString(ctx,"Request timedout");
|
|
}
|
|
|
|
/* Private data freeing callback for BLOCK.DEBUG command. */
|
|
void HelloBlock_FreeData(RedictModuleCtx *ctx, void *privdata) {
|
|
UNUSED(ctx);
|
|
BlockPrivdata *block_privdata = privdata;
|
|
pthread_mutex_destroy(&block_privdata->measuretime_mutex);
|
|
RedictModule_Free(privdata);
|
|
}
|
|
|
|
/* Private data freeing callback for BLOCK.BLOCK command. */
|
|
void HelloBlock_FreeStringData(RedictModuleCtx *ctx, void *privdata) {
|
|
RedictModule_FreeString(ctx, (RedictModuleString*)privdata);
|
|
}
|
|
|
|
/* The thread entry point that actually executes the blocking part
|
|
* of the command BLOCK.DEBUG. */
|
|
void *BlockDebug_ThreadMain(void *arg) {
|
|
void **targ = arg;
|
|
RedictModuleBlockedClient *bc = targ[0];
|
|
long long delay = (unsigned long)targ[1];
|
|
long long enable_time_track = (unsigned long)targ[2];
|
|
BlockPrivdata *block_privdata = RedictModule_BlockClientGetPrivateData(bc);
|
|
|
|
if (enable_time_track)
|
|
blockClientMeasureTimeStart(bc, block_privdata);
|
|
RedictModule_Free(targ);
|
|
|
|
struct timespec ts;
|
|
ts.tv_sec = delay / 1000;
|
|
ts.tv_nsec = (delay % 1000) * 1000000;
|
|
nanosleep(&ts, NULL);
|
|
if (enable_time_track)
|
|
blockClientMeasureTimeEnd(bc, block_privdata, 0);
|
|
block_privdata->myint = rand();
|
|
RedictModule_UnblockClient(bc,block_privdata);
|
|
return NULL;
|
|
}
|
|
|
|
/* The thread entry point that actually executes the blocking part
|
|
* of the command BLOCK.DOUBLE_DEBUG. */
|
|
void *DoubleBlock_ThreadMain(void *arg) {
|
|
void **targ = arg;
|
|
RedictModuleBlockedClient *bc = targ[0];
|
|
long long delay = (unsigned long)targ[1];
|
|
BlockPrivdata *block_privdata = RedictModule_BlockClientGetPrivateData(bc);
|
|
blockClientMeasureTimeStart(bc, block_privdata);
|
|
RedictModule_Free(targ);
|
|
struct timespec ts;
|
|
ts.tv_sec = delay / 1000;
|
|
ts.tv_nsec = (delay % 1000) * 1000000;
|
|
nanosleep(&ts, NULL);
|
|
blockClientMeasureTimeEnd(bc, block_privdata, 0);
|
|
/* call again RedictModule_BlockedClientMeasureTimeStart() and
|
|
* RedictModule_BlockedClientMeasureTimeEnd and ensure that the
|
|
* total execution time is 2x the delay. */
|
|
blockClientMeasureTimeStart(bc, block_privdata);
|
|
nanosleep(&ts, NULL);
|
|
blockClientMeasureTimeEnd(bc, block_privdata, 0);
|
|
block_privdata->myint = rand();
|
|
RedictModule_UnblockClient(bc,block_privdata);
|
|
return NULL;
|
|
}
|
|
|
|
void HelloBlock_Disconnected(RedictModuleCtx *ctx, RedictModuleBlockedClient *bc) {
|
|
RedictModule_Log(ctx,"warning","Blocked client %p disconnected!",
|
|
(void*)bc);
|
|
}
|
|
|
|
/* BLOCK.DEBUG <delay_ms> <timeout_ms> -- Block for <count> milliseconds, then reply with
|
|
* a random number. Timeout is the command timeout, so that you can test
|
|
* what happens when the delay is greater than the timeout. */
|
|
int HelloBlock_RedictCommand(RedictModuleCtx *ctx, RedictModuleString **argv, int argc) {
|
|
if (argc != 3) return RedictModule_WrongArity(ctx);
|
|
long long delay;
|
|
long long timeout;
|
|
|
|
if (RedictModule_StringToLongLong(argv[1],&delay) != REDICTMODULE_OK) {
|
|
return RedictModule_ReplyWithError(ctx,"ERR invalid count");
|
|
}
|
|
|
|
if (RedictModule_StringToLongLong(argv[2],&timeout) != REDICTMODULE_OK) {
|
|
return RedictModule_ReplyWithError(ctx,"ERR invalid count");
|
|
}
|
|
|
|
pthread_t tid;
|
|
RedictModuleBlockedClient *bc = RedictModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout);
|
|
blockClientPrivdataInit(bc);
|
|
|
|
/* Here we set a disconnection handler, however since this module will
|
|
* block in sleep() in a thread, there is not much we can do in the
|
|
* callback, so this is just to show you the API. */
|
|
RedictModule_SetDisconnectCallback(bc,HelloBlock_Disconnected);
|
|
|
|
/* Now that we setup a blocking client, we need to pass the control
|
|
* to the thread. However we need to pass arguments to the thread:
|
|
* the delay and a reference to the blocked client handle. */
|
|
void **targ = RedictModule_Alloc(sizeof(void*)*3);
|
|
targ[0] = bc;
|
|
targ[1] = (void*)(unsigned long) delay;
|
|
// pass 1 as flag to enable time tracking
|
|
targ[2] = (void*)(unsigned long) 1;
|
|
|
|
if (pthread_create(&tid,NULL,BlockDebug_ThreadMain,targ) != 0) {
|
|
RedictModule_AbortBlock(bc);
|
|
return RedictModule_ReplyWithError(ctx,"-ERR Can't start thread");
|
|
}
|
|
return REDICTMODULE_OK;
|
|
}
|
|
|
|
/* BLOCK.DEBUG_NOTRACKING <delay_ms> <timeout_ms> -- Block for <count> milliseconds, then reply with
|
|
* a random number. Timeout is the command timeout, so that you can test
|
|
* what happens when the delay is greater than the timeout.
|
|
* this command does not track background time so the background time should no appear in stats*/
|
|
int HelloBlockNoTracking_RedictCommand(RedictModuleCtx *ctx, RedictModuleString **argv, int argc) {
|
|
if (argc != 3) return RedictModule_WrongArity(ctx);
|
|
long long delay;
|
|
long long timeout;
|
|
|
|
if (RedictModule_StringToLongLong(argv[1],&delay) != REDICTMODULE_OK) {
|
|
return RedictModule_ReplyWithError(ctx,"ERR invalid count");
|
|
}
|
|
|
|
if (RedictModule_StringToLongLong(argv[2],&timeout) != REDICTMODULE_OK) {
|
|
return RedictModule_ReplyWithError(ctx,"ERR invalid count");
|
|
}
|
|
|
|
pthread_t tid;
|
|
RedictModuleBlockedClient *bc = RedictModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout);
|
|
blockClientPrivdataInit(bc);
|
|
|
|
/* Here we set a disconnection handler, however since this module will
|
|
* block in sleep() in a thread, there is not much we can do in the
|
|
* callback, so this is just to show you the API. */
|
|
RedictModule_SetDisconnectCallback(bc,HelloBlock_Disconnected);
|
|
|
|
/* Now that we setup a blocking client, we need to pass the control
|
|
* to the thread. However we need to pass arguments to the thread:
|
|
* the delay and a reference to the blocked client handle. */
|
|
void **targ = RedictModule_Alloc(sizeof(void*)*3);
|
|
targ[0] = bc;
|
|
targ[1] = (void*)(unsigned long) delay;
|
|
// pass 0 as flag to enable time tracking
|
|
targ[2] = (void*)(unsigned long) 0;
|
|
|
|
if (pthread_create(&tid,NULL,BlockDebug_ThreadMain,targ) != 0) {
|
|
RedictModule_AbortBlock(bc);
|
|
return RedictModule_ReplyWithError(ctx,"-ERR Can't start thread");
|
|
}
|
|
return REDICTMODULE_OK;
|
|
}
|
|
|
|
/* BLOCK.DOUBLE_DEBUG <delay_ms> -- Block for 2 x <count> milliseconds,
|
|
* then reply with a random number.
|
|
* This command is used to test multiple calls to RedictModule_BlockedClientMeasureTimeStart()
|
|
* and RedictModule_BlockedClientMeasureTimeEnd() within the same execution. */
|
|
int HelloDoubleBlock_RedictCommand(RedictModuleCtx *ctx, RedictModuleString **argv, int argc) {
|
|
if (argc != 2) return RedictModule_WrongArity(ctx);
|
|
long long delay;
|
|
|
|
if (RedictModule_StringToLongLong(argv[1],&delay) != REDICTMODULE_OK) {
|
|
return RedictModule_ReplyWithError(ctx,"ERR invalid count");
|
|
}
|
|
|
|
pthread_t tid;
|
|
RedictModuleBlockedClient *bc = RedictModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,0);
|
|
blockClientPrivdataInit(bc);
|
|
|
|
/* Now that we setup a blocking client, we need to pass the control
|
|
* to the thread. However we need to pass arguments to the thread:
|
|
* the delay and a reference to the blocked client handle. */
|
|
void **targ = RedictModule_Alloc(sizeof(void*)*2);
|
|
targ[0] = bc;
|
|
targ[1] = (void*)(unsigned long) delay;
|
|
|
|
if (pthread_create(&tid,NULL,DoubleBlock_ThreadMain,targ) != 0) {
|
|
RedictModule_AbortBlock(bc);
|
|
return RedictModule_ReplyWithError(ctx,"-ERR Can't start thread");
|
|
}
|
|
return REDICTMODULE_OK;
|
|
}
|
|
|
|
RedictModuleBlockedClient *blocked_client = NULL;
|
|
|
|
/* BLOCK.BLOCK [TIMEOUT] -- Blocks the current client until released
|
|
* or TIMEOUT seconds. If TIMEOUT is zero, no timeout function is
|
|
* registered.
|
|
*/
|
|
int Block_RedictCommand(RedictModuleCtx *ctx, RedictModuleString **argv, int argc) {
|
|
if (RedictModule_IsBlockedReplyRequest(ctx)) {
|
|
RedictModuleString *r = RedictModule_GetBlockedClientPrivateData(ctx);
|
|
return RedictModule_ReplyWithString(ctx, r);
|
|
} else if (RedictModule_IsBlockedTimeoutRequest(ctx)) {
|
|
RedictModule_UnblockClient(blocked_client, NULL); /* Must be called to avoid leaks. */
|
|
blocked_client = NULL;
|
|
return RedictModule_ReplyWithSimpleString(ctx, "Timed out");
|
|
}
|
|
|
|
if (argc != 2) return RedictModule_WrongArity(ctx);
|
|
long long timeout;
|
|
|
|
if (RedictModule_StringToLongLong(argv[1], &timeout) != REDICTMODULE_OK) {
|
|
return RedictModule_ReplyWithError(ctx, "ERR invalid timeout");
|
|
}
|
|
if (blocked_client) {
|
|
return RedictModule_ReplyWithError(ctx, "ERR another client already blocked");
|
|
}
|
|
|
|
/* Block client. We use this function as both a reply and optional timeout
|
|
* callback and differentiate the different code flows above.
|
|
*/
|
|
blocked_client = RedictModule_BlockClient(ctx, Block_RedictCommand,
|
|
timeout > 0 ? Block_RedictCommand : NULL, HelloBlock_FreeStringData, timeout);
|
|
return REDICTMODULE_OK;
|
|
}
|
|
|
|
/* BLOCK.IS_BLOCKED -- Returns 1 if we have a blocked client, or 0 otherwise.
|
|
*/
|
|
int IsBlocked_RedictCommand(RedictModuleCtx *ctx, RedictModuleString **argv, int argc) {
|
|
UNUSED(argv);
|
|
UNUSED(argc);
|
|
RedictModule_ReplyWithLongLong(ctx, blocked_client ? 1 : 0);
|
|
return REDICTMODULE_OK;
|
|
}
|
|
|
|
/* BLOCK.RELEASE [reply] -- Releases the blocked client and produce the specified reply.
|
|
*/
|
|
int Release_RedictCommand(RedictModuleCtx *ctx, RedictModuleString **argv, int argc) {
|
|
if (argc != 2) return RedictModule_WrongArity(ctx);
|
|
if (!blocked_client) {
|
|
return RedictModule_ReplyWithError(ctx, "ERR No blocked client");
|
|
}
|
|
|
|
RedictModuleString *replystr = argv[1];
|
|
RedictModule_RetainString(ctx, replystr);
|
|
RedictModule_UnblockClient(blocked_client, replystr);
|
|
blocked_client = NULL;
|
|
|
|
RedictModule_ReplyWithSimpleString(ctx, "OK");
|
|
|
|
return REDICTMODULE_OK;
|
|
}
|
|
|
|
int RedictModule_OnLoad(RedictModuleCtx *ctx, RedictModuleString **argv, int argc) {
|
|
UNUSED(argv);
|
|
UNUSED(argc);
|
|
|
|
if (RedictModule_Init(ctx,"block",1,REDICTMODULE_APIVER_1)
|
|
== REDICTMODULE_ERR) return REDICTMODULE_ERR;
|
|
|
|
if (RedictModule_CreateCommand(ctx,"block.debug",
|
|
HelloBlock_RedictCommand,"",0,0,0) == REDICTMODULE_ERR)
|
|
return REDICTMODULE_ERR;
|
|
|
|
if (RedictModule_CreateCommand(ctx,"block.double_debug",
|
|
HelloDoubleBlock_RedictCommand,"",0,0,0) == REDICTMODULE_ERR)
|
|
return REDICTMODULE_ERR;
|
|
|
|
if (RedictModule_CreateCommand(ctx,"block.debug_no_track",
|
|
HelloBlockNoTracking_RedictCommand,"",0,0,0) == REDICTMODULE_ERR)
|
|
return REDICTMODULE_ERR;
|
|
|
|
if (RedictModule_CreateCommand(ctx, "block.block",
|
|
Block_RedictCommand, "", 0, 0, 0) == REDICTMODULE_ERR)
|
|
return REDICTMODULE_ERR;
|
|
|
|
if (RedictModule_CreateCommand(ctx,"block.is_blocked",
|
|
IsBlocked_RedictCommand,"",0,0,0) == REDICTMODULE_ERR)
|
|
return REDICTMODULE_ERR;
|
|
|
|
if (RedictModule_CreateCommand(ctx,"block.release",
|
|
Release_RedictCommand,"",0,0,0) == REDICTMODULE_ERR)
|
|
return REDICTMODULE_ERR;
|
|
|
|
return REDICTMODULE_OK;
|
|
}
|