Client side caching: implement trackingInvalidateKeysOnFlush()

This commit is contained in:
zhaozhao.zz 2019-07-17 20:33:52 +08:00
parent 3f1c84751a
commit 6191ea90a1
3 changed files with 52 additions and 31 deletions

View File

@ -417,6 +417,7 @@ void signalModifiedKey(redisDb *db, robj *key) {
void signalFlushedDb(int dbid) { void signalFlushedDb(int dbid) {
touchWatchedKeysOnFlush(dbid); touchWatchedKeysOnFlush(dbid);
if (server.tracking_clients) trackingInvalidateKeysOnFlush(dbid);
} }
/*----------------------------------------------------------------------------- /*-----------------------------------------------------------------------------

View File

@ -1638,6 +1638,7 @@ void enableTracking(client *c, uint64_t redirect_to);
void disableTracking(client *c); void disableTracking(client *c);
void trackingRememberKeys(client *c); void trackingRememberKeys(client *c);
void trackingInvalidateKey(robj *keyobj); void trackingInvalidateKey(robj *keyobj);
void trackingInvalidateKeysOnFlush(int dbid);
/* List data type */ /* List data type */
void listTypeTryConversion(robj *subject, robj *value); void listTypeTryConversion(robj *subject, robj *value);

View File

@ -117,6 +117,40 @@ void trackingRememberKeys(client *c) {
getKeysFreeResult(keys); getKeysFreeResult(keys);
} }
void sendTrackingMessage(client *c, long long hash) {
int using_redirection = 0;
if (c->client_tracking_redirection) {
client *redir = lookupClientByID(c->client_tracking_redirection);
if (!redir) {
/* We need to signal to the original connection that we
* are unable to send invalidation messages to the redirected
* connection, because the client no longer exist. */
if (c->resp > 2) {
addReplyPushLen(c,3);
addReplyBulkCBuffer(c,"tracking-redir-broken",21);
addReplyLongLong(c,c->client_tracking_redirection);
}
return;
}
c = redir;
using_redirection = 1;
}
/* Only send such info for clients in RESP version 3 or more. However
* if redirection is active, and the connection we redirect to is
* in Pub/Sub mode, we can support the feature with RESP 2 as well,
* by sending Pub/Sub messages in the __redis__:invalidate channel. */
if (c->resp > 2) {
addReplyPushLen(c,2);
addReplyBulkCBuffer(c,"invalidate",10);
addReplyLongLong(c,hash);
} else if (using_redirection && c->flags & CLIENT_PUBSUB) {
robj *msg = createStringObjectFromLongLong(hash);
addReplyPubsubMessage(c,TrackingChannelName,msg);
decrRefCount(msg);
}
}
/* This function is called from signalModifiedKey() or other places in Redis /* This function is called from signalModifiedKey() or other places in Redis
* when a key changes value. In the context of keys tracking, our task here is * when a key changes value. In the context of keys tracking, our task here is
* to send a notification to every client that may have keys about such . */ * to send a notification to every client that may have keys about such . */
@ -134,37 +168,7 @@ void trackingInvalidateKey(robj *keyobj) {
memcpy(&id,ri.key,ri.key_len); memcpy(&id,ri.key,ri.key_len);
client *c = lookupClientByID(id); client *c = lookupClientByID(id);
if (c == NULL) continue; if (c == NULL) continue;
int using_redirection = 0; sendTrackingMessage(c,hash);
if (c->client_tracking_redirection) {
client *redir = lookupClientByID(c->client_tracking_redirection);
if (!redir) {
/* We need to signal to the original connection that we
* are unable to send invalidation messages to the redirected
* connection, because the client no longer exist. */
if (c->resp > 2) {
addReplyPushLen(c,3);
addReplyBulkCBuffer(c,"tracking-redir-broken",21);
addReplyLongLong(c,c->client_tracking_redirection);
}
continue;
}
c = redir;
using_redirection = 1;
}
/* Only send such info for clients in RESP version 3 or more. However
* if redirection is active, and the connection we redirect to is
* in Pub/Sub mode, we can support the feature with RESP 2 as well,
* by sending Pub/Sub messages in the __redis__:invalidate channel. */
if (c->resp > 2) {
addReplyPushLen(c,2);
addReplyBulkCBuffer(c,"invalidate",10);
addReplyLongLong(c,hash);
} else if (using_redirection && c->flags & CLIENT_PUBSUB) {
robj *msg = createStringObjectFromLongLong(hash);
addReplyPubsubMessage(c,TrackingChannelName,msg);
decrRefCount(msg);
}
} }
raxStop(&ri); raxStop(&ri);
@ -173,3 +177,18 @@ void trackingInvalidateKey(robj *keyobj) {
raxFree(TrackingTable[hash]); raxFree(TrackingTable[hash]);
TrackingTable[hash] = NULL; TrackingTable[hash] = NULL;
} }
void trackingInvalidateKeysOnFlush(int dbid) {
UNUSED(dbid);
if (server.tracking_clients == 0) return;
listNode *ln;
listIter li;
listRewind(server.clients,&li);
while ((ln = listNext(&li)) != NULL) {
client *c = listNodeValue(ln);
if (c->flags & CLIENT_TRACKING) {
sendTrackingMessage(c,-1);
}
}
}