From 66f55bc5c15d72542983f37c6c1b48b0c1618917 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 31 Oct 2019 12:23:55 +0100 Subject: [PATCH] Modules: block on keys: fix bugs in processing order. --- src/blocked.c | 16 ++++++++++++++++ src/module.c | 7 +++++++ 2 files changed, 23 insertions(+) diff --git a/src/blocked.c b/src/blocked.c index fb58f850b..3110c00fc 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -450,6 +450,22 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) { listNode *clientnode = listFirst(clients); client *receiver = clientnode->value; + /* Put at the tail, so that at the next call + * we'll not run into it again: clients here may not be + * ready to be served, so they'll remain in the list + * sometimes. We want also be able to skip clients that are + * not blocked for the MODULE type safely. */ + listDelNode(clients,clientnode); + listAddNodeTail(clients,receiver); + + if (receiver->btype != BLOCKED_MODULE) continue; + + /* Note that if *this* client cannot be served by this key, + * it does not mean that another client that is next into the + * list cannot be served as well: they may be blocked by + * different modules with different triggers to consider if a key + * is ready or not. This means we can't exit the loop but need + * to continue after the first failure. */ if (!moduleTryServeClientBlockedOnKey(receiver, rl->key)) continue; moduleUnblockClient(receiver); diff --git a/src/module.c b/src/module.c index 353b6f426..248a55e62 100644 --- a/src/module.c +++ b/src/module.c @@ -249,6 +249,7 @@ typedef struct RedisModuleBlockedClient { in thread safe contexts. */ int dbid; /* Database number selected by the original client. */ int blocked_on_keys; /* If blocked via RM_BlockClientOnKeys(). */ + int unblocked; /* Already on the moduleUnblocked list. */ } RedisModuleBlockedClient; static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER; @@ -4037,6 +4038,7 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF bc->reply_client->flags |= CLIENT_MODULE; bc->dbid = c->db->id; bc->blocked_on_keys = keys != NULL; + bc->unblocked = 0; c->bpop.timeout = timeout; if (islua || ismulti) { @@ -4063,6 +4065,10 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF int moduleTryServeClientBlockedOnKey(client *c, robj *key) { int served = 0; RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; + /* Protect against re-processing: don't serve clients that are already + * in the unblocking list for any reason (including RM_UnblockClient() + * explicit call). */ + if (bc->unblocked) return REDISMODULE_ERR; RedisModuleCtx ctx = REDISMODULE_CTX_INIT; ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY; ctx.blocked_ready_key = key; @@ -4162,6 +4168,7 @@ void RM_SignalKeyAsReady(RedisModuleCtx *ctx, RedisModuleString *key) { int moduleUnblockClientByHandle(RedisModuleBlockedClient *bc, void *privdata) { pthread_mutex_lock(&moduleUnblockedClientsMutex); if (!bc->blocked_on_keys) bc->privdata = privdata; + bc->unblocked = 1; listAddNodeTail(moduleUnblockedClients,bc); if (write(server.module_blocked_pipe[1],"A",1) != 1) { /* Ignore the error, this is best-effort. */