Process events with processEventsWhileBlocked() when blocked.

When we are blocked and a few events a processed from time to time, it
is smarter to call the event handler a few times in order to handle the
accept, read, write, close cycle of a client in a single pass, otherwise
there is too much latency added for clients to receive a reply while the
server is busy in some way (for example during the DB loading).
This commit is contained in:
antirez 2014-04-24 17:36:47 +02:00
parent 3a3458ee7b
commit e29d330724
5 changed files with 27 additions and 4 deletions

View File

@ -560,7 +560,7 @@ int loadAppendOnlyFile(char *filename) {
/* Serve the clients from time to time */
if (!(loops++ % 1000)) {
loadingProgress(ftello(fp));
aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
processEventsWhileBlocked();
}
if (fgets(buf,sizeof(buf),fp) == NULL) {

View File

@ -1621,3 +1621,26 @@ int clientsArePaused(void) {
}
return server.clients_paused;
}
/* This function is called by Redis in order to process a few events from
* time to time while blocked into some not interruptible operation.
* This allows to reply to clients with the -LOADING error while loading the
* data set at startup or after a full resynchronization with the master
* and so forth.
*
* It calls the event loop in order to process a few events. Specifically we
* try to call the event loop for times as long as we receive acknowledge that
* some event was processed, in order to go forward with the accept, read,
* write, close sequence needed to serve a client.
*
* The function returns the total number of events processed. */
int processEventsWhileBlocked(void) {
int iterations = 4; /* See the function top-comment. */
int count = 0;
while (iterations--) {
int events = aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
if (!events) break;
count += events;
}
return count;
}

View File

@ -1072,7 +1072,7 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
if (server.masterhost && server.repl_state == REDIS_REPL_TRANSFER)
replicationSendNewlineToMaster();
loadingProgress(r->processed_bytes);
aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
processEventsWhileBlocked();
}
}

View File

@ -1006,6 +1006,7 @@ void disconnectSlaves(void);
int listenToPort(int port, int *fds, int *count);
void pauseClients(mstime_t duration);
int clientsArePaused(void);
int processEventsWhileBlocked(void);
#ifdef __GNUC__
void addReplyErrorFormat(redisClient *c, const char *fmt, ...)

View File

@ -452,8 +452,7 @@ void luaMaskCountHook(lua_State *lua, lua_Debug *ar) {
* here when the EVAL command will return. */
aeDeleteFileEvent(server.el, server.lua_caller->fd, AE_READABLE);
}
if (server.lua_timedout)
aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
if (server.lua_timedout) processEventsWhileBlocked();
if (server.lua_kill) {
redisLog(REDIS_WARNING,"Lua script killed by user with SCRIPT KILL.");
lua_pushstring(lua,"Script killed by user with SCRIPT KILL...");