#include "redis.h"
#include "bio.h"
#include "rio.h"

#include <signal.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/wait.h>

void aofUpdateCurrentSize(void);

/* Queue an fsync of 'fd' as a REDIS_BIO_AOF_FSYNC background job so the
 * caller does not block waiting for the disk. */
void aof_background_fsync(int fd) {
    bioCreateBackgroundJob(REDIS_BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
}

/* Called when the user switches from "appendonly yes" to "appendonly no"
 * at runtime using the CONFIG command.
 *
 * Flushes and syncs whatever is pending, closes the AOF descriptor, and
 * kills any in-progress background rewrite (discarding its diff buffer
 * and temp file). */
void stopAppendOnly(void) {
    flushAppendOnlyFile(1);
    aof_fsync(server.appendfd);
    close(server.appendfd);

    server.appendfd = -1;
    server.appendseldb = -1;
    server.appendonly = 0;
    server.aof_wait_rewrite = 0;
    /* rewrite operation in progress? kill it, wait child exit */
    if (server.bgrewritechildpid != -1) {
        int statloc;

        if (kill(server.bgrewritechildpid,SIGKILL) != -1)
            wait3(&statloc,0,NULL);
        /* reset the buffer accumulating changes while the child saves */
        sdsfree(server.bgrewritebuf);
        server.bgrewritebuf = sdsempty();
        aofRemoveTempFile(server.bgrewritechildpid);
        server.bgrewritechildpid = -1;
    }
}

/* Called when the user switches from "appendonly no" to "appendonly yes"
 * at runtime using the CONFIG command.
 *
 * Opens the AOF and starts a background rewrite. Appending to the file only
 * begins once that rewrite completes (see aof_wait_rewrite).
 * Returns REDIS_OK on success. On failure it returns REDIS_ERR and leaves
 * AOF fully disabled (appendonly == 0, appendfd == -1). Without that reset,
 * later flushes would write to an invalid or stale descriptor. */
int startAppendOnly(void) {
    server.appendonly = 1;
    server.lastfsync = time(NULL);
    server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
    if (server.appendfd == -1) {
        redisLog(REDIS_WARNING,"Redis needs to enable the AOF but can't open the append only file: %s",strerror(errno));
        /* Don't leave AOF "on" with no descriptor: the next flush would
         * write(2) to fd -1 and terminate the server. */
        server.appendonly = 0;
        return REDIS_ERR;
    }
    if (rewriteAppendOnlyFileBackground() == REDIS_ERR) {
        server.appendonly = 0;
        close(server.appendfd);
        /* Forget the closed descriptor. backgroundRewriteDoneHandler() uses
         * appendfd == -1 to detect "AOF disabled", and would otherwise adopt
         * (and later close) a stale, possibly reused fd. */
        server.appendfd = -1;
        redisLog(REDIS_WARNING,"Redis needs to enable the AOF but can't trigger a background AOF rewrite operation. Check the above logs for more info about the error.");
        return REDIS_ERR;
    }
    /* We correctly switched on AOF, now wait for the rewrite to be complete
     * in order to append data on disk. */
    server.aof_wait_rewrite = 1;
    return REDIS_OK;
}

/* Write the append only file buffer on disk.
 *
 * Since we are required to write the AOF before replying to the client,
 * and the only way the client socket can get a write is entering the
 * event loop, we accumulate all the AOF writes in a memory
 * buffer and write it on disk using this function just before entering
 * the event loop again.
 *
 * About the 'force' argument:
 *
 * When the fsync policy is set to 'everysec' we may delay the flush if there
 * is still an fsync() going on in the background thread, since for instance
 * on Linux write(2) will be blocked by the background fsync anyway.
 * When this happens we remember that there is some aof buffer to be
 * flushed ASAP, and will try to do that in the serverCron() function.
 *
 * However if force is set to 1 we'll write regardless of the background
 * fsync.
 *
 * A failed or short write terminates the process with exit(1). */
void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    int sync_in_progress = 0;

    if (sdslen(server.aofbuf) == 0) return;

    if (server.appendfsync == APPENDFSYNC_EVERYSEC)
        sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;

    if (server.appendfsync == APPENDFSYNC_EVERYSEC && !force) {
        /* With this append fsync policy we do background fsyncing.
         * If the fsync is still in progress we can try to delay
         * the write for a couple of seconds. */
        if (sync_in_progress) {
            if (server.aof_flush_postponed_start == 0) {
                /* No previous write postponing, remember that we are
                 * postponing the flush and return. */
                server.aof_flush_postponed_start = server.unixtime;
                return;
            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                /* We were already waiting for fsync to finish, but for less
                 * than two seconds this is still ok. Postpone again. */
                return;
            }
            /* Otherwise fall through, and go write since we can't wait
             * over two seconds. */
            redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
        }
    }
    /* If you are following this code path, then we are going to write so
     * reset the postponed flush sentinel to zero. */
    server.aof_flush_postponed_start = 0;

    /* We want to perform a single write. This should be guaranteed atomic
     * at least if the filesystem we are writing is a real physical one.
     * While this will save us against the server being killed I don't think
     * there is much to do about the whole server stopping for power problems
     * or alike */
    nwritten = write(server.appendfd,server.aofbuf,sdslen(server.aofbuf));
    if (nwritten != (ssize_t)sdslen(server.aofbuf)) {
        /* Ooops, we are in troubles. The best thing to do for now is
         * aborting instead of giving the illusion that everything is
         * working as expected. */
        if (nwritten == -1) {
            redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
        } else {
            /* A short write does not set errno, so report the byte counts
             * instead of a stale strerror(errno). */
            redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file (nwritten=%lld, expected=%lld)",
                (long long)nwritten, (long long)sdslen(server.aofbuf));
        }
        exit(1);
    }
    server.appendonly_current_size += nwritten;

    /* Re-use AOF buffer when it is small enough. The maximum comes from the
     * arena size of 4k minus some overhead (but is otherwise arbitrary). */
    if ((sdslen(server.aofbuf)+sdsavail(server.aofbuf)) < 4000) {
        sdsclear(server.aofbuf);
    } else {
        sdsfree(server.aofbuf);
        server.aofbuf = sdsempty();
    }

    /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
     * children doing I/O in the background. */
    if (server.no_appendfsync_on_rewrite &&
        (server.bgrewritechildpid != -1 || server.bgsavechildpid != -1))
            return;

    /* Perform the fsync if needed. */
    if (server.appendfsync == APPENDFSYNC_ALWAYS) {
        /* aof_fsync is defined as fdatasync() for Linux in order to avoid
         * flushing metadata. */
        aof_fsync(server.appendfd); /* Let's try to get this data on the disk */
        server.lastfsync = server.unixtime;
    } else if ((server.appendfsync == APPENDFSYNC_EVERYSEC &&
                server.unixtime > server.lastfsync)) {
        if (!sync_in_progress) aof_background_fsync(server.appendfd);
        server.lastfsync = server.unixtime;
    }
}

