change SORT and SPOP to use lookupKeyWrite rather than lookupKeyRead

like in SUNIONSTORE etc, commands that perform writes are expected to open
all keys, even input keys, with lookupKeyWrite
This commit is contained in:
Oran Agra 2018-09-27 18:03:47 +03:00
parent 3eaa2cdc44
commit 747174388f
2 changed files with 32 additions and 25 deletions

View File

@ -58,7 +58,7 @@ redisSortOperation *createSortOperation(int type, robj *pattern) {
*
* The returned object will always have its refcount increased by 1
* when it is non-NULL. */
robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst, int writeflag) {
char *p, *f, *k;
sds spat, ssub;
robj *keyobj, *fieldobj = NULL, *o;
@ -106,7 +106,10 @@ robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
decrRefCount(subst); /* Incremented by decodeObject() */
/* Lookup substituted key */
o = lookupKeyRead(db,keyobj);
if (!writeflag)
o = lookupKeyRead(db,keyobj);
else
o = lookupKeyWrite(db,keyobj);
if (o == NULL) goto noobj;
if (fieldobj) {
@ -198,30 +201,12 @@ void sortCommand(client *c) {
robj *sortval, *sortby = NULL, *storekey = NULL;
redisSortObject *vector; /* Resulting vector to sort */
/* Lookup the key to sort. It must be of the right types */
sortval = lookupKeyRead(c->db,c->argv[1]);
if (sortval && sortval->type != OBJ_SET &&
sortval->type != OBJ_LIST &&
sortval->type != OBJ_ZSET)
{
addReply(c,shared.wrongtypeerr);
return;
}
/* Create a list of operations to perform for every sorted element.
* Operations can be GET */
operations = listCreate();
listSetFreeMethod(operations,zfree);
j = 2; /* options start at argv[2] */
/* Now we need to protect sortval incrementing its count, in the future
* SORT may have options able to overwrite/delete keys during the sorting
* and the sorted key itself may get destroyed */
if (sortval)
incrRefCount(sortval);
else
sortval = createQuicklistObject();
/* The SORT command has an SQL-alike syntax, parse it */
while(j < c->argc) {
int leftargs = c->argc-j-1;
@ -280,11 +265,33 @@ void sortCommand(client *c) {
/* Handle syntax errors set during options parsing. */
if (syntax_error) {
decrRefCount(sortval);
listRelease(operations);
return;
}
/* Lookup the key to sort. It must be of the right types */
if (storekey)
sortval = lookupKeyRead(c->db,c->argv[1]);
else
sortval = lookupKeyWrite(c->db,c->argv[1]);
if (sortval && sortval->type != OBJ_SET &&
sortval->type != OBJ_LIST &&
sortval->type != OBJ_ZSET)
{
listRelease(operations);
addReply(c,shared.wrongtypeerr);
return;
}
/* Now we need to protect sortval incrementing its count, in the future
* SORT may have options able to overwrite/delete keys during the sorting
* and the sorted key itself may get destroyed */
if (sortval)
incrRefCount(sortval);
else
sortval = createQuicklistObject();
/* When sorting a set with no sort specified, we must sort the output
* so the result is consistent across scripting and replication.
*
@ -452,7 +459,7 @@ void sortCommand(client *c) {
robj *byval;
if (sortby) {
/* lookup value to sort by */
byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
byval = lookupKeyByPattern(c->db,sortby,vector[j].obj,storekey!=NULL);
if (!byval) continue;
} else {
/* use object itself to sort by */
@ -515,7 +522,7 @@ void sortCommand(client *c) {
while((ln = listNext(&li))) {
redisSortOperation *sop = ln->value;
robj *val = lookupKeyByPattern(c->db,sop->pattern,
vector[j].obj);
vector[j].obj,storekey!=NULL);
if (sop->type == SORT_OP_GET) {
if (!val) {
@ -545,7 +552,7 @@ void sortCommand(client *c) {
while((ln = listNext(&li))) {
redisSortOperation *sop = ln->value;
robj *val = lookupKeyByPattern(c->db,sop->pattern,
vector[j].obj);
vector[j].obj,storekey!=NULL);
if (sop->type == SORT_OP_GET) {
if (!val) val = createStringObject("",0);

View File

@ -415,7 +415,7 @@ void spopWithCountCommand(client *c) {
/* Make sure a key with the name inputted exists, and that it's type is
* indeed a set. Otherwise, return nil */
if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp]))
if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.null[c->resp]))
== NULL || checkType(c,set,OBJ_SET)) return;
/* If count is zero, serve an empty multibulk ASAP to avoid special