BRPOPLPUSH.

This commit is contained in:
Damian Janowski & Michel Martens 2010-11-08 15:25:59 -03:00 committed by Michel Martens
parent 8a979f0390
commit b2a7fd0cf7
4 changed files with 110 additions and 21 deletions

View File

@ -89,6 +89,7 @@ struct redisCommand readonlyCommandTable[] = {
{"rpop",rpopCommand,2,0,NULL,1,1,1}, {"rpop",rpopCommand,2,0,NULL,1,1,1},
{"lpop",lpopCommand,2,0,NULL,1,1,1}, {"lpop",lpopCommand,2,0,NULL,1,1,1},
{"brpop",brpopCommand,-3,0,NULL,1,1,1}, {"brpop",brpopCommand,-3,0,NULL,1,1,1},
{"brpoplpush",brpoplpushCommand,4,REDIS_CMD_DENYOOM,NULL,1,2,1},
{"blpop",blpopCommand,-3,0,NULL,1,1,1}, {"blpop",blpopCommand,-3,0,NULL,1,1,1},
{"llen",llenCommand,2,0,NULL,1,1,1}, {"llen",llenCommand,2,0,NULL,1,1,1},
{"lindex",lindexCommand,3,0,NULL,1,1,1}, {"lindex",lindexCommand,3,0,NULL,1,1,1},

View File

@ -321,6 +321,7 @@ typedef struct redisClient {
int blocking_keys_num; /* Number of blocking keys */ int blocking_keys_num; /* Number of blocking keys */
time_t blockingto; /* Blocking operation timeout. If UNIX current time time_t blockingto; /* Blocking operation timeout. If UNIX current time
* is >= blockingto then the operation timed out. */ * is >= blockingto then the operation timed out. */
robj *blocking_target;
list *io_keys; /* Keys this client is waiting to be loaded from the list *io_keys; /* Keys this client is waiting to be loaded from the
* swap file in order to continue. */ * swap file in order to continue. */
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */ list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
@ -961,6 +962,7 @@ void execCommand(redisClient *c);
void discardCommand(redisClient *c); void discardCommand(redisClient *c);
void blpopCommand(redisClient *c); void blpopCommand(redisClient *c);
void brpopCommand(redisClient *c); void brpopCommand(redisClient *c);
void brpoplpushCommand(redisClient *c);
void appendCommand(redisClient *c); void appendCommand(redisClient *c);
void substrCommand(redisClient *c); void substrCommand(redisClient *c);
void strlenCommand(redisClient *c); void strlenCommand(redisClient *c);

View File

@ -777,9 +777,28 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
redisAssert(ln != NULL); redisAssert(ln != NULL);
receiver = ln->value; receiver = ln->value;
addReplyMultiBulkLen(receiver,2); if (receiver->blocking_target == NULL) {
addReplyBulk(receiver,key); addReplyMultiBulkLen(receiver,2);
addReplyBulk(receiver,ele); addReplyBulk(receiver,key);
addReplyBulk(receiver,ele);
}
else {
receiver->argc++;
robj *dobj = lookupKeyWrite(receiver->db,receiver->blocking_target);
if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0;
addReplyBulk(receiver,ele);
/* Create the list if the key does not exist */
if (!dobj) {
dobj = createZiplistObject();
dbAdd(receiver->db,receiver->blocking_target,dobj);
}
listTypePush(dobj,ele,REDIS_HEAD);
}
unblockClientWaitingData(receiver); unblockClientWaitingData(receiver);
return 1; return 1;
} }
@ -814,26 +833,36 @@ void blockingPopGenericCommand(redisClient *c, int where) {
robj *argv[2], **orig_argv; robj *argv[2], **orig_argv;
int orig_argc; int orig_argc;
/* We need to alter the command arguments before to call if (c->blocking_target == NULL) {
* popGenericCommand() as the command takes a single key. */ /* We need to alter the command arguments before to call
orig_argv = c->argv; * popGenericCommand() as the command takes a single key. */
orig_argc = c->argc; orig_argv = c->argv;
argv[1] = c->argv[j]; orig_argc = c->argc;
c->argv = argv; argv[1] = c->argv[j];
c->argc = 2; c->argv = argv;
c->argc = 2;
/* Also the return value is different, we need to output /* Also the return value is different, we need to output
* the multi bulk reply header and the key name. The * the multi bulk reply header and the key name. The
* "real" command will add the last element (the value) * "real" command will add the last element (the value)
* for us. If this souds like an hack to you it's just * for us. If this souds like an hack to you it's just
* because it is... */ * because it is... */
addReplyMultiBulkLen(c,2); addReplyMultiBulkLen(c,2);
addReplyBulk(c,argv[1]); addReplyBulk(c,argv[1]);
popGenericCommand(c,where);
popGenericCommand(c,where);
/* Fix the client structure with the original stuff */
c->argv = orig_argv;
c->argc = orig_argc;
}
else {
c->argv[2] = c->blocking_target;
c->blocking_target = NULL;
rpoplpushCommand(c);
}
/* Fix the client structure with the original stuff */
c->argv = orig_argv;
c->argc = orig_argc;
return; return;
} }
} }
@ -860,3 +889,11 @@ void blpopCommand(redisClient *c) {
void brpopCommand(redisClient *c) { void brpopCommand(redisClient *c) {
blockingPopGenericCommand(c,REDIS_TAIL); blockingPopGenericCommand(c,REDIS_TAIL);
} }
void brpoplpushCommand(redisClient *c) {
c->blocking_target = c->argv[2];
c->argv[2] = c->argv[3];
c->argc--;
blockingPopGenericCommand(c,REDIS_TAIL);
}

View File

@ -127,6 +127,55 @@ start_server {
assert_equal 0 [r llen blist1] assert_equal 0 [r llen blist1]
assert_equal 1 [r llen blist2] assert_equal 1 [r llen blist2]
} }
test "BRPOPLPUSH - $type" {
r del target
set rd [redis_deferring_client]
create_$type blist "a b $large c d"
$rd brpoplpush blist target 1
assert_equal d [$rd read]
assert_equal d [r rpop target]
assert_equal "a b $large c" [r lrange blist 0 -1]
}
}
test "BRPOPLPUSH with zero timeout should block indefinitely" {
set rd [redis_deferring_client]
r del blist target
$rd brpoplpush blist target 0
after 1000
r rpush blist foo
assert_equal foo [$rd read]
assert_equal {foo} [r lrange target 0 -1]
}
test "BRPOPLPUSH with wrong source type" {
set rd [redis_deferring_client]
r del blist target
r set blist nolist
$rd brpoplpush blist target 1
assert_error "ERR*wrong kind*" {$rd read}
}
test "BRPOPLPUSH with wrong destination type" {
set rd [redis_deferring_client]
r del blist target
r set target nolist
r lpush blist foo
$rd brpoplpush blist target 1
assert_error "ERR*wrong kind*" {$rd read}
set rd [redis_deferring_client]
r del blist target
r set target nolist
$rd brpoplpush blist target 0
after 1000
r rpush blist foo
assert_error "ERR*wrong kind*" {$rd read}
assert_equal {foo} [r lrange blist 0 -1]
} }
foreach {pop} {BLPOP BRPOP} { foreach {pop} {BLPOP BRPOP} {