Run the diskstore BGSAVE in a background thread instead of a forked child.

The done-handlers now receive (exitcode, bysignal) instead of the raw wait
status, so serverCron can call backgroundSaveDoneHandler() both for a reaped
child and for a finished saving thread. SAVE/BGSAVE commands move from db.c
to rdb.c and also refuse to run while a saving thread exists.

Review fixes folded into this patch:
  * aof.c: log `bysignal` (was the undeclared identifier `bysitnal`).
  * rdb.c: cast with `(pthread_t)` in saveCommand/bgsaveCommand (was
    `(pthread-t)`, which parses as a subtraction of undeclared names).
  * redis.c: use the REDIS_BGSAVE_THREAD_DONE_* constants that redis.h
    defines (was REDIS_BGSAVE_DONE_*, which is defined nowhere in the patch).
  * redis.h: terminate the backgroundSaveDoneHandler prototype with `;`
    (was `{`).
  * diskstore.c: comment typo "thead" -> "thread".

NOTE(review): in the hunks shown, dsRdbSave_thread() still executes
`return REDIS_OK;` / `return REDIS_ERR;` inside a `void *` function and no
visible line stores a DONE_OK / DONE_ERR state, so the new serverCron check
appears unable to observe completion -- confirm against the full file.

diff --git a/TODO b/TODO
index b5dabd3a2..2402a9d41 100644
--- a/TODO
+++ b/TODO
@@ -15,6 +15,8 @@ DISKSTORE TODO
 * Check that 00/00 and ff/ff exist at startup, otherwise exit with error.
 * Implement sync flush option, where data is written synchronously on disk when a command is executed.
 * Implement MULTI/EXEC as transaction abstract API to diskstore.c, with transaction_start, transaction_end, and a journal to recover.
+* Stop BGSAVE thread on shutdown and any other condition where the child is killed during normal bgsave.
+* Use a mutex to log on the file, so that we don't get overlapping messages, or even better make sure to use a single write against it.
 
 REPLICATION
 ===========
