#include "redis.h"

/* NOTE(review): the names of the seven system headers were missing from the
 * text this file was recovered from. They are reconstructed here from the
 * APIs used below (kill/SIGKILL, open/O_*, fstat, wait3) -- confirm against
 * the project's canonical copy. */
#include <signal.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/wait.h>

/* Called when the user switches from "appendonly yes" to "appendonly no"
 * at runtime using the CONFIG command.
 *
 * Flushes and fsyncs what is still buffered, closes the AOF descriptor and
 * resets the AOF related fields of the server state. If a background AOF
 * rewrite is in progress the child is killed and the buffer accumulating
 * the parent side differences is reset. */
void stopAppendOnly(void) {
    flushAppendOnlyFile();
    aof_fsync(server.appendfd);
    close(server.appendfd);

    server.appendfd = -1;
    server.appendseldb = -1;
    server.appendonly = 0;
    /* rewrite operation in progress? kill it, wait child exit.
     * The rewrite child is the one tracked by bgrewritechildpid (set by
     * rewriteAppendOnlyFileBackground()); bgsavechildpid must not be
     * touched here. */
    if (server.bgrewritechildpid != -1) {
        int statloc;

        if (kill(server.bgrewritechildpid,SIGKILL) != -1)
            wait3(&statloc,0,NULL);
        /* reset the buffer accumulating changes while the child saves */
        sdsfree(server.bgrewritebuf);
        server.bgrewritebuf = sdsempty();
        /* NOTE(review): the temp file written by the killed child is not
         * removed here; consider aofRemoveTempFile() if its prototype is
         * visible at this point. */
        server.bgrewritechildpid = -1;
    }
}

/* Called when the user switches from "appendonly no" to "appendonly yes"
 * at runtime using the CONFIG command.
 *
 * Opens (creating it if needed) the append only file and starts a
 * background rewrite. Returns REDIS_OK on success. On failure REDIS_ERR is
 * returned and the server is left with AOF disabled and no AOF descriptor
 * open. */
int startAppendOnly(void) {
    server.lastfsync = time(NULL);
    server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
    if (server.appendfd == -1) {
        redisLog(REDIS_WARNING,"Used tried to switch on AOF via CONFIG, but I can't open the AOF file: %s",strerror(errno));
        return REDIS_ERR;
    }
    /* Only flag AOF as enabled once we actually have a file to append to. */
    server.appendonly = 1;
    if (rewriteAppendOnlyFileBackground() == REDIS_ERR) {
        server.appendonly = 0;
        close(server.appendfd);
        /* Don't leave a closed descriptor number around: other functions in
         * this file test appendfd against -1. */
        server.appendfd = -1;
        redisLog(REDIS_WARNING,"Used tried to switch on AOF via CONFIG, I can't trigger a background AOF rewrite operation. Check the above logs for more info about the error.");
        return REDIS_ERR;
    }
    return REDIS_OK;
}

/* Write the append only file buffer on disk.
 *
 * Since we are required to write the AOF before replying to the client,
 * and the only way the client socket can get a write is entering the
 * event loop, we accumulate all the AOF writes in a memory
 * buffer and write it on disk using this function just before entering
 * the event loop again.
 *
 * On a write error or short write the process logs and exits. */
void flushAppendOnlyFile(void) {
    time_t now;
    ssize_t nwritten;
    size_t buflen = sdslen(server.aofbuf);

    if (buflen == 0) return;

    /* We want to perform a single write. This should be guaranteed atomic
     * at least if the filesystem we are writing is a real physical one.
     * While this will save us against the server being killed I don't think
     * there is much to do about the whole server stopping for power problems
     * or alike */
    nwritten = write(server.appendfd,server.aofbuf,buflen);
    /* Compare as ssize_t: a plain (signed) cast would narrow the length
     * to int. */
    if (nwritten != (ssize_t)buflen) {
        /* Ooops, we are in troubles. The best thing to do for now is
         * aborting instead of giving the illusion that everything is
         * working as expected. */
        if (nwritten == -1) {
            redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
        } else {
            /* NOTE(review): errno is not set by a short write, so the text
             * appended to this message may be unrelated. */
            redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
        }
        exit(1);
    }
    sdsfree(server.aofbuf);
    server.aofbuf = sdsempty();

    /* Don't Fsync if no-appendfsync-on-rewrite is set to yes and we have
     * childs performing heavy I/O on disk. */
    if (server.no_appendfsync_on_rewrite &&
        (server.bgrewritechildpid != -1 || server.bgsavechildpid != -1))
            return;

    /* Fsync if needed */
    now = time(NULL);
    if (server.appendfsync == APPENDFSYNC_ALWAYS ||
        (server.appendfsync == APPENDFSYNC_EVERYSEC &&
         now-server.lastfsync > 1))
    {
        /* aof_fsync is defined as fdatasync() for Linux in order to avoid
         * flushing metadata. */
        aof_fsync(server.appendfd); /* Let's try to get this data on the disk */
        server.lastfsync = now;
    }
}

