mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-23 16:48:27 -05:00
191 lines
7.8 KiB
C
191 lines
7.8 KiB
C
/* blocked.c - generic support for blocking operations like BLPOP & WAIT.
|
|
*
|
|
* Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
|
|
* All rights reserved.
|
|
*
|
|
* Redistribution and use in source and binary forms, with or without
|
|
* modification, are permitted provided that the following conditions are met:
|
|
*
|
|
* * Redistributions of source code must retain the above copyright notice,
|
|
* this list of conditions and the following disclaimer.
|
|
* * Redistributions in binary form must reproduce the above copyright
|
|
* notice, this list of conditions and the following disclaimer in the
|
|
* documentation and/or other materials provided with the distribution.
|
|
* * Neither the name of Redis nor the names of its contributors may be used
|
|
* to endorse or promote products derived from this software without
|
|
* specific prior written permission.
|
|
*
|
|
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
|
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
|
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
|
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
|
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
|
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
|
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
|
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
* POSSIBILITY OF SUCH DAMAGE.
|
|
*
|
|
* ---------------------------------------------------------------------------
|
|
*
|
|
* API:
|
|
*
|
|
* getTimeoutFromObjectOrReply() is just a utility function to parse a
|
|
* timeout argument since blocking operations usually require a timeout.
|
|
*
|
|
* blockClient() set the CLIENT_BLOCKED flag in the client, and set the
|
|
* specified block type 'btype' field to one of BLOCKED_* macros.
|
|
*
|
|
* unblockClient() unblocks the client doing the following:
|
|
* 1) It calls the btype-specific function to cleanup the state.
|
|
* 2) It unblocks the client by unsetting the CLIENT_BLOCKED flag.
|
|
* 3) It puts the client into a list of just unblocked clients that are
|
|
* processed ASAP in the beforeSleep() event loop callback, so that
|
|
* if there is some query buffer to process, we do it. This is also
|
|
* required because otherwise there is no 'readable' event fired, we
|
|
* already read the pending commands. We also set the CLIENT_UNBLOCKED
|
|
* flag to remember the client is in the unblocked_clients list.
|
|
*
|
|
* processUnblockedClients() is called inside the beforeSleep() function
|
|
* to process the query buffer from unblocked clients and remove the clients
|
|
* from the blocked_clients queue.
|
|
*
|
|
* replyToBlockedClientTimedOut() is called by the cron function when
|
|
* a client blocked reaches the specified timeout (if the timeout is set
|
|
* to 0, no timeout is processed).
|
|
* It usually just needs to send a reply to the client.
|
|
*
|
|
* When implementing a new type of blocking operation, the implementation
|
|
* should modify unblockClient() and replyToBlockedClientTimedOut() in order
|
|
* to handle the btype-specific behavior of these two functions.
|
|
* If the blocking operation waits for certain keys to change state, the
|
|
* clusterRedirectBlockedClientIfNeeded() function should also be updated.
|
|
*/
|
|
|
|
#include "server.h"

