mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 16:18:28 -05:00
Tracking: optin/out implemented.
This commit is contained in:
parent
8a14fff545
commit
b6378edcd6
@ -1365,6 +1365,12 @@ void resetClient(client *c) {
|
|||||||
if (!(c->flags & CLIENT_MULTI) && prevcmd != askingCommand)
|
if (!(c->flags & CLIENT_MULTI) && prevcmd != askingCommand)
|
||||||
c->flags &= ~CLIENT_ASKING;
|
c->flags &= ~CLIENT_ASKING;
|
||||||
|
|
||||||
|
/* We do the same for the CACHING command as well. It also affects
|
||||||
|
* the next command or transaction executed, in a way very similar
|
||||||
|
* to ASKING. */
|
||||||
|
if (!(c->flags & CLIENT_MULTI) && prevcmd != clientCommand)
|
||||||
|
c->flags &= ~CLIENT_TRACKING_CACHING;
|
||||||
|
|
||||||
/* Remove the CLIENT_REPLY_SKIP flag if any so that the reply
|
/* Remove the CLIENT_REPLY_SKIP flag if any so that the reply
|
||||||
* to the next command will be sent, but set the flag if the command
|
* to the next command will be sent, but set the flag if the command
|
||||||
* we just processed was "CLIENT REPLY SKIP". */
|
* we just processed was "CLIENT REPLY SKIP". */
|
||||||
@ -2044,7 +2050,7 @@ void clientCommand(client *c) {
|
|||||||
"REPLY (on|off|skip) -- Control the replies sent to the current connection.",
|
"REPLY (on|off|skip) -- Control the replies sent to the current connection.",
|
||||||
"SETNAME <name> -- Assign the name <name> to the current connection.",
|
"SETNAME <name> -- Assign the name <name> to the current connection.",
|
||||||
"UNBLOCK <clientid> [TIMEOUT|ERROR] -- Unblock the specified blocked client.",
|
"UNBLOCK <clientid> [TIMEOUT|ERROR] -- Unblock the specified blocked client.",
|
||||||
"TRACKING (on|off) [REDIRECT <id>] [BCAST] [PREFIX first] [PREFIX second] ... -- Enable client keys tracking for client side caching.",
|
"TRACKING (on|off) [REDIRECT <id>] [BCAST] [PREFIX first] [PREFIX second] [OPTIN] [OPTOUT]... -- Enable client keys tracking for client side caching.",
|
||||||
"GETREDIR -- Return the client ID we are redirecting to when tracking is enabled.",
|
"GETREDIR -- Return the client ID we are redirecting to when tracking is enabled.",
|
||||||
NULL
|
NULL
|
||||||
};
|
};
|
||||||
@ -2221,9 +2227,9 @@ NULL
|
|||||||
addReply(c,shared.ok);
|
addReply(c,shared.ok);
|
||||||
} else if (!strcasecmp(c->argv[1]->ptr,"tracking") && c->argc >= 3) {
|
} else if (!strcasecmp(c->argv[1]->ptr,"tracking") && c->argc >= 3) {
|
||||||
/* CLIENT TRACKING (on|off) [REDIRECT <id>] [BCAST] [PREFIX first]
|
/* CLIENT TRACKING (on|off) [REDIRECT <id>] [BCAST] [PREFIX first]
|
||||||
* [PREFIX second] ... */
|
* [PREFIX second] [OPTIN] [OPTOUT] ... */
|
||||||
long long redir = 0;
|
long long redir = 0;
|
||||||
int bcast = 0;
|
uint64_t options = 0;
|
||||||
robj **prefix = NULL;
|
robj **prefix = NULL;
|
||||||
size_t numprefix = 0;
|
size_t numprefix = 0;
|
||||||
|
|
||||||
@ -2256,7 +2262,11 @@ NULL
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} else if (!strcasecmp(c->argv[j]->ptr,"bcast")) {
|
} else if (!strcasecmp(c->argv[j]->ptr,"bcast")) {
|
||||||
bcast = 1;
|
options |= CLIENT_TRACKING_BCAST;
|
||||||
|
} else if (!strcasecmp(c->argv[j]->ptr,"optin")) {
|
||||||
|
options |= CLIENT_TRACKING_OPTIN;
|
||||||
|
} else if (!strcasecmp(c->argv[j]->ptr,"optout")) {
|
||||||
|
options |= CLIENT_TRACKING_OPTOUT;
|
||||||
} else if (!strcasecmp(c->argv[j]->ptr,"prefix") && moreargs) {
|
} else if (!strcasecmp(c->argv[j]->ptr,"prefix") && moreargs) {
|
||||||
j++;
|
j++;
|
||||||
prefix = zrealloc(prefix,sizeof(robj*)*(numprefix+1));
|
prefix = zrealloc(prefix,sizeof(robj*)*(numprefix+1));
|
||||||
@ -2272,7 +2282,7 @@ NULL
|
|||||||
if (!strcasecmp(c->argv[2]->ptr,"on")) {
|
if (!strcasecmp(c->argv[2]->ptr,"on")) {
|
||||||
/* Before enabling tracking, make sure options are compatible
|
/* Before enabling tracking, make sure options are compatible
|
||||||
* among each other and with the current state of the client. */
|
* among each other and with the current state of the client. */
|
||||||
if (!bcast && numprefix) {
|
if (!(options & CLIENT_TRACKING_BCAST) && numprefix) {
|
||||||
addReplyError(c,
|
addReplyError(c,
|
||||||
"PREFIX option requires BCAST mode to be enabled");
|
"PREFIX option requires BCAST mode to be enabled");
|
||||||
zfree(prefix);
|
zfree(prefix);
|
||||||
@ -2281,7 +2291,8 @@ NULL
|
|||||||
|
|
||||||
if (c->flags & CLIENT_TRACKING) {
|
if (c->flags & CLIENT_TRACKING) {
|
||||||
int oldbcast = !!(c->flags & CLIENT_TRACKING_BCAST);
|
int oldbcast = !!(c->flags & CLIENT_TRACKING_BCAST);
|
||||||
if (oldbcast != bcast) {
|
int newbcast = !!(options & CLIENT_TRACKING_BCAST);
|
||||||
|
if (oldbcast != newbcast) {
|
||||||
addReplyError(c,
|
addReplyError(c,
|
||||||
"You can't switch BCAST mode on/off before disabling "
|
"You can't switch BCAST mode on/off before disabling "
|
||||||
"tracking for this client, and then re-enabling it with "
|
"tracking for this client, and then re-enabling it with "
|
||||||
@ -2290,7 +2301,17 @@ NULL
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
enableTracking(c,redir,bcast,prefix,numprefix);
|
|
||||||
|
if (options & CLIENT_TRACKING_BCAST &&
|
||||||
|
options & (CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT))
|
||||||
|
{
|
||||||
|
addReplyError(c,
|
||||||
|
"OPTIN and OPTOUT are not compatible with BCAST");
|
||||||
|
zfree(prefix);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
enableTracking(c,redir,options,prefix,numprefix);
|
||||||
} else if (!strcasecmp(c->argv[2]->ptr,"off")) {
|
} else if (!strcasecmp(c->argv[2]->ptr,"off")) {
|
||||||
disableTracking(c);
|
disableTracking(c);
|
||||||
} else {
|
} else {
|
||||||
@ -2300,6 +2321,36 @@ NULL
|
|||||||
}
|
}
|
||||||
zfree(prefix);
|
zfree(prefix);
|
||||||
addReply(c,shared.ok);
|
addReply(c,shared.ok);
|
||||||
|
} else if (!strcasecmp(c->argv[1]->ptr,"caching") && c->argc >= 3) {
|
||||||
|
if (!(c->flags & CLIENT_TRACKING)) {
|
||||||
|
addReplyError(c,"CLIENT CACHING can be called only when the "
|
||||||
|
"client is in tracking mode with OPTIN or "
|
||||||
|
"OPTOUT mode enabled");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
char *opt = c->argv[2]->ptr;
|
||||||
|
if (!strcasecmp(opt,"yes")) {
|
||||||
|
if (c->flags & CLIENT_TRACKING_OPTIN) {
|
||||||
|
c->flags |= CLIENT_TRACKING_CACHING;
|
||||||
|
} else {
|
||||||
|
addReplyError(c,"CLIENT CACHING YES is only valid when tracking is enabled in OPTIN mode.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
} else if (!strcasecmp(opt,"no")) {
|
||||||
|
if (c->flags & CLIENT_TRACKING_OPTOUT) {
|
||||||
|
c->flags |= CLIENT_TRACKING_CACHING;
|
||||||
|
} else {
|
||||||
|
addReplyError(c,"CLIENT CACHING NO is only valid when tracking is enabled in OPTOUT mode.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
addReply(c,shared.syntaxerr);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Common reply for when we succeeded. */
|
||||||
|
addReply(c,shared.ok);
|
||||||
} else if (!strcasecmp(c->argv[1]->ptr,"getredir") && c->argc == 2) {
|
} else if (!strcasecmp(c->argv[1]->ptr,"getredir") && c->argc == 2) {
|
||||||
/* CLIENT GETREDIR */
|
/* CLIENT GETREDIR */
|
||||||
if (c->flags & CLIENT_TRACKING) {
|
if (c->flags & CLIENT_TRACKING) {
|
||||||
|
@ -248,6 +248,10 @@ typedef long long ustime_t; /* microsecond time type. */
|
|||||||
perform client side caching. */
|
perform client side caching. */
|
||||||
#define CLIENT_TRACKING_BROKEN_REDIR (1ULL<<32) /* Target client is invalid. */
|
#define CLIENT_TRACKING_BROKEN_REDIR (1ULL<<32) /* Target client is invalid. */
|
||||||
#define CLIENT_TRACKING_BCAST (1ULL<<33) /* Tracking in BCAST mode. */
|
#define CLIENT_TRACKING_BCAST (1ULL<<33) /* Tracking in BCAST mode. */
|
||||||
|
#define CLIENT_TRACKING_OPTIN (1ULL<<34) /* Tracking in opt-in mode. */
|
||||||
|
#define CLIENT_TRACKING_OPTOUT (1ULL<<35) /* Tracking in opt-out mode. */
|
||||||
|
#define CLIENT_TRACKING_CACHING (1ULL<<36) /* CACHING yes/no was given,
|
||||||
|
depending on optin/optout mode. */
|
||||||
|
|
||||||
/* Client block type (btype field in client structure)
|
/* Client block type (btype field in client structure)
|
||||||
* if CLIENT_BLOCKED flag is set. */
|
* if CLIENT_BLOCKED flag is set. */
|
||||||
@ -1651,7 +1655,7 @@ void addReplyStatusFormat(client *c, const char *fmt, ...);
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
/* Client side caching (tracking mode) */
|
/* Client side caching (tracking mode) */
|
||||||
void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, size_t numprefix);
|
void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **prefix, size_t numprefix);
|
||||||
void disableTracking(client *c);
|
void disableTracking(client *c);
|
||||||
void trackingRememberKeys(client *c);
|
void trackingRememberKeys(client *c);
|
||||||
void trackingInvalidateKey(robj *keyobj);
|
void trackingInvalidateKey(robj *keyobj);
|
||||||
|
@ -93,7 +93,8 @@ void disableTracking(client *c) {
|
|||||||
if (c->flags & CLIENT_TRACKING) {
|
if (c->flags & CLIENT_TRACKING) {
|
||||||
server.tracking_clients--;
|
server.tracking_clients--;
|
||||||
c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR|
|
c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR|
|
||||||
CLIENT_TRACKING_BCAST);
|
CLIENT_TRACKING_BCAST|CLIENT_TRACKING_OPTIN|
|
||||||
|
CLIENT_TRACKING_OPTOUT);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -124,10 +125,11 @@ void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) {
|
|||||||
* eventually get freed, we'll send a message to the original client to
|
* eventually get freed, we'll send a message to the original client to
|
||||||
* inform it of the condition. Multiple clients can redirect the invalidation
|
* inform it of the condition. Multiple clients can redirect the invalidation
|
||||||
* messages to the same client ID. */
|
* messages to the same client ID. */
|
||||||
void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, size_t numprefix) {
|
void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **prefix, size_t numprefix) {
|
||||||
if (!(c->flags & CLIENT_TRACKING)) server.tracking_clients++;
|
if (!(c->flags & CLIENT_TRACKING)) server.tracking_clients++;
|
||||||
c->flags |= CLIENT_TRACKING;
|
c->flags |= CLIENT_TRACKING;
|
||||||
c->flags &= ~(CLIENT_TRACKING_BROKEN_REDIR|CLIENT_TRACKING_BCAST);
|
c->flags &= ~(CLIENT_TRACKING_BROKEN_REDIR|CLIENT_TRACKING_BCAST|
|
||||||
|
CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT);
|
||||||
c->client_tracking_redirection = redirect_to;
|
c->client_tracking_redirection = redirect_to;
|
||||||
if (TrackingTable == NULL) {
|
if (TrackingTable == NULL) {
|
||||||
TrackingTable = raxNew();
|
TrackingTable = raxNew();
|
||||||
@ -135,7 +137,7 @@ void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, s
|
|||||||
TrackingChannelName = createStringObject("__redis__:invalidate",20);
|
TrackingChannelName = createStringObject("__redis__:invalidate",20);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (bcast) {
|
if (options & CLIENT_TRACKING_BCAST) {
|
||||||
c->flags |= CLIENT_TRACKING_BCAST;
|
c->flags |= CLIENT_TRACKING_BCAST;
|
||||||
if (numprefix == 0) enableBcastTrackingForPrefix(c,"",0);
|
if (numprefix == 0) enableBcastTrackingForPrefix(c,"",0);
|
||||||
for (size_t j = 0; j < numprefix; j++) {
|
for (size_t j = 0; j < numprefix; j++) {
|
||||||
@ -143,14 +145,23 @@ void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, s
|
|||||||
enableBcastTrackingForPrefix(c,sdsprefix,sdslen(sdsprefix));
|
enableBcastTrackingForPrefix(c,sdsprefix,sdslen(sdsprefix));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
c->flags |= options & (CLIENT_TRACKING_OPTIN|CLIENT_TRACKING_OPTOUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This function is called after the execution of a readonly command in the
|
/* This function is called after the execution of a readonly command in the
|
||||||
* case the client 'c' has keys tracking enabled. It will populate the
|
* case the client 'c' has keys tracking enabled and the tracking is not
|
||||||
* tracking invalidation table according to the keys the user fetched, so that
|
* in BCAST mode. It will populate the tracking invalidation table according
|
||||||
* Redis will know what are the clients that should receive an invalidation
|
* to the keys the user fetched, so that Redis will know what are the clients
|
||||||
* message with certain groups of keys are modified. */
|
* that should receive an invalidation message with certain groups of keys
|
||||||
|
* are modified. */
|
||||||
void trackingRememberKeys(client *c) {
|
void trackingRememberKeys(client *c) {
|
||||||
|
/* Return if we are in optin/out mode and the right CACHING command
|
||||||
|
* was/wasn't given in order to modify the default behavior. */
|
||||||
|
uint64_t optin = c->flags & CLIENT_TRACKING_OPTIN;
|
||||||
|
uint64_t optout = c->flags & CLIENT_TRACKING_OPTOUT;
|
||||||
|
uint64_t caching_given = c->flags & CLIENT_TRACKING_CACHING;
|
||||||
|
if ((optin && !caching_given) || (optout && caching_given)) return;
|
||||||
|
|
||||||
int numkeys;
|
int numkeys;
|
||||||
int *keys = getKeysFromCommand(c->cmd,c->argv,c->argc,&numkeys);
|
int *keys = getKeysFromCommand(c->cmd,c->argv,c->argc,&numkeys);
|
||||||
if (keys == NULL) return;
|
if (keys == NULL) return;
|
||||||
|
Loading…
Reference in New Issue
Block a user