/* tracking.c - Client side caching: keys tracking and invalidation
 *
 * Copyright (c) 2019, Salvatore Sanfilippo
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *   * Neither the name of Redis nor the names of its contributors may be used
 *     to endorse or promote products derived from this software without
 *     specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "server.h"

/* The tracking table is constituted by a radix tree of keys, each pointing
 * to a radix tree of client IDs, used to track the clients that may have
 * certain keys in their local, client side, cache.
 *
 * When a client enables tracking with "CLIENT TRACKING on", each key served to
 * the client is remembered in the table mapping the keys to the client IDs.
 * Later, when a key is modified, all the clients that may have local copy
 * of such key will receive an invalidation message.
 *
 * Clients will normally take frequently requested objects in memory, removing
 * them when invalidation messages are received. */
rax *TrackingTable = NULL;

/* Maps every prefix subscribed by at least one BCAST client to the
 * bcastState describing that prefix. */
rax *PrefixTable = NULL;

uint64_t TrackingTableTotalItems = 0; /* Total number of IDs stored across
                                         the whole tracking table. This gives
                                         a hint about the total memory we
                                         are using server side for CSC. */

/* Pub/Sub channel ("__redis__:invalidate") used to deliver invalidation
 * messages to RESP2 clients redirecting to a connection in Pub/Sub mode.
 * Created together with the tables by enableTracking(). */
robj *TrackingChannelName;

/* This is the structure that we have as value of the PrefixTable, and
 * represents the list of keys modified, and the list of clients that need
 * to be notified, for a given prefix.
 *
 * Note that 'clients' is keyed by the raw bytes of the client pointer
 * itself (not by client ID), so an entry must be removed before the client
 * structure goes away. */
typedef struct bcastState {
    rax *keys;      /* Keys modified in the current event loop cycle. */
    rax *clients;   /* Clients subscribed to the notification events for this
                       prefix. */
} bcastState;

/* Remove the tracking state from the client 'c'.
 *
 * In the default (non BCAST) mode there is not much to do for us here, if
 * not to decrement the counter of the clients in tracking mode: the tracking
 * table only stores the ID of the client, so stale ID references are removed
 * in a lazy way (looking them up later simply finds no client). Otherwise
 * when a client with many entries in the table is removed, it would cost a
 * lot of time to do the cleanup.
 *
 * In BCAST mode, instead, the PrefixTable stores the client pointer, so the
 * client is unsubscribed from every prefix it registered, and prefixes left
 * without subscribers are released. */
void disableTracking(client *c) {
    /* If this client is in broadcasting mode, we need to unsubscribe it
     * from all the prefixes it is registered to. */
    if (c->flags & CLIENT_TRACKING_BCAST) {
        raxIterator ri;
        raxStart(&ri,c->client_tracking_prefixes);
        raxSeek(&ri,"^",NULL,0);
        while(raxNext(&ri)) {
            bcastState *bs = raxFind(PrefixTable,ri.key,ri.key_len);
            serverAssert(bs != raxNotFound);
            raxRemove(bs->clients,(unsigned char*)&c,sizeof(c),NULL);
            /* Was it the last client? Remove the prefix from the
             * table. */
            if (raxSize(bs->clients) == 0) {
                raxFree(bs->clients);
                raxFree(bs->keys);
                zfree(bs);
                raxRemove(PrefixTable,ri.key,ri.key_len,NULL);
            }
        }
        raxStop(&ri);
        raxFree(c->client_tracking_prefixes);
        c->client_tracking_prefixes = NULL;
    }

    /* Clear flags and adjust the count. */
    if (c->flags & CLIENT_TRACKING) {
        server.tracking_clients--;
        c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR|
                      CLIENT_TRACKING_BCAST|CLIENT_TRACKING_OPTIN|
                      CLIENT_TRACKING_OPTOUT|CLIENT_TRACKING_CACHING);
    }
}

