Enhance RESTORE with RDBv9 new features

RESTORE now supports:
1. Setting LRU/LFU
2. Absolute-time TTL

Other related changes:
1. RDB loading will not override LRU bits when RDB file
   does not contain the LRU opcode.
2. RDB loading will not set LRU/LFU bits if the server's
   maxmemory-policy does not match.
This commit is contained in:
Guy Benoish 2018-06-20 14:40:18 +07:00
parent c6fdebf533
commit b5197f1fc9
5 changed files with 100 additions and 22 deletions

View File

@ -4835,15 +4835,39 @@ void dumpCommand(client *c) {
/* RESTORE key ttl serialized-value [REPLACE] */ /* RESTORE key ttl serialized-value [REPLACE] */
void restoreCommand(client *c) { void restoreCommand(client *c) {
long long ttl; long long ttl, lfu_freq = -1, lru_idle = -1, lru_clock;
rio payload; rio payload;
int j, type, replace = 0; int j, type, replace = 0, absttl = 0;
robj *obj; robj *obj;
/* Parse additional options */ /* Parse additional options */
for (j = 4; j < c->argc; j++) { for (j = 4; j < c->argc; j++) {
int additional = c->argc-j-1;
if (!strcasecmp(c->argv[j]->ptr,"replace")) { if (!strcasecmp(c->argv[j]->ptr,"replace")) {
replace = 1; replace = 1;
} else if (!strcasecmp(c->argv[j]->ptr,"absttl")) {
absttl = 1;
} else if (!strcasecmp(c->argv[j]->ptr,"idletime") && additional >= 1 &&
lfu_freq == -1)
{
if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lru_idle,NULL)
!= C_OK) return;
if (lru_idle < 0) {
addReplyError(c,"Invalid IDLETIME value, must be >= 0");
return;
}
lru_clock = LRU_CLOCK();
j++; /* Consume additional arg. */
} else if (!strcasecmp(c->argv[j]->ptr,"freq") && additional >= 1 &&
lru_idle == -1)
{
if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lfu_freq,NULL)
!= C_OK) return;
if (lfu_freq < 0 || lfu_freq > 255) {
addReplyError(c,"Invalid FREQ value, must be >= 0 and <= 255");
return;
}
j++; /* Consume additional arg. */
} else { } else {
addReply(c,shared.syntaxerr); addReply(c,shared.syntaxerr);
return; return;
@ -4884,7 +4908,11 @@ void restoreCommand(client *c) {
/* Create the key and set the TTL if any */ /* Create the key and set the TTL if any */
dbAdd(c->db,c->argv[1],obj); dbAdd(c->db,c->argv[1],obj);
if (ttl) setExpire(c,c->db,c->argv[1],mstime()+ttl); if (ttl) {
if (!absttl) ttl+=mstime();
setExpire(c,c->db,c->argv[1],ttl);
}
objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock);
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
addReply(c,shared.ok); addReply(c,shared.ok);
server.dirty++; server.dirty++;

View File

@ -1166,6 +1166,32 @@ sds getMemoryDoctorReport(void) {
return s; return s;
} }
/* Set the object LRU/LFU depending on server.maxmemory_policy.
* The lfu_freq arg is only relevant if policy is MAXMEMORY_FLAG_LFU.
* The lru_idle and lru_clock args are only relevant if policy
* is MAXMEMORY_FLAG_LRU.
* Either or both of them may be <0, in that case, nothing is set. */
void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
long long lru_clock) {
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
if (lfu_freq >= 0) {
serverAssert(lfu_freq <= 255);
val->lru = (LFUGetTimeInMinutes()<<8) | lfu_freq;
}
} else if (lru_idle >= 0) {
/* Serialized LRU idle time is in seconds. Scale
* according to the LRU clock resolution this Redis
* instance was compiled with (normally 1000 ms, so the
* below statement will expand to lru_idle*1000/1000. */
lru_idle = lru_idle*1000/LRU_CLOCK_RESOLUTION;
val->lru = lru_clock - lru_idle;
/* If the lru field overflows (since LRU it is a wrapping
* clock), the best we can do is to provide the maximum
* representable idle time. */
if (val->lru < 0) val->lru = lru_clock+1;
}
}
/* ======================= The OBJECT and MEMORY commands =================== */ /* ======================= The OBJECT and MEMORY commands =================== */
/* This is a helper function for the OBJECT command. We need to lookup keys /* This is a helper function for the OBJECT command. We need to lookup keys

View File

@ -1871,11 +1871,9 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
} }
/* Key-specific attributes, set by opcodes before the key type. */ /* Key-specific attributes, set by opcodes before the key type. */
long long expiretime = -1, now = mstime(); long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime();
long long lru_clock = LRU_CLOCK(); long long lru_clock = LRU_CLOCK();
uint64_t lru_idle = -1;
int lfu_freq = -1;
while(1) { while(1) {
robj *key, *val; robj *key, *val;
@ -1903,7 +1901,9 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
continue; /* Read next opcode. */ continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_IDLE) { } else if (type == RDB_OPCODE_IDLE) {
/* IDLE: LRU idle time. */ /* IDLE: LRU idle time. */
if ((lru_idle = rdbLoadLen(rdb,NULL)) == RDB_LENERR) goto eoferr; uint64_t qword;
if ((qword = rdbLoadLen(rdb,NULL)) == RDB_LENERR) goto eoferr;
lru_idle = qword;
continue; /* Read next opcode. */ continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_EOF) { } else if (type == RDB_OPCODE_EOF) {
/* EOF: End of file, exit the main loop. */ /* EOF: End of file, exit the main loop. */
@ -2022,20 +2022,9 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
/* Set the expire time if needed */ /* Set the expire time if needed */
if (expiretime != -1) setExpire(NULL,db,key,expiretime); if (expiretime != -1) setExpire(NULL,db,key,expiretime);
if (lfu_freq != -1) {
val->lru = (LFUGetTimeInMinutes()<<8) | lfu_freq; /* Set usage information (for eviction). */
} else { objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock);
/* LRU idle time loaded from RDB is in seconds. Scale
* according to the LRU clock resolution this Redis
* instance was compiled with (normaly 1000 ms, so the
* below statement will expand to lru_idle*1000/1000. */
lru_idle = lru_idle*1000/LRU_CLOCK_RESOLUTION;
val->lru = lru_clock - lru_idle;
/* If the lru field overflows (since LRU it is a wrapping
* clock), the best we can do is to provide the maxium
* representable idle time. */
if (val->lru < 0) val->lru = lru_clock+1;
}
/* Decrement the key refcount since dbAdd() will take its /* Decrement the key refcount since dbAdd() will take its
* own reference. */ * own reference. */

View File

@ -1772,6 +1772,8 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply);
robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags); robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags);
robj *objectCommandLookup(client *c, robj *key); robj *objectCommandLookup(client *c, robj *key);
robj *objectCommandLookupOrReply(client *c, robj *key, robj *reply); robj *objectCommandLookupOrReply(client *c, robj *key, robj *reply);
void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
long long lru_clock);
#define LOOKUP_NONE 0 #define LOOKUP_NONE 0
#define LOOKUP_NOTOUCH (1<<0) #define LOOKUP_NOTOUCH (1<<0)
void dbAdd(redisDb *db, robj *key, robj *val); void dbAdd(redisDb *db, robj *key, robj *val);

