From 6191ea90a1ec8b3ebeab65e8b783b2cb6714c0d4 Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Wed, 17 Jul 2019 20:33:52 +0800 Subject: [PATCH] Client side caching: implement trackingInvalidateKeysOnFlush() --- src/db.c | 1 + src/server.h | 1 + src/tracking.c | 81 +++++++++++++++++++++++++++++++------------------- 3 files changed, 52 insertions(+), 31 deletions(-) diff --git a/src/db.c b/src/db.c index 51f5a12b4..568e0b8de 100644 --- a/src/db.c +++ b/src/db.c @@ -417,6 +417,7 @@ void signalModifiedKey(redisDb *db, robj *key) { void signalFlushedDb(int dbid) { touchWatchedKeysOnFlush(dbid); + if (server.tracking_clients) trackingInvalidateKeysOnFlush(dbid); } /*----------------------------------------------------------------------------- diff --git a/src/server.h b/src/server.h index f81b1010e..b200a6696 100644 --- a/src/server.h +++ b/src/server.h @@ -1638,6 +1638,7 @@ void enableTracking(client *c, uint64_t redirect_to); void disableTracking(client *c); void trackingRememberKeys(client *c); void trackingInvalidateKey(robj *keyobj); +void trackingInvalidateKeysOnFlush(int dbid); /* List data type */ void listTypeTryConversion(robj *subject, robj *value); diff --git a/src/tracking.c b/src/tracking.c index bbfc66a72..f3ff7ed03 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -117,6 +117,40 @@ void trackingRememberKeys(client *c) { getKeysFreeResult(keys); } +void sendTrackingMessage(client *c, long long hash) { + int using_redirection = 0; + if (c->client_tracking_redirection) { + client *redir = lookupClientByID(c->client_tracking_redirection); + if (!redir) { + /* We need to signal to the original connection that we + * are unable to send invalidation messages to the redirected + * connection, because the client no longer exist. */ + if (c->resp > 2) { + addReplyPushLen(c,3); + addReplyBulkCBuffer(c,"tracking-redir-broken",21); + addReplyLongLong(c,c->client_tracking_redirection); + } + return; + } + c = redir; + using_redirection = 1; + } + + /* Only send such info for clients in RESP version 3 or more. However + * if redirection is active, and the connection we redirect to is + * in Pub/Sub mode, we can support the feature with RESP 2 as well, + * by sending Pub/Sub messages in the __redis__:invalidate channel. */ + if (c->resp > 2) { + addReplyPushLen(c,2); + addReplyBulkCBuffer(c,"invalidate",10); + addReplyLongLong(c,hash); + } else if (using_redirection && c->flags & CLIENT_PUBSUB) { + robj *msg = createStringObjectFromLongLong(hash); + addReplyPubsubMessage(c,TrackingChannelName,msg); + decrRefCount(msg); + } +} + /* This function is called from signalModifiedKey() or other places in Redis * when a key changes value. In the context of keys tracking, our task here is * to send a notification to every client that may have keys about such . */ @@ -134,37 +168,7 @@ void trackingInvalidateKey(robj *keyobj) { memcpy(&id,ri.key,ri.key_len); client *c = lookupClientByID(id); if (c == NULL) continue; - int using_redirection = 0; - if (c->client_tracking_redirection) { - client *redir = lookupClientByID(c->client_tracking_redirection); - if (!redir) { - /* We need to signal to the original connection that we - * are unable to send invalidation messages to the redirected - * connection, because the client no longer exist. */ - if (c->resp > 2) { - addReplyPushLen(c,3); - addReplyBulkCBuffer(c,"tracking-redir-broken",21); - addReplyLongLong(c,c->client_tracking_redirection); - } - continue; - } - c = redir; - using_redirection = 1; - } - - /* Only send such info for clients in RESP version 3 or more. However - * if redirection is active, and the connection we redirect to is - * in Pub/Sub mode, we can support the feature with RESP 2 as well, - * by sending Pub/Sub messages in the __redis__:invalidate channel. */ - if (c->resp > 2) { - addReplyPushLen(c,2); - addReplyBulkCBuffer(c,"invalidate",10); - addReplyLongLong(c,hash); - } else if (using_redirection && c->flags & CLIENT_PUBSUB) { - robj *msg = createStringObjectFromLongLong(hash); - addReplyPubsubMessage(c,TrackingChannelName,msg); - decrRefCount(msg); - } + sendTrackingMessage(c,hash); } raxStop(&ri); @@ -173,3 +177,18 @@ void trackingInvalidateKey(robj *keyobj) { raxFree(TrackingTable[hash]); TrackingTable[hash] = NULL; } + +void trackingInvalidateKeysOnFlush(int dbid) { + UNUSED(dbid); + if (server.tracking_clients == 0) return; + + listNode *ln; + listIter li; + listRewind(server.clients,&li); + while ((ln = listNext(&li)) != NULL) { + client *c = listNodeValue(ln); + if (c->flags & CLIENT_TRACKING) { + sendTrackingMessage(c,-1); + } + } +}