From 47ca4b6e28af49d1904f40fceacf58bb2907fbf2 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 22 May 2012 13:03:41 +0200 Subject: [PATCH] Allow an AOF rewrite buffer > 2GB (Fix for issue #504). During the AOF rewrite process, the parent process needs to accumulate the new writes in an in-memory buffer: when the child will terminate the AOF rewriting process this buffer (that ist the difference between the dataset when the rewrite was started, and the current dataset) is flushed to the new AOF file. We used to implement this buffer using an sds.c string, but sds.c has a 2GB limit. Sometimes the dataset can be big enough, the amount of writes so high, and the rewrite process slow enough that we overflow the 2GB limit, causing a crash, documented on github by issue #504. In order to prevent this from happening, this commit introduces a new system to accumulate writes, implemented by a linked list of blocks of 10 MB each, so that we also avoid paying the reallocation cost. Note that theoretically modern operating systems may implement realloc() simply as a remaping of the old pages, thus with very good performances, see for instance the mremap() syscall on Linux. However this is not always true, and jemalloc by default avoids doing this because there are issues with the current implementation of mremap(). For this reason we are using a linked list of blocks instead of a single block that gets reallocated again and again. The changes in this commit lacks testing, that will be performed before merging into the unstable branch. This fix will not enter 2.4 because it is too invasive. However 2.4 will log a warning when the AOF rewrite buffer is near to the 2GB limit. --- src/aof.c | 132 +++++++++++++++++++++++++++++++++++++++++++++------- src/redis.c | 4 +- src/redis.h | 4 +- 3 files changed, 121 insertions(+), 19 deletions(-) diff --git a/src/aof.c b/src/aof.c index 59b5ab89f..607bdce82 100644 --- a/src/aof.c +++ b/src/aof.c @@ -12,6 +12,115 @@ void aofUpdateCurrentSize(void); +/* ---------------------------------------------------------------------------- + * AOF rewrite buffer implementation. + * + * The following code implement a simple buffer used in order to accumulate + * changes while the background process is rewriting the AOF file. + * + * We only need to append, but can't just use realloc with a large block + * because 'huge' reallocs are not always handled as one could expect + * (via remapping of pages at OS level) but may involve copying data. + * + * For this reason we use a list of blocks, every block is + * AOF_RW_BUF_BLOCK_SIZE bytes. + * ------------------------------------------------------------------------- */ + +#define AOF_RW_BUF_BLOCK_SIZE (1024*1024*10) /* 10 MB per block */ + +typedef struct aofrwblock { + unsigned long used, free; + char buf[AOF_RW_BUF_BLOCK_SIZE]; +} aofrwblock; + +/* This function free the old AOF rewrite buffer if needed, and initialize + * a fresh new one. It tests for server.aof_rewrite_buf_blocks equal to NULL + * so can be used for the first initialization as well. */ +void aofRewriteBufferReset(void) { + if (server.aof_rewrite_buf_blocks) + listRelease(server.aof_rewrite_buf_blocks); + + server.aof_rewrite_buf_blocks = listCreate(); + listSetFreeMethod(server.aof_rewrite_buf_blocks,zfree); +} + +/* Return the current size of the AOF rerwite buffer. */ +unsigned long aofRewriteBufferSize(void) { + listNode *ln = listLast(server.aof_rewrite_buf_blocks); + aofrwblock *block = ln ? ln->value : NULL; + + if (block == NULL) return 0; + unsigned long size = + (listLength(server.aof_rewrite_buf_blocks)-1) * AOF_RW_BUF_BLOCK_SIZE; + size += block->used; + return size; +} + +/* Append data to the AOF rewrite buffer, allocating new blocks if needed. */ +void aofRewriteBufferAppend(unsigned char *s, unsigned long len) { + listNode *ln = listLast(server.aof_rewrite_buf_blocks); + aofrwblock *block = ln ? ln->value : NULL; + + while(len) { + /* If we already got at least an allocated block, try appending + * at least some piece into it. */ + if (block) { + unsigned long thislen = (block->free < len) ? block->free : len; + if (thislen) { /* The current block is not already full. */ + memcpy(block->buf+block->used, s, thislen); + block->used += thislen; + block->free -= thislen; + s += thislen; + len -= thislen; + } + } + + if (len) { /* First block to allocate, or need another block. */ + int numblocks; + + block = zmalloc(sizeof(*block)); + block->free = AOF_RW_BUF_BLOCK_SIZE; + block->used = 0; + listAddNodeTail(server.aof_rewrite_buf_blocks,block); + + /* Log every time we cross more 10 or 100 blocks, respectively + * as a notice or warning. */ + numblocks = listLength(server.aof_rewrite_buf_blocks); + if (((numblocks+1) % 10) == 0) { + int level = ((numblocks+1) % 100) == 0 ? REDIS_WARNING : + REDIS_NOTICE; + redisLog(level,"Background AOF buffer size: %lu MB", + aofRewriteBufferSize()/(1024*1024)); + } + } + } +} + +/* Write the buffer (possibly composed of multiple blocks) into the specified + * fd. If no short write or any other error happens -1 is returned, + * otherwise the number of bytes written is returned. */ +ssize_t aofRewriteBufferWrite(int fd) { + listNode *ln; + listIter li; + ssize_t count = 0; + + listRewind(server.aof_rewrite_buf_blocks,&li); + while((ln = listNext(&li))) { + aofrwblock *block = listNodeValue(ln); + ssize_t nwritten; + + if (block->used) { + nwritten = write(fd,block->buf,block->used); + if (nwritten != block->used) { + if (nwritten == 0) errno = EIO; + return -1; + } + count += nwritten; + } + } + return count; +} + /* ---------------------------------------------------------------------------- * AOF file implementation * ------------------------------------------------------------------------- */ @@ -42,8 +151,7 @@ void stopAppendOnly(void) { if (kill(server.aof_child_pid,SIGKILL) != -1) wait3(&statloc,0,NULL); /* reset the buffer accumulating changes while the child saves */ - sdsfree(server.aof_rewrite_buf); - server.aof_rewrite_buf = sdsempty(); + aofRewriteBufferReset(); aofRemoveTempFile(server.aof_child_pid); server.aof_child_pid = -1; } @@ -281,7 +389,7 @@ void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int a * in a buffer, so that when the child process will do its work we * can append the differences to the new append only file. */ if (server.aof_child_pid != -1) - server.aof_rewrite_buf = sdscatlen(server.aof_rewrite_buf,buf,sdslen(buf)); + aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf)); sdsfree(buf); } @@ -885,7 +993,6 @@ void aofUpdateCurrentSize(void) { void backgroundRewriteDoneHandler(int exitcode, int bysignal) { if (!bysignal && exitcode == 0) { int newfd, oldfd; - int nwritten; char tmpfile[256]; long long now = ustime(); @@ -903,21 +1010,15 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { goto cleanup; } - nwritten = write(newfd,server.aof_rewrite_buf,sdslen(server.aof_rewrite_buf)); - if (nwritten != (signed)sdslen(server.aof_rewrite_buf)) { - if (nwritten == -1) { - redisLog(REDIS_WARNING, - "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno)); - } else { - redisLog(REDIS_WARNING, - "Short write trying to flush the parent diff to the rewritten AOF: %s", strerror(errno)); - } + if (aofRewriteBufferWrite(newfd) == -1) { + redisLog(REDIS_WARNING, + "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno)); close(newfd); goto cleanup; } redisLog(REDIS_NOTICE, - "Parent diff successfully flushed to the rewritten AOF (%lu bytes)", nwritten); + "Parent diff successfully flushed to the rewritten AOF (%lu bytes)", aofRewriteBufferSize()); /* The only remaining thing to do is to rename the temporary file to * the configured file and switch the file descriptor used to do AOF @@ -1009,8 +1110,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { } cleanup: - sdsfree(server.aof_rewrite_buf); - server.aof_rewrite_buf = sdsempty(); + aofRewriteBufferReset(); aofRemoveTempFile(server.aof_child_pid); server.aof_child_pid = -1; /* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */ diff --git a/src/redis.c b/src/redis.c index 72a2894c4..17da90fff 100644 --- a/src/redis.c +++ b/src/redis.c @@ -1299,7 +1299,7 @@ void initServer() { server.cronloops = 0; server.rdb_child_pid = -1; server.aof_child_pid = -1; - server.aof_rewrite_buf = sdsempty(); + aofRewriteBufferReset(); server.aof_buf = sdsempty(); server.lastsave = time(NULL); server.dirty = 0; @@ -2187,7 +2187,7 @@ int freeMemoryIfNeeded(void) { } if (server.aof_state != REDIS_AOF_OFF) { mem_used -= sdslen(server.aof_buf); - mem_used -= sdslen(server.aof_rewrite_buf); + mem_used -= aofRewriteBufferSize(); } /* Check if we are over the memory limit. */ diff --git a/src/redis.h b/src/redis.h index e279aab48..49b689f8f 100644 --- a/src/redis.h +++ b/src/redis.h @@ -634,7 +634,7 @@ struct redisServer { off_t aof_current_size; /* AOF current size. */ int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. */ pid_t aof_child_pid; /* PID if rewriting process */ - sds aof_rewrite_buf; /* buffer taken by parent during oppend only rewrite */ + list *aof_rewrite_buf_blocks; /* Hold changes during an AOF rewrite. */ sds aof_buf; /* AOF buffer, written before entering the event loop */ int aof_fd; /* File descriptor of currently selected AOF file */ int aof_selected_db; /* Currently selected DB in AOF */ @@ -977,6 +977,8 @@ int loadAppendOnlyFile(char *filename); void stopAppendOnly(void); int startAppendOnly(void); void backgroundRewriteDoneHandler(int exitcode, int bysignal); +void aofRewriteBufferReset(void); +unsigned long aofRewriteBufferSize(void); /* Sorted sets data type */