mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 16:18:28 -05:00
c1b1e8c329
Fixes #7923. This PR appropriates the special `&` symbol (because `@` and `*` are taken), followed by a literal value or pattern for describing the Pub/Sub patterns that an ACL user can interact with. It is similar to the existing key patterns mechanism in function (additive) and implementation (copy-pasta). It also adds the allchannels and resetchannels ACL keywords, naturally. The default user is given allchannels permissions, whereas new users get whatever is defined by the acl-pubsub-default configuration directive. For backward compatibility in 6.2, the default of this directive is allchannels but this is likely to be changed to resetchannels in the next major version for stronger default security settings. Unless allchannels is set for the user, channel access permissions are checked as follows : * Calls to both PUBLISH and SUBSCRIBE will fail unless a pattern matching the argumentative channel name(s) exists for the user. * Calls to PSUBSCRIBE will fail unless the pattern(s) provided as an argument literally exist(s) in the user's list. Such failures are logged to the ACL log. Runtime changes to channel permissions for a user with existing subscribing clients cause said clients to disconnect unless the new permissions permit the connections to continue. Note, however, that PSUBSCRIBErs' patterns are matched literally, so given the change bar:* -> b*, pattern subscribers to bar:* will be disconnected. Notes/questions: * UNSUBSCRIBE, PUNSUBSCRIBE and PUBSUB remain unprotected due to lack of reasons for touching them.
506 lines
18 KiB
C
506 lines
18 KiB
C
/*
|
|
* Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
|
|
* All rights reserved.
|
|
*
|
|
* Redistribution and use in source and binary forms, with or without
|
|
* modification, are permitted provided that the following conditions are met:
|
|
*
|
|
* * Redistributions of source code must retain the above copyright notice,
|
|
* this list of conditions and the following disclaimer.
|
|
* * Redistributions in binary form must reproduce the above copyright
|
|
* notice, this list of conditions and the following disclaimer in the
|
|
* documentation and/or other materials provided with the distribution.
|
|
* * Neither the name of Redis nor the names of its contributors may be used
|
|
* to endorse or promote products derived from this software without
|
|
* specific prior written permission.
|
|
*
|
|
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
|
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
|
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
|
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
|
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
|
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
|
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
|
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
* POSSIBILITY OF SUCH DAMAGE.
|
|
*/
|
|
|
|
#include "server.h"
|
|
|
|
int clientSubscriptionsCount(client *c);
|
|
|
|
/*-----------------------------------------------------------------------------
|
|
* Pubsub client replies API
|
|
*----------------------------------------------------------------------------*/
|
|
|
|
/* Send a pubsub message of type "message" to the client.
|
|
* Normally 'msg' is a Redis object containing the string to send as
|
|
* message. However if the caller sets 'msg' as NULL, it will be able
|
|
* to send a special message (for instance an Array type) by using the
|
|
* addReply*() API family. */
|
|
void addReplyPubsubMessage(client *c, robj *channel, robj *msg) {
|
|
if (c->resp == 2)
|
|
addReply(c,shared.mbulkhdr[3]);
|
|
else
|
|
addReplyPushLen(c,3);
|
|
addReply(c,shared.messagebulk);
|
|
addReplyBulk(c,channel);
|
|
if (msg) addReplyBulk(c,msg);
|
|
}
|
|
|
|
/* Send a pubsub message of type "pmessage" to the client. The difference
|
|
* with the "message" type delivered by addReplyPubsubMessage() is that
|
|
* this message format also includes the pattern that matched the message. */
|
|
void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) {
|
|
if (c->resp == 2)
|
|
addReply(c,shared.mbulkhdr[4]);
|
|
else
|
|
addReplyPushLen(c,4);
|
|
addReply(c,shared.pmessagebulk);
|
|
addReplyBulk(c,pat);
|
|
addReplyBulk(c,channel);
|
|
addReplyBulk(c,msg);
|
|
}
|
|
|
|
/* Send the pubsub subscription notification to the client. */
|
|
void addReplyPubsubSubscribed(client *c, robj *channel) {
|
|
if (c->resp == 2)
|
|
addReply(c,shared.mbulkhdr[3]);
|
|
else
|
|
addReplyPushLen(c,3);
|
|
addReply(c,shared.subscribebulk);
|
|
addReplyBulk(c,channel);
|
|
addReplyLongLong(c,clientSubscriptionsCount(c));
|
|
}
|
|
|
|
/* Send the pubsub unsubscription notification to the client.
|
|
* Channel can be NULL: this is useful when the client sends a mass
|
|
* unsubscribe command but there are no channels to unsubscribe from: we
|
|
* still send a notification. */
|
|
void addReplyPubsubUnsubscribed(client *c, robj *channel) {
|
|
if (c->resp == 2)
|
|
addReply(c,shared.mbulkhdr[3]);
|
|
else
|
|
addReplyPushLen(c,3);
|
|
addReply(c,shared.unsubscribebulk);
|
|
if (channel)
|
|
addReplyBulk(c,channel);
|
|
else
|
|
addReplyNull(c);
|
|
addReplyLongLong(c,clientSubscriptionsCount(c));
|
|
}
|
|
|
|
/* Send the pubsub pattern subscription notification to the client. */
|
|
void addReplyPubsubPatSubscribed(client *c, robj *pattern) {
|
|
if (c->resp == 2)
|
|
addReply(c,shared.mbulkhdr[3]);
|
|
else
|
|
addReplyPushLen(c,3);
|
|
addReply(c,shared.psubscribebulk);
|
|
addReplyBulk(c,pattern);
|
|
addReplyLongLong(c,clientSubscriptionsCount(c));
|
|
}
|
|
|
|
/* Send the pubsub pattern unsubscription notification to the client.
|
|
* Pattern can be NULL: this is useful when the client sends a mass
|
|
* punsubscribe command but there are no pattern to unsubscribe from: we
|
|
* still send a notification. */
|
|
void addReplyPubsubPatUnsubscribed(client *c, robj *pattern) {
|
|
if (c->resp == 2)
|
|
addReply(c,shared.mbulkhdr[3]);
|
|
else
|
|
addReplyPushLen(c,3);
|
|
addReply(c,shared.punsubscribebulk);
|
|
if (pattern)
|
|
addReplyBulk(c,pattern);
|
|
else
|
|
addReplyNull(c);
|
|
addReplyLongLong(c,clientSubscriptionsCount(c));
|
|
}
|
|
|
|
/*-----------------------------------------------------------------------------
|
|
* Pubsub low level API
|
|
*----------------------------------------------------------------------------*/
|
|
|
|
void freePubsubPattern(void *p) {
|
|
pubsubPattern *pat = p;
|
|
|
|
decrRefCount(pat->pattern);
|
|
zfree(pat);
|
|
}
|
|
|
|
int listMatchPubsubPattern(void *a, void *b) {
|
|
pubsubPattern *pa = a, *pb = b;
|
|
|
|
return (pa->client == pb->client) &&
|
|
(equalStringObjects(pa->pattern,pb->pattern));
|
|
}
|
|
|
|
/* Return the number of channels + patterns a client is subscribed to. */
|
|
int clientSubscriptionsCount(client *c) {
|
|
return dictSize(c->pubsub_channels)+
|
|
listLength(c->pubsub_patterns);
|
|
}
|
|
|
|
/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
|
|
* 0 if the client was already subscribed to that channel. */
|
|
int pubsubSubscribeChannel(client *c, robj *channel) {
|
|
dictEntry *de;
|
|
list *clients = NULL;
|
|
int retval = 0;
|
|
|
|
/* Add the channel to the client -> channels hash table */
|
|
if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
|
|
retval = 1;
|
|
incrRefCount(channel);
|
|
/* Add the client to the channel -> list of clients hash table */
|
|
de = dictFind(server.pubsub_channels,channel);
|
|
if (de == NULL) {
|
|
clients = listCreate();
|
|
dictAdd(server.pubsub_channels,channel,clients);
|
|
incrRefCount(channel);
|
|
} else {
|
|
clients = dictGetVal(de);
|
|
}
|
|
listAddNodeTail(clients,c);
|
|
}
|
|
/* Notify the client */
|
|
addReplyPubsubSubscribed(c,channel);
|
|
return retval;
|
|
}
|
|
|
|
/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
|
|
* 0 if the client was not subscribed to the specified channel. */
|
|
int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {
|
|
dictEntry *de;
|
|
list *clients;
|
|
listNode *ln;
|
|
int retval = 0;
|
|
|
|
/* Remove the channel from the client -> channels hash table */
|
|
incrRefCount(channel); /* channel may be just a pointer to the same object
|
|
we have in the hash tables. Protect it... */
|
|
if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
|
|
retval = 1;
|
|
/* Remove the client from the channel -> clients list hash table */
|
|
de = dictFind(server.pubsub_channels,channel);
|
|
serverAssertWithInfo(c,NULL,de != NULL);
|
|
clients = dictGetVal(de);
|
|
ln = listSearchKey(clients,c);
|
|
serverAssertWithInfo(c,NULL,ln != NULL);
|
|
listDelNode(clients,ln);
|
|
if (listLength(clients) == 0) {
|
|
/* Free the list and associated hash entry at all if this was
|
|
* the latest client, so that it will be possible to abuse
|
|
* Redis PUBSUB creating millions of channels. */
|
|
dictDelete(server.pubsub_channels,channel);
|
|
}
|
|
}
|
|
/* Notify the client */
|
|
if (notify) addReplyPubsubUnsubscribed(c,channel);
|
|
decrRefCount(channel); /* it is finally safe to release it */
|
|
return retval;
|
|
}
|
|
|
|
/* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */
|
|
int pubsubSubscribePattern(client *c, robj *pattern) {
|
|
dictEntry *de;
|
|
list *clients;
|
|
int retval = 0;
|
|
|
|
if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
|
|
retval = 1;
|
|
pubsubPattern *pat;
|
|
listAddNodeTail(c->pubsub_patterns,pattern);
|
|
incrRefCount(pattern);
|
|
pat = zmalloc(sizeof(*pat));
|
|
pat->pattern = getDecodedObject(pattern);
|
|
pat->client = c;
|
|
listAddNodeTail(server.pubsub_patterns,pat);
|
|
/* Add the client to the pattern -> list of clients hash table */
|
|
de = dictFind(server.pubsub_patterns_dict,pattern);
|
|
if (de == NULL) {
|
|
clients = listCreate();
|
|
dictAdd(server.pubsub_patterns_dict,pattern,clients);
|
|
incrRefCount(pattern);
|
|
} else {
|
|
clients = dictGetVal(de);
|
|
}
|
|
listAddNodeTail(clients,c);
|
|
}
|
|
/* Notify the client */
|
|
addReplyPubsubPatSubscribed(c,pattern);
|
|
return retval;
|
|
}
|
|
|
|
/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
|
|
* 0 if the client was not subscribed to the specified channel. */
|
|
int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {
|
|
dictEntry *de;
|
|
list *clients;
|
|
listNode *ln;
|
|
pubsubPattern pat;
|
|
int retval = 0;
|
|
|
|
incrRefCount(pattern); /* Protect the object. May be the same we remove */
|
|
if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) {
|
|
retval = 1;
|
|
listDelNode(c->pubsub_patterns,ln);
|
|
pat.client = c;
|
|
pat.pattern = pattern;
|
|
ln = listSearchKey(server.pubsub_patterns,&pat);
|
|
listDelNode(server.pubsub_patterns,ln);
|
|
/* Remove the client from the pattern -> clients list hash table */
|
|
de = dictFind(server.pubsub_patterns_dict,pattern);
|
|
serverAssertWithInfo(c,NULL,de != NULL);
|
|
clients = dictGetVal(de);
|
|
ln = listSearchKey(clients,c);
|
|
serverAssertWithInfo(c,NULL,ln != NULL);
|
|
listDelNode(clients,ln);
|
|
if (listLength(clients) == 0) {
|
|
/* Free the list and associated hash entry at all if this was
|
|
* the latest client. */
|
|
dictDelete(server.pubsub_patterns_dict,pattern);
|
|
}
|
|
}
|
|
/* Notify the client */
|
|
if (notify) addReplyPubsubPatUnsubscribed(c,pattern);
|
|
decrRefCount(pattern);
|
|
return retval;
|
|
}
|
|
|
|
/* Unsubscribe from all the channels. Return the number of channels the
|
|
* client was subscribed to. */
|
|
int pubsubUnsubscribeAllChannels(client *c, int notify) {
|
|
dictIterator *di = dictGetSafeIterator(c->pubsub_channels);
|
|
dictEntry *de;
|
|
int count = 0;
|
|
|
|
while((de = dictNext(di)) != NULL) {
|
|
robj *channel = dictGetKey(de);
|
|
|
|
count += pubsubUnsubscribeChannel(c,channel,notify);
|
|
}
|
|
/* We were subscribed to nothing? Still reply to the client. */
|
|
if (notify && count == 0) addReplyPubsubUnsubscribed(c,NULL);
|
|
dictReleaseIterator(di);
|
|
return count;
|
|
}
|
|
|
|
/* Unsubscribe from all the patterns. Return the number of patterns the
|
|
* client was subscribed from. */
|
|
int pubsubUnsubscribeAllPatterns(client *c, int notify) {
|
|
listNode *ln;
|
|
listIter li;
|
|
int count = 0;
|
|
|
|
listRewind(c->pubsub_patterns,&li);
|
|
while ((ln = listNext(&li)) != NULL) {
|
|
robj *pattern = ln->value;
|
|
|
|
count += pubsubUnsubscribePattern(c,pattern,notify);
|
|
}
|
|
if (notify && count == 0) addReplyPubsubPatUnsubscribed(c,NULL);
|
|
return count;
|
|
}
|
|
|
|
/* Publish a message */
|
|
int pubsubPublishMessage(robj *channel, robj *message) {
|
|
int receivers = 0;
|
|
dictEntry *de;
|
|
dictIterator *di;
|
|
listNode *ln;
|
|
listIter li;
|
|
|
|
/* Send to clients listening for that channel */
|
|
de = dictFind(server.pubsub_channels,channel);
|
|
if (de) {
|
|
list *list = dictGetVal(de);
|
|
listNode *ln;
|
|
listIter li;
|
|
|
|
listRewind(list,&li);
|
|
while ((ln = listNext(&li)) != NULL) {
|
|
client *c = ln->value;
|
|
addReplyPubsubMessage(c,channel,message);
|
|
receivers++;
|
|
}
|
|
}
|
|
/* Send to clients listening to matching channels */
|
|
di = dictGetIterator(server.pubsub_patterns_dict);
|
|
if (di) {
|
|
channel = getDecodedObject(channel);
|
|
while((de = dictNext(di)) != NULL) {
|
|
robj *pattern = dictGetKey(de);
|
|
list *clients = dictGetVal(de);
|
|
if (!stringmatchlen((char*)pattern->ptr,
|
|
sdslen(pattern->ptr),
|
|
(char*)channel->ptr,
|
|
sdslen(channel->ptr),0)) continue;
|
|
|
|
listRewind(clients,&li);
|
|
while ((ln = listNext(&li)) != NULL) {
|
|
client *c = listNodeValue(ln);
|
|
addReplyPubsubPatMessage(c,pattern,channel,message);
|
|
receivers++;
|
|
}
|
|
}
|
|
decrRefCount(channel);
|
|
dictReleaseIterator(di);
|
|
}
|
|
return receivers;
|
|
}
|
|
|
|
/* This wraps handling ACL channel permissions for the given client. */
|
|
int pubsubCheckACLPermissionsOrReply(client *c, int idx, int count, int literal) {
|
|
/* Check if the user can run the command according to the current
|
|
* ACLs. */
|
|
int acl_chanpos;
|
|
int acl_retval = ACLCheckPubsubPerm(c,idx,count,literal,&acl_chanpos);
|
|
if (acl_retval == ACL_DENIED_CHANNEL) {
|
|
addACLLogEntry(c,acl_retval,acl_chanpos,NULL);
|
|
addReplyError(c,
|
|
"-NOPERM this user has no permissions to access "
|
|
"one of the channels used as arguments");
|
|
}
|
|
return acl_retval;
|
|
}
|
|
|
|
/*-----------------------------------------------------------------------------
|
|
* Pubsub commands implementation
|
|
*----------------------------------------------------------------------------*/
|
|
|
|
/* SUBSCRIBE channel [channel ...] */
|
|
void subscribeCommand(client *c) {
|
|
int j;
|
|
if (pubsubCheckACLPermissionsOrReply(c,1,c->argc-1,0) != ACL_OK) return;
|
|
if ((c->flags & CLIENT_DENY_BLOCKING) && !(c->flags & CLIENT_MULTI)) {
|
|
/**
|
|
* A client that has CLIENT_DENY_BLOCKING flag on
|
|
* expect a reply per command and so can not execute subscribe.
|
|
*
|
|
* Notice that we have a special treatment for multi because of
|
|
* backword compatibility
|
|
*/
|
|
addReplyError(c, "SUBSCRIBE isn't allowed for a DENY BLOCKING client");
|
|
return;
|
|
}
|
|
|
|
for (j = 1; j < c->argc; j++)
|
|
pubsubSubscribeChannel(c,c->argv[j]);
|
|
c->flags |= CLIENT_PUBSUB;
|
|
}
|
|
|
|
/* UNSUBSCRIBE [channel [channel ...]] */
|
|
void unsubscribeCommand(client *c) {
|
|
if (c->argc == 1) {
|
|
pubsubUnsubscribeAllChannels(c,1);
|
|
} else {
|
|
int j;
|
|
|
|
for (j = 1; j < c->argc; j++)
|
|
pubsubUnsubscribeChannel(c,c->argv[j],1);
|
|
}
|
|
if (clientSubscriptionsCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
|
|
}
|
|
|
|
/* PSUBSCRIBE pattern [pattern ...] */
|
|
void psubscribeCommand(client *c) {
|
|
int j;
|
|
if (pubsubCheckACLPermissionsOrReply(c,1,c->argc-1,1) != ACL_OK) return;
|
|
if ((c->flags & CLIENT_DENY_BLOCKING) && !(c->flags & CLIENT_MULTI)) {
|
|
/**
|
|
* A client that has CLIENT_DENY_BLOCKING flag on
|
|
* expect a reply per command and so can not execute subscribe.
|
|
*
|
|
* Notice that we have a special treatment for multi because of
|
|
* backword compatibility
|
|
*/
|
|
addReplyError(c, "PSUBSCRIBE isn't allowed for a DENY BLOCKING client");
|
|
return;
|
|
}
|
|
|
|
for (j = 1; j < c->argc; j++)
|
|
pubsubSubscribePattern(c,c->argv[j]);
|
|
c->flags |= CLIENT_PUBSUB;
|
|
}
|
|
|
|
/* PUNSUBSCRIBE [pattern [pattern ...]] */
|
|
void punsubscribeCommand(client *c) {
|
|
if (c->argc == 1) {
|
|
pubsubUnsubscribeAllPatterns(c,1);
|
|
} else {
|
|
int j;
|
|
|
|
for (j = 1; j < c->argc; j++)
|
|
pubsubUnsubscribePattern(c,c->argv[j],1);
|
|
}
|
|
if (clientSubscriptionsCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
|
|
}
|
|
|
|
/* PUBLISH <channel> <message> */
|
|
void publishCommand(client *c) {
|
|
if (pubsubCheckACLPermissionsOrReply(c,1,1,0) != ACL_OK) return;
|
|
int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
|
|
if (server.cluster_enabled)
|
|
clusterPropagatePublish(c->argv[1],c->argv[2]);
|
|
else
|
|
forceCommandPropagation(c,PROPAGATE_REPL);
|
|
addReplyLongLong(c,receivers);
|
|
}
|
|
|
|
/* PUBSUB command for Pub/Sub introspection. */
|
|
void pubsubCommand(client *c) {
|
|
if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
|
|
const char *help[] = {
|
|
"CHANNELS [<pattern>] -- Return the currently active channels matching a pattern (default: all).",
|
|
"NUMPAT -- Return number of subscriptions to patterns.",
|
|
"NUMSUB [channel-1 .. channel-N] -- Returns the number of subscribers for the specified channels (excluding patterns, default: none).",
|
|
NULL
|
|
};
|
|
addReplyHelp(c, help);
|
|
} else if (!strcasecmp(c->argv[1]->ptr,"channels") &&
|
|
(c->argc == 2 || c->argc == 3))
|
|
{
|
|
/* PUBSUB CHANNELS [<pattern>] */
|
|
sds pat = (c->argc == 2) ? NULL : c->argv[2]->ptr;
|
|
dictIterator *di = dictGetIterator(server.pubsub_channels);
|
|
dictEntry *de;
|
|
long mblen = 0;
|
|
void *replylen;
|
|
|
|
replylen = addReplyDeferredLen(c);
|
|
while((de = dictNext(di)) != NULL) {
|
|
robj *cobj = dictGetKey(de);
|
|
sds channel = cobj->ptr;
|
|
|
|
if (!pat || stringmatchlen(pat, sdslen(pat),
|
|
channel, sdslen(channel),0))
|
|
{
|
|
addReplyBulk(c,cobj);
|
|
mblen++;
|
|
}
|
|
}
|
|
dictReleaseIterator(di);
|
|
setDeferredArrayLen(c,replylen,mblen);
|
|
} else if (!strcasecmp(c->argv[1]->ptr,"numsub") && c->argc >= 2) {
|
|
/* PUBSUB NUMSUB [Channel_1 ... Channel_N] */
|
|
int j;
|
|
|
|
addReplyArrayLen(c,(c->argc-2)*2);
|
|
for (j = 2; j < c->argc; j++) {
|
|
list *l = dictFetchValue(server.pubsub_channels,c->argv[j]);
|
|
|
|
addReplyBulk(c,c->argv[j]);
|
|
addReplyLongLong(c,l ? listLength(l) : 0);
|
|
}
|
|
} else if (!strcasecmp(c->argv[1]->ptr,"numpat") && c->argc == 2) {
|
|
/* PUBSUB NUMPAT */
|
|
addReplyLongLong(c,listLength(server.pubsub_patterns));
|
|
} else {
|
|
addReplySubcommandSyntaxError(c);
|
|
}
|
|
}
|