/* Append to 'dst' the protocol form of the command argv[0..argc-1]:
 * "*<argc>\r\n" followed by "$<len>\r\n<bytes>\r\n" for every argument.
 * Returns the (possibly reallocated) sds string. */
sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {
    char buf[32];
    int len, j;
    robj *o;

    buf[0] = '*';
    len = 1+ll2string(buf+1,sizeof(buf)-1,argc);
    buf[len++] = '\r';
    buf[len++] = '\n';
    dst = sdscatlen(dst,buf,len);

    for (j = 0; j < argc; j++) {
        o = getDecodedObject(argv[j]);
        buf[0] = '$';
        len = 1+ll2string(buf+1,sizeof(buf)-1,sdslen(o->ptr));
        buf[len++] = '\r';
        buf[len++] = '\n';
        dst = sdscatlen(dst,buf,len);
        dst = sdscatlen(dst,o->ptr,sdslen(o->ptr));
        dst = sdscatlen(dst,"\r\n",2);
        decrRefCount(o);
    }
    return dst;
}

/* Create the sds representation of an PEXPIREAT command, using
 * 'seconds' as time to live and 'cmd' to understand what command
 * we are translating into a PEXPIREAT.
 *
 * This command is used in order to translate EXPIRE and PEXPIRE commands
 * into PEXPIREAT command so that we retain precision in the append only
 * file, and the time is always absolute and not relative. */
sds catAppendOnlyExpireAtCommand(sds buf, struct redisCommand *cmd, robj *key, robj *seconds) {
    long long when;
    robj *argv[3];

    /* Make sure we can use strtol */
    seconds = getDecodedObject(seconds);
    when = strtoll(seconds->ptr,NULL,10);
    /* Convert argument into milliseconds for EXPIRE, SETEX, EXPIREAT */
    if (cmd->proc == expireCommand || cmd->proc == setexCommand ||
        cmd->proc == expireatCommand)
    {
        when *= 1000;
    }
    /* Convert into absolute time for EXPIRE, PEXPIRE, SETEX, PSETEX */
    if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
        cmd->proc == setexCommand || cmd->proc == psetexCommand)
    {
        when += mstime();
    }
    decrRefCount(seconds);

    argv[0] = createStringObject("PEXPIREAT",9);
    argv[1] = key;
    argv[2] = createStringObjectFromLongLong(when);
    buf = catAppendOnlyGenericCommand(buf, 3, argv);
    decrRefCount(argv[0]);
    decrRefCount(argv[2]);
    return buf;
}

