Modules: blocking commands WIP: API exported, a first example.

This commit is contained in:
antirez 2016-10-07 13:48:05 +02:00
parent 3aa816e61a
commit ffb00fbcbe
3 changed files with 38 additions and 3 deletions

View File

@ -3085,7 +3085,21 @@ void unblockClientFromModule(client *c) {
bc->client = NULL; bc->client = NULL;
} }
int RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(void*), long long timeout_ms) { /* Block a client in the context of a blocking command, returning an handle
* which will be used, later, in order to block the client with a call to
* RedisModule_UnblockClient(). The arguments specify callback functions
* and a timeout after which the client is unblocked.
*
* The callbacks are called in the following contexts:
*
* reply_callback: called after a successful RedisModule_UnblockClient() call
* in order to reply to the client and unblock it.
* reply_timeout: called when the timeout is reached in order to send an
* error to the client.
* free_privdata: called in order to free the privata data that is passed
* by RedisModule_UnblockClient() call.
*/
RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(void*), long long timeout_ms) {
client *c = ctx->client; client *c = ctx->client;
c->bpop.module_blocked_handle = zmalloc(sizeof(RedisModuleBlockedClient)); c->bpop.module_blocked_handle = zmalloc(sizeof(RedisModuleBlockedClient));
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
@ -3099,7 +3113,7 @@ int RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, Redis
c->bpop.timeout = timeout_ms; c->bpop.timeout = timeout_ms;
blockClient(c,BLOCKED_MODULE); blockClient(c,BLOCKED_MODULE);
return REDISMODULE_OK; return bc;
} }
/* Unblock a client blocked by `RedisModule_BlockedClient`. This will trigger /* Unblock a client blocked by `RedisModule_BlockedClient`. This will trigger
@ -3513,4 +3527,9 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(RetainString); REGISTER_API(RetainString);
REGISTER_API(StringCompare); REGISTER_API(StringCompare);
REGISTER_API(GetContextFromIO); REGISTER_API(GetContextFromIO);
REGISTER_API(BlockClient);
REGISTER_API(UnblockClient);
REGISTER_API(IsBlockedReplyRequest);
REGISTER_API(IsBlockedTimeoutRequest);
REGISTER_API(GetBlockedClientPrivateData);
} }

View File

@ -13,7 +13,7 @@ endif
.SUFFIXES: .c .so .xo .o .SUFFIXES: .c .so .xo .o
all: helloworld.so hellotype.so testmodule.so all: helloworld.so hellotype.so helloblock.so testmodule.so
.c.xo: .c.xo:
$(CC) -I. $(CFLAGS) $(SHOBJ_CFLAGS) -fPIC -c $< -o $@ $(CC) -I. $(CFLAGS) $(SHOBJ_CFLAGS) -fPIC -c $< -o $@
@ -28,6 +28,11 @@ hellotype.xo: ../redismodule.h
hellotype.so: hellotype.xo hellotype.so: hellotype.xo
$(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc
helloblock.xo: ../redismodule.h
helloblock.so: helloblock.xo
$(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lpthread -lc
testmodule.xo: ../redismodule.h testmodule.xo: ../redismodule.h
testmodule.so: testmodule.xo testmodule.so: testmodule.xo

View File

@ -84,6 +84,7 @@ typedef struct RedisModuleCallReply RedisModuleCallReply;
typedef struct RedisModuleIO RedisModuleIO; typedef struct RedisModuleIO RedisModuleIO;
typedef struct RedisModuleType RedisModuleType; typedef struct RedisModuleType RedisModuleType;
typedef struct RedisModuleDigest RedisModuleDigest; typedef struct RedisModuleDigest RedisModuleDigest;
typedef struct RedisModuleBlockedClient RedisModuleBlockedClient;
typedef int (*RedisModuleCmdFunc) (RedisModuleCtx *ctx, RedisModuleString **argv, int argc); typedef int (*RedisModuleCmdFunc) (RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
@ -194,6 +195,11 @@ int REDISMODULE_API_FUNC(RedisModule_StringAppendBuffer)(RedisModuleCtx *ctx, Re
void REDISMODULE_API_FUNC(RedisModule_RetainString)(RedisModuleCtx *ctx, RedisModuleString *str); void REDISMODULE_API_FUNC(RedisModule_RetainString)(RedisModuleCtx *ctx, RedisModuleString *str);
int REDISMODULE_API_FUNC(RedisModule_StringCompare)(RedisModuleString *a, RedisModuleString *b); int REDISMODULE_API_FUNC(RedisModule_StringCompare)(RedisModuleString *a, RedisModuleString *b);
RedisModuleCtx *REDISMODULE_API_FUNC(RedisModule_GetContextFromIO)(RedisModuleIO *io); RedisModuleCtx *REDISMODULE_API_FUNC(RedisModule_GetContextFromIO)(RedisModuleIO *io);
RedisModuleBlockedClient *REDISMODULE_API_FUNC(RedisModule_BlockClient)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(void*), long long timeout_ms);
int REDISMODULE_API_FUNC(RedisModule_UnblockClient)(RedisModuleBlockedClient *bc, void *privdata);
int REDISMODULE_API_FUNC(RedisModule_IsBlockedReplyRequest)(RedisModuleCtx *ctx);
int REDISMODULE_API_FUNC(RedisModule_IsBlockedTimeoutRequest)(RedisModuleCtx *ctx);
void *REDISMODULE_API_FUNC(RedisModule_GetBlockedClientPrivateData)(RedisModuleCtx *ctx);
/* This is included inline inside each Redis module. */ /* This is included inline inside each Redis module. */
static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int apiver) __attribute__((unused)); static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int apiver) __attribute__((unused));
@ -295,6 +301,11 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(RetainString); REDISMODULE_GET_API(RetainString);
REDISMODULE_GET_API(StringCompare); REDISMODULE_GET_API(StringCompare);
REDISMODULE_GET_API(GetContextFromIO); REDISMODULE_GET_API(GetContextFromIO);
REDISMODULE_GET_API(BlockClient);
REDISMODULE_GET_API(UnblockClient);
REDISMODULE_GET_API(IsBlockedReplyRequest);
REDISMODULE_GET_API(IsBlockedTimeoutRequest);
REDISMODULE_GET_API(GetBlockedClientPrivateData);
RedisModule_SetModuleAttribs(ctx,name,ver,apiver); RedisModule_SetModuleAttribs(ctx,name,ver,apiver);
return REDISMODULE_OK; return REDISMODULE_OK;