/* Set the client 'c' to track the prefix 'prefix', which is 'plen' bytes
 * long. If the client 'c' is already registered for the specified prefix,
 * no operation is performed.
 *
 * 'prefix' does not need to be an sds string: enableTracking() passes the
 * C literal "" (with plen 0) to subscribe to all the keys, so 'plen' is the
 * only length used here. Calling sdslen() on such a literal would read a
 * header that does not exist. */
void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) {
    bcastState *bs = raxFind(PrefixTable,(unsigned char*)prefix,plen);
    /* If this is the first client subscribing to such prefix, create
     * the prefix in the table. */
    if (bs == raxNotFound) {
        bs = zmalloc(sizeof(*bs));
        bs->keys = raxNew();
        bs->clients = raxNew();
        raxInsert(PrefixTable,(unsigned char*)prefix,plen,bs,NULL);
    }
    /* Remember the prefix on the client side too (only when the client was
     * not already subscribed), so disableTracking() can undo it. */
    if (raxTryInsert(bs->clients,(unsigned char*)&c,sizeof(c),NULL,NULL)) {
        if (c->client_tracking_prefixes == NULL)
            c->client_tracking_prefixes = raxNew();
        raxInsert(c->client_tracking_prefixes,
                  (unsigned char*)prefix,plen,NULL,NULL);
    }
}

/* Enable the tracking state for the client 'c', and as a side effect allocates
 * the tracking table if needed. If the 'redirect_to' argument is non zero, the
 * invalidation messages for this client will be sent to the client ID
 * specified by the 'redirect_to' argument. Note that if such client will
 * eventually get freed, we'll send a message to the original client to
 * inform it of the condition. Multiple clients can redirect the invalidation
 * messages to the same client ID.
*/
void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **prefix, size_t numprefix) {
    /* The BCAST flag is reset below, but disableTracking() only unsubscribes
     * a client from the PrefixTable when that flag is set. If the client is
     * leaving BCAST mode, tear down its prefix subscriptions now: otherwise
     * the PrefixTable would keep a pointer to this client that nothing would
     * ever remove. disableTracking() also clears CLIENT_TRACKING (adjusting
     * server.tracking_clients), which the code below sets again. */
    if ((c->flags & CLIENT_TRACKING_BCAST) &&
        !(options & CLIENT_TRACKING_BCAST)) {
        disableTracking(c);
    }

    if (!(c->flags & CLIENT_TRACKING)) server.tracking_clients++;
    c->flags |= CLIENT_TRACKING;
    c->flags &= ~(CLIENT_TRACKING_BROKEN_REDIR|CLIENT_TRACKING_BCAST|
                  CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT);
    c->client_tracking_redirection = redirect_to;
    if (TrackingTable == NULL) {
        TrackingTable = raxNew();
        PrefixTable = raxNew();
        TrackingChannelName = createStringObject("__redis__:invalidate",20);
    }

    if (options & CLIENT_TRACKING_BCAST) {
        c->flags |= CLIENT_TRACKING_BCAST;
        /* No explicit prefix means the empty prefix: every key matches. */
        if (numprefix == 0) enableBcastTrackingForPrefix(c,"",0);
        for (size_t j = 0; j < numprefix; j++) {
            sds sdsprefix = prefix[j]->ptr;
            enableBcastTrackingForPrefix(c,sdsprefix,sdslen(sdsprefix));
        }
    }
    c->flags |= options & (CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT);
}

/* This function is called after the execution of a readonly command in the
 * case the client 'c' has keys tracking enabled and the tracking is not
 * in BCAST mode. It will populate the tracking invalidation table according
 * to the keys the user fetched, so that Redis will know what are the clients
 * that should receive an invalidation message when certain keys are
 * modified. */