/* Append to 'buf' the multi bulk encoding ("*<argc>" followed by one
 * "$<len>" bulk per argument) of the command made of the 'argc' objects in
 * 'argv'. Every argument is accessed through getDecodedObject() so its
 * payload can be read as an sds string. Returns the (possibly reallocated)
 * buffer. */
sds catAppendOnlyGenericCommand(sds buf, int argc, robj **argv) {
    int j;

    buf = sdscatprintf(buf,"*%d\r\n",argc);
    for (j = 0; j < argc; j++) {
        robj *o = getDecodedObject(argv[j]);

        buf = sdscatprintf(buf,"$%lu\r\n",(unsigned long)sdslen(o->ptr));
        buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
        buf = sdscatlen(buf,"\r\n",2);
        decrRefCount(o);
    }
    return buf;
}

/* Append to 'buf' an "EXPIREAT <key> <when>" command, where <when> is the
 * current time plus the relative number of seconds held by 'seconds'.
 * Returns the (possibly reallocated) buffer. */
sds catAppendOnlyExpireAtCommand(sds buf, robj *key, robj *seconds) {
    int argc = 3;
    long when;
    robj *argv[3];

    /* Make sure we can use strtol */
    seconds = getDecodedObject(seconds);
    when = time(NULL)+strtol(seconds->ptr,NULL,10);
    decrRefCount(seconds);

    argv[0] = createStringObject("EXPIREAT",8);
    argv[1] = key;
    argv[2] = createObject(REDIS_STRING,
        sdscatprintf(sdsempty(),"%ld",when));
    buf = catAppendOnlyGenericCommand(buf, argc, argv);
    /* Only the two objects created here are ours to release; 'key' is
     * borrowed from the caller. */
    decrRefCount(argv[0]);
    decrRefCount(argv[2]);
    return buf;
}

/* Encode the command 'cmd' with arguments 'argv'/'argc', executed against
 * the DB with index 'dictid', and append it to server.aofbuf (and to
 * server.bgrewritebuf when a background rewrite is in progress).
 *
 * EXPIRE is stored as EXPIREAT and SETEX as SET + EXPIREAT, so what ends
 * in the file is an absolute time. */
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    sds buf = sdsempty();
    robj *tmpargv[3];

    /* The DB this command was targetting is not the same as the last command
     * we appendend. To issue a SELECT command is needed. */
    if (dictid != server.appendseldb) {
        char seldb[64];

        snprintf(seldb,sizeof(seldb),"%d",dictid);
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);
        server.appendseldb = dictid;
    }

    if (cmd->proc == expireCommand) {
        /* Translate EXPIRE into EXPIREAT */
        buf = catAppendOnlyExpireAtCommand(buf,argv[1],argv[2]);
    } else if (cmd->proc == setexCommand) {
        /* Translate SETEX to SET and EXPIREAT */
        tmpargv[0] = createStringObject("SET",3);
        tmpargv[1] = argv[1];
        tmpargv[2] = argv[3];
        buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
        decrRefCount(tmpargv[0]);
        buf = catAppendOnlyExpireAtCommand(buf,argv[1],argv[2]);
    } else {
        buf = catAppendOnlyGenericCommand(buf,argc,argv);
    }

    /* Append to the AOF buffer. This will be flushed on disk just before
     * of re-entering the event loop, so before the client will get a
     * positive reply about the operation performed. */
    server.aofbuf = sdscatlen(server.aofbuf,buf,sdslen(buf));

    /* If a background append only file rewriting is in progress we want to
     * accumulate the differences between the child DB and the current one
     * in a buffer, so that when the child process will do its work we
     * can append the differences to the new append only file. */
    if (server.bgrewritechildpid != -1)
        server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));

    sdsfree(buf);
}

