/* Background I/O service for Redis. * * This file implements operations that we need to perform in the background. * Currently there is only a single operation, that is a background close(2) * system call. This is needed as when the process is the last owner of a * reference to a file closing it means unlinking it, and the deletion of the * file is slow, blocking the server. * * In the future we'll either continue implementing new things we need or * we'll switch to libeio. However there are probably long term uses for this * file as we may want to put here Redis specific background tasks (for instance * it is not impossible that we'll need a non blocking FLUSHDB/FLUSHALL * implementation). * * DESIGN * ------ * * The design is trivial, we have a structure representing a job to perform * and a different thread and job queue for every job type. * Every thread wait for new jobs in its queue, and process every job * sequentially. * * Jobs of the same type are guaranteed to be processed from the least * recently inserted to the most recently inserted (older jobs processed * first). * * Currently there is no way for the creator of the job to be notified about * the completion of the operation, this will only be added when/if needed. * * ---------------------------------------------------------------------------- * * Copyright (c) 2009-2012, Salvatore Sanfilippo * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * * Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * Neither the name of Redis nor the names of its contributors may be used * to endorse or promote products derived from this software without * specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ #include "server.h" #include "bio.h" static pthread_t bio_threads[BIO_NUM_OPS]; static pthread_mutex_t bio_mutex[BIO_NUM_OPS]; static pthread_cond_t bio_newjob_cond[BIO_NUM_OPS]; static pthread_cond_t bio_step_cond[BIO_NUM_OPS]; static list *bio_jobs[BIO_NUM_OPS]; /* The following array is used to hold the number of pending jobs for every * OP type. This allows us to export the bioPendingJobsOfType() API that is * useful when the main thread wants to perform some operation that may involve * objects shared with the background thread. The main thread will just wait * that there are no longer jobs of this type to be executed before performing * the sensible operation. This data is also useful for reporting. */ static unsigned long long bio_pending[BIO_NUM_OPS]; /* This structure represents a background Job. It is only used locally to this * file as the API does not expose the internals at all. */ struct bio_job { time_t time; /* Time at which the job was created. */ /* Job specific arguments pointers. If we need to pass more than three * arguments we can just pass a pointer to a structure or alike. */ void *arg1, *arg2, *arg3; }; void *bioProcessBackgroundJobs(void *arg); /* Make sure we have enough stack to perform all the things we do in the * main thread. */ #define REDIS_THREAD_STACK_SIZE (1024*1024*4) /* Initialize the background system, spawning the thread. */ void bioInit(void) { pthread_attr_t attr; pthread_t thread; size_t stacksize; int j; /* Initialization of state vars and objects */ for (j = 0; j < BIO_NUM_OPS; j++) { pthread_mutex_init(&bio_mutex[j],NULL); pthread_cond_init(&bio_newjob_cond[j],NULL); pthread_cond_init(&bio_step_cond[j],NULL); bio_jobs[j] = listCreate(); bio_pending[j] = 0; } /* Set the stack size as by default it may be small in some system */ pthread_attr_init(&attr); pthread_attr_getstacksize(&attr,&stacksize); if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */ while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2; pthread_attr_setstacksize(&attr, stacksize); /* Ready to spawn our threads. We use the single argument the thread * function accepts in order to pass the job ID the thread is * responsible of. */ for (j = 0; j < BIO_NUM_OPS; j++) { void *arg = (void*)(unsigned long) j; if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) { serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs."); exit(1); } bio_threads[j] = thread; } } void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) { struct bio_job *job = zmalloc(sizeof(*job)); job->time = time(NULL); job->arg1 = arg1; job->arg2 = arg2; job->arg3 = arg3; pthread_mutex_lock(&bio_mutex[type]); listAddNodeTail(bio_jobs[type],job); bio_pending[type]++; pthread_cond_signal(&bio_newjob_cond[type]); pthread_mutex_unlock(&bio_mutex[type]); } void *bioProcessBackgroundJobs(void *arg) { struct bio_job *job; unsigned long type = (unsigned long) arg; sigset_t sigset; /* Check that the type is within the right interval. */ if (type >= BIO_NUM_OPS) { serverLog(LL_WARNING, "Warning: bio thread started with wrong type %lu",type); return NULL; } /* Make the thread killable at any time, so that bioKillThreads() * can work reliably. */ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); pthread_mutex_lock(&bio_mutex[type]); /* Block SIGALRM so we are sure that only the main thread will * receive the watchdog signal. */ sigemptyset(&sigset); sigaddset(&sigset, SIGALRM); if (pthread_sigmask(SIG_BLOCK, &sigset, NULL)) serverLog(LL_WARNING, "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno)); while(1) { listNode *ln; /* The loop always starts with the lock hold. */ if (listLength(bio_jobs[type]) == 0) { pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]); continue; } /* Pop the job from the queue. */ ln = listFirst(bio_jobs[type]); job = ln->value; /* It is now possible to unlock the background system as we know have * a stand alone job structure to process.*/ pthread_mutex_unlock(&bio_mutex[type]); /* Process the job accordingly to its type. */ if (type == BIO_CLOSE_FILE) { close((long)job->arg1); } else if (type == BIO_AOF_FSYNC) { aof_fsync((long)job->arg1); } else if (type == BIO_LAZY_FREE) { decrRefCount((robj*)job->arg1); } else { serverPanic("Wrong job type in bioProcessBackgroundJobs()."); } zfree(job); /* Unblock threads blocked on bioWaitStepOfType() if any. */ pthread_cond_broadcast(&bio_step_cond[type]); /* Lock again before reiterating the loop, if there are no longer * jobs to process we'll block again in pthread_cond_wait(). */ pthread_mutex_lock(&bio_mutex[type]); listDelNode(bio_jobs[type],ln); bio_pending[type]--; } } /* Return the number of pending jobs of the specified type. */ unsigned long long bioPendingJobsOfType(int type) { unsigned long long val; pthread_mutex_lock(&bio_mutex[type]); val = bio_pending[type]; pthread_mutex_unlock(&bio_mutex[type]); return val; } /* If there are pending jobs for the specified type, the function blocks * and waits that the next job was processed. Otherwise the function * does not block and returns ASAP. * * The function returns the number of jobs still to process of the * requested type. * * This function is useful when from another thread, we want to wait * a bio.c thread to do more work in a blocking way. */ unsigned long long bioWaitStepOfType(int type) { unsigned long long val; pthread_mutex_lock(&bio_mutex[type]); val = bio_pending[type]; if (val != 0) { pthread_cond_wait(&bio_step_cond[type],&bio_mutex[type]); val = bio_pending[type]; } pthread_mutex_unlock(&bio_mutex[type]); return val; } /* Kill the running bio threads in an unclean way. This function should be * used only when it's critical to stop the threads for some reason. * Currently Redis does this only on crash (for instance on SIGSEGV) in order * to perform a fast memory check without other threads messing with memory. */ void bioKillThreads(void) { int err, j; for (j = 0; j < BIO_NUM_OPS; j++) { if (pthread_cancel(bio_threads[j]) == 0) { if ((err = pthread_join(bio_threads[j],NULL)) != 0) { serverLog(LL_WARNING, "Bio thread for job type #%d can be joined: %s", j, strerror(err)); } else { serverLog(LL_WARNING, "Bio thread for job type #%d terminated",j); } } } }