diff --git a/src/aof.c b/src/aof.c
index f5d04a62b..723d845f6 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -585,10 +585,7 @@ void aofRemoveTempFile(pid_t childpid) {
 
 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
  * Handle this. */
-void backgroundRewriteDoneHandler(int statloc) {
-    int exitcode = WEXITSTATUS(statloc);
-    int bysignal = WIFSIGNALED(statloc);
-
+void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
     if (!bysignal && exitcode == 0) {
         int fd;
         char tmpfile[256];
@@ -636,7 +633,7 @@ void backgroundRewriteDoneHandler(int statloc) {
     } else {
         redisLog(REDIS_WARNING,
             "Background append only file rewriting terminated by signal %d",
-            WTERMSIG(statloc));
+            bysignal);
     }
 cleanup:
     sdsfree(server.bgrewritebuf);
diff --git a/src/db.c b/src/db.c
index 1242c8898..6276c992b 100644
--- a/src/db.c
+++ b/src/db.c
@@ -379,30 +379,6 @@ void typeCommand(redisClient *c) {
     addReplyStatus(c,type);
 }
 
-void saveCommand(redisClient *c) {
-    if (server.bgsavechildpid != -1) {
-        addReplyError(c,"Background save already in progress");
-        return;
-    }
-    if (rdbSave(server.dbfilename) == REDIS_OK) {
-        addReply(c,shared.ok);
-    } else {
-        addReply(c,shared.err);
-    }
-}
-
-void bgsaveCommand(redisClient *c) {
-    if (server.bgsavechildpid != -1) {
-        addReplyError(c,"Background save already in progress");
-        return;
-    }
-    if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
-        addReplyStatus(c,"Background saving started");
-    } else {
-        addReply(c,shared.err);
-    }
-}
-
 void shutdownCommand(redisClient *c) {
     if (prepareForShutdown() == REDIS_OK)
         exit(0);
diff --git a/src/diskstore.c b/src/diskstore.c
index 26f3af607..0aa8e37fe 100644
--- a/src/diskstore.c
+++ b/src/diskstore.c
@@ -349,11 +349,16 @@ void dsFlushDb(int dbid) {
     }
 }
 
-int dsRdbSave(char *filename) {
-    char tmpfile[256];
+void *dsRdbSave_thread(void *arg) {
+    char tmpfile[256], *filename = (char*)arg;
     int j, i;
     time_t now = time(NULL);
 
+    /* Change state to ACTIVE, to signal there is a saving thread working. */
+    pthread_mutex_lock(&server.bgsavethread_mutex);
+    server.bgsavethread_state = REDIS_BGSAVE_THREAD_ACTIVE;
+    pthread_mutex_unlock(&server.bgsavethread_mutex);
+
     snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
     fp = fopen(tmpfile,"w");
     if (!fp) {
@@ -377,6 +382,7 @@ int dsRdbSave(char *filename) {
     fflush(fp);
     fsync(fileno(fp));
     fclose(fp);
+    zfree(filename);
 
     /* Use RENAME to make sure the DB file is changed atomically only
      * if the generate DB file is ok. */
@@ -386,12 +392,24 @@ int dsRdbSave(char *filename) {
         return REDIS_ERR;
     }
     redisLog(REDIS_NOTICE,"DB saved on disk");
-    server.dirty = 0;
-    server.lastsave = time(NULL);
     return REDIS_OK;
 
 werr:
+    zfree(filename);
     fclose(fp);
     unlink(tmpfile);
     redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
 }
+
+int dsRdbSave(char *filename) {
+    pthread_t thread;
+
+    if (pthread_create(&thread,NULL,dsRdbSave_thread,zstrdup(filename)) != 0) {
+        redisLog(REDIS_WARNING,"Can't create diskstore BGSAVE thread: %s",
+            strerror(errno));
+        return REDIS_ERR;
+    } else {
+        server.bgsavethread = thread;
+        return REDIS_OK;
+    }
+}
diff --git a/src/dscache.c b/src/dscache.c
index 1adba6f56..1c419c6a7 100644
--- a/src/dscache.c
+++ b/src/dscache.c
@@ -132,6 +132,7 @@ void dsInit(void) {
     server.io_ready_clients = listCreate();
     pthread_mutex_init(&server.io_mutex,NULL);
     pthread_cond_init(&server.io_condvar,NULL);
+    pthread_mutex_init(&server.bgsavethread_mutex,NULL);
     server.io_active_threads = 0;
     if (pipe(pipefds) == -1) {
         redisLog(REDIS_WARNING,"Unable to intialized DS: pipe(2): %s. Exiting."
diff --git a/src/rdb.c b/src/rdb.c
index 6b6b6ab64..62756d304 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -496,22 +496,23 @@ werr:
 int rdbSaveBackground(char *filename) {
     pid_t childpid;
 
-    if (server.bgsavechildpid != -1) return REDIS_ERR;
+    if (server.bgsavechildpid != -1 ||
+        server.bgsavethread != (pthread_t) -1) return REDIS_ERR;
     server.dirty_before_bgsave = server.dirty;
 
+    if (server.ds_enabled) {
+        cacheForcePointInTime();
+        return dsRdbSave(filename);
+    }
+
     if ((childpid = fork()) == 0) {
         int retval;
 
         /* Child */
         if (server.ipfd > 0) close(server.ipfd);
         if (server.sofd > 0) close(server.sofd);
 
-        if (server.ds_enabled) {
-            cacheForcePointInTime();
-            dsRdbSave(filename);
-        } else {
-            rdbSave(filename);
-        }
+        retval = rdbSave(filename);
         _exit((retval == REDIS_OK) ? 0 : 1);
     } else {
         /* Parent */
@@ -950,10 +951,7 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */
 }
 
 /* A background saving child (BGSAVE) terminated its work. Handle this. */
-void backgroundSaveDoneHandler(int statloc) {
-    int exitcode = WEXITSTATUS(statloc);
-    int bysignal = WIFSIGNALED(statloc);
-
+void backgroundSaveDoneHandler(int exitcode, int bysignal) {
     if (!bysignal && exitcode == 0) {
         redisLog(REDIS_NOTICE,
             "Background saving terminated with success");
@@ -963,11 +961,37 @@ void backgroundSaveDoneHandler(int statloc) {
         redisLog(REDIS_WARNING, "Background saving error");
     } else {
         redisLog(REDIS_WARNING,
-            "Background saving terminated by signal %d", WTERMSIG(statloc));
+            "Background saving terminated by signal %d", bysignal);
         rdbRemoveTempFile(server.bgsavechildpid);
     }
     server.bgsavechildpid = -1;
+    server.bgsavethread = (pthread_t) -1;
+    server.bgsavethread_state = REDIS_BGSAVE_THREAD_UNACTIVE;
     /* Possibly there are slaves waiting for a BGSAVE in order to be served
      * (the first stage of SYNC is a bulk transfer of dump.rdb) */
     updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
 }
+
+void saveCommand(redisClient *c) {
+    if (server.bgsavechildpid != -1 || server.bgsavethread != (pthread_t) -1) {
+        addReplyError(c,"Background save already in progress");
+        return;
+    }
+    if (rdbSave(server.dbfilename) == REDIS_OK) {
+        addReply(c,shared.ok);
+    } else {
+        addReply(c,shared.err);
+    }
+}
+
+void bgsaveCommand(redisClient *c) {
+    if (server.bgsavechildpid != -1 || server.bgsavethread != (pthread_t) -1) {
+        addReplyError(c,"Background save already in progress");
+        return;
+    }
+    if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
+        addReplyStatus(c,"Background saving started");
+    } else {
+        addReply(c,shared.err);
+    }
+}
diff --git a/src/redis.c b/src/redis.c
index c0dac05fe..2fd3ee39e 100644
--- a/src/redis.c
+++ b/src/redis.c
@@ -589,13 +589,31 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
         pid_t pid;
 
         if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
+            int exitcode = WEXITSTATUS(statloc);
+            int bysignal = 0;
+
+            if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
+
             if (pid == server.bgsavechildpid) {
-                backgroundSaveDoneHandler(statloc);
+                backgroundSaveDoneHandler(exitcode,bysignal);
             } else {
-                backgroundRewriteDoneHandler(statloc);
+                backgroundRewriteDoneHandler(exitcode,bysignal);
             }
             updateDictResizePolicy();
         }
+        if (server.bgsavethread != (pthread_t) -1) {
+            int state;
+
+            pthread_mutex_lock(&server.bgsavethread_mutex);
+            state = server.bgsavethread_state;
+            pthread_mutex_unlock(&server.bgsavethread_mutex);
+
+            if (state == REDIS_BGSAVE_THREAD_DONE_OK ||
+                state == REDIS_BGSAVE_THREAD_DONE_ERR) {
+                backgroundSaveDoneHandler(
+                    (state == REDIS_BGSAVE_THREAD_DONE_OK) ? 0 : 1, 0);
+            }
+        }
     } else if (!server.ds_enabled) {
         /* If there is not a background saving in progress check if
          * we have to save now */
@@ -867,6 +885,8 @@ void initServer() {
     server.cronloops = 0;
     server.bgsavechildpid = -1;
     server.bgrewritechildpid = -1;
+    server.bgsavethread_state = REDIS_BGSAVE_THREAD_UNACTIVE;
+    server.bgsavethread = (pthread_t) -1;
     server.bgrewritebuf = sdsempty();
     server.aofbuf = sdsempty();
     server.lastsave = time(NULL);
diff --git a/src/redis.h b/src/redis.h
index 495de9859..c87613349 100644
--- a/src/redis.h
+++ b/src/redis.h
@@ -203,6 +203,12 @@
 #define REDIS_MAXMEMORY_ALLKEYS_RANDOM 4
 #define REDIS_MAXMEMORY_NO_EVICTION 5
 
+/* Diskstore background saving thread states */
+#define REDIS_BGSAVE_THREAD_UNACTIVE 0
+#define REDIS_BGSAVE_THREAD_ACTIVE 1
+#define REDIS_BGSAVE_THREAD_DONE_OK 2
+#define REDIS_BGSAVE_THREAD_DONE_ERR 3
+
 /* We can print the stacktrace, so our assert is defined this way: */
 #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e,__FILE__,__LINE__),_exit(1)))
 #define redisPanic(_e) _redisPanic(#_e,__FILE__,__LINE__),_exit(1)
@@ -390,25 +396,30 @@ struct redisServer {
     int appendfsync;
     int no_appendfsync_on_rewrite;
     int shutdown_asap;
+    int activerehashing;
+    char *requirepass;
+    /* Persistence */
     time_t lastfsync;
     int appendfd;
     int appendseldb;
     char *pidfile;
     pid_t bgsavechildpid;
     pid_t bgrewritechildpid;
+    int bgsavethread_state;
+    pthread_mutex_t bgsavethread_mutex;
+    pthread_t bgsavethread;
     sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */
     sds aofbuf;       /* AOF buffer, written before entering the event loop */
     struct saveparam *saveparams;
     int saveparamslen;
+    char *dbfilename;
+    int rdbcompression;
+    char *appendfilename;
+    /* Logging */
     char *logfile;
     int syslog_enabled;
     char *syslog_ident;
     int syslog_facility;
-    char *dbfilename;
-    char *appendfilename;
-    char *requirepass;
-    int rdbcompression;
-    int activerehashing;
     /* Replication related */
     int isslave;
     /* Slave specific fields */
@@ -745,7 +756,7 @@ int rdbSaveObject(FILE *fp, robj *o);
 off_t rdbSavedObjectLen(robj *o);
 off_t rdbSavedObjectPages(robj *o);
 robj *rdbLoadObject(int type, FILE *fp);
-void backgroundSaveDoneHandler(int statloc);
+void backgroundSaveDoneHandler(int exitcode, int bysignal);
 int rdbSaveKeyValuePair(FILE *fp, redisDb *db, robj *key, robj *val, time_t now);
 int rdbLoadType(FILE *fp);
 time_t rdbLoadTime(FILE *fp);
@@ -759,7 +770,7 @@ int rewriteAppendOnlyFileBackground(void);
 int loadAppendOnlyFile(char *filename);
 void stopAppendOnly(void);
 int startAppendOnly(void);
-void backgroundRewriteDoneHandler(int statloc);
+void backgroundRewriteDoneHandler(int exitcode, int bysignal);
 
 /* Sorted sets data type */
 zskiplist *zslCreate(void);