View File

@ -25,6 +25,39 @@ start_server {tags {"dump"}} {
assert {$ttl >= (2569591501-3000) && $ttl <= 2569591501} assert {$ttl >= (2569591501-3000) && $ttl <= 2569591501}
r get foo r get foo
} {bar} } {bar}
test {RESTORE can set an absolute expire} {
r set foo bar
set encoded [r dump foo]
r del foo
set now [clock milliseconds]
r restore foo [expr $now+3000] $encoded absttl
set ttl [r pttl foo]
assert {$ttl >= 2998 && $ttl <= 3000}
r get foo
} {bar}
test {RESTORE can set LRU} {
r set foo bar
set encoded [r dump foo]
r del foo
r config set maxmemory-policy allkeys-lru
r restore foo 0 $encoded idletime 1000
set idle [r object idletime foo]
assert {$idle >= 1000 && $idle <= 1002}
r get foo
} {bar}
test {RESTORE can set LFU} {
r set foo bar
set encoded [r dump foo]
r del foo
r config set maxmemory-policy allkeys-lfu
r restore foo 0 $encoded freq 100
set freq [r object freq foo]
assert {$freq == 100}
r get foo
} {bar}
test {RESTORE returns an error of the key already exists} { test {RESTORE returns an error of the key already exists} {
r set foo bar r set foo bar