/* Serialize a command executed against DB 'dictid' and append it to the AOF
 * buffer. When a background rewrite is running, also append it to the
 * rewrite diff buffer. Relative expires (EXPIRE/PEXPIRE/SETEX/PSETEX) and
 * EXPIREAT are translated to absolute PEXPIREAT. */
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    sds buf = sdsempty();
    robj *tmpargv[3];

    /* The DB this command was targeting is not the same as the last command
     * we appended. To issue a SELECT command is needed. */
    if (dictid != server.appendseldb) {
        char seldb[64];

        snprintf(seldb,sizeof(seldb),"%d",dictid);
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);
        server.appendseldb = dictid;
    }

    if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
        cmd->proc == expireatCommand) {
        /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
        /* Translate SETEX/PSETEX to SET and PEXPIREAT */
        tmpargv[0] = createStringObject("SET",3);
        tmpargv[1] = argv[1];
        tmpargv[2] = argv[3];
        buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
        decrRefCount(tmpargv[0]);
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else {
        /* All the other commands don't need translation or need the
         * same translation already operated in the command vector
         * for the replication itself. */
        buf = catAppendOnlyGenericCommand(buf,argc,argv);
    }

    /* Append to the AOF buffer. This will be flushed on disk just before
     * of re-entering the event loop, so before the client will get a
     * positive reply about the operation performed.
     *
     * Note, we don't add stuff in the AOF buffer if aof_wait_rewrite is
     * non zero, as this means we are starting with a new AOF and the
     * current one is meaningless (this happens for instance after
     * a slave resyncs with its master). */
    if (!server.aof_wait_rewrite) {
        server.aofbuf = sdscatlen(server.aofbuf,buf,sdslen(buf));
    }

    /* If a background append only file rewriting is in progress we want to
     * accumulate the differences between the child DB and the current one
     * in a buffer, so that when the child process will do its work we
     * can append the differences to the new append only file. */
    if (server.bgrewritechildpid != -1)
        server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));

    sdsfree(buf);
}

/* In Redis commands are always executed in the context of a client, so in
 * order to load the append only file we need to create a fake client. */
struct redisClient *createFakeClient(void) {
    struct redisClient *c = zmalloc(sizeof(*c));

    selectDb(c,0);
    c->fd = -1;
    c->querybuf = sdsempty();
    c->argc = 0;
    c->argv = NULL;
    c->bufpos = 0;
    c->flags = 0;
    /* We set the fake client as a slave waiting for the synchronization
     * so that Redis will not try to send replies to this client. */
    c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
    c->reply = listCreate();
    c->watched_keys = listCreate();
    listSetFreeMethod(c->reply,decrRefCount);
    listSetDupMethod(c->reply,dupClientReplyValue);
    initClientMultiState(c);
    return c;
}

/* Release the resources allocated by createFakeClient(). */
void freeFakeClient(struct redisClient *c) {
    sdsfree(c->querybuf);
    listRelease(c->reply);
    listRelease(c->watched_keys);
    freeClientMultiState(c);
    zfree(c);
}

/* Replay the append log file. On success REDIS_OK is returned. On non fatal
 * error (the append only file is zero-length) REDIS_ERR is returned. On
 * fatal error an error message is logged and the program exits. */
int loadAppendOnlyFile(char *filename) {
    struct redisClient *fakeClient;
    FILE *fp = fopen(filename,"r");
    struct redis_stat sb;
    int appendonly = server.appendonly;
    long loops = 0;

    if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
        server.appendonly_current_size = 0;
        fclose(fp);
        return REDIS_ERR;
    }

    if (fp == NULL) {
        redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
        exit(1);
    }

    /* Temporarily disable AOF, to prevent EXEC from feeding a MULTI
     * to the same file we're about to read. */
    server.appendonly = 0;

    fakeClient = createFakeClient();
    startLoading(fp);

    while(1) {
        int argc, j;
        unsigned long len;
        robj **argv;
        char buf[128];
        sds argsds;
        struct redisCommand *cmd;

        /* Serve the clients from time to time */
        if (!(loops++ % 1000)) {
            loadingProgress(ftello(fp));
            aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
        }

        if (fgets(buf,sizeof(buf),fp) == NULL) {
            if (feof(fp))
                break;
            else
                goto readerr;
        }
        if (buf[0] != '*') goto fmterr;
        argc = atoi(buf+1);
        if (argc < 1) goto fmterr;

        argv = zmalloc(sizeof(robj*)*argc);
        for (j = 0; j < argc; j++) {
            if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
            if (buf[0] != '$') goto fmterr;
            len = strtol(buf+1,NULL,10);
            argsds = sdsnewlen(NULL,len);
            if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
            argv[j] = createObject(REDIS_STRING,argsds);
            if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
        }

        /* Command lookup */
        cmd = lookupCommand(argv[0]->ptr);
        if (!cmd) {
            redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", (char*)argv[0]->ptr);
            exit(1);
        }
        /* Run the command in the context of a fake client */
        fakeClient->argc = argc;
        fakeClient->argv = argv;
        cmd->proc(fakeClient);

        /* The fake client should not have a reply */
        redisAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0);
        /* The fake client should never get blocked */
        redisAssert((fakeClient->flags & REDIS_BLOCKED) == 0);

        /* Clean up. Command code may have changed argv/argc so we use the
         * argv/argc of the client instead of the local variables. */
        for (j = 0; j < fakeClient->argc; j++)
            decrRefCount(fakeClient->argv[j]);
        zfree(fakeClient->argv);
    }

    /* This point can only be reached when EOF is reached without errors.
     * If the client is in the middle of a MULTI/EXEC, log error and quit. */
    if (fakeClient->flags & REDIS_MULTI) goto readerr;

    fclose(fp);
    freeFakeClient(fakeClient);
    server.appendonly = appendonly;
    stopLoading();
    aofUpdateCurrentSize();
    server.auto_aofrewrite_base_size = server.appendonly_current_size;
    return REDIS_OK;

