2012-11-08 12:25:23 -05:00
|
|
|
/*
|
|
|
|
* Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
|
|
|
|
* All rights reserved.
|
|
|
|
*
|
|
|
|
* Redistribution and use in source and binary forms, with or without
|
|
|
|
* modification, are permitted provided that the following conditions are met:
|
|
|
|
*
|
|
|
|
* * Redistributions of source code must retain the above copyright notice,
|
|
|
|
* this list of conditions and the following disclaimer.
|
|
|
|
* * Redistributions in binary form must reproduce the above copyright
|
|
|
|
* notice, this list of conditions and the following disclaimer in the
|
|
|
|
* documentation and/or other materials provided with the distribution.
|
|
|
|
* * Neither the name of Redis nor the names of its contributors may be used
|
|
|
|
* to endorse or promote products derived from this software without
|
|
|
|
* specific prior written permission.
|
|
|
|
*
|
|
|
|
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
|
|
|
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
|
|
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
|
|
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
|
|
|
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
|
|
|
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
|
|
|
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
|
|
|
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
|
|
|
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
|
|
|
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
|
|
* POSSIBILITY OF SUCH DAMAGE.
|
|
|
|
*/
|
|
|
|
|
2010-06-21 18:07:48 -04:00
|
|
|
#include "redis.h"
|
|
|
|
|
A reimplementation of blocking operation internals.
Redis provides support for blocking operations such as BLPOP or BRPOP.
These operations are identical to normal LPOP and RPOP operations as long
as there are elements in the target list, but if the list is empty they
block waiting for new data to arrive to the list.
All the clients blocked waiting for the same list are served in a FIFO
way, so the first that blocked is the first to be served when there is
more data pushed by another client into the list.
The previous implementation of blocking operations was conceived to
serve clients in the context of push operations. For instance:
1) There is a client "A" blocked on list "foo".
2) The client "B" performs `LPUSH foo somevalue`.
3) The client "A" is served in the context of the "B" LPUSH,
synchronously.
Processing things in a synchronous way was useful as if "A" pushes a
value that is served by "B", from the point of view of the database is a
NOP (no operation) thing, that is, nothing is replicated, nothing is
written in the AOF file, and so forth.
However later we implemented two things:
1) Variadic LPUSH that could add multiple values to a list in the
context of a single call.
2) BRPOPLPUSH that was a version of BRPOP that also provided a "PUSH"
side effect when receiving data.
This forced us to make the synchronous implementation more complex. If
client "B" is waiting for data, and "A" pushes three elements in a
single call, we needed to propagate an LPUSH with a missing argument
in the AOF and replication link. We also needed to make sure to
replicate the LPUSH side of BRPOPLPUSH, but only if it in turn did not
happen to serve another blocking client into another list ;)
This was complex, but with a few mutually recursive functions
everything worked as expected... until one day we introduced scripting
in Redis.
Scripting + synchronous blocking operations = Issue #614.
Basically you can't "rewrite" a script to have just a partial effect on
the replicas and AOF file if the script happened to serve a few blocked
clients.
The solution to all these problems, implemented by this commit, is to
change the way we serve blocked clients. Instead of serving the blocked
clients synchronously, in the context of the command performing the PUSH
operation, it is now an asynchronous and iterative process:
1) If a key that has clients blocked waiting for data is the subject of
a list push operation, we simply mark the key as "ready" and put it into a
queue.
2) Every command pushing stuff on lists, as a variadic LPUSH, a script,
or whatever it is, is replicated verbatim without any rewriting.
3) Every time a Redis command, a MULTI/EXEC block, or a script,
completed its execution, we run the list of keys ready to serve blocked
clients (as more data arrived), and process this list serving the
blocked clients.
4) As a result of "3" maybe more keys are ready again for other clients
(as a result of BRPOPLPUSH we may have push operations), so we iterate
back to step "3" if it's needed.
The new code has much simpler semantics, and a simpler to understand
implementation, with the disadvantage of not being able to "optimize out"
a PUSH+BPOP as a No OP.
This commit will be tested with care before the final merge, more tests
will be added likely.
2012-09-04 04:37:49 -04:00
|
|
|
void signalListAsReady(redisClient *c, robj *key);
|
|
|
|
|
2010-06-21 18:07:48 -04:00
|
|
|
/*-----------------------------------------------------------------------------
|
|
|
|
* List API
|
|
|
|
*----------------------------------------------------------------------------*/
|
|
|
|
|
|
|
|
/* Check the argument length to see if it requires us to convert the ziplist
|
|
|
|
* to a real list. Only check raw-encoded objects because integer encoded
|
|
|
|
* objects are never too long. */
|
|
|
|
void listTypeTryConversion(robj *subject, robj *value) {
|
|
|
|
if (subject->encoding != REDIS_ENCODING_ZIPLIST) return;
|
|
|
|
if (value->encoding == REDIS_ENCODING_RAW &&
|
|
|
|
sdslen(value->ptr) > server.list_max_ziplist_value)
|
|
|
|
listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
|
|
|
|
}
|
|
|
|
|
A reimplementation of blocking operation internals.
Redis provides support for blocking operations such as BLPOP or BRPOP.
These operations are identical to normal LPOP and RPOP operations as long
as there are elements in the target list, but if the list is empty they
block waiting for new data to arrive to the list.
All the clients blocked waiting for the same list are served in a FIFO
way, so the first that blocked is the first to be served when there is
more data pushed by another client into the list.
The previous implementation of blocking operations was conceived to
serve clients in the context of push operations. For instance:
1) There is a client "A" blocked on list "foo".
2) The client "B" performs `LPUSH foo somevalue`.
3) The client "A" is served in the context of the "B" LPUSH,
synchronously.
Processing things in a synchronous way was useful as if "A" pushes a
value that is served by "B", from the point of view of the database is a
NOP (no operation) thing, that is, nothing is replicated, nothing is
written in the AOF file, and so forth.
However later we implemented two things:
1) Variadic LPUSH that could add multiple values to a list in the
context of a single call.
2) BRPOPLPUSH that was a version of BRPOP that also provided a "PUSH"
side effect when receiving data.
This forced us to make the synchronous implementation more complex. If
client "B" is waiting for data, and "A" pushes three elements in a
single call, we needed to propagate an LPUSH with a missing argument
in the AOF and replication link. We also needed to make sure to
replicate the LPUSH side of BRPOPLPUSH, but only if it in turn did not
happen to serve another blocking client into another list ;)
This was complex, but with a few mutually recursive functions
everything worked as expected... until one day we introduced scripting
in Redis.
Scripting + synchronous blocking operations = Issue #614.
Basically you can't "rewrite" a script to have just a partial effect on
the replicas and AOF file if the script happened to serve a few blocked
clients.
The solution to all these problems, implemented by this commit, is to
change the way we serve blocked clients. Instead of serving the blocked
clients synchronously, in the context of the command performing the PUSH
operation, it is now an asynchronous and iterative process:
1) If a key that has clients blocked waiting for data is the subject of
a list push operation, we simply mark the key as "ready" and put it into a
queue.
2) Every command pushing stuff on lists, as a variadic LPUSH, a script,
or whatever it is, is replicated verbatim without any rewriting.
3) Every time a Redis command, a MULTI/EXEC block, or a script,
completed its execution, we run the list of keys ready to serve blocked
clients (as more data arrived), and process this list serving the
blocked clients.
4) As a result of "3" maybe more keys are ready again for other clients
(as a result of BRPOPLPUSH we may have push operations), so we iterate
back to step "3" if it's needed.
The new code has much simpler semantics, and a simpler to understand
implementation, with the disadvantage of not being able to "optimize out"
a PUSH+BPOP as a No OP.
This commit will be tested with care before the final merge, more tests
will be added likely.
2012-09-04 04:37:49 -04:00
|
|
|
/* The function pushes an elmenet to the specified list object 'subject',
|
|
|
|
* at head or tail position as specified by 'where'.
|
|
|
|
*
|
|
|
|
* There is no need for the caller to incremnet the refcount of 'value' as
|
|
|
|
* the function takes care of it if needed. */
|
2010-06-21 18:07:48 -04:00
|
|
|
void listTypePush(robj *subject, robj *value, int where) {
|
|
|
|
/* Check if we need to convert the ziplist */
|
|
|
|
listTypeTryConversion(subject,value);
|
|
|
|
if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
|
|
|
|
ziplistLen(subject->ptr) >= server.list_max_ziplist_entries)
|
|
|
|
listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
|
|
|
|
|
|
|
|
if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
|
|
|
|
int pos = (where == REDIS_HEAD) ? ZIPLIST_HEAD : ZIPLIST_TAIL;
|
|
|
|
value = getDecodedObject(value);
|
|
|
|
subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),pos);
|
|
|
|
decrRefCount(value);
|
|
|
|
} else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
|
|
|
|
if (where == REDIS_HEAD) {
|
|
|
|
listAddNodeHead(subject->ptr,value);
|
|
|
|
} else {
|
|
|
|
listAddNodeTail(subject->ptr,value);
|
|
|
|
}
|
|
|
|
incrRefCount(value);
|
|
|
|
} else {
|
|
|
|
redisPanic("Unknown list encoding");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/* Remove one element from the list object 'subject' and return it: the head
 * element when 'where' is REDIS_HEAD, the tail element otherwise.
 *
 * Returns NULL when there is no element to pop. For a ziplist a new string
 * object is created from the stored entry; for a linked list the stored
 * object is returned after incrementing its refcount, since deleting the
 * node may release the list's own reference. */
robj *listTypePop(robj *subject, int where) {
    if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
        unsigned char *zptr;
        unsigned char *sval;
        unsigned int slen;
        long long lval;
        robj *popped;

        zptr = ziplistIndex(subject->ptr,(where == REDIS_HEAD) ? 0 : -1);
        if (!ziplistGet(zptr,&sval,&slen,&lval)) return NULL;

        if (sval) {
            popped = createStringObject((char*)sval,slen);
        } else {
            popped = createStringObjectFromLongLong(lval);
        }
        /* We only need to delete an element when it exists */
        subject->ptr = ziplistDelete(subject->ptr,&zptr);
        return popped;
    }

    if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
        list *ll = subject->ptr;
        listNode *node = (where == REDIS_HEAD) ? listFirst(ll) : listLast(ll);
        robj *popped;

        if (node == NULL) return NULL;
        popped = listNodeValue(node);
        incrRefCount(popped);
        listDelNode(ll,node);
        return popped;
    }

    redisPanic("Unknown list encoding");
    return NULL;
}
|
|
|
|
|
|
|
|
/* Return the number of elements stored in the list object 'subject',
 * whatever its encoding. Panics on an unknown encoding. */
