update RPOPLPUSH to support dual encoding

This commit is contained in:
Pieter Noordhuis 2010-05-31 20:47:39 +02:00
parent be02a7c0d6
commit 0f62e1775d

44
redis.c
View File

@ -4805,8 +4805,9 @@ static robj *lPop(robj *subject, int where) {
} else {
value = createStringObjectFromLongLong(vval);
}
/* We only need to delete an element when it exists */
subject->ptr = ziplistDelete(subject->ptr,&p);
}
subject->ptr = ziplistDelete(subject->ptr,&p);
} else if (subject->encoding == REDIS_ENCODING_LIST) {
list *list = subject->ptr;
listNode *ln;
@ -5246,47 +5247,38 @@ static void lremCommand(redisClient *c) {
* as well. This command was originally proposed by Ezra Zygmuntowicz.
*/
static void rpoplpushcommand(redisClient *c) {
robj *sobj;
list *srclist;
listNode *ln;
robj *sobj, *value;
if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
checkType(c,sobj,REDIS_LIST)) return;
srclist = sobj->ptr;
ln = listLast(srclist);
if (ln == NULL) {
if (lLength(sobj) == 0) {
addReply(c,shared.nullbulk);
} else {
robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
robj *ele = listNodeValue(ln);
list *dstlist;
if (dobj && dobj->type != REDIS_LIST) {
addReply(c,shared.wrongtypeerr);
return;
}
if (dobj && checkType(c,dobj,REDIS_LIST)) return;
value = lPop(sobj,REDIS_TAIL);
/* Add the element to the target list (unless it's directly
* passed to some BLPOP-ing client */
if (!handleClientsWaitingListPush(c,c->argv[2],ele)) {
if (dobj == NULL) {
/* Create the list if the key does not exist */
dobj = createListObject();
if (!handleClientsWaitingListPush(c,c->argv[2],value)) {
/* Create the list if the key does not exist */
if (!dobj) {
dobj = createObject(REDIS_LIST,ziplistNew());
dobj->encoding = REDIS_ENCODING_ZIPLIST;
dictAdd(c->db->dict,c->argv[2],dobj);
incrRefCount(c->argv[2]);
}
dstlist = dobj->ptr;
listAddNodeHead(dstlist,ele);
incrRefCount(ele);
lPush(dobj,value,REDIS_HEAD);
}
/* Send the element to the client as reply as well */
addReplyBulk(c,ele);
addReplyBulk(c,value);
/* Finally remove the element from the source list */
listDelNode(srclist,ln);
if (listLength(srclist) == 0) deleteKey(c->db,c->argv[1]);
/* lPop returns an object with its refcount incremented */
decrRefCount(value);
/* Delete the source list when it is empty */
if (lLength(sobj) == 0) deleteKey(c->db,c->argv[1]);
server.dirty++;
}
}