redict/src/t_list.c

771 lines
26 KiB
C
Raw Normal View History

/*
* Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "server.h"
/*-----------------------------------------------------------------------------
* List API
*----------------------------------------------------------------------------*/
2013-01-16 12:00:20 -05:00
/* The function pushes an element to the specified list object 'subject',
A reimplementation of blocking operation internals. Redis provides support for blocking operations such as BLPOP or BRPOP. This operations are identical to normal LPOP and RPOP operations as long as there are elements in the target list, but if the list is empty they block waiting for new data to arrive to the list. All the clients blocked waiting for th same list are served in a FIFO way, so the first that blocked is the first to be served when there is more data pushed by another client into the list. The previous implementation of blocking operations was conceived to serve clients in the context of push operations. For for instance: 1) There is a client "A" blocked on list "foo". 2) The client "B" performs `LPUSH foo somevalue`. 3) The client "A" is served in the context of the "B" LPUSH, synchronously. Processing things in a synchronous way was useful as if "A" pushes a value that is served by "B", from the point of view of the database is a NOP (no operation) thing, that is, nothing is replicated, nothing is written in the AOF file, and so forth. However later we implemented two things: 1) Variadic LPUSH that could add multiple values to a list in the context of a single call. 2) BRPOPLPUSH that was a version of BRPOP that also provided a "PUSH" side effect when receiving data. This forced us to make the synchronous implementation more complex. If client "B" is waiting for data, and "A" pushes three elemnents in a single call, we needed to propagate an LPUSH with a missing argument in the AOF and replication link. We also needed to make sure to replicate the LPUSH side of BRPOPLPUSH, but only if in turn did not happened to serve another blocking client into another list ;) This were complex but with a few of mutually recursive functions everything worked as expected... until one day we introduced scripting in Redis. Scripting + synchronous blocking operations = Issue #614. Basically you can't "rewrite" a script to have just a partial effect on the replicas and AOF file if the script happened to serve a few blocked clients. The solution to all this problems, implemented by this commit, is to change the way we serve blocked clients. Instead of serving the blocked clients synchronously, in the context of the command performing the PUSH operation, it is now an asynchronous and iterative process: 1) If a key that has clients blocked waiting for data is the subject of a list push operation, We simply mark keys as "ready" and put it into a queue. 2) Every command pushing stuff on lists, as a variadic LPUSH, a script, or whatever it is, is replicated verbatim without any rewriting. 3) Every time a Redis command, a MULTI/EXEC block, or a script, completed its execution, we run the list of keys ready to serve blocked clients (as more data arrived), and process this list serving the blocked clients. 4) As a result of "3" maybe more keys are ready again for other clients (as a result of BRPOPLPUSH we may have push operations), so we iterate back to step "3" if it's needed. The new code has a much simpler semantics, and a simpler to understand implementation, with the disadvantage of not being able to "optmize out" a PUSH+BPOP as a No OP. This commit will be tested with care before the final merge, more tests will be added likely.
2012-09-04 04:37:49 -04:00
* at head or tail position as specified by 'where'.
*
2013-01-16 12:00:20 -05:00
* There is no need for the caller to increment the refcount of 'value' as
A reimplementation of blocking operation internals. Redis provides support for blocking operations such as BLPOP or BRPOP. This operations are identical to normal LPOP and RPOP operations as long as there are elements in the target list, but if the list is empty they block waiting for new data to arrive to the list. All the clients blocked waiting for th same list are served in a FIFO way, so the first that blocked is the first to be served when there is more data pushed by another client into the list. The previous implementation of blocking operations was conceived to serve clients in the context of push operations. For for instance: 1) There is a client "A" blocked on list "foo". 2) The client "B" performs `LPUSH foo somevalue`. 3) The client "A" is served in the context of the "B" LPUSH, synchronously. Processing things in a synchronous way was useful as if "A" pushes a value that is served by "B", from the point of view of the database is a NOP (no operation) thing, that is, nothing is replicated, nothing is written in the AOF file, and so forth. However later we implemented two things: 1) Variadic LPUSH that could add multiple values to a list in the context of a single call. 2) BRPOPLPUSH that was a version of BRPOP that also provided a "PUSH" side effect when receiving data. This forced us to make the synchronous implementation more complex. If client "B" is waiting for data, and "A" pushes three elemnents in a single call, we needed to propagate an LPUSH with a missing argument in the AOF and replication link. We also needed to make sure to replicate the LPUSH side of BRPOPLPUSH, but only if in turn did not happened to serve another blocking client into another list ;) This were complex but with a few of mutually recursive functions everything worked as expected... until one day we introduced scripting in Redis. Scripting + synchronous blocking operations = Issue #614. Basically you can't "rewrite" a script to have just a partial effect on the replicas and AOF file if the script happened to serve a few blocked clients. The solution to all this problems, implemented by this commit, is to change the way we serve blocked clients. Instead of serving the blocked clients synchronously, in the context of the command performing the PUSH operation, it is now an asynchronous and iterative process: 1) If a key that has clients blocked waiting for data is the subject of a list push operation, We simply mark keys as "ready" and put it into a queue. 2) Every command pushing stuff on lists, as a variadic LPUSH, a script, or whatever it is, is replicated verbatim without any rewriting. 3) Every time a Redis command, a MULTI/EXEC block, or a script, completed its execution, we run the list of keys ready to serve blocked clients (as more data arrived), and process this list serving the blocked clients. 4) As a result of "3" maybe more keys are ready again for other clients (as a result of BRPOPLPUSH we may have push operations), so we iterate back to step "3" if it's needed. The new code has a much simpler semantics, and a simpler to understand implementation, with the disadvantage of not being able to "optmize out" a PUSH+BPOP as a No OP. This commit will be tested with care before the final merge, more tests will be added likely.
2012-09-04 04:37:49 -04:00
* the function takes care of it if needed. */
void listTypePush(robj *subject, robj *value, int where) {
if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
2015-07-27 03:41:48 -04:00
int pos = (where == LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
value = getDecodedObject(value);
size_t len = sdslen(value->ptr);
quicklistPush(subject->ptr, value->ptr, len, pos);
decrRefCount(value);
} else {
2015-07-27 03:41:48 -04:00
serverPanic("Unknown list encoding");
}
}
void *listPopSaver(unsigned char *data, unsigned int sz) {
return createStringObject((char*)data,sz);
}
robj *listTypePop(robj *subject, int where) {
long long vlong;
robj *value = NULL;
2015-07-27 03:41:48 -04:00
int ql_where = where == LIST_HEAD ? QUICKLIST_HEAD : QUICKLIST_TAIL;
if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
if (quicklistPopCustom(subject->ptr, ql_where, (unsigned char **)&value,
NULL, &vlong, listPopSaver)) {
if (!value)
value = createStringObjectFromLongLong(vlong);
}
} else {
2015-07-27 03:41:48 -04:00
serverPanic("Unknown list encoding");
}
return value;
}
unsigned long listTypeLength(const robj *subject) {
if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
return quicklistCount(subject->ptr);
} else {
2015-07-27 03:41:48 -04:00
serverPanic("Unknown list encoding");
}
}
/* Initialize an iterator at the specified index. */
listTypeIterator *listTypeInitIterator(robj *subject, long index,
unsigned char direction) {
listTypeIterator *li = zmalloc(sizeof(listTypeIterator));
li->subject = subject;
li->encoding = subject->encoding;
li->direction = direction;
li->iter = NULL;
2015-07-27 03:41:48 -04:00
/* LIST_HEAD means start at TAIL and move *towards* head.
* LIST_TAIL means start at HEAD and move *towards tail. */
int iter_direction =
2015-07-27 03:41:48 -04:00
direction == LIST_HEAD ? AL_START_TAIL : AL_START_HEAD;
if (li->encoding == OBJ_ENCODING_QUICKLIST) {
li->iter = quicklistGetIteratorAtIdx(li->subject->ptr,
iter_direction, index);
} else {
2015-07-27 03:41:48 -04:00
serverPanic("Unknown list encoding");
}
return li;
}
/* Clean up the iterator. */
void listTypeReleaseIterator(listTypeIterator *li) {
zfree(li->iter);
zfree(li);
}
/* Stores pointer to current the entry in the provided entry structure
* and advances the position of the iterator. Returns 1 when the current
* entry is in fact an entry, 0 otherwise. */
int listTypeNext(listTypeIterator *li, listTypeEntry *entry) {
/* Protect from converting when iterating */
2015-07-26 09:29:53 -04:00
serverAssert(li->subject->encoding == li->encoding);
entry->li = li;
if (li->encoding == OBJ_ENCODING_QUICKLIST) {
return quicklistNext(li->iter, &entry->entry);
} else {
2015-07-27 03:41:48 -04:00
serverPanic("Unknown list encoding");
}
return 0;
}
/* Return entry or NULL at the current position of the iterator. */
robj *listTypeGet(listTypeEntry *entry) {
robj *value = NULL;
if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
if (entry->entry.value) {
value = createStringObject((char *)entry->entry.value,
entry->entry.sz);
} else {
value = createStringObjectFromLongLong(entry->entry.longval);
}
} else {
2015-07-27 03:41:48 -04:00
serverPanic("Unknown list encoding");
}
return value;
}
void listTypeInsert(listTypeEntry *entry, robj *value, int where) {
if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
value = getDecodedObject(value);
sds str = value->ptr;
size_t len = sdslen(str);
2015-07-27 03:41:48 -04:00
if (where == LIST_TAIL) {
quicklistInsertAfter((quicklist *)entry->entry.quicklist,
&entry->entry, str, len);
2015-07-27 03:41:48 -04:00
} else if (where == LIST_HEAD) {
quicklistInsertBefore((quicklist *)entry->entry.quicklist,
&entry->entry, str, len);
}
decrRefCount(value);
} else {
2015-07-27 03:41:48 -04:00
serverPanic("Unknown list encoding");
}
}
/* Compare the given object with the entry at the current position. */
int listTypeEqual(listTypeEntry *entry, robj *o) {
if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
2015-07-26 09:29:53 -04:00
serverAssertWithInfo(NULL,o,sdsEncodedObject(o));
return quicklistCompare(entry->entry.zi,o->ptr,sdslen(o->ptr));
} else {
2015-07-27 03:41:48 -04:00
serverPanic("Unknown list encoding");
}
}
/* Delete the element pointed to. */
void listTypeDelete(listTypeIterator *iter, listTypeEntry *entry) {
if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
quicklistDelEntry(iter->iter, &entry->entry);
} else {
2015-07-27 03:41:48 -04:00
serverPanic("Unknown list encoding");
}
}
/* Create a quicklist from a single ziplist */
void listTypeConvert(robj *subject, int enc) {
2015-07-26 09:29:53 -04:00
serverAssertWithInfo(NULL,subject,subject->type==OBJ_LIST);
serverAssertWithInfo(NULL,subject,subject->encoding==OBJ_ENCODING_ZIPLIST);
if (enc == OBJ_ENCODING_QUICKLIST) {
size_t zlen = server.list_max_ziplist_size;
int depth = server.list_compress_depth;
subject->ptr = quicklistCreateFromZiplist(zlen, depth, subject->ptr);
subject->encoding = OBJ_ENCODING_QUICKLIST;
} else {
2015-07-27 03:41:48 -04:00
serverPanic("Unsupported list conversion");
}
}
/*-----------------------------------------------------------------------------
* List Commands
*----------------------------------------------------------------------------*/
void pushGenericCommand(client *c, int where) {
2016-06-05 10:09:55 -04:00
int j, pushed = 0;
robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
2011-04-15 10:35:27 -04:00
if (lobj && lobj->type != OBJ_LIST) {
2011-04-15 10:35:27 -04:00
addReply(c,shared.wrongtypeerr);
return;
}
for (j = 2; j < c->argc; j++) {
if (!lobj) {
lobj = createQuicklistObject();
quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size,
server.list_compress_depth);
2011-04-15 10:35:27 -04:00
dbAdd(c->db,c->argv[1],lobj);
}
2011-04-15 10:35:27 -04:00
listTypePush(lobj,c->argv[j],where);
pushed++;
}
2016-06-05 10:09:55 -04:00
addReplyLongLong(c, (lobj ? listTypeLength(lobj) : 0));
if (pushed) {
2015-07-27 03:41:48 -04:00
char *event = (where == LIST_HEAD) ? "lpush" : "rpush";
signalModifiedKey(c->db,c->argv[1]);
2015-07-27 03:41:48 -04:00
notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
}
2011-04-15 10:35:27 -04:00
server.dirty += pushed;
}
void lpushCommand(client *c) {
2015-07-27 03:41:48 -04:00
pushGenericCommand(c,LIST_HEAD);
}
void rpushCommand(client *c) {
2015-07-27 03:41:48 -04:00
pushGenericCommand(c,LIST_TAIL);
}
void pushxGenericCommand(client *c, int where) {
2016-06-05 10:22:52 -04:00
int j, pushed = 0;
robj *subject;
if ((subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
checkType(c,subject,OBJ_LIST)) return;
2016-06-05 10:22:52 -04:00
for (j = 2; j < c->argc; j++) {
listTypePush(subject,c->argv[j],where);
pushed++;
}
addReplyLongLong(c,listTypeLength(subject));
2016-06-05 10:22:52 -04:00
if (pushed) {
char *event = (where == LIST_HEAD) ? "lpush" : "rpush";
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
}
server.dirty += pushed;
}
void lpushxCommand(client *c) {
pushxGenericCommand(c,LIST_HEAD);
}
void rpushxCommand(client *c) {
pushxGenericCommand(c,LIST_TAIL);
}
void linsertCommand(client *c) {
int where;
robj *subject;
listTypeIterator *iter;
listTypeEntry entry;
int inserted = 0;
if (strcasecmp(c->argv[2]->ptr,"after") == 0) {
where = LIST_TAIL;
} else if (strcasecmp(c->argv[2]->ptr,"before") == 0) {
where = LIST_HEAD;
} else {
addReply(c,shared.syntaxerr);
return;
}
if ((subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
checkType(c,subject,OBJ_LIST)) return;
/* Seek pivot from head to tail */
iter = listTypeInitIterator(subject,0,LIST_TAIL);
while (listTypeNext(iter,&entry)) {
if (listTypeEqual(&entry,c->argv[3])) {
listTypeInsert(&entry,c->argv[4],where);
inserted = 1;
break;
}
}
listTypeReleaseIterator(iter);
if (inserted) {
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_LIST,"linsert",
c->argv[1],c->db->id);
server.dirty++;
} else {
/* Notify client of a failed insert */
addReplyLongLong(c,-1);
return;
}
addReplyLongLong(c,listTypeLength(subject));
}
void llenCommand(client *c) {
robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);
if (o == NULL || checkType(c,o,OBJ_LIST)) return;
addReplyLongLong(c,listTypeLength(o));
}
void lindexCommand(client *c) {
2018-11-30 03:41:54 -05:00
robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp]);
if (o == NULL || checkType(c,o,OBJ_LIST)) return;
long index;
robj *value = NULL;
if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != C_OK))
return;
if (o->encoding == OBJ_ENCODING_QUICKLIST) {
quicklistEntry entry;
if (quicklistIndex(o->ptr, index, &entry)) {
if (entry.value) {
value = createStringObject((char*)entry.value,entry.sz);
} else {
value = createStringObjectFromLongLong(entry.longval);
}
addReplyBulk(c,value);
decrRefCount(value);
} else {
2018-11-30 03:41:54 -05:00
addReplyNull(c);
}
} else {
2015-07-27 03:41:48 -04:00
serverPanic("Unknown list encoding");
}
}
void lsetCommand(client *c) {
robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
if (o == NULL || checkType(c,o,OBJ_LIST)) return;
long index;
robj *value = c->argv[3];
if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != C_OK))
return;
if (o->encoding == OBJ_ENCODING_QUICKLIST) {
quicklist *ql = o->ptr;
int replaced = quicklistReplaceAtIndex(ql, index,
value->ptr, sdslen(value->ptr));
if (!replaced) {
addReply(c,shared.outofrangeerr);
} else {
addReply(c,shared.ok);
signalModifiedKey(c->db,c->argv[1]);
2015-07-27 03:41:48 -04:00
notifyKeyspaceEvent(NOTIFY_LIST,"lset",c->argv[1],c->db->id);
server.dirty++;
}
} else {
2015-07-27 03:41:48 -04:00
serverPanic("Unknown list encoding");
}
}
void popGenericCommand(client *c, int where) {
2018-11-30 03:41:54 -05:00
robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.null[c->resp]);
if (o == NULL || checkType(c,o,OBJ_LIST)) return;
robj *value = listTypePop(o,where);
if (value == NULL) {
2018-11-30 03:41:54 -05:00
addReplyNull(c);
} else {
2015-07-27 03:41:48 -04:00
char *event = (where == LIST_HEAD) ? "lpop" : "rpop";
addReplyBulk(c,value);
decrRefCount(value);
2015-07-27 03:41:48 -04:00
notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
if (listTypeLength(o) == 0) {
2015-07-27 03:41:48 -04:00
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
c->argv[1],c->db->id);
dbDelete(c->db,c->argv[1]);
}
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
}
}
void lpopCommand(client *c) {
2015-07-27 03:41:48 -04:00
popGenericCommand(c,LIST_HEAD);
}
void rpopCommand(client *c) {
2015-07-27 03:41:48 -04:00
popGenericCommand(c,LIST_TAIL);
}
void lrangeCommand(client *c) {
robj *o;
long start, end, llen, rangelen;
if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) ||
(getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return;
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyarray)) == NULL
|| checkType(c,o,OBJ_LIST)) return;
llen = listTypeLength(o);
/* convert negative indexes */
if (start < 0) start = llen+start;
if (end < 0) end = llen+end;
if (start < 0) start = 0;
/* Invariant: start >= 0, so this test will be true when end < 0.
* The range is empty when start > end or start >= length. */
if (start > end || start >= llen) {
addReply(c,shared.emptyarray);
return;
}
if (end >= llen) end = llen-1;
rangelen = (end-start)+1;
/* Return the result in form of a multi-bulk reply */
addReplyArrayLen(c,rangelen);
if (o->encoding == OBJ_ENCODING_QUICKLIST) {
2015-07-27 03:41:48 -04:00
listTypeIterator *iter = listTypeInitIterator(o, start, LIST_TAIL);
while(rangelen--) {
listTypeEntry entry;
listTypeNext(iter, &entry);
quicklistEntry *qe = &entry.entry;
if (qe->value) {
addReplyBulkCBuffer(c,qe->value,qe->sz);
} else {
addReplyBulkLongLong(c,qe->longval);
}
}
listTypeReleaseIterator(iter);
} else {
2015-07-27 03:41:48 -04:00
serverPanic("List encoding is not QUICKLIST!");
}
}
void ltrimCommand(client *c) {
robj *o;
long start, end, llen, ltrim, rtrim;
if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) ||
(getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return;
if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.ok)) == NULL ||
checkType(c,o,OBJ_LIST)) return;
llen = listTypeLength(o);
/* convert negative indexes */
if (start < 0) start = llen+start;
if (end < 0) end = llen+end;
if (start < 0) start = 0;
/* Invariant: start >= 0, so this test will be true when end < 0.
* The range is empty when start > end or start >= length. */
if (start > end || start >= llen) {
/* Out of range start or start > end result in empty list */
ltrim = llen;
rtrim = 0;
} else {
if (end >= llen) end = llen-1;
ltrim = start;
rtrim = llen-end-1;
}
/* Remove list elements to perform the trim */
if (o->encoding == OBJ_ENCODING_QUICKLIST) {
quicklistDelRange(o->ptr,0,ltrim);
quicklistDelRange(o->ptr,-rtrim,rtrim);
} else {
2015-07-27 03:41:48 -04:00
serverPanic("Unknown list encoding");
}
2015-07-27 03:41:48 -04:00
notifyKeyspaceEvent(NOTIFY_LIST,"ltrim",c->argv[1],c->db->id);
if (listTypeLength(o) == 0) {
dbDelete(c->db,c->argv[1]);
2015-07-27 03:41:48 -04:00
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
}
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
addReply(c,shared.ok);
}
void lremCommand(client *c) {
robj *subject, *obj;
obj = c->argv[3];
long toremove;
long removed = 0;
if ((getLongFromObjectOrReply(c, c->argv[2], &toremove, NULL) != C_OK))
return;
subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
if (subject == NULL || checkType(c,subject,OBJ_LIST)) return;
listTypeIterator *li;
if (toremove < 0) {
toremove = -toremove;
2015-07-27 03:41:48 -04:00
li = listTypeInitIterator(subject,-1,LIST_HEAD);
} else {
2015-07-27 03:41:48 -04:00
li = listTypeInitIterator(subject,0,LIST_TAIL);
}
listTypeEntry entry;
while (listTypeNext(li,&entry)) {
if (listTypeEqual(&entry,obj)) {
listTypeDelete(li, &entry);
server.dirty++;
removed++;
if (toremove && removed == toremove) break;
}
}
listTypeReleaseIterator(li);
2016-02-02 12:58:19 -05:00
if (removed) {
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_LIST,"lrem",c->argv[1],c->db->id);
2016-02-02 12:58:19 -05:00
}
if (listTypeLength(subject) == 0) {
dbDelete(c->db,c->argv[1]);
2016-02-02 12:58:19 -05:00
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
}
addReplyLongLong(c,removed);
}
/* This is the semantic of this command:
* RPOPLPUSH srclist dstlist:
* IF LLEN(srclist) > 0
* element = RPOP srclist
* LPUSH dstlist element
* RETURN element
* ELSE
* RETURN nil
* END
* END
*
* The idea is to be able to get an element from a list in a reliable way
* since the element is not just returned but pushed against another list
* as well. This command was originally proposed by Ezra Zygmuntowicz.
*/
void rpoplpushHandlePush(client *c, robj *dstkey, robj *dstobj, robj *value) {
A reimplementation of blocking operation internals. Redis provides support for blocking operations such as BLPOP or BRPOP. This operations are identical to normal LPOP and RPOP operations as long as there are elements in the target list, but if the list is empty they block waiting for new data to arrive to the list. All the clients blocked waiting for th same list are served in a FIFO way, so the first that blocked is the first to be served when there is more data pushed by another client into the list. The previous implementation of blocking operations was conceived to serve clients in the context of push operations. For for instance: 1) There is a client "A" blocked on list "foo". 2) The client "B" performs `LPUSH foo somevalue`. 3) The client "A" is served in the context of the "B" LPUSH, synchronously. Processing things in a synchronous way was useful as if "A" pushes a value that is served by "B", from the point of view of the database is a NOP (no operation) thing, that is, nothing is replicated, nothing is written in the AOF file, and so forth. However later we implemented two things: 1) Variadic LPUSH that could add multiple values to a list in the context of a single call. 2) BRPOPLPUSH that was a version of BRPOP that also provided a "PUSH" side effect when receiving data. This forced us to make the synchronous implementation more complex. If client "B" is waiting for data, and "A" pushes three elemnents in a single call, we needed to propagate an LPUSH with a missing argument in the AOF and replication link. We also needed to make sure to replicate the LPUSH side of BRPOPLPUSH, but only if in turn did not happened to serve another blocking client into another list ;) This were complex but with a few of mutually recursive functions everything worked as expected... until one day we introduced scripting in Redis. Scripting + synchronous blocking operations = Issue #614. Basically you can't "rewrite" a script to have just a partial effect on the replicas and AOF file if the script happened to serve a few blocked clients. The solution to all this problems, implemented by this commit, is to change the way we serve blocked clients. Instead of serving the blocked clients synchronously, in the context of the command performing the PUSH operation, it is now an asynchronous and iterative process: 1) If a key that has clients blocked waiting for data is the subject of a list push operation, We simply mark keys as "ready" and put it into a queue. 2) Every command pushing stuff on lists, as a variadic LPUSH, a script, or whatever it is, is replicated verbatim without any rewriting. 3) Every time a Redis command, a MULTI/EXEC block, or a script, completed its execution, we run the list of keys ready to serve blocked clients (as more data arrived), and process this list serving the blocked clients. 4) As a result of "3" maybe more keys are ready again for other clients (as a result of BRPOPLPUSH we may have push operations), so we iterate back to step "3" if it's needed. The new code has a much simpler semantics, and a simpler to understand implementation, with the disadvantage of not being able to "optmize out" a PUSH+BPOP as a No OP. This commit will be tested with care before the final merge, more tests will be added likely.
2012-09-04 04:37:49 -04:00
/* Create the list if the key does not exist */
if (!dstobj) {
dstobj = createQuicklistObject();
quicklistSetOptions(dstobj->ptr, server.list_max_ziplist_size,
server.list_compress_depth);
A reimplementation of blocking operation internals. Redis provides support for blocking operations such as BLPOP or BRPOP. This operations are identical to normal LPOP and RPOP operations as long as there are elements in the target list, but if the list is empty they block waiting for new data to arrive to the list. All the clients blocked waiting for th same list are served in a FIFO way, so the first that blocked is the first to be served when there is more data pushed by another client into the list. The previous implementation of blocking operations was conceived to serve clients in the context of push operations. For for instance: 1) There is a client "A" blocked on list "foo". 2) The client "B" performs `LPUSH foo somevalue`. 3) The client "A" is served in the context of the "B" LPUSH, synchronously. Processing things in a synchronous way was useful as if "A" pushes a value that is served by "B", from the point of view of the database is a NOP (no operation) thing, that is, nothing is replicated, nothing is written in the AOF file, and so forth. However later we implemented two things: 1) Variadic LPUSH that could add multiple values to a list in the context of a single call. 2) BRPOPLPUSH that was a version of BRPOP that also provided a "PUSH" side effect when receiving data. This forced us to make the synchronous implementation more complex. If client "B" is waiting for data, and "A" pushes three elemnents in a single call, we needed to propagate an LPUSH with a missing argument in the AOF and replication link. We also needed to make sure to replicate the LPUSH side of BRPOPLPUSH, but only if in turn did not happened to serve another blocking client into another list ;) This were complex but with a few of mutually recursive functions everything worked as expected... until one day we introduced scripting in Redis. Scripting + synchronous blocking operations = Issue #614. Basically you can't "rewrite" a script to have just a partial effect on the replicas and AOF file if the script happened to serve a few blocked clients. The solution to all this problems, implemented by this commit, is to change the way we serve blocked clients. Instead of serving the blocked clients synchronously, in the context of the command performing the PUSH operation, it is now an asynchronous and iterative process: 1) If a key that has clients blocked waiting for data is the subject of a list push operation, We simply mark keys as "ready" and put it into a queue. 2) Every command pushing stuff on lists, as a variadic LPUSH, a script, or whatever it is, is replicated verbatim without any rewriting. 3) Every time a Redis command, a MULTI/EXEC block, or a script, completed its execution, we run the list of keys ready to serve blocked clients (as more data arrived), and process this list serving the blocked clients. 4) As a result of "3" maybe more keys are ready again for other clients (as a result of BRPOPLPUSH we may have push operations), so we iterate back to step "3" if it's needed. The new code has a much simpler semantics, and a simpler to understand implementation, with the disadvantage of not being able to "optmize out" a PUSH+BPOP as a No OP. This commit will be tested with care before the final merge, more tests will be added likely.
2012-09-04 04:37:49 -04:00
dbAdd(c->db,dstkey,dstobj);
}
A reimplementation of blocking operation internals. Redis provides support for blocking operations such as BLPOP or BRPOP. This operations are identical to normal LPOP and RPOP operations as long as there are elements in the target list, but if the list is empty they block waiting for new data to arrive to the list. All the clients blocked waiting for th same list are served in a FIFO way, so the first that blocked is the first to be served when there is more data pushed by another client into the list. The previous implementation of blocking operations was conceived to serve clients in the context of push operations. For for instance: 1) There is a client "A" blocked on list "foo". 2) The client "B" performs `LPUSH foo somevalue`. 3) The client "A" is served in the context of the "B" LPUSH, synchronously. Processing things in a synchronous way was useful as if "A" pushes a value that is served by "B", from the point of view of the database is a NOP (no operation) thing, that is, nothing is replicated, nothing is written in the AOF file, and so forth. However later we implemented two things: 1) Variadic LPUSH that could add multiple values to a list in the context of a single call. 2) BRPOPLPUSH that was a version of BRPOP that also provided a "PUSH" side effect when receiving data. This forced us to make the synchronous implementation more complex. If client "B" is waiting for data, and "A" pushes three elemnents in a single call, we needed to propagate an LPUSH with a missing argument in the AOF and replication link. We also needed to make sure to replicate the LPUSH side of BRPOPLPUSH, but only if in turn did not happened to serve another blocking client into another list ;) This were complex but with a few of mutually recursive functions everything worked as expected... until one day we introduced scripting in Redis. Scripting + synchronous blocking operations = Issue #614. Basically you can't "rewrite" a script to have just a partial effect on the replicas and AOF file if the script happened to serve a few blocked clients. The solution to all this problems, implemented by this commit, is to change the way we serve blocked clients. Instead of serving the blocked clients synchronously, in the context of the command performing the PUSH operation, it is now an asynchronous and iterative process: 1) If a key that has clients blocked waiting for data is the subject of a list push operation, We simply mark keys as "ready" and put it into a queue. 2) Every command pushing stuff on lists, as a variadic LPUSH, a script, or whatever it is, is replicated verbatim without any rewriting. 3) Every time a Redis command, a MULTI/EXEC block, or a script, completed its execution, we run the list of keys ready to serve blocked clients (as more data arrived), and process this list serving the blocked clients. 4) As a result of "3" maybe more keys are ready again for other clients (as a result of BRPOPLPUSH we may have push operations), so we iterate back to step "3" if it's needed. The new code has a much simpler semantics, and a simpler to understand implementation, with the disadvantage of not being able to "optmize out" a PUSH+BPOP as a No OP. This commit will be tested with care before the final merge, more tests will be added likely.
2012-09-04 04:37:49 -04:00
signalModifiedKey(c->db,dstkey);
2015-07-27 03:41:48 -04:00
listTypePush(dstobj,value,LIST_HEAD);
notifyKeyspaceEvent(NOTIFY_LIST,"lpush",dstkey,c->db->id);
/* Always send the pushed value to the client. */
addReplyBulk(c,value);
}
void rpoplpushCommand(client *c) {
robj *sobj, *value;
2018-11-30 03:41:54 -05:00
if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.null[c->resp]))
== NULL || checkType(c,sobj,OBJ_LIST)) return;
if (listTypeLength(sobj) == 0) {
/* This may only happen after loading very old RDB files. Recent
* versions of Redis delete keys of empty lists. */
2018-11-30 03:41:54 -05:00
addReplyNull(c);
} else {
robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
robj *touchedkey = c->argv[1];
if (dobj && checkType(c,dobj,OBJ_LIST)) return;
2015-07-27 03:41:48 -04:00
value = listTypePop(sobj,LIST_TAIL);
/* We saved touched key, and protect it, since rpoplpushHandlePush
A reimplementation of blocking operation internals. Redis provides support for blocking operations such as BLPOP or BRPOP. This operations are identical to normal LPOP and RPOP operations as long as there are elements in the target list, but if the list is empty they block waiting for new data to arrive to the list. All the clients blocked waiting for th same list are served in a FIFO way, so the first that blocked is the first to be served when there is more data pushed by another client into the list. The previous implementation of blocking operations was conceived to serve clients in the context of push operations. For for instance: 1) There is a client "A" blocked on list "foo". 2) The client "B" performs `LPUSH foo somevalue`. 3) The client "A" is served in the context of the "B" LPUSH, synchronously. Processing things in a synchronous way was useful as if "A" pushes a value that is served by "B", from the point of view of the database is a NOP (no operation) thing, that is, nothing is replicated, nothing is written in the AOF file, and so forth. However later we implemented two things: 1) Variadic LPUSH that could add multiple values to a list in the context of a single call. 2) BRPOPLPUSH that was a version of BRPOP that also provided a "PUSH" side effect when receiving data. This forced us to make the synchronous implementation more complex. If client "B" is waiting for data, and "A" pushes three elemnents in a single call, we needed to propagate an LPUSH with a missing argument in the AOF and replication link. We also needed to make sure to replicate the LPUSH side of BRPOPLPUSH, but only if in turn did not happened to serve another blocking client into another list ;) This were complex but with a few of mutually recursive functions everything worked as expected... until one day we introduced scripting in Redis. Scripting + synchronous blocking operations = Issue #614. Basically you can't "rewrite" a script to have just a partial effect on the replicas and AOF file if the script happened to serve a few blocked clients. The solution to all this problems, implemented by this commit, is to change the way we serve blocked clients. Instead of serving the blocked clients synchronously, in the context of the command performing the PUSH operation, it is now an asynchronous and iterative process: 1) If a key that has clients blocked waiting for data is the subject of a list push operation, We simply mark keys as "ready" and put it into a queue. 2) Every command pushing stuff on lists, as a variadic LPUSH, a script, or whatever it is, is replicated verbatim without any rewriting. 3) Every time a Redis command, a MULTI/EXEC block, or a script, completed its execution, we run the list of keys ready to serve blocked clients (as more data arrived), and process this list serving the blocked clients. 4) As a result of "3" maybe more keys are ready again for other clients (as a result of BRPOPLPUSH we may have push operations), so we iterate back to step "3" if it's needed. The new code has a much simpler semantics, and a simpler to understand implementation, with the disadvantage of not being able to "optmize out" a PUSH+BPOP as a No OP. This commit will be tested with care before the final merge, more tests will be added likely.
2012-09-04 04:37:49 -04:00
* may change the client command argument vector (it does not
* currently). */
incrRefCount(touchedkey);
A reimplementation of blocking operation internals. Redis provides support for blocking operations such as BLPOP or BRPOP. This operations are identical to normal LPOP and RPOP operations as long as there are elements in the target list, but if the list is empty they block waiting for new data to arrive to the list. All the clients blocked waiting for th same list are served in a FIFO way, so the first that blocked is the first to be served when there is more data pushed by another client into the list. The previous implementation of blocking operations was conceived to serve clients in the context of push operations. For for instance: 1) There is a client "A" blocked on list "foo". 2) The client "B" performs `LPUSH foo somevalue`. 3) The client "A" is served in the context of the "B" LPUSH, synchronously. Processing things in a synchronous way was useful as if "A" pushes a value that is served by "B", from the point of view of the database is a NOP (no operation) thing, that is, nothing is replicated, nothing is written in the AOF file, and so forth. However later we implemented two things: 1) Variadic LPUSH that could add multiple values to a list in the context of a single call. 2) BRPOPLPUSH that was a version of BRPOP that also provided a "PUSH" side effect when receiving data. This forced us to make the synchronous implementation more complex. If client "B" is waiting for data, and "A" pushes three elemnents in a single call, we needed to propagate an LPUSH with a missing argument in the AOF and replication link. We also needed to make sure to replicate the LPUSH side of BRPOPLPUSH, but only if in turn did not happened to serve another blocking client into another list ;) This were complex but with a few of mutually recursive functions everything worked as expected... until one day we introduced scripting in Redis. Scripting + synchronous blocking operations = Issue #614. Basically you can't "rewrite" a script to have just a partial effect on the replicas and AOF file if the script happened to serve a few blocked clients. The solution to all this problems, implemented by this commit, is to change the way we serve blocked clients. Instead of serving the blocked clients synchronously, in the context of the command performing the PUSH operation, it is now an asynchronous and iterative process: 1) If a key that has clients blocked waiting for data is the subject of a list push operation, We simply mark keys as "ready" and put it into a queue. 2) Every command pushing stuff on lists, as a variadic LPUSH, a script, or whatever it is, is replicated verbatim without any rewriting. 3) Every time a Redis command, a MULTI/EXEC block, or a script, completed its execution, we run the list of keys ready to serve blocked clients (as more data arrived), and process this list serving the blocked clients. 4) As a result of "3" maybe more keys are ready again for other clients (as a result of BRPOPLPUSH we may have push operations), so we iterate back to step "3" if it's needed. The new code has a much simpler semantics, and a simpler to understand implementation, with the disadvantage of not being able to "optmize out" a PUSH+BPOP as a No OP. This commit will be tested with care before the final merge, more tests will be added likely.
2012-09-04 04:37:49 -04:00
rpoplpushHandlePush(c,c->argv[2],dobj,value);
/* listTypePop returns an object with its refcount incremented */
decrRefCount(value);
/* Delete the source list when it is empty */
2015-07-27 03:41:48 -04:00
notifyKeyspaceEvent(NOTIFY_LIST,"rpop",touchedkey,c->db->id);
if (listTypeLength(sobj) == 0) {
dbDelete(c->db,touchedkey);
2015-07-27 03:41:48 -04:00
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
touchedkey,c->db->id);
}
signalModifiedKey(c->db,touchedkey);
decrRefCount(touchedkey);
server.dirty++;
if (c->cmd->proc == brpoplpushCommand) {
rewriteClientCommandVector(c,3,shared.rpoplpush,c->argv[1],c->argv[2]);
}
}
}
/*-----------------------------------------------------------------------------
* Blocking POP operations
*----------------------------------------------------------------------------*/
/* This is a helper function for handleClientsBlockedOnKeys(). It's work
A reimplementation of blocking operation internals. Redis provides support for blocking operations such as BLPOP or BRPOP. This operations are identical to normal LPOP and RPOP operations as long as there are elements in the target list, but if the list is empty they block waiting for new data to arrive to the list. All the clients blocked waiting for th same list are served in a FIFO way, so the first that blocked is the first to be served when there is more data pushed by another client into the list. The previous implementation of blocking operations was conceived to serve clients in the context of push operations. For for instance: 1) There is a client "A" blocked on list "foo". 2) The client "B" performs `LPUSH foo somevalue`. 3) The client "A" is served in the context of the "B" LPUSH, synchronously. Processing things in a synchronous way was useful as if "A" pushes a value that is served by "B", from the point of view of the database is a NOP (no operation) thing, that is, nothing is replicated, nothing is written in the AOF file, and so forth. However later we implemented two things: 1) Variadic LPUSH that could add multiple values to a list in the context of a single call. 2) BRPOPLPUSH that was a version of BRPOP that also provided a "PUSH" side effect when receiving data. This forced us to make the synchronous implementation more complex. If client "B" is waiting for data, and "A" pushes three elemnents in a single call, we needed to propagate an LPUSH with a missing argument in the AOF and replication link. We also needed to make sure to replicate the LPUSH side of BRPOPLPUSH, but only if in turn did not happened to serve another blocking client into another list ;) This were complex but with a few of mutually recursive functions everything worked as expected... until one day we introduced scripting in Redis. Scripting + synchronous blocking operations = Issue #614. Basically you can't "rewrite" a script to have just a partial effect on the replicas and AOF file if the script happened to serve a few blocked clients. The solution to all this problems, implemented by this commit, is to change the way we serve blocked clients. Instead of serving the blocked clients synchronously, in the context of the command performing the PUSH operation, it is now an asynchronous and iterative process: 1) If a key that has clients blocked waiting for data is the subject of a list push operation, We simply mark keys as "ready" and put it into a queue. 2) Every command pushing stuff on lists, as a variadic LPUSH, a script, or whatever it is, is replicated verbatim without any rewriting. 3) Every time a Redis command, a MULTI/EXEC block, or a script, completed its execution, we run the list of keys ready to serve blocked clients (as more data arrived), and process this list serving the blocked clients. 4) As a result of "3" maybe more keys are ready again for other clients (as a result of BRPOPLPUSH we may have push operations), so we iterate back to step "3" if it's needed. The new code has a much simpler semantics, and a simpler to understand implementation, with the disadvantage of not being able to "optmize out" a PUSH+BPOP as a No OP. This commit will be tested with care before the final merge, more tests will be added likely.
2012-09-04 04:37:49 -04:00
* is to serve a specific client (receiver) that is blocked on 'key'
* in the context of the specified 'db', doing the following:
*
A reimplementation of blocking operation internals. Redis provides support for blocking operations such as BLPOP or BRPOP. This operations are identical to normal LPOP and RPOP operations as long as there are elements in the target list, but if the list is empty they block waiting for new data to arrive to the list. All the clients blocked waiting for th same list are served in a FIFO way, so the first that blocked is the first to be served when there is more data pushed by another client into the list. The previous implementation of blocking operations was conceived to serve clients in the context of push operations. For for instance: 1) There is a client "A" blocked on list "foo". 2) The client "B" performs `LPUSH foo somevalue`. 3) The client "A" is served in the context of the "B" LPUSH, synchronously. Processing things in a synchronous way was useful as if "A" pushes a value that is served by "B", from the point of view of the database is a NOP (no operation) thing, that is, nothing is replicated, nothing is written in the AOF file, and so forth. However later we implemented two things: 1) Variadic LPUSH that could add multiple values to a list in the context of a single call. 2) BRPOPLPUSH that was a version of BRPOP that also provided a "PUSH" side effect when receiving data. This forced us to make the synchronous implementation more complex. If client "B" is waiting for data, and "A" pushes three elemnents in a single call, we needed to propagate an LPUSH with a missing argument in the AOF and replication link. We also needed to make sure to replicate the LPUSH side of BRPOPLPUSH, but only if in turn did not happened to serve another blocking client into another list ;) This were complex but with a few of mutually recursive functions everything worked as expected... until one day we introduced scripting in Redis. Scripting + synchronous blocking operations = Issue #614. Basically you can't "rewrite" a script to have just a partial effect on the replicas and AOF file if the script happened to serve a few blocked clients. The solution to all this problems, implemented by this commit, is to change the way we serve blocked clients. Instead of serving the blocked clients synchronously, in the context of the command performing the PUSH operation, it is now an asynchronous and iterative process: 1) If a key that has clients blocked waiting for data is the subject of a list push operation, We simply mark keys as "ready" and put it into a queue. 2) Every command pushing stuff on lists, as a variadic LPUSH, a script, or whatever it is, is replicated verbatim without any rewriting. 3) Every time a Redis command, a MULTI/EXEC block, or a script, completed its execution, we run the list of keys ready to serve blocked clients (as more data arrived), and process this list serving the blocked clients. 4) As a result of "3" maybe more keys are ready again for other clients (as a result of BRPOPLPUSH we may have push operations), so we iterate back to step "3" if it's needed. The new code has a much simpler semantics, and a simpler to understand implementation, with the disadvantage of not being able to "optmize out" a PUSH+BPOP as a No OP. This commit will be tested with care before the final merge, more tests will be added likely.
2012-09-04 04:37:49 -04:00
* 1) Provide the client with the 'value' element.
* 2) If the dstkey is not NULL (we are serving a BRPOPLPUSH) also push the
2013-01-16 12:00:20 -05:00
* 'value' element on the destination list (the LPUSH side of the command).
A reimplementation of blocking operation internals. Redis provides support for blocking operations such as BLPOP or BRPOP. This operations are identical to normal LPOP and RPOP operations as long as there are elements in the target list, but if the list is empty they block waiting for new data to arrive to the list. All the clients blocked waiting for th same list are served in a FIFO way, so the first that blocked is the first to be served when there is more data pushed by another client into the list. The previous implementation of blocking operations was conceived to serve clients in the context of push operations. For for instance: 1) There is a client "A" blocked on list "foo". 2) The client "B" performs `LPUSH foo somevalue`. 3) The client "A" is served in the context of the "B" LPUSH, synchronously. Processing things in a synchronous way was useful as if "A" pushes a value that is served by "B", from the point of view of the database is a NOP (no operation) thing, that is, nothing is replicated, nothing is written in the AOF file, and so forth. However later we implemented two things: 1) Variadic LPUSH that could add multiple values to a list in the context of a single call. 2) BRPOPLPUSH that was a version of BRPOP that also provided a "PUSH" side effect when receiving data. This forced us to make the synchronous implementation more complex. If client "B" is waiting for data, and "A" pushes three elemnents in a single call, we needed to propagate an LPUSH with a missing argument in the AOF and replication link. We also needed to make sure to replicate the LPUSH side of BRPOPLPUSH, but only if in turn did not happened to serve another blocking client into another list ;) This were complex but with a few of mutually recursive functions everything worked as expected... until one day we introduced scripting in Redis. Scripting + synchronous blocking operations = Issue #614. Basically you can't "rewrite" a script to have just a partial effect on the replicas and AOF file if the script happened to serve a few blocked clients. The solution to all this problems, implemented by this commit, is to change the way we serve blocked clients. Instead of serving the blocked clients synchronously, in the context of the command performing the PUSH operation, it is now an asynchronous and iterative process: 1) If a key that has clients blocked waiting for data is the subject of a list push operation, We simply mark keys as "ready" and put it into a queue. 2) Every command pushing stuff on lists, as a variadic LPUSH, a script, or whatever it is, is replicated verbatim without any rewriting. 3) Every time a Redis command, a MULTI/EXEC block, or a script, completed its execution, we run the list of keys ready to serve blocked clients (as more data arrived), and process this list serving the blocked clients. 4) As a result of "3" maybe more keys are ready again for other clients (as a result of BRPOPLPUSH we may have push operations), so we iterate back to step "3" if it's needed. The new code has a much simpler semantics, and a simpler to understand implementation, with the disadvantage of not being able to "optmize out" a PUSH+BPOP as a No OP. This commit will be tested with care before the final merge, more tests will be added likely.
2012-09-04 04:37:49 -04:00
* 3) Propagate the resulting BRPOP, BLPOP and additional LPUSH if any into
* the AOF and replication channel.
*
2015-07-27 03:41:48 -04:00
* The argument 'where' is LIST_TAIL or LIST_HEAD, and indicates if the
* 'value' element was popped from the head (BLPOP) or tail (BRPOP) so that
A reimplementation of blocking operation internals. Redis provides support for blocking operations such as BLPOP or BRPOP. This operations are identical to normal LPOP and RPOP operations as long as there are elements in the target list, but if the list is empty they block waiting for new data to arrive to the list. All the clients blocked waiting for th same list are served in a FIFO way, so the first that blocked is the first to be served when there is more data pushed by another client into the list. The previous implementation of blocking operations was conceived to serve clients in the context of push operations. For for instance: 1) There is a client "A" blocked on list "foo". 2) The client "B" performs `LPUSH foo somevalue`. 3) The client "A" is served in the context of the "B" LPUSH, synchronously. Processing things in a synchronous way was useful as if "A" pushes a value that is served by "B", from the point of view of the database is a NOP (no operation) thing, that is, nothing is replicated, nothing is written in the AOF file, and so forth. However later we implemented two things: 1) Variadic LPUSH that could add multiple values to a list in the context of a single call. 2) BRPOPLPUSH that was a version of BRPOP that also provided a "PUSH" side effect when receiving data. This forced us to make the synchronous implementation more complex. If client "B" is waiting for data, and "A" pushes three elemnents in a single call, we needed to propagate an LPUSH with a missing argument in the AOF and replication link. We also needed to make sure to replicate the LPUSH side of BRPOPLPUSH, but only if in turn did not happened to serve another blocking client into another list ;) This were complex but with a few of mutually recursive functions everything worked as expected... until one day we introduced scripting in Redis. Scripting + synchronous blocking operations = Issue #614. Basically you can't "rewrite" a script to have just a partial effect on the replicas and AOF file if the script happened to serve a few blocked clients. The solution to all this problems, implemented by this commit, is to change the way we serve blocked clients. Instead of serving the blocked clients synchronously, in the context of the command performing the PUSH operation, it is now an asynchronous and iterative process: 1) If a key that has clients blocked waiting for data is the subject of a list push operation, We simply mark keys as "ready" and put it into a queue. 2) Every command pushing stuff on lists, as a variadic LPUSH, a script, or whatever it is, is replicated verbatim without any rewriting. 3) Every time a Redis command, a MULTI/EXEC block, or a script, completed its execution, we run the list of keys ready to serve blocked clients (as more data arrived), and process this list serving the blocked clients. 4) As a result of "3" maybe more keys are ready again for other clients (as a result of BRPOPLPUSH we may have push operations), so we iterate back to step "3" if it's needed. The new code has a much simpler semantics, and a simpler to understand implementation, with the disadvantage of not being able to "optmize out" a PUSH+BPOP as a No OP. This commit will be tested with care before the final merge, more tests will be added likely.
2012-09-04 04:37:49 -04:00
* we can propagate the command properly.
*
* The function returns C_OK if we are able to serve the client, otherwise
* C_ERR is returned to signal the caller that the list POP operation
2013-01-16 12:00:20 -05:00
* should be undone as the client was not served: This only happens for
A reimplementation of blocking operation internals. Redis provides support for blocking operations such as BLPOP or BRPOP. This operations are identical to normal LPOP and RPOP operations as long as there are elements in the target list, but if the list is empty they block waiting for new data to arrive to the list. All the clients blocked waiting for th same list are served in a FIFO way, so the first that blocked is the first to be served when there is more data pushed by another client into the list. The previous implementation of blocking operations was conceived to serve clients in the context of push operations. For for instance: 1) There is a client "A" blocked on list "foo". 2) The client "B" performs `LPUSH foo somevalue`. 3) The client "A" is served in the context of the "B" LPUSH, synchronously. Processing things in a synchronous way was useful as if "A" pushes a value that is served by "B", from the point of view of the database is a NOP (no operation) thing, that is, nothing is replicated, nothing is written in the AOF file, and so forth. However later we implemented two things: 1) Variadic LPUSH that could add multiple values to a list in the context of a single call. 2) BRPOPLPUSH that was a version of BRPOP that also provided a "PUSH" side effect when receiving data. This forced us to make the synchronous implementation more complex. If client "B" is waiting for data, and "A" pushes three elemnents in a single call, we needed to propagate an LPUSH with a missing argument in the AOF and replication link. We also needed to make sure to replicate the LPUSH side of BRPOPLPUSH, but only if in turn did not happened to serve another blocking client into another list ;) This were complex but with a few of mutually recursive functions everything worked as expected... until one day we introduced scripting in Redis. Scripting + synchronous blocking operations = Issue #614. Basically you can't "rewrite" a script to have just a partial effect on the replicas and AOF file if the script happened to serve a few blocked clients. The solution to all this problems, implemented by this commit, is to change the way we serve blocked clients. Instead of serving the blocked clients synchronously, in the context of the command performing the PUSH operation, it is now an asynchronous and iterative process: 1) If a key that has clients blocked waiting for data is the subject of a list push operation, We simply mark keys as "ready" and put it into a queue. 2) Every command pushing stuff on lists, as a variadic LPUSH, a script, or whatever it is, is replicated verbatim without any rewriting. 3) Every time a Redis command, a MULTI/EXEC block, or a script, completed its execution, we run the list of keys ready to serve blocked clients (as more data arrived), and process this list serving the blocked clients. 4) As a result of "3" maybe more keys are ready again for other clients (as a result of BRPOPLPUSH we may have push operations), so we iterate back to step "3" if it's needed. The new code has a much simpler semantics, and a simpler to understand implementation, with the disadvantage of not being able to "optmize out" a PUSH+BPOP as a No OP. This commit will be tested with care before the final merge, more tests will be added likely.
2012-09-04 04:37:49 -04:00
* BRPOPLPUSH that fails to push the value to the destination key as it is
* of the wrong type. */
int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where)
A reimplementation of blocking operation internals. Redis provides support for blocking operations such as BLPOP or BRPOP. This operations are identical to normal LPOP and RPOP operations as long as there are elements in the target list, but if the list is empty they block waiting for new data to arrive to the list. All the clients blocked waiting for th same list are served in a FIFO way, so the first that blocked is the first to be served when there is more data pushed by another client into the list. The previous implementation of blocking operations was conceived to serve clients in the context of push operations. For for instance: 1) There is a client "A" blocked on list "foo". 2) The client "B" performs `LPUSH foo somevalue`. 3) The client "A" is served in the context of the "B" LPUSH, synchronously. Processing things in a synchronous way was useful as if "A" pushes a value that is served by "B", from the point of view of the database is a NOP (no operation) thing, that is, nothing is replicated, nothing is written in the AOF file, and so forth. However later we implemented two things: 1) Variadic LPUSH that could add multiple values to a list in the context of a single call. 2) BRPOPLPUSH that was a version of BRPOP that also provided a "PUSH" side effect when receiving data. This forced us to make the synchronous implementation more complex. If client "B" is waiting for data, and "A" pushes three elemnents in a single call, we needed to propagate an LPUSH with a missing argument in the AOF and replication link. We also needed to make sure to replicate the LPUSH side of BRPOPLPUSH, but only if in turn did not happened to serve another blocking client into another list ;) This were complex but with a few of mutually recursive functions everything worked as expected... until one day we introduced scripting in Redis. Scripting + synchronous blocking operations = Issue #614. Basically you can't "rewrite" a script to have just a partial effect on the replicas and AOF file if the script happened to serve a few blocked clients. The solution to all this problems, implemented by this commit, is to change the way we serve blocked clients. Instead of serving the blocked clients synchronously, in the context of the command performing the PUSH operation, it is now an asynchronous and iterative process: 1) If a key that has clients blocked waiting for data is the subject of a list push operation, We simply mark keys as "ready" and put it into a queue. 2) Every command pushing stuff on lists, as a variadic LPUSH, a script, or whatever it is, is replicated verbatim without any rewriting. 3) Every time a Redis command, a MULTI/EXEC block, or a script, completed its execution, we run the list of keys ready to serve blocked clients (as more data arrived), and process this list serving the blocked clients. 4) As a result of "3" maybe more keys are ready again for other clients (as a result of BRPOPLPUSH we may have push operations), so we iterate back to step "3" if it's needed. The new code has a much simpler semantics, and a simpler to understand implementation, with the disadvantage of not being able to "optmize out" a PUSH+BPOP as a No OP. This commit will be tested with care before the final merge, more tests will be added likely.
2012-09-04 04:37:49 -04:00
{
robj *argv[3];
if (dstkey == NULL) {
/* Propagate the [LR]POP operation. */
2015-07-27 03:41:48 -04:00
argv[0] = (where == LIST_HEAD) ? shared.lpop :
A reimplementation of blocking operation internals. Redis provides support for blocking operations such as BLPOP or BRPOP. This operations are identical to normal LPOP and RPOP operations as long as there are elements in the target list, but if the list is empty they block waiting for new data to arrive to the list. All the clients blocked waiting for th same list are served in a FIFO way, so the first that blocked is the first to be served when there is more data pushed by another client into the list. The previous implementation of blocking operations was conceived to serve clients in the context of push operations. For for instance: 1) There is a client "A" blocked on list "foo". 2) The client "B" performs `LPUSH foo somevalue`. 3) The client "A" is served in the context of the "B" LPUSH, synchronously. Processing things in a synchronous way was useful as if "A" pushes a value that is served by "B", from the point of view of the database is a NOP (no operation) thing, that is, nothing is replicated, nothing is written in the AOF file, and so forth. However later we implemented two things: 1) Variadic LPUSH that could add multiple values to a list in the context of a single call. 2) BRPOPLPUSH that was a version of BRPOP that also provided a "PUSH" side effect when receiving data. This forced us to make the synchronous implementation more complex. If client "B" is waiting for data, and "A" pushes three elemnents in a single call, we needed to propagate an LPUSH with a missing argument in the AOF and replication link. We also needed to make sure to replicate the LPUSH side of BRPOPLPUSH, but only if in turn did not happened to serve another blocking client into another list ;) This were complex but with a few of mutually recursive functions everything worked as expected... until one day we introduced scripting in Redis. Scripting + synchronous blocking operations = Issue #614. Basically you can't "rewrite" a script to have just a partial effect on the replicas and AOF file if the script happened to serve a few blocked clients. The solution to all this problems, implemented by this commit, is to change the way we serve blocked clients. Instead of serving the blocked clients synchronously, in the context of the command performing the PUSH operation, it is now an asynchronous and iterative process: 1) If a key that has clients blocked waiting for data is the subject of a list push operation, We simply mark keys as "ready" and put it into a queue. 2) Every command pushing stuff on lists, as a variadic LPUSH, a script, or whatever it is, is replicated verbatim without any rewriting. 3) Every time a Redis command, a MULTI/EXEC block, or a script, completed its execution, we run the list of keys ready to serve blocked clients (as more data arrived), and process this list serving the blocked clients. 4) As a result of "3" maybe more keys are ready again for other clients (as a result of BRPOPLPUSH we may have push operations), so we iterate back to step "3" if it's needed. The new code has a much simpler semantics, and a simpler to understand implementation, with the disadvantage of not being able to "optmize out" a PUSH+BPOP as a No OP. This commit will be tested with care before the final merge, more tests will be added likely.
2012-09-04 04:37:49 -04:00
shared.rpop;
argv[1] = key;
2015-07-27 03:41:48 -04:00
propagate((where == LIST_HEAD) ?
A reimplementation of blocking operation internals. Redis provides support for blocking operations such as BLPOP or BRPOP. This operations are identical to normal LPOP and RPOP operations as long as there are elements in the target list, but if the list is empty they block waiting for new data to arrive to the list. All the clients blocked waiting for th same list are served in a FIFO way, so the first that blocked is the first to be served when there is more data pushed by another client into the list. The previous implementation of blocking operations was conceived to serve clients in the context of push operations. For for instance: 1) There is a client "A" blocked on list "foo". 2) The client "B" performs `LPUSH foo somevalue`. 3) The client "A" is served in the context of the "B" LPUSH, synchronously. Processing things in a synchronous way was useful as if "A" pushes a value that is served by "B", from the point of view of the database is a NOP (no operation) thing, that is, nothing is replicated, nothing is written in the AOF file, and so forth. However later we implemented two things: 1) Variadic LPUSH that could add multiple values to a list in the context of a single call. 2) BRPOPLPUSH that was a version of BRPOP that also provided a "PUSH" side effect when receiving data. This forced us to make the synchronous implementation more complex. If client "B" is waiting for data, and "A" pushes three elemnents in a single call, we needed to propagate an LPUSH with a missing argument in the AOF and replication link. We also needed to make sure to replicate the LPUSH side of BRPOPLPUSH, but only if in turn did not happened to serve another blocking client into another list ;) This were complex but with a few of mutually recursive functions everything worked as expected... until one day we introduced scripting in Redis. Scripting + synchronous blocking operations = Issue #614. Basically you can't "rewrite" a script to have just a partial effect on the replicas and AOF file if the script happened to serve a few blocked clients. The solution to all this problems, implemented by this commit, is to change the way we serve blocked clients. Instead of serving the blocked clients synchronously, in the context of the command performing the PUSH operation, it is now an asynchronous and iterative process: 1) If a key that has clients blocked waiting for data is the subject of a list push operation, We simply mark keys as "ready" and put it into a queue. 2) Every command pushing stuff on lists, as a variadic LPUSH, a script, or whatever it is, is replicated verbatim without any rewriting. 3) Every time a Redis command, a MULTI/EXEC block, or a script, completed its execution, we run the list of keys ready to serve blocked clients (as more data arrived), and process this list serving the blocked clients. 4) As a result of "3" maybe more keys are ready again for other clients (as a result of BRPOPLPUSH we may have push operations), so we iterate back to step "3" if it's needed. The new code has a much simpler semantics, and a simpler to understand implementation, with the disadvantage of not being able to "optmize out" a PUSH+BPOP as a No OP. This commit will be tested with care before the final merge, more tests will be added likely.
2012-09-04 04:37:49 -04:00
server.lpopCommand : server.rpopCommand,
2015-07-27 03:41:48 -04:00
db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL);
A reimplementation of blocking operation internals. Redis provides support for blocking operations such as BLPOP or BRPOP. This operations are identical to normal LPOP and RPOP operations as long as there are elements in the target list, but if the list is empty they block waiting for new data to arrive to the list. All the clients blocked waiting for th same list are served in a FIFO way, so the first that blocked is the first to be served when there is more data pushed by another client into the list. The previous implementation of blocking operations was conceived to serve clients in the context of push operations. For for instance: 1) There is a client "A" blocked on list "foo". 2) The client "B" performs `LPUSH foo somevalue`. 3) The client "A" is served in the context of the "B" LPUSH, synchronously. Processing things in a synchronous way was useful as if "A" pushes a value that is served by "B", from the point of view of the database is a NOP (no operation) thing, that is, nothing is replicated, nothing is written in the AOF file, and so forth. However later we implemented two things: 1) Variadic LPUSH that could add multiple values to a list in the context of a single call. 2) BRPOPLPUSH that was a version of BRPOP that also provided a "PUSH" side effect when receiving data. This forced us to make the synchronous implementation more complex. If client "B" is waiting for data, and "A" pushes three elemnents in a single call, we needed to propagate an LPUSH with a missing argument in the AOF and replication link. We also needed to make sure to replicate the LPUSH side of BRPOPLPUSH, but only if in turn did not happened to serve another blocking client into another list ;) This were complex but with a few of mutually recursive functions everything worked as expected... until one day we introduced scripting in Redis. Scripting + synchronous blocking operations = Issue #614. Basically you can't "rewrite" a script to have just a partial effect on the replicas and AOF file if the script happened to serve a few blocked clients. The solution to all this problems, implemented by this commit, is to change the way we serve blocked clients. Instead of serving the blocked clients synchronously, in the context of the command performing the PUSH operation, it is now an asynchronous and iterative process: 1) If a key that has clients blocked waiting for data is the subject of a list push operation, We simply mark keys as "ready" and put it into a queue. 2) Every command pushing stuff on lists, as a variadic LPUSH, a script, or whatever it is, is replicated verbatim without any rewriting. 3) Every time a Redis command, a MULTI/EXEC block, or a script, completed its execution, we run the list of keys ready to serve blocked clients (as more data arrived), and process this list serving the blocked clients. 4) As a result of "3" maybe more keys are ready again for other clients (as a result of BRPOPLPUSH we may have push operations), so we iterate back to step "3" if it's needed. The new code has a much simpler semantics, and a simpler to understand implementation, with the disadvantage of not being able to "optmize out" a PUSH+BPOP as a No OP. This commit will be tested with care before the final merge, more tests will be added likely.
2012-09-04 04:37:49 -04:00
/* BRPOP/BLPOP */
addReplyArrayLen(receiver,2);
A reimplementation of blocking operation internals. Redis provides support for blocking operations such as BLPOP or BRPOP. This operations are identical to normal LPOP and RPOP operations as long as there are elements in the target list, but if the list is empty they block waiting for new data to arrive to the list. All the clients blocked waiting for th same list are served in a FIFO way, so the first that blocked is the first to be served when there is more data pushed by another client into the list. The previous implementation of blocking operations was conceived to serve clients in the context of push operations. For for instance: 1) There is a client "A" blocked on list "foo". 2) The client "B" performs `LPUSH foo somevalue`. 3) The client "A" is served in the context of the "B" LPUSH, synchronously. Processing things in a synchronous way was useful as if "A" pushes a value that is served by "B", from the point of view of the database is a NOP (no operation) thing, that is, nothing is replicated, nothing is written in the AOF file, and so forth. However later we implemented two things: 1) Variadic LPUSH that could add multiple values to a list in the context of a single call. 2) BRPOPLPUSH that was a version of BRPOP that also provided a "PUSH" side effect when receiving data. This forced us to make the synchronous implementation more complex. If client "B" is waiting for data, and "A" pushes three elemnents in a single call, we needed to propagate an LPUSH with a missing argument in the AOF and replication link. We also needed to make sure to replicate the LPUSH side of BRPOPLPUSH, but only if in turn did not happened to serve another blocking client into another list ;) This were complex but with a few of mutually recursive functions everything worked as expected... until one day we introduced scripting in Redis. Scripting + synchronous blocking operations = Issue #614. Basically you can't "rewrite" a script to have just a partial effect on the replicas and AOF file if the script happened to serve a few blocked clients. The solution to all this problems, implemented by this commit, is to change the way we serve blocked clients. Instead of serving the blocked clients synchronously, in the context of the command performing the PUSH operation, it is now an asynchronous and iterative process: 1) If a key that has clients blocked waiting for data is the subject of a list push operation, We simply mark keys as "ready" and put it into a queue. 2) Every command pushing stuff on lists, as a variadic LPUSH, a script, or whatever it is, is replicated verbatim without any rewriting. 3) Every time a Redis command, a MULTI/EXEC block, or a script, completed its execution, we run the list of keys ready to serve blocked clients (as more data arrived), and process this list serving the blocked clients. 4) As a result of "3" maybe more keys are ready again for other clients (as a result of BRPOPLPUSH we may have push operations), so we iterate back to step "3" if it's needed. The new code has a much simpler semantics, and a simpler to understand implementation, with the disadvantage of not being able to "optmize out" a PUSH+BPOP as a No OP. This commit will be tested with care before the final merge, more tests will be added likely.
2012-09-04 04:37:49 -04:00
addReplyBulk(receiver,key);
addReplyBulk(receiver,value);
/* Notify event. */
char *event = (where == LIST_HEAD) ? "lpop" : "rpop";
notifyKeyspaceEvent(NOTIFY_LIST,event,key,receiver->db->id);
A reimplementation of blocking operation internals. Redis provides support for blocking operations such as BLPOP or BRPOP. This operations are identical to normal LPOP and RPOP operations as long as there are elements in the target list, but if the list is empty they block waiting for new data to arrive to the list. All the clients blocked waiting for th same list are served in a FIFO way, so the first that blocked is the first to be served when there is more data pushed by another client into the list. The previous implementation of blocking operations was conceived to serve clients in the context of push operations. For for instance: 1) There is a client "A" blocked on list "foo". 2) The client "B" performs `LPUSH foo somevalue`. 3) The client "A" is served in the context of the "B" LPUSH, synchronously. Processing things in a synchronous way was useful as if "A" pushes a value that is served by "B", from the point of view of the database is a NOP (no operation) thing, that is, nothing is replicated, nothing is written in the AOF file, and so forth. However later we implemented two things: 1) Variadic LPUSH that could add multiple values to a list in the context of a single call. 2) BRPOPLPUSH that was a version of BRPOP that also provided a "PUSH" side effect when receiving data. This forced us to make the synchronous implementation more complex. If client "B" is waiting for data, and "A" pushes three elemnents in a single call, we needed to propagate an LPUSH with a missing argument in the AOF and replication link. We also needed to make sure to replicate the LPUSH side of BRPOPLPUSH, but only if in turn did not happened to serve another blocking client into another list ;) This were complex but with a few of mutually recursive functions everything worked as expected... until one day we introduced scripting in Redis. Scripting + synchronous blocking operations = Issue #614. Basically you can't "rewrite" a script to have just a partial effect on the replicas and AOF file if the script happened to serve a few blocked clients. The solution to all this problems, implemented by this commit, is to change the way we serve blocked clients. Instead of serving the blocked clients synchronously, in the context of the command performing the PUSH operation, it is now an asynchronous and iterative process: 1) If a key that has clients blocked waiting for data is the subject of a list push operation, We simply mark keys as "ready" and put it into a queue. 2) Every command pushing stuff on lists, as a variadic LPUSH, a script, or whatever it is, is replicated verbatim without any rewriting. 3) Every time a Redis command, a MULTI/EXEC block, or a script, completed its execution, we run the list of keys ready to serve blocked clients (as more data arrived), and process this list serving the blocked clients. 4) As a result of "3" maybe more keys are ready again for other clients (as a result of BRPOPLPUSH we may have push operations), so we iterate back to step "3" if it's needed. The new code has a much simpler semantics, and a simpler to understand implementation, with the disadvantage of not being able to "optmize out" a PUSH+BPOP as a No OP. This commit will be tested with care before the final merge, more tests will be added likely.
2012-09-04 04:37:49 -04:00
} else {
/* BRPOPLPUSH */
robj *dstobj =
lookupKeyWrite(receiver->db,dstkey);
if (!(dstobj &&
checkType(receiver,dstobj,OBJ_LIST)))
A reimplementation of blocking operation internals. Redis provides support for blocking operations such as BLPOP or BRPOP. This operations are identical to normal LPOP and RPOP operations as long as there are elements in the target list, but if the list is empty they block waiting for new data to arrive to the list. All the clients blocked waiting for th same list are served in a FIFO way, so the first that blocked is the first to be served when there is more data pushed by another client into the list. The previous implementation of blocking operations was conceived to serve clients in the context of push operations. For for instance: 1) There is a client "A" blocked on list "foo". 2) The client "B" performs `LPUSH foo somevalue`. 3) The client "A" is served in the context of the "B" LPUSH, synchronously. Processing things in a synchronous way was useful as if "A" pushes a value that is served by "B", from the point of view of the database is a NOP (no operation) thing, that is, nothing is replicated, nothing is written in the AOF file, and so forth. However later we implemented two things: 1) Variadic LPUSH that could add multiple values to a list in the context of a single call. 2) BRPOPLPUSH that was a version of BRPOP that also provided a "PUSH" side effect when receiving data. This forced us to make the synchronous implementation more complex. If client "B" is waiting for data, and "A" pushes three elemnents in a single call, we needed to propagate an LPUSH with a missing argument in the AOF and replication link. We also needed to make sure to replicate the LPUSH side of BRPOPLPUSH, but only if in turn did not happened to serve another blocking client into another list ;) This were complex but with a few of mutually recursive functions everything worked as expected... until one day we introduced scripting in Redis. Scripting + synchronous blocking operations = Issue #614. Basically you can't "rewrite" a script to have just a partial effect on the replicas and AOF file if the script happened to serve a few blocked clients. The solution to all this problems, implemented by this commit, is to change the way we serve blocked clients. Instead of serving the blocked clients synchronously, in the context of the command performing the PUSH operation, it is now an asynchronous and iterative process: 1) If a key that has clients blocked waiting for data is the subject of a list push operation, We simply mark keys as "ready" and put it into a queue. 2) Every command pushing stuff on lists, as a variadic LPUSH, a script, or whatever it is, is replicated verbatim without any rewriting. 3) Every time a Redis command, a MULTI/EXEC block, or a script, completed its execution, we run the list of keys ready to serve blocked clients (as more data arrived), and process this list serving the blocked clients. 4) As a result of "3" maybe more keys are ready again for other clients (as a result of BRPOPLPUSH we may have push operations), so we iterate back to step "3" if it's needed. The new code has a much simpler semantics, and a simpler to understand implementation, with the disadvantage of not being able to "optmize out" a PUSH+BPOP as a No OP. This commit will be tested with care before the final merge, more tests will be added likely.
2012-09-04 04:37:49 -04:00
{
rpoplpushHandlePush(receiver,dstkey,dstobj,
value);
/* Propagate the RPOPLPUSH operation. */
argv[0] = shared.rpoplpush;
argv[1] = key;
argv[2] = dstkey;
propagate(server.rpoplpushCommand,
A reimplementation of blocking operation internals. Redis provides support for blocking operations such as BLPOP or BRPOP. This operations are identical to normal LPOP and RPOP operations as long as there are elements in the target list, but if the list is empty they block waiting for new data to arrive to the list. All the clients blocked waiting for th same list are served in a FIFO way, so the first that blocked is the first to be served when there is more data pushed by another client into the list. The previous implementation of blocking operations was conceived to serve clients in the context of push operations. For for instance: 1) There is a client "A" blocked on list "foo". 2) The client "B" performs `LPUSH foo somevalue`. 3) The client "A" is served in the context of the "B" LPUSH, synchronously. Processing things in a synchronous way was useful as if "A" pushes a value that is served by "B", from the point of view of the database is a NOP (no operation) thing, that is, nothing is replicated, nothing is written in the AOF file, and so forth. However later we implemented two things: 1) Variadic LPUSH that could add multiple values to a list in the context of a single call. 2) BRPOPLPUSH that was a version of BRPOP that also provided a "PUSH" side effect when receiving data. This forced us to make the synchronous implementation more complex. If client "B" is waiting for data, and "A" pushes three elemnents in a single call, we needed to propagate an LPUSH with a missing argument in the AOF and replication link. We also needed to make sure to replicate the LPUSH side of BRPOPLPUSH, but only if in turn did not happened to serve another blocking client into another list ;) This were complex but with a few of mutually recursive functions everything worked as expected... until one day we introduced scripting in Redis. Scripting + synchronous blocking operations = Issue #614. Basically you can't "rewrite" a script to have just a partial effect on the replicas and AOF file if the script happened to serve a few blocked clients. The solution to all this problems, implemented by this commit, is to change the way we serve blocked clients. Instead of serving the blocked clients synchronously, in the context of the command performing the PUSH operation, it is now an asynchronous and iterative process: 1) If a key that has clients blocked waiting for data is the subject of a list push operation, We simply mark keys as "ready" and put it into a queue. 2) Every command pushing stuff on lists, as a variadic LPUSH, a script, or whatever it is, is replicated verbatim without any rewriting. 3) Every time a Redis command, a MULTI/EXEC block, or a script, completed its execution, we run the list of keys ready to serve blocked clients (as more data arrived), and process this list serving the blocked clients. 4) As a result of "3" maybe more keys are ready again for other clients (as a result of BRPOPLPUSH we may have push operations), so we iterate back to step "3" if it's needed. The new code has a much simpler semantics, and a simpler to understand implementation, with the disadvantage of not being able to "optmize out" a PUSH+BPOP as a No OP. This commit will be tested with care before the final merge, more tests will be added likely.
2012-09-04 04:37:49 -04:00
db->id,argv,3,
2015-07-27 03:41:48 -04:00
PROPAGATE_AOF|
PROPAGATE_REPL);
/* Notify event ("lpush" was notified by rpoplpushHandlePush). */
notifyKeyspaceEvent(NOTIFY_LIST,"rpop",key,receiver->db->id);
} else {
A reimplementation of blocking operation internals. Redis provides support for blocking operations such as BLPOP or BRPOP. This operations are identical to normal LPOP and RPOP operations as long as there are elements in the target list, but if the list is empty they block waiting for new data to arrive to the list. All the clients blocked waiting for th same list are served in a FIFO way, so the first that blocked is the first to be served when there is more data pushed by another client into the list. The previous implementation of blocking operations was conceived to serve clients in the context of push operations. For for instance: 1) There is a client "A" blocked on list "foo". 2) The client "B" performs `LPUSH foo somevalue`. 3) The client "A" is served in the context of the "B" LPUSH, synchronously. Processing things in a synchronous way was useful as if "A" pushes a value that is served by "B", from the point of view of the database is a NOP (no operation) thing, that is, nothing is replicated, nothing is written in the AOF file, and so forth. However later we implemented two things: 1) Variadic LPUSH that could add multiple values to a list in the context of a single call. 2) BRPOPLPUSH that was a version of BRPOP that also provided a "PUSH" side effect when receiving data. This forced us to make the synchronous implementation more complex. If client "B" is waiting for data, and "A" pushes three elemnents in a single call, we needed to propagate an LPUSH with a missing argument in the AOF and replication link. We also needed to make sure to replicate the LPUSH side of BRPOPLPUSH, but only if in turn did not happened to serve another blocking client into another list ;) This were complex but with a few of mutually recursive functions everything worked as expected... until one day we introduced scripting in Redis. Scripting + synchronous blocking operations = Issue #614. Basically you can't "rewrite" a script to have just a partial effect on the replicas and AOF file if the script happened to serve a few blocked clients. The solution to all this problems, implemented by this commit, is to change the way we serve blocked clients. Instead of serving the blocked clients synchronously, in the context of the command performing the PUSH operation, it is now an asynchronous and iterative process: 1) If a key that has clients blocked waiting for data is the subject of a list push operation, We simply mark keys as "ready" and put it into a queue. 2) Every command pushing stuff on lists, as a variadic LPUSH, a script, or whatever it is, is replicated verbatim without any rewriting. 3) Every time a Redis command, a MULTI/EXEC block, or a script, completed its execution, we run the list of keys ready to serve blocked clients (as more data arrived), and process this list serving the blocked clients. 4) As a result of "3" maybe more keys are ready again for other clients (as a result of BRPOPLPUSH we may have push operations), so we iterate back to step "3" if it's needed. The new code has a much simpler semantics, and a simpler to understand implementation, with the disadvantage of not being able to "optmize out" a PUSH+BPOP as a No OP. This commit will be tested with care before the final merge, more tests will be added likely.
2012-09-04 04:37:49 -04:00
/* BRPOPLPUSH failed because of wrong
* destination type. */
return C_ERR;
}
2010-11-08 13:25:59 -05:00
}
return C_OK;
A reimplementation of blocking operation internals. Redis provides support for blocking operations such as BLPOP or BRPOP. This operations are identical to normal LPOP and RPOP operations as long as there are elements in the target list, but if the list is empty they block waiting for new data to arrive to the list. All the clients blocked waiting for th same list are served in a FIFO way, so the first that blocked is the first to be served when there is more data pushed by another client into the list. The previous implementation of blocking operations was conceived to serve clients in the context of push operations. For for instance: 1) There is a client "A" blocked on list "foo". 2) The client "B" performs `LPUSH foo somevalue`. 3) The client "A" is served in the context of the "B" LPUSH, synchronously. Processing things in a synchronous way was useful as if "A" pushes a value that is served by "B", from the point of view of the database is a NOP (no operation) thing, that is, nothing is replicated, nothing is written in the AOF file, and so forth. However later we implemented two things: 1) Variadic LPUSH that could add multiple values to a list in the context of a single call. 2) BRPOPLPUSH that was a version of BRPOP that also provided a "PUSH" side effect when receiving data. This forced us to make the synchronous implementation more complex. If client "B" is waiting for data, and "A" pushes three elemnents in a single call, we needed to propagate an LPUSH with a missing argument in the AOF and replication link. We also needed to make sure to replicate the LPUSH side of BRPOPLPUSH, but only if in turn did not happened to serve another blocking client into another list ;) This were complex but with a few of mutually recursive functions everything worked as expected... until one day we introduced scripting in Redis. Scripting + synchronous blocking operations = Issue #614. Basically you can't "rewrite" a script to have just a partial effect on the replicas and AOF file if the script happened to serve a few blocked clients. The solution to all this problems, implemented by this commit, is to change the way we serve blocked clients. Instead of serving the blocked clients synchronously, in the context of the command performing the PUSH operation, it is now an asynchronous and iterative process: 1) If a key that has clients blocked waiting for data is the subject of a list push operation, We simply mark keys as "ready" and put it into a queue. 2) Every command pushing stuff on lists, as a variadic LPUSH, a script, or whatever it is, is replicated verbatim without any rewriting. 3) Every time a Redis command, a MULTI/EXEC block, or a script, completed its execution, we run the list of keys ready to serve blocked clients (as more data arrived), and process this list serving the blocked clients. 4) As a result of "3" maybe more keys are ready again for other clients (as a result of BRPOPLPUSH we may have push operations), so we iterate back to step "3" if it's needed. The new code has a much simpler semantics, and a simpler to understand implementation, with the disadvantage of not being able to "optmize out" a PUSH+BPOP as a No OP. This commit will be tested with care before the final merge, more tests will be added likely.
2012-09-04 04:37:49 -04:00
}
/* Blocking RPOP/LPOP */
void blockingPopGenericCommand(client *c, int where) {
robj *o;
mstime_t timeout;
int j;
if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS)
!= C_OK) return;
for (j = 1; j < c->argc-1; j++) {
o = lookupKeyWrite(c->db,c->argv[j]);
if (o != NULL) {
if (o->type != OBJ_LIST) {
addReply(c,shared.wrongtypeerr);
return;
} else {
if (listTypeLength(o) != 0) {
/* Non empty list, this is like a non normal [LR]POP. */
2015-07-27 03:41:48 -04:00
char *event = (where == LIST_HEAD) ? "lpop" : "rpop";
robj *value = listTypePop(o,where);
2015-07-26 09:29:53 -04:00
serverAssert(value != NULL);
2010-11-08 13:25:59 -05:00
addReplyArrayLen(c,2);
addReplyBulk(c,c->argv[j]);
addReplyBulk(c,value);
decrRefCount(value);
2015-07-27 03:41:48 -04:00
notifyKeyspaceEvent(NOTIFY_LIST,event,
c->argv[j],c->db->id);
if (listTypeLength(o) == 0) {
dbDelete(c->db,c->argv[j]);
2015-07-27 03:41:48 -04:00
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
c->argv[j],c->db->id);
}
signalModifiedKey(c->db,c->argv[j]);
server.dirty++;
/* Replicate it as an [LR]POP instead of B[LR]POP. */
rewriteClientCommandVector(c,2,
2015-07-27 03:41:48 -04:00
(where == LIST_HEAD) ? shared.lpop : shared.rpop,
c->argv[j]);
return;
}
}
}
}
/* If we are inside a MULTI/EXEC and the list is empty the only thing
* we can do is treating it as a timeout (even with timeout 0). */
2015-07-27 03:41:48 -04:00
if (c->flags & CLIENT_MULTI) {
addReplyNullArray(c);
return;
}
/* If the list is empty or the key does not exists we must block */
blockForKeys(c,BLOCKED_LIST,c->argv + 1,c->argc - 2,timeout,NULL,NULL);
}
void blpopCommand(client *c) {
2015-07-27 03:41:48 -04:00
blockingPopGenericCommand(c,LIST_HEAD);
}
void brpopCommand(client *c) {
2015-07-27 03:41:48 -04:00
blockingPopGenericCommand(c,LIST_TAIL);
}
2010-11-08 13:25:59 -05:00
void brpoplpushCommand(client *c) {
mstime_t timeout;
2010-11-08 13:25:59 -05:00
if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout,UNIT_SECONDS)
!= C_OK) return;
2010-11-08 18:47:46 -05:00
robj *key = lookupKeyWrite(c->db, c->argv[1]);
if (key == NULL) {
2015-07-27 03:41:48 -04:00
if (c->flags & CLIENT_MULTI) {
/* Blocking against an empty list in a multi state
* returns immediately. */
2018-11-30 03:41:54 -05:00
addReplyNull(c);
2010-11-08 18:47:46 -05:00
} else {
/* The list is empty and the client blocks. */
blockForKeys(c,BLOCKED_LIST,c->argv + 1,1,timeout,c->argv[2],NULL);
2010-11-08 18:47:46 -05:00
}
} else {
if (key->type != OBJ_LIST) {
addReply(c, shared.wrongtypeerr);
} else {
/* The list exists and has elements, so
* the regular rpoplpushCommand is executed. */
2015-07-26 09:29:53 -04:00
serverAssertWithInfo(c,key,listTypeLength(key) > 0);
rpoplpushCommand(c);
}
2010-11-08 18:47:46 -05:00
}
2010-11-08 13:25:59 -05:00
}