diff --git a/src/server.h b/src/server.h index cd6652257..cb70b93ad 100644 --- a/src/server.h +++ b/src/server.h @@ -1946,6 +1946,7 @@ int pubsubUnsubscribeAllPatterns(client *c, int notify); void freePubsubPattern(void *p); int listMatchPubsubPattern(void *a, void *b); int pubsubPublishMessage(robj *channel, robj *message); +void addReplyPubsubMessage(client *c, robj *channel, robj *msg); /* Keyspace events notification */ void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid); diff --git a/src/tracking.c b/src/tracking.c index aade137c4..9d9585c95 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -60,6 +60,7 @@ * use the most significant bits instead of the full 24 bits. */ #define TRACKING_TABLE_SIZE (1<<24) rax **TrackingTable = NULL; +robj *TrackingChannelName; /* Remove the tracking state from the client 'c'. Note that there is not much * to do for us here, if not to decrement the counter of the clients in @@ -87,8 +88,10 @@ void enableTracking(client *c, uint64_t redirect_to) { c->flags &= ~CLIENT_TRACKING_BROKEN_REDIR; c->client_tracking_redirection = redirect_to; server.tracking_clients++; - if (TrackingTable == NULL) + if (TrackingTable == NULL) { TrackingTable = zcalloc(sizeof(rax*) * TRACKING_TABLE_SIZE); + TrackingChannelName = createStringObject("__redis__:invalidate",20); + } } /* This function is called after the excution of a readonly command in the @@ -130,6 +133,7 @@ void trackingInvalidateKey(robj *keyobj) { uint64_t id; memcpy(&id,ri.key,ri.key_len); client *c = lookupClientByID(id); + int using_redirection = 0; if (c->client_tracking_redirection) { client *redir = lookupClientByID(c->client_tracking_redirection); if (!redir) { @@ -144,13 +148,21 @@ void trackingInvalidateKey(robj *keyobj) { continue; } c = redir; + using_redirection = 1; } - /* Only send such info for clients in RESP version 3 or more. */ + /* Only send such info for clients in RESP version 3 or more. However + * if redirection is active, and the connection we redirect to is + * in Pub/Sub mode, we can support the feature with RESP 2 as well, + * by sending Pub/Sub messages in the __redis__:invalidate channel. */ if (c->resp > 2) { addReplyPushLen(c,2); addReplyBulkCBuffer(c,"invalidate",10); addReplyLongLong(c,hash); + } else if (using_redirection && c->flags & CLIENT_PUBSUB) { + robj *msg = createStringObjectFromLongLong(hash); + addReplyPubsubMessage(c,TrackingChannelName,msg); + decrRefCount(msg); } } raxStop(&ri);