Client side caching: RESP2 support.

This commit is contained in:
antirez 2019-07-05 12:24:28 +02:00
parent f099def733
commit 6b29f2d83d
2 changed files with 15 additions and 2 deletions

View File

@ -1946,6 +1946,7 @@ int pubsubUnsubscribeAllPatterns(client *c, int notify);
void freePubsubPattern(void *p); void freePubsubPattern(void *p);
int listMatchPubsubPattern(void *a, void *b); int listMatchPubsubPattern(void *a, void *b);
int pubsubPublishMessage(robj *channel, robj *message); int pubsubPublishMessage(robj *channel, robj *message);
void addReplyPubsubMessage(client *c, robj *channel, robj *msg);
/* Keyspace events notification */ /* Keyspace events notification */
void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid); void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid);

View File

@ -60,6 +60,7 @@
* use the most significant bits instead of the full 24 bits. */ * use the most significant bits instead of the full 24 bits. */
#define TRACKING_TABLE_SIZE (1<<24) #define TRACKING_TABLE_SIZE (1<<24)
rax **TrackingTable = NULL; rax **TrackingTable = NULL;
robj *TrackingChannelName;
/* Remove the tracking state from the client 'c'. Note that there is not much /* Remove the tracking state from the client 'c'. Note that there is not much
* to do for us here, if not to decrement the counter of the clients in * to do for us here, if not to decrement the counter of the clients in
@ -87,8 +88,10 @@ void enableTracking(client *c, uint64_t redirect_to) {
c->flags &= ~CLIENT_TRACKING_BROKEN_REDIR; c->flags &= ~CLIENT_TRACKING_BROKEN_REDIR;
c->client_tracking_redirection = redirect_to; c->client_tracking_redirection = redirect_to;
server.tracking_clients++; server.tracking_clients++;
if (TrackingTable == NULL) if (TrackingTable == NULL) {
TrackingTable = zcalloc(sizeof(rax*) * TRACKING_TABLE_SIZE); TrackingTable = zcalloc(sizeof(rax*) * TRACKING_TABLE_SIZE);
TrackingChannelName = createStringObject("__redis__:invalidate",20);
}
} }
/* This function is called after the excution of a readonly command in the /* This function is called after the excution of a readonly command in the
@ -130,6 +133,7 @@ void trackingInvalidateKey(robj *keyobj) {
uint64_t id; uint64_t id;
memcpy(&id,ri.key,ri.key_len); memcpy(&id,ri.key,ri.key_len);
client *c = lookupClientByID(id); client *c = lookupClientByID(id);
int using_redirection = 0;
if (c->client_tracking_redirection) { if (c->client_tracking_redirection) {
client *redir = lookupClientByID(c->client_tracking_redirection); client *redir = lookupClientByID(c->client_tracking_redirection);
if (!redir) { if (!redir) {
@ -144,13 +148,21 @@ void trackingInvalidateKey(robj *keyobj) {
continue; continue;
} }
c = redir; c = redir;
using_redirection = 1;
} }
/* Only send such info for clients in RESP version 3 or more. */ /* Only send such info for clients in RESP version 3 or more. However
* if redirection is active, and the connection we redirect to is
* in Pub/Sub mode, we can support the feature with RESP 2 as well,
* by sending Pub/Sub messages in the __redis__:invalidate channel. */
if (c->resp > 2) { if (c->resp > 2) {
addReplyPushLen(c,2); addReplyPushLen(c,2);
addReplyBulkCBuffer(c,"invalidate",10); addReplyBulkCBuffer(c,"invalidate",10);
addReplyLongLong(c,hash); addReplyLongLong(c,hash);
} else if (using_redirection && c->flags & CLIENT_PUBSUB) {
robj *msg = createStringObjectFromLongLong(hash);
addReplyPubsubMessage(c,TrackingChannelName,msg);
decrRefCount(msg);
} }
} }
raxStop(&ri); raxStop(&ri);