diskstore bug fixing and negative cache proper implementation

This commit is contained in:
antirez 2011-01-03 10:47:39 +01:00
parent 120b9ba8f8
commit c15a3887e0
4 changed files with 53 additions and 26 deletions

View File

@ -93,9 +93,7 @@ int dbAdd(redisDb *db, robj *key, robj *val) {
} else { } else {
sds copy = sdsdup(key->ptr); sds copy = sdsdup(key->ptr);
dictAdd(db->dict, copy, val); dictAdd(db->dict, copy, val);
if (server.ds_enabled) { if (server.ds_enabled) cacheSetKeyMayExist(db,key);
/* FIXME: remove entry from negative cache */
}
return REDIS_OK; return REDIS_OK;
} }
} }
@ -106,15 +104,18 @@ int dbAdd(redisDb *db, robj *key, robj *val) {
* On update (key already existed) 0 is returned. Otherwise 1. */ * On update (key already existed) 0 is returned. Otherwise 1. */
int dbReplace(redisDb *db, robj *key, robj *val) { int dbReplace(redisDb *db, robj *key, robj *val) {
robj *oldval; robj *oldval;
int retval;
if ((oldval = dictFetchValue(db->dict,key->ptr)) == NULL) { if ((oldval = dictFetchValue(db->dict,key->ptr)) == NULL) {
sds copy = sdsdup(key->ptr); sds copy = sdsdup(key->ptr);
dictAdd(db->dict, copy, val); dictAdd(db->dict, copy, val);
return 1; retval = 1;
} else { } else {
dictReplace(db->dict, key->ptr, val); dictReplace(db->dict, key->ptr, val);
return 0; retval = 0;
} }
if (server.ds_enabled) cacheSetKeyMayExist(db,key);
return retval;
} }
int dbExists(redisDb *db, robj *key) { int dbExists(redisDb *db, robj *key) {
@ -152,10 +153,10 @@ int dbDelete(redisDb *db, robj *key) {
/* If diskstore is enabled make sure to awake waiting clients for this key /* If diskstore is enabled make sure to awake waiting clients for this key
* as it is not really useful to wait for a key already deleted to be * as it is not really useful to wait for a key already deleted to be
* loaded from disk. */ * loaded from disk. */
if (server.ds_enabled) handleClientsBlockedOnSwappedKey(db,key); if (server.ds_enabled) {
handleClientsBlockedOnSwappedKey(db,key);
/* FIXME: we should mark this key as non existing on disk in the negative cacheSetKeyDoesNotExist(db,key);
* cache. */ }
/* Deleting an entry from the expires dict will not free the sds of /* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */ * the key, because it is shared with the main dictionary. */

View File

@ -298,7 +298,7 @@ void dsFlushOneDir(char *path, int dbid) {
id[len] = '\0'; id[len] = '\0';
if (atoi(id) != dbid) continue; /* skip this file */ if (atoi(id) != dbid) continue; /* skip this file */
} }
if (unlink(path) == -1) { if (unlink(dp->d_name) == -1) {
redisLog(REDIS_WARNING, redisLog(REDIS_WARNING,
"Can't unlink %s: %s", path, strerror(errno)); "Can't unlink %s: %s", path, strerror(errno));
redisPanic("Unrecoverable Disk store errore. Existing."); redisPanic("Unrecoverable Disk store errore. Existing.");

View File

@ -245,10 +245,10 @@ int cacheFreeOneEntry(void) {
} }
} }
if (best == NULL) { if (best == NULL) {
/* FIXME: If there are objects marked as DS_DIRTY or DS_SAVING /* FIXME: If there are objects that are in the write queue
* let's wait for this objects to be clear and retry... * so we can't delete them we should block here, at the cost of
* * slowness as the object cache memory limit is considered
* Object cache vm limit is considered an hard limit. */ * n hard limit. */
return REDIS_ERR; return REDIS_ERR;
} }
key = dictGetEntryKey(best); key = dictGetEntryKey(best);
@ -306,6 +306,38 @@ void cacheSetKeyDoesNotExist(redisDb *db, robj *key) {
} }
} }
/* Remove one entry from negative cache using approximated LRU. */
int negativeCacheEvictOneEntry(void) {
struct dictEntry *de;
robj *best = NULL;
redisDb *best_db = NULL;
time_t time, best_time = 0;
int j;
for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j;
int i;
if (dictSize(db->io_negcache) == 0) continue;
for (i = 0; i < 3; i++) {
de = dictGetRandomKey(db->io_negcache);
time = (time_t) dictGetEntryVal(de);
if (best == NULL || time < best_time) {
best = dictGetEntryKey(de);
best_db = db;
best_time = time;
}
}
}
if (best) {
dictDelete(best_db->io_negcache,best);
return REDIS_OK;
} else {
return REDIS_ERR;
}
}
/* ================== Disk store cache - Threaded I/O ====================== */ /* ================== Disk store cache - Threaded I/O ====================== */
void freeIOJob(iojob *j) { void freeIOJob(iojob *j) {
@ -361,20 +393,11 @@ void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
incrRefCount(j->val); incrRefCount(j->val);
if (j->expire != -1) setExpire(j->db,j->key,j->expire); if (j->expire != -1) setExpire(j->db,j->key,j->expire);
} }
} else {
/* The key does not exist. Create a negative cache entry
* for this key. */
cacheSetKeyDoesNotExist(j->db,j->key);
} }
cacheScheduleIODelFlag(j->db,j->key,REDIS_IO_LOADINPROG); cacheScheduleIODelFlag(j->db,j->key,REDIS_IO_LOADINPROG);
handleClientsBlockedOnSwappedKey(j->db,j->key); handleClientsBlockedOnSwappedKey(j->db,j->key);
freeIOJob(j); freeIOJob(j);
} else if (j->type == REDIS_IOJOB_SAVE) { } else if (j->type == REDIS_IOJOB_SAVE) {
if (j->val) {
cacheSetKeyMayExist(j->db,j->key);
} else {
cacheSetKeyDoesNotExist(j->db,j->key);
}
cacheScheduleIODelFlag(j->db,j->key,REDIS_IO_SAVEINPROG); cacheScheduleIODelFlag(j->db,j->key,REDIS_IO_SAVEINPROG);
freeIOJob(j); freeIOJob(j);
} }
@ -740,8 +763,11 @@ void cacheCron(void) {
while (server.ds_enabled && zmalloc_used_memory() > while (server.ds_enabled && zmalloc_used_memory() >
server.cache_max_memory) server.cache_max_memory)
{ {
if (cacheFreeOneEntry() == REDIS_ERR) break; int done = 0;
/* FIXME: also free negative cache entries here. */
if (cacheFreeOneEntry() == REDIS_OK) done++;
if (negativeCacheEvictOneEntry() == REDIS_OK) done++;
if (done == 0) break; /* nothing more to free */
} }
} }

View File

@ -814,7 +814,7 @@ int cacheScheduleIOGetFlags(redisDb *db, robj *key);
void cacheScheduleIO(redisDb *db, robj *key, int type); void cacheScheduleIO(redisDb *db, robj *key, int type);
void cacheCron(void); void cacheCron(void);
int cacheKeyMayExist(redisDb *db, robj *key); int cacheKeyMayExist(redisDb *db, robj *key);
void cacheSetKeyExists(redisDb *db, robj *key); void cacheSetKeyMayExist(redisDb *db, robj *key);
void cacheSetKeyDoesNotExist(redisDb *db, robj *key); void cacheSetKeyDoesNotExist(redisDb *db, robj *key);
/* Set data type */ /* Set data type */