mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 16:18:28 -05:00
Accept multiple clients per iteration.
When the listening sockets readable event is fired, we have the chance to accept multiple clients instead of accepting a single one. This makes Redis more responsive when there is a mass-connect event (for example after the server startup), and in workloads where a connect-disconnect pattern is used often, so that multiple clients are waiting to be accepted continuously. As a side effect, this commit makes the LOADING, BUSY, and similar errors much faster to deliver to the client, making Redis more responsive when there is to return errors to inform the clients that the server is blocked in an not interruptible operation.
This commit is contained in:
parent
cac4bae11a
commit
3a3458ee7b
@ -551,6 +551,7 @@ void copyClientOutputBuffer(redisClient *dst, redisClient *src) {
|
||||
dst->reply_bytes = src->reply_bytes;
|
||||
}
|
||||
|
||||
#define MAX_ACCEPTS_PER_CALL 1000
|
||||
static void acceptCommonHandler(int fd, int flags) {
|
||||
redisClient *c;
|
||||
if ((c = createClient(fd)) == NULL) {
|
||||
@ -580,37 +581,44 @@ static void acceptCommonHandler(int fd, int flags) {
|
||||
}
|
||||
|
||||
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
int cport, cfd;
|
||||
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
|
||||
char cip[REDIS_IP_STR_LEN];
|
||||
REDIS_NOTUSED(el);
|
||||
REDIS_NOTUSED(mask);
|
||||
REDIS_NOTUSED(privdata);
|
||||
|
||||
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
|
||||
if (cfd == ANET_ERR) {
|
||||
redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr);
|
||||
return;
|
||||
while(max--) {
|
||||
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
|
||||
if (cfd == ANET_ERR) {
|
||||
if (errno != EWOULDBLOCK)
|
||||
redisLog(REDIS_WARNING,
|
||||
"Accepting client connection: %s", server.neterr);
|
||||
return;
|
||||
}
|
||||
redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
|
||||
acceptCommonHandler(cfd,0);
|
||||
}
|
||||
redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
|
||||
acceptCommonHandler(cfd,0);
|
||||
}
|
||||
|
||||
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
int cfd;
|
||||
int cfd, max = MAX_ACCEPTS_PER_CALL;
|
||||
REDIS_NOTUSED(el);
|
||||
REDIS_NOTUSED(mask);
|
||||
REDIS_NOTUSED(privdata);
|
||||
|
||||
cfd = anetUnixAccept(server.neterr, fd);
|
||||
if (cfd == ANET_ERR) {
|
||||
redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr);
|
||||
return;
|
||||
while(max--) {
|
||||
cfd = anetUnixAccept(server.neterr, fd);
|
||||
if (cfd == ANET_ERR) {
|
||||
if (errno != EWOULDBLOCK)
|
||||
redisLog(REDIS_WARNING,
|
||||
"Accepting client connection: %s", server.neterr);
|
||||
return;
|
||||
}
|
||||
redisLog(REDIS_VERBOSE,"Accepted connection to %s", server.unixsocket);
|
||||
acceptCommonHandler(cfd,REDIS_UNIX_SOCKET);
|
||||
}
|
||||
redisLog(REDIS_VERBOSE,"Accepted connection to %s", server.unixsocket);
|
||||
acceptCommonHandler(cfd,REDIS_UNIX_SOCKET);
|
||||
}
|
||||
|
||||
|
||||
static void freeClientArgv(redisClient *c) {
|
||||
int j;
|
||||
for (j = 0; j < c->argc; j++)
|
||||
|
12
src/redis.c
12
src/redis.c
@ -1615,10 +1615,16 @@ int listenToPort(int port, int *fds, int *count) {
|
||||
* server.bindaddr_count == 0. */
|
||||
fds[*count] = anetTcp6Server(server.neterr,port,NULL,
|
||||
server.tcp_backlog);
|
||||
if (fds[*count] != ANET_ERR) (*count)++;
|
||||
if (fds[*count] != ANET_ERR) {
|
||||
anetNonBlock(NULL,fds[*count]);
|
||||
(*count)++;
|
||||
}
|
||||
fds[*count] = anetTcpServer(server.neterr,port,NULL,
|
||||
server.tcp_backlog);
|
||||
if (fds[*count] != ANET_ERR) (*count)++;
|
||||
if (fds[*count] != ANET_ERR) {
|
||||
anetNonBlock(NULL,fds[*count]);
|
||||
(*count)++;
|
||||
}
|
||||
/* Exit the loop if we were able to bind * on IPv4 or IPv6,
|
||||
* otherwise fds[*count] will be ANET_ERR and we'll print an
|
||||
* error and return to the caller with an error. */
|
||||
@ -1639,6 +1645,7 @@ int listenToPort(int port, int *fds, int *count) {
|
||||
port, server.neterr);
|
||||
return REDIS_ERR;
|
||||
}
|
||||
anetNonBlock(NULL,fds[*count]);
|
||||
(*count)++;
|
||||
}
|
||||
return REDIS_OK;
|
||||
@ -1708,6 +1715,7 @@ void initServer() {
|
||||
redisLog(REDIS_WARNING, "Opening socket: %s", server.neterr);
|
||||
exit(1);
|
||||
}
|
||||
anetNonBlock(NULL,server.sofd);
|
||||
}
|
||||
|
||||
/* Abort if there are no listening sockets at all. */
|
||||
|
Loading…
Reference in New Issue
Block a user