LPOS: implement the final design.

This commit is contained in:
antirez 2020-06-10 12:40:24 +02:00
parent a7936ef96d
commit e63a5ba122
3 changed files with 87 additions and 24 deletions

View File

@ -326,8 +326,8 @@ struct redisCommand redisCommandTable[] = {
"write @list", "write @list",
0,NULL,1,1,1,0,0,0}, 0,NULL,1,1,1,0,0,0},
{"lrank",lrankCommand,4, {"lpos",lposCommand,-3,
"read-only fast @list", "read-only @list",
0,NULL,1,1,1,0,0,0}, 0,NULL,1,1,1,0,0,0},
{"lrem",lremCommand,4, {"lrem",lremCommand,4,

View File

@ -2269,7 +2269,7 @@ void flushdbCommand(client *c);
void flushallCommand(client *c); void flushallCommand(client *c);
void sortCommand(client *c); void sortCommand(client *c);
void lremCommand(client *c); void lremCommand(client *c);
void lrankCommand(client *c); void lposCommand(client *c);
void rpoplpushCommand(client *c); void rpoplpushCommand(client *c);
void infoCommand(client *c); void infoCommand(client *c);
void mgetCommand(client *c); void mgetCommand(client *c);

View File

@ -487,38 +487,101 @@ void ltrimCommand(client *c) {
addReply(c,shared.ok); addReply(c,shared.ok);
} }
void lrankCommand(client *c) { /* LPOS key element [matchpos] [ALL] [RELATIVE]
robj *subject, *obj; *
obj = c->argv[3]; * matchnum is the position of the match, so if it is 1, the first match
long direction = 0; * is returned, if it is 2 the second match is returned and so forth.
long index = 0; * It is 1 by default. If negative has the same meaning but the search is
* performed starting from the end of the list.
*
* A matchnum of 0 is accepted only if ALL is given, and means to return
* all the elements.
*
* If ALL is given, instead of returning the single elmenet, a list of
* all the matching elements up to "matchnum" are returned.
*
* The returned elements indexes are always referring to what LINDEX
* would return. So first element from head is 0, and so forth.
* However if RELATIVE is given and a negative matchpos is given, the
* indexes are returned as if the last element of the list is the element 0,
* the penultimante is 1, and so forth. */
void lposCommand(client *c) {
robj *o, *ele;
ele = c->argv[2];
int all = 0, direction = LIST_TAIL;
long matchpos = 1;
if ((getLongFromObjectOrReply(c, c->argv[2], &direction, NULL) != C_OK)) /* Parse the optional "matchpos" argument, and the ALL option. */
if (c->argc >= 4 &&
getLongFromObjectOrReply(c, c->argv[3], &matchpos, NULL) != C_OK)
{
return; return;
subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
if (subject == NULL || checkType(c,subject,OBJ_LIST)) return;
listTypeIterator *li;
if (direction < 0) {
direction = -1;
li = listTypeInitIterator(subject,-1,LIST_HEAD);
} else {
direction = 1;
li = listTypeInitIterator(subject,0,LIST_TAIL);
} }
if (c->argc == 5) {
if (!strcasecmp(c->argv[4]->ptr,"all")) {
all = 1;
} else {
addReply(c,shared.syntaxerr);
return;
}
}
/* Raise an error on incompatible options. */
if (!all && matchpos == 0) {
addReplyError(c,"A match position of zero is valid only "
"when using the ALL option");
return;
}
/* A negative matchpos means start from the tail. */
if (matchpos < 0) {
matchpos = -matchpos;
direction = LIST_HEAD;
}
/* We return NULL or an empty array if there is no such key (or
* if we find no matches, depending on the presence of the ALL option. */
if ((o = lookupKeyWriteOrReply(c,c->argv[1],NULL)) == NULL) {
if (all)
addReply(c,shared.emptyarray);
else
addReply(c,shared.null[c->resp]);
return;
}
if (checkType(c,o,OBJ_LIST)) return;
/* If we got the ALL option, prepare to emit an array. */
void *arraylenptr = NULL;
if (all) arraylenptr = addReplyDeferredLen(c);
/* Seek the element. */
listTypeIterator *li;
li = listTypeInitIterator(o,direction == LIST_HEAD ? -1 : 0,direction);
listTypeEntry entry; listTypeEntry entry;
long llen = listTypeLength(o);
long index = 0, matches = 0, matchindex = -1;
while (listTypeNext(li,&entry)) { while (listTypeNext(li,&entry)) {
if (listTypeEqual(&entry,obj)) { if (listTypeEqual(&entry,ele)) {
break; matches++;
matchindex = (direction == LIST_TAIL) ? index : llen - index - 1;
if (all) addReplyLongLong(c,matchindex);
if (matches == matchpos) break;
} }
index++; index++;
} }
listTypeReleaseIterator(li); listTypeReleaseIterator(li);
addReplyLongLong(c,index * direction); /* Reply to the client. Note that arraylenptr is not NULL only if
* the ALL option was selected. */
if (arraylenptr != NULL) {
setDeferredArrayLen(c,arraylenptr,matches);
} else {
if (matchindex != -1)
addReplyLongLong(c,matchindex);
else
addReply(c,shared.null[c->resp]);
}
} }
void lremCommand(client *c) { void lremCommand(client *c) {