readerr:
    if (feof(fp)) {
        redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
    } else {
        redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
    }
    exit(1);
fmterr:
    redisLog(REDIS_WARNING,"Bad file format reading the append only file: make a backup of your AOF file, then use ./redis-check-aof --fix <filename>");
    exit(1);
}

/* Delegate writing an object to writing a bulk string or bulk long long.
 * This is not placed in rio.c since that adds the redis.h dependency. */
int rioWriteBulkObject(rio *r, robj *obj) {
    /* Avoid using getDecodedObject to help copy-on-write (we are often
     * in a child process when this function is called). */
    if (obj->encoding == REDIS_ENCODING_INT) {
        return rioWriteBulkLongLong(r,(long)obj->ptr);
    } else if (obj->encoding == REDIS_ENCODING_RAW) {
        return rioWriteBulkString(r,obj->ptr,sdslen(obj->ptr));
    } else {
        redisPanic("Unknown string encoding");
    }
}

/* Emit the commands needed to rebuild a list object.
 * The function returns 0 on error, 1 on success. */
int rewriteListObject(rio *r, robj *key, robj *o) {
    long long count = 0, items = listTypeLength(o);

    if (o->encoding == REDIS_ENCODING_ZIPLIST) {
        unsigned char *zl = o->ptr;
        unsigned char *p = ziplistIndex(zl,0);
        unsigned char *vstr;
        unsigned int vlen;
        long long vlong;

        while(ziplistGet(p,&vstr,&vlen,&vlong)) {
            if (count == 0) {
                int cmd_items = (items > REDIS_AOFREWRITE_ITEMS_PER_CMD) ?
                    REDIS_AOFREWRITE_ITEMS_PER_CMD : items;

                if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0;
                if (rioWriteBulkString(r,"RPUSH",5) == 0) return 0;
                if (rioWriteBulkObject(r,key) == 0) return 0;
            }
            if (vstr) {
                if (rioWriteBulkString(r,(char*)vstr,vlen) == 0) return 0;
            } else {
                if (rioWriteBulkLongLong(r,vlong) == 0) return 0;
            }
            p = ziplistNext(zl,p);
            if (++count == REDIS_AOFREWRITE_ITEMS_PER_CMD) count = 0;
            items--;
        }
    } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
        list *list = o->ptr;
        listNode *ln;
        listIter li;

        listRewind(list,&li);
        while((ln = listNext(&li))) {
            robj *eleobj = listNodeValue(ln);

            if (count == 0) {
                int cmd_items = (items > REDIS_AOFREWRITE_ITEMS_PER_CMD) ?
                    REDIS_AOFREWRITE_ITEMS_PER_CMD : items;

                if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0;
                if (rioWriteBulkString(r,"RPUSH",5) == 0) return 0;
                if (rioWriteBulkObject(r,key) == 0) return 0;
            }
            if (rioWriteBulkObject(r,eleobj) == 0) return 0;
            if (++count == REDIS_AOFREWRITE_ITEMS_PER_CMD) count = 0;
            items--;
        }
    } else {
        redisPanic("Unknown list encoding");
    }
    return 1;
}

/* Emit the commands needed to rebuild a set object.
 * The function returns 0 on error, 1 on success. */