#include <limits.h>
|
|
|
|
/* Get a timeout value from an object and store it into 'timeout'.
|
|
* The final timeout is always stored as milliseconds as a time where the
|
|
* timeout will expire, however the parsing is performed according to
|
|
* the 'unit' that can be seconds or milliseconds.
|
|
*
|
|
* Note that if the timeout is zero (usually from the point of view of
|
|
* commands API this means no timeout) the value stored into 'timeout'
|
|
* is zero. */
|
|
int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit) {
|
|
long long tval;
|
|
|
|
if (getLongLongFromObjectOrReply(c,object,&tval,
|
|
"timeout is not an integer or out of range") != C_OK)
|
|
return C_ERR;
|
|
|
|
if (tval < 0) {
|
|
addReplyError(c,"timeout is negative");
|
|
return C_ERR;
|
|
}
|
|
|
|
if (tval > 0) {
|
|
if (unit == UNIT_SECONDS) tval *= 1000;
|
|
tval += mstime();
|
|
}
|
|
*timeout = tval;
|
|
|
|
return C_OK;
|
|
}
|
|
|
|
/* Block a client for the specific operation type. Once the CLIENT_BLOCKED
|
|
* flag is set client query buffer is not longer processed, but accumulated,
|
|
* and will be processed when the client is unblocked. */
|
|
void blockClient(client *c, int btype) {
|
|
c->flags |= CLIENT_BLOCKED;
|
|
c->btype = btype;
|
|
server.bpop_blocked_clients++;
|
|
}
|
|
|
|
/* This function is called in the beforeSleep() function of the event loop
|
|
* in order to process the pending input buffer of clients that were
|
|
* unblocked after a blocking operation. */
|
|
void processUnblockedClients(void) {
|
|
listNode *ln;
|
|
client *c;
|
|
|
|
while (listLength(server.unblocked_clients)) {
|
|
ln = listFirst(server.unblocked_clients);
|
|
serverAssert(ln != NULL);
|
|
c = ln->value;
|
|
listDelNode(server.unblocked_clients,ln);
|
|
c->flags &= ~CLIENT_UNBLOCKED;
|
|
|
|
/* Process remaining data in the input buffer, unless the client
|
|
* is blocked again. Actually processInputBuffer() checks that the
|
|
* client is not blocked before to proceed, but things may change and
|
|
* the code is conceptually more correct this way. */
|
|
if (!(c->flags & CLIENT_BLOCKED)) {
|
|
if (c->querybuf && sdslen(c->querybuf) > 0) {
|
|
processInputBuffer(c);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/* Unblock a client calling the right function depending on the kind
|
|
* of operation the client is blocking for. */
|
|
void unblockClient(client *c) {
|
|
if (c->btype == BLOCKED_LIST) {
|
|
unblockClientWaitingData(c);
|
|
} else if (c->btype == BLOCKED_WAIT) {
|
|
unblockClientWaitingReplicas(c);
|
|
} else {
|
|
serverPanic("Unknown btype in unblockClient().");
|
|
}
|
|
/* Clear the flags, and put the client in the unblocked list so that
|
|
* we'll process new commands in its query buffer ASAP. */
|
|
c->flags &= ~CLIENT_BLOCKED;
|
|
c->btype = BLOCKED_NONE;
|
|
server.bpop_blocked_clients--;
|
|
/* The client may already be into the unblocked list because of a previous
|
|
* blocking operation, don't add back it into the list multiple times. */
|
|
if (!(c->flags & CLIENT_UNBLOCKED)) {
|
|
c->flags |= CLIENT_UNBLOCKED;
|
|
listAddNodeTail(server.unblocked_clients,c);
|
|
}
|
|
}
|
|
|
|
/* This function gets called when a blocked client timed out in order to
|
|
* send it a reply of some kind. */
|
|
void replyToBlockedClientTimedOut(client *c) {
|
|
if (c->btype == BLOCKED_LIST) {
|
|
addReply(c,shared.nullmultibulk);
|
|
} else if (c->btype == BLOCKED_WAIT) {
|
|
addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset));
|
|
} else {
|
|
serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
|
|
}
|
|
}
|
|
|
|
/* Mass-unblock clients because something changed in the instance that makes
|
|
* blocking no longer safe. For example clients blocked in list operations
|
|
* in an instance which turns from master to slave is unsafe, so this function
|
|
* is called when a master turns into a slave.
|
|
*
|
|
* The semantics is to send an -UNBLOCKED error to the client, disconnecting
|
|
* it at the same time. */
|
|
void disconnectAllBlockedClients(void) {
|
|
listNode *ln;
|
|
listIter li;
|
|
|
|
listRewind(server.clients,&li);
|
|
while((ln = listNext(&li))) {
|
|
client *c = listNodeValue(ln);
|
|
|
|
if (c->flags & CLIENT_BLOCKED) {
|
|
addReplySds(c,sdsnew(
|
|
"-UNBLOCKED force unblock from blocking operation, "
|
|
"instance state changed (master -> slave?)\r\n"));
|
|
unblockClient(c);
|
|
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
|
|
}
|
|
}
|
|
}
|