diff --git a/runtest-moduleapi b/runtest-moduleapi index 9301002c9..e48535126 100755 --- a/runtest-moduleapi +++ b/runtest-moduleapi @@ -21,4 +21,5 @@ $TCLSH tests/test_helper.tcl \ --single unit/moduleapi/propagate \ --single unit/moduleapi/hooks \ --single unit/moduleapi/misc \ +--single unit/moduleapi/blockonkeys \ "${@}" diff --git a/tests/modules/Makefile b/tests/modules/Makefile index 71c0b5ef8..9e27758a2 100644 --- a/tests/modules/Makefile +++ b/tests/modules/Makefile @@ -18,7 +18,8 @@ TEST_MODULES = \ infotest.so \ propagate.so \ misc.so \ - hooks.so + hooks.so \ + blockonkeys.so .PHONY: all diff --git a/tests/modules/blockonkeys.c b/tests/modules/blockonkeys.c new file mode 100644 index 000000000..959918b1c --- /dev/null +++ b/tests/modules/blockonkeys.c @@ -0,0 +1,261 @@ +#define REDISMODULE_EXPERIMENTAL_API +#include "redismodule.h" + +#include +#include +#include + +#define LIST_SIZE 1024 + +typedef struct { + long long list[LIST_SIZE]; + long long length; +} fsl_t; /* Fixed-size list */ + +static RedisModuleType *fsltype = NULL; + +fsl_t *fsl_type_create() { + fsl_t *o; + o = RedisModule_Alloc(sizeof(*o)); + o->length = 0; + return o; +} + +void fsl_type_free(fsl_t *o) { + RedisModule_Free(o); +} + +/* ========================== "fsltype" type methods ======================= */ + +void *fsl_rdb_load(RedisModuleIO *rdb, int encver) { + if (encver != 0) { + return NULL; + } + fsl_t *fsl = fsl_type_create(); + fsl->length = RedisModule_LoadUnsigned(rdb); + for (long long i = 0; i < fsl->length; i++) + fsl->list[i] = RedisModule_LoadSigned(rdb); + return fsl; +} + +void fsl_rdb_save(RedisModuleIO *rdb, void *value) { + fsl_t *fsl = value; + RedisModule_SaveUnsigned(rdb,fsl->length); + for (long long i = 0; i < fsl->length; i++) + RedisModule_SaveSigned(rdb, fsl->list[i]); +} + +void fsl_aofrw(RedisModuleIO *aof, RedisModuleString *key, void *value) { + fsl_t *fsl = value; + for (long long i = 0; i < fsl->length; i++) + RedisModule_EmitAOF(aof, "FSL.PUSH","sl", key, fsl->list[i]); +} + +void fsl_free(void *value) { + fsl_type_free(value); +} + +/* ========================== helper methods ======================= */ + +int get_fsl(RedisModuleCtx *ctx, RedisModuleString *keyname, int mode, int create, fsl_t **fsl, int reply_on_failure) { + RedisModuleKey *key = RedisModule_OpenKey(ctx, keyname, mode); + + int type = RedisModule_KeyType(key); + if (type != REDISMODULE_KEYTYPE_EMPTY && RedisModule_ModuleTypeGetType(key) != fsltype) { + RedisModule_CloseKey(key); + if (reply_on_failure) + RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE); + return 0; + } + + /* Create an empty value object if the key is currently empty. */ + if (type == REDISMODULE_KEYTYPE_EMPTY) { + if (!create) { + /* Key is empty but we cannot create */ + RedisModule_CloseKey(key); + *fsl = NULL; + return 1; + } + *fsl = fsl_type_create(); + RedisModule_ModuleTypeSetValue(key, fsltype, *fsl); + } else { + *fsl = RedisModule_ModuleTypeGetValue(key); + } + + RedisModule_CloseKey(key); + return 1; +} + +/* ========================== commands ======================= */ + +/* FSL.PUSH - Push an integer to the fixed-size list (to the right). + * It must be greater than the element in the head of the list. */ +int fsl_push(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 3) + return RedisModule_WrongArity(ctx); + + long long ele; + if (RedisModule_StringToLongLong(argv[2],&ele) != REDISMODULE_OK) + return RedisModule_ReplyWithError(ctx,"ERR invalid integer"); + + fsl_t *fsl; + if (!get_fsl(ctx, argv[1], REDISMODULE_WRITE, 1, &fsl, 1)) + return REDISMODULE_OK; + + if (fsl->length == LIST_SIZE) + return RedisModule_ReplyWithError(ctx,"ERR list is full"); + + if (fsl->length != 0 && fsl->list[fsl->length-1] >= ele) + return RedisModule_ReplyWithError(ctx,"ERR new element has to be greater than the head element"); + + fsl->list[fsl->length++] = ele; + + if (fsl->length >= 2) + RedisModule_SignalKeyAsReady(ctx, argv[1]); + + return RedisModule_ReplyWithSimpleString(ctx, "OK"); +} + +int bpop2_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx); + + fsl_t *fsl; + if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0)) + return REDISMODULE_ERR; + + if (!fsl || fsl->length < 2) + return REDISMODULE_ERR; + + RedisModule_ReplyWithArray(ctx, 2); + RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); + RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); + return REDISMODULE_OK; +} + +int bpop2_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + return RedisModule_ReplyWithSimpleString(ctx, "Request timedout"); +} + + +/* FSL.BPOP2 - Block clients until list has two or more elements. + * When that happens, unblock client and pop the last two elements (from the right). */ +int fsl_bpop2(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 3) + return RedisModule_WrongArity(ctx); + + long long timeout; + if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK || timeout < 0) + return RedisModule_ReplyWithError(ctx,"ERR invalid timeout"); + + fsl_t *fsl; + if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1)) + return REDISMODULE_OK; + + if (!fsl || fsl->length < 2) { + /* Key is empty or has <2 elements, we must block */ + RedisModule_BlockClientOnKeys(ctx, bpop2_reply_callback, bpop2_timeout_callback, + NULL, timeout, &argv[1], 1, NULL); + } else { + RedisModule_ReplyWithArray(ctx, 2); + RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); + RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); + } + + return REDISMODULE_OK; +} + +int bpopgt_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx); + long long gt = (long long)RedisModule_GetBlockedClientPrivateData(ctx); + + fsl_t *fsl; + if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0)) + return REDISMODULE_ERR; + + if (!fsl || fsl->list[fsl->length-1] <= gt) + return REDISMODULE_ERR; + + RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); + return REDISMODULE_OK; +} + +int bpopgt_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + return RedisModule_ReplyWithSimpleString(ctx, "Request timedout"); +} + +void bpopgt_free_privdata(RedisModuleCtx *ctx, void *privdata) { + /* Nothing to do because privdata is actually a 'long long', + * not a pointer to the heap */ + REDISMODULE_NOT_USED(ctx); + REDISMODULE_NOT_USED(privdata); +} + +/* FSL.BPOPGT - Block clients until list has an element greater than . + * When that happens, unblock client and pop the last element (from the right). */ +int fsl_bpopgt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 4) + return RedisModule_WrongArity(ctx); + + long long gt; + if (RedisModule_StringToLongLong(argv[2],>) != REDISMODULE_OK) + return RedisModule_ReplyWithError(ctx,"ERR invalid integer"); + + long long timeout; + if (RedisModule_StringToLongLong(argv[3],&timeout) != REDISMODULE_OK || timeout < 0) + return RedisModule_ReplyWithError(ctx,"ERR invalid timeout"); + + fsl_t *fsl; + if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1)) + return REDISMODULE_OK; + + if (!fsl || fsl->list[fsl->length-1] <= gt) { + /* Key is empty or has <2 elements, we must block */ + RedisModule_BlockClientOnKeys(ctx, bpopgt_reply_callback, bpopgt_timeout_callback, + bpopgt_free_privdata, timeout, &argv[1], 1, (void*)gt); + } else { + RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); + } + + return REDISMODULE_OK; +} + +int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + + if (RedisModule_Init(ctx, "blockonkeys", 1, REDISMODULE_APIVER_1)== REDISMODULE_ERR) + return REDISMODULE_ERR; + + RedisModuleTypeMethods tm = { + .version = REDISMODULE_TYPE_METHOD_VERSION, + .rdb_load = fsl_rdb_load, + .rdb_save = fsl_rdb_save, + .aof_rewrite = fsl_aofrw, + .mem_usage = NULL, + .free = fsl_free, + .digest = NULL + }; + + fsltype = RedisModule_CreateDataType(ctx, "fsltype_t", 0, &tm); + if (fsltype == NULL) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"fsl.push",fsl_push,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"fsl.bpop2",fsl_bpop2,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"fsl.bpopgt",fsl_bpopgt,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + return REDISMODULE_OK; +} diff --git a/tests/unit/moduleapi/blockonkeys.tcl b/tests/unit/moduleapi/blockonkeys.tcl new file mode 100644 index 000000000..cb99ab1c9 --- /dev/null +++ b/tests/unit/moduleapi/blockonkeys.tcl @@ -0,0 +1,85 @@ +set testmodule [file normalize tests/modules/blockonkeys.so] + +start_server {tags {"modules"}} { + r module load $testmodule + + test {Module client blocked on keys (no metadata): No block} { + r del k + r fsl.push k 33 + r fsl.push k 34 + r fsl.bpop2 k 0 + } {34 33} + + test {Module client blocked on keys (no metadata): Timeout} { + r del k + set rd [redis_deferring_client] + r fsl.push k 33 + $rd fsl.bpop2 k 1 + assert_equal {Request timedout} [$rd read] + } + + test {Module client blocked on keys (no metadata): Blocked, case 1} { + r del k + set rd [redis_deferring_client] + r fsl.push k 33 + $rd fsl.bpop2 k 0 + r fsl.push k 34 + assert_equal {34 33} [$rd read] + } + + test {Module client blocked on keys (no metadata): Blocked, case 2} { + r del k + set rd [redis_deferring_client] + r fsl.push k 33 + r fsl.push k 34 + $rd fsl.bpop2 k 0 + assert_equal {34 33} [$rd read] + } + + test {Module client blocked on keys (with metadata): No block} { + r del k + r fsl.push k 34 + r fsl.bpopgt k 30 0 + } {34} + + test {Module client blocked on keys (with metadata): Timeout} { + r del k + set rd [redis_deferring_client] + r fsl.push k 33 + $rd fsl.bpopgt k 35 1 + assert_equal {Request timedout} [$rd read] + } + + test {Module client blocked on keys (with metadata): Blocked, case 1} { + r del k + set rd [redis_deferring_client] + r fsl.push k 33 + $rd fsl.bpopgt k 33 0 + r fsl.push k 34 + assert_equal {34} [$rd read] + } + + test {Module client blocked on keys (with metadata): Blocked, case 2} { + r del k + set rd [redis_deferring_client] + $rd fsl.bpopgt k 35 0 + r fsl.push k 33 + r fsl.push k 34 + r fsl.push k 35 + r fsl.push k 36 + assert_equal {36} [$rd read] + } + + test {Module client blocked on keys does not wake up on wrong type} { + r del k + set rd [redis_deferring_client] + $rd fsl.bpop2 k 0 + r lpush k 12 + r lpush k 13 + r lpush k 14 + r del k + r fsl.push k 33 + r fsl.push k 34 + assert_equal {34 33} [$rd read] + } +}