mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 08:08:53 -05:00
Modules: Test RedisModule_BlockClientOnKeys
This commit is contained in:
parent
fdaea2a7a7
commit
b81f486c2f
@ -21,4 +21,5 @@ $TCLSH tests/test_helper.tcl \
|
|||||||
--single unit/moduleapi/propagate \
|
--single unit/moduleapi/propagate \
|
||||||
--single unit/moduleapi/hooks \
|
--single unit/moduleapi/hooks \
|
||||||
--single unit/moduleapi/misc \
|
--single unit/moduleapi/misc \
|
||||||
|
--single unit/moduleapi/blockonkeys \
|
||||||
"${@}"
|
"${@}"
|
||||||
|
@ -18,7 +18,8 @@ TEST_MODULES = \
|
|||||||
infotest.so \
|
infotest.so \
|
||||||
propagate.so \
|
propagate.so \
|
||||||
misc.so \
|
misc.so \
|
||||||
hooks.so
|
hooks.so \
|
||||||
|
blockonkeys.so
|
||||||
|
|
||||||
.PHONY: all
|
.PHONY: all
|
||||||
|
|
||||||
|
261
tests/modules/blockonkeys.c
Normal file
261
tests/modules/blockonkeys.c
Normal file
@ -0,0 +1,261 @@
|
|||||||
|
#define REDISMODULE_EXPERIMENTAL_API
|
||||||
|
#include "redismodule.h"
|
||||||
|
|
||||||
|
#include <string.h>
|
||||||
|
#include <assert.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
#define LIST_SIZE 1024
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
long long list[LIST_SIZE];
|
||||||
|
long long length;
|
||||||
|
} fsl_t; /* Fixed-size list */
|
||||||
|
|
||||||
|
static RedisModuleType *fsltype = NULL;
|
||||||
|
|
||||||
|
fsl_t *fsl_type_create() {
|
||||||
|
fsl_t *o;
|
||||||
|
o = RedisModule_Alloc(sizeof(*o));
|
||||||
|
o->length = 0;
|
||||||
|
return o;
|
||||||
|
}
|
||||||
|
|
||||||
|
void fsl_type_free(fsl_t *o) {
|
||||||
|
RedisModule_Free(o);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* ========================== "fsltype" type methods ======================= */
|
||||||
|
|
||||||
|
void *fsl_rdb_load(RedisModuleIO *rdb, int encver) {
|
||||||
|
if (encver != 0) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
fsl_t *fsl = fsl_type_create();
|
||||||
|
fsl->length = RedisModule_LoadUnsigned(rdb);
|
||||||
|
for (long long i = 0; i < fsl->length; i++)
|
||||||
|
fsl->list[i] = RedisModule_LoadSigned(rdb);
|
||||||
|
return fsl;
|
||||||
|
}
|
||||||
|
|
||||||
|
void fsl_rdb_save(RedisModuleIO *rdb, void *value) {
|
||||||
|
fsl_t *fsl = value;
|
||||||
|
RedisModule_SaveUnsigned(rdb,fsl->length);
|
||||||
|
for (long long i = 0; i < fsl->length; i++)
|
||||||
|
RedisModule_SaveSigned(rdb, fsl->list[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
void fsl_aofrw(RedisModuleIO *aof, RedisModuleString *key, void *value) {
|
||||||
|
fsl_t *fsl = value;
|
||||||
|
for (long long i = 0; i < fsl->length; i++)
|
||||||
|
RedisModule_EmitAOF(aof, "FSL.PUSH","sl", key, fsl->list[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
void fsl_free(void *value) {
|
||||||
|
fsl_type_free(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* ========================== helper methods ======================= */
|
||||||
|
|
||||||
|
int get_fsl(RedisModuleCtx *ctx, RedisModuleString *keyname, int mode, int create, fsl_t **fsl, int reply_on_failure) {
|
||||||
|
RedisModuleKey *key = RedisModule_OpenKey(ctx, keyname, mode);
|
||||||
|
|
||||||
|
int type = RedisModule_KeyType(key);
|
||||||
|
if (type != REDISMODULE_KEYTYPE_EMPTY && RedisModule_ModuleTypeGetType(key) != fsltype) {
|
||||||
|
RedisModule_CloseKey(key);
|
||||||
|
if (reply_on_failure)
|
||||||
|
RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Create an empty value object if the key is currently empty. */
|
||||||
|
if (type == REDISMODULE_KEYTYPE_EMPTY) {
|
||||||
|
if (!create) {
|
||||||
|
/* Key is empty but we cannot create */
|
||||||
|
RedisModule_CloseKey(key);
|
||||||
|
*fsl = NULL;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
*fsl = fsl_type_create();
|
||||||
|
RedisModule_ModuleTypeSetValue(key, fsltype, *fsl);
|
||||||
|
} else {
|
||||||
|
*fsl = RedisModule_ModuleTypeGetValue(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
RedisModule_CloseKey(key);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* ========================== commands ======================= */
|
||||||
|
|
||||||
|
/* FSL.PUSH <key> <int> - Push an integer to the fixed-size list (to the right).
|
||||||
|
* It must be greater than the element in the head of the list. */
|
||||||
|
int fsl_push(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||||
|
if (argc != 3)
|
||||||
|
return RedisModule_WrongArity(ctx);
|
||||||
|
|
||||||
|
long long ele;
|
||||||
|
if (RedisModule_StringToLongLong(argv[2],&ele) != REDISMODULE_OK)
|
||||||
|
return RedisModule_ReplyWithError(ctx,"ERR invalid integer");
|
||||||
|
|
||||||
|
fsl_t *fsl;
|
||||||
|
if (!get_fsl(ctx, argv[1], REDISMODULE_WRITE, 1, &fsl, 1))
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
|
||||||
|
if (fsl->length == LIST_SIZE)
|
||||||
|
return RedisModule_ReplyWithError(ctx,"ERR list is full");
|
||||||
|
|
||||||
|
if (fsl->length != 0 && fsl->list[fsl->length-1] >= ele)
|
||||||
|
return RedisModule_ReplyWithError(ctx,"ERR new element has to be greater than the head element");
|
||||||
|
|
||||||
|
fsl->list[fsl->length++] = ele;
|
||||||
|
|
||||||
|
if (fsl->length >= 2)
|
||||||
|
RedisModule_SignalKeyAsReady(ctx, argv[1]);
|
||||||
|
|
||||||
|
return RedisModule_ReplyWithSimpleString(ctx, "OK");
|
||||||
|
}
|
||||||
|
|
||||||
|
int bpop2_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||||
|
REDISMODULE_NOT_USED(argv);
|
||||||
|
REDISMODULE_NOT_USED(argc);
|
||||||
|
RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx);
|
||||||
|
|
||||||
|
fsl_t *fsl;
|
||||||
|
if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0))
|
||||||
|
return REDISMODULE_ERR;
|
||||||
|
|
||||||
|
if (!fsl || fsl->length < 2)
|
||||||
|
return REDISMODULE_ERR;
|
||||||
|
|
||||||
|
RedisModule_ReplyWithArray(ctx, 2);
|
||||||
|
RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
|
||||||
|
RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
int bpop2_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||||
|
REDISMODULE_NOT_USED(argv);
|
||||||
|
REDISMODULE_NOT_USED(argc);
|
||||||
|
return RedisModule_ReplyWithSimpleString(ctx, "Request timedout");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/* FSL.BPOP2 <key> <timeout> - Block clients until list has two or more elements.
|
||||||
|
* When that happens, unblock client and pop the last two elements (from the right). */
|
||||||
|
int fsl_bpop2(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||||
|
if (argc != 3)
|
||||||
|
return RedisModule_WrongArity(ctx);
|
||||||
|
|
||||||
|
long long timeout;
|
||||||
|
if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK || timeout < 0)
|
||||||
|
return RedisModule_ReplyWithError(ctx,"ERR invalid timeout");
|
||||||
|
|
||||||
|
fsl_t *fsl;
|
||||||
|
if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1))
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
|
||||||
|
if (!fsl || fsl->length < 2) {
|
||||||
|
/* Key is empty or has <2 elements, we must block */
|
||||||
|
RedisModule_BlockClientOnKeys(ctx, bpop2_reply_callback, bpop2_timeout_callback,
|
||||||
|
NULL, timeout, &argv[1], 1, NULL);
|
||||||
|
} else {
|
||||||
|
RedisModule_ReplyWithArray(ctx, 2);
|
||||||
|
RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
|
||||||
|
RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
|
||||||
|
}
|
||||||
|
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
int bpopgt_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||||
|
REDISMODULE_NOT_USED(argv);
|
||||||
|
REDISMODULE_NOT_USED(argc);
|
||||||
|
RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx);
|
||||||
|
long long gt = (long long)RedisModule_GetBlockedClientPrivateData(ctx);
|
||||||
|
|
||||||
|
fsl_t *fsl;
|
||||||
|
if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0))
|
||||||
|
return REDISMODULE_ERR;
|
||||||
|
|
||||||
|
if (!fsl || fsl->list[fsl->length-1] <= gt)
|
||||||
|
return REDISMODULE_ERR;
|
||||||
|
|
||||||
|
RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
int bpopgt_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||||
|
REDISMODULE_NOT_USED(argv);
|
||||||
|
REDISMODULE_NOT_USED(argc);
|
||||||
|
return RedisModule_ReplyWithSimpleString(ctx, "Request timedout");
|
||||||
|
}
|
||||||
|
|
||||||
|
void bpopgt_free_privdata(RedisModuleCtx *ctx, void *privdata) {
|
||||||
|
/* Nothing to do because privdata is actually a 'long long',
|
||||||
|
* not a pointer to the heap */
|
||||||
|
REDISMODULE_NOT_USED(ctx);
|
||||||
|
REDISMODULE_NOT_USED(privdata);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* FSL.BPOPGT <key> <gt> <timeout> - Block clients until list has an element greater than <gt>.
|
||||||
|
* When that happens, unblock client and pop the last element (from the right). */
|
||||||
|
int fsl_bpopgt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||||
|
if (argc != 4)
|
||||||
|
return RedisModule_WrongArity(ctx);
|
||||||
|
|
||||||
|
long long gt;
|
||||||
|
if (RedisModule_StringToLongLong(argv[2],>) != REDISMODULE_OK)
|
||||||
|
return RedisModule_ReplyWithError(ctx,"ERR invalid integer");
|
||||||
|
|
||||||
|
long long timeout;
|
||||||
|
if (RedisModule_StringToLongLong(argv[3],&timeout) != REDISMODULE_OK || timeout < 0)
|
||||||
|
return RedisModule_ReplyWithError(ctx,"ERR invalid timeout");
|
||||||
|
|
||||||
|
fsl_t *fsl;
|
||||||
|
if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1))
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
|
||||||
|
if (!fsl || fsl->list[fsl->length-1] <= gt) {
|
||||||
|
/* Key is empty or has <2 elements, we must block */
|
||||||
|
RedisModule_BlockClientOnKeys(ctx, bpopgt_reply_callback, bpopgt_timeout_callback,
|
||||||
|
bpopgt_free_privdata, timeout, &argv[1], 1, (void*)gt);
|
||||||
|
} else {
|
||||||
|
RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
|
||||||
|
}
|
||||||
|
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||||
|
REDISMODULE_NOT_USED(argv);
|
||||||
|
REDISMODULE_NOT_USED(argc);
|
||||||
|
|
||||||
|
if (RedisModule_Init(ctx, "blockonkeys", 1, REDISMODULE_APIVER_1)== REDISMODULE_ERR)
|
||||||
|
return REDISMODULE_ERR;
|
||||||
|
|
||||||
|
RedisModuleTypeMethods tm = {
|
||||||
|
.version = REDISMODULE_TYPE_METHOD_VERSION,
|
||||||
|
.rdb_load = fsl_rdb_load,
|
||||||
|
.rdb_save = fsl_rdb_save,
|
||||||
|
.aof_rewrite = fsl_aofrw,
|
||||||
|
.mem_usage = NULL,
|
||||||
|
.free = fsl_free,
|
||||||
|
.digest = NULL
|
||||||
|
};
|
||||||
|
|
||||||
|
fsltype = RedisModule_CreateDataType(ctx, "fsltype_t", 0, &tm);
|
||||||
|
if (fsltype == NULL)
|
||||||
|
return REDISMODULE_ERR;
|
||||||
|
|
||||||
|
if (RedisModule_CreateCommand(ctx,"fsl.push",fsl_push,"",0,0,0) == REDISMODULE_ERR)
|
||||||
|
return REDISMODULE_ERR;
|
||||||
|
|
||||||
|
if (RedisModule_CreateCommand(ctx,"fsl.bpop2",fsl_bpop2,"",0,0,0) == REDISMODULE_ERR)
|
||||||
|
return REDISMODULE_ERR;
|
||||||
|
|
||||||
|
if (RedisModule_CreateCommand(ctx,"fsl.bpopgt",fsl_bpopgt,"",0,0,0) == REDISMODULE_ERR)
|
||||||
|
return REDISMODULE_ERR;
|
||||||
|
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
}
|
85
tests/unit/moduleapi/blockonkeys.tcl
Normal file
85
tests/unit/moduleapi/blockonkeys.tcl
Normal file
@ -0,0 +1,85 @@
|
|||||||
|
set testmodule [file normalize tests/modules/blockonkeys.so]
|
||||||
|
|
||||||
|
start_server {tags {"modules"}} {
|
||||||
|
r module load $testmodule
|
||||||
|
|
||||||
|
test {Module client blocked on keys (no metadata): No block} {
|
||||||
|
r del k
|
||||||
|
r fsl.push k 33
|
||||||
|
r fsl.push k 34
|
||||||
|
r fsl.bpop2 k 0
|
||||||
|
} {34 33}
|
||||||
|
|
||||||
|
test {Module client blocked on keys (no metadata): Timeout} {
|
||||||
|
r del k
|
||||||
|
set rd [redis_deferring_client]
|
||||||
|
r fsl.push k 33
|
||||||
|
$rd fsl.bpop2 k 1
|
||||||
|
assert_equal {Request timedout} [$rd read]
|
||||||
|
}
|
||||||
|
|
||||||
|
test {Module client blocked on keys (no metadata): Blocked, case 1} {
|
||||||
|
r del k
|
||||||
|
set rd [redis_deferring_client]
|
||||||
|
r fsl.push k 33
|
||||||
|
$rd fsl.bpop2 k 0
|
||||||
|
r fsl.push k 34
|
||||||
|
assert_equal {34 33} [$rd read]
|
||||||
|
}
|
||||||
|
|
||||||
|
test {Module client blocked on keys (no metadata): Blocked, case 2} {
|
||||||
|
r del k
|
||||||
|
set rd [redis_deferring_client]
|
||||||
|
r fsl.push k 33
|
||||||
|
r fsl.push k 34
|
||||||
|
$rd fsl.bpop2 k 0
|
||||||
|
assert_equal {34 33} [$rd read]
|
||||||
|
}
|
||||||
|
|
||||||
|
test {Module client blocked on keys (with metadata): No block} {
|
||||||
|
r del k
|
||||||
|
r fsl.push k 34
|
||||||
|
r fsl.bpopgt k 30 0
|
||||||
|
} {34}
|
||||||
|
|
||||||
|
test {Module client blocked on keys (with metadata): Timeout} {
|
||||||
|
r del k
|
||||||
|
set rd [redis_deferring_client]
|
||||||
|
r fsl.push k 33
|
||||||
|
$rd fsl.bpopgt k 35 1
|
||||||
|
assert_equal {Request timedout} [$rd read]
|
||||||
|
}
|
||||||
|
|
||||||
|
test {Module client blocked on keys (with metadata): Blocked, case 1} {
|
||||||
|
r del k
|
||||||
|
set rd [redis_deferring_client]
|
||||||
|
r fsl.push k 33
|
||||||
|
$rd fsl.bpopgt k 33 0
|
||||||
|
r fsl.push k 34
|
||||||
|
assert_equal {34} [$rd read]
|
||||||
|
}
|
||||||
|
|
||||||
|
test {Module client blocked on keys (with metadata): Blocked, case 2} {
|
||||||
|
r del k
|
||||||
|
set rd [redis_deferring_client]
|
||||||
|
$rd fsl.bpopgt k 35 0
|
||||||
|
r fsl.push k 33
|
||||||
|
r fsl.push k 34
|
||||||
|
r fsl.push k 35
|
||||||
|
r fsl.push k 36
|
||||||
|
assert_equal {36} [$rd read]
|
||||||
|
}
|
||||||
|
|
||||||
|
test {Module client blocked on keys does not wake up on wrong type} {
|
||||||
|
r del k
|
||||||
|
set rd [redis_deferring_client]
|
||||||
|
$rd fsl.bpop2 k 0
|
||||||
|
r lpush k 12
|
||||||
|
r lpush k 13
|
||||||
|
r lpush k 14
|
||||||
|
r del k
|
||||||
|
r fsl.push k 33
|
||||||
|
r fsl.push k 34
|
||||||
|
assert_equal {34 33} [$rd read]
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user