CLIENT PAUSE and related API implemented.

The API is one of the bulding blocks of CLUSTER FAILOVER command that
executes a manual failover in Redis Cluster. However exposed as a
command that the user can call directly, it makes much simpler to
upgrade a standalone Redis instance using a slave in a safer way.

The commands works like that:

    CLIENT PAUSE <milliesconds>

All the clients that are not slaves and not in MONITOR state are paused
for the specified number of milliesconds. This means that slaves are
normally served in the meantime.

At the end of the specified amount of time all the clients are unblocked
and will continue operations normally. This command has no effects on
the population of the slow log, since clients are not blocked in the
middle of operations but only when there is to process new data.

Note that while the clients are unblocked, still new commands are
accepted and queued in the client buffer, so clients will likely not
block while writing to the server while the pause is active.
This commit is contained in:
antirez 2014-02-04 15:52:09 +01:00
parent b089ba98cc
commit 4919a13f50
3 changed files with 64 additions and 0 deletions

View File

@ -1086,6 +1086,9 @@ int processMultibulkBuffer(redisClient *c) {
void processInputBuffer(redisClient *c) { void processInputBuffer(redisClient *c) {
/* Keep processing while there is something in the input buffer */ /* Keep processing while there is something in the input buffer */
while(sdslen(c->querybuf)) { while(sdslen(c->querybuf)) {
/* Return if clients are paused. */
if (!(c->flags & REDIS_SLAVE) && clientsArePaused()) return;
/* Immediately abort if the client is in the middle of something. */ /* Immediately abort if the client is in the middle of something. */
if (c->flags & REDIS_BLOCKED) return; if (c->flags & REDIS_BLOCKED) return;
@ -1373,6 +1376,13 @@ void clientCommand(redisClient *c) {
addReplyBulk(c,c->name); addReplyBulk(c,c->name);
else else
addReply(c,shared.nullbulk); addReply(c,shared.nullbulk);
} else if (!strcasecmp(c->argv[1]->ptr,"pause") && c->argc == 3) {
long long duration;
if (getTimeoutFromObjectOrReply(c,c->argv[2],&duration,UNIT_MILLISECONDS)
!= REDIS_OK) return;
pauseClients(duration);
addReply(c,shared.ok);
} else { } else {
addReplyError(c, "Syntax error, try CLIENT (LIST | KILL ip:port | GETNAME | SETNAME connection-name)"); addReplyError(c, "Syntax error, try CLIENT (LIST | KILL ip:port | GETNAME | SETNAME connection-name)");
} }
@ -1555,3 +1565,49 @@ void flushSlavesOutputBuffers(void) {
} }
} }
} }
/* Pause clients up to the specified unixtime (in ms). While clients
* are paused no command is processed from clients, so the data set can't
* change during that time.
*
* However while this function pauses normal and Pub/Sub clients, slaves are
* still served, so this function can be used on server upgrades where it is
* required that slaves process the latest bytes from the replication stream
* before being turned to masters.
*
* This function is also internally used by Redis Cluster for the manual
* failover procedure implemented by CLUSTER FAILOVER.
*
* The function always succeed, even if there is already a pause in progress.
* In such a case, the pause is extended if the duration is more than the
* time left for the previous duration. However if the duration is smaller
* than the time left for the previous pause, no change is made to the
* left duration. */
void pauseClients(mstime_t end) {
if (!server.clients_paused || end > server.clients_pause_end_time)
server.clients_pause_end_time = end;
server.clients_paused = 1;
}
/* Return non-zero if clients are currently paused. As a side effect the
* function checks if the pause time was reached and clear it. */
int clientsArePaused(void) {
if (server.clients_paused && server.clients_pause_end_time < server.mstime) {
listNode *ln;
listIter li;
redisClient *c;
server.clients_paused = 0;
/* Put all the clients in the unblocked clients queue in order to
* force the re-processing of the input buffer if any. */
listRewind(server.clients,&li);
while ((ln = listNext(&li)) != NULL) {
c = listNodeValue(ln);
if (c->flags & REDIS_SLAVE) continue;
listAddNodeTail(server.unblocked_clients,c);
}
}
return server.clients_paused;
}

View File

@ -1174,6 +1174,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
/* Close clients that need to be closed asynchronous */ /* Close clients that need to be closed asynchronous */
freeClientsInAsyncFreeQueue(); freeClientsInAsyncFreeQueue();
/* Clear the paused clients flag if needed. */
clientsArePaused(); /* Don't check return value, just use the side effect. */
/* Replication cron function -- used to reconnect to master and /* Replication cron function -- used to reconnect to master and
* to detect transfer failures. */ * to detect transfer failures. */
run_with_period(1000) replicationCron(); run_with_period(1000) replicationCron();
@ -1594,6 +1597,7 @@ void initServer() {
server.ready_keys = listCreate(); server.ready_keys = listCreate();
server.clients_waiting_acks = listCreate(); server.clients_waiting_acks = listCreate();
server.get_ack_from_slaves = 0; server.get_ack_from_slaves = 0;
server.clients_paused = 0;
createSharedObjects(); createSharedObjects();
adjustOpenFilesLimit(); adjustOpenFilesLimit();

View File

@ -623,6 +623,8 @@ struct redisServer {
list *clients_to_close; /* Clients to close asynchronously */ list *clients_to_close; /* Clients to close asynchronously */
list *slaves, *monitors; /* List of slaves and MONITORs */ list *slaves, *monitors; /* List of slaves and MONITORs */
redisClient *current_client; /* Current client, only used on crash report */ redisClient *current_client; /* Current client, only used on crash report */
int clients_paused; /* True if clients are currently paused */
mstime_t clients_pause_end_time; /* Time when we undo clients_paused */
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
dict *migrate_cached_sockets;/* MIGRATE cached sockets */ dict *migrate_cached_sockets;/* MIGRATE cached sockets */
/* RDB / AOF loading information */ /* RDB / AOF loading information */
@ -972,6 +974,8 @@ char *getClientLimitClassName(int class);
void flushSlavesOutputBuffers(void); void flushSlavesOutputBuffers(void);
void disconnectSlaves(void); void disconnectSlaves(void);
int listenToPort(int port, int *fds, int *count); int listenToPort(int port, int *fds, int *count);
void pauseClients(mstime_t duration);
int clientsArePaused(void);
#ifdef __GNUC__ #ifdef __GNUC__
void addReplyErrorFormat(redisClient *c, const char *fmt, ...) void addReplyErrorFormat(redisClient *c, const char *fmt, ...)