Move code for pushing on a (blocking) RPOPLPUSH

This commit is contained in:
Pieter Noordhuis 2010-12-06 14:48:58 +01:00
parent 5fa95ad763
commit ac06fc011d

View File

@ -605,19 +605,37 @@ void lremCommand(redisClient *c) {
/* This is the semantic of this command: /* This is the semantic of this command:
* RPOPLPUSH srclist dstlist: * RPOPLPUSH srclist dstlist:
* IF LLEN(srclist) > 0 * IF LLEN(srclist) > 0
* element = RPOP srclist * element = RPOP srclist
* LPUSH dstlist element * LPUSH dstlist element
* RETURN element * RETURN element
* ELSE * ELSE
* RETURN nil * RETURN nil
* END * END
* END * END
* *
* The idea is to be able to get an element from a list in a reliable way * The idea is to be able to get an element from a list in a reliable way
* since the element is not just returned but pushed against another list * since the element is not just returned but pushed against another list
* as well. This command was originally proposed by Ezra Zygmuntowicz. * as well. This command was originally proposed by Ezra Zygmuntowicz.
*/ */
void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
if (!handleClientsWaitingListPush(c,dstkey,value)) {
/* Create the list if the key does not exist */
if (!dstobj) {
dstobj = createZiplistObject();
dbAdd(c->db,dstkey,dstobj);
} else {
touchWatchedKey(c->db,dstkey);
server.dirty++;
}
listTypePush(dstobj,value,REDIS_HEAD);
}
/* Always send the pushed value to the client. */
addReplyBulk(c,value);
}
void rpoplpushCommand(redisClient *c) { void rpoplpushCommand(redisClient *c) {
robj *sobj, *value; robj *sobj, *value;
if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL || if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
@ -629,20 +647,7 @@ void rpoplpushCommand(redisClient *c) {
robj *dobj = lookupKeyWrite(c->db,c->argv[2]); robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
if (dobj && checkType(c,dobj,REDIS_LIST)) return; if (dobj && checkType(c,dobj,REDIS_LIST)) return;
value = listTypePop(sobj,REDIS_TAIL); value = listTypePop(sobj,REDIS_TAIL);
rpoplpushHandlePush(c,c->argv[2],dobj,value);
/* Add the element to the target list (unless it's directly
* passed to some BLPOP-ing client */
if (!handleClientsWaitingListPush(c,c->argv[2],value)) {
/* Create the list if the key does not exist */
if (!dobj) {
dobj = createZiplistObject();
dbAdd(c->db,c->argv[2],dobj);
}
listTypePush(dobj,value,REDIS_HEAD);
}
/* Send the element to the client as reply as well */
addReplyBulk(c,value);
/* listTypePop returns an object with its refcount incremented */ /* listTypePop returns an object with its refcount incremented */
decrRefCount(value); decrRefCount(value);
@ -799,18 +804,7 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
/* BRPOPLPUSH */ /* BRPOPLPUSH */
robj *dobj = lookupKeyWrite(receiver->db,target); robj *dobj = lookupKeyWrite(receiver->db,target);
if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0; if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0;
rpoplpushHandlePush(receiver,target,dobj,ele);
addReplyBulk(receiver,ele);
if (!handleClientsWaitingListPush(receiver, target, ele)) {
/* Create the list if the key does not exist */
if (!dobj) {
dobj = createZiplistObject();
dbAdd(receiver->db, target, dobj);
}
listTypePush(dobj, ele, REDIS_HEAD);
}
decrRefCount(target); decrRefCount(target);
} }