mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 08:08:53 -05:00
Reclaim page cache of RDB file (#11248)
# Background The RDB file is usually generated and used once and seldom used again, but the content would reside in page cache until OS evicts it. A potential problem is that once the free memory exhausts, the OS have to reclaim some memory from page cache or swap anonymous page out, which may result in a jitters to the Redis service. Supposing an exact scenario, a high-capacity machine hosts many redis instances, and we're upgrading the Redis together. The page cache in host machine increases as RDBs are generated. Once the free memory drop into low watermark(which is more likely to happen in older Linux kernel like 3.10, before [watermark_scale_factor](https://lore.kernel.org/lkml/1455813719-2395-1-git-send-email-hannes@cmpxchg.org/) is introduced, the `low watermark` is linear to `min watermark`, and there'is not too much buffer space for `kswapd` to be wake up to reclaim memory), a `direct reclaim` happens, which means the process would stall to wait for memory allocation. # What the PR does The PR introduces a capability to reclaim the cache when the RDB is operated. Generally there're two cases, read and write the RDB. For read it's a little messy to address the incremental reclaim, so the reclaim is done in one go in background after the load is finished to avoid blocking the work thread. For write, incremental reclaim amortizes the work of reclaim so no need to put it into background, and the peak watermark of cache can be reduced in this way. Two cases are addresses specially, replication and restart, for both of which the cache is leveraged to speed up the processing, so the reclaim is postponed to a right time. To do this, a flag is added to`rdbSave` and `rdbLoad` to control whether the cache need to be kept, with the default value false. # Something deserve noting 1. Though `posix_fadvise` is the POSIX standard, but only few platform support it, e.g. Linux, FreeBSD 10.0. 2. In Linux `posix_fadvise` only take effect on writeback-ed pages, so a `sync`(or `fsync`, `fdatasync`) is needed to flush the dirty page before `posix_fadvise` if we reclaim write cache. # About test A unit test is added to verify the effect of `posix_fadvise`. In integration test overall cache increase is checked, as well as the cache backed by RDB as a specific TCL test is executed in isolated Github action job.
This commit is contained in:
parent
5c3938d5cc
commit
7dae142a2e
72
.github/workflows/daily.yml
vendored
72
.github/workflows/daily.yml
vendored
@ -11,7 +11,7 @@ on:
|
||||
inputs:
|
||||
skipjobs:
|
||||
description: 'jobs to skip (delete the ones you wanna keep, do not leave empty)'
|
||||
default: 'valgrind,sanitizer,tls,freebsd,macos,alpine,32bit,iothreads,ubuntu,centos,malloc'
|
||||
default: 'valgrind,sanitizer,tls,freebsd,macos,alpine,32bit,iothreads,ubuntu,centos,malloc,specific'
|
||||
skiptests:
|
||||
description: 'tests to skip (delete the ones you wanna keep, do not leave empty)'
|
||||
default: 'redis,modules,sentinel,cluster,unittest'
|
||||
@ -282,6 +282,76 @@ jobs:
|
||||
if: true && !contains(github.event.inputs.skiptests, 'cluster')
|
||||
run: ./runtest-cluster --config io-threads 4 --config io-threads-do-reads yes ${{github.event.inputs.cluster_test_args}}
|
||||
|
||||
test-ubuntu-reclaim-cache:
|
||||
runs-on: ubuntu-latest
|
||||
if: |
|
||||
(github.event_name == 'workflow_dispatch' || (github.event_name != 'workflow_dispatch' && github.repository == 'redis/redis')) &&
|
||||
!contains(github.event.inputs.skipjobs, 'specific')
|
||||
timeout-minutes: 14400
|
||||
steps:
|
||||
- name: prep
|
||||
if: github.event_name == 'workflow_dispatch'
|
||||
run: |
|
||||
echo "GITHUB_REPOSITORY=${{github.event.inputs.use_repo}}" >> $GITHUB_ENV
|
||||
echo "GITHUB_HEAD_REF=${{github.event.inputs.use_git_ref}}" >> $GITHUB_ENV
|
||||
- uses: actions/checkout@v3
|
||||
with:
|
||||
repository: ${{ env.GITHUB_REPOSITORY }}
|
||||
ref: ${{ env.GITHUB_HEAD_REF }}
|
||||
- name: make
|
||||
run: |
|
||||
make REDIS_CFLAGS='-Werror'
|
||||
- name: testprep
|
||||
run: |
|
||||
sudo apt-get install vmtouch
|
||||
mkdir /tmp/master
|
||||
mkdir /tmp/slave
|
||||
- name: warm up
|
||||
run: |
|
||||
./src/redis-server --daemonize yes --logfile /dev/null
|
||||
./src/redis-benchmark -n 1 > /dev/null
|
||||
./src/redis-cli save | grep OK > /dev/null
|
||||
vmtouch -v ./dump.rdb > /dev/null
|
||||
- name: test
|
||||
run: |
|
||||
echo "test SAVE doesn't increase cache"
|
||||
CACHE0=$(grep -w file /sys/fs/cgroup/memory.stat | awk '{print $2}')
|
||||
./src/redis-server --daemonize yes --logfile /dev/null --dir /tmp/master --port 8080 --repl-diskless-sync no --pidfile /tmp/master/redis.pid
|
||||
./src/redis-server --daemonize yes --logfile /dev/null --dir /tmp/slave --port 8081 --repl-diskless-load disabled
|
||||
./src/redis-benchmark -p 8080 -d 102400 -t set -r 100000 -n 10000 > /dev/null
|
||||
./src/redis-cli -p 8080 save > /dev/null
|
||||
VMOUT=$(vmtouch -v /tmp/master/dump.rdb)
|
||||
echo $VMOUT
|
||||
grep -q "0%" <<< $VMOUT
|
||||
CACHE=$(grep -w file /sys/fs/cgroup/memory.stat | awk '{print $2}')
|
||||
if [ "$(( $CACHE-$CACHE0 ))" -gt "500000" ]; then echo "$CACHE0 $CACHE"; exit 1; fi
|
||||
|
||||
echo "test replication doesn't increase cache"
|
||||
./src/redis-cli -p 8081 REPLICAOF 127.0.0.1 8080 > /dev/null
|
||||
while [ $(./src/redis-cli -p 8081 info replication | grep "master_link_status:down") ]; do sleep 1; done;
|
||||
sleep 1 # wait for the completion of cache reclaim bio
|
||||
VMOUT=$(vmtouch -v /tmp/master/dump.rdb)
|
||||
echo $VMOUT
|
||||
grep -q "0%" <<< $VMOUT
|
||||
VMOUT=$(vmtouch -v /tmp/slave/dump.rdb)
|
||||
echo $VMOUT
|
||||
grep -q "0%" <<< $VMOUT
|
||||
CACHE=$(grep -w file /sys/fs/cgroup/memory.stat | awk '{print $2}')
|
||||
if [ "$(( $CACHE-$CACHE0 ))" -gt "500000" ]; then echo "$CACHE0 $CACHE"; exit 1; fi
|
||||
|
||||
echo "test reboot doesn't increase cache"
|
||||
PID=$(cat /tmp/master/redis.pid)
|
||||
kill -15 $PID
|
||||
while [ -x /proc/${PID} ]; do sleep 1; done
|
||||
./src/redis-server --daemonize yes --logfile /dev/null --dir /tmp/master --port 8080
|
||||
while [ $(./src/redis-cli -p 8080 info persistence | grep "loading:1") ]; do sleep 1; done;
|
||||
sleep 1 # wait for the completion of cache reclaim bio
|
||||
VMOUT=$(vmtouch -v /tmp/master/dump.rdb)
|
||||
echo $VMOUT
|
||||
grep -q "0%" <<< $VMOUT
|
||||
CACHE=$(grep -w file /sys/fs/cgroup/memory.stat | awk '{print $2}')
|
||||
if [ "$(( $CACHE-$CACHE0 ))" -gt "500000" ]; then echo "$CACHE0 $CACHE"; exit 1; fi
|
||||
|
||||
test-valgrind-test:
|
||||
runs-on: ubuntu-latest
|
||||
if: |
|
||||
|
10
src/aof.c
10
src/aof.c
@ -925,7 +925,7 @@ void aof_background_fsync(int fd) {
|
||||
|
||||
/* Close the fd on the basis of aof_background_fsync. */
|
||||
void aof_background_fsync_and_close(int fd) {
|
||||
bioCreateCloseJob(fd, 1);
|
||||
bioCreateCloseJob(fd, 1, 1);
|
||||
}
|
||||
|
||||
/* Kills an AOFRW child process if exists */
|
||||
@ -2342,8 +2342,10 @@ int rewriteAppendOnlyFile(char *filename) {
|
||||
|
||||
rioInitWithFile(&aof,fp);
|
||||
|
||||
if (server.aof_rewrite_incremental_fsync)
|
||||
if (server.aof_rewrite_incremental_fsync) {
|
||||
rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES);
|
||||
rioSetReclaimCache(&aof,1);
|
||||
}
|
||||
|
||||
startSaving(RDBFLAGS_AOF_PREAMBLE);
|
||||
|
||||
@ -2360,6 +2362,10 @@ int rewriteAppendOnlyFile(char *filename) {
|
||||
/* Make sure data will not remain on the OS's output buffers */
|
||||
if (fflush(fp)) goto werr;
|
||||
if (fsync(fileno(fp))) goto werr;
|
||||
if (reclaimFilePageCache(fileno(fp), 0, 0) == -1) {
|
||||
/* A minor error. Just log to know what happens */
|
||||
serverLog(LL_NOTICE,"Unable to reclaim page cache: %s", strerror(errno));
|
||||
}
|
||||
if (fclose(fp)) { fp = NULL; goto werr; }
|
||||
fp = NULL;
|
||||
|
||||
|
10
src/bio.c
10
src/bio.c
@ -74,6 +74,8 @@ typedef union bio_job {
|
||||
int fd; /* Fd for file based background jobs */
|
||||
unsigned need_fsync:1; /* A flag to indicate that a fsync is required before
|
||||
* the file is closed. */
|
||||
unsigned need_reclaim_cache:1; /* A flag to indicate that reclaim cache is required before
|
||||
* the file is closed. */
|
||||
} fd_args;
|
||||
|
||||
struct {
|
||||
@ -144,10 +146,11 @@ void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...) {
|
||||
bioSubmitJob(BIO_LAZY_FREE, job);
|
||||
}
|
||||
|
||||
void bioCreateCloseJob(int fd, int need_fsync) {
|
||||
void bioCreateCloseJob(int fd, int need_fsync, int need_reclaim_cache) {
|
||||
bio_job *job = zmalloc(sizeof(*job));
|
||||
job->fd_args.fd = fd;
|
||||
job->fd_args.need_fsync = need_fsync;
|
||||
job->fd_args.need_reclaim_cache = need_reclaim_cache;
|
||||
|
||||
bioSubmitJob(BIO_CLOSE_FILE, job);
|
||||
}
|
||||
@ -216,6 +219,11 @@ void *bioProcessBackgroundJobs(void *arg) {
|
||||
if (job->fd_args.need_fsync) {
|
||||
redis_fsync(job->fd_args.fd);
|
||||
}
|
||||
if (job->fd_args.need_reclaim_cache) {
|
||||
if (reclaimFilePageCache(job->fd_args.fd, 0, 0) == -1) {
|
||||
serverLog(LL_NOTICE,"Unable to reclaim page cache: %s", strerror(errno));
|
||||
}
|
||||
}
|
||||
close(job->fd_args.fd);
|
||||
} else if (type == BIO_AOF_FSYNC) {
|
||||
/* The fd may be closed by main thread and reused for another
|
||||
|
@ -36,7 +36,7 @@ typedef void lazy_free_fn(void *args[]);
|
||||
void bioInit(void);
|
||||
unsigned long bioPendingJobsOfType(int type);
|
||||
void bioKillThreads(void);
|
||||
void bioCreateCloseJob(int fd, int need_fsync);
|
||||
void bioCreateCloseJob(int fd, int need_fsync, int need_reclaim_cache);
|
||||
void bioCreateFsyncJob(int fd);
|
||||
void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...);
|
||||
|
||||
|
@ -319,4 +319,9 @@ char *strcpy(char *restrict dest, const char *src) __attribute__((deprecated("pl
|
||||
char *strcat(char *restrict dest, const char *restrict src) __attribute__((deprecated("please avoid use of unsafe C functions. prefer use of redis_strlcat instead")));
|
||||
#endif
|
||||
|
||||
/* Test for posix_fadvise() */
|
||||
#if defined(__linux__) || __FreeBSD__ >= 10
|
||||
#define HAVE_FADVISE
|
||||
#endif
|
||||
|
||||
#endif
|
||||
|
2
src/db.c
2
src/db.c
@ -640,7 +640,7 @@ void flushAllDataAndResetRDB(int flags) {
|
||||
if (server.saveparamslen > 0) {
|
||||
rdbSaveInfo rsi, *rsiptr;
|
||||
rsiptr = rdbPopulateSaveInfo(&rsi);
|
||||
rdbSave(SLAVE_REQ_NONE,server.rdb_filename,rsiptr);
|
||||
rdbSave(SLAVE_REQ_NONE,server.rdb_filename,rsiptr,RDBFLAGS_NONE);
|
||||
}
|
||||
|
||||
#if defined(USE_JEMALLOC)
|
||||
|
@ -557,7 +557,7 @@ NULL
|
||||
if (save) {
|
||||
rdbSaveInfo rsi, *rsiptr;
|
||||
rsiptr = rdbPopulateSaveInfo(&rsi);
|
||||
if (rdbSave(SLAVE_REQ_NONE,server.rdb_filename,rsiptr) != C_OK) {
|
||||
if (rdbSave(SLAVE_REQ_NONE,server.rdb_filename,rsiptr,RDBFLAGS_NONE) != C_OK) {
|
||||
addReplyErrorObject(c,shared.err);
|
||||
return;
|
||||
}
|
||||
|
27
src/rdb.c
27
src/rdb.c
@ -35,6 +35,7 @@
|
||||
#include "stream.h"
|
||||
#include "functions.h"
|
||||
#include "intset.h" /* Compact integer set structure */
|
||||
#include "bio.h"
|
||||
|
||||
#include <math.h>
|
||||
#include <fcntl.h>
|
||||
@ -1437,7 +1438,7 @@ werr: /* Write error. */
|
||||
}
|
||||
|
||||
/* Save the DB on disk. Return C_ERR on error, C_OK on success. */
|
||||
int rdbSave(int req, char *filename, rdbSaveInfo *rsi) {
|
||||
int rdbSave(int req, char *filename, rdbSaveInfo *rsi, int rdbflags) {
|
||||
char tmpfile[256];
|
||||
char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
|
||||
FILE *fp = NULL;
|
||||
@ -1462,10 +1463,12 @@ int rdbSave(int req, char *filename, rdbSaveInfo *rsi) {
|
||||
rioInitWithFile(&rdb,fp);
|
||||
startSaving(RDBFLAGS_NONE);
|
||||
|
||||
if (server.rdb_save_incremental_fsync)
|
||||
if (server.rdb_save_incremental_fsync) {
|
||||
rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);
|
||||
if (!(rdbflags & RDBFLAGS_KEEP_CACHE)) rioSetReclaimCache(&rdb,1);
|
||||
}
|
||||
|
||||
if (rdbSaveRio(req,&rdb,&error,RDBFLAGS_NONE,rsi) == C_ERR) {
|
||||
if (rdbSaveRio(req,&rdb,&error,rdbflags,rsi) == C_ERR) {
|
||||
errno = error;
|
||||
err_op = "rdbSaveRio";
|
||||
goto werr;
|
||||
@ -1474,6 +1477,9 @@ int rdbSave(int req, char *filename, rdbSaveInfo *rsi) {
|
||||
/* Make sure data will not remain on the OS's output buffers */
|
||||
if (fflush(fp)) { err_op = "fflush"; goto werr; }
|
||||
if (fsync(fileno(fp))) { err_op = "fsync"; goto werr; }
|
||||
if (!(rdbflags & RDBFLAGS_KEEP_CACHE) && reclaimFilePageCache(fileno(fp), 0, 0) == -1) {
|
||||
serverLog(LL_NOTICE,"Unable to reclaim cache after saving RDB: %s", strerror(errno));
|
||||
}
|
||||
if (fclose(fp)) { fp = NULL; err_op = "fclose"; goto werr; }
|
||||
fp = NULL;
|
||||
|
||||
@ -1510,7 +1516,7 @@ werr:
|
||||
return C_ERR;
|
||||
}
|
||||
|
||||
int rdbSaveBackground(int req, char *filename, rdbSaveInfo *rsi) {
|
||||
int rdbSaveBackground(int req, char *filename, rdbSaveInfo *rsi, int rdbflags) {
|
||||
pid_t childpid;
|
||||
|
||||
if (hasActiveChildProcess()) return C_ERR;
|
||||
@ -1525,7 +1531,7 @@ int rdbSaveBackground(int req, char *filename, rdbSaveInfo *rsi) {
|
||||
/* Child */
|
||||
redisSetProcTitle("redis-rdb-bgsave");
|
||||
redisSetCpuAffinity(server.bgsave_cpulist);
|
||||
retval = rdbSave(req, filename,rsi);
|
||||
retval = rdbSave(req, filename,rsi,rdbflags);
|
||||
if (retval == C_OK) {
|
||||
sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB");
|
||||
}
|
||||
@ -3332,6 +3338,7 @@ int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) {
|
||||
rio rdb;
|
||||
int retval;
|
||||
struct stat sb;
|
||||
int rdb_fd;
|
||||
|
||||
fp = fopen(filename, "r");
|
||||
if (fp == NULL) {
|
||||
@ -3351,6 +3358,12 @@ int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) {
|
||||
|
||||
fclose(fp);
|
||||
stopLoading(retval==C_OK);
|
||||
/* Reclaim the cache backed by rdb */
|
||||
if (retval == C_OK && !(rdbflags & RDBFLAGS_KEEP_CACHE)) {
|
||||
/* TODO: maybe we could combine the fopen and open into one in the future */
|
||||
rdb_fd = open(server.rdb_filename, O_RDONLY);
|
||||
if (rdb_fd > 0) bioCreateCloseJob(rdb_fd, 0, 1);
|
||||
}
|
||||
return (retval==C_OK) ? RDB_OK : RDB_FAILED;
|
||||
}
|
||||
|
||||
@ -3575,7 +3588,7 @@ void saveCommand(client *c) {
|
||||
|
||||
rdbSaveInfo rsi, *rsiptr;
|
||||
rsiptr = rdbPopulateSaveInfo(&rsi);
|
||||
if (rdbSave(SLAVE_REQ_NONE,server.rdb_filename,rsiptr) == C_OK) {
|
||||
if (rdbSave(SLAVE_REQ_NONE,server.rdb_filename,rsiptr,RDBFLAGS_NONE) == C_OK) {
|
||||
addReply(c,shared.ok);
|
||||
} else {
|
||||
addReplyErrorObject(c,shared.err);
|
||||
@ -3612,7 +3625,7 @@ void bgsaveCommand(client *c) {
|
||||
"Use BGSAVE SCHEDULE in order to schedule a BGSAVE whenever "
|
||||
"possible.");
|
||||
}
|
||||
} else if (rdbSaveBackground(SLAVE_REQ_NONE,server.rdb_filename,rsiptr) == C_OK) {
|
||||
} else if (rdbSaveBackground(SLAVE_REQ_NONE,server.rdb_filename,rsiptr,RDBFLAGS_NONE) == C_OK) {
|
||||
addReplyStatus(c,"Background saving started");
|
||||
} else {
|
||||
addReplyErrorObject(c,shared.err);
|
||||
|
@ -135,6 +135,7 @@
|
||||
#define RDBFLAGS_REPLICATION (1<<1) /* Load/save for SYNC. */
|
||||
#define RDBFLAGS_ALLOW_DUP (1<<2) /* Allow duplicated keys when loading.*/
|
||||
#define RDBFLAGS_FEED_REPL (1<<3) /* Feed replication stream when loading.*/
|
||||
#define RDBFLAGS_KEEP_CACHE (1<<4) /* Don't reclaim cache after rdb file is generated */
|
||||
|
||||
/* When rdbLoadObject() returns NULL, the err flag is
|
||||
* set to hold the type of error that occurred */
|
||||
@ -153,10 +154,10 @@ int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr);
|
||||
int rdbSaveObjectType(rio *rdb, robj *o);
|
||||
int rdbLoadObjectType(rio *rdb);
|
||||
int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags);
|
||||
int rdbSaveBackground(int req, char *filename, rdbSaveInfo *rsi);
|
||||
int rdbSaveBackground(int req, char *filename, rdbSaveInfo *rsi, int rdbflags);
|
||||
int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi);
|
||||
void rdbRemoveTempFile(pid_t childpid, int from_signal);
|
||||
int rdbSave(int req, char *filename, rdbSaveInfo *rsi);
|
||||
int rdbSave(int req, char *filename, rdbSaveInfo *rsi, int rdbflags);
|
||||
ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid);
|
||||
size_t rdbSavedObjectLen(robj *o, robj *key, int dbid);
|
||||
robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error);
|
||||
|
@ -111,7 +111,7 @@ int bg_unlink(const char *filename) {
|
||||
errno = old_errno;
|
||||
return -1;
|
||||
}
|
||||
bioCreateCloseJob(fd, 0);
|
||||
bioCreateCloseJob(fd, 0, 0);
|
||||
return 0; /* Success. */
|
||||
}
|
||||
}
|
||||
@ -858,8 +858,10 @@ int startBgsaveForReplication(int mincapa, int req) {
|
||||
if (rsiptr) {
|
||||
if (socket_target)
|
||||
retval = rdbSaveToSlavesSockets(req,rsiptr);
|
||||
else
|
||||
retval = rdbSaveBackground(req,server.rdb_filename,rsiptr);
|
||||
else {
|
||||
/* Keep the page cache since it'll get used soon */
|
||||
retval = rdbSaveBackground(req,server.rdb_filename,rsiptr,RDBFLAGS_KEEP_CACHE);
|
||||
}
|
||||
} else {
|
||||
serverLog(LL_WARNING,"BGSAVE for replication: replication information not available, can't generate the RDB file right now. Try later.");
|
||||
retval = C_ERR;
|
||||
@ -1350,6 +1352,29 @@ void removeRDBUsedToSyncReplicas(void) {
|
||||
}
|
||||
}
|
||||
|
||||
/* Close the repldbfd and reclaim the page cache if the client hold
|
||||
* the last reference to replication DB */
|
||||
void closeRepldbfd(client *myself) {
|
||||
listNode *ln;
|
||||
listIter li;
|
||||
int reclaim = 1;
|
||||
listRewind(server.slaves,&li);
|
||||
while((ln = listNext(&li))) {
|
||||
client *slave = ln->value;
|
||||
if (slave != myself && slave->replstate == SLAVE_STATE_SEND_BULK) {
|
||||
reclaim = 0;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (reclaim) {
|
||||
bioCreateCloseJob(myself->repldbfd, 0, 1);
|
||||
} else {
|
||||
close(myself->repldbfd);
|
||||
}
|
||||
myself->repldbfd = -1;
|
||||
}
|
||||
|
||||
void sendBulkToSlave(connection *conn) {
|
||||
client *slave = connGetPrivateData(conn);
|
||||
char buf[PROTO_IOBUF_LEN];
|
||||
@ -1398,8 +1423,7 @@ void sendBulkToSlave(connection *conn) {
|
||||
slave->repldboff += nwritten;
|
||||
atomicIncr(server.stat_net_repl_output_bytes, nwritten);
|
||||
if (slave->repldboff == slave->repldbsize) {
|
||||
close(slave->repldbfd);
|
||||
slave->repldbfd = -1;
|
||||
closeRepldbfd(slave);
|
||||
connSetWriteHandler(slave->conn,NULL);
|
||||
if (!replicaPutOnline(slave)) {
|
||||
freeClient(slave);
|
||||
@ -2164,7 +2188,7 @@ void readSyncBulkPayload(connection *conn) {
|
||||
return;
|
||||
}
|
||||
/* Close old rdb asynchronously. */
|
||||
if (old_rdb_fd != -1) bioCreateCloseJob(old_rdb_fd, 0);
|
||||
if (old_rdb_fd != -1) bioCreateCloseJob(old_rdb_fd, 0, 0);
|
||||
|
||||
/* Sync the directory to ensure rename is persisted */
|
||||
if (fsyncFileDir(server.rdb_filename) == -1) {
|
||||
@ -2201,7 +2225,6 @@ void readSyncBulkPayload(connection *conn) {
|
||||
}
|
||||
|
||||
zfree(server.repl_transfer_tmpfile);
|
||||
close(server.repl_transfer_fd);
|
||||
server.repl_transfer_fd = -1;
|
||||
server.repl_transfer_tmpfile = NULL;
|
||||
}
|
||||
|
20
src/rio.c
20
src/rio.c
@ -151,6 +151,16 @@ static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
|
||||
#else
|
||||
if (redis_fsync(fileno(r->io.file.fp)) == -1) return 0;
|
||||
#endif
|
||||
if (r->io.file.reclaim_cache) {
|
||||
/* In Linux sync_file_range just issue a writeback request to
|
||||
* OS, and when posix_fadvise is called, the dirty page may
|
||||
* still be in flushing, which means it would be ignored by
|
||||
* posix_fadvise.
|
||||
*
|
||||
* So we posix_fadvise the whole file, and the writeback-ed
|
||||
* pages will have other chances to be reclaimed. */
|
||||
reclaimFilePageCache(fileno(r->io.file.fp), 0, 0);
|
||||
}
|
||||
r->io.file.buffered = 0;
|
||||
}
|
||||
}
|
||||
@ -191,6 +201,7 @@ void rioInitWithFile(rio *r, FILE *fp) {
|
||||
r->io.file.fp = fp;
|
||||
r->io.file.buffered = 0;
|
||||
r->io.file.autosync = 0;
|
||||
r->io.file.reclaim_cache = 0;
|
||||
}
|
||||
|
||||
/* ------------------- Connection implementation -------------------
|
||||
@ -439,6 +450,15 @@ void rioSetAutoSync(rio *r, off_t bytes) {
|
||||
r->io.file.autosync = bytes;
|
||||
}
|
||||
|
||||
/* Set the file-based rio object to reclaim cache after every auto-sync.
|
||||
* In the Linux implementation POSIX_FADV_DONTNEED skips the dirty
|
||||
* pages, so if auto sync is unset this option will have no effect.
|
||||
*
|
||||
* This feature can reduce the cache footprint backed by the file. */
|
||||
void rioSetReclaimCache(rio *r, int enabled) {
|
||||
r->io.file.reclaim_cache = enabled;
|
||||
}
|
||||
|
||||
/* Check the type of rio. */
|
||||
uint8_t rioCheckType(rio *r) {
|
||||
if (r->read == rioFileRead) {
|
||||
|
@ -81,6 +81,7 @@ struct _rio {
|
||||
FILE *fp;
|
||||
off_t buffered; /* Bytes written since last fsync. */
|
||||
off_t autosync; /* fsync after 'autosync' bytes written. */
|
||||
unsigned reclaim_cache:1; /* A flag to indicate reclaim cache after fsync */
|
||||
} file;
|
||||
/* Connection object (used to read from socket) */
|
||||
struct {
|
||||
@ -179,5 +180,6 @@ int rioWriteBulkObject(rio *r, struct redisObject *obj);
|
||||
|
||||
void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len);
|
||||
void rioSetAutoSync(rio *r, off_t bytes);
|
||||
void rioSetReclaimCache(rio *r, int enabled);
|
||||
uint8_t rioCheckType(rio *r);
|
||||
#endif
|
||||
|
@ -1387,7 +1387,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
||||
sp->changes, (int)sp->seconds);
|
||||
rdbSaveInfo rsi, *rsiptr;
|
||||
rsiptr = rdbPopulateSaveInfo(&rsi);
|
||||
rdbSaveBackground(SLAVE_REQ_NONE,server.rdb_filename,rsiptr);
|
||||
rdbSaveBackground(SLAVE_REQ_NONE,server.rdb_filename,rsiptr,RDBFLAGS_NONE);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -1482,7 +1482,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
||||
{
|
||||
rdbSaveInfo rsi, *rsiptr;
|
||||
rsiptr = rdbPopulateSaveInfo(&rsi);
|
||||
if (rdbSaveBackground(SLAVE_REQ_NONE,server.rdb_filename,rsiptr) == C_OK)
|
||||
if (rdbSaveBackground(SLAVE_REQ_NONE,server.rdb_filename,rsiptr,RDBFLAGS_NONE) == C_OK)
|
||||
server.rdb_bgsave_scheduled = 0;
|
||||
}
|
||||
|
||||
@ -4329,7 +4329,8 @@ int finishShutdown(void) {
|
||||
/* Snapshotting. Perform a SYNC SAVE and exit */
|
||||
rdbSaveInfo rsi, *rsiptr;
|
||||
rsiptr = rdbPopulateSaveInfo(&rsi);
|
||||
if (rdbSave(SLAVE_REQ_NONE,server.rdb_filename,rsiptr) != C_OK) {
|
||||
/* Keep the page cache since it's likely to restart soon */
|
||||
if (rdbSave(SLAVE_REQ_NONE,server.rdb_filename,rsiptr,RDBFLAGS_KEEP_CACHE) != C_OK) {
|
||||
/* Ooops.. error saving! The best we can do is to continue
|
||||
* operating. Note that if there was a background saving process,
|
||||
* in the next cron() Redis will be notified that the background
|
||||
|
59
src/util.c
59
src/util.c
@ -50,6 +50,8 @@
|
||||
#include "sha256.h"
|
||||
#include "config.h"
|
||||
|
||||
#define UNUSED(x) ((void)(x))
|
||||
|
||||
/* Glob-style pattern matching. */
|
||||
int stringmatchlen(const char *pattern, int patternLen,
|
||||
const char *string, int stringLen, int nocase)
|
||||
@ -1113,8 +1115,23 @@ int fsyncFileDir(const char *filename) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* free OS pages backed by file */
|
||||
int reclaimFilePageCache(int fd, size_t offset, size_t length) {
|
||||
#ifdef HAVE_FADVISE
|
||||
int ret = posix_fadvise(fd, offset, length, POSIX_FADV_DONTNEED);
|
||||
if (ret) return -1;
|
||||
return 0;
|
||||
#else
|
||||
UNUSED(fd);
|
||||
UNUSED(offset);
|
||||
UNUSED(length);
|
||||
return 0;
|
||||
#endif
|
||||
}
|
||||
|
||||
#ifdef REDIS_TEST
|
||||
#include <assert.h>
|
||||
#include <sys/mman.h>
|
||||
|
||||
static void test_string2ll(void) {
|
||||
char buf[32];
|
||||
@ -1333,7 +1350,44 @@ static void test_fixedpoint_d2string(void) {
|
||||
assert(sz == 0);
|
||||
}
|
||||
|
||||
#define UNUSED(x) (void)(x)
|
||||
#if defined(__linux__)
|
||||
/* Since fadvise and mincore is only supported in specific platforms like
|
||||
* Linux, we only verify the fadvise mechanism works in Linux */
|
||||
static int cache_exist(int fd) {
|
||||
unsigned char flag;
|
||||
void *m = mmap(NULL, 4096, PROT_READ, MAP_SHARED, fd, 0);
|
||||
assert(m);
|
||||
assert(mincore(m, 4096, &flag) == 0);
|
||||
munmap(m, 4096);
|
||||
/* the least significant bit of the byte will be set if the corresponding
|
||||
* page is currently resident in memory */
|
||||
return flag&1;
|
||||
}
|
||||
|
||||
static void test_reclaimFilePageCache(void) {
|
||||
char *tmpfile = "/tmp/redis-reclaim-cache-test";
|
||||
int fd = open(tmpfile, O_RDWR|O_CREAT, 0644);
|
||||
assert(fd >= 0);
|
||||
|
||||
/* test write file */
|
||||
char buf[4] = "foo";
|
||||
assert(write(fd, buf, sizeof(buf)) > 0);
|
||||
assert(cache_exist(fd));
|
||||
assert(redis_fsync(fd) == 0);
|
||||
assert(reclaimFilePageCache(fd, 0, 0) == 0);
|
||||
assert(!cache_exist(fd));
|
||||
|
||||
/* test read file */
|
||||
assert(pread(fd, buf, sizeof(buf), 0) > 0);
|
||||
assert(cache_exist(fd));
|
||||
assert(reclaimFilePageCache(fd, 0, 0) == 0);
|
||||
assert(!cache_exist(fd));
|
||||
|
||||
unlink(tmpfile);
|
||||
printf("reclaimFilePageCach test is ok\n");
|
||||
}
|
||||
#endif
|
||||
|
||||
int utilTest(int argc, char **argv, int flags) {
|
||||
UNUSED(argc);
|
||||
UNUSED(argv);
|
||||
@ -1344,6 +1398,9 @@ int utilTest(int argc, char **argv, int flags) {
|
||||
test_ll2string();
|
||||
test_ld2string();
|
||||
test_fixedpoint_d2string();
|
||||
#if defined(__linux__)
|
||||
test_reclaimFilePageCache();
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
@ -87,6 +87,7 @@ int dirRemove(char *dname);
|
||||
int fileExist(char *filename);
|
||||
sds makePath(char *path, char *filename);
|
||||
int fsyncFileDir(const char *filename);
|
||||
int reclaimFilePageCache(int fd, size_t offset, size_t length);
|
||||
|
||||
size_t redis_strlcpy(char *dst, const char *src, size_t dsize);
|
||||
size_t redis_strlcat(char *dst, const char *src, size_t dsize);
|
||||
|
Loading…
Reference in New Issue
Block a user