mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 16:18:28 -05:00
negative caching implemented
This commit is contained in:
parent
c4b64a1395
commit
d934e1e85b
13
src/db.c
13
src/db.c
@ -38,6 +38,8 @@ robj *lookupKey(redisDb *db, robj *key) {
|
|||||||
* async loading of this key, what may happen is that the old
|
* async loading of this key, what may happen is that the old
|
||||||
* key is loaded in memory if this gets deleted in the meantime. */
|
* key is loaded in memory if this gets deleted in the meantime. */
|
||||||
if (server.ds_enabled && cacheKeyMayExist(db,key)) {
|
if (server.ds_enabled && cacheKeyMayExist(db,key)) {
|
||||||
|
redisLog(REDIS_DEBUG,"Force loading key %s via lookup",
|
||||||
|
key->ptr);
|
||||||
val = dsGet(db,key,&expire);
|
val = dsGet(db,key,&expire);
|
||||||
if (val) {
|
if (val) {
|
||||||
int retval = dbAdd(db,key,val);
|
int retval = dbAdd(db,key,val);
|
||||||
@ -142,14 +144,13 @@ robj *dbRandomKey(redisDb *db) {
|
|||||||
|
|
||||||
/* Delete a key, value, and associated expiration entry if any, from the DB */
|
/* Delete a key, value, and associated expiration entry if any, from the DB */
|
||||||
int dbDelete(redisDb *db, robj *key) {
|
int dbDelete(redisDb *db, robj *key) {
|
||||||
/* If VM is enabled make sure to awake waiting clients for this key:
|
/* If diskstore is enabled make sure to awake waiting clients for this key
|
||||||
* deleting the key will kill the I/O thread bringing the key from swap
|
* as it is not really useful to wait for a key already deleted to be
|
||||||
* to memory, so the client will never be notified and unblocked if we
|
* loaded from disk. */
|
||||||
* don't do it now. */
|
|
||||||
if (server.ds_enabled) handleClientsBlockedOnSwappedKey(db,key);
|
if (server.ds_enabled) handleClientsBlockedOnSwappedKey(db,key);
|
||||||
|
|
||||||
/* FIXME: we need to delete the IO Job loading the key, or simply we can
|
/* Mark this key as non existing on disk as well */
|
||||||
* wait for it to finish. */
|
cacheSetKeyDoesNotExistRemember(db,key);
|
||||||
|
|
||||||
/* Deleting an entry from the expires dict will not free the sds of
|
/* Deleting an entry from the expires dict will not free the sds of
|
||||||
* the key, because it is shared with the main dictionary. */
|
* the key, because it is shared with the main dictionary. */
|
||||||
|
107
src/dscache.c
107
src/dscache.c
@ -105,6 +105,10 @@
|
|||||||
* value so it will be evicted later.
|
* value so it will be evicted later.
|
||||||
*
|
*
|
||||||
* Are there other patterns like this where we load stale data?
|
* Are there other patterns like this where we load stale data?
|
||||||
|
*
|
||||||
|
* Also, make sure that key preloading is ONLY done for keys that are
|
||||||
|
* not marked as cacheKeyDoesNotExist(), otherwise, again, we can load
|
||||||
|
* data from disk that should instead be deleted.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/* Virtual Memory is composed mainly of two subsystems:
|
/* Virtual Memory is composed mainly of two subsystems:
|
||||||
@ -259,7 +263,72 @@ int dsCanTouchDiskStore(void) {
|
|||||||
return (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1);
|
return (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* =================== Virtual Memory - Threaded I/O ======================= */
|
/* ==================== Disk store negative caching ========================
|
||||||
|
*
|
||||||
|
* When disk store is enabled, we need negative caching, that is, to remember
|
||||||
|
* keys that are for sure *not* on the disk key-value store.
|
||||||
|
*
|
||||||
|
* This is useful for two reasons:
|
||||||
|
*
|
||||||
|
* 1) Without negative caching cache misses will cost us a disk lookup, even
|
||||||
|
* if the same non existing key is accessed again and again. We negative
|
||||||
|
* caching we remember that the key is not on disk, so if it's not in memory
|
||||||
|
* and we have a negative cache entry, we don't try a disk access at all.
|
||||||
|
*
|
||||||
|
* 2) Negative caching is the way to fix a specific race condition. For instance
|
||||||
|
* think at the following sequence of commands:
|
||||||
|
*
|
||||||
|
* SET foo bar
|
||||||
|
* DEL foo
|
||||||
|
* GET foo
|
||||||
|
*
|
||||||
|
* After the SET, we'll mark the value as dirty, so it will be flushed
|
||||||
|
* on disk at some time. Later the key is deleted, so will be removed
|
||||||
|
* from memory. Another job will be created to remove the key from the disk
|
||||||
|
* store, but the removal is not synchronous, so may happen later in time.
|
||||||
|
*
|
||||||
|
* Finally we have a GET foo operation. This operation may result in
|
||||||
|
* reading back a value from disk that is not updated data, as the deletion
|
||||||
|
* operaiton against the disk KV store was still not completed, so we
|
||||||
|
* read old data.
|
||||||
|
*
|
||||||
|
* Remembering that the given key is deleted is important. We can discard this
|
||||||
|
* information once the key was really removed from the disk.
|
||||||
|
*
|
||||||
|
* So actually there are two kind of negative caching entries: entries that
|
||||||
|
* can be evicted when we need to reclaim memory, and entries that will
|
||||||
|
* not be evicted, for all the time we need this information to be available.
|
||||||
|
*
|
||||||
|
* The API allows to create both kind of negative caching. */
|
||||||
|
|
||||||
|
int cacheKeyMayExist(redisDb *db, robj *key) {
|
||||||
|
return dictFind(db->io_negcache,key) == NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
void cacheSetKeyMayExist(redisDb *db, robj *key) {
|
||||||
|
dictDelete(db->io_negcache,key);
|
||||||
|
}
|
||||||
|
|
||||||
|
void cacheSetKeyDoesNotExist(redisDb *db, robj *key) {
|
||||||
|
struct dictEntry *de;
|
||||||
|
|
||||||
|
/* Don't overwrite negative cached entries with val set to 0, as this
|
||||||
|
* entries were created with cacheSetKeyDoesNotExistRemember(). */
|
||||||
|
de = dictFind(db->io_negcache,key);
|
||||||
|
if (de != NULL && dictGetEntryVal(de) == NULL) return;
|
||||||
|
|
||||||
|
if (dictReplace(db->io_negcache,key,(void*)time(NULL))) {
|
||||||
|
incrRefCount(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void cacheSetKeyDoesNotExistRemember(redisDb *db, robj *key) {
|
||||||
|
if (dictReplace(db->io_negcache,key,NULL)) {
|
||||||
|
incrRefCount(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* ================== Disk store cache - Threaded I/O ====================== */
|
||||||
|
|
||||||
void freeIOJob(iojob *j) {
|
void freeIOJob(iojob *j) {
|
||||||
decrRefCount(j->key);
|
decrRefCount(j->key);
|
||||||
@ -310,15 +379,20 @@ void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
|
|||||||
if (j->val != NULL) {
|
if (j->val != NULL) {
|
||||||
/* Note: the key may already be here if between the time
|
/* Note: the key may already be here if between the time
|
||||||
* this key loading was scheduled and now there was the
|
* this key loading was scheduled and now there was the
|
||||||
* need to blocking load the key for a key lookup. */
|
* need to blocking load the key for a key lookup.
|
||||||
if (dbAdd(j->db,j->key,j->val) == REDIS_OK) {
|
*
|
||||||
|
* Also we don't add a key that was deleted in the
|
||||||
|
* meantime and should not be on disk either. */
|
||||||
|
if (cacheKeyMayExist(j->db,j->key) &&
|
||||||
|
dbAdd(j->db,j->key,j->val) == REDIS_OK)
|
||||||
|
{
|
||||||
incrRefCount(j->val);
|
incrRefCount(j->val);
|
||||||
if (j->expire != -1) setExpire(j->db,j->key,j->expire);
|
if (j->expire != -1) setExpire(j->db,j->key,j->expire);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
/* The key does not exist. Create a negative cache entry
|
/* The key does not exist. Create a negative cache entry
|
||||||
* for this key. */
|
* for this key. */
|
||||||
/* FIXME: add this entry into the negative cache */
|
cacheSetKeyDoesNotExist(j->db,j->key);
|
||||||
}
|
}
|
||||||
/* Handle clients waiting for this key to be loaded. */
|
/* Handle clients waiting for this key to be loaded. */
|
||||||
handleClientsBlockedOnSwappedKey(j->db,j->key);
|
handleClientsBlockedOnSwappedKey(j->db,j->key);
|
||||||
@ -327,6 +401,12 @@ void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
|
|||||||
if (j->val) {
|
if (j->val) {
|
||||||
redisAssert(j->val->storage == REDIS_DS_SAVING);
|
redisAssert(j->val->storage == REDIS_DS_SAVING);
|
||||||
j->val->storage = REDIS_DS_MEMORY;
|
j->val->storage = REDIS_DS_MEMORY;
|
||||||
|
cacheSetKeyMayExist(j->db,j->key);
|
||||||
|
} else {
|
||||||
|
/* Key deleted. Probably we have this key marked as
|
||||||
|
* non existing, and impossible to evict, in our negative
|
||||||
|
* cache entry. Add it as a normal negative cache entry. */
|
||||||
|
cacheSetKeyMayExist(j->db,j->key);
|
||||||
}
|
}
|
||||||
freeIOJob(j);
|
freeIOJob(j);
|
||||||
}
|
}
|
||||||
@ -585,15 +665,6 @@ void cacheCron(void) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ============ Negative caching for diskstore objects ====================== */
|
|
||||||
/* Since accesses to keys that don't exist with disk store cost us a disk
|
|
||||||
* access, we need to cache names of keys that do not exist but are frequently
|
|
||||||
* accessed. */
|
|
||||||
int cacheKeyMayExist(redisDb *db, robj *key) {
|
|
||||||
/* FIXME: for now we just always return true. */
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* ============ Virtual Memory - Blocking clients on missing keys =========== */
|
/* ============ Virtual Memory - Blocking clients on missing keys =========== */
|
||||||
|
|
||||||
/* This function makes the clinet 'c' waiting for the key 'key' to be loaded.
|
/* This function makes the clinet 'c' waiting for the key 'key' to be loaded.
|
||||||
@ -624,6 +695,9 @@ int waitForSwappedKey(redisClient *c, robj *key) {
|
|||||||
de = dictFind(c->db->dict,key->ptr);
|
de = dictFind(c->db->dict,key->ptr);
|
||||||
if (de != NULL) return 0;
|
if (de != NULL) return 0;
|
||||||
|
|
||||||
|
/* Don't wait for keys we are sure are not on disk either */
|
||||||
|
if (!cacheKeyMayExist(c->db,key)) return 0;
|
||||||
|
|
||||||
/* Add the key to the list of keys this client is waiting for.
|
/* Add the key to the list of keys this client is waiting for.
|
||||||
* This maps clients to keys they are waiting for. */
|
* This maps clients to keys they are waiting for. */
|
||||||
listAddNodeTail(c->io_keys,key);
|
listAddNodeTail(c->io_keys,key);
|
||||||
@ -645,13 +719,6 @@ int waitForSwappedKey(redisClient *c, robj *key) {
|
|||||||
listAddNodeTail(l,c);
|
listAddNodeTail(l,c);
|
||||||
|
|
||||||
/* Are we already loading the key from disk? If not create a job */
|
/* Are we already loading the key from disk? If not create a job */
|
||||||
/* FIXME: if a given client was blocked for this key (so job already
|
|
||||||
* created) but the client was freed, there may be a job loading this
|
|
||||||
* key even if de == NULL. Does this creates some race condition?
|
|
||||||
*
|
|
||||||
* Example: after the first load the key gets a DEL that will schedule
|
|
||||||
* a write. But the write will happen later, the duplicated load will
|
|
||||||
* fire and we'll get again the key in memory. */
|
|
||||||
if (de == NULL)
|
if (de == NULL)
|
||||||
dsCreateIOJob(REDIS_IOJOB_LOAD,c->db,key,NULL);
|
dsCreateIOJob(REDIS_IOJOB_LOAD,c->db,key,NULL);
|
||||||
return 1;
|
return 1;
|
||||||
|
@ -345,7 +345,7 @@ unsigned int dictEncObjHash(const void *key) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Sets type */
|
/* Sets type and diskstore negative caching hash table */
|
||||||
dictType setDictType = {
|
dictType setDictType = {
|
||||||
dictEncObjHash, /* hash function */
|
dictEncObjHash, /* hash function */
|
||||||
NULL, /* key dup */
|
NULL, /* key dup */
|
||||||
@ -854,8 +854,10 @@ void initServer() {
|
|||||||
server.db[j].expires = dictCreate(&keyptrDictType,NULL);
|
server.db[j].expires = dictCreate(&keyptrDictType,NULL);
|
||||||
server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
|
server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
|
||||||
server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
|
server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
|
||||||
if (server.ds_enabled)
|
if (server.ds_enabled) {
|
||||||
server.db[j].io_keys = dictCreate(&keylistDictType,NULL);
|
server.db[j].io_keys = dictCreate(&keylistDictType,NULL);
|
||||||
|
server.db[j].io_negcache = dictCreate(&setDictType,NULL);
|
||||||
|
}
|
||||||
server.db[j].id = j;
|
server.db[j].id = j;
|
||||||
}
|
}
|
||||||
server.pubsub_channels = dictCreate(&keylistDictType,NULL);
|
server.pubsub_channels = dictCreate(&keylistDictType,NULL);
|
||||||
|
@ -269,6 +269,7 @@ typedef struct redisDb {
|
|||||||
dict *expires; /* Timeout of keys with a timeout set */
|
dict *expires; /* Timeout of keys with a timeout set */
|
||||||
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */
|
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */
|
||||||
dict *io_keys; /* Keys with clients waiting for VM I/O */
|
dict *io_keys; /* Keys with clients waiting for VM I/O */
|
||||||
|
dict *io_negcache; /* Negative caching for disk store */
|
||||||
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
|
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
|
||||||
int id;
|
int id;
|
||||||
} redisDb;
|
} redisDb;
|
||||||
@ -809,6 +810,9 @@ int cacheFreeOneEntry(void);
|
|||||||
void cacheScheduleForFlush(redisDb *db, robj *key);
|
void cacheScheduleForFlush(redisDb *db, robj *key);
|
||||||
void cacheCron(void);
|
void cacheCron(void);
|
||||||
int cacheKeyMayExist(redisDb *db, robj *key);
|
int cacheKeyMayExist(redisDb *db, robj *key);
|
||||||
|
void cacheSetKeyExists(redisDb *db, robj *key);
|
||||||
|
void cacheSetKeyDoesNotExist(redisDb *db, robj *key);
|
||||||
|
void cacheSetKeyDoesNotExistRemember(redisDb *db, robj *key);
|
||||||
|
|
||||||
/* Set data type */
|
/* Set data type */
|
||||||
robj *setTypeCreate(robj *value);
|
robj *setTypeCreate(robj *value);
|
||||||
|
Loading…
Reference in New Issue
Block a user