void trackingRememberKeys(client *c) {
    /* Return if we are in optin/out mode and the right CACHING command
     * was/wasn't given in order to modify the default behavior. */
    uint64_t optin = c->flags & CLIENT_TRACKING_OPTIN;
    uint64_t optout = c->flags & CLIENT_TRACKING_OPTOUT;
    uint64_t caching_given = c->flags & CLIENT_TRACKING_CACHING;
    if ((optin && !caching_given) || (optout && caching_given)) return;

    int numkeys;
    int *keys = getKeysFromCommand(c->cmd,c->argv,c->argc,&numkeys);
    if (keys == NULL) return;

    for(int j = 0; j < numkeys; j++) {
        int idx = keys[j];
        sds sdskey = c->argv[idx]->ptr;
        rax *ids = raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey));
        if (ids == raxNotFound) {
            ids = raxNew();
            int inserted = raxTryInsert(TrackingTable,(unsigned char*)sdskey,
                                        sdslen(sdskey),ids, NULL);
            serverAssert(inserted == 1);
        }
        /* Only count IDs that were not already recorded for this key. */
        if (raxTryInsert(ids,(unsigned char*)&c->id,sizeof(c->id),NULL,NULL))
            TrackingTableTotalItems++;
    }
    getKeysFreeResult(keys);
}

/* Given a key name, this function sends an invalidation message in the
 * proper channel (depending on RESP version: PubSub or Push message) and
 * to the proper client (in case of redirection), in the context of the
 * client 'c' with tracking enabled.
 *
 * In case the 'proto' argument is non zero, the function will assume that
 * 'keyname' points to a buffer of 'keylen' bytes already expressed in the
 * form of Redis RESP protocol, representing an array of keys to send
 * to the client as value of the invalidation. This is used in BCAST mode
 * in order to optimize the implementation to use less CPU time.
 *
 * If the redirection target no longer exists, the original client is told
 * so only once (and only when it speaks RESP3): CLIENT_TRACKING_BROKEN_REDIR
 * records that the notice was already emitted, and it is cleared again when
 * tracking is re-enabled or disabled. */
void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
    int using_redirection = 0;
    if (c->client_tracking_redirection) {
        client *redir = lookupClientByID(c->client_tracking_redirection);
        if (!redir) {
            /* We need to signal to the original connection that we
             * are unable to send invalidation messages to the redirected
             * connection, because the client no longer exist. Do it once,
             * otherwise every single invalidation would repeat it. */
            if (!(c->flags & CLIENT_TRACKING_BROKEN_REDIR)) {
                c->flags |= CLIENT_TRACKING_BROKEN_REDIR;
                if (c->resp > 2) {
                    /* Exactly two elements follow: the message name and
                     * the ID of the vanished client. Announcing more would
                     * desynchronize the client's protocol parser. */
                    addReplyPushLen(c,2);
                    addReplyBulkCBuffer(c,"tracking-redir-broken",21);
                    addReplyLongLong(c,c->client_tracking_redirection);
                }
            }
            return;
        }
        c = redir;
        using_redirection = 1;
    }

    /* Only send such info for clients in RESP version 3 or more. However
     * if redirection is active, and the connection we redirect to is
     * in Pub/Sub mode, we can support the feature with RESP 2 as well,
     * by sending Pub/Sub messages in the __redis__:invalidate channel. */
    if (c->resp > 2) {
        addReplyPushLen(c,2);
        addReplyBulkCBuffer(c,"invalidate",10);
    } else if (using_redirection && c->flags & CLIENT_PUBSUB) {
        /* We use a static object to speedup things, however we assume
         * that addReplyPubsubMessage() will not take a reference. */
        addReplyPubsubMessage(c,TrackingChannelName,NULL);
    } else {
        /* If are here, the client is not using RESP3, nor is
         * redirecting to another client. We can't send anything to
         * it since RESP2 does not support push messages in the same
         * connection. */
        return;
    }

    /* Send the "value" part, which is the array of keys. */
    if (proto) {
        addReplyProto(c,keyname,keylen);
    } else {
        addReplyArrayLen(c,1);
        addReplyBulkCBuffer(c,keyname,keylen);
    }
}

