Tracking: BCAST: broadcasting of keys in prefixes implemented.

This commit is contained in:
antirez 2020-02-11 18:11:59 +01:00
parent 3f7ba86255
commit 71f3f3f1af
2 changed files with 95 additions and 10 deletions

View File

@ -1659,6 +1659,7 @@ void trackingInvalidateKeysOnFlush(int dbid);
void trackingLimitUsedSlots(void);
uint64_t trackingGetTotalItems(void);
uint64_t trackingGetTotalKeys(void);
void trackingBroadcastInvalidationMessages(void);
/* List data type */
void listTypeTryConversion(robj *subject, robj *value);

View File

@ -167,7 +167,17 @@ void trackingRememberKeys(client *c) {
getKeysFreeResult(keys);
}
void sendTrackingMessage(client *c, char *keyname, size_t keylen) {
/* Given a key name, this function sends an invalidation message in the
* proper channel (depending on RESP version: PubSub or Push message) and
* to the proper client (in case fo redirection), in the context of the
* client 'c' with tracking enabled.
*
* In case the 'proto' argument is non zero, the function will assume that
* 'keyname' points to a buffer of 'keylen' bytes already expressed in the
* form of Redis RESP protocol, representing an array of keys to send
* to the client as value of the invalidation. This is used in BCAST mode
* in order to optimized the implementation to use less CPU time. */
void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
int using_redirection = 0;
if (c->client_tracking_redirection) {
client *redir = lookupClientByID(c->client_tracking_redirection);
@ -193,18 +203,38 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen) {
if (c->resp > 2) {
addReplyPushLen(c,2);
addReplyBulkCBuffer(c,"invalidate",10);
addReplyArrayLen(c,1);
addReplyBulkCBuffer(c,keyname,keylen);
} else if (using_redirection && c->flags & CLIENT_PUBSUB) {
/* We use a static object to speedup things, however we assume
* that addReplyPubsubMessage() will not take a reference. */
robj keyobj;
initStaticStringObject(keyobj,keyname);
addReplyPubsubMessage(c,TrackingChannelName,NULL);
addReplyArrayLen(c,1);
addReplyBulk(c,&keyobj);
serverAssert(keyobj.refcount == 1);
}
/* Send the "value" part, which is the array of keys. */
if (proto) {
addReplyProto(c,keyname,keylen);
} else {
addReplyArrayLen(c,1);
addReplyBulkCBuffer(c,keyname,keylen);
}
}
/* This function is called when a key is modified in Redis and in the case
* we have at least one client with the BCAST mode enabled.
* Its goal is to set the key in the right broadcast state if the key
* matches one or more prefixes in the prefix table. Later when we
* return to the event loop, we'll send invalidation messages to the
* clients subscribed to each prefix. */
void trackingRememberKeyToBroadcast(char *keyname, size_t keylen) {
raxIterator ri;
raxStart(&ri,PrefixTable);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
if (keylen > ri.key_len) continue;
if (memcmp(ri.key,keyname,ri.key_len) != 0) continue;
bcastState *bs = ri.data;
raxTryInsert(bs->keys,(unsigned char*)keyname,keylen,NULL,NULL);
}
raxStop(&ri);
}
/* This function is called from signalModifiedKey() or other places in Redis
@ -214,6 +244,10 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen) {
void trackingInvalidateKey(robj *keyobj) {
if (TrackingTable == NULL) return;
sds sdskey = keyobj->ptr;
if (raxSize(PrefixTable) > 0)
trackingRememberKeyToBroadcast(sdskey,sdslen(sdskey));
rax *ids = raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey));
if (ids == raxNotFound) return;;
@ -225,7 +259,7 @@ void trackingInvalidateKey(robj *keyobj) {
memcpy(&id,ri.key,sizeof(id));
client *c = lookupClientByID(id);
if (c == NULL || !(c->flags & CLIENT_TRACKING)) continue;
sendTrackingMessage(c,sdskey,sdslen(sdskey));
sendTrackingMessage(c,sdskey,sdslen(sdskey),0);
}
raxStop(&ri);
@ -262,7 +296,7 @@ void trackingInvalidateKeysOnFlush(int dbid) {
while ((ln = listNext(&li)) != NULL) {
client *c = listNodeValue(ln);
if (c->flags & CLIENT_TRACKING) {
sendTrackingMessage(c,"",1);
sendTrackingMessage(c,"",1,0);
}
}
}
@ -323,6 +357,56 @@ void trackingLimitUsedSlots(void) {
timeout_counter++;
}
/* This function will run the prefixes of clients in BCAST mode and
* keys that were modified about each prefix, and will send the
* notifications to each client in each prefix. */
void trackingBroadcastInvalidationMessages(void) {
raxIterator ri, ri2;
raxStart(&ri,PrefixTable);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
bcastState *bs = ri.data;
/* Create the array reply with the list of keys once, then send
* it to all the clients subscribed to this prefix. */
char buf[32];
size_t len = ll2string(buf,sizeof(buf),raxSize(bs->keys));
sds proto = sdsempty();
proto = sdsMakeRoomFor(proto,raxSize(bs->keys)*15);
proto = sdscatlen(proto,"*",1);
proto = sdscatlen(proto,buf,len);
proto = sdscatlen(proto,"\r\n",2);
raxStart(&ri2,bs->keys);
raxSeek(&ri2,"^",NULL,0);
while(raxNext(&ri2)) {
len = ll2string(buf,sizeof(buf),ri2.key_len);
sds proto = sdsnewlen("$",1);
proto = sdscatlen(proto,ri2.key,ri2.key_len);
proto = sdscatlen(proto,"\r\n",2);
}
raxStop(&ri2);
/* Send this array of keys to every client in the list. */
raxStart(&ri2,bs->clients);
raxSeek(&ri2,"^",NULL,0);
while(raxNext(&ri2)) {
client *c;
memcpy(&c,ri2.key,sizeof(c));
sendTrackingMessage(c,proto,sdslen(proto),1);
}
raxStop(&ri2);
/* Clean up: we can remove everything from this state, because we
* want to only track the new keys that will be accumulated starting
* from now. */
sdsfree(proto);
raxFree(bs->clients);
raxFree(bs->keys);
bs->clients = raxNew();
bs->keys = raxNew();
}
raxStop(&ri);
}
/* This is just used in order to access the amount of used slots in the
* tracking table. */
uint64_t trackingGetTotalItems(void) {