int rewriteSetObject(rio *r, robj *key, robj *o) {
    long long count = 0, items = setTypeSize(o);

    if (o->encoding == REDIS_ENCODING_INTSET) {
        int ii = 0;
        int64_t llval;

        while(intsetGet(o->ptr,ii++,&llval)) {
            if (count == 0) {
                int cmd_items = (items > REDIS_AOFREWRITE_ITEMS_PER_CMD) ?
                    REDIS_AOFREWRITE_ITEMS_PER_CMD : items;

                if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0;
                if (rioWriteBulkString(r,"SADD",4) == 0) return 0;
                if (rioWriteBulkObject(r,key) == 0) return 0;
            }
            if (rioWriteBulkLongLong(r,llval) == 0) return 0;
            if (++count == REDIS_AOFREWRITE_ITEMS_PER_CMD) count = 0;
            items--;
        }
    } else if (o->encoding == REDIS_ENCODING_HT) {
        dictIterator *di = dictGetIterator(o->ptr);
        dictEntry *de;

        while((de = dictNext(di)) != NULL) {
            robj *eleobj = dictGetKey(de);
            if (count == 0) {
                int cmd_items = (items > REDIS_AOFREWRITE_ITEMS_PER_CMD) ?
                    REDIS_AOFREWRITE_ITEMS_PER_CMD : items;

                if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0;
                if (rioWriteBulkString(r,"SADD",4) == 0) return 0;
                if (rioWriteBulkObject(r,key) == 0) return 0;
            }
            if (rioWriteBulkObject(r,eleobj) == 0) return 0;
            if (++count == REDIS_AOFREWRITE_ITEMS_PER_CMD) count = 0;
            items--;
        }
        dictReleaseIterator(di);
    } else {
        redisPanic("Unknown set encoding");
    }
    return 1;
}

/* Emit the commands needed to rebuild a sorted set object.
 * The function returns 0 on error, 1 on success. */
int rewriteSortedSetObject(rio *r, robj *key, robj *o) {
    long long count = 0, items = zsetLength(o);

    if (o->encoding == REDIS_ENCODING_ZIPLIST) {
        unsigned char *zl = o->ptr;
        unsigned char *eptr, *sptr;
        unsigned char *vstr;
        unsigned int vlen;
        long long vll;
        double score;

        eptr = ziplistIndex(zl,0);
        redisAssert(eptr != NULL);
        sptr = ziplistNext(zl,eptr);
        redisAssert(sptr != NULL);

        while (eptr != NULL) {
            redisAssert(ziplistGet(eptr,&vstr,&vlen,&vll));
            score = zzlGetScore(sptr);

            if (count == 0) {
                int cmd_items = (items > REDIS_AOFREWRITE_ITEMS_PER_CMD) ?
                    REDIS_AOFREWRITE_ITEMS_PER_CMD : items;

                if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0;
                if (rioWriteBulkString(r,"ZADD",4) == 0) return 0;
                if (rioWriteBulkObject(r,key) == 0) return 0;
            }
            if (rioWriteBulkDouble(r,score) == 0) return 0;
            if (vstr != NULL) {
                if (rioWriteBulkString(r,(char*)vstr,vlen) == 0) return 0;
            } else {
                if (rioWriteBulkLongLong(r,vll) == 0) return 0;
            }
            zzlNext(zl,&eptr,&sptr);
            if (++count == REDIS_AOFREWRITE_ITEMS_PER_CMD) count = 0;
            items--;
        }
    } else if (o->encoding == REDIS_ENCODING_SKIPLIST) {
        zset *zs = o->ptr;
        dictIterator *di = dictGetIterator(zs->dict);
        dictEntry *de;

        while((de = dictNext(di)) != NULL) {
            robj *eleobj = dictGetKey(de);
            double *score = dictGetVal(de);

            if (count == 0) {
                int cmd_items = (items > REDIS_AOFREWRITE_ITEMS_PER_CMD) ?
                    REDIS_AOFREWRITE_ITEMS_PER_CMD : items;

                if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0;
                if (rioWriteBulkString(r,"ZADD",4) == 0) return 0;
                if (rioWriteBulkObject(r,key) == 0) return 0;
            }
            if (rioWriteBulkDouble(r,*score) == 0) return 0;
            if (rioWriteBulkObject(r,eleobj) == 0) return 0;
            if (++count == REDIS_AOFREWRITE_ITEMS_PER_CMD) count = 0;
            items--;
        }
        dictReleaseIterator(di);
    } else {
        redisPanic("Unknown sorted zset encoding");
    }
    return 1;
}

/* Emit the commands needed to rebuild a hash object.
 * The function returns 0 on error, 1 on success. */