/* This function is called when a key is modified in Redis and in the case
 * we have at least one client with the BCAST mode enabled.
 * Its goal is to set the key in the right broadcast state if the key
 * matches one or more prefixes in the prefix table. Later when we
 * return to the event loop, we'll send invalidation messages to the
 * clients subscribed to each prefix.
*/
void trackingRememberKeyToBroadcast(char *keyname, size_t keylen) {
    raxIterator it;

    raxStart(&it,PrefixTable);
    raxSeek(&it,"^",NULL,0);
    while (raxNext(&it)) {
        /* The empty prefix matches every key. Any other prefix matches when
         * the key is at least as long as the prefix and starts with it (the
         * length test comes first so memcmp never reads past 'keyname'). */
        int matches = it.key_len == 0 ||
                      (it.key_len <= keylen &&
                       memcmp(it.key,keyname,it.key_len) == 0);
        if (matches) {
            bcastState *state = it.data;
            raxTryInsert(state->keys,(unsigned char*)keyname,keylen,
                         NULL,NULL);
        }
    }
    raxStop(&it);
}

/* This function is called from signalModifiedKey() or other places in Redis
 * when a key changes value. In the context of keys tracking, our task here is
 * to notify every client that may hold the key in its local cache:
 *
 * - if any BCAST prefix exists, the key is queued in the state of every
 *   matching prefix, to be broadcast later by
 *   trackingBroadcastInvalidationMessages();
 * - the clients whose IDs are recorded for this key in the TrackingTable are
 *   notified right away (skipping vanished clients, clients that are no
 *   longer tracking, and clients now in BCAST mode). The key's entry is then
 *   dropped from the table; it is created again the next time a tracking
 *   client reads the key. */
void trackingInvalidateKey(robj *keyobj) {
    if (TrackingTable == NULL) return;

    sds key = keyobj->ptr;
    size_t keylen = sdslen(key);

    if (raxSize(PrefixTable) > 0)
        trackingRememberKeyToBroadcast(key,keylen);

    rax *ids = raxFind(TrackingTable,(unsigned char*)key,keylen);
    if (ids == raxNotFound) return;

    raxIterator it;
    raxStart(&it,ids);
    raxSeek(&it,"^",NULL,0);
    while (raxNext(&it)) {
        uint64_t id;
        memcpy(&id,it.key,sizeof(id));
        client *target = lookupClientByID(id);

        /* A client that switched to BCAST mode after reading the key must
         * not receive the invalidations recorded while it was in default
         * mode: BCAST clients are served by the prefix broadcast only. */
        int wants_message = target != NULL &&
                            (target->flags & CLIENT_TRACKING) &&
                            !(target->flags & CLIENT_TRACKING_BCAST);
        if (wants_message) sendTrackingMessage(target,key,keylen,0);
    }
    raxStop(&it);

    /* Every recorded client was handled: release the per-key ID tree and
     * forget the key until some client reads it again. */
    TrackingTableTotalItems -= raxSize(ids);
    raxFree(ids);
    raxRemove(TrackingTable,(unsigned char*)key,keylen,NULL);
}

