support for write operations against expiring keys, by master-controlled expiring in replication and AOF synthesizing DEL operations

This commit is contained in:
antirez 2010-08-02 18:13:39 +02:00
parent 0c7a9dec65
commit bcf2995c98
4 changed files with 86 additions and 49 deletions

View File

@ -45,7 +45,7 @@ robj *lookupKeyRead(redisDb *db, robj *key) {
} }
robj *lookupKeyWrite(redisDb *db, robj *key) { robj *lookupKeyWrite(redisDb *db, robj *key) {
deleteIfVolatile(db,key); expireIfNeeded(db,key);
return lookupKey(db,key); return lookupKey(db,key);
} }
@ -321,7 +321,6 @@ void renameGenericCommand(redisClient *c, int nx) {
return; return;
incrRefCount(o); incrRefCount(o);
deleteIfVolatile(c->db,c->argv[2]);
if (dbAdd(c->db,c->argv[2],o) == REDIS_ERR) { if (dbAdd(c->db,c->argv[2],o) == REDIS_ERR) {
if (nx) { if (nx) {
decrRefCount(o); decrRefCount(o);
@ -375,7 +374,6 @@ void moveCommand(redisClient *c) {
} }
/* Try to add the element to the target DB */ /* Try to add the element to the target DB */
deleteIfVolatile(dst,c->argv[1]);
if (dbAdd(dst,c->argv[1],o) == REDIS_ERR) { if (dbAdd(dst,c->argv[1],o) == REDIS_ERR) {
addReply(c,shared.czero); addReply(c,shared.czero);
return; return;
@ -430,8 +428,45 @@ time_t getExpire(redisDb *db, robj *key) {
return (time_t) dictGetEntryVal(de); return (time_t) dictGetEntryVal(de);
} }
/* Propagate expires into slaves and the AOF file.
* When a key expires in the master, a DEL operation for this key is sent
* to all the slaves and the AOF file if enabled.
*
* This way the key expiry is centralized in one place, and since both
* AOF and the master->slave link guarantee operation ordering, everything
* will be consistent even if we allow write operations against expiring
* keys. */
void propagateExpire(redisDb *db, robj *key) {
struct redisCommand *cmd;
robj *argv[2];
cmd = lookupCommand("del");
argv[0] = createStringObject("DEL",3);
argv[1] = key;
incrRefCount(key);
if (server.appendonly)
feedAppendOnlyFile(cmd,db->id,argv,2);
if (listLength(server.slaves))
replicationFeedSlaves(server.slaves,db->id,argv,2);
decrRefCount(key);
}
int expireIfNeeded(redisDb *db, robj *key) { int expireIfNeeded(redisDb *db, robj *key) {
time_t when = getExpire(db,key); time_t when = getExpire(db,key);
/* If we are running in the context of a slave, return ASAP:
* the slave key expiration is controlled by the master that will
* send us synthesized DEL operations for expired keys.
*
* Still we try to return the right information to the caller,
* that is, 0 if we think the key should be still valid, 1 if
* we think the key is expired at this time. */
if (server.masterhost != NULL) {
return time(NULL) > when;
}
if (when < 0) return 0; if (when < 0) return 0;
/* Return when this key has not expired */ /* Return when this key has not expired */
@ -440,15 +475,7 @@ int expireIfNeeded(redisDb *db, robj *key) {
/* Delete the key */ /* Delete the key */
server.stat_expiredkeys++; server.stat_expiredkeys++;
server.dirty++; server.dirty++;
return dbDelete(db,key); propagateExpire(db,key);
}
int deleteIfVolatile(redisDb *db, robj *key) {
if (getExpire(db,key) < 0) return 0;
/* Delete the key */
server.stat_expiredkeys++;
server.dirty++;
return dbDelete(db,key); return dbDelete(db,key);
} }

View File

@ -435,6 +435,48 @@ void updateDictResizePolicy(void) {
/* ======================= Cron: called every 100 ms ======================== */ /* ======================= Cron: called every 100 ms ======================== */
/* Try to expire a few timed out keys. The algorithm used is adaptive and
* will use few CPU cycles if there are few expiring keys, otherwise
* it will get more aggressive to avoid that too much memory is used by
* keys that can be removed from the keyspace. */
void activeExpireCycle(void) {
int j;
for (j = 0; j < server.dbnum; j++) {
int expired;
redisDb *db = server.db+j;
/* Continue to expire if at the end of the cycle more than 25%
* of the keys were expired. */
do {
long num = dictSize(db->expires);
time_t now = time(NULL);
expired = 0;
if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
num = REDIS_EXPIRELOOKUPS_PER_CRON;
while (num--) {
dictEntry *de;
time_t t;
if ((de = dictGetRandomKey(db->expires)) == NULL) break;
t = (time_t) dictGetEntryVal(de);
if (now > t) {
sds key = dictGetEntryKey(de);
robj *keyobj = createStringObject(key,sdslen(key));
propagateExpire(db,keyobj);
dbDelete(db,keyobj);
decrRefCount(keyobj);
expired++;
server.stat_expiredkeys++;
}
}
} while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
}
}
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
int j, loops = server.cronloops++; int j, loops = server.cronloops++;
REDIS_NOTUSED(eventLoop); REDIS_NOTUSED(eventLoop);
@ -533,41 +575,10 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
} }
} }
/* Try to expire a few timed out keys. The algorithm used is adaptive and /* Expire a few keys per cycle, only if this is a master.
* will use few CPU cycles if there are few expiring keys, otherwise * On slaves we wait for DEL operations synthesized by the master
* it will get more aggressive to avoid that too much memory is used by * in order to guarantee a strict consistency. */
* keys that can be removed from the keyspace. */ if (server.masterhost == NULL) activeExpireCycle();
for (j = 0; j < server.dbnum; j++) {
int expired;
redisDb *db = server.db+j;
/* Continue to expire if at the end of the cycle more than 25%
* of the keys were expired. */
do {
long num = dictSize(db->expires);
time_t now = time(NULL);
expired = 0;
if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
num = REDIS_EXPIRELOOKUPS_PER_CRON;
while (num--) {
dictEntry *de;
time_t t;
if ((de = dictGetRandomKey(db->expires)) == NULL) break;
t = (time_t) dictGetEntryVal(de);
if (now > t) {
sds key = dictGetEntryKey(de);
robj *keyobj = createStringObject(key,sdslen(key));
dbDelete(db,keyobj);
decrRefCount(keyobj);
expired++;
server.stat_expiredkeys++;
}
}
} while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
}
/* Swap a few keys on disk if we are over the memory limit and VM /* Swap a few keys on disk if we are over the memory limit and VM
* is enbled. Try to free objects from the free list first. */ * is enbled. Try to free objects from the free list first. */

View File

@ -752,8 +752,8 @@ void resetServerSaveParams();
/* db.c -- Keyspace access API */ /* db.c -- Keyspace access API */
int removeExpire(redisDb *db, robj *key); int removeExpire(redisDb *db, robj *key);
void propagateExpire(redisDb *db, robj *key);
int expireIfNeeded(redisDb *db, robj *key); int expireIfNeeded(redisDb *db, robj *key);
int deleteIfVolatile(redisDb *db, robj *key);
time_t getExpire(redisDb *db, robj *key); time_t getExpire(redisDb *db, robj *key);
int setExpire(redisDb *db, robj *key, time_t when); int setExpire(redisDb *db, robj *key, time_t when);
robj *lookupKey(redisDb *db, robj *key); robj *lookupKey(redisDb *db, robj *key);

View File

@ -17,7 +17,6 @@ void setGenericCommand(redisClient *c, int nx, robj *key, robj *val, robj *expir
} }
} }
if (nx) deleteIfVolatile(c->db,key);
retval = dbAdd(c->db,key,val); retval = dbAdd(c->db,key,val);
if (retval == REDIS_ERR) { if (retval == REDIS_ERR) {
if (!nx) { if (!nx) {