/* In Redis commands are always executed in the context of a client, so in
 * order to load the append only file we need to create a fake client.
 * The returned client has no socket (fd is -1) and must be released with
 * freeFakeClient(). */
struct redisClient *createFakeClient(void) {
    struct redisClient *c = zmalloc(sizeof(*c));

    selectDb(c,0);
    c->fd = -1;
    c->querybuf = sdsempty();
    c->argc = 0;
    c->argv = NULL;
    c->bufpos = 0;
    c->flags = 0;
    /* We set the fake client as a slave waiting for the synchronization
     * so that Redis will not try to send replies to this client. */
    c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
    c->reply = listCreate();
    c->watched_keys = listCreate();
    listSetFreeMethod(c->reply,decrRefCount);
    listSetDupMethod(c->reply,dupClientReplyValue);
    initClientMultiState(c);
    return c;
}

/* Release a client obtained from createFakeClient(): its query buffer, its
 * reply and watched keys lists, its MULTI state and the client itself. */
void freeFakeClient(struct redisClient *c) {
    sdsfree(c->querybuf);
    listRelease(c->reply);
    listRelease(c->watched_keys);
    freeClientMultiState(c);
    zfree(c);
}

/* Replay the append log file. On success REDIS_OK is returned. On non fatal
 * error (the append only file is zero-length) REDIS_ERR is returned. On
 * fatal error an error message is logged and the program exits.
*/ int loadAppendOnlyFile(char *filename) { struct redisClient *fakeClient; FILE *fp = fopen(filename,"r"); struct redis_stat sb; int appendonly = server.appendonly; long loops = 0; if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) return REDIS_ERR; if (fp == NULL) { redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno)); exit(1); } /* Temporarily disable AOF, to prevent EXEC from feeding a MULTI * to the same file we're about to read. */ server.appendonly = 0; fakeClient = createFakeClient(); startLoading(fp); while(1) { int argc, j; unsigned long len; robj **argv; char buf[128]; sds argsds; struct redisCommand *cmd; /* Serve the clients from time to time */ if (!(loops++ % 1000)) { loadingProgress(ftello(fp)); aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT); } if (fgets(buf,sizeof(buf),fp) == NULL) { if (feof(fp)) break; else goto readerr; } if (buf[0] != '*') goto fmterr; argc = atoi(buf+1); argv = zmalloc(sizeof(robj*)*argc); for (j = 0; j < argc; j++) { if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr; if (buf[0] != '$') goto fmterr; len = strtol(buf+1,NULL,10); argsds = sdsnewlen(NULL,len); if (len && fread(argsds,len,1,fp) == 0) goto fmterr; argv[j] = createObject(REDIS_STRING,argsds); if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */ } /* Command lookup */ cmd = lookupCommand(argv[0]->ptr); if (!cmd) { redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr); exit(1); } /* Run the command in the context of a fake client */ fakeClient->argc = argc; fakeClient->argv = argv; cmd->proc(fakeClient); /* The fake client should not have a reply */ redisAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0); /* Clean up, ready for the next command */ for (j = 0; j < argc; j++) decrRefCount(argv[j]); zfree(argv); } /* This point can only be reached when EOF is reached without errors. 
* If the client is in the middle of a MULTI/EXEC, log error and quit. */ if (fakeClient->flags & REDIS_MULTI) goto readerr; fclose(fp); freeFakeClient(fakeClient); server.appendonly = appendonly; stopLoading(); return REDIS_OK; readerr: if (feof(fp)) { redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file"); } else { redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno)); } exit(1); fmterr: redisLog(REDIS_WARNING,"Bad file format reading the append only file: make a backup of your AOF file, then use ./redis-check-aof --fix "); exit(1); } /* Write a sequence of commands able to fully rebuild the dataset into * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */ int rewriteAppendOnlyFile(char *filename) { dictIterator *di = NULL; dictEntry *de; FILE *fp; char tmpfile[256]; int j; time_t now = time(NULL); /* Note that we have to use a different temp name here compared to the * one used by rewriteAppendOnlyFileBackground() function. 
*/ snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid()); fp = fopen(tmpfile,"w"); if (!fp) { redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno)); return REDIS_ERR; } for (j = 0; j < server.dbnum; j++) { char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n"; redisDb *db = server.db+j; dict *d = db->dict; if (dictSize(d) == 0) continue; di = dictGetIterator(d); if (!di) { fclose(fp); return REDIS_ERR; } /* SELECT the new DB */ if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr; if (fwriteBulkLongLong(fp,j) == 0) goto werr; /* Iterate this DB writing every entry */ while((de = dictNext(di)) != NULL) { sds keystr = dictGetEntryKey(de); robj key, *o; time_t expiretime; keystr = dictGetEntryKey(de); o = dictGetEntryVal(de); initStaticStringObject(key,keystr); expiretime = getExpire(db,&key); /* Save the key and associated value */ if (o->type == REDIS_STRING) { /* Emit a SET command */ char cmd[]="*3\r\n$3\r\nSET\r\n"; if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; /* Key and value */ if (fwriteBulkObject(fp,&key) == 0) goto werr; if (fwriteBulkObject(fp,o) == 0) goto werr; } else if (o->type == REDIS_LIST) { /* Emit the RPUSHes needed to rebuild the list */ char cmd[]="*3\r\n$5\r\nRPUSH\r\n"; if (o->encoding == REDIS_ENCODING_ZIPLIST) { unsigned char *zl = o->ptr; unsigned char *p = ziplistIndex(zl,0); unsigned char *vstr; unsigned int vlen; long long vlong; while(ziplistGet(p,&vstr,&vlen,&vlong)) { if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; if (fwriteBulkObject(fp,&key) == 0) goto werr; if (vstr) { if (fwriteBulkString(fp,(char*)vstr,vlen) == 0) goto werr; } else { if (fwriteBulkLongLong(fp,vlong) == 0) goto werr; } p = ziplistNext(zl,p); } } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) { list *list = o->ptr; listNode *ln; listIter li; listRewind(list,&li); while((ln = listNext(&li))) { robj *eleobj = listNodeValue(ln); if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; if (fwriteBulkObject(fp,&key) == 
0) goto werr; if (fwriteBulkObject(fp,eleobj) == 0) goto werr; } } else { redisPanic("Unknown list encoding"); } } else if (o->type == REDIS_SET) { char cmd[]="*3\r\n$4\r\nSADD\r\n"; /* Emit the SADDs needed to rebuild the set */ if (o->encoding == REDIS_ENCODING_INTSET) { int ii = 0; int64_t llval; while(intsetGet(o->ptr,ii++,&llval)) { if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; if (fwriteBulkObject(fp,&key) == 0) goto werr; if (fwriteBulkLongLong(fp,llval) == 0) goto werr; } } else if (o->encoding == REDIS_ENCODING_HT) { dictIterator *di = dictGetIterator(o->ptr); dictEntry *de; while((de = dictNext(di)) != NULL) { robj *eleobj = dictGetEntryKey(de); if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; if (fwriteBulkObject(fp,&key) == 0) goto werr; if (fwriteBulkObject(fp,eleobj) == 0) goto werr; } dictReleaseIterator(di); } else { redisPanic("Unknown set encoding"); } } else if (o->type == REDIS_ZSET) { /* Emit the ZADDs needed to rebuild the sorted set */ zset *zs = o->ptr; dictIterator *di = dictGetIterator(zs->dict); dictEntry *de; while((de = dictNext(di)) != NULL) { char cmd[]="*4\r\n$4\r\nZADD\r\n"; robj *eleobj = dictGetEntryKey(de); double *score = dictGetEntryVal(de); if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; if (fwriteBulkObject(fp,&key) == 0) goto werr; if (fwriteBulkDouble(fp,*score) == 0) goto werr; if (fwriteBulkObject(fp,eleobj) == 0) goto werr; } dictReleaseIterator(di); } else if (o->type == REDIS_HASH) { char cmd[]="*4\r\n$4\r\nHSET\r\n"; /* Emit the HSETs needed to rebuild the hash */ if (o->encoding == REDIS_ENCODING_ZIPMAP) { unsigned char *p = zipmapRewind(o->ptr); unsigned char *field, *val; unsigned int flen, vlen; while((p = zipmapNext(p,&field,&flen,&val,&vlen)) != NULL) { if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; if (fwriteBulkObject(fp,&key) == 0) goto werr; if (fwriteBulkString(fp,(char*)field,flen) == 0) goto werr; if (fwriteBulkString(fp,(char*)val,vlen) == 0) goto werr; } } else { dictIterator *di = 
dictGetIterator(o->ptr); dictEntry *de; while((de = dictNext(di)) != NULL) { robj *field = dictGetEntryKey(de); robj *val = dictGetEntryVal(de); if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; if (fwriteBulkObject(fp,&key) == 0) goto werr; if (fwriteBulkObject(fp,field) == 0) goto werr; if (fwriteBulkObject(fp,val) == 0) goto werr; } dictReleaseIterator(di); } } else { redisPanic("Unknown object type"); } /* Save the expire time */ if (expiretime != -1) { char cmd[]="*3\r\n$8\r\nEXPIREAT\r\n"; /* If this key is already expired skip it */ if (expiretime < now) continue; if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; if (fwriteBulkObject(fp,&key) == 0) goto werr; if (fwriteBulkLongLong(fp,expiretime) == 0) goto werr; } } dictReleaseIterator(di); } /* Make sure data will not remain on the OS's output buffers */ fflush(fp); aof_fsync(fileno(fp)); fclose(fp); /* Use RENAME to make sure the DB file is changed atomically only * if the generate DB file is ok. */ if (rename(tmpfile,filename) == -1) { redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno)); unlink(tmpfile); return REDIS_ERR; } redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed"); return REDIS_OK; werr: fclose(fp); unlink(tmpfile); redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno)); if (di) dictReleaseIterator(di); return REDIS_ERR; } /* This is how rewriting of the append only file in background works: * * 1) The user calls BGREWRITEAOF * 2) Redis calls this function, that forks(): * 2a) the child rewrite the append only file in a temp file. * 2b) the parent accumulates differences in server.bgrewritebuf. * 3) When the child finished '2a' exists. * 4) The parent will trap the exit code, if it's OK, will append the * data accumulated into server.bgrewritebuf into the temp file, and * finally will rename(2) the temp file in the actual file name. 
* Then the new file is reopened as the new append only file. Profit! */
int rewriteAppendOnlyFileBackground(void) {
    pid_t pid;

    /* Refuse to start when a rewrite is already running or when diskstore
     * is enabled. */
    if (server.bgrewritechildpid != -1) return REDIS_ERR;
    if (server.ds_enabled != 0) {
        redisLog(REDIS_WARNING,"BGREWRITEAOF called with diskstore enabled: AOF is not supported when diskstore is enabled. Operation not performed.");
        return REDIS_ERR;
    }

    pid = fork();
    if (pid == -1) {
        /* fork() failed: we are still the only process. */
        redisLog(REDIS_WARNING,
            "Can't rewrite append only file in background: fork: %s",
            strerror(errno));
        return REDIS_ERR;
    }

    if (pid == 0) {
        /* Child: close the two descriptors held in server.ipfd and
         * server.sofd, write the rewrite to a per-pid temp file and report
         * the outcome through the exit code. This branch never returns. */
        char childfile[256];

        if (server.ipfd > 0) close(server.ipfd);
        if (server.sofd > 0) close(server.sofd);
        snprintf(childfile,sizeof(childfile),"temp-rewriteaof-bg-%d.aof", (int) getpid());
        _exit(rewriteAppendOnlyFile(childfile) == REDIS_OK ? 0 : 1);
    }

    /* Parent */
    redisLog(REDIS_NOTICE,
        "Background append only file rewriting started by pid %d",pid);
    server.bgrewritechildpid = pid;
    updateDictResizePolicy();
    /* We set appendseldb to -1 in order to force the next call to the
     * feedAppendOnlyFile() to issue a SELECT command, so the differences
     * accumulated by the parent into server.bgrewritebuf will start
     * with a SELECT statement and it will be safe to merge. */
    server.appendseldb = -1;
    return REDIS_OK;
}

