Use different accept handlers for TCP and unix socket connections

This commit is contained in:
Pieter Noordhuis 2010-10-13 18:34:24 +02:00
parent 893819801d
commit ab17b909fe
5 changed files with 68 additions and 26 deletions

View File

@ -307,15 +307,10 @@ int anetUnixServer(char *err, char *path)
return s; return s;
} }
int anetAccept(char *err, int serversock, char *ip, int *port) static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {
{
int fd; int fd;
struct sockaddr_in sa;
unsigned int saLen;
while(1) { while(1) {
saLen = sizeof(sa); fd = accept(s,sa,len);
fd = accept(serversock, (struct sockaddr*)&sa, &saLen);
if (fd == -1) { if (fd == -1) {
if (errno == EINTR) if (errno == EINTR)
continue; continue;
@ -326,7 +321,31 @@ int anetAccept(char *err, int serversock, char *ip, int *port)
} }
break; break;
} }
return fd;
}
int anetTcpAccept(char *err, int s, char *ip, int *port) {
int fd;
struct sockaddr_in sa;
socklen_t salen = sizeof(sa);
if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == ANET_ERR)
return ANET_ERR;
if (ip) strcpy(ip,inet_ntoa(sa.sin_addr)); if (ip) strcpy(ip,inet_ntoa(sa.sin_addr));
if (port) *port = ntohs(sa.sin_port); if (port) *port = ntohs(sa.sin_port);
return fd; return fd;
} }
int anetUnixAccept(char *err, int s, char *path, int len) {
int fd;
struct sockaddr_un sa;
socklen_t salen = sizeof(sa);
if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == ANET_ERR)
return ANET_ERR;
if (path) {
strncpy(path,sa.sun_path,len-1);
path[len-1] = 0;
}
return fd;
}

View File

@ -43,7 +43,8 @@ int anetRead(int fd, char *buf, int count);
int anetResolve(char *err, char *host, char *ipbuf); int anetResolve(char *err, char *host, char *ipbuf);
int anetTcpServer(char *err, int port, char *bindaddr); int anetTcpServer(char *err, int port, char *bindaddr);
int anetUnixServer(char *err, char *path); int anetUnixServer(char *err, char *path);
int anetAccept(char *err, int serversock, char *ip, int *port); int anetTcpAccept(char *err, int serversock, char *ip, int *port);
int anetUnixAccept(char *err, int serversock, char *path, int len);
int anetWrite(int fd, char *buf, int count); int anetWrite(int fd, char *buf, int count);
int anetNonBlock(char *err, int fd); int anetNonBlock(char *err, int fd);
int anetTcpNoDelay(char *err, int fd); int anetTcpNoDelay(char *err, int fd);

View File

@ -157,23 +157,11 @@ void addReplyBulkCString(redisClient *c, char *s) {
} }
} }
void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { static void acceptCommonHandler(int fd) {
int cport, cfd;
char cip[128];
redisClient *c; redisClient *c;
REDIS_NOTUSED(el); if ((c = createClient(fd)) == NULL) {
REDIS_NOTUSED(mask);
REDIS_NOTUSED(privdata);
cfd = anetAccept(server.neterr, fd, cip, &cport);
if (cfd == AE_ERR) {
redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
return;
}
redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
if ((c = createClient(cfd)) == NULL) {
redisLog(REDIS_WARNING,"Error allocating resoures for the client"); redisLog(REDIS_WARNING,"Error allocating resoures for the client");
close(cfd); /* May be already closed, just ingore errors */ close(fd); /* May be already closed, just ingore errors */
return; return;
} }
/* If maxclient directive is set and this is one client more... close the /* If maxclient directive is set and this is one client more... close the
@ -193,6 +181,39 @@ void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
server.stat_numconnections++; server.stat_numconnections++;
} }
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd;
char cip[128];
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
REDIS_NOTUSED(privdata);
cfd = anetTcpAccept(server.neterr, fd, cip, &cport);
if (cfd == AE_ERR) {
redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
return;
}
redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
acceptCommonHandler(cfd);
}
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cfd;
char cpath[128];
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
REDIS_NOTUSED(privdata);
cfd = anetUnixAccept(server.neterr, fd, cpath, sizeof(cpath));
if (cfd == AE_ERR) {
redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
return;
}
redisLog(REDIS_VERBOSE,"Accepted connection to %s", server.unixsocket);
acceptCommonHandler(cfd);
}
static void freeClientArgv(redisClient *c) { static void freeClientArgv(redisClient *c) {
int j; int j;

View File

@ -820,9 +820,9 @@ void initServer() {
server.unixtime = time(NULL); server.unixtime = time(NULL);
aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL); aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE, if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,
acceptHandler,NULL) == AE_ERR) oom("creating file event"); acceptTcpHandler,NULL) == AE_ERR) oom("creating file event");
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE, if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
acceptHandler,NULL) == AE_ERR) oom("creating file event"); acceptUnixHandler,NULL) == AE_ERR) oom("creating file event");
if (server.appendonly) { if (server.appendonly) {
server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644); server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);

View File

@ -579,7 +579,8 @@ void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
void addReply(redisClient *c, robj *obj); void addReply(redisClient *c, robj *obj);
void addReplySds(redisClient *c, sds s); void addReplySds(redisClient *c, sds s);
void processInputBuffer(redisClient *c); void processInputBuffer(redisClient *c);
void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask); void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask); void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask);
void addReplyBulk(redisClient *c, robj *obj); void addReplyBulk(redisClient *c, robj *obj);
void addReplyBulkCString(redisClient *c, char *s); void addReplyBulkCString(redisClient *c, char *s);