Allow an AOF rewrite buffer > 2GB (Fix for issue #504).

During the AOF rewrite process, the parent process needs to accumulate
the new writes in an in-memory buffer: when the child will terminate the
AOF rewriting process this buffer (that ist the difference between the
dataset when the rewrite was started, and the current dataset) is
flushed to the new AOF file.

We used to implement this buffer using an sds.c string, but sds.c has a
2GB limit. Sometimes the dataset can be big enough, the amount of writes
so high, and the rewrite process slow enough that we overflow the 2GB
limit, causing a crash, documented on github by issue #504.

In order to prevent this from happening, this commit introduces a new
system to accumulate writes, implemented by a linked list of blocks of
10 MB each, so that we also avoid paying the reallocation cost.

Note that theoretically modern operating systems may implement realloc()
simply as a remaping of the old pages, thus with very good performances,
see for instance the mremap() syscall on Linux. However this is not
always true, and jemalloc by default avoids doing this because there are
issues with the current implementation of mremap().

For this reason we are using a linked list of blocks instead of a single
block that gets reallocated again and again.

The changes in this commit lacks testing, that will be performed before
merging into the unstable branch. This fix will not enter 2.4 because it
is too invasive. However 2.4 will log a warning when the AOF rewrite
buffer is near to the 2GB limit.
This commit is contained in:
antirez 2012-05-22 13:03:41 +02:00
parent ef37997608
commit 47ca4b6e28
3 changed files with 121 additions and 19 deletions

132
src/aof.c
View File

@ -12,6 +12,115 @@
void aofUpdateCurrentSize(void); void aofUpdateCurrentSize(void);
/* ----------------------------------------------------------------------------
* AOF rewrite buffer implementation.
*
* The following code implement a simple buffer used in order to accumulate
* changes while the background process is rewriting the AOF file.
*
* We only need to append, but can't just use realloc with a large block
* because 'huge' reallocs are not always handled as one could expect
* (via remapping of pages at OS level) but may involve copying data.
*
* For this reason we use a list of blocks, every block is
* AOF_RW_BUF_BLOCK_SIZE bytes.
* ------------------------------------------------------------------------- */
#define AOF_RW_BUF_BLOCK_SIZE (1024*1024*10) /* 10 MB per block */
typedef struct aofrwblock {
unsigned long used, free;
char buf[AOF_RW_BUF_BLOCK_SIZE];
} aofrwblock;
/* This function free the old AOF rewrite buffer if needed, and initialize
* a fresh new one. It tests for server.aof_rewrite_buf_blocks equal to NULL
* so can be used for the first initialization as well. */
void aofRewriteBufferReset(void) {
if (server.aof_rewrite_buf_blocks)
listRelease(server.aof_rewrite_buf_blocks);
server.aof_rewrite_buf_blocks = listCreate();
listSetFreeMethod(server.aof_rewrite_buf_blocks,zfree);
}
/* Return the current size of the AOF rerwite buffer. */
unsigned long aofRewriteBufferSize(void) {
listNode *ln = listLast(server.aof_rewrite_buf_blocks);
aofrwblock *block = ln ? ln->value : NULL;
if (block == NULL) return 0;
unsigned long size =
(listLength(server.aof_rewrite_buf_blocks)-1) * AOF_RW_BUF_BLOCK_SIZE;
size += block->used;
return size;
}
/* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
listNode *ln = listLast(server.aof_rewrite_buf_blocks);
aofrwblock *block = ln ? ln->value : NULL;
while(len) {
/* If we already got at least an allocated block, try appending
* at least some piece into it. */
if (block) {
unsigned long thislen = (block->free < len) ? block->free : len;
if (thislen) { /* The current block is not already full. */
memcpy(block->buf+block->used, s, thislen);
block->used += thislen;
block->free -= thislen;
s += thislen;
len -= thislen;
}
}
if (len) { /* First block to allocate, or need another block. */
int numblocks;
block = zmalloc(sizeof(*block));
block->free = AOF_RW_BUF_BLOCK_SIZE;
block->used = 0;
listAddNodeTail(server.aof_rewrite_buf_blocks,block);
/* Log every time we cross more 10 or 100 blocks, respectively
* as a notice or warning. */
numblocks = listLength(server.aof_rewrite_buf_blocks);
if (((numblocks+1) % 10) == 0) {
int level = ((numblocks+1) % 100) == 0 ? REDIS_WARNING :
REDIS_NOTICE;
redisLog(level,"Background AOF buffer size: %lu MB",
aofRewriteBufferSize()/(1024*1024));
}
}
}
}
/* Write the buffer (possibly composed of multiple blocks) into the specified
* fd. If no short write or any other error happens -1 is returned,
* otherwise the number of bytes written is returned. */
ssize_t aofRewriteBufferWrite(int fd) {
listNode *ln;
listIter li;
ssize_t count = 0;
listRewind(server.aof_rewrite_buf_blocks,&li);
while((ln = listNext(&li))) {
aofrwblock *block = listNodeValue(ln);
ssize_t nwritten;
if (block->used) {
nwritten = write(fd,block->buf,block->used);
if (nwritten != block->used) {
if (nwritten == 0) errno = EIO;
return -1;
}
count += nwritten;
}
}
return count;
}
/* ---------------------------------------------------------------------------- /* ----------------------------------------------------------------------------
* AOF file implementation * AOF file implementation
* ------------------------------------------------------------------------- */ * ------------------------------------------------------------------------- */
@ -42,8 +151,7 @@ void stopAppendOnly(void) {
if (kill(server.aof_child_pid,SIGKILL) != -1) if (kill(server.aof_child_pid,SIGKILL) != -1)
wait3(&statloc,0,NULL); wait3(&statloc,0,NULL);
/* reset the buffer accumulating changes while the child saves */ /* reset the buffer accumulating changes while the child saves */
sdsfree(server.aof_rewrite_buf); aofRewriteBufferReset();
server.aof_rewrite_buf = sdsempty();
aofRemoveTempFile(server.aof_child_pid); aofRemoveTempFile(server.aof_child_pid);
server.aof_child_pid = -1; server.aof_child_pid = -1;
} }
@ -281,7 +389,7 @@ void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int a
* in a buffer, so that when the child process will do its work we * in a buffer, so that when the child process will do its work we
* can append the differences to the new append only file. */ * can append the differences to the new append only file. */
if (server.aof_child_pid != -1) if (server.aof_child_pid != -1)
server.aof_rewrite_buf = sdscatlen(server.aof_rewrite_buf,buf,sdslen(buf)); aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
sdsfree(buf); sdsfree(buf);
} }
@ -885,7 +993,6 @@ void aofUpdateCurrentSize(void) {
void backgroundRewriteDoneHandler(int exitcode, int bysignal) { void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
if (!bysignal && exitcode == 0) { if (!bysignal && exitcode == 0) {
int newfd, oldfd; int newfd, oldfd;
int nwritten;
char tmpfile[256]; char tmpfile[256];
long long now = ustime(); long long now = ustime();
@ -903,21 +1010,15 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
goto cleanup; goto cleanup;
} }
nwritten = write(newfd,server.aof_rewrite_buf,sdslen(server.aof_rewrite_buf)); if (aofRewriteBufferWrite(newfd) == -1) {
if (nwritten != (signed)sdslen(server.aof_rewrite_buf)) { redisLog(REDIS_WARNING,
if (nwritten == -1) { "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
redisLog(REDIS_WARNING,
"Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
} else {
redisLog(REDIS_WARNING,
"Short write trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
}
close(newfd); close(newfd);
goto cleanup; goto cleanup;
} }
redisLog(REDIS_NOTICE, redisLog(REDIS_NOTICE,
"Parent diff successfully flushed to the rewritten AOF (%lu bytes)", nwritten); "Parent diff successfully flushed to the rewritten AOF (%lu bytes)", aofRewriteBufferSize());
/* The only remaining thing to do is to rename the temporary file to /* The only remaining thing to do is to rename the temporary file to
* the configured file and switch the file descriptor used to do AOF * the configured file and switch the file descriptor used to do AOF
@ -1009,8 +1110,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
} }
cleanup: cleanup:
sdsfree(server.aof_rewrite_buf); aofRewriteBufferReset();
server.aof_rewrite_buf = sdsempty();
aofRemoveTempFile(server.aof_child_pid); aofRemoveTempFile(server.aof_child_pid);
server.aof_child_pid = -1; server.aof_child_pid = -1;
/* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */ /* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */

View File

@ -1299,7 +1299,7 @@ void initServer() {
server.cronloops = 0; server.cronloops = 0;
server.rdb_child_pid = -1; server.rdb_child_pid = -1;
server.aof_child_pid = -1; server.aof_child_pid = -1;
server.aof_rewrite_buf = sdsempty(); aofRewriteBufferReset();
server.aof_buf = sdsempty(); server.aof_buf = sdsempty();
server.lastsave = time(NULL); server.lastsave = time(NULL);
server.dirty = 0; server.dirty = 0;
@ -2187,7 +2187,7 @@ int freeMemoryIfNeeded(void) {
} }
if (server.aof_state != REDIS_AOF_OFF) { if (server.aof_state != REDIS_AOF_OFF) {
mem_used -= sdslen(server.aof_buf); mem_used -= sdslen(server.aof_buf);
mem_used -= sdslen(server.aof_rewrite_buf); mem_used -= aofRewriteBufferSize();
} }
/* Check if we are over the memory limit. */ /* Check if we are over the memory limit. */

View File

@ -634,7 +634,7 @@ struct redisServer {
off_t aof_current_size; /* AOF current size. */ off_t aof_current_size; /* AOF current size. */
int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. */ int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. */
pid_t aof_child_pid; /* PID if rewriting process */ pid_t aof_child_pid; /* PID if rewriting process */
sds aof_rewrite_buf; /* buffer taken by parent during oppend only rewrite */ list *aof_rewrite_buf_blocks; /* Hold changes during an AOF rewrite. */
sds aof_buf; /* AOF buffer, written before entering the event loop */ sds aof_buf; /* AOF buffer, written before entering the event loop */
int aof_fd; /* File descriptor of currently selected AOF file */ int aof_fd; /* File descriptor of currently selected AOF file */
int aof_selected_db; /* Currently selected DB in AOF */ int aof_selected_db; /* Currently selected DB in AOF */
@ -977,6 +977,8 @@ int loadAppendOnlyFile(char *filename);
void stopAppendOnly(void); void stopAppendOnly(void);
int startAppendOnly(void); int startAppendOnly(void);
void backgroundRewriteDoneHandler(int exitcode, int bysignal); void backgroundRewriteDoneHandler(int exitcode, int bysignal);
void aofRewriteBufferReset(void);
unsigned long aofRewriteBufferSize(void);
/* Sorted sets data type */ /* Sorted sets data type */