From ab17b909fe46603f78c8305a3cf2f02a0c6de0a4 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Wed, 13 Oct 2010 18:34:24 +0200 Subject: [PATCH] Use different accept handlers for TCP and unix socket connections --- src/anet.c | 33 ++++++++++++++++++++++++------- src/anet.h | 3 ++- src/networking.c | 51 ++++++++++++++++++++++++++++++++++-------------- src/redis.c | 4 ++-- src/redis.h | 3 ++- 5 files changed, 68 insertions(+), 26 deletions(-) diff --git a/src/anet.c b/src/anet.c index 63002f6cf..99cda0119 100644 --- a/src/anet.c +++ b/src/anet.c @@ -307,15 +307,10 @@ int anetUnixServer(char *err, char *path) return s; } -int anetAccept(char *err, int serversock, char *ip, int *port) -{ +static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) { int fd; - struct sockaddr_in sa; - unsigned int saLen; - while(1) { - saLen = sizeof(sa); - fd = accept(serversock, (struct sockaddr*)&sa, &saLen); + fd = accept(s,sa,len); if (fd == -1) { if (errno == EINTR) continue; @@ -326,7 +321,31 @@ int anetAccept(char *err, int serversock, char *ip, int *port) } break; } + return fd; +} + +int anetTcpAccept(char *err, int s, char *ip, int *port) { + int fd; + struct sockaddr_in sa; + socklen_t salen = sizeof(sa); + if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == ANET_ERR) + return ANET_ERR; + if (ip) strcpy(ip,inet_ntoa(sa.sin_addr)); if (port) *port = ntohs(sa.sin_port); return fd; } + +int anetUnixAccept(char *err, int s, char *path, int len) { + int fd; + struct sockaddr_un sa; + socklen_t salen = sizeof(sa); + if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == ANET_ERR) + return ANET_ERR; + + if (path) { + strncpy(path,sa.sun_path,len-1); + path[len-1] = 0; + } + return fd; +} diff --git a/src/anet.h b/src/anet.h index 10db3d2fc..45a894641 100644 --- a/src/anet.h +++ b/src/anet.h @@ -43,7 +43,8 @@ int anetRead(int fd, char *buf, int count); int anetResolve(char *err, char *host, char *ipbuf); int anetTcpServer(char *err, int port, char *bindaddr); int anetUnixServer(char *err, char *path); -int anetAccept(char *err, int serversock, char *ip, int *port); +int anetTcpAccept(char *err, int serversock, char *ip, int *port); +int anetUnixAccept(char *err, int serversock, char *path, int len); int anetWrite(int fd, char *buf, int count); int anetNonBlock(char *err, int fd); int anetTcpNoDelay(char *err, int fd); diff --git a/src/networking.c b/src/networking.c index e5a669846..ae772c926 100644 --- a/src/networking.c +++ b/src/networking.c @@ -157,23 +157,11 @@ void addReplyBulkCString(redisClient *c, char *s) { } } -void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { - int cport, cfd; - char cip[128]; +static void acceptCommonHandler(int fd) { redisClient *c; - REDIS_NOTUSED(el); - REDIS_NOTUSED(mask); - REDIS_NOTUSED(privdata); - - cfd = anetAccept(server.neterr, fd, cip, &cport); - if (cfd == AE_ERR) { - redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr); - return; - } - redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport); - if ((c = createClient(cfd)) == NULL) { + if ((c = createClient(fd)) == NULL) { redisLog(REDIS_WARNING,"Error allocating resoures for the client"); - close(cfd); /* May be already closed, just ingore errors */ + close(fd); /* May be already closed, just ingore errors */ return; } /* If maxclient directive is set and this is one client more... close the @@ -193,6 +181,39 @@ void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { server.stat_numconnections++; } +void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { + int cport, cfd; + char cip[128]; + REDIS_NOTUSED(el); + REDIS_NOTUSED(mask); + REDIS_NOTUSED(privdata); + + cfd = anetTcpAccept(server.neterr, fd, cip, &cport); + if (cfd == AE_ERR) { + redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr); + return; + } + redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport); + acceptCommonHandler(cfd); +} + +void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) { + int cfd; + char cpath[128]; + REDIS_NOTUSED(el); + REDIS_NOTUSED(mask); + REDIS_NOTUSED(privdata); + + cfd = anetUnixAccept(server.neterr, fd, cpath, sizeof(cpath)); + if (cfd == AE_ERR) { + redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr); + return; + } + redisLog(REDIS_VERBOSE,"Accepted connection to %s", server.unixsocket); + acceptCommonHandler(cfd); +} + + static void freeClientArgv(redisClient *c) { int j; diff --git a/src/redis.c b/src/redis.c index 8bff97b51..0e9b73b77 100644 --- a/src/redis.c +++ b/src/redis.c @@ -820,9 +820,9 @@ void initServer() { server.unixtime = time(NULL); aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL); if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE, - acceptHandler,NULL) == AE_ERR) oom("creating file event"); + acceptTcpHandler,NULL) == AE_ERR) oom("creating file event"); if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE, - acceptHandler,NULL) == AE_ERR) oom("creating file event"); + acceptUnixHandler,NULL) == AE_ERR) oom("creating file event"); if (server.appendonly) { server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644); diff --git a/src/redis.h b/src/redis.h index bfdde1a05..38f0c140c 100644 --- a/src/redis.h +++ b/src/redis.h @@ -579,7 +579,8 @@ void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask); void addReply(redisClient *c, robj *obj); void addReplySds(redisClient *c, sds s); void processInputBuffer(redisClient *c); -void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask); +void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask); +void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask); void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask); void addReplyBulk(redisClient *c, robj *obj); void addReplyBulkCString(redisClient *c, char *s);