diff --git a/src/networking.c b/src/networking.c index 5dfaaf8ba..cd241dac2 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2480,13 +2480,9 @@ int tio_debug = 0; #define SERVER_MAX_IO_THREADS 32 pthread_t io_threads[SERVER_MAX_IO_THREADS]; -pthread_mutex_t io_threads_done_mutex = PTHREAD_MUTEX_INITIALIZER; -pthread_cond_t io_threads_done_cond = PTHREAD_COND_INITIALIZER; -pthread_mutex_t io_threads_idle_mutex = PTHREAD_MUTEX_INITIALIZER; -pthread_cond_t io_threads_idle_cond = PTHREAD_COND_INITIALIZER; -pthread_cond_t io_threads_start_cond = PTHREAD_COND_INITIALIZER; -int io_threads_done = 0; /* Number of threads that completed the work. */ -int io_threads_idle = 0; /* Number of threads in idle state ready to go. */ +pthread_mutex_t io_threads_mutex[SERVER_MAX_IO_THREADS]; +_Atomic unsigned long io_threads_pending[SERVER_MAX_IO_THREADS]; +int io_threads_active; list *io_threads_list[SERVER_MAX_IO_THREADS]; void *IOThreadMain(void *myid) { @@ -2496,30 +2492,23 @@ void *IOThreadMain(void *myid) { while(1) { /* ... Wait for start ... */ - pthread_mutex_lock(&io_threads_idle_mutex); - io_threads_idle++; - pthread_cond_signal(&io_threads_idle_cond); - if (tio_debug) printf("[%ld] Waiting start...\n", id); - pthread_cond_wait(&io_threads_start_cond,&io_threads_idle_mutex); - if (tio_debug) printf("[%ld] Started\n", id); - pthread_mutex_unlock(&io_threads_idle_mutex); - if (tio_debug) printf("%d to handle\n", (int)listLength(io_threads_list[id])); + pthread_mutex_lock(&io_threads_mutex[id]); + if (io_threads_pending[id]) { + if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id])); - /* ... Process ... */ - listIter li; - listNode *ln; - listRewind(io_threads_list[id],&li); - while((ln = listNext(&li))) { - client *c = listNodeValue(ln); - writeToClient(c->fd,c,0); + /* ... Process ... */ + listIter li; + listNode *ln; + listRewind(io_threads_list[id],&li); + while((ln = listNext(&li))) { + client *c = listNodeValue(ln); + writeToClient(c->fd,c,0); + io_threads_pending[id]--; + } + listEmpty(io_threads_list[id]); } - listEmpty(io_threads_list[id]); - /* Report success. */ - pthread_mutex_lock(&io_threads_done_mutex); - io_threads_done++; - pthread_cond_signal(&io_threads_done_cond); - pthread_mutex_unlock(&io_threads_done_mutex); + pthread_mutex_unlock(&io_threads_mutex[id]); if (tio_debug) printf("[%ld] Done\n", id); } } @@ -2529,30 +2518,50 @@ void initThreadedIO(void) { pthread_t tid; server.io_threads_num = 4; + io_threads_active = 0; /* We start with threads not active. */ for (int i = 0; i < server.io_threads_num; i++) { + pthread_mutex_init(&io_threads_mutex[i],NULL); + io_threads_pending[i] = 0; + io_threads_list[i] = listCreate(); + pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */ if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) { serverLog(LL_WARNING,"Fatal: Can't initialize IO thread."); exit(1); } io_threads[i] = tid; - io_threads_list[i] = listCreate(); } } +void startThreadedIO(void) { + if (tio_debug) printf("--- STARTING THREADED IO ---\n"); + serverAssert(io_threads_active == 0); + for (int j = 0; j < server.io_threads_num; j++) + pthread_mutex_unlock(&io_threads_mutex[j]); + io_threads_active = 1; +} + +void stopThreadedIO(void) { + if (tio_debug) printf("--- STOPPING THREADED IO ---\n"); + serverAssert(io_threads_active == 1); + for (int j = 0; j < server.io_threads_num; j++) + pthread_mutex_lock(&io_threads_mutex[j]); + io_threads_active = 0; +} + int handleClientsWithPendingWritesUsingThreads(void) { int processed = listLength(server.clients_pending_write); if (processed == 0) return 0; /* Return ASAP if there are no clients. */ - if (tio_debug) printf("%d TOTAL\n", processed); - - /* Wait for all threads to be ready. */ - pthread_mutex_lock(&io_threads_idle_mutex); - while(io_threads_idle < server.io_threads_num) { - pthread_cond_wait(&io_threads_idle_cond,&io_threads_idle_mutex); + /* If we have just a few clients to serve, don't use I/O threads, but the + * boring synchronous code. */ + if (processed < (server.io_threads_num*2)) { + if (io_threads_active) stopThreadedIO(); + return handleClientsWithPendingWrites(); + } else { + if (!io_threads_active) startThreadedIO(); } - if (tio_debug) printf("All threads are idle: %d\n", io_threads_idle); - io_threads_idle = 0; - pthread_mutex_unlock(&io_threads_idle_mutex); + + if (tio_debug) printf("%d TOTAL pending clients\n", processed); /* Distribute the clients across N different lists. */ listIter li; @@ -2563,23 +2572,20 @@ int handleClientsWithPendingWritesUsingThreads(void) { client *c = listNodeValue(ln); c->flags &= ~CLIENT_PENDING_WRITE; int target_id = item_id % server.io_threads_num; + pthread_mutex_lock(&io_threads_mutex[target_id]); listAddNodeTail(io_threads_list[target_id],c); + io_threads_pending[target_id]++; + pthread_mutex_unlock(&io_threads_mutex[target_id]); item_id++; } - /* Start all threads. */ - if (tio_debug) printf("Send start condition\n"); - pthread_mutex_lock(&io_threads_done_mutex); - io_threads_done = 0; - pthread_cond_broadcast(&io_threads_start_cond); - pthread_mutex_unlock(&io_threads_done_mutex); - /* Wait for all threads to end their work. */ - pthread_mutex_lock(&io_threads_done_mutex); - while(io_threads_done < server.io_threads_num) { - pthread_cond_wait(&io_threads_done_cond,&io_threads_done_mutex); + while(1) { + unsigned long pending = 0; + for (int j = 0; j < server.io_threads_num; j++) + pending += io_threads_pending[j]; + if (pending == 0) break; } - pthread_mutex_unlock(&io_threads_done_mutex); if (tio_debug) printf("All threads finshed\n"); /* Run the list of clients again to install the write handler where diff --git a/src/server.c b/src/server.c index c437880d5..de5a814d0 100644 --- a/src/server.c +++ b/src/server.c @@ -2072,8 +2072,6 @@ void beforeSleep(struct aeEventLoop *eventLoop) { flushAppendOnlyFile(0); /* Handle writes with pending output buffers. */ - /* XXX: Put a condition based on number of waiting clients: if we - * have less than a given number of clients, use non threaded code. */ handleClientsWithPendingWritesUsingThreads(); /* Close clients that need to be closed asynchronous */