2013-12-03 11:43:53 -05:00
|
|
|
/* blocked.c - generic support for blocking operations like BLPOP & WAIT.
|
|
|
|
*
|
|
|
|
* Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
|
|
|
|
* All rights reserved.
|
|
|
|
*
|
|
|
|
* Redistribution and use in source and binary forms, with or without
|
|
|
|
* modification, are permitted provided that the following conditions are met:
|
|
|
|
*
|
|
|
|
* * Redistributions of source code must retain the above copyright notice,
|
|
|
|
* this list of conditions and the following disclaimer.
|
|
|
|
* * Redistributions in binary form must reproduce the above copyright
|
|
|
|
* notice, this list of conditions and the following disclaimer in the
|
|
|
|
* documentation and/or other materials provided with the distribution.
|
|
|
|
* * Neither the name of Redis nor the names of its contributors may be used
|
|
|
|
* to endorse or promote products derived from this software without
|
|
|
|
* specific prior written permission.
|
|
|
|
*
|
|
|
|
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
|
|
|
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
|
|
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
|
|
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
|
|
|
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
|
|
|
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
|
|
|
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
|
|
|
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
|
|
|
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
|
|
|
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
|
|
* POSSIBILITY OF SUCH DAMAGE.
|
2013-12-03 12:03:15 -05:00
|
|
|
*
|
|
|
|
* ---------------------------------------------------------------------------
|
|
|
|
*
|
|
|
|
* API:
|
|
|
|
*
|
|
|
|
* getTimeoutFromObjectOrReply() is just an utility function to parse a
|
|
|
|
* timeout argument since blocking operations usually require a timeout.
|
|
|
|
*
|
|
|
|
* blockClient() set the REDIS_BLOCKED flag in the client, and set the
|
|
|
|
* specified block type 'btype' filed to one of REDIS_BLOCKED_* macros.
|
|
|
|
*
|
|
|
|
* unblockClient() unblocks the client doing the following:
|
|
|
|
* 1) It calls the btype-specific function to cleanup the state.
|
|
|
|
* 2) It unblocks the client by unsetting the REDIS_BLOCKED flag.
|
|
|
|
* 3) It puts the client into a list of just unblocked clients that are
|
|
|
|
* processed ASAP in the beforeSleep() event loop callback, so that
|
|
|
|
* if there is some query buffer to process, we do it. This is also
|
|
|
|
* required because otherwise there is no 'readable' event fired, we
|
|
|
|
* already read the pending commands. We also set the REDIS_UNBLOCKED
|
|
|
|
* flag to remember the client is in the unblocked_clients list.
|
|
|
|
*
|
|
|
|
* processUnblockedClients() is called inside the beforeSleep() function
|
|
|
|
* to process the query buffer from unblocked clients and remove the clients
|
|
|
|
* from the blocked_clients queue.
|
|
|
|
*
|
|
|
|
* replyToBlockedClientTimedOut() is called by the cron function when
|
|
|
|
* a client blocked reaches the specified timeout (if the timeout is set
|
|
|
|
* to 0, no timeout is processed).
|
|
|
|
* It usually just needs to send a reply to the client.
|
|
|
|
*
|
|
|
|
* When implementing a new type of blocking opeation, the implementation
|
|
|
|
* should modify unblockClient() and replyToBlockedClientTimedOut() in order
|
|
|
|
* to handle the btype-specific behavior of this two functions.
|
2015-03-24 06:07:10 -04:00
|
|
|
* If the blocking operation waits for certain keys to change state, the
|
|
|
|
* clusterRedirectBlockedClientIfNeeded() function should also be updated.
|
2013-12-03 11:43:53 -05:00
|
|
|
*/
|
|
|
|
|
2015-07-26 09:14:57 -04:00
|
|
|
#include "server.h"
|
2013-12-03 11:43:53 -05:00
|
|
|
|
|
|
|
/* Get a timeout value from an object and store it into 'timeout'.
|
|
|
|
* The final timeout is always stored as milliseconds as a time where the
|
|
|
|
* timeout will expire, however the parsing is performed according to
|
|
|
|
* the 'unit' that can be seconds or milliseconds.
|
|
|
|
*
|
|
|
|
* Note that if the timeout is zero (usually from the point of view of
|
|
|
|
* commands API this means no timeout) the value stored into 'timeout'
|
|
|
|
* is zero. */
|
2015-07-26 09:20:46 -04:00
|
|
|
int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit) {
|
2013-12-03 11:43:53 -05:00
|
|
|
long long tval;
|
|
|
|
|
|
|
|
if (getLongLongFromObjectOrReply(c,object,&tval,
|
2015-07-26 17:17:55 -04:00
|
|
|
"timeout is not an integer or out of range") != C_OK)
|
|
|
|
return C_ERR;
|
2013-12-03 11:43:53 -05:00
|
|
|
|
|
|
|
if (tval < 0) {
|
|
|
|
addReplyError(c,"timeout is negative");
|
2015-07-26 17:17:55 -04:00
|
|
|
return C_ERR;
|
2013-12-03 11:43:53 -05:00
|
|
|
}
|
|
|
|
|
|
|
|
if (tval > 0) {
|
|
|
|
if (unit == UNIT_SECONDS) tval *= 1000;
|
|
|
|
tval += mstime();
|
|
|
|
}
|
|
|
|
*timeout = tval;
|
|
|
|
|
2015-07-26 17:17:55 -04:00
|
|
|
return C_OK;
|
2013-12-03 11:43:53 -05:00
|
|
|
}
|
|
|
|
|
|
|
|
/* Block a client for the specific operation type. Once the REDIS_BLOCKED
|
|
|
|
* flag is set client query buffer is not longer processed, but accumulated,
|
|
|
|
* and will be processed when the client is unblocked. */
|
2015-07-26 09:20:46 -04:00
|
|
|
void blockClient(client *c, int btype) {
|
2013-12-03 11:43:53 -05:00
|
|
|
c->flags |= REDIS_BLOCKED;
|
|
|
|
c->btype = btype;
|
|
|
|
server.bpop_blocked_clients++;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* This function is called in the beforeSleep() function of the event loop
|
|
|
|
* in order to process the pending input buffer of clients that were
|
|
|
|
* unblocked after a blocking operation. */
|
|
|
|
void processUnblockedClients(void) {
|
|
|
|
listNode *ln;
|
2015-07-26 09:20:46 -04:00
|
|
|
client *c;
|
2013-12-03 11:43:53 -05:00
|
|
|
|
|
|
|
while (listLength(server.unblocked_clients)) {
|
|
|
|
ln = listFirst(server.unblocked_clients);
|
2015-07-26 09:29:53 -04:00
|
|
|
serverAssert(ln != NULL);
|
2013-12-03 11:43:53 -05:00
|
|
|
c = ln->value;
|
|
|
|
listDelNode(server.unblocked_clients,ln);
|
|
|
|
c->flags &= ~REDIS_UNBLOCKED;
|
|
|
|
|
2015-05-05 10:35:44 -04:00
|
|
|
/* Process remaining data in the input buffer, unless the client
|
|
|
|
* is blocked again. Actually processInputBuffer() checks that the
|
|
|
|
* client is not blocked before to proceed, but things may change and
|
|
|
|
* the code is conceptually more correct this way. */
|
2015-05-05 10:36:35 -04:00
|
|
|
if (!(c->flags & REDIS_BLOCKED)) {
|
2015-05-05 10:35:44 -04:00
|
|
|
if (c->querybuf && sdslen(c->querybuf) > 0) {
|
|
|
|
processInputBuffer(c);
|
|
|
|
}
|
2013-12-03 11:43:53 -05:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/* Unblock a client calling the right function depending on the kind
|
|
|
|
* of operation the client is blocking for. */
|
2015-07-26 09:20:46 -04:00
|
|
|
void unblockClient(client *c) {
|
2013-12-03 11:43:53 -05:00
|
|
|
if (c->btype == REDIS_BLOCKED_LIST) {
|
|
|
|
unblockClientWaitingData(c);
|
2013-12-04 09:52:20 -05:00
|
|
|
} else if (c->btype == REDIS_BLOCKED_WAIT) {
|
|
|
|
unblockClientWaitingReplicas(c);
|
2013-12-03 11:43:53 -05:00
|
|
|
} else {
|
|
|
|
redisPanic("Unknown btype in unblockClient().");
|
|
|
|
}
|
|
|
|
/* Clear the flags, and put the client in the unblocked list so that
|
|
|
|
* we'll process new commands in its query buffer ASAP. */
|
|
|
|
c->flags &= ~REDIS_BLOCKED;
|
|
|
|
c->btype = REDIS_BLOCKED_NONE;
|
|
|
|
server.bpop_blocked_clients--;
|
2015-05-05 10:32:53 -04:00
|
|
|
/* The client may already be into the unblocked list because of a previous
|
|
|
|
* blocking operation, don't add back it into the list multiple times. */
|
|
|
|
if (!(c->flags & REDIS_UNBLOCKED)) {
|
|
|
|
c->flags |= REDIS_UNBLOCKED;
|
|
|
|
listAddNodeTail(server.unblocked_clients,c);
|
|
|
|
}
|
2013-12-03 11:43:53 -05:00
|
|
|
}
|
|
|
|
|
|
|
|
/* This function gets called when a blocked client timed out in order to
|
|
|
|
* send it a reply of some kind. */
|
2015-07-26 09:20:46 -04:00
|
|
|
void replyToBlockedClientTimedOut(client *c) {
|
2013-12-03 11:43:53 -05:00
|
|
|
if (c->btype == REDIS_BLOCKED_LIST) {
|
|
|
|
addReply(c,shared.nullmultibulk);
|
2013-12-04 09:52:20 -05:00
|
|
|
} else if (c->btype == REDIS_BLOCKED_WAIT) {
|
|
|
|
addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset));
|
2013-12-03 11:43:53 -05:00
|
|
|
} else {
|
|
|
|
redisPanic("Unknown btype in replyToBlockedClientTimedOut().");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
Replication: disconnect blocked clients when switching to slave role.
Bug as old as Redis and blocking operations. It's hard to trigger since
only happens on instance role switch, but the results are quite bad
since an inconsistency between master and slave is created.
How to trigger the bug is a good description of the bug itself.
1. Client does "BLPOP mylist 0" in master.
2. Master is turned into slave, that replicates from New-Master.
3. Client does "LPUSH mylist foo" in New-Master.
4. New-Master propagates write to slave.
5. Slave receives the LPUSH, the blocked client get served.
Now Master "mylist" key has "foo", Slave "mylist" key is empty.
Highlights:
* At step "2" above, the client remains attached, basically escaping any
check performed during command dispatch: read only slave, in that case.
* At step "5" the slave (that was the master), serves the blocked client
consuming a list element, which is not consumed on the master side.
This scenario is technically likely to happen during failovers, however
since Redis Sentinel already disconnects clients using the CLIENT
command when changing the role of the instance, the bug is avoided in
Sentinel deployments.
Closes #2473.
2015-03-24 11:00:09 -04:00
|
|
|
/* Mass-unblock clients because something changed in the instance that makes
|
|
|
|
* blocking no longer safe. For example clients blocked in list operations
|
|
|
|
* in an instance which turns from master to slave is unsafe, so this function
|
|
|
|
* is called when a master turns into a slave.
|
|
|
|
*
|
|
|
|
* The semantics is to send an -UNBLOCKED error to the client, disconnecting
|
|
|
|
* it at the same time. */
|
|
|
|
void disconnectAllBlockedClients(void) {
|
|
|
|
listNode *ln;
|
|
|
|
listIter li;
|
|
|
|
|
|
|
|
listRewind(server.clients,&li);
|
|
|
|
while((ln = listNext(&li))) {
|
2015-07-26 09:20:46 -04:00
|
|
|
client *c = listNodeValue(ln);
|
Replication: disconnect blocked clients when switching to slave role.
Bug as old as Redis and blocking operations. It's hard to trigger since
only happens on instance role switch, but the results are quite bad
since an inconsistency between master and slave is created.
How to trigger the bug is a good description of the bug itself.
1. Client does "BLPOP mylist 0" in master.
2. Master is turned into slave, that replicates from New-Master.
3. Client does "LPUSH mylist foo" in New-Master.
4. New-Master propagates write to slave.
5. Slave receives the LPUSH, the blocked client get served.
Now Master "mylist" key has "foo", Slave "mylist" key is empty.
Highlights:
* At step "2" above, the client remains attached, basically escaping any
check performed during command dispatch: read only slave, in that case.
* At step "5" the slave (that was the master), serves the blocked client
consuming a list element, which is not consumed on the master side.
This scenario is technically likely to happen during failovers, however
since Redis Sentinel already disconnects clients using the CLIENT
command when changing the role of the instance, the bug is avoided in
Sentinel deployments.
Closes #2473.
2015-03-24 11:00:09 -04:00
|
|
|
|
|
|
|
if (c->flags & REDIS_BLOCKED) {
|
|
|
|
addReplySds(c,sdsnew(
|
|
|
|
"-UNBLOCKED force unblock from blocking operation, "
|
|
|
|
"instance state changed (master -> slave?)\r\n"));
|
|
|
|
unblockClient(c);
|
|
|
|
c->flags |= REDIS_CLOSE_AFTER_REPLY;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|