unsigned long listTypeLength(robj *subject) {
    unsigned long count = 0;

    if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
        count = ziplistLen(subject->ptr);
    } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
        count = listLength((list*)subject->ptr);
    } else {
        redisPanic("Unknown list encoding");
    }
    return count;
}
|
|
|
|
|
|
|
|
/* Initialize an iterator at the specified index. */
|
2012-01-31 04:35:52 -05:00
|
|
|
listTypeIterator *listTypeInitIterator(robj *subject, long index, unsigned char direction) {
|
2010-06-21 18:07:48 -04:00
|
|
|
listTypeIterator *li = zmalloc(sizeof(listTypeIterator));
|
|
|
|
li->subject = subject;
|
|
|
|
li->encoding = subject->encoding;
|
|
|
|
li->direction = direction;
|
|
|
|
if (li->encoding == REDIS_ENCODING_ZIPLIST) {
|
|
|
|
li->zi = ziplistIndex(subject->ptr,index);
|
|
|
|
} else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
|
|
|
|
li->ln = listIndex(subject->ptr,index);
|
|
|
|
} else {
|
|
|
|
redisPanic("Unknown list encoding");
|
|
|
|
}
|
|
|
|
return li;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* Clean up the iterator. */
|
|
|
|
void listTypeReleaseIterator(listTypeIterator *li) {
|
|
|
|
zfree(li);
|
|
|
|
}
|
|
|
|
|
|
|
|
/* Stores pointer to current the entry in the provided entry structure
|
|
|
|
* and advances the position of the iterator. Returns 1 when the current
|
|
|
|
* entry is in fact an entry, 0 otherwise. */
|
|
|
|
int listTypeNext(listTypeIterator *li, listTypeEntry *entry) {
|
|
|
|
/* Protect from converting when iterating */
|
|
|
|
redisAssert(li->subject->encoding == li->encoding);
|
|
|
|
|
|
|
|
entry->li = li;
|
|
|
|
if (li->encoding == REDIS_ENCODING_ZIPLIST) {
|
|
|
|
entry->zi = li->zi;
|
|
|
|
if (entry->zi != NULL) {
|
|
|
|
if (li->direction == REDIS_TAIL)
|
|
|
|
li->zi = ziplistNext(li->subject->ptr,li->zi);
|
|
|
|
else
|
|
|
|
li->zi = ziplistPrev(li->subject->ptr,li->zi);
|
|
|
|
return 1;
|
|
|
|
}
|
|
|
|
} else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
|
|
|
|
entry->ln = li->ln;
|
|
|
|
if (entry->ln != NULL) {
|
|
|
|
if (li->direction == REDIS_TAIL)
|
|
|
|
li->ln = li->ln->next;
|
|
|
|
else
|
|
|
|
li->ln = li->ln->prev;
|
|
|
|
return 1;
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
redisPanic("Unknown list encoding");
|
|
|
|
}
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* Return entry or NULL at the current position of the iterator. */
|
|
|
|
robj *listTypeGet(listTypeEntry *entry) {
|
|
|
|
listTypeIterator *li = entry->li;
|
|
|
|
robj *value = NULL;
|
|
|
|
if (li->encoding == REDIS_ENCODING_ZIPLIST) {
|
|
|
|
unsigned char *vstr;
|
|
|
|
unsigned int vlen;
|
|
|
|
long long vlong;
|
|
|
|
redisAssert(entry->zi != NULL);
|
|
|
|
if (ziplistGet(entry->zi,&vstr,&vlen,&vlong)) {
|
|
|
|
if (vstr) {
|
|
|
|
value = createStringObject((char*)vstr,vlen);
|
|
|
|
} else {
|
|
|
|
value = createStringObjectFromLongLong(vlong);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
|
|
|
|
redisAssert(entry->ln != NULL);
|
|
|
|
value = listNodeValue(entry->ln);
|
|
|
|
incrRefCount(value);
|
|
|
|
} else {
|
|
|
|
redisPanic("Unknown list encoding");
|
|
|
|
}
|
|
|
|
return value;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* Insert 'value' next to the element referenced by 'entry': after it when
 * 'where' is REDIS_TAIL, before it otherwise.
 *
 * The caller does not need to touch the refcount of 'value': for a ziplist
 * the reference obtained from getDecodedObject is released here, and for a
 * linked list the refcount is incremented because the new node references
 * the object. */
void listTypeInsert(listTypeEntry *entry, robj *value, int where) {
    robj *subject = entry->li->subject;

    if (entry->li->encoding == REDIS_ENCODING_ZIPLIST) {
        value = getDecodedObject(value);
        if (where == REDIS_TAIL) {
            unsigned char *next = ziplistNext(subject->ptr,entry->zi);

            /* When we insert after the current element, but the current element
             * is the tail of the list, we need to do a push. */
            if (next == NULL) {
                /* Use the ziplist API's own constant, as listTypePush does,
                 * instead of relying on REDIS_TAIL having the same value. */
                subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),ZIPLIST_TAIL);
            } else {
                subject->ptr = ziplistInsert(subject->ptr,next,value->ptr,sdslen(value->ptr));
            }
        } else {
            subject->ptr = ziplistInsert(subject->ptr,entry->zi,value->ptr,sdslen(value->ptr));
        }
        decrRefCount(value);
    } else if (entry->li->encoding == REDIS_ENCODING_LINKEDLIST) {
        if (where == REDIS_TAIL) {
            listInsertNode(subject->ptr,entry->ln,value,AL_START_TAIL);
        } else {
            listInsertNode(subject->ptr,entry->ln,value,AL_START_HEAD);
        }
        incrRefCount(value);
    } else {
        redisPanic("Unknown list encoding");
    }
}
|
|
|
|
|
|
|
|
/* Compare the given object with the entry at the current position. */
|
|
|
|
int listTypeEqual(listTypeEntry *entry, robj *o) {
|
|
|
|
listTypeIterator *li = entry->li;
|
|
|
|
if (li->encoding == REDIS_ENCODING_ZIPLIST) {
|
2011-10-04 12:43:03 -04:00
|
|
|
redisAssertWithInfo(NULL,o,o->encoding == REDIS_ENCODING_RAW);
|
2010-06-21 18:07:48 -04:00
|
|
|
return ziplistCompare(entry->zi,o->ptr,sdslen(o->ptr));
|
|
|
|
} else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
|
|
|
|
return equalStringObjects(o,listNodeValue(entry->ln));
|
|
|
|
} else {
|
|
|
|
redisPanic("Unknown list encoding");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/* Delete the element pointed to. */
|
|
|
|
void listTypeDelete(listTypeEntry *entry) {
|
|
|
|
listTypeIterator *li = entry->li;
|
|
|
|
if (li->encoding == REDIS_ENCODING_ZIPLIST) {
|
|
|
|
unsigned char *p = entry->zi;
|
|
|
|
li->subject->ptr = ziplistDelete(li->subject->ptr,&p);
|
|
|
|
|
|
|
|
/* Update position of the iterator depending on the direction */
|
|
|
|
if (li->direction == REDIS_TAIL)
|
|
|
|
li->zi = p;
|
|
|
|
else
|
|
|
|
li->zi = ziplistPrev(li->subject->ptr,p);
|
|
|
|
} else if (entry->li->encoding == REDIS_ENCODING_LINKEDLIST) {
|
|
|
|
listNode *next;
|
|
|
|
if (li->direction == REDIS_TAIL)
|
|
|
|
next = entry->ln->next;
|
|
|
|
else
|
|
|
|
next = entry->ln->prev;
|
|
|
|
listDelNode(li->subject->ptr,entry->ln);
|
|
|
|
li->ln = next;
|
|
|
|
} else {
|
|
|
|
redisPanic("Unknown list encoding");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/* Convert the list object 'subject' to the encoding 'enc'. The only
 * supported target is REDIS_ENCODING_LINKEDLIST; any other value panics.
 *
 * Every element is fetched with listTypeGet and appended to a new linked
 * list, after which the old payload is freed and replaced. */
void listTypeConvert(robj *subject, int enc) {
    listTypeIterator *iter;
    listTypeEntry entry;
    list *converted;

    redisAssertWithInfo(NULL,subject,subject->type == REDIS_LIST);

    if (enc != REDIS_ENCODING_LINKEDLIST) {
        redisPanic("Unsupported list conversion");
        return;
    }

    converted = listCreate();
    listSetFreeMethod(converted,decrRefCount);

    /* listTypeGet returns a robj with incremented refcount */
    iter = listTypeInitIterator(subject,0,REDIS_TAIL);
    while (listTypeNext(iter,&entry)) {
        listAddNodeTail(converted,listTypeGet(&entry));
    }
    listTypeReleaseIterator(iter);

    /* The encoding is switched only after iterating: listTypeNext asserts
     * that the subject encoding still matches the iterator's. */
    subject->encoding = REDIS_ENCODING_LINKEDLIST;
    zfree(subject->ptr);
    subject->ptr = converted;
}
|
|
|
|
|
|
|
|
/*-----------------------------------------------------------------------------
|
|
|
|
* List Commands
|
|
|
|
*----------------------------------------------------------------------------*/
|
|
|
|
|
|
|
|
/* Shared implementation of LPUSH and RPUSH: push every argument from
 * argv[2] onwards onto the list stored at argv[1], at the end selected by
 * 'where' (REDIS_HEAD or REDIS_TAIL).
 *
 * The list is created on the first push if the key does not exist. If the
 * key holds a value of another type a wrong-type error is sent and nothing
 * is modified. The reply is the length of the list after the pushes. */
void pushGenericCommand(redisClient *c, int where) {
    int j, pushed = 0;
    robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
    int may_have_waiting_clients = (lobj == NULL);

    if (lobj && lobj->type != REDIS_LIST) {
        addReply(c,shared.wrongtypeerr);
        return;
    }

    /* Blocked clients are no longer served synchronously in the context of
     * the push. When the key did not exist, clients may be blocked waiting
     * for it, so it is only marked as ready here; the blocked clients are
     * served after the command has completed, and the command itself is
     * replicated verbatim. */
    if (may_have_waiting_clients) signalListAsReady(c,c->argv[1]);

    for (j = 2; j < c->argc; j++) {
        c->argv[j] = tryObjectEncoding(c->argv[j]);
        if (!lobj) {
            /* Create the list lazily, on the first element pushed. */
            lobj = createZiplistObject();
            dbAdd(c->db,c->argv[1],lobj);
        }
        listTypePush(lobj,c->argv[j],where);
        pushed++;
    }
    addReplyLongLong(c, lobj ? listTypeLength(lobj) : 0);
    if (pushed) signalModifiedKey(c->db,c->argv[1]);
    server.dirty += pushed;
}
|
|
|
|
|
|
|
|
/* LPUSH command entry point: runs the generic push implementation with
 * REDIS_HEAD as the insertion end. */
void lpushCommand(redisClient *c) {
    pushGenericCommand(c,REDIS_HEAD);
}
|
|
|
|
|
|
|
|
/* RPUSH command entry point: runs the generic push implementation with
 * REDIS_TAIL as the insertion end. */
void rpushCommand(redisClient *c) {
    pushGenericCommand(c,REDIS_TAIL);
}
|
|
|
|
|
|
|
|
/* Shared implementation of LPUSHX, RPUSHX and LINSERT on the existing list
 * stored at argv[1].
 *
 * With 'refval' == NULL, 'val' is pushed at the end selected by 'where'.
 * Otherwise the list is scanned from head to tail for the first element
 * equal to 'refval' and 'val' is inserted next to it ('where' is passed on
 * to listTypeInsert).
 *
 * Replies: shared.czero if the key does not exist, shared.cnegone if
 * 'refval' was not found, otherwise the resulting list length.
 *
 * NOTE(review): the key is fetched with the read-lookup helper although the
 * command modifies the list -- confirm this is intended. */
void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) {
    robj *subject;
    listTypeIterator *iter;
    listTypeEntry entry;
    int inserted = 0;

    subject = lookupKeyReadOrReply(c,c->argv[1],shared.czero);
    if (subject == NULL) return;
    if (checkType(c,subject,REDIS_LIST)) return;

    if (refval == NULL) {
        listTypePush(subject,val,where);
    } else {
        /* Note: we expect refval to be string-encoded because it is *not* the
         * last argument of the multi-bulk LINSERT. */
        redisAssertWithInfo(c,refval,refval->encoding == REDIS_ENCODING_RAW);

        /* The list cannot be converted while an iterator is active, and
         * walking it twice (once to check, once to insert) is wasteful, so
         * the conversion for a too-long value is done up front on the
         * assumption that the insert will succeed. */
        listTypeTryConversion(subject,val);

        /* Seek refval from head to tail */
        iter = listTypeInitIterator(subject,0,REDIS_TAIL);
        while (!inserted && listTypeNext(iter,&entry)) {
            if (listTypeEqual(&entry,refval)) {
                listTypeInsert(&entry,val,where);
                inserted = 1;
            }
        }
        listTypeReleaseIterator(iter);

        if (!inserted) {
            /* Notify client of a failed insert */
            addReply(c,shared.cnegone);
            return;
        }

        /* Check if the length exceeds the ziplist length threshold. */
        if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
            if (ziplistLen(subject->ptr) > server.list_max_ziplist_entries)
                listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
        }
    }

    /* Reached only when the list was actually modified. */
    signalModifiedKey(c->db,c->argv[1]);
    server.dirty++;

    addReplyLongLong(c,listTypeLength(subject));
}
|
|
|
|
|
|
|
|
/* LPUSHX command entry point: pass argv[2] through tryObjectEncoding and
 * push it at the head of the list, provided the key already exists. */
void lpushxCommand(redisClient *c) {
    robj *val = (c->argv[2] = tryObjectEncoding(c->argv[2]));

    pushxGenericCommand(c,NULL,val,REDIS_HEAD);
}
|
|
|
|
|
|
|
|
/* RPUSHX command entry point: pass argv[2] through tryObjectEncoding and
 * push it at the tail of the list, provided the key already exists. */
void rpushxCommand(redisClient *c) {
    robj *val = (c->argv[2] = tryObjectEncoding(c->argv[2]));

    pushxGenericCommand(c,NULL,val,REDIS_TAIL);
}
|
|
|
|
|
|
|
|
/* LINSERT command entry point. argv[2] selects the position relative to the
 * reference value argv[3], matched case-insensitively: "after" maps to
 * REDIS_TAIL and "before" to REDIS_HEAD. Any other word gets a syntax error
 * reply. argv[4] is the value to insert. */
void linsertCommand(redisClient *c) {
    const char *position;
    int where;

    c->argv[4] = tryObjectEncoding(c->argv[4]);
    position = c->argv[2]->ptr;

    if (strcasecmp(position,"after") == 0) {
        where = REDIS_TAIL;
    } else if (strcasecmp(position,"before") == 0) {
        where = REDIS_HEAD;
    } else {
        addReply(c,shared.syntaxerr);
        return;
    }
    pushxGenericCommand(c,c->argv[3],c->argv[4],where);
}
|
|
|
|
|
|
|
|
/* LLEN command entry point: reply with the length of the list at argv[1].
 * A missing key gets shared.czero from the lookup helper; if checkType
 * rejects the value no length is sent. */
void llenCommand(redisClient *c) {
    robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);

    if (o != NULL && !checkType(c,o,REDIS_LIST)) {
        addReplyLongLong(c,listTypeLength(o));
    }
}
|
|
|
|
|
|
|
|
/* LINDEX command entry point: reply with the element at index argv[2] of
 * the list at argv[1], or with shared.nullbulk when the key does not exist
 * or no element is found at that index. */
void lindexCommand(redisClient *c) {
    robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk);
    if (o == NULL || checkType(c,o,REDIS_LIST)) return;
    long index;

    if (getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != REDIS_OK)
        return;

    if (o->encoding == REDIS_ENCODING_ZIPLIST) {
        unsigned char *zptr = ziplistIndex(o->ptr,index);
        unsigned char *sval;
        unsigned int slen;
        long long lval;
        robj *element;

        if (!ziplistGet(zptr,&sval,&slen,&lval)) {
            addReply(c,shared.nullbulk);
            return;
        }
        /* Build a temporary object for the reply and release it after. */
        element = sval ? createStringObject((char*)sval,slen)
                       : createStringObjectFromLongLong(lval);
        addReplyBulk(c,element);
        decrRefCount(element);
        return;
    }

    if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
        listNode *node = listIndex(o->ptr,index);

        if (node == NULL) {
            addReply(c,shared.nullbulk);
        } else {
            /* The stored object is sent as is; its refcount is not touched
             * here. */
            addReplyBulk(c,listNodeValue(node));
        }
        return;
    }

    redisPanic("Unknown list encoding");
}
|
|
|
|
|
|
|
|
/* LSET command entry point: replace the element at index argv[2] of the
 * list at argv[1] with argv[3].
 *
 * Replies: shared.nokeyerr if the key does not exist, shared.outofrangeerr
 * if no element is found at the index, shared.ok on success. The key is
 * signalled as modified only on success. */
