Bugfix: Make modules blocked on keys unblock on commands like LPUSH (#8356)

This was a regression from #7625 (only in 6.2 RC2).

This makes it possible again to implement blocking list and zset
commands using the modules API.

This commit also includes a test case for the reverse: A module
unblocks a client blocked on BLPOP by inserting elements using
RedisModule_ListPush(). This already works, but it was untested.
This commit is contained in:
Viktor Söderqvist 2021-01-19 12:15:33 +01:00 committed by GitHub
parent 5198b513d3
commit 4985c11bd6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 120 additions and 2 deletions

View File

@ -684,10 +684,20 @@ static int getBlockedTypeByType(int type) {
void signalKeyAsReady(redisDb *db, robj *key, int type) { void signalKeyAsReady(redisDb *db, robj *key, int type) {
readyList *rl; readyList *rl;
/* If no clients are blocked on this type, just return */ /* Quick returns. */
int btype = getBlockedTypeByType(type); int btype = getBlockedTypeByType(type);
if (btype == BLOCKED_NONE || !server.blocked_clients_by_type[btype]) if (btype == BLOCKED_NONE) {
/* The type can never block. */
return; return;
}
if (!server.blocked_clients_by_type[btype] &&
!server.blocked_clients_by_type[BLOCKED_MODULE]) {
/* No clients block on this type. Note: Blocked modules are represented
* by BLOCKED_MODULE, even if the intention is to wake up by normal
* types (list, zset, stream), so we need to check that there are no
* blocked modules before we do a quick return here. */
return;
}
/* No clients blocking for this key? No need to queue it. */ /* No clients blocking for this key? No need to queue it. */
if (dictFind(db->blocking_keys,key) == NULL) return; if (dictFind(db->blocking_keys,key) == NULL) return;

View File

@ -298,6 +298,76 @@ int fsl_getall(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
return REDISMODULE_OK; return REDISMODULE_OK;
} }
/* Callback for blockonkeys_popall */
int blockonkeys_popall_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argc);
RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_LIST) {
RedisModuleString *elem;
long len = 0;
RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN);
while ((elem = RedisModule_ListPop(key, REDISMODULE_LIST_HEAD)) != NULL) {
len++;
RedisModule_ReplyWithString(ctx, elem);
RedisModule_FreeString(ctx, elem);
}
RedisModule_ReplySetArrayLength(ctx, len);
} else {
RedisModule_ReplyWithError(ctx, "ERR Not a list");
}
RedisModule_CloseKey(key);
return REDISMODULE_OK;
}
int blockonkeys_popall_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
return RedisModule_ReplyWithError(ctx, "ERR Timeout");
}
/* BLOCKONKEYS.POPALL key
*
* Blocks on an empty key for up to 3 seconds. When unblocked by a list
* operation like LPUSH, all the elements are popped and returned. Fails with an
* error on timeout. */
int blockonkeys_popall(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (argc != 2)
return RedisModule_WrongArity(ctx);
RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ);
if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_EMPTY) {
RedisModule_BlockClientOnKeys(ctx, blockonkeys_popall_reply_callback,
blockonkeys_popall_timeout_callback,
NULL, 3000, &argv[1], 1, NULL);
} else {
RedisModule_ReplyWithError(ctx, "ERR Key not empty");
}
RedisModule_CloseKey(key);
return REDISMODULE_OK;
}
/* A module equivalent of LPUSH */
int blockonkeys_lpush(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (argc < 3)
return RedisModule_WrongArity(ctx);
RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
if (RedisModule_KeyType(key) != REDISMODULE_KEYTYPE_EMPTY &&
RedisModule_KeyType(key) != REDISMODULE_KEYTYPE_LIST) {
RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
} else {
for (int i = 2; i < argc; i++) {
if (RedisModule_ListPush(key, REDISMODULE_LIST_HEAD,
argv[i]) != REDISMODULE_OK) {
RedisModule_CloseKey(key);
return RedisModule_ReplyWithError(ctx, "ERR Push failed");
}
}
}
RedisModule_CloseKey(key);
return RedisModule_ReplyWithSimpleString(ctx, "OK");
}
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc); REDISMODULE_NOT_USED(argc);
@ -334,5 +404,13 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
if (RedisModule_CreateCommand(ctx,"fsl.getall",fsl_getall,"",0,0,0) == REDISMODULE_ERR) if (RedisModule_CreateCommand(ctx,"fsl.getall",fsl_getall,"",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR; return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx, "blockonkeys.popall", blockonkeys_popall,
"", 1, 1, 1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx, "blockonkeys.lpush", blockonkeys_lpush,
"", 1, 1, 1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
return REDISMODULE_OK; return REDISMODULE_OK;
} }

View File

@ -185,4 +185,34 @@ start_server {tags {"modules"}} {
r fsl.push k 34 r fsl.push k 34
assert_equal {34} [$rd read] assert_equal {34} [$rd read]
} }
test {Module client blocked on keys woken up by LPUSH} {
r del k
set rd [redis_deferring_client]
$rd blockonkeys.popall k
# wait until client is actually blocked
wait_for_condition 50 100 {
[s 0 blocked_clients] eq {1}
} else {
fail "Client is not blocked"
}
r lpush k 42 squirrel banana
assert_equal {banana squirrel 42} [$rd read]
$rd close
}
test {Module client unblocks BLPOP} {
r del k
set rd [redis_deferring_client]
$rd blpop k 3
# wait until client is actually blocked
wait_for_condition 50 100 {
[s 0 blocked_clients] eq {1}
} else {
fail "Client is not blocked"
}
r blockonkeys.lpush k 42
assert_equal {k 42} [$rd read]
$rd close
}
} }