int rewriteHashObject(rio *r, robj *key, robj *o) {
    long long count = 0, items = hashTypeLength(o);

    if (o->encoding == REDIS_ENCODING_ZIPMAP) {
        unsigned char *p = zipmapRewind(o->ptr);
        unsigned char *field, *val;
        unsigned int flen, vlen;

        while((p = zipmapNext(p,&field,&flen,&val,&vlen)) != NULL) {
            if (count == 0) {
                int cmd_items = (items > REDIS_AOFREWRITE_ITEMS_PER_CMD) ?
                    REDIS_AOFREWRITE_ITEMS_PER_CMD : items;

                if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0;
                if (rioWriteBulkString(r,"HMSET",5) == 0) return 0;
                if (rioWriteBulkObject(r,key) == 0) return 0;
            }
            if (rioWriteBulkString(r,(char*)field,flen) == 0) return 0;
            if (rioWriteBulkString(r,(char*)val,vlen) == 0) return 0;
            if (++count == REDIS_AOFREWRITE_ITEMS_PER_CMD) count = 0;
            items--;
        }
    } else if (o->encoding == REDIS_ENCODING_HT) {
        dictIterator *di = dictGetIterator(o->ptr);
        dictEntry *de;

        while((de = dictNext(di)) != NULL) {
            robj *field = dictGetKey(de);
            robj *val = dictGetVal(de);

            if (count == 0) {
                int cmd_items = (items > REDIS_AOFREWRITE_ITEMS_PER_CMD) ?
                    REDIS_AOFREWRITE_ITEMS_PER_CMD : items;

                if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0;
                if (rioWriteBulkString(r,"HMSET",5) == 0) return 0;
                if (rioWriteBulkObject(r,key) == 0) return 0;
            }
            if (rioWriteBulkObject(r,field) == 0) return 0;
            if (rioWriteBulkObject(r,val) == 0) return 0;
            if (++count == REDIS_AOFREWRITE_ITEMS_PER_CMD) count = 0;
            items--;
        }
        dictReleaseIterator(di);
    } else {
        /* Match the other rewrite*Object() helpers instead of blindly
         * treating an unexpected encoding as a dict. */
        redisPanic("Unknown hash encoding");
    }
    return 1;
}

/* Write a sequence of commands able to fully rebuild the dataset into
 * "filename". Used both by REWRITEAOF and BGREWRITEAOF.
 *
 * In order to minimize the number of commands needed in the rewritten
 * log Redis uses variadic commands when possible, such as RPUSH, SADD
 * and ZADD. However at max REDIS_AOFREWRITE_ITEMS_PER_CMD items per time
 * are inserted using a single command.
 *
 * Keys whose expire time is already in the past are skipped entirely.
 * The data is flushed and fsynced before the temp file is renamed over
 * 'filename'. Returns REDIS_OK on success, REDIS_ERR on failure; on failure
 * the temp file is removed. */
int rewriteAppendOnlyFile(char *filename) {
    dictIterator *di = NULL;
    dictEntry *de;
    rio aof;
    FILE *fp;
    char tmpfile[256];
    int j;
    long long now = mstime();

    /* Note that we have to use a different temp name here compared to the
     * one used by rewriteAppendOnlyFileBackground() function. */
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
        return REDIS_ERR;
    }

    rioInitWithFile(&aof,fp);
    for (j = 0; j < server.dbnum; j++) {
        char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
        redisDb *db = server.db+j;
        dict *d = db->dict;
        if (dictSize(d) == 0) continue;
        di = dictGetSafeIterator(d);
        if (!di) {
            fclose(fp);
            unlink(tmpfile);
            return REDIS_ERR;
        }

        /* SELECT the new DB */
        if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
        if (rioWriteBulkLongLong(&aof,j) == 0) goto werr;

        /* Iterate this DB writing every entry */
        while((de = dictNext(di)) != NULL) {
            sds keystr;
            robj key, *o;
            long long expiretime;

            keystr = dictGetKey(de);
            o = dictGetVal(de);
            initStaticStringObject(key,keystr);

            expiretime = getExpire(db,&key);

            /* If this key is already expired skip it. This must happen
             * before the value is emitted: skipping only the PEXPIREAT
             * would turn an expired key into a persistent one. */
            if (expiretime != -1 && expiretime < now) continue;

            /* Save the key and associated value */
            if (o->type == REDIS_STRING) {
                /* Emit a SET command */
                char cmd[]="*3\r\n$3\r\nSET\r\n";
                if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                /* Key and value */
                if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
                if (rioWriteBulkObject(&aof,o) == 0) goto werr;
            } else if (o->type == REDIS_LIST) {
                if (rewriteListObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == REDIS_SET) {
                if (rewriteSetObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == REDIS_ZSET) {
                if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == REDIS_HASH) {
                if (rewriteHashObject(&aof,&key,o) == 0) goto werr;
            } else {
                redisPanic("Unknown object type");
            }
            /* Save the expire time */
            if (expiretime != -1) {
                char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
                if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
                if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr;
            }
        }
        dictReleaseIterator(di);
        di = NULL; /* Never let werr release an iterator twice. */
    }

    /* Make sure data will not remain on the OS's output buffers. A failure
     * here must not be followed by rename(2), or a truncated file would
     * replace the current AOF. */
    if (fflush(fp) == EOF) goto werr;
    if (aof_fsync(fileno(fp)) == -1) goto werr;
    if (fclose(fp) == EOF) {
        fp = NULL; /* The stream is gone even on failure: don't close twice. */
        goto werr;
    }

    /* Use RENAME to make sure the DB file is changed atomically only
     * if the generate DB file is ok. */
    if (rename(tmpfile,filename) == -1) {
        redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
        unlink(tmpfile);
        return REDIS_ERR;
    }
    redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
    return REDIS_OK;

werr:
    /* Log first: fclose(3)/unlink(2) below may overwrite errno. */
    redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
    if (fp) fclose(fp);
    unlink(tmpfile);
    if (di) dictReleaseIterator(di);
    return REDIS_ERR;
}

/* This is how rewriting of the append only file in background works:
 *
 * 1) The user calls BGREWRITEAOF
 * 2) Redis calls this function, that forks():
 *    2a) the child rewrite the append only file in a temp file.
 *    2b) the parent accumulates differences in server.bgrewritebuf.
 * 3) When the child finished '2a' exits.
 * 4) The parent will trap the exit code, if it's OK, will append the
 *    data accumulated into server.bgrewritebuf into the temp file, and
 *    finally will rename(2) the temp file in the actual file name.
 *    The new file is reopened as the new append only file. Profit!
 */
int rewriteAppendOnlyFileBackground(void) {
    pid_t childpid;
    long long start;

    if (server.bgrewritechildpid != -1) return REDIS_ERR;
    start = ustime();
    if ((childpid = fork()) == 0) {
        char tmpfile[256];

        /* Child */
        if (server.ipfd > 0) close(server.ipfd);
        if (server.sofd > 0) close(server.sofd);
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
        if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
            _exit(0);
        } else {
            _exit(1);
        }
    } else {
        /* Parent */
        server.stat_fork_time = ustime()-start;
        if (childpid == -1) {
            redisLog(REDIS_WARNING,
                "Can't rewrite append only file in background: fork: %s",
                strerror(errno));
            return REDIS_ERR;
        }
        redisLog(REDIS_NOTICE,
            "Background append only file rewriting started by pid %d",childpid);
        server.aofrewrite_scheduled = 0;
        server.bgrewritechildpid = childpid;
        updateDictResizePolicy();
        /* We set appendseldb to -1 in order to force the next call to the
         * feedAppendOnlyFile() to issue a SELECT command, so the differences
         * accumulated by the parent into server.bgrewritebuf will start
         * with a SELECT statement and it will be safe to merge. */
        server.appendseldb = -1;
        return REDIS_OK;
    }
    return REDIS_OK; /* unreached */
}