/* BGREWRITEAOF command: reply with an error if a background rewrite is
 * already running or cannot be started, with a status reply otherwise. */
void bgrewriteaofCommand(redisClient *c) {
    if (server.bgrewritechildpid != -1) {
        addReplyError(c,"Background append only file rewriting already in progress");
        return;
    }
    if (rewriteAppendOnlyFileBackground() != REDIS_OK) {
        addReply(c,shared.err);
        return;
    }
    addReplyStatus(c,"Background append only file rewriting started");
}

/* Unlink the temp file that the background rewrite child with pid
 * 'childpid' writes to. The unlink(2) result is ignored. */
void aofRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-rewriteaof-bg-%d.aof", (int) childpid);
    unlink(path);
}

/* A background append only file rewriting (BGREWRITEAOF) terminated its work.
 * Handle this.
*/
/* 'exitcode' is the exit status of the rewrite child and 'bysignal' is non
 * zero when the child was terminated by a signal (it is logged as the
 * signal number).
 *
 * On a clean exit the differences accumulated by the parent in
 * server.bgrewritebuf are appended to the child's temp file, which is then
 * renamed to server.appendfilename; if AOF is enabled the server switches
 * to the new descriptor. In every case the diff buffer is reset, the temp
 * file name is unlinked and server.bgrewritechildpid is set back to -1. */
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
    if (!bysignal && exitcode == 0) {
        int fd;
        char tmpfile[256];
        size_t difflen = sdslen(server.bgrewritebuf);

        redisLog(REDIS_NOTICE,
            "Background append only file rewriting terminated with success");
        /* Now it's time to flush the differences accumulated by the parent */
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) server.bgrewritechildpid);
        fd = open(tmpfile,O_WRONLY|O_APPEND);
        if (fd == -1) {
            redisLog(REDIS_WARNING, "Not able to open the temp append only file produced by the child: %s", strerror(errno));
            goto cleanup;
        }
        /* Flush our data... Compare as ssize_t: a plain (signed) cast
         * would narrow the length to int. */
        if (write(fd,server.bgrewritebuf,difflen) != (ssize_t)difflen) {
            redisLog(REDIS_WARNING, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno));
            close(fd);
            goto cleanup;
        }
        /* %lu needs an unsigned long argument, so cast the size_t as the
         * other sdslen() format calls in this file do. */
        redisLog(REDIS_NOTICE,"Parent diff flushed into the new append log file with success (%lu bytes)",(unsigned long)difflen);
        /* Now our work is to rename the temp file into the stable file. And
         * switch the file descriptor used by the server for append only. */
        if (rename(tmpfile,server.appendfilename) == -1) {
            redisLog(REDIS_WARNING,"Can't rename the temp append only file into the stable one: %s", strerror(errno));
            close(fd);
            goto cleanup;
        }
        /* Mission completed... almost */
        redisLog(REDIS_NOTICE,"Append only file successfully rewritten.");
        if (server.appendfd != -1) {
            /* If append only is actually enabled... */
            close(server.appendfd);
            server.appendfd = fd;
            if (server.appendfsync != APPENDFSYNC_NO) aof_fsync(fd);
            server.appendseldb = -1; /* Make sure it will issue SELECT */
            redisLog(REDIS_NOTICE,"The new append only file was selected for future appends.");
        } else {
            /* If append only is disabled we just generate a dump in this
             * format. Why not? */
            close(fd);
        }
    } else if (!bysignal && exitcode != 0) {
        redisLog(REDIS_WARNING, "Background append only file rewriting error");
    } else {
        redisLog(REDIS_WARNING,
            "Background append only file rewriting terminated by signal %d",
            bysignal);
    }
cleanup:
    /* Common exit path. After a successful rename the temp name no longer
     * exists, so the unlink done by aofRemoveTempFile() has nothing to
     * remove. */
    sdsfree(server.bgrewritebuf);
    server.bgrewritebuf = sdsempty();
    aofRemoveTempFile(server.bgrewritechildpid);
    server.bgrewritechildpid = -1;
}