Add support for listen(2) backlog definition

In high RPS environments, the default listen backlog is not sufficient, so
giving users the power to configure it is the right approach, especially
since it requires only minor modifications to the code.
This commit is contained in:
Nenad Merdanovic 2013-11-08 20:55:48 +01:00 committed by antirez
parent b8bfbf46c5
commit d76aa96d1a
6 changed files with 31 additions and 21 deletions

View File

@ -44,6 +44,9 @@ pidfile /var/run/redis.pid
# If port 0 is specified Redis will not listen on a TCP socket. # If port 0 is specified Redis will not listen on a TCP socket.
port 6379 port 6379
# TCP listen() backlog
backlog 511
# By default Redis listens for connections from all the network interfaces # By default Redis listens for connections from all the network interfaces
# available on the server. It is possible to listen to just one or multiple # available on the server. It is possible to listen to just one or multiple
# interfaces using the "bind" configuration directive, followed by one or # interfaces using the "bind" configuration directive, followed by one or

View File

@ -361,17 +361,14 @@ int anetWrite(int fd, char *buf, int count)
return totlen; return totlen;
} }
static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len) { static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len, int backlog) {
if (bind(s,sa,len) == -1) { if (bind(s,sa,len) == -1) {
anetSetError(err, "bind: %s", strerror(errno)); anetSetError(err, "bind: %s", strerror(errno));
close(s); close(s);
return ANET_ERR; return ANET_ERR;
} }
/* Use a backlog of 512 entries. We pass 511 to the listen() call because if (listen(s, backlog) == -1) {
* the kernel does: backlogsize = roundup_pow_of_two(backlogsize + 1);
* which will thus give us a backlog of 512 entries */
if (listen(s, 511) == -1) {
anetSetError(err, "listen: %s", strerror(errno)); anetSetError(err, "listen: %s", strerror(errno));
close(s); close(s);
return ANET_ERR; return ANET_ERR;
@ -389,7 +386,7 @@ static int anetV6Only(char *err, int s) {
return ANET_OK; return ANET_OK;
} }
static int _anetTcpServer(char *err, int port, char *bindaddr, int af) static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog)
{ {
int s, rv; int s, rv;
char _port[6]; /* strlen("65535") */ char _port[6]; /* strlen("65535") */
@ -411,7 +408,7 @@ static int _anetTcpServer(char *err, int port, char *bindaddr, int af)
if (af == AF_INET6 && anetV6Only(err,s) == ANET_ERR) goto error; if (af == AF_INET6 && anetV6Only(err,s) == ANET_ERR) goto error;
if (anetSetReuseAddr(err,s) == ANET_ERR) goto error; if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;
if (anetListen(err,s,p->ai_addr,p->ai_addrlen) == ANET_ERR) goto error; if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) == ANET_ERR) goto error;
goto end; goto end;
} }
if (p == NULL) { if (p == NULL) {
@ -426,17 +423,17 @@ end:
return s; return s;
} }
int anetTcpServer(char *err, int port, char *bindaddr) int anetTcpServer(char *err, int port, char *bindaddr, int backlog)
{ {
return _anetTcpServer(err, port, bindaddr, AF_INET); return _anetTcpServer(err, port, bindaddr, AF_INET, backlog);
} }
int anetTcp6Server(char *err, int port, char *bindaddr) int anetTcp6Server(char *err, int port, char *bindaddr, int backlog)
{ {
return _anetTcpServer(err, port, bindaddr, AF_INET6); return _anetTcpServer(err, port, bindaddr, AF_INET6, backlog);
} }
int anetUnixServer(char *err, char *path, mode_t perm) int anetUnixServer(char *err, char *path, mode_t perm, int backlog)
{ {
int s; int s;
struct sockaddr_un sa; struct sockaddr_un sa;
@ -447,7 +444,7 @@ int anetUnixServer(char *err, char *path, mode_t perm)
memset(&sa,0,sizeof(sa)); memset(&sa,0,sizeof(sa));
sa.sun_family = AF_LOCAL; sa.sun_family = AF_LOCAL;
strncpy(sa.sun_path,path,sizeof(sa.sun_path)-1); strncpy(sa.sun_path,path,sizeof(sa.sun_path)-1);
if (anetListen(err,s,(struct sockaddr*)&sa,sizeof(sa)) == ANET_ERR) if (anetListen(err,s,(struct sockaddr*)&sa,sizeof(sa),backlog) == ANET_ERR)
return ANET_ERR; return ANET_ERR;
if (perm) if (perm)
chmod(sa.sun_path, perm); chmod(sa.sun_path, perm);

View File

@ -50,9 +50,9 @@ int anetUnixNonBlockConnect(char *err, char *path);
int anetRead(int fd, char *buf, int count); int anetRead(int fd, char *buf, int count);
int anetResolve(char *err, char *host, char *ipbuf, size_t ipbuf_len); int anetResolve(char *err, char *host, char *ipbuf, size_t ipbuf_len);
int anetResolveIP(char *err, char *host, char *ipbuf, size_t ipbuf_len); int anetResolveIP(char *err, char *host, char *ipbuf, size_t ipbuf_len);
int anetTcpServer(char *err, int port, char *bindaddr); int anetTcpServer(char *err, int port, char *bindaddr, int backlog);
int anetTcp6Server(char *err, int port, char *bindaddr); int anetTcp6Server(char *err, int port, char *bindaddr, int backlog);
int anetUnixServer(char *err, char *path, mode_t perm); int anetUnixServer(char *err, char *path, mode_t perm, int backlog);
int anetTcpAccept(char *err, int serversock, char *ip, size_t ip_len, int *port); int anetTcpAccept(char *err, int serversock, char *ip, size_t ip_len, int *port);
int anetUnixAccept(char *err, int serversock); int anetUnixAccept(char *err, int serversock);
int anetWrite(int fd, char *buf, int count); int anetWrite(int fd, char *buf, int count);

View File

@ -127,6 +127,11 @@ void loadServerConfigFromString(char *config) {
if (server.port < 0 || server.port > 65535) { if (server.port < 0 || server.port > 65535) {
err = "Invalid port"; goto loaderr; err = "Invalid port"; goto loaderr;
} }
} else if (!strcasecmp(argv[0],"backlog") && argc == 2) {
server.backlog = atoi(argv[1]);
if (server.backlog < 0) {
err = "Invalid backlog value"; goto loaderr;
}
} else if (!strcasecmp(argv[0],"bind") && argc >= 2) { } else if (!strcasecmp(argv[0],"bind") && argc >= 2) {
int j, addresses = argc-1; int j, addresses = argc-1;
@ -975,6 +980,7 @@ void configGetCommand(redisClient *c) {
config_get_numerical_field("slowlog-max-len", config_get_numerical_field("slowlog-max-len",
server.slowlog_max_len); server.slowlog_max_len);
config_get_numerical_field("port",server.port); config_get_numerical_field("port",server.port);
config_get_numerical_field("backlog",server.backlog);
config_get_numerical_field("databases",server.dbnum); config_get_numerical_field("databases",server.dbnum);
config_get_numerical_field("repl-ping-slave-period",server.repl_ping_slave_period); config_get_numerical_field("repl-ping-slave-period",server.repl_ping_slave_period);
config_get_numerical_field("repl-timeout",server.repl_timeout); config_get_numerical_field("repl-timeout",server.repl_timeout);
@ -1695,6 +1701,7 @@ int rewriteConfig(char *path) {
rewriteConfigYesNoOption(state,"daemonize",server.daemonize,0); rewriteConfigYesNoOption(state,"daemonize",server.daemonize,0);
rewriteConfigStringOption(state,"pidfile",server.pidfile,REDIS_DEFAULT_PID_FILE); rewriteConfigStringOption(state,"pidfile",server.pidfile,REDIS_DEFAULT_PID_FILE);
rewriteConfigNumericalOption(state,"port",server.port,REDIS_SERVERPORT); rewriteConfigNumericalOption(state,"port",server.port,REDIS_SERVERPORT);
rewriteConfigNumericalOption(state,"backlog",server.backlog,REDIS_BACKLOG);
rewriteConfigBindOption(state); rewriteConfigBindOption(state);
rewriteConfigStringOption(state,"unixsocket",server.unixsocket,NULL); rewriteConfigStringOption(state,"unixsocket",server.unixsocket,NULL);
rewriteConfigOctalOption(state,"unixsocketperm",server.unixsocketperm,REDIS_DEFAULT_UNIX_SOCKET_PERM); rewriteConfigOctalOption(state,"unixsocketperm",server.unixsocketperm,REDIS_DEFAULT_UNIX_SOCKET_PERM);

View File

@ -1332,6 +1332,7 @@ void initServerConfig() {
server.runid[REDIS_RUN_ID_SIZE] = '\0'; server.runid[REDIS_RUN_ID_SIZE] = '\0';
server.arch_bits = (sizeof(long) == 8) ? 64 : 32; server.arch_bits = (sizeof(long) == 8) ? 64 : 32;
server.port = REDIS_SERVERPORT; server.port = REDIS_SERVERPORT;
server.backlog = REDIS_BACKLOG;
server.bindaddr_count = 0; server.bindaddr_count = 0;
server.unixsocket = NULL; server.unixsocket = NULL;
server.unixsocketperm = REDIS_DEFAULT_UNIX_SOCKET_PERM; server.unixsocketperm = REDIS_DEFAULT_UNIX_SOCKET_PERM;
@ -1540,9 +1541,9 @@ int listenToPort(int port, int *fds, int *count) {
if (server.bindaddr[j] == NULL) { if (server.bindaddr[j] == NULL) {
/* Bind * for both IPv6 and IPv4, we enter here only if /* Bind * for both IPv6 and IPv4, we enter here only if
* server.bindaddr_count == 0. */ * server.bindaddr_count == 0. */
fds[*count] = anetTcp6Server(server.neterr,port,NULL); fds[*count] = anetTcp6Server(server.neterr,port,NULL, server.backlog);
if (fds[*count] != ANET_ERR) (*count)++; if (fds[*count] != ANET_ERR) (*count)++;
fds[*count] = anetTcpServer(server.neterr,port,NULL); fds[*count] = anetTcpServer(server.neterr,port,NULL, server.backlog);
if (fds[*count] != ANET_ERR) (*count)++; if (fds[*count] != ANET_ERR) (*count)++;
/* Exit the loop if we were able to bind * on IPv4 or IPv6, /* Exit the loop if we were able to bind * on IPv4 or IPv6,
* otherwise fds[*count] will be ANET_ERR and we'll print an * otherwise fds[*count] will be ANET_ERR and we'll print an
@ -1550,10 +1551,10 @@ int listenToPort(int port, int *fds, int *count) {
if (*count) break; if (*count) break;
} else if (strchr(server.bindaddr[j],':')) { } else if (strchr(server.bindaddr[j],':')) {
/* Bind IPv6 address. */ /* Bind IPv6 address. */
fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j]); fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j], server.backlog);
} else { } else {
/* Bind IPv4 address. */ /* Bind IPv4 address. */
fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j]); fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j], server.backlog);
} }
if (fds[*count] == ANET_ERR) { if (fds[*count] == ANET_ERR) {
redisLog(REDIS_WARNING, redisLog(REDIS_WARNING,
@ -1603,7 +1604,7 @@ void initServer() {
/* Open the listening Unix domain socket. */ /* Open the listening Unix domain socket. */
if (server.unixsocket != NULL) { if (server.unixsocket != NULL) {
unlink(server.unixsocket); /* don't care if this fails */ unlink(server.unixsocket); /* don't care if this fails */
server.sofd = anetUnixServer(server.neterr,server.unixsocket,server.unixsocketperm); server.sofd = anetUnixServer(server.neterr,server.unixsocket,server.unixsocketperm, server.backlog);
if (server.sofd == ANET_ERR) { if (server.sofd == ANET_ERR) {
redisLog(REDIS_WARNING, "Opening socket: %s", server.neterr); redisLog(REDIS_WARNING, "Opening socket: %s", server.neterr);
exit(1); exit(1);

View File

@ -71,6 +71,7 @@
#define REDIS_MIN_HZ 1 #define REDIS_MIN_HZ 1
#define REDIS_MAX_HZ 500 #define REDIS_MAX_HZ 500
#define REDIS_SERVERPORT 6379 /* TCP port */ #define REDIS_SERVERPORT 6379 /* TCP port */
#define REDIS_BACKLOG 511 /* TCP listen backlog */
#define REDIS_MAXIDLETIME 0 /* default client timeout: infinite */ #define REDIS_MAXIDLETIME 0 /* default client timeout: infinite */
#define REDIS_DEFAULT_DBNUM 16 #define REDIS_DEFAULT_DBNUM 16
#define REDIS_CONFIGLINE_MAX 1024 #define REDIS_CONFIGLINE_MAX 1024
@ -608,6 +609,7 @@ struct redisServer {
int sentinel_mode; /* True if this instance is a Sentinel. */ int sentinel_mode; /* True if this instance is a Sentinel. */
/* Networking */ /* Networking */
int port; /* TCP listening port */ int port; /* TCP listening port */
int backlog; /* TCP listen backlog */
char *bindaddr[REDIS_BINDADDR_MAX]; /* Addresses we should bind to */ char *bindaddr[REDIS_BINDADDR_MAX]; /* Addresses we should bind to */
int bindaddr_count; /* Number of addresses in server.bindaddr[] */ int bindaddr_count; /* Number of addresses in server.bindaddr[] */
char *unixsocket; /* UNIX socket path */ char *unixsocket; /* UNIX socket path */