void lsetCommand(redisClient *c) {
    robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
    if (o == NULL || checkType(c,o,REDIS_LIST)) return;
    long index;
    robj *value = (c->argv[3] = tryObjectEncoding(c->argv[3]));

    if (getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != REDIS_OK)
        return;

    /* The conversion check runs before the index is validated. */
    listTypeTryConversion(o,value);

    if (o->encoding == REDIS_ENCODING_ZIPLIST) {
        unsigned char *zptr = ziplistIndex(o->ptr,index);

        if (zptr == NULL) {
            addReply(c,shared.outofrangeerr);
            return;
        }
        /* Replace by deleting the old entry and inserting the new value at
         * the position the delete leaves 'zptr' on. */
        o->ptr = ziplistDelete(o->ptr,&zptr);
        value = getDecodedObject(value);
        o->ptr = ziplistInsert(o->ptr,zptr,value->ptr,sdslen(value->ptr));
        decrRefCount(value);
    } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
        listNode *node = listIndex(o->ptr,index);

        if (node == NULL) {
            addReply(c,shared.outofrangeerr);
            return;
        }
        /* Drop the node's reference to the old object and take one on the
         * new object. */
        decrRefCount((robj*)listNodeValue(node));
        listNodeValue(node) = value;
        incrRefCount(value);
    } else {
        redisPanic("Unknown list encoding");
        return;
    }

    addReply(c,shared.ok);
    signalModifiedKey(c->db,c->argv[1]);
    server.dirty++;
}
|
|
|
|
|
|
|
|
/* Shared implementation of LPOP and RPOP: pop one element from the end of
 * the list at argv[1] selected by 'where' and send it to the client.
 *
 * Replies with shared.nullbulk when the key does not exist or nothing could
 * be popped. When the pop leaves the list empty the key is deleted. */
void popGenericCommand(redisClient *c, int where) {
    robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk);
    if (o == NULL || checkType(c,o,REDIS_LIST)) return;

    robj *popped = listTypePop(o,where);
    if (popped == NULL) {
        addReply(c,shared.nullbulk);
        return;
    }

    addReplyBulk(c,popped);
    /* Release the reference handed over by listTypePop. */
    decrRefCount(popped);
    if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
    signalModifiedKey(c->db,c->argv[1]);
    server.dirty++;
}
|
|
|
|
|
|
|
|
/* LPOP command entry point: runs the generic pop implementation on the head
 * of the list. */
void lpopCommand(redisClient *c) {
    popGenericCommand(c,REDIS_HEAD);
}
|
|
|
|
|
|
|
|
/* RPOP command entry point: runs the generic pop implementation on the tail
 * of the list. */
void rpopCommand(redisClient *c) {
    popGenericCommand(c,REDIS_TAIL);
}
|
|
|
|
|
|
|
|
/* LRANGE command entry point: reply with the elements of the list at
 * argv[1] between the indexes argv[2] (start) and argv[3] (end), both
 * inclusive.
 *
 * Negative indexes count from the end of the list. Out of range bounds are
 * clamped, and an empty range (or a missing key) yields
 * shared.emptymultibulk. */
void lrangeCommand(redisClient *c) {
    robj *o;
    long start, end, llen, rangelen, remaining;

    if (getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) return;
    if (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK) return;

    o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk);
    if (o == NULL) return;
    if (checkType(c,o,REDIS_LIST)) return;
    llen = listTypeLength(o);

    /* convert negative indexes */
    if (start < 0) start += llen;
    if (end < 0) end += llen;
    if (start < 0) start = 0;

    /* Invariant: start >= 0, so this test will be true when end < 0.
     * The range is empty when start > end or start >= length. */
    if (start >= llen || start > end) {
        addReply(c,shared.emptymultibulk);
        return;
    }
    if (end >= llen) end = llen-1;
    rangelen = (end-start)+1;

    /* Return the result in form of a multi-bulk reply */
    addReplyMultiBulkLen(c,rangelen);

    if (o->encoding == REDIS_ENCODING_ZIPLIST) {
        unsigned char *zptr = ziplistIndex(o->ptr,start);
        unsigned char *sval;
        unsigned int slen;
        long long lval;

        for (remaining = rangelen; remaining > 0; remaining--) {
            ziplistGet(zptr,&sval,&slen,&lval);
            if (sval) {
                addReplyBulkCBuffer(c,sval,slen);
            } else {
                addReplyBulkLongLong(c,lval);
            }
            zptr = ziplistNext(o->ptr,zptr);
        }
        return;
    }

    if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
        listNode *node;

        /* If we are nearest to the end of the list, reach the element
         * starting from tail and going backward, as it is faster. */
        if (start > llen/2) start -= llen;
        node = listIndex(o->ptr,start);

        for (remaining = rangelen; remaining > 0; remaining--) {
            addReplyBulk(c,node->value);
            node = node->next;
        }
        return;
    }

    redisPanic("List encoding is not LINKEDLIST nor ZIPLIST!");
}
|
|
|
|
|
|
|
|
/* LTRIM key start end
 *
 * Keeps only the elements of the list at argv[1] located between the
 * positions in argv[2] and argv[3] (inclusive; negative positions are
 * offsets from the end of the list) and removes everything else. If the
 * list ends up empty the key is deleted. Replies with shared.ok. */
