Modules API: blocked client free callback modified to get a context.

Note that this was an experimental API that can only be enabled with
REIDSMODULE_EXPERIMENTAL_API, so it is subject to change until its
promoted to stable API. Sorry for the breakage, it is trivial to
resolve btw. This change will not be back ported to Redis 4.0.
This commit is contained in:
antirez 2018-04-09 11:54:44 +02:00
parent b2868c7b9c
commit 49e098234a
4 changed files with 52 additions and 32 deletions

View File

@ -127,6 +127,7 @@ typedef struct RedisModuleCtx RedisModuleCtx;
#define REDISMODULE_CTX_BLOCKED_REPLY (1<<3)
#define REDISMODULE_CTX_BLOCKED_TIMEOUT (1<<4)
#define REDISMODULE_CTX_THREAD_SAFE (1<<5)
#define REDISMODULE_CTX_BLOCKED_DISCONNECTED (1<<6)
/* This represents a Redis key opened with RM_OpenKey(). */
struct RedisModuleKey {
@ -200,7 +201,7 @@ typedef struct RedisModuleBlockedClient {
RedisModule *module; /* Module blocking the client. */
RedisModuleCmdFunc reply_callback; /* Reply callback on normal completion.*/
RedisModuleCmdFunc timeout_callback; /* Reply callback on timeout. */
void (*free_privdata)(void *); /* privdata cleanup callback. */
void (*free_privdata)(RedisModuleCtx*,void*);/* privdata cleanup callback.*/
void *privdata; /* Module private data that may be used by the reply
or timeout callback. It is set via the
RedisModule_UnblockClient() API. */
@ -3449,7 +3450,7 @@ void unblockClientFromModule(client *c) {
* free_privdata: called in order to free the privata data that is passed
* by RedisModule_UnblockClient() call.
*/
RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(void*), long long timeout_ms) {
RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) {
client *c = ctx->client;
int islua = c->flags & CLIENT_LUA;
int ismulti = c->flags & CLIENT_MULTI;
@ -3553,8 +3554,16 @@ void moduleHandleBlockedClients(void) {
}
/* Free privdata if any. */
if (bc->privdata && bc->free_privdata)
bc->free_privdata(bc->privdata);
if (bc->privdata && bc->free_privdata) {
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
if (c == NULL)
ctx.flags |= REDISMODULE_CTX_BLOCKED_DISCONNECTED;
ctx.blocked_privdata = bc->privdata;
ctx.module = bc->module;
ctx.client = bc->client;
bc->free_privdata(&ctx,bc->privdata);
moduleFreeContext(&ctx);
}
/* It is possible that this blocked client object accumulated
* replies to send to the client in a thread safe context.
@ -3625,6 +3634,13 @@ void *RM_GetBlockedClientPrivateData(RedisModuleCtx *ctx) {
return ctx->blocked_privdata;
}
/* Return true if when the free callback of a blocked client is called,
* the reason for the client to be unblocked is that it disconnected
* while it was blocked. */
int RM_BlockedClientDisconnected(RedisModuleCtx *ctx) {
return (ctx->flags & REDISMODULE_CTX_BLOCKED_DISCONNECTED) != 0;
}
/* --------------------------------------------------------------------------
* Thread Safe Contexts
* -------------------------------------------------------------------------- */
@ -4596,4 +4612,5 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(GetClusterSize);
REGISTER_API(GetRandomBytes);
REGISTER_API(GetRandomHexChars);
REGISTER_API(BlockedClientDisconnected);
}

View File

@ -54,7 +54,8 @@ int HelloBlock_Timeout(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
}
/* Private data freeing callback for HELLO.BLOCK command. */
void HelloBlock_FreeData(void *privdata) {
void HelloBlock_FreeData(RedisModuleCtx *ctx, void *privdata) {
REDISMODULE_NOT_USED(ctx);
RedisModule_Free(privdata);
}

View File

@ -30,6 +30,7 @@
* POSSIBILITY OF SUCH DAMAGE.
*/
#define REDISMODULE_EXPERIMENTAL_API
#include "../redismodule.h"
#include <stdio.h>
#include <stdlib.h>

View File

@ -268,6 +268,21 @@ long long REDISMODULE_API_FUNC(RedisModule_Milliseconds)(void);
void REDISMODULE_API_FUNC(RedisModule_DigestAddStringBuffer)(RedisModuleDigest *md, unsigned char *ele, size_t len);
void REDISMODULE_API_FUNC(RedisModule_DigestAddLongLong)(RedisModuleDigest *md, long long ele);
void REDISMODULE_API_FUNC(RedisModule_DigestEndSequence)(RedisModuleDigest *md);
/* Experimental APIs */
#ifdef REDISMODULE_EXPERIMENTAL_API
RedisModuleBlockedClient *REDISMODULE_API_FUNC(RedisModule_BlockClient)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms);
int REDISMODULE_API_FUNC(RedisModule_UnblockClient)(RedisModuleBlockedClient *bc, void *privdata);
int REDISMODULE_API_FUNC(RedisModule_IsBlockedReplyRequest)(RedisModuleCtx *ctx);
int REDISMODULE_API_FUNC(RedisModule_IsBlockedTimeoutRequest)(RedisModuleCtx *ctx);
void *REDISMODULE_API_FUNC(RedisModule_GetBlockedClientPrivateData)(RedisModuleCtx *ctx);
int REDISMODULE_API_FUNC(RedisModule_AbortBlock)(RedisModuleBlockedClient *bc);
RedisModuleCtx *REDISMODULE_API_FUNC(RedisModule_GetThreadSafeContext)(RedisModuleBlockedClient *bc);
void REDISMODULE_API_FUNC(RedisModule_FreeThreadSafeContext)(RedisModuleCtx *ctx);
void REDISMODULE_API_FUNC(RedisModule_ThreadSafeContextLock)(RedisModuleCtx *ctx);
void REDISMODULE_API_FUNC(RedisModule_ThreadSafeContextUnlock)(RedisModuleCtx *ctx);
int REDISMODULE_API_FUNC(RedisModule_SubscribeToKeyspaceEvents)(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc cb);
int REDISMODULE_API_FUNC(RedisModule_BlockedClientDisconnected)(RedisModuleCtx *ctx);
void REDISMODULE_API_FUNC(RedisModule_RegisterClusterMessageReceiver)(RedisModuleCtx *ctx, uint8_t type, RedisModuleClusterMessageReceiver callback);
int REDISMODULE_API_FUNC(RedisModule_SendClusterMessage)(RedisModuleCtx *ctx, char *target_id, uint8_t type, unsigned char *msg, uint32_t len);
int REDISMODULE_API_FUNC(RedisModule_GetClusterNodeInfo)(RedisModuleCtx *ctx, const char *id, char *ip, char *master_id, int *port, int *flags);
@ -280,21 +295,6 @@ const char *REDISMODULE_API_FUNC(RedisModule_GetMyClusterID)(void);
size_t REDISMODULE_API_FUNC(RedisModule_GetClusterSize)(void);
void REDISMODULE_API_FUNC(RedisModule_GetRandomBytes)(unsigned char *dst, size_t len);
void REDISMODULE_API_FUNC(RedisModule_GetRandomHexChars)(char *dst, size_t len);
/* Experimental APIs */
#ifdef REDISMODULE_EXPERIMENTAL_API
RedisModuleBlockedClient *REDISMODULE_API_FUNC(RedisModule_BlockClient)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(void*), long long timeout_ms);
int REDISMODULE_API_FUNC(RedisModule_UnblockClient)(RedisModuleBlockedClient *bc, void *privdata);
int REDISMODULE_API_FUNC(RedisModule_IsBlockedReplyRequest)(RedisModuleCtx *ctx);
int REDISMODULE_API_FUNC(RedisModule_IsBlockedTimeoutRequest)(RedisModuleCtx *ctx);
void *REDISMODULE_API_FUNC(RedisModule_GetBlockedClientPrivateData)(RedisModuleCtx *ctx);
int REDISMODULE_API_FUNC(RedisModule_AbortBlock)(RedisModuleBlockedClient *bc);
RedisModuleCtx *REDISMODULE_API_FUNC(RedisModule_GetThreadSafeContext)(RedisModuleBlockedClient *bc);
void REDISMODULE_API_FUNC(RedisModule_FreeThreadSafeContext)(RedisModuleCtx *ctx);
void REDISMODULE_API_FUNC(RedisModule_ThreadSafeContextLock)(RedisModuleCtx *ctx);
void REDISMODULE_API_FUNC(RedisModule_ThreadSafeContextUnlock)(RedisModuleCtx *ctx);
int REDISMODULE_API_FUNC(RedisModule_SubscribeToKeyspaceEvents)(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc cb);
#endif
/* This is included inline inside each Redis module. */
@ -404,18 +404,6 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(DigestAddStringBuffer);
REDISMODULE_GET_API(DigestAddLongLong);
REDISMODULE_GET_API(DigestEndSequence);
REDISMODULE_GET_API(RegisterClusterMessageReceiver);
REDISMODULE_GET_API(SendClusterMessage);
REDISMODULE_GET_API(GetClusterNodeInfo);
REDISMODULE_GET_API(GetClusterNodesList);
REDISMODULE_GET_API(FreeClusterNodesList);
REDISMODULE_GET_API(CreateTimer);
REDISMODULE_GET_API(StopTimer);
REDISMODULE_GET_API(GetTimerInfo);
REDISMODULE_GET_API(GetMyClusterID);
REDISMODULE_GET_API(GetClusterSize);
REDISMODULE_GET_API(GetRandomBytes);
REDISMODULE_GET_API(GetRandomHexChars);
#ifdef REDISMODULE_EXPERIMENTAL_API
REDISMODULE_GET_API(GetThreadSafeContext);
@ -429,6 +417,19 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(GetBlockedClientPrivateData);
REDISMODULE_GET_API(AbortBlock);
REDISMODULE_GET_API(SubscribeToKeyspaceEvents);
REDISMODULE_GET_API(BlockedClientDisconnected);
REDISMODULE_GET_API(RegisterClusterMessageReceiver);
REDISMODULE_GET_API(SendClusterMessage);
REDISMODULE_GET_API(GetClusterNodeInfo);
REDISMODULE_GET_API(GetClusterNodesList);
REDISMODULE_GET_API(FreeClusterNodesList);
REDISMODULE_GET_API(CreateTimer);
REDISMODULE_GET_API(StopTimer);
REDISMODULE_GET_API(GetTimerInfo);
REDISMODULE_GET_API(GetMyClusterID);
REDISMODULE_GET_API(GetClusterSize);
REDISMODULE_GET_API(GetRandomBytes);
REDISMODULE_GET_API(GetRandomHexChars);
#endif
if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR;