make sort/ro commands validate external keys access patterns (#10106) (#10340)

Currently the sort and sort_ro can access external keys via `GET` and `BY`
in order to make sure the user cannot violate the authorization ACL
rules, the decision is to reject external keys access patterns unless ACL allows
SORT full access to all keys.
I.e. for backwards compatibility, SORT with GET/BY keeps working, but
if ACL has restrictions to certain keys, these features get permission denied.

### Implemented solution
We have discussed several potential solutions and decided to only allow the GET and BY
arguments when the user has all key permissions with the SORT command. The reasons
being that SORT with GET or BY is problematic anyway, for instance it is not supported in
cluster mode since it doesn't declare keys, and we're not sure the combination of that feature
with ACL key restriction is really required.
**HOWEVER** If in the fullness of time we will identify a real need for fine grain access
support for SORT, we would implement the complete solution which is the alternative
described below.

### Alternative (Completion solution):
Check sort ACL rules after executing it and before committing output (either via store or
to COB). it would require making several changes to the sort command itself. and would
potentially cause performance degradation since we will have to collect all the get keys
instead of just applying them to a temp array and then scan the access keys against the
ACL selectors. This solution can include an optimization to avoid the overheads of collecting
the key names, in case the ACL rules grant SORT full key-access, or if the ACL key pattern
literal matches the one used in GET/BY. It would also mean that authorization would be
O(nlogn) since we will have to complete most of the command execution before we can
perform verification

Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
Co-authored-by: Oran Agra <oran@redislabs.com>
This commit is contained in:
ranshid 2022-03-15 17:14:53 +02:00 committed by GitHub
parent cf6dcb7bf1
commit 1078e30c5f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 113 additions and 3 deletions

View File

@ -1528,6 +1528,37 @@ static int ACLSelectorCheckKey(aclSelector *selector, const char *key, int keyle
return ACL_DENIED_KEY;
}
/* Checks if the provided selector selector has access specified in flags
* to all keys in the keyspace. For example, CMD_KEY_READ access requires either
* '%R~*', '~*', or allkeys to be granted to the selector. Returns 1 if all
* the access flags are satisfied with this selector or 0 otherwise.
*/
static int ACLSelectorHasUnrestrictedKeyAccess(aclSelector *selector, int flags) {
/* The selector can access any key */
if (selector->flags & SELECTOR_FLAG_ALLKEYS) return 1;
listIter li;
listNode *ln;
listRewind(selector->patterns,&li);
int access_flags = 0;
if (flags & CMD_KEY_ACCESS) access_flags |= ACL_READ_PERMISSION;
if (flags & CMD_KEY_INSERT) access_flags |= ACL_WRITE_PERMISSION;
if (flags & CMD_KEY_DELETE) access_flags |= ACL_WRITE_PERMISSION;
if (flags & CMD_KEY_UPDATE) access_flags |= ACL_WRITE_PERMISSION;
/* Test this key against every pattern. */
while((ln = listNext(&li))) {
keyPattern *pattern = listNodeValue(ln);
if ((pattern->flags & access_flags) != access_flags)
continue;
if (!strcmp(pattern->pattern,"*")) {
return 1;
}
}
return 0;
}
/* Checks a channel against a provided list of channels. The is_pattern
* argument should only be used when subscribing (not when publishing)
* and controls whether the input channel is evaluated as a channel pattern
@ -1672,6 +1703,39 @@ int ACLUserCheckKeyPerm(user *u, const char *key, int keylen, int flags) {
return ACL_DENIED_KEY;
}
/* Checks if the user can execute the given command with the added restriction
* it must also have the access specified in flags to any key in the key space.
* For example, CMD_KEY_READ access requires either '%R~*', '~*', or allkeys to be
* granted in addition to the access required by the command. Returns 1
* if the user has access or 0 otherwise.
*/
int ACLUserCheckCmdWithUnrestrictedKeyAccess(user *u, struct redisCommand *cmd, robj **argv, int argc, int flags) {
listIter li;
listNode *ln;
int local_idxptr;
/* If there is no associated user, the connection can run anything. */
if (u == NULL) return 1;
/* For multiple selectors, we cache the key result in between selector
* calls to prevent duplicate lookups. */
aclKeyResultCache cache;
initACLKeyResultCache(&cache);
/* Check each selector sequentially */
listRewind(u->selectors,&li);
while((ln = listNext(&li))) {
aclSelector *s = (aclSelector *) listNodeValue(ln);
int acl_retval = ACLSelectorCheckCmd(s, cmd, argv, argc, &local_idxptr, &cache);
if (acl_retval == ACL_OK && ACLSelectorHasUnrestrictedKeyAccess(s, flags)) {
cleanupACLKeyResultCache(&cache);
return 1;
}
}
cleanupACLKeyResultCache(&cache);
return 0;
}
/* Check if the channel can be accessed by the client according to
* the ACLs associated with the specified user.
*

View File

@ -2758,6 +2758,7 @@ user *ACLGetUserByName(const char *name, size_t namelen);
int ACLUserCheckKeyPerm(user *u, const char *key, int keylen, int flags);
int ACLUserCheckChannelPerm(user *u, sds channel, int literal);
int ACLCheckAllUserCommandPerm(user *u, struct redisCommand *cmd, robj **argv, int argc, int *idxptr);
int ACLUserCheckCmdWithUnrestrictedKeyAccess(user *u, struct redisCommand *cmd, robj **argv, int argc, int flags);
int ACLCheckAllPerm(client *c, int *idxptr);
int ACLSetUser(user *u, const char *op, ssize_t oplen);
uint64_t ACLGetCommandCategoryFlagByName(const char *name);

View File

@ -197,13 +197,15 @@ void sortCommandGeneric(client *c, int readonly) {
int syntax_error = 0;
robj *sortval, *sortby = NULL, *storekey = NULL;
redisSortObject *vector; /* Resulting vector to sort */
int user_has_full_key_access = 0; /* ACL - used in order to verify 'get' and 'by' options can be used */
/* Create a list of operations to perform for every sorted element.
* Operations can be GET */
operations = listCreate();
listSetFreeMethod(operations,zfree);
j = 2; /* options start at argv[2] */
user_has_full_key_access = ACLUserCheckCmdWithUnrestrictedKeyAccess(c->user, c->cmd, c->argv, c->argc, CMD_KEY_ACCESS);
/* The SORT command has an SQL-alike syntax, parse it */
while(j < c->argc) {
int leftargs = c->argc-j-1;
@ -233,13 +235,20 @@ void sortCommandGeneric(client *c, int readonly) {
if (strchr(c->argv[j+1]->ptr,'*') == NULL) {
dontsort = 1;
} else {
/* If BY is specified with a real patter, we can't accept
/* If BY is specified with a real pattern, we can't accept
* it in cluster mode. */
if (server.cluster_enabled) {
addReplyError(c,"BY option of SORT denied in Cluster mode.");
syntax_error++;
break;
}
/* If BY is specified with a real pattern, we can't accept
* it if no full ACL key access is applied for this command. */
if (!user_has_full_key_access) {
addReplyError(c,"BY option of SORT denied due to insufficient ACL permissions.");
syntax_error++;
break;
}
}
j++;
} else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
@ -248,6 +257,11 @@ void sortCommandGeneric(client *c, int readonly) {
syntax_error++;
break;
}
if (!user_has_full_key_access) {
addReplyError(c,"GET option of SORT denied due to insufficient ACL permissions.");
syntax_error++;
break;
}
listAddNodeTail(operations,createSortOperation(
SORT_OP_GET,c->argv[j+1]));
getop++;

View File

@ -433,6 +433,38 @@ start_server {tags {"acl external:skip"}} {
assert_equal "This user has no permissions to access the 'otherchannel' channel" [r ACL DRYRUN test-channels spublish otherchannel foo]
assert_equal "This user has no permissions to access the 'otherchannel' channel" [r ACL DRYRUN test-channels ssubscribe otherchannel foo]
}
test {Test sort with ACL permissions} {
r set v1 1
r lpush mylist 1
r ACL setuser test-sort-acl on nopass (+sort ~mylist)
$r2 auth test-sort-acl nopass
catch {$r2 sort mylist by v*} e
assert_equal "ERR BY option of SORT denied due to insufficient ACL permissions." $e
catch {$r2 sort mylist get v*} e
assert_equal "ERR GET option of SORT denied due to insufficient ACL permissions." $e
r ACL setuser test-sort-acl (+sort ~mylist ~v*)
catch {$r2 sort mylist by v*} e
assert_equal "ERR BY option of SORT denied due to insufficient ACL permissions." $e
catch {$r2 sort mylist get v*} e
assert_equal "ERR GET option of SORT denied due to insufficient ACL permissions." $e
r ACL setuser test-sort-acl (+sort ~mylist %W~*)
catch {$r2 sort mylist by v*} e
assert_equal "ERR BY option of SORT denied due to insufficient ACL permissions." $e
catch {$r2 sort mylist get v*} e
assert_equal "ERR GET option of SORT denied due to insufficient ACL permissions." $e
r ACL setuser test-sort-acl (+sort ~mylist %R~*)
assert_equal "1" [$r2 sort mylist by v*]
# cleanup
r ACL deluser test-sort-acl
r del v1 mylist
}
test {Test DRYRUN with wrong number of arguments} {
r ACL setuser test-dry-run +@all ~v*
@ -444,7 +476,6 @@ start_server {tags {"acl external:skip"}} {
catch {r ACL DRYRUN test-dry-run SET} e
assert_equal "ERR wrong number of arguments for 'set' command" $e
}
$r2 close