void ltrimCommand(redisClient *c) {
    robj *lobj;
    long first, last, len, head_cut, tail_cut, n;

    /* Both range arguments must parse as longs; on failure the helper has
     * already replied to the client. */
    if (getLongFromObjectOrReply(c, c->argv[2], &first, NULL) != REDIS_OK) return;
    if (getLongFromObjectOrReply(c, c->argv[3], &last, NULL) != REDIS_OK) return;

    lobj = lookupKeyWriteOrReply(c, c->argv[1], shared.ok);
    if (lobj == NULL) return;
    if (checkType(c, lobj, REDIS_LIST)) return;
    len = listTypeLength(lobj);

    /* Turn tail-relative (negative) positions into absolute ones. A start
     * position that is still negative afterwards is clamped to zero. */
    if (first < 0) {
        first += len;
        if (first < 0) first = 0;
    }
    if (last < 0) last += len;

    /* Work out how many elements to drop from each side. Since first >= 0
     * here, a still-negative 'last' fails the first test below. */
    if (first <= last && first < len) {
        if (last >= len) last = len - 1;
        head_cut = first;
        tail_cut = len - last - 1;
    } else {
        /* Out of range start, or start > end: the result is an empty list,
         * so everything is removed from the head side. */
        head_cut = len;
        tail_cut = 0;
    }

    /* Remove list elements to perform the trim. */
    if (lobj->encoding == REDIS_ENCODING_ZIPLIST) {
        lobj->ptr = ziplistDeleteRange(lobj->ptr, 0, head_cut);
        lobj->ptr = ziplistDeleteRange(lobj->ptr, -tail_cut, tail_cut);
    } else if (lobj->encoding == REDIS_ENCODING_LINKEDLIST) {
        list *nodes = lobj->ptr;

        for (n = head_cut; n > 0; n--)
            listDelNode(nodes, listFirst(nodes));
        for (n = tail_cut; n > 0; n--)
            listDelNode(nodes, listLast(nodes));
    } else {
        redisPanic("Unknown list encoding");
    }

    if (listTypeLength(lobj) == 0) dbDelete(c->db, c->argv[1]);
    signalModifiedKey(c->db, c->argv[1]);
    server.dirty++;
    addReply(c, shared.ok);
}
|
|
|
|
|
|
|
|
/* LREM key count value
 *
 * Removes elements equal to argv[3] from the list stored at argv[1] and
 * replies with the number of elements removed.
 *
 *   count > 0: the iterator starts at index 0 and moves in the REDIS_TAIL
 *              direction; at most 'count' matches are removed.
 *   count < 0: the iterator starts at index -1 and moves in the REDIS_HEAD
 *              direction; at most |count| matches are removed.
 *   count = 0: every match is removed.
 *
 * If the list becomes empty the key is deleted. signalModifiedKey() is
 * called only when at least one element was actually removed. */
void lremCommand(redisClient *c) {
    robj *subject, *obj;
    long toremove;      /* Count argument as supplied by the client. */
    long limit;         /* -|toremove|: the removal limit, kept <= 0. */
    long removed = 0;
    listTypeEntry entry;
    listTypeIterator *li;

    obj = c->argv[3] = tryObjectEncoding(c->argv[3]);

    if (getLongFromObjectOrReply(c, c->argv[2], &toremove, NULL) != REDIS_OK)
        return;

    subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
    if (subject == NULL || checkType(c,subject,REDIS_LIST)) return;

    /* Make sure obj is raw when we're dealing with a ziplist. This takes a
     * reference that is released after the scan below. */
    if (subject->encoding == REDIS_ENCODING_ZIPLIST)
        obj = getDecodedObject(obj);

    /* The limit is tracked as a non-positive number. Negating a positive
     * long is always safe, while negating LONG_MIN (a count the client is
     * free to send) is signed overflow, i.e. undefined behaviour. Working
     * in the negative domain avoids ever computing -LONG_MIN. */
    if (toremove < 0) {
        limit = toremove;
        li = listTypeInitIterator(subject,-1,REDIS_HEAD);
    } else {
        limit = -toremove;
        li = listTypeInitIterator(subject,0,REDIS_TAIL);
    }

    while (listTypeNext(li,&entry)) {
        if (listTypeEqual(&entry,obj)) {
            listTypeDelete(&entry);
            server.dirty++;
            removed++;
            /* limit == 0 means "no limit". 'removed' is a small positive
             * count, so -removed cannot overflow. */
            if (limit && -removed == limit) break;
        }
    }
    listTypeReleaseIterator(li);

    /* Clean up raw encoded object */
    if (subject->encoding == REDIS_ENCODING_ZIPLIST)
        decrRefCount(obj);

    if (listTypeLength(subject) == 0) dbDelete(c->db,c->argv[1]);
    addReplyLongLong(c,removed);
    if (removed) signalModifiedKey(c->db,c->argv[1]);
}
|
|
|
|
|
|
|
|
/* These are the semantics of this command:
|
|
|
|
* RPOPLPUSH srclist dstlist:
|
2010-12-06 08:48:58 -05:00
|
|
|
* IF LLEN(srclist) > 0
|
|
|
|
* element = RPOP srclist
|
|
|
|
* LPUSH dstlist element
|
|
|
|
* RETURN element
|
|
|
|
* ELSE
|
|
|
|
* RETURN nil
|
|
|
|
* END
|
2010-06-21 18:07:48 -04:00
|
|
|
* END
|
|
|
|
*
|
|
|
|
* The idea is to be able to get an element from a list in a reliable way
|
|
|
|
* since the element is not just returned but pushed against another list
|
|
|
|
* as well. This command was originally proposed by Ezra Zygmuntowicz.
|
|
|
|
*/
|
2010-12-06 08:48:58 -05:00
|
|
|
|
A reimplementation of blocking operation internals.
Redis provides support for blocking operations such as BLPOP or BRPOP.
These operations are identical to normal LPOP and RPOP operations as long
as there are elements in the target list, but if the list is empty they
block waiting for new data to arrive to the list.
All the clients blocked waiting for the same list are served in a FIFO
way, so the first that blocked is the first to be served when there is
more data pushed by another client into the list.
The previous implementation of blocking operations was conceived to
serve clients in the context of push operations. For instance:
1) There is a client "A" blocked on list "foo".
2) The client "B" performs `LPUSH foo somevalue`.
3) The client "A" is served in the context of the "B" LPUSH,
synchronously.
Processing things in a synchronous way was useful as if "A" pushes a
value that is served by "B", from the point of view of the database is a
NOP (no operation) thing, that is, nothing is replicated, nothing is
written in the AOF file, and so forth.
However later we implemented two things:
1) Variadic LPUSH that could add multiple values to a list in the
context of a single call.
2) BRPOPLPUSH that was a version of BRPOP that also provided a "PUSH"
side effect when receiving data.
This forced us to make the synchronous implementation more complex. If
client "B" is waiting for data, and "A" pushes three elements in a
single call, we needed to propagate an LPUSH with a missing argument
in the AOF and replication link. We also needed to make sure to
replicate the LPUSH side of BRPOPLPUSH, but only if it in turn did not
happen to serve another blocked client on another list ;)
This was complex, but with a few mutually recursive functions
everything worked as expected... until one day we introduced scripting
in Redis.
Scripting + synchronous blocking operations = Issue #614.
Basically you can't "rewrite" a script to have just a partial effect on
the replicas and AOF file if the script happened to serve a few blocked
clients.
The solution to all these problems, implemented by this commit, is to
change the way we serve blocked clients. Instead of serving the blocked
clients synchronously, in the context of the command performing the PUSH
operation, it is now an asynchronous and iterative process:
1) If a key that has clients blocked waiting for data is the subject of
a list push operation, we simply mark the key as "ready" and put it into a
queue.
2) Every command pushing stuff on lists, as a variadic LPUSH, a script,
or whatever it is, is replicated verbatim without any rewriting.
3) Every time a Redis command, a MULTI/EXEC block, or a script,
completed its execution, we run the list of keys ready to serve blocked
clients (as more data arrived), and process this list serving the
blocked clients.
4) As a result of "3" maybe more keys are ready again for other clients
(as a result of BRPOPLPUSH we may have push operations), so we iterate
back to step "3" if it's needed.
The new code has much simpler semantics, and a simpler to understand
implementation, with the disadvantage of not being able to "optimize out"
a PUSH+BPOP as a NOP.
This commit will be tested with care before the final merge, more tests
will be added likely.
2012-09-04 04:37:49 -04:00
|
|
|
/* Push side of RPOPLPUSH: pushes 'value' on the REDIS_HEAD side of the
 * destination list and sends 'value' to the client as a bulk reply.
 *
 * c       client issuing the command (provides the db and gets the reply)
 * dstkey  key of the destination list
 * dstobj  destination list object, or NULL when the key does not exist yet
 * value   element to push */
void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
    if (dstobj == NULL) {
        /* The destination key is missing: create a new ziplist-backed list,
         * store it under dstkey and flag the key as ready. */
        dstobj = createZiplistObject();
        dbAdd(c->db, dstkey, dstobj);
        signalListAsReady(c, dstkey);
    }

    signalModifiedKey(c->db, dstkey);
    listTypePush(dstobj, value, REDIS_HEAD);

    /* The pushed value is always what the client receives. */
    addReplyBulk(c, value);
}
|
|
|
|
|
2010-11-08 08:43:21 -05:00
|
|
|
/* RPOPLPUSH srclist dstlist
 *
 * Pops an element from the REDIS_TAIL side of the list at argv[1], hands
 * it to rpoplpushHandlePush() to be pushed onto the list at argv[2] and
 * sent to the client, then deletes the source key if the source list is
 * left empty. Replies with shared.nullbulk when the source list has no
 * elements. */
