diff --git a/src/blocked.c b/src/blocked.c index 92f1cee65..c3884fbdf 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -634,6 +634,16 @@ void unblockClientWaitingData(client *c) { } } +static int getBlockedTypeByType(int type) { + switch (type) { + case OBJ_LIST: return BLOCKED_LIST; + case OBJ_ZSET: return BLOCKED_ZSET; + case OBJ_MODULE: return BLOCKED_MODULE; + case OBJ_STREAM: return BLOCKED_STREAM; + default: return BLOCKED_NONE; + } +} + /* If the specified key has clients blocked waiting for list pushes, this * function will put the key reference into the server.ready_keys list. * Note that db->ready_keys is a hash table that allows us to avoid putting @@ -641,9 +651,14 @@ void unblockClientWaitingData(client *c) { * made by a script or in the context of MULTI/EXEC. * * The list will be finally processed by handleClientsBlockedOnKeys() */ -void signalKeyAsReady(redisDb *db, robj *key) { +void signalKeyAsReady(redisDb *db, robj *key, int type) { readyList *rl; + /* If no clients are blocked on this type, just return */ + int btype = getBlockedTypeByType(type); + if (btype == BLOCKED_NONE || !server.blocked_clients_by_type[btype]) + return; + /* No clients blocking for this key? No need to queue it. */ if (dictFind(db->blocking_keys,key) == NULL) return; @@ -664,4 +679,3 @@ void signalKeyAsReady(redisDb *db, robj *key) { serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK); } - diff --git a/src/db.c b/src/db.c index 529b57c7b..70de568fb 100644 --- a/src/db.c +++ b/src/db.c @@ -181,10 +181,7 @@ void dbAdd(redisDb *db, robj *key, robj *val) { int retval = dictAdd(db->dict, copy, val); serverAssertWithInfo(NULL,key,retval == DICT_OK); - if (val->type == OBJ_LIST || - val->type == OBJ_ZSET || - val->type == OBJ_STREAM) - signalKeyAsReady(db, key); + signalKeyAsReady(db, key, val->type); if (server.cluster_enabled) slotToKeyAdd(key->ptr); } @@ -1089,10 +1086,7 @@ void scanDatabaseForReadyLists(redisDb *db) { while((de = dictNext(di)) != NULL) { robj *key = dictGetKey(de); robj *value = lookupKey(db,key,LOOKUP_NOTOUCH); - if (value && (value->type == OBJ_LIST || - value->type == OBJ_STREAM || - value->type == OBJ_ZSET)) - signalKeyAsReady(db, key); + if (value) signalKeyAsReady(db, key, value->type); } dictReleaseIterator(di); } diff --git a/src/module.c b/src/module.c index 8e9526dad..0714458f7 100644 --- a/src/module.c +++ b/src/module.c @@ -4579,7 +4579,7 @@ RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleC * all the clients blocked for this key will get their reply callback called, * and if the callback returns REDISMODULE_OK the client will be unblocked. */ void RM_SignalKeyAsReady(RedisModuleCtx *ctx, RedisModuleString *key) { - signalKeyAsReady(ctx->client->db, key); + signalKeyAsReady(ctx->client->db, key, OBJ_MODULE); } /* Implements RM_UnblockClient() and moduleUnblockClient(). */ diff --git a/src/server.h b/src/server.h index 19aab8750..dbfa5aead 100644 --- a/src/server.h +++ b/src/server.h @@ -2189,7 +2189,7 @@ void replyToBlockedClientTimedOut(client *c); int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit); void disconnectAllBlockedClients(void); void handleClientsBlockedOnKeys(void); -void signalKeyAsReady(redisDb *db, robj *key); +void signalKeyAsReady(redisDb *db, robj *key, int type); void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids); /* timeout.c -- Blocked clients timeout and connections timeout. */ diff --git a/src/t_stream.c b/src/t_stream.c index 38bdce8e0..fc5c4c99e 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1299,8 +1299,7 @@ void xaddCommand(client *c) { /* We need to signal to blocked clients that there is new data on this * stream. */ - if (server.blocked_clients_by_type[BLOCKED_STREAM]) - signalKeyAsReady(c->db, c->argv[1]); + signalKeyAsReady(c->db, c->argv[1], OBJ_STREAM); } /* XRANGE/XREVRANGE actual implementation. */ @@ -1876,7 +1875,7 @@ NULL notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-destroy", c->argv[2],c->db->id); /* We want to unblock any XREADGROUP consumers with -NOGROUP. */ - signalKeyAsReady(c->db,c->argv[2]); + signalKeyAsReady(c->db,c->argv[2],OBJ_STREAM); } else { addReply(c,shared.czero); }