/* This function is called when one or all the Redis databases are flushed
 * (dbid == -1 in case of FLUSHALL).
Caching keys are not specific for * each DB but are global: currently what we do is send a special * notification to clients with tracking enabled, invalidating the caching * key "", which means, "all the keys", in order to avoid flooding clients * with many invalidation messages for all the keys they may hold. */ void freeTrackingRadixTree(void *rt) { raxFree(rt); } void trackingInvalidateKeysOnFlush(int dbid) { if (server.tracking_clients) { listNode *ln; listIter li; listRewind(server.clients,&li); while ((ln = listNext(&li)) != NULL) { client *c = listNodeValue(ln); if (c->flags & CLIENT_TRACKING) { sendTrackingMessage(c,"",1,0); } } } /* In case of FLUSHALL, reclaim all the memory used by tracking. */ if (dbid == -1 && TrackingTable) { raxFreeWithCallback(TrackingTable,freeTrackingRadixTree); TrackingTable = raxNew(); TrackingTableTotalItems = 0; } } /* Tracking forces Redis to remember information about which client may have * certain keys. In workloads where there are a lot of reads, but keys are * hardly modified, the amount of information we have to remember server side * could be a lot, with the number of keys being totally not bound. * * So Redis allows the user to configure a maximum number of keys for the * invalidation table. This function makes sure that we don't go over the * specified fill rate: if we are over, we can just evict informations about * a random key, and send invalidation messages to clients like if the key was * modified. */ void trackingLimitUsedSlots(void) { static unsigned int timeout_counter = 0; if (TrackingTable == NULL) return; if (server.tracking_table_max_keys == 0) return; /* No limits set. */ size_t max_keys = server.tracking_table_max_keys; if (raxSize(TrackingTable) <= max_keys) { timeout_counter = 0; return; /* Limit not reached. */ } /* We have to invalidate a few keys to reach the limit again. 
The effort * we do here is proportional to the number of times we entered this * function and found that we are still over the limit. */ int effort = 100 * (timeout_counter+1); /* We just remove one key after another by using a random walk. */ raxIterator ri; raxStart(&ri,TrackingTable); while(effort > 0) { effort--; raxSeek(&ri,"^",NULL,0); raxRandomWalk(&ri,0); rax *ids = ri.data; TrackingTableTotalItems -= raxSize(ids); raxFree(ids); raxRemove(TrackingTable,ri.key,ri.key_len,NULL); if (raxSize(TrackingTable) <= max_keys) { timeout_counter = 0; raxStop(&ri); return; /* Return ASAP: we are again under the limit. */ } } /* If we reach this point, we were not able to go under the configured * limit using the maximum effort we had for this run. */ raxStop(&ri); timeout_counter++; } /* This function will run the prefixes of clients in BCAST mode and * keys that were modified about each prefix, and will send the * notifications to each client in each prefix. */ void trackingBroadcastInvalidationMessages(void) { raxIterator ri, ri2; /* Return ASAP if there is nothing to do here. */ if (TrackingTable == NULL || !server.tracking_clients) return; raxStart(&ri,PrefixTable); raxSeek(&ri,"^",NULL,0); while(raxNext(&ri)) { bcastState *bs = ri.data; if (raxSize(bs->keys)) { /* Create the array reply with the list of keys once, then send * it to all the clients subscribed to this prefix. 
*/ char buf[32]; size_t len = ll2string(buf,sizeof(buf),raxSize(bs->keys)); sds proto = sdsempty(); proto = sdsMakeRoomFor(proto,raxSize(bs->keys)*15); proto = sdscatlen(proto,"*",1); proto = sdscatlen(proto,buf,len); proto = sdscatlen(proto,"\r\n",2); raxStart(&ri2,bs->keys); raxSeek(&ri2,"^",NULL,0); while(raxNext(&ri2)) { len = ll2string(buf,sizeof(buf),ri2.key_len); proto = sdscatlen(proto,"$",1); proto = sdscatlen(proto,buf,len); proto = sdscatlen(proto,"\r\n",2); proto = sdscatlen(proto,ri2.key,ri2.key_len); proto = sdscatlen(proto,"\r\n",2); } raxStop(&ri2); /* Send this array of keys to every client in the list. */ raxStart(&ri2,bs->clients); raxSeek(&ri2,"^",NULL,0); while(raxNext(&ri2)) { client *c; memcpy(&c,ri2.key,sizeof(c)); sendTrackingMessage(c,proto,sdslen(proto),1); } raxStop(&ri2); /* Clean up: we can remove everything from this state, because we * want to only track the new keys that will be accumulated starting * from now. */ sdsfree(proto); } raxFree(bs->keys); bs->keys = raxNew(); } raxStop(&ri); } /* This is just used in order to access the amount of used slots in the * tracking table. */ uint64_t trackingGetTotalItems(void) { return TrackingTableTotalItems; } uint64_t trackingGetTotalKeys(void) { if (TrackingTable == NULL) return 0; return raxSize(TrackingTable); }