/* BGREWRITEAOF: start a background rewrite now, or schedule one if an RDB
 * save child is currently running. */
void bgrewriteaofCommand(redisClient *c) {
    if (server.bgrewritechildpid != -1) {
        addReplyError(c,"Background append only file rewriting already in progress");
    } else if (server.bgsavechildpid != -1) {
        server.aofrewrite_scheduled = 1;
        addReplyStatus(c,"Background append only file rewriting scheduled");
    } else if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
        addReplyStatus(c,"Background append only file rewriting started");
    } else {
        addReply(c,shared.err);
    }
}

/* Remove the temp file that the rewrite child 'childpid' writes to. */
void aofRemoveTempFile(pid_t childpid) {
    char tmpfile[256];

    snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) childpid);
    unlink(tmpfile);
}

/* Update the server.appendonly_current_size field explicitly using stat(2)
 * to check the size of the file. This is useful after a rewrite or after
 * a restart, normally the size is updated just adding the write length
 * to the current length, that is much faster. */
void aofUpdateCurrentSize(void) {
    struct redis_stat sb;

    if (redis_fstat(server.appendfd,&sb) == -1) {
        redisLog(REDIS_WARNING,"Unable to check the AOF length: %s",
            strerror(errno));
    } else {
        server.appendonly_current_size = sb.st_size;
    }
}

