From 71f3f3f1afe4fbb6f8634970258b5dec2d389c68 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 11 Feb 2020 18:11:59 +0100 Subject: [PATCH] Tracking: BCAST: broadcasting of keys in prefixes implemented. --- src/server.h | 1 + src/tracking.c | 104 ++++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 95 insertions(+), 10 deletions(-) diff --git a/src/server.h b/src/server.h index 725c3cbc8..439bbc393 100644 --- a/src/server.h +++ b/src/server.h @@ -1659,6 +1659,7 @@ void trackingInvalidateKeysOnFlush(int dbid); void trackingLimitUsedSlots(void); uint64_t trackingGetTotalItems(void); uint64_t trackingGetTotalKeys(void); +void trackingBroadcastInvalidationMessages(void); /* List data type */ void listTypeTryConversion(robj *subject, robj *value); diff --git a/src/tracking.c b/src/tracking.c index 9f46275a4..345c5f1ad 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -167,7 +167,17 @@ void trackingRememberKeys(client *c) { getKeysFreeResult(keys); } -void sendTrackingMessage(client *c, char *keyname, size_t keylen) { +/* Given a key name, this function sends an invalidation message in the + * proper channel (depending on RESP version: PubSub or Push message) and + * to the proper client (in case fo redirection), in the context of the + * client 'c' with tracking enabled. + * + * In case the 'proto' argument is non zero, the function will assume that + * 'keyname' points to a buffer of 'keylen' bytes already expressed in the + * form of Redis RESP protocol, representing an array of keys to send + * to the client as value of the invalidation. This is used in BCAST mode + * in order to optimized the implementation to use less CPU time. */ +void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) { int using_redirection = 0; if (c->client_tracking_redirection) { client *redir = lookupClientByID(c->client_tracking_redirection); @@ -193,18 +203,38 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen) { if (c->resp > 2) { addReplyPushLen(c,2); addReplyBulkCBuffer(c,"invalidate",10); - addReplyArrayLen(c,1); - addReplyBulkCBuffer(c,keyname,keylen); } else if (using_redirection && c->flags & CLIENT_PUBSUB) { /* We use a static object to speedup things, however we assume * that addReplyPubsubMessage() will not take a reference. */ - robj keyobj; - initStaticStringObject(keyobj,keyname); addReplyPubsubMessage(c,TrackingChannelName,NULL); - addReplyArrayLen(c,1); - addReplyBulk(c,&keyobj); - serverAssert(keyobj.refcount == 1); } + + /* Send the "value" part, which is the array of keys. */ + if (proto) { + addReplyProto(c,keyname,keylen); + } else { + addReplyArrayLen(c,1); + addReplyBulkCBuffer(c,keyname,keylen); + } +} + +/* This function is called when a key is modified in Redis and in the case + * we have at least one client with the BCAST mode enabled. + * Its goal is to set the key in the right broadcast state if the key + * matches one or more prefixes in the prefix table. Later when we + * return to the event loop, we'll send invalidation messages to the + * clients subscribed to each prefix. */ +void trackingRememberKeyToBroadcast(char *keyname, size_t keylen) { + raxIterator ri; + raxStart(&ri,PrefixTable); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + if (keylen > ri.key_len) continue; + if (memcmp(ri.key,keyname,ri.key_len) != 0) continue; + bcastState *bs = ri.data; + raxTryInsert(bs->keys,(unsigned char*)keyname,keylen,NULL,NULL); + } + raxStop(&ri); } /* This function is called from signalModifiedKey() or other places in Redis @@ -214,6 +244,10 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen) { void trackingInvalidateKey(robj *keyobj) { if (TrackingTable == NULL) return; sds sdskey = keyobj->ptr; + + if (raxSize(PrefixTable) > 0) + trackingRememberKeyToBroadcast(sdskey,sdslen(sdskey)); + rax *ids = raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey)); if (ids == raxNotFound) return;; @@ -225,7 +259,7 @@ void trackingInvalidateKey(robj *keyobj) { memcpy(&id,ri.key,sizeof(id)); client *c = lookupClientByID(id); if (c == NULL || !(c->flags & CLIENT_TRACKING)) continue; - sendTrackingMessage(c,sdskey,sdslen(sdskey)); + sendTrackingMessage(c,sdskey,sdslen(sdskey),0); } raxStop(&ri); @@ -262,7 +296,7 @@ void trackingInvalidateKeysOnFlush(int dbid) { while ((ln = listNext(&li)) != NULL) { client *c = listNodeValue(ln); if (c->flags & CLIENT_TRACKING) { - sendTrackingMessage(c,"",1); + sendTrackingMessage(c,"",1,0); } } } @@ -323,6 +357,56 @@ void trackingLimitUsedSlots(void) { timeout_counter++; } +/* This function will run the prefixes of clients in BCAST mode and + * keys that were modified about each prefix, and will send the + * notifications to each client in each prefix. */ +void trackingBroadcastInvalidationMessages(void) { + raxIterator ri, ri2; + raxStart(&ri,PrefixTable); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + bcastState *bs = ri.data; + /* Create the array reply with the list of keys once, then send + * it to all the clients subscribed to this prefix. */ + char buf[32]; + size_t len = ll2string(buf,sizeof(buf),raxSize(bs->keys)); + sds proto = sdsempty(); + proto = sdsMakeRoomFor(proto,raxSize(bs->keys)*15); + proto = sdscatlen(proto,"*",1); + proto = sdscatlen(proto,buf,len); + proto = sdscatlen(proto,"\r\n",2); + raxStart(&ri2,bs->keys); + raxSeek(&ri2,"^",NULL,0); + while(raxNext(&ri2)) { + len = ll2string(buf,sizeof(buf),ri2.key_len); + sds proto = sdsnewlen("$",1); + proto = sdscatlen(proto,ri2.key,ri2.key_len); + proto = sdscatlen(proto,"\r\n",2); + } + raxStop(&ri2); + + /* Send this array of keys to every client in the list. */ + raxStart(&ri2,bs->clients); + raxSeek(&ri2,"^",NULL,0); + while(raxNext(&ri2)) { + client *c; + memcpy(&c,ri2.key,sizeof(c)); + sendTrackingMessage(c,proto,sdslen(proto),1); + } + raxStop(&ri2); + + /* Clean up: we can remove everything from this state, because we + * want to only track the new keys that will be accumulated starting + * from now. */ + sdsfree(proto); + raxFree(bs->clients); + raxFree(bs->keys); + bs->clients = raxNew(); + bs->keys = raxNew(); + } + raxStop(&ri); +} + /* This is just used in order to access the amount of used slots in the * tracking table. */ uint64_t trackingGetTotalItems(void) {