void rpoplpushCommand(redisClient *c) {
    robj *src, *dst, *popped, *srckey;

    src = lookupKeyWriteOrReply(c, c->argv[1], shared.nullbulk);
    if (src == NULL) return;
    if (checkType(c, src, REDIS_LIST)) return;

    if (listTypeLength(src) == 0) {
        /* This may only happen after loading very old RDB files. Recent
         * versions of Redis delete keys of empty lists. */
        addReply(c, shared.nullbulk);
        return;
    }

    dst = lookupKeyWrite(c->db, c->argv[2]);
    srckey = c->argv[1];

    /* An existing destination must be a list as well. */
    if (dst != NULL && checkType(c, dst, REDIS_LIST)) return;

    popped = listTypePop(src, REDIS_TAIL);

    /* Hold our own reference to the source key: rpoplpushHandlePush may
     * change the client command argument vector (it does not currently). */
    incrRefCount(srckey);
    rpoplpushHandlePush(c, c->argv[2], dst, popped);

    /* listTypePop returns an object with its refcount incremented. */
    decrRefCount(popped);

    /* Delete the source list when it is empty. */
    if (listTypeLength(src) == 0) dbDelete(c->db, srckey);
    signalModifiedKey(c->db, srckey);
    decrRefCount(srckey);
    server.dirty++;
}
|
|
|
|
|
|
|
|
/*-----------------------------------------------------------------------------
|
|
|
|
* Blocking POP operations
|
|
|
|
*----------------------------------------------------------------------------*/
|
|
|
|
|
A reimplementation of blocking operation internals.
Redis provides support for blocking operations such as BLPOP or BRPOP.
These operations are identical to normal LPOP and RPOP operations as long
as there are elements in the target list, but if the list is empty they
block waiting for new data to arrive to the list.
All the clients blocked waiting for the same list are served in a FIFO
way, so the first that blocked is the first to be served when there is
more data pushed by another client into the list.
The previous implementation of blocking operations was conceived to
serve clients in the context of push operations. For instance:
1) There is a client "A" blocked on list "foo".
2) The client "B" performs `LPUSH foo somevalue`.
3) The client "A" is served in the context of the "B" LPUSH,
synchronously.
Processing things in a synchronous way was useful as if "A" pushes a
value that is served by "B", from the point of view of the database is a
NOP (no operation) thing, that is, nothing is replicated, nothing is
written in the AOF file, and so forth.
However later we implemented two things:
1) Variadic LPUSH that could add multiple values to a list in the
context of a single call.
2) BRPOPLPUSH that was a version of BRPOP that also provided a "PUSH"
side effect when receiving data.
This forced us to make the synchronous implementation more complex. If
client "B" is waiting for data, and "A" pushes three elements in a
single call, we needed to propagate an LPUSH with a missing argument
in the AOF and replication link. We also needed to make sure to
replicate the LPUSH side of BRPOPLPUSH, but only if it in turn did not
happen to serve another blocked client on another list ;)
This was complex, but with a few mutually recursive functions
everything worked as expected... until one day we introduced scripting
in Redis.
Scripting + synchronous blocking operations = Issue #614.
Basically you can't "rewrite" a script to have just a partial effect on
the replicas and AOF file if the script happened to serve a few blocked
clients.
The solution to all these problems, implemented by this commit, is to
change the way we serve blocked clients. Instead of serving the blocked
clients synchronously, in the context of the command performing the PUSH
operation, it is now an asynchronous and iterative process:
1) If a key that has clients blocked waiting for data is the subject of
a list push operation, we simply mark the key as "ready" and put it into a
queue.
2) Every command pushing stuff on lists, as a variadic LPUSH, a script,
or whatever it is, is replicated verbatim without any rewriting.
3) Every time a Redis command, a MULTI/EXEC block, or a script,
completed its execution, we run the list of keys ready to serve blocked
clients (as more data arrived), and process this list serving the
blocked clients.
4) As a result of "3" maybe more keys are ready again for other clients
(as a result of BRPOPLPUSH we may have push operations), so we iterate
back to step "3" if it's needed.
The new code has much simpler semantics, and a simpler to understand
implementation, with the disadvantage of not being able to "optimize out"
a PUSH+BPOP as a NOP.
This commit will be tested with care before the final merge, more tests
will be added likely.
2012-09-04 04:37:49 -04:00
|
|
|
/* This is how the current blocking POP works, we use BLPOP as example:
|
2010-06-21 18:07:48 -04:00
|
|
|
* - If the user calls BLPOP and the key exists and contains a non empty list
|
|
|
|
* then LPOP is called instead. So BLPOP is semantically the same as LPOP
|
A reimplementation of blocking operation internals.
Redis provides support for blocking operations such as BLPOP or BRPOP.
These operations are identical to normal LPOP and RPOP operations as long
as there are elements in the target list, but if the list is empty they
block waiting for new data to arrive to the list.
All the clients blocked waiting for the same list are served in a FIFO
way, so the first that blocked is the first to be served when there is
more data pushed by another client into the list.
The previous implementation of blocking operations was conceived to
serve clients in the context of push operations. For instance:
1) There is a client "A" blocked on list "foo".
2) The client "B" performs `LPUSH foo somevalue`.
3) The client "A" is served in the context of the "B" LPUSH,
synchronously.
Processing things in a synchronous way was useful as if "A" pushes a
value that is served by "B", from the point of view of the database is a
NOP (no operation) thing, that is, nothing is replicated, nothing is
written in the AOF file, and so forth.
However later we implemented two things:
1) Variadic LPUSH that could add multiple values to a list in the
context of a single call.
2) BRPOPLPUSH that was a version of BRPOP that also provided a "PUSH"
side effect when receiving data.
This forced us to make the synchronous implementation more complex. If
client "B" is waiting for data, and "A" pushes three elements in a
single call, we needed to propagate an LPUSH with a missing argument
in the AOF and replication link. We also needed to make sure to
replicate the LPUSH side of BRPOPLPUSH, but only if it in turn did not
happen to serve another blocked client on another list ;)
This was complex, but with a few mutually recursive functions
everything worked as expected... until one day we introduced scripting
in Redis.
Scripting + synchronous blocking operations = Issue #614.
Basically you can't "rewrite" a script to have just a partial effect on
the replicas and AOF file if the script happened to serve a few blocked
clients.
The solution to all these problems, implemented by this commit, is to
change the way we serve blocked clients. Instead of serving the blocked
clients synchronously, in the context of the command performing the PUSH
operation, it is now an asynchronous and iterative process:
1) If a key that has clients blocked waiting for data is the subject of
a list push operation, we simply mark the key as "ready" and put it into a
queue.
2) Every command pushing stuff on lists, as a variadic LPUSH, a script,
or whatever it is, is replicated verbatim without any rewriting.
3) Every time a Redis command, a MULTI/EXEC block, or a script,
completed its execution, we run the list of keys ready to serve blocked
clients (as more data arrived), and process this list serving the
blocked clients.
4) As a result of "3" maybe more keys are ready again for other clients
(as a result of BRPOPLPUSH we may have push operations), so we iterate
back to step "3" if it's needed.
The new code has much simpler semantics, and a simpler to understand
implementation, with the disadvantage of not being able to "optimize out"
a PUSH+BPOP as a NOP.
This commit will be tested with care before the final merge, more tests
will be added likely.
2012-09-04 04:37:49 -04:00
|
|
|
* if blocking is not required.
|
2010-06-21 18:07:48 -04:00
|
|
|
* - If instead BLPOP is called and the key does not exist or the list is
|
|
|
|
* empty we need to block. In order to do so we remove the notification for
|
|
|
|
* new data to read in the client socket (so that we'll not serve new
|
|
|
|
* requests if the blocking request is not served). Also we put the client
|
|
|
|
* in a dictionary (db->blocking_keys) mapping keys to a list of clients
|
|
|
|
* blocking for these keys.
|
|
|
|
* - If a PUSH operation against a key with blocked clients waiting is
|
A reimplementation of blocking operation internals.
Redis provides support for blocking operations such as BLPOP or BRPOP.
These operations are identical to normal LPOP and RPOP operations as long
as there are elements in the target list, but if the list is empty they
block waiting for new data to arrive to the list.
All the clients blocked waiting for the same list are served in a FIFO
way, so the first that blocked is the first to be served when there is
more data pushed by another client into the list.
The previous implementation of blocking operations was conceived to
serve clients in the context of push operations. For instance:
1) There is a client "A" blocked on list "foo".
2) The client "B" performs `LPUSH foo somevalue`.
3) The client "A" is served in the context of the "B" LPUSH,
synchronously.
Processing things in a synchronous way was useful as if "A" pushes a
value that is served by "B", from the point of view of the database is a
NOP (no operation) thing, that is, nothing is replicated, nothing is
written in the AOF file, and so forth.
However later we implemented two things:
1) Variadic LPUSH that could add multiple values to a list in the
context of a single call.
2) BRPOPLPUSH that was a version of BRPOP that also provided a "PUSH"
side effect when receiving data.
This forced us to make the synchronous implementation more complex. If
client "B" is waiting for data, and "A" pushes three elements in a
single call, we needed to propagate an LPUSH with a missing argument
in the AOF and replication link. We also needed to make sure to
replicate the LPUSH side of BRPOPLPUSH, but only if it in turn did not
happen to serve another blocked client on another list ;)
This was complex, but with a few mutually recursive functions
everything worked as expected... until one day we introduced scripting
in Redis.
Scripting + synchronous blocking operations = Issue #614.
Basically you can't "rewrite" a script to have just a partial effect on
the replicas and AOF file if the script happened to serve a few blocked
clients.
The solution to all these problems, implemented by this commit, is to
change the way we serve blocked clients. Instead of serving the blocked
clients synchronously, in the context of the command performing the PUSH
operation, it is now an asynchronous and iterative process:
1) If a key that has clients blocked waiting for data is the subject of
a list push operation, we simply mark the key as "ready" and put it into a
queue.
2) Every command pushing stuff on lists, as a variadic LPUSH, a script,
or whatever it is, is replicated verbatim without any rewriting.
3) Every time a Redis command, a MULTI/EXEC block, or a script,
completed its execution, we run the list of keys ready to serve blocked
clients (as more data arrived), and process this list serving the
blocked clients.
4) As a result of "3" maybe more keys are ready again for other clients
(as a result of BRPOPLPUSH we may have push operations), so we iterate
back to step "3" if it's needed.
The new code has much simpler semantics, and a simpler to understand
implementation, with the disadvantage of not being able to "optimize out"
a PUSH+BPOP as a NOP.
This commit will be tested with care before the final merge, more tests
will be added likely.
2012-09-04 04:37:49 -04:00
|
|
|
* performed, we mark this key as "ready", and after the current command,
|
|
|
|
* MULTI/EXEC block, or script, is executed, we serve all the clients waiting
|
|
|
|
* for this list, from the one that blocked first, to the last, according
|
|
|
|
* to the number of elements we have in the ready list.
|
2010-06-21 18:07:48 -04:00
|
|
|
*/
|
|
|
|
|
|
|
|
/* Set a client in blocking mode for the specified key, with the specified
|
|
|
|
* timeout */
|
2010-11-08 18:47:46 -05:00
|
|
|
void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) {
|
2010-06-21 18:07:48 -04:00
|
|
|
dictEntry *de;
|
|
|
|
list *l;
|
2012-12-02 14:36:18 -05:00
|
|
|
int j;
|
2010-06-21 18:07:48 -04:00
|
|
|
|
2010-11-09 13:06:25 -05:00
|
|
|
c->bpop.timeout = timeout;
|
|
|
|
c->bpop.target = target;
|
2010-11-08 18:47:46 -05:00
|
|
|
|
2012-12-01 06:26:07 -05:00
|
|
|
if (target != NULL) incrRefCount(target);
|
|
|
|
|
2010-06-21 18:07:48 -04:00
|
|
|
for (j = 0; j < numkeys; j++) {
|
2012-12-02 14:36:18 -05:00
|
|
|
/* If the key already exists in the dict ignore it. */
|
|
|
|
if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;
|
2010-06-21 18:07:48 -04:00
|
|
|
incrRefCount(keys[j]);
|
|
|
|
|
|
|
|
/* And in the other "side", to map keys -> clients */
|
|
|
|
de = dictFind(c->db->blocking_keys,keys[j]);
|
|
|
|
if (de == NULL) {
|
|
|
|
int retval;
|
|
|
|
|
|
|
|
/* For every key we take a list of clients blocked for it */
|
|
|
|
l = listCreate();
|
|
|
|
retval = dictAdd(c->db->blocking_keys,keys[j],l);
|
|
|
|
incrRefCount(keys[j]);
|
2011-10-04 12:43:03 -04:00
|
|
|
redisAssertWithInfo(c,keys[j],retval == DICT_OK);
|
2010-06-21 18:07:48 -04:00
|
|
|
} else {
|
2011-11-08 11:07:55 -05:00
|
|
|
l = dictGetVal(de);
|
2010-06-21 18:07:48 -04:00
|
|
|
}
|
|
|
|
listAddNodeTail(l,c);
|
|
|
|
}
|
2012-12-01 06:26:07 -05:00
|
|
|
|
2010-06-21 18:07:48 -04:00
|
|
|
/* Mark the client as a blocked client */
|
|
|
|
c->flags |= REDIS_BLOCKED;
|
2010-12-06 08:05:01 -05:00
|
|
|
server.bpop_blocked_clients++;
|
2010-06-21 18:07:48 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
/* Unblock a client that's waiting in a blocking operation such as BLPOP */
|
|
|
|
void unblockClientWaitingData(redisClient *c) {
|
|
|
|
dictEntry *de;
|
2012-12-02 14:36:18 -05:00
|
|
|
dictIterator *di;
|
2010-06-21 18:07:48 -04:00
|
|
|
list *l;
|
|
|
|
|
2012-12-02 14:36:18 -05:00
|
|
|
redisAssertWithInfo(c,NULL,dictSize(c->bpop.keys) != 0);
|
|
|
|
di = dictGetIterator(c->bpop.keys);
|
2010-06-21 18:07:48 -04:00
|
|
|
/* The client may wait for multiple keys, so unblock it for every key. */
|
2012-12-02 14:36:18 -05:00
|
|
|
while((de = dictNext(di)) != NULL) {
|
|
|
|
robj *key = dictGetKey(de);
|
|
|
|
|
2010-06-21 18:07:48 -04:00
|
|
|
/* Remove this client from the list of clients waiting for this key. */
|
2012-12-02 14:36:18 -05:00
|
|
|
l = dictFetchValue(c->db->blocking_keys,key);
|
|
|
|
redisAssertWithInfo(c,key,l != NULL);
|
2010-06-21 18:07:48 -04:00
|
|
|
listDelNode(l,listSearchKey(l,c));
|
|
|
|
/* If the list is empty we need to remove it to avoid wasting memory */
|
|
|
|
if (listLength(l) == 0)
|
2012-12-02 14:36:18 -05:00
|
|
|
dictDelete(c->db->blocking_keys,key);
|
2010-06-21 18:07:48 -04:00
|
|
|
}
|
2012-12-02 14:36:18 -05:00
|
|
|
dictReleaseIterator(di);
|
2010-11-08 18:47:46 -05:00
|
|
|
|
2010-06-21 18:07:48 -04:00
|
|
|
/* Cleanup the client structure */
|
2012-12-02 14:36:18 -05:00
|
|
|
dictEmpty(c->bpop.keys);
|
|
|
|
if (c->bpop.target) {
|
|
|
|
decrRefCount(c->bpop.target);
|
|
|
|
c->bpop.target = NULL;
|
|
|
|
}
|
2011-01-17 04:03:21 -05:00
|
|
|
c->flags &= ~REDIS_BLOCKED;
|
|
|
|
c->flags |= REDIS_UNBLOCKED;
|
2010-12-06 08:05:01 -05:00
|
|
|
server.bpop_blocked_clients--;
|
2010-12-06 10:39:39 -05:00
|
|
|
listAddNodeTail(server.unblocked_clients,c);
|
2010-06-21 18:07:48 -04:00
|
|
|
}
|
|
|
|
|
A reimplementation of blocking operation internals.
Redis provides support for blocking operations such as BLPOP or BRPOP.
These operations are identical to normal LPOP and RPOP operations as long
as there are elements in the target list, but if the list is empty they
block waiting for new data to arrive to the list.
All the clients blocked waiting for the same list are served in a FIFO
way, so the first that blocked is the first to be served when there is
more data pushed by another client into the list.
The previous implementation of blocking operations was conceived to
serve clients in the context of push operations. For instance:
1) There is a client "A" blocked on list "foo".
2) The client "B" performs `LPUSH foo somevalue`.
3) The client "A" is served in the context of the "B" LPUSH,
synchronously.
Processing things in a synchronous way was useful as if "A" pushes a
value that is served by "B", from the point of view of the database is a
NOP (no operation) thing, that is, nothing is replicated, nothing is
written in the AOF file, and so forth.
However later we implemented two things:
1) Variadic LPUSH that could add multiple values to a list in the
context of a single call.
2) BRPOPLPUSH that was a version of BRPOP that also provided a "PUSH"
side effect when receiving data.
This forced us to make the synchronous implementation more complex. If
client "B" is waiting for data, and "A" pushes three elements in a
single call, we needed to propagate an LPUSH with a missing argument
in the AOF and replication link. We also needed to make sure to
replicate the LPUSH side of BRPOPLPUSH, but only if it in turn did not
happen to serve another blocking client into another list ;)
This was complex, but with a few mutually recursive functions
everything worked as expected... until one day we introduced scripting
in Redis.
Scripting + synchronous blocking operations = Issue #614.
Basically you can't "rewrite" a script to have just a partial effect on
the replicas and AOF file if the script happened to serve a few blocked
clients.
The solution to all these problems, implemented by this commit, is to
change the way we serve blocked clients. Instead of serving the blocked
clients synchronously, in the context of the command performing the PUSH
operation, it is now an asynchronous and iterative process:
1) If a key that has clients blocked waiting for data is the subject of
a list push operation, we simply mark the key as "ready" and put it into a
queue.
2) Every command pushing stuff on lists, as a variadic LPUSH, a script,
or whatever it is, is replicated verbatim without any rewriting.
3) Every time a Redis command, a MULTI/EXEC block, or a script,
completed its execution, we run the list of keys ready to serve blocked
clients (as more data arrived), and process this list serving the
blocked clients.
4) As a result of "3" maybe more keys are ready again for other clients
(as a result of BRPOPLPUSH we may have push operations), so we iterate
back to step "3" if it's needed.
The new code has much simpler semantics, and a simpler-to-understand
implementation, with the disadvantage of not being able to "optimize out"
a PUSH+BPOP as a no-op.
This commit will be tested with care before the final merge; more tests
will likely be added.
2012-09-04 04:37:49 -04:00
|
|
|
/* If the specified key has clients blocked waiting for list pushes, this
|
|
|
|
* function will put the key reference into the server.ready_keys list.
|
|
|
|
* Note that db->ready_keys is a hash table that allows us to avoid putting
|
|
|
|
* the same key again and again in the list in case of multiple pushes
|
|
|
|
* made by a script or in the context of MULTI/EXEC.
|
2010-06-21 18:07:48 -04:00
|
|
|
*
|
A reimplementation of blocking operation internals.
Redis provides support for blocking operations such as BLPOP or BRPOP.
These operations are identical to normal LPOP and RPOP operations as long
as there are elements in the target list, but if the list is empty they
block waiting for new data to arrive to the list.
All the clients blocked waiting for the same list are served in a FIFO
way, so the first that blocked is the first to be served when there is
more data pushed by another client into the list.
The previous implementation of blocking operations was conceived to
serve clients in the context of push operations. For instance:
1) There is a client "A" blocked on list "foo".
2) The client "B" performs `LPUSH foo somevalue`.
3) The client "A" is served in the context of the "B" LPUSH,
synchronously.
Processing things in a synchronous way was useful as if "A" pushes a
value that is served by "B", from the point of view of the database is a
NOP (no operation) thing, that is, nothing is replicated, nothing is
written in the AOF file, and so forth.
However later we implemented two things:
1) Variadic LPUSH that could add multiple values to a list in the
context of a single call.
2) BRPOPLPUSH that was a version of BRPOP that also provided a "PUSH"
side effect when receiving data.
This forced us to make the synchronous implementation more complex. If
client "B" is waiting for data, and "A" pushes three elements in a
single call, we needed to propagate an LPUSH with a missing argument
in the AOF and replication link. We also needed to make sure to
replicate the LPUSH side of BRPOPLPUSH, but only if it in turn did not
happen to serve another blocking client into another list ;)
This was complex, but with a few mutually recursive functions
everything worked as expected... until one day we introduced scripting
in Redis.
Scripting + synchronous blocking operations = Issue #614.
Basically you can't "rewrite" a script to have just a partial effect on
the replicas and AOF file if the script happened to serve a few blocked
clients.
The solution to all these problems, implemented by this commit, is to
change the way we serve blocked clients. Instead of serving the blocked
clients synchronously, in the context of the command performing the PUSH
operation, it is now an asynchronous and iterative process:
1) If a key that has clients blocked waiting for data is the subject of
a list push operation, we simply mark the key as "ready" and put it into a
queue.
2) Every command pushing stuff on lists, as a variadic LPUSH, a script,
or whatever it is, is replicated verbatim without any rewriting.
3) Every time a Redis command, a MULTI/EXEC block, or a script,
completed its execution, we run the list of keys ready to serve blocked
clients (as more data arrived), and process this list serving the
blocked clients.
4) As a result of "3" maybe more keys are ready again for other clients
(as a result of BRPOPLPUSH we may have push operations), so we iterate
back to step "3" if it's needed.
The new code has much simpler semantics, and a simpler-to-understand
implementation, with the disadvantage of not being able to "optimize out"
a PUSH+BPOP as a no-op.
This commit will be tested with care before the final merge; more tests
will likely be added.
2012-09-04 04:37:49 -04:00
|
|
|
* The list will be finally processed by handleClientsBlockedOnLists() */
|
|
|
|
void signalListAsReady(redisClient *c, robj *key) {
    readyList *rl;
    int retval;

    /* Nobody is blocked on this key in the client's db: nothing to queue. */
    if (dictFind(c->db->blocking_keys,key) == NULL) return;

    /* The key is already flagged as ready for this db: queue it only once. */
    if (dictFind(c->db->ready_keys,key) != NULL) return;

    /* Queue a (db, key) pair into server.ready_keys. The queued item holds
     * its own reference to the key. */
    rl = zmalloc(sizeof(*rl));
    rl->key = key;
    rl->db = c->db;
    incrRefCount(key);
    listAddNodeTail(server.ready_keys,rl);

    /* Also record the key in db->ready_keys, so that the "already signaled"
     * check above is a plain dictionary lookup. The dictionary holds a
     * second reference to the key.
     *
     * The insertion is deliberately performed outside of the assertion:
     * the state change must never depend on the assertion macro evaluating
     * its argument. It cannot fail, since the key was looked up just above
     * and was not there. */
    incrRefCount(key);
    retval = dictAdd(c->db->ready_keys,key,NULL);
    redisAssert(retval == DICT_OK);
}
|
|
|
|
|
|
|
|
/* This is a helper function for handleClientsBlockedOnLists(). Its work
|
|
|
|
* is to serve a specific client (receiver) that is blocked on 'key'
|
|
|
|
* in the context of the specified 'db', doing the following:
|
2010-06-21 18:07:48 -04:00
|
|
|
*
|
A reimplementation of blocking operation internals.
Redis provides support for blocking operations such as BLPOP or BRPOP.
These operations are identical to normal LPOP and RPOP operations as long
as there are elements in the target list, but if the list is empty they
block waiting for new data to arrive to the list.
All the clients blocked waiting for the same list are served in a FIFO
way, so the first that blocked is the first to be served when there is
more data pushed by another client into the list.
The previous implementation of blocking operations was conceived to
serve clients in the context of push operations. For instance:
1) There is a client "A" blocked on list "foo".
2) The client "B" performs `LPUSH foo somevalue`.
3) The client "A" is served in the context of the "B" LPUSH,
synchronously.
Processing things in a synchronous way was useful as if "A" pushes a
value that is served by "B", from the point of view of the database is a
NOP (no operation) thing, that is, nothing is replicated, nothing is
written in the AOF file, and so forth.
However later we implemented two things:
1) Variadic LPUSH that could add multiple values to a list in the
context of a single call.
2) BRPOPLPUSH that was a version of BRPOP that also provided a "PUSH"
side effect when receiving data.
This forced us to make the synchronous implementation more complex. If
client "B" is waiting for data, and "A" pushes three elements in a
single call, we needed to propagate an LPUSH with a missing argument
in the AOF and replication link. We also needed to make sure to
replicate the LPUSH side of BRPOPLPUSH, but only if it in turn did not
happen to serve another blocking client into another list ;)
This was complex, but with a few mutually recursive functions
everything worked as expected... until one day we introduced scripting
in Redis.
Scripting + synchronous blocking operations = Issue #614.
Basically you can't "rewrite" a script to have just a partial effect on
the replicas and AOF file if the script happened to serve a few blocked
clients.
The solution to all these problems, implemented by this commit, is to
change the way we serve blocked clients. Instead of serving the blocked
clients synchronously, in the context of the command performing the PUSH
operation, it is now an asynchronous and iterative process:
1) If a key that has clients blocked waiting for data is the subject of
a list push operation, we simply mark the key as "ready" and put it into a
queue.
2) Every command pushing stuff on lists, as a variadic LPUSH, a script,
or whatever it is, is replicated verbatim without any rewriting.
3) Every time a Redis command, a MULTI/EXEC block, or a script,
completed its execution, we run the list of keys ready to serve blocked
clients (as more data arrived), and process this list serving the
blocked clients.
4) As a result of "3" maybe more keys are ready again for other clients
(as a result of BRPOPLPUSH we may have push operations), so we iterate
back to step "3" if it's needed.
The new code has much simpler semantics, and a simpler-to-understand
implementation, with the disadvantage of not being able to "optimize out"
a PUSH+BPOP as a no-op.
This commit will be tested with care before the final merge; more tests
will likely be added.
2012-09-04 04:37:49 -04:00
|
|
|
* 1) Provide the client with the 'value' element.
|
|
|
|
* 2) If the dstkey is not NULL (we are serving a BRPOPLPUSH) also push the
|
|
|
|
* 'value' element on the destination list (the LPUSH side of the command).
|
|
|
|
* 3) Propagate the resulting BRPOP, BLPOP and additional LPUSH if any into
|
|
|
|
* the AOF and replication channel.
|
|
|
|
*
|
|
|
|
* The argument 'where' is REDIS_TAIL or REDIS_HEAD, and indicates if the
|
|
|
|
* 'value' element was popped from the head (BLPOP) or tail (BRPOP) so that
|
|
|
|
* we can propagate the command properly.
|
|
|
|
*
|
|
|
|
* The function returns REDIS_OK if we are able to serve the client, otherwise
|
|
|
|
* REDIS_ERR is returned to signal the caller that the list POP operation
|
|
|
|
* should be undone as the client was not served: This only happens for
|
|
|
|
* BRPOPLPUSH that fails to push the value to the destination key as it is
|
|
|
|
* of the wrong type. */
|
|
|
|
int serveClientBlockedOnList(redisClient *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where)
{
    robj *argv[3];
    robj *dstobj;

    if (dstkey == NULL) {
        /* BLPOP / BRPOP: propagate the operation as the matching
         * non-blocking pop on 'key', selected by 'where'. */
        if (where == REDIS_HEAD) {
            argv[0] = shared.lpop;
            argv[1] = key;
            propagate(server.lpopCommand,db->id,argv,2,
                REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
        } else {
            argv[0] = shared.rpop;
            argv[1] = key;
            propagate(server.rpopCommand,db->id,argv,2,
                REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
        }

        /* The reply is a two-element multi bulk: key name, popped value. */
        addReplyMultiBulkLen(receiver,2);
        addReplyBulk(receiver,key);
        addReplyBulk(receiver,value);
        return REDIS_OK;
    }

    /* BRPOPLPUSH: the popped value must also be pushed on 'dstkey'. */
    dstobj = lookupKeyWrite(receiver->db,dstkey);

    /* The destination exists but fails the list type check: the client
     * cannot be served, so the caller is told to undo the pop. */
    if (dstobj != NULL && checkType(receiver,dstobj,REDIS_LIST)) {
        return REDIS_ERR;
    }

    /* Propagate the pop side as an RPOP on the source key... */
    argv[0] = shared.rpop;
    argv[1] = key;
    propagate(server.rpopCommand,db->id,argv,2,
        REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);

    /* ...hand the value over to the destination (dstobj may be NULL here
     * when the destination key does not exist)... */
    rpoplpushHandlePush(receiver,dstkey,dstobj,value);

    /* ...and propagate the push side as an LPUSH on the destination. */
    argv[0] = shared.lpush;
    argv[1] = dstkey;
    argv[2] = value;
    propagate(server.lpushCommand,db->id,argv,3,
        REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);

    return REDIS_OK;
}
|
2010-06-21 18:07:48 -04:00
|
|
|
|
A reimplementation of blocking operation internals.
Redis provides support for blocking operations such as BLPOP or BRPOP.
These operations are identical to normal LPOP and RPOP operations as long
as there are elements in the target list, but if the list is empty they
block waiting for new data to arrive to the list.
All the clients blocked waiting for the same list are served in a FIFO
way, so the first that blocked is the first to be served when there is
more data pushed by another client into the list.
The previous implementation of blocking operations was conceived to
serve clients in the context of push operations. For instance:
1) There is a client "A" blocked on list "foo".
2) The client "B" performs `LPUSH foo somevalue`.
3) The client "A" is served in the context of the "B" LPUSH,
synchronously.
Processing things in a synchronous way was useful as if "A" pushes a
value that is served by "B", from the point of view of the database is a
NOP (no operation) thing, that is, nothing is replicated, nothing is
written in the AOF file, and so forth.
However later we implemented two things:
1) Variadic LPUSH that could add multiple values to a list in the
context of a single call.
2) BRPOPLPUSH that was a version of BRPOP that also provided a "PUSH"
side effect when receiving data.
This forced us to make the synchronous implementation more complex. If
client "B" is waiting for data, and "A" pushes three elements in a
single call, we needed to propagate an LPUSH with a missing argument
in the AOF and replication link. We also needed to make sure to
replicate the LPUSH side of BRPOPLPUSH, but only if it in turn did not
happen to serve another blocking client into another list ;)
This was complex, but with a few mutually recursive functions
everything worked as expected... until one day we introduced scripting
in Redis.
Scripting + synchronous blocking operations = Issue #614.
Basically you can't "rewrite" a script to have just a partial effect on
the replicas and AOF file if the script happened to serve a few blocked
clients.
The solution to all these problems, implemented by this commit, is to
change the way we serve blocked clients. Instead of serving the blocked
clients synchronously, in the context of the command performing the PUSH
operation, it is now an asynchronous and iterative process:
1) If a key that has clients blocked waiting for data is the subject of
a list push operation, we simply mark the key as "ready" and put it into a
queue.
2) Every command pushing stuff on lists, as a variadic LPUSH, a script,
or whatever it is, is replicated verbatim without any rewriting.
3) Every time a Redis command, a MULTI/EXEC block, or a script,
completed its execution, we run the list of keys ready to serve blocked
clients (as more data arrived), and process this list serving the
blocked clients.
4) As a result of "3" maybe more keys are ready again for other clients
(as a result of BRPOPLPUSH we may have push operations), so we iterate
back to step "3" if it's needed.
The new code has much simpler semantics, and a simpler-to-understand
implementation, with the disadvantage of not being able to "optimize out"
a PUSH+BPOP as a no-op.
This commit will be tested with care before the final merge; more tests
will likely be added.
2012-09-04 04:37:49 -04:00
|
|
|
/* This function should be called by Redis every time a single command,
|
|
|
|
* a MULTI/EXEC block, or a Lua script, terminated its execution after
|
|
|
|
* being called by a client.
|
|
|
|
*
|
|
|
|
* All the keys with at least one client blocked that received at least
|
|
|
|
* one new element via some PUSH operation are accumulated into
|
|
|
|
* the server.ready_keys list. This function will run the list and will
|
|
|
|
* serve clients accordingly. Note that the function will iterate again and
|
|
|
|
* again as a result of serving BRPOPLPUSH we can have new blocking clients
|
|
|
|
* to serve because of the PUSH side of BRPOPLPUSH. */
|
|
|
|
void handleClientsBlockedOnLists(void) {
|
|
|
|
while(listLength(server.ready_keys) != 0) {
|
|
|
|
list *l;
|
|
|
|
|
|
|
|
/* Point server.ready_keys to a fresh list and save the current one
|
|
|
|
* locally. This way as we run the old list we are free to call
|
|
|
|
* signalListAsReady() that may push new elements in server.ready_keys
|
|
|
|
* when handling clients blocked into BRPOPLPUSH. */
|
|
|
|
l = server.ready_keys;
|
|
|
|
server.ready_keys = listCreate();
|
|
|
|
|
|
|
|
while(listLength(l) != 0) {
|
|
|
|
listNode *ln = listFirst(l);
|
|
|
|
readyList *rl = ln->value;
|
|
|
|
|
|
|
|
/* First of all remove this key from db->ready_keys so that
|
|
|
|
* we can safely call signalListAsReady() against this key. */
|
|
|
|
dictDelete(rl->db->ready_keys,rl->key);
|
|
|
|
|
|
|
|
/* If the key exists and it's a list, serve blocked clients
|
|
|
|
* with data. */
|
|
|
|
robj *o = lookupKeyWrite(rl->db,rl->key);
|
|
|
|
if (o != NULL && o->type == REDIS_LIST) {
|
|
|
|
dictEntry *de;
|
|
|
|
|
|
|
|
/* We serve clients in the same order they blocked for
|
|
|
|
* this key, from the first blocked to the last. */
|
|
|
|
de = dictFind(rl->db->blocking_keys,rl->key);
|
|
|
|
if (de) {
|
|
|
|
list *clients = dictGetVal(de);
|
|
|
|
int numclients = listLength(clients);
|
|
|
|
|
|
|
|
while(numclients--) {
|
|
|
|
listNode *clientnode = listFirst(clients);
|
|
|
|
redisClient *receiver = clientnode->value;
|
|
|
|
robj *dstkey = receiver->bpop.target;
|
|
|
|
int where = (receiver->lastcmd &&
|
|
|
|
receiver->lastcmd->proc == blpopCommand) ?
|
|
|
|
REDIS_HEAD : REDIS_TAIL;
|
|
|
|
robj *value = listTypePop(o,where);
|
|
|
|
|
|
|
|
if (value) {
|
|
|
|
/* Protect receiver->bpop.target, that will be
|
|
|
|
* freed by the next unblockClientWaitingData()
|
|
|
|
* call. */
|
|
|
|
if (dstkey) incrRefCount(dstkey);
|
|
|
|
unblockClientWaitingData(receiver);
|
|
|
|
|
|
|
|
if (serveClientBlockedOnList(receiver,
|
|
|
|
rl->key,dstkey,rl->db,value,
|
|
|
|
where) == REDIS_ERR)
|
|
|
|
{
|
|
|
|
/* If we failed serving the client we need
|
|
|
|
* to also undo the POP operation. */
|
|
|
|
listTypePush(o,value,where);
|
|
|
|
}
|
|
|
|
|
|
|
|
if (dstkey) decrRefCount(dstkey);
|
|
|
|
decrRefCount(value);
|
|
|
|
} else {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (listTypeLength(o) == 0) dbDelete(rl->db,rl->key);
|
|
|
|
/* We don't call signalModifiedKey() as it was already called
|
|
|
|
* when an element was pushed on the list. */
|
|
|
|
}
|
|
|
|
|
|
|
|
/* Free this item. */
|
|
|
|
decrRefCount(rl->key);
|
|
|
|
zfree(rl);
|
|
|
|
listDelNode(l,ln);
|
|
|
|
}
|
|
|
|
listRelease(l); /* We have the new list on place at this point. */
|
|
|
|
}
|
2010-06-21 18:07:48 -04:00
|
|
|
}
|
|
|
|
|
2010-12-06 07:45:48 -05:00
|
|
|
/* Read the timeout argument of a blocking command from 'object'.
 *
 * On success REDIS_OK is returned and '*timeout' is set to:
 *   - 0 when the argument is 0 (stored unchanged);
 *   - the argument plus server.unixtime when the argument is positive
 *     (presumably an absolute Unix time in seconds -- confirm against the
 *     definition of server.unixtime).
 *
 * On failure REDIS_ERR is returned, '*timeout' is left untouched and an
 * error has been sent to the client 'c'. This happens when the argument is
 * not a valid long, when it is negative, or when adding the current time
 * to it would overflow a long. */
int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) {
    long tval;

    if (getLongFromObjectOrReply(c,object,&tval,
            "timeout is not an integer or out of range") != REDIS_OK)
        return REDIS_ERR;

    if (tval < 0) {
        addReplyError(c,"timeout is negative");
        return REDIS_ERR;
    }

    if (tval > 0) {
        /* 'tval' comes straight from the client: a huge value would make
         * the addition below overflow, which is undefined behavior for a
         * signed type. LONG_MAX - tval cannot overflow since tval > 0. */
        if (server.unixtime > LONG_MAX - tval) {
            addReplyError(c,"timeout is out of range");
            return REDIS_ERR;
        }
        tval += server.unixtime;
    }
    *timeout = tval;

    return REDIS_OK;
}
|
|
|
|
|
|
|
|
/* Blocking RPOP/LPOP */
|
|
|
|
void blockingPopGenericCommand(redisClient *c, int where) {
|
|
|
|
robj *o;
|
|
|
|
time_t timeout;
|
|
|
|
int j;
|
|
|
|
|
2010-12-06 07:45:48 -05:00
|
|
|
if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout) != REDIS_OK)
|
2010-08-26 08:05:14 -04:00
|
|
|
return;
|
|
|
|
|
2010-06-21 18:07:48 -04:00
|
|
|
for (j = 1; j < c->argc-1; j++) {
|
|
|
|
o = lookupKeyWrite(c->db,c->argv[j]);
|
|
|
|
if (o != NULL) {
|
|
|
|
if (o->type != REDIS_LIST) {
|
|
|
|
addReply(c,shared.wrongtypeerr);
|
|
|
|
return;
|
|
|
|
} else {
|
|
|
|
if (listTypeLength(o) != 0) {
|
2012-02-29 08:41:57 -05:00
|
|
|
/* Non empty list, this is like a non normal [LR]POP. */
|
|
|
|
robj *value = listTypePop(o,where);
|
|
|
|
redisAssert(value != NULL);
|
2010-11-08 13:25:59 -05:00
|
|
|
|
2012-02-29 08:41:57 -05:00
|
|
|
addReplyMultiBulkLen(c,2);
|
|
|
|
addReplyBulk(c,c->argv[j]);
|
|
|
|
addReplyBulk(c,value);
|
|
|
|
decrRefCount(value);
|
|
|
|
if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[j]);
|
|
|
|
signalModifiedKey(c->db,c->argv[j]);
|
|
|
|
server.dirty++;
|
|
|
|
|
|
|
|
/* Replicate it as an [LR]POP instead of B[LR]POP. */
|
|
|
|
rewriteClientCommandVector(c,2,
|
|
|
|
(where == REDIS_HEAD) ? shared.lpop : shared.rpop,
|
|
|
|
c->argv[j]);
|
2010-06-21 18:07:48 -04:00
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2010-08-26 08:05:14 -04:00
|
|
|
|
2010-08-30 10:31:03 -04:00
|
|
|
/* If we are inside a MULTI/EXEC and the list is empty the only thing
|
|
|
|
* we can do is treating it as a timeout (even with timeout 0). */
|
|
|
|
if (c->flags & REDIS_MULTI) {
|
|
|
|
addReply(c,shared.nullmultibulk);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
2010-06-21 18:07:48 -04:00
|
|
|
/* If the list is empty or the key does not exists we must block */
|
2010-11-08 18:47:46 -05:00
|
|
|
blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
|
2010-06-21 18:07:48 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
/* BLPOP command entry point: runs the generic blocking pop against the
 * head of the list (REDIS_HEAD). */
void blpopCommand(redisClient *c) {
    blockingPopGenericCommand(c, REDIS_HEAD);
}
|
|
|
|
|
|
|
|
/* BRPOP command entry point: runs the generic blocking pop against the
 * tail of the list (REDIS_TAIL). */
void brpopCommand(redisClient *c) {
    blockingPopGenericCommand(c, REDIS_TAIL);
}
|
2010-11-08 13:25:59 -05:00
|
|
|
|
|
|
|
/* BRPOPLPUSH command entry point.
 *
 * argv[1] is the source key, argv[2] the destination key and argv[3] the
 * timeout.
 *
 *  - Source key holding a list: the regular rpoplpushCommand() is run.
 *  - Source key holding another type: wrong type error.
 *  - Source key missing, inside MULTI/EXEC: null bulk reply, no blocking.
 *  - Source key missing otherwise: blockForKeys() is called on the source
 *    key, passing the destination key as target. */
void brpoplpushCommand(redisClient *c) {
    time_t timeout;

    if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout) != REDIS_OK)
        return;

    robj *key = lookupKeyWrite(c->db, c->argv[1]);

    if (key != NULL) {
        if (key->type != REDIS_LIST) {
            addReply(c, shared.wrongtypeerr);
            return;
        }

        /* The list exists, and an existing list is expected to have at
         * least one element, so just run the non blocking variant. */
        redisAssertWithInfo(c,key,listTypeLength(key) > 0);
        rpoplpushCommand(c);
        return;
    }

    if (c->flags & REDIS_MULTI) {
        /* Blocking against a missing list inside a MULTI state returns
         * immediately. */
        addReply(c, shared.nullbulk);
        return;
    }

    /* There is nothing to pop: the client blocks. */
    blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]);
}
|