/* A background append only file rewriting (BGREWRITEAOF) terminated its work.
 * Handle this. */
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
    if (!bysignal && exitcode == 0) {
        int newfd, oldfd;
        ssize_t nwritten;
        char tmpfile[256];
        long long now = ustime();

        redisLog(REDIS_NOTICE,
            "Background AOF rewrite terminated with success");

        /* Flush the differences accumulated by the parent to the
         * rewritten AOF. */
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
            (int)server.bgrewritechildpid);
        newfd = open(tmpfile,O_WRONLY|O_APPEND);
        if (newfd == -1) {
            redisLog(REDIS_WARNING,
                "Unable to open the temporary AOF produced by the child: %s", strerror(errno));
            goto cleanup;
        }

        nwritten = write(newfd,server.bgrewritebuf,sdslen(server.bgrewritebuf));
        if (nwritten != (ssize_t)sdslen(server.bgrewritebuf)) {
            if (nwritten == -1) {
                redisLog(REDIS_WARNING,
                    "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
            } else {
                /* A short write does not set errno: report byte counts. */
                redisLog(REDIS_WARNING,
                    "Short write trying to flush the parent diff to the rewritten AOF (nwritten=%lld, expected=%lld)",
                    (long long)nwritten, (long long)sdslen(server.bgrewritebuf));
            }
            close(newfd);
            goto cleanup;
        }

        /* nwritten is non-negative here; cast so it matches %lu. */
        redisLog(REDIS_NOTICE,
            "Parent diff successfully flushed to the rewritten AOF (%lu bytes)", (unsigned long)nwritten);

        /* The only remaining thing to do is to rename the temporary file to
         * the configured file and switch the file descriptor used to do AOF
         * writes. We don't want close(2) or rename(2) calls to block the
         * server on old file deletion.
         *
         * There are two possible scenarios:
         *
         * 1) AOF is DISABLED and this was a one time rewrite. The temporary
         * file will be renamed to the configured file. When this file already
         * exists, it will be unlinked, which may block the server.
         *
         * 2) AOF is ENABLED and the rewritten AOF will immediately start
         * receiving writes. After the temporary file is renamed to the
         * configured file, the original AOF file descriptor will be closed.
         * Since this will be the last reference to that file, closing it
         * causes the underlying file to be unlinked, which may block the
         * server.
         *
         * To mitigate the blocking effect of the unlink operation (either
         * caused by rename(2) in scenario 1, or by close(2) in scenario 2), we
         * use a background thread to take care of this. First, we
         * make scenario 1 identical to scenario 2 by opening the target file
         * when it exists. The unlink operation after the rename(2) will then
         * be executed upon calling close(2) for its descriptor. Everything to
         * guarantee atomicity for this switch has already happened by then, so
         * we don't care what the outcome or duration of that close operation
         * is, as long as the file descriptor is released again. */
        if (server.appendfd == -1) {
            /* AOF disabled */

             /* Don't care if this fails: oldfd will be -1 and we handle that.
              * One notable case of -1 return is if the old file does
              * not exist. */
             oldfd = open(server.appendfilename,O_RDONLY|O_NONBLOCK);
        } else {
            /* AOF enabled */
            oldfd = -1; /* We'll set this to the current AOF filedes later. */
        }

        /* Rename the temporary file. This will not unlink the target file if
         * it exists, because we reference it with "oldfd". */
        if (rename(tmpfile,server.appendfilename) == -1) {
            redisLog(REDIS_WARNING,
                "Error trying to rename the temporary AOF: %s", strerror(errno));
            close(newfd);
            if (oldfd != -1) close(oldfd);
            goto cleanup;
        }

        if (server.appendfd == -1) {
            /* AOF disabled, we don't need to set the AOF file descriptor
             * to this new file, so we can close it. */
            close(newfd);
        } else {
            /* AOF enabled, replace the old fd with the new one. */
            oldfd = server.appendfd;
            server.appendfd = newfd;
            if (server.appendfsync == APPENDFSYNC_ALWAYS)
                aof_fsync(newfd);
            else if (server.appendfsync == APPENDFSYNC_EVERYSEC)
                aof_background_fsync(newfd);
            server.appendseldb = -1; /* Make sure SELECT is re-issued */
            aofUpdateCurrentSize();
            server.auto_aofrewrite_base_size = server.appendonly_current_size;

            /* Clear regular AOF buffer since its contents was just written to
             * the new AOF from the background rewrite buffer. */
            sdsfree(server.aofbuf);
            server.aofbuf = sdsempty();
        }

        redisLog(REDIS_NOTICE, "Background AOF rewrite successful");
        server.aof_wait_rewrite = 0;

        /* Asynchronously close the overwritten AOF. */
        if (oldfd != -1) bioCreateBackgroundJob(REDIS_BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL);

        redisLog(REDIS_VERBOSE,
            "Background AOF rewrite signal handler took %lldus", ustime()-now);
    } else if (!bysignal && exitcode != 0) {
        redisLog(REDIS_WARNING,
            "Background AOF rewrite terminated with error");
    } else {
        redisLog(REDIS_WARNING,
            "Background AOF rewrite terminated by signal %d", bysignal);
    }

cleanup:
    sdsfree(server.bgrewritebuf);
    server.bgrewritebuf = sdsempty();
    aofRemoveTempFile(server.bgrewritechildpid);
    server.bgrewritechildpid = -1;
    /* If we were waiting for an AOF rewrite before to start appending
     * to the AOF again (this happens both when the user switches on
     * AOF with CONFIG SET, and after a slave with AOF enabled syncs with
     * the master), but the rewrite failed (otherwise aof_wait_rewrite
     * would be zero), we need to schedule a new one. */
    if (server.aof_wait_rewrite) server.aofrewrite_scheduled = 1;
}