first working implementation of BLPOP and BRPOP, still everything is to test well

This commit is contained in:
antirez 2009-12-29 16:05:56 -05:00
parent 9fe33a0e12
commit 95242ab507

17
redis.c
View File

@ -3524,7 +3524,10 @@ static void pushGenericCommand(redisClient *c, int where) {
lobj = lookupKeyWrite(c->db,c->argv[1]); lobj = lookupKeyWrite(c->db,c->argv[1]);
if (lobj == NULL) { if (lobj == NULL) {
if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) return; if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
addReply(c,shared.ok);
return;
}
lobj = createListObject(); lobj = createListObject();
list = lobj->ptr; list = lobj->ptr;
if (where == REDIS_HEAD) { if (where == REDIS_HEAD) {
@ -3540,7 +3543,10 @@ static void pushGenericCommand(redisClient *c, int where) {
addReply(c,shared.wrongtypeerr); addReply(c,shared.wrongtypeerr);
return; return;
} }
if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) return; if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
addReply(c,shared.ok);
return;
}
list = lobj->ptr; list = lobj->ptr;
if (where == REDIS_HEAD) { if (where == REDIS_HEAD) {
listAddNodeHead(list,c->argv[2]); listAddNodeHead(list,c->argv[2]);
@ -5462,7 +5468,7 @@ static void execCommand(redisClient *c) {
* empty we need to block. In order to do so we remove the notification for * empty we need to block. In order to do so we remove the notification for
* new data to read in the client socket (so that we'll not serve new * new data to read in the client socket (so that we'll not serve new
* requests if the blocking request is not served). Also we put the client * requests if the blocking request is not served). Also we put the client
* in a dictionary (server.blockingkeys) mapping keys to a list of clients * in a dictionary (db->blockingkeys) mapping keys to a list of clients
* blocking for this keys. * blocking for this keys.
* - If a PUSH operation against a key with blocked clients waiting is * - If a PUSH operation against a key with blocked clients waiting is
* performed, we serve the first in the list: basically instead to push * performed, we serve the first in the list: basically instead to push
@ -5479,7 +5485,6 @@ static void blockForKey(redisClient *c, robj *key, time_t timeout) {
dictEntry *de; dictEntry *de;
list *l; list *l;
printf("blockForKey %p %s\n", c, key->ptr);
c->blockingkey = key; c->blockingkey = key;
incrRefCount(key); incrRefCount(key);
c->blockingto = timeout; c->blockingto = timeout;
@ -5504,7 +5509,6 @@ static void unblockClient(redisClient *c) {
dictEntry *de; dictEntry *de;
list *l; list *l;
printf("unblockClient %p %s\n", c, c->blockingkey->ptr);
/* Remove this client from the list of clients waiting for this key. */ /* Remove this client from the list of clients waiting for this key. */
assert(c->blockingkey != NULL); assert(c->blockingkey != NULL);
de = dictFind(c->db->blockingkeys,c->blockingkey); de = dictFind(c->db->blockingkeys,c->blockingkey);
@ -5554,9 +5558,6 @@ static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
ln = listFirst(l); ln = listFirst(l);
assert(ln != NULL); assert(ln != NULL);
receiver = ln->value; receiver = ln->value;
listDelNode(l,ln);
if (listLength(l) == 0)
dictDelete(c->db->blockingkeys,key);
addReplyBulkLen(receiver,ele); addReplyBulkLen(receiver,ele);
addReply(receiver,ele); addReply(receiver,ele);