New vararg BLPOP able to block against multiple keys

This commit is contained in:
antirez 2010-01-02 09:06:44 -05:00
parent 58d976b8e8
commit b177fd30ac
4 changed files with 103 additions and 52 deletions

11
TODO
View File

@ -3,14 +3,19 @@ Redis TODO and Roadmap
VERSION 1.4 TODO (Hash type) VERSION 1.4 TODO (Hash type)
============================ ============================
* Blocking LPOP (BLPOP). * BRPOPLPUSH
* Hashes (HSET, HGET, HEXISTS, HLEN, ...). * RPOPLPUSH should notify blocking POP operations
* List ops like L/RPUSH L/RPOP should return the new list length. * List ops like L/RPUSH L/RPOP should return the new list length.
* Save dataset / fsync() on SIGTERM
* MULTI/EXEC should support the "EXEC FSYNC" form
* Synchronous Virtual Memory
* BLPOP & C. tests (write a non blocking Tcl client as first step)
VERSION 1.6 TODO (Virtual memory) VERSION 1.6 TODO (Virtual memory)
================================= =================================
* Redis Virtual Memory for datasets bigger than RAM (http://groups.google.com/group/redis-db/msg/752997c7b38553cd) * Asynchronous Virtual Memory
* Hashes (HSET, HGET, HEXISTS, HLEN, ...).
VERSION 1.8 TODO (Fault tollerant sharding) VERSION 1.8 TODO (Fault tollerant sharding)
=========================================== ===========================================

View File

@ -71,8 +71,8 @@ static struct redisCommand cmdTable[] = {
{"lpush",3,REDIS_CMD_BULK}, {"lpush",3,REDIS_CMD_BULK},
{"rpop",2,REDIS_CMD_INLINE}, {"rpop",2,REDIS_CMD_INLINE},
{"lpop",2,REDIS_CMD_INLINE}, {"lpop",2,REDIS_CMD_INLINE},
{"brpop",3,REDIS_CMD_INLINE}, {"brpop",-3,REDIS_CMD_INLINE},
{"blpop",3,REDIS_CMD_INLINE}, {"blpop",-3,REDIS_CMD_INLINE},
{"llen",2,REDIS_CMD_INLINE}, {"llen",2,REDIS_CMD_INLINE},
{"lindex",3,REDIS_CMD_INLINE}, {"lindex",3,REDIS_CMD_INLINE},
{"lset",4,REDIS_CMD_BULK}, {"lset",4,REDIS_CMD_BULK},

138
redis.c
View File

@ -268,8 +268,9 @@ typedef struct redisClient {
long repldboff; /* replication DB file offset */ long repldboff; /* replication DB file offset */
off_t repldbsize; /* replication DB file size */ off_t repldbsize; /* replication DB file size */
multiState mstate; /* MULTI/EXEC state */ multiState mstate; /* MULTI/EXEC state */
robj *blockingkey; /* The key we waiting to terminate a blocking robj **blockingkeys; /* The key we waiting to terminate a blocking
* operation such as BLPOP. Otherwise NULL. */ * operation such as BLPOP. Otherwise NULL. */
int blockingkeysnum; /* Number of blocking keys */
time_t blockingto; /* Blocking operation timeout. If UNIX current time time_t blockingto; /* Blocking operation timeout. If UNIX current time
* is >= blockingto then the operation timed out. */ * is >= blockingto then the operation timed out. */
} redisClient; } redisClient;
@ -542,8 +543,8 @@ static struct redisCommand cmdTable[] = {
{"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM}, {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
{"rpop",rpopCommand,2,REDIS_CMD_INLINE}, {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
{"lpop",lpopCommand,2,REDIS_CMD_INLINE}, {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
{"brpop",brpopCommand,3,REDIS_CMD_INLINE}, {"brpop",brpopCommand,-3,REDIS_CMD_INLINE},
{"blpop",blpopCommand,3,REDIS_CMD_INLINE}, {"blpop",blpopCommand,-3,REDIS_CMD_INLINE},
{"llen",llenCommand,2,REDIS_CMD_INLINE}, {"llen",llenCommand,2,REDIS_CMD_INLINE},
{"lindex",lindexCommand,3,REDIS_CMD_INLINE}, {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
{"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM}, {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
@ -902,7 +903,7 @@ static void closeTimedoutClients(void) {
freeClient(c); freeClient(c);
} else if (c->flags & REDIS_BLOCKED) { } else if (c->flags & REDIS_BLOCKED) {
if (c->blockingto != 0 && c->blockingto < now) { if (c->blockingto != 0 && c->blockingto < now) {
addReply(c,shared.nullbulk); addReply(c,shared.nullmultibulk);
unblockClient(c); unblockClient(c);
} }
} }
@ -2089,7 +2090,8 @@ static redisClient *createClient(int fd) {
c->authenticated = 0; c->authenticated = 0;
c->replstate = REDIS_REPL_NONE; c->replstate = REDIS_REPL_NONE;
c->reply = listCreate(); c->reply = listCreate();
c->blockingkey = NULL; c->blockingkeys = NULL;
c->blockingkeysnum = 0;
listSetFreeMethod(c->reply,decrRefCount); listSetFreeMethod(c->reply,decrRefCount);
listSetDupMethod(c->reply,dupClientReplyValue); listSetDupMethod(c->reply,dupClientReplyValue);
if (aeCreateFileEvent(server.el, c->fd, AE_READABLE, if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
@ -5492,27 +5494,35 @@ static void execCommand(redisClient *c) {
/* Set a client in blocking mode for the specified key, with the specified /* Set a client in blocking mode for the specified key, with the specified
* timeout */ * timeout */
static void blockForKey(redisClient *c, robj *key, time_t timeout) { static void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout) {
dictEntry *de; dictEntry *de;
list *l; list *l;
int j;
c->blockingkey = key; c->blockingkeys = zmalloc(sizeof(robj*)*numkeys);
incrRefCount(key); c->blockingkeysnum = numkeys;
c->blockingto = timeout; c->blockingto = timeout;
de = dictFind(c->db->blockingkeys,key); for (j = 0; j < numkeys; j++) {
if (de == NULL) { /* Add the key in the client structure, to map clients -> keys */
int retval; c->blockingkeys[j] = keys[j];
incrRefCount(keys[j]);
/* We take a list of clients blocked for a given key */ /* And in the other "side", to map keys -> clients */
l = listCreate(); de = dictFind(c->db->blockingkeys,keys[j]);
retval = dictAdd(c->db->blockingkeys,key,l); if (de == NULL) {
incrRefCount(key); int retval;
assert(retval == DICT_OK);
} else { /* For every key we take a list of clients blocked for it */
l = dictGetEntryVal(de); l = listCreate();
retval = dictAdd(c->db->blockingkeys,keys[j],l);
incrRefCount(keys[j]);
assert(retval == DICT_OK);
} else {
l = dictGetEntryVal(de);
}
listAddNodeTail(l,c);
} }
/* Add this client to the list, and mark it as blocked */ /* Mark the client as a blocked client */
listAddNodeTail(l,c);
c->flags |= REDIS_BLOCKED; c->flags |= REDIS_BLOCKED;
aeDeleteFileEvent(server.el,c->fd,AE_READABLE); aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
server.blockedclients++; server.blockedclients++;
@ -5522,19 +5532,24 @@ static void blockForKey(redisClient *c, robj *key, time_t timeout) {
static void unblockClient(redisClient *c) { static void unblockClient(redisClient *c) {
dictEntry *de; dictEntry *de;
list *l; list *l;
int j;
/* Remove this client from the list of clients waiting for this key. */ assert(c->blockingkeys != NULL);
assert(c->blockingkey != NULL); /* The client may wait for multiple keys, so unblock it for every key. */
de = dictFind(c->db->blockingkeys,c->blockingkey); for (j = 0; j < c->blockingkeysnum; j++) {
assert(de != NULL); /* Remove this client from the list of clients waiting for this key. */
l = dictGetEntryVal(de); de = dictFind(c->db->blockingkeys,c->blockingkeys[j]);
listDelNode(l,listSearchKey(l,c)); assert(de != NULL);
/* If the list is empty we need to remove it to avoid wasting memory */ l = dictGetEntryVal(de);
if (listLength(l) == 0) listDelNode(l,listSearchKey(l,c));
dictDelete(c->db->blockingkeys,c->blockingkey); /* If the list is empty we need to remove it to avoid wasting memory */
/* Finally set the right flags in the client structure */ if (listLength(l) == 0)
decrRefCount(c->blockingkey); dictDelete(c->db->blockingkeys,c->blockingkeys[j]);
c->blockingkey = NULL; decrRefCount(c->blockingkeys[j]);
}
/* Cleanup the client structure */
zfree(c->blockingkeys);
c->blockingkeys = NULL;
c->flags &= (~REDIS_BLOCKED); c->flags &= (~REDIS_BLOCKED);
server.blockedclients--; server.blockedclients--;
/* Ok now we are ready to get read events from socket, note that we /* Ok now we are ready to get read events from socket, note that we
@ -5574,6 +5589,10 @@ static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
assert(ln != NULL); assert(ln != NULL);
receiver = ln->value; receiver = ln->value;
addReplySds(receiver,sdsnew("*2\r\n"));
addReplyBulkLen(receiver,key);
addReply(receiver,key);
addReply(receiver,shared.crlf);
addReplyBulkLen(receiver,ele); addReplyBulkLen(receiver,ele);
addReply(receiver,ele); addReply(receiver,ele);
addReply(receiver,shared.crlf); addReply(receiver,shared.crlf);
@ -5585,26 +5604,53 @@ static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
static void blockingPopGenericCommand(redisClient *c, int where) { static void blockingPopGenericCommand(redisClient *c, int where) {
robj *o; robj *o;
time_t timeout; time_t timeout;
int j;
o = lookupKeyWrite(c->db,c->argv[1]); for (j = 1; j < c->argc-1; j++) {
if (o != NULL) { o = lookupKeyWrite(c->db,c->argv[j]);
if (o->type != REDIS_LIST) { if (o != NULL) {
popGenericCommand(c,where); if (o->type != REDIS_LIST) {
return; addReply(c,shared.wrongtypeerr);
} else {
list *list = o->ptr;
if (listLength(list) != 0) {
/* If the list contains elements fall back to the usual
* non-blocking POP operation */
popGenericCommand(c,where);
return; return;
} else {
list *list = o->ptr;
if (listLength(list) != 0) {
/* If the list contains elements fall back to the usual
* non-blocking POP operation */
robj *argv[2], **orig_argv;
int orig_argc;
/* We need to alter the command arguments before to call
* popGenericCommand() as the command takes a single key. */
orig_argv = c->argv;
orig_argc = c->argc;
argv[1] = c->argv[j];
c->argv = argv;
c->argc = 2;
/* Also the return value is different, we need to output
* the multi bulk reply header and the key name. The
* "real" command will add the last element (the value)
* for us. If this souds like an hack to you it's just
* because it is... */
addReplySds(c,sdsnew("*2\r\n"));
addReplyBulkLen(c,argv[1]);
addReply(c,argv[1]);
addReply(c,shared.crlf);
popGenericCommand(c,where);
/* Fix the client structure with the original stuff */
c->argv = orig_argv;
c->argc = orig_argc;
return;
}
} }
} }
} }
/* If the list is empty or the key does not exists we must block */ /* If the list is empty or the key does not exists we must block */
timeout = strtol(c->argv[2]->ptr,NULL,10); timeout = strtol(c->argv[c->argc-1]->ptr,NULL,10);
if (timeout > 0) timeout += time(NULL); if (timeout > 0) timeout += time(NULL);
blockForKey(c,c->argv[1],timeout); blockForKeys(c,c->argv+1,c->argc-2,timeout);
} }
static void blpopCommand(redisClient *c) { static void blpopCommand(redisClient *c) {
@ -6659,7 +6705,7 @@ static void segvHandler(int sig, siginfo_t *info, void *secret) {
redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset); redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
} }
} }
// free(messages); Don't call free() with possibly corrupted memory. /* free(messages); Don't call free() with possibly corrupted memory. */
exit(0); exit(0);
} }

View File

@ -10,7 +10,7 @@ static struct redisFunctionSym symsTable[] = {
{"authCommand",(unsigned long)authCommand}, {"authCommand",(unsigned long)authCommand},
{"bgrewriteaofCommand",(unsigned long)bgrewriteaofCommand}, {"bgrewriteaofCommand",(unsigned long)bgrewriteaofCommand},
{"bgsaveCommand",(unsigned long)bgsaveCommand}, {"bgsaveCommand",(unsigned long)bgsaveCommand},
{"blockForKey",(unsigned long)blockForKey}, {"blockForKeys",(unsigned long)blockForKeys},
{"blockingPopGenericCommand",(unsigned long)blockingPopGenericCommand}, {"blockingPopGenericCommand",(unsigned long)blockingPopGenericCommand},
{"blpopCommand",(unsigned long)blpopCommand}, {"blpopCommand",(unsigned long)blpopCommand},
{"brpopCommand",(unsigned long)brpopCommand}, {"brpopCommand",(unsigned long)brpopCommand},