diff --git a/TODO b/TODO index f3eaeb219..6e1718873 100644 --- a/TODO +++ b/TODO @@ -3,14 +3,19 @@ Redis TODO and Roadmap VERSION 1.4 TODO (Hash type) ============================ -* Blocking LPOP (BLPOP). -* Hashes (HSET, HGET, HEXISTS, HLEN, ...). +* BRPOPLPUSH +* RPOPLPUSH should notify blocking POP operations * List ops like L/RPUSH L/RPOP should return the new list length. +* Save dataset / fsync() on SIGTERM +* MULTI/EXEC should support the "EXEC FSYNC" form +* Synchronous Virtual Memory +* BLPOP & C. tests (write a non blocking Tcl client as first step) VERSION 1.6 TODO (Virtual memory) ================================= -* Redis Virtual Memory for datasets bigger than RAM (http://groups.google.com/group/redis-db/msg/752997c7b38553cd) +* Asynchronous Virtual Memory +* Hashes (HSET, HGET, HEXISTS, HLEN, ...). VERSION 1.8 TODO (Fault tollerant sharding) =========================================== diff --git a/redis-cli.c b/redis-cli.c index fbf90fd87..a082a9bb9 100644 --- a/redis-cli.c +++ b/redis-cli.c @@ -71,8 +71,8 @@ static struct redisCommand cmdTable[] = { {"lpush",3,REDIS_CMD_BULK}, {"rpop",2,REDIS_CMD_INLINE}, {"lpop",2,REDIS_CMD_INLINE}, - {"brpop",3,REDIS_CMD_INLINE}, - {"blpop",3,REDIS_CMD_INLINE}, + {"brpop",-3,REDIS_CMD_INLINE}, + {"blpop",-3,REDIS_CMD_INLINE}, {"llen",2,REDIS_CMD_INLINE}, {"lindex",3,REDIS_CMD_INLINE}, {"lset",4,REDIS_CMD_BULK}, diff --git a/redis.c b/redis.c index 8b771c029..9f1eb7a25 100644 --- a/redis.c +++ b/redis.c @@ -268,8 +268,9 @@ typedef struct redisClient { long repldboff; /* replication DB file offset */ off_t repldbsize; /* replication DB file size */ multiState mstate; /* MULTI/EXEC state */ - robj *blockingkey; /* The key we waiting to terminate a blocking + robj **blockingkeys; /* The key we waiting to terminate a blocking * operation such as BLPOP. Otherwise NULL. */ + int blockingkeysnum; /* Number of blocking keys */ time_t blockingto; /* Blocking operation timeout. If UNIX current time * is >= blockingto then the operation timed out. */ } redisClient; @@ -542,8 +543,8 @@ static struct redisCommand cmdTable[] = { {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM}, {"rpop",rpopCommand,2,REDIS_CMD_INLINE}, {"lpop",lpopCommand,2,REDIS_CMD_INLINE}, - {"brpop",brpopCommand,3,REDIS_CMD_INLINE}, - {"blpop",blpopCommand,3,REDIS_CMD_INLINE}, + {"brpop",brpopCommand,-3,REDIS_CMD_INLINE}, + {"blpop",blpopCommand,-3,REDIS_CMD_INLINE}, {"llen",llenCommand,2,REDIS_CMD_INLINE}, {"lindex",lindexCommand,3,REDIS_CMD_INLINE}, {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM}, @@ -902,7 +903,7 @@ static void closeTimedoutClients(void) { freeClient(c); } else if (c->flags & REDIS_BLOCKED) { if (c->blockingto != 0 && c->blockingto < now) { - addReply(c,shared.nullbulk); + addReply(c,shared.nullmultibulk); unblockClient(c); } } @@ -2089,7 +2090,8 @@ static redisClient *createClient(int fd) { c->authenticated = 0; c->replstate = REDIS_REPL_NONE; c->reply = listCreate(); - c->blockingkey = NULL; + c->blockingkeys = NULL; + c->blockingkeysnum = 0; listSetFreeMethod(c->reply,decrRefCount); listSetDupMethod(c->reply,dupClientReplyValue); if (aeCreateFileEvent(server.el, c->fd, AE_READABLE, @@ -5492,27 +5494,35 @@ static void execCommand(redisClient *c) { /* Set a client in blocking mode for the specified key, with the specified * timeout */ -static void blockForKey(redisClient *c, robj *key, time_t timeout) { +static void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout) { dictEntry *de; list *l; + int j; - c->blockingkey = key; - incrRefCount(key); + c->blockingkeys = zmalloc(sizeof(robj*)*numkeys); + c->blockingkeysnum = numkeys; c->blockingto = timeout; - de = dictFind(c->db->blockingkeys,key); - if (de == NULL) { - int retval; + for (j = 0; j < numkeys; j++) { + /* Add the key in the client structure, to map clients -> keys */ + c->blockingkeys[j] = keys[j]; + incrRefCount(keys[j]); - /* We take a list of clients blocked for a given key */ - l = listCreate(); - retval = dictAdd(c->db->blockingkeys,key,l); - incrRefCount(key); - assert(retval == DICT_OK); - } else { - l = dictGetEntryVal(de); + /* And in the other "side", to map keys -> clients */ + de = dictFind(c->db->blockingkeys,keys[j]); + if (de == NULL) { + int retval; + + /* For every key we take a list of clients blocked for it */ + l = listCreate(); + retval = dictAdd(c->db->blockingkeys,keys[j],l); + incrRefCount(keys[j]); + assert(retval == DICT_OK); + } else { + l = dictGetEntryVal(de); + } + listAddNodeTail(l,c); } - /* Add this client to the list, and mark it as blocked */ - listAddNodeTail(l,c); + /* Mark the client as a blocked client */ c->flags |= REDIS_BLOCKED; aeDeleteFileEvent(server.el,c->fd,AE_READABLE); server.blockedclients++; @@ -5522,19 +5532,24 @@ static void blockForKey(redisClient *c, robj *key, time_t timeout) { static void unblockClient(redisClient *c) { dictEntry *de; list *l; + int j; - /* Remove this client from the list of clients waiting for this key. */ - assert(c->blockingkey != NULL); - de = dictFind(c->db->blockingkeys,c->blockingkey); - assert(de != NULL); - l = dictGetEntryVal(de); - listDelNode(l,listSearchKey(l,c)); - /* If the list is empty we need to remove it to avoid wasting memory */ - if (listLength(l) == 0) - dictDelete(c->db->blockingkeys,c->blockingkey); - /* Finally set the right flags in the client structure */ - decrRefCount(c->blockingkey); - c->blockingkey = NULL; + assert(c->blockingkeys != NULL); + /* The client may wait for multiple keys, so unblock it for every key. */ + for (j = 0; j < c->blockingkeysnum; j++) { + /* Remove this client from the list of clients waiting for this key. */ + de = dictFind(c->db->blockingkeys,c->blockingkeys[j]); + assert(de != NULL); + l = dictGetEntryVal(de); + listDelNode(l,listSearchKey(l,c)); + /* If the list is empty we need to remove it to avoid wasting memory */ + if (listLength(l) == 0) + dictDelete(c->db->blockingkeys,c->blockingkeys[j]); + decrRefCount(c->blockingkeys[j]); + } + /* Cleanup the client structure */ + zfree(c->blockingkeys); + c->blockingkeys = NULL; c->flags &= (~REDIS_BLOCKED); server.blockedclients--; /* Ok now we are ready to get read events from socket, note that we @@ -5574,6 +5589,10 @@ static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { assert(ln != NULL); receiver = ln->value; + addReplySds(receiver,sdsnew("*2\r\n")); + addReplyBulkLen(receiver,key); + addReply(receiver,key); + addReply(receiver,shared.crlf); addReplyBulkLen(receiver,ele); addReply(receiver,ele); addReply(receiver,shared.crlf); @@ -5585,26 +5604,53 @@ static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { static void blockingPopGenericCommand(redisClient *c, int where) { robj *o; time_t timeout; + int j; - o = lookupKeyWrite(c->db,c->argv[1]); - if (o != NULL) { - if (o->type != REDIS_LIST) { - popGenericCommand(c,where); - return; - } else { - list *list = o->ptr; - if (listLength(list) != 0) { - /* If the list contains elements fall back to the usual - * non-blocking POP operation */ - popGenericCommand(c,where); + for (j = 1; j < c->argc-1; j++) { + o = lookupKeyWrite(c->db,c->argv[j]); + if (o != NULL) { + if (o->type != REDIS_LIST) { + addReply(c,shared.wrongtypeerr); return; + } else { + list *list = o->ptr; + if (listLength(list) != 0) { + /* If the list contains elements fall back to the usual + * non-blocking POP operation */ + robj *argv[2], **orig_argv; + int orig_argc; + + /* We need to alter the command arguments before to call + * popGenericCommand() as the command takes a single key. */ + orig_argv = c->argv; + orig_argc = c->argc; + argv[1] = c->argv[j]; + c->argv = argv; + c->argc = 2; + + /* Also the return value is different, we need to output + * the multi bulk reply header and the key name. The + * "real" command will add the last element (the value) + * for us. If this souds like an hack to you it's just + * because it is... */ + addReplySds(c,sdsnew("*2\r\n")); + addReplyBulkLen(c,argv[1]); + addReply(c,argv[1]); + addReply(c,shared.crlf); + popGenericCommand(c,where); + + /* Fix the client structure with the original stuff */ + c->argv = orig_argv; + c->argc = orig_argc; + return; + } } } } /* If the list is empty or the key does not exists we must block */ - timeout = strtol(c->argv[2]->ptr,NULL,10); + timeout = strtol(c->argv[c->argc-1]->ptr,NULL,10); if (timeout > 0) timeout += time(NULL); - blockForKey(c,c->argv[1],timeout); + blockForKeys(c,c->argv+1,c->argc-2,timeout); } static void blpopCommand(redisClient *c) { @@ -6659,7 +6705,7 @@ static void segvHandler(int sig, siginfo_t *info, void *secret) { redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset); } } - // free(messages); Don't call free() with possibly corrupted memory. + /* free(messages); Don't call free() with possibly corrupted memory. */ exit(0); } diff --git a/staticsymbols.h b/staticsymbols.h index d22663dad..0d48d7013 100644 --- a/staticsymbols.h +++ b/staticsymbols.h @@ -10,7 +10,7 @@ static struct redisFunctionSym symsTable[] = { {"authCommand",(unsigned long)authCommand}, {"bgrewriteaofCommand",(unsigned long)bgrewriteaofCommand}, {"bgsaveCommand",(unsigned long)bgsaveCommand}, -{"blockForKey",(unsigned long)blockForKey}, +{"blockForKeys",(unsigned long)blockForKeys}, {"blockingPopGenericCommand",(unsigned long)blockingPopGenericCommand}, {"blpopCommand",(unsigned long)blpopCommand}, {"brpopCommand",(unsigned long)brpopCommand},