implemented a different approach to IO scheduling, so object->storage is no longer used, instead there is a queue and hash table of IO tasks to process, and it is always possible to know what are the scheduled and active IO operations against every single key.

This commit is contained in:
antirez 2011-01-01 21:35:56 +01:00
parent aa81e4d5f4
commit 3be00d7ed6
7 changed files with 190 additions and 156 deletions

View File

@ -17,12 +17,14 @@ robj *lookupKey(redisDb *db, robj *key) {
if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1)
val->lru = server.lruclock;
if (server.ds_enabled && val->storage == REDIS_DS_SAVING) {
/* FIXME: change this code to just wait for our object to
* get out of the IO Job. As it is now it is correct but slow. */
if (server.ds_enabled &&
cacheScheduleIOGetFlags(db,key) & REDIS_IO_SAVEINPROG)
{
/* There is a save in progress for this object!
* Wait for it to get out. */
waitEmptyIOJobsQueue();
processAllPendingIOJobs();
redisAssert(val->storage != REDIS_DS_SAVING);
redisAssert(!(cacheScheduleIOGetFlags(db,key) & REDIS_IO_SAVEINPROG));
}
server.stat_keyspace_hits++;
return val;
@ -106,7 +108,6 @@ int dbReplace(redisDb *db, robj *key, robj *val) {
dictAdd(db->dict, copy, val);
return 1;
} else {
val->storage = oldval->storage;
dictReplace(db->dict, key->ptr, val);
return 0;
}
@ -149,8 +150,8 @@ int dbDelete(redisDb *db, robj *key) {
* loaded from disk. */
if (server.ds_enabled) handleClientsBlockedOnSwappedKey(db,key);
/* Mark this key as non existing on disk as well */
cacheSetKeyDoesNotExistRemember(db,key);
/* FIXME: we should mark this key as non existing on disk in the negative
* cache. */
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
@ -190,7 +191,7 @@ int selectDb(redisClient *c, int id) {
void signalModifiedKey(redisDb *db, robj *key) {
touchWatchedKey(db,key);
if (server.ds_enabled)
cacheScheduleForFlush(db,key);
cacheScheduleIO(db,key,REDIS_IO_SAVE);
}
void signalFlushedDb(int dbid) {
@ -240,7 +241,7 @@ void delCommand(redisClient *c) {
if (cacheKeyMayExist(c->db,c->argv[j]) &&
dsExists(c->db,c->argv[j]))
{
cacheScheduleForFlush(c->db,c->argv[j]);
cacheScheduleIO(c->db,c->argv[j],REDIS_IO_SAVE);
deleted = 1;
}
}

View File

@ -201,7 +201,6 @@ void debugCommand(redisClient *c) {
dictEntry *de = dictFind(c->db->dict,c->argv[2]->ptr);
robj *val;
char *strenc;
char *storage;
if (!de) {
addReply(c,shared.nokeyerr);
@ -209,19 +208,14 @@ void debugCommand(redisClient *c) {
}
val = dictGetEntryVal(de);
strenc = strEncoding(val->encoding);
switch(val->storage) {
case REDIS_DS_MEMORY: storage = "memory"; break;
case REDIS_DS_DIRTY: storage = "dirty"; break;
case REDIS_DS_SAVING: storage = "saving"; break;
default: storage = "unknown"; break;
}
addReplyStatusFormat(c,
"Value at:%p refcount:%d "
"encoding:%s serializedlength:%lld "
"lru:%d lru_seconds_idle:%lu storage:%s",
"lru:%d lru_seconds_idle:%lu",
(void*)val, val->refcount,
strenc, (long long) rdbSavedObjectLen(val),
val->lru, estimateObjectIdleTime(val), storage);
val->lru, estimateObjectIdleTime(val));
} else if (!strcasecmp(c->argv[1]->ptr,"populate") && c->argc == 3) {
long keys, j;
robj *key, *val;

View File

@ -214,17 +214,18 @@ int cacheFreeOneEntry(void) {
for (i = 0; i < 5; i++) {
dictEntry *de;
double swappability;
robj keyobj;
sds keystr;
if (maxtries) maxtries--;
de = dictGetRandomKey(db->dict);
keystr = dictGetEntryKey(de);
val = dictGetEntryVal(de);
/* Only swap objects that are currently in memory.
*
* Also don't swap shared objects: not a good idea in general and
* we need to ensure that the main thread does not touch the
* object while the I/O thread is using it, but we can't
* control other keys without adding additional mutex. */
if (val->storage != REDIS_DS_MEMORY) {
initStaticStringObject(keyobj,keystr);
/* Don't remove objects that are currently target of a
* read or write operation. */
if (cacheScheduleIOGetFlags(db,&keyobj) != 0) {
if (maxtries) i--; /* don't count this try */
continue;
}
@ -270,66 +271,34 @@ int dsCanTouchDiskStore(void) {
* When disk store is enabled, we need negative caching, that is, to remember
* keys that are for sure *not* on the disk key-value store.
*
* This is useful for two reasons:
* This is useful because without negative caching cache misses will cost us
* a disk lookup, even if the same non existing key is accessed again and again.
*
* 1) Without negative caching cache misses will cost us a disk lookup, even
* if the same non existing key is accessed again and again. We negative
* caching we remember that the key is not on disk, so if it's not in memory
* and we have a negative cache entry, we don't try a disk access at all.
*
* 2) Negative caching is the way to fix a specific race condition. For instance
* think at the following sequence of commands:
*
* SET foo bar
* DEL foo
* GET foo
*
* After the SET, we'll mark the value as dirty, so it will be flushed
* on disk at some time. Later the key is deleted, so will be removed
* from memory. Another job will be created to remove the key from the disk
* store, but the removal is not synchronous, so may happen later in time.
*
* Finally we have a GET foo operation. This operation may result in
* reading back a value from disk that is not updated data, as the deletion
* operation against the disk KV store was still not completed, so we
* read old data.
*
* Remembering that the given key is deleted is important. We can discard this
* information once the key was really removed from the disk.
*
* So actually there are two kind of negative caching entries: entries that
* can be evicted when we need to reclaim memory, and entries that will
* not be evicted, for all the time we need this information to be available.
*
* The API allows to create both kind of negative caching. */
* With negative caching we remember that the key is not on disk, so if it's
* not in memory and we have a negative cache entry, we don't try a disk
* access at all.
*/
/* Returns true if the specified key may exist on disk, that is, we don't
* have an entry in our negative cache for this key */
int cacheKeyMayExist(redisDb *db, robj *key) {
return dictFind(db->io_negcache,key) == NULL;
}
/* Set the specified key as an entry that may possibly exist on disk, that is,
* remove the negative cache entry for this key if any. */
void cacheSetKeyMayExist(redisDb *db, robj *key) {
dictDelete(db->io_negcache,key);
}
/* Set the specified key as non existing on disk, that is, create a negative
* cache entry for this key. */
void cacheSetKeyDoesNotExist(redisDb *db, robj *key) {
struct dictEntry *de;
/* Don't overwrite negative cached entries with val set to 0, as these
* entries were created with cacheSetKeyDoesNotExistRemember(). */
de = dictFind(db->io_negcache,key);
if (de != NULL && dictGetEntryVal(de) == NULL) return;
if (dictReplace(db->io_negcache,key,(void*)time(NULL))) {
incrRefCount(key);
}
}
void cacheSetKeyDoesNotExistRemember(redisDb *db, robj *key) {
if (dictReplace(db->io_negcache,key,NULL)) {
incrRefCount(key);
}
}
/* ================== Disk store cache - Threaded I/O ====================== */
void freeIOJob(iojob *j) {
@ -379,15 +348,9 @@ void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
if (j->type == REDIS_IOJOB_LOAD) {
/* Create the key-value pair in the in-memory database */
if (j->val != NULL) {
/* Note: the key may already be here if between the time
* this key loading was scheduled and now there was the
* need to blocking load the key for a key lookup.
*
* Also we don't add a key that was deleted in the
* meantime and should not be on disk either. */
if (cacheKeyMayExist(j->db,j->key) &&
dbAdd(j->db,j->key,j->val) == REDIS_OK)
{
/* Note: it's possible that the key is already in memory
* due to a blocking load operation. */
if (dbAdd(j->db,j->key,j->val) == REDIS_OK) {
incrRefCount(j->val);
if (j->expire != -1) setExpire(j->db,j->key,j->expire);
}
@ -396,20 +359,16 @@ void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
* for this key. */
cacheSetKeyDoesNotExist(j->db,j->key);
}
/* Handle clients waiting for this key to be loaded. */
cacheScheduleIODelFlag(j->db,j->key,REDIS_IO_LOADINPROG);
handleClientsBlockedOnSwappedKey(j->db,j->key);
freeIOJob(j);
} else if (j->type == REDIS_IOJOB_SAVE) {
if (j->val) {
redisAssert(j->val->storage == REDIS_DS_SAVING);
j->val->storage = REDIS_DS_MEMORY;
cacheSetKeyMayExist(j->db,j->key);
} else {
/* Key deleted. Probably we have this key marked as
* non existing, and impossible to evict, in our negative
* cache entry. Add it as a normal negative cache entry. */
cacheSetKeyMayExist(j->db,j->key);
cacheSetKeyDoesNotExist(j->db,j->key);
}
cacheScheduleIODelFlag(j->db,j->key,REDIS_IO_SAVEINPROG);
freeIOJob(j);
}
processed++;
@ -467,7 +426,6 @@ void *IOThreadEntryPoint(void *arg) {
if (j->val) j->expire = expire;
} else if (j->type == REDIS_IOJOB_SAVE) {
if (j->val) {
redisAssert(j->val->storage == REDIS_DS_SAVING);
dsSet(j->db,j->key,j->val);
} else {
dsDel(j->db,j->key);
@ -588,27 +546,104 @@ void dsCreateIOJob(int type, redisDb *db, robj *key, robj *val) {
unlockThreadedIO();
}
void cacheScheduleForFlush(redisDb *db, robj *key) {
dirtykey *dk;
dictEntry *de;
de = dictFind(db->dict,key->ptr);
if (de) {
robj *val = dictGetEntryVal(de);
if (val->storage == REDIS_DS_DIRTY)
return;
else
val->storage = REDIS_DS_DIRTY;
}
/* ============= Disk store cache - Scheduling of IO operations =============
*
* We use a queue and a hash table to hold the state of IO operations
* so that it is fast to look up if there is already an IO operation in queue
* for a given key.
*
* There are two types of IO operations for a given key:
* REDIS_IO_LOAD and REDIS_IO_SAVE.
*
* The function cacheScheduleIO() pushes the specified IO operation
* in the queue, but avoid adding the same key for the same operation
* multiple times, thanks to the associated hash table.
*
* We take a set of flags per every key, so when the scheduled IO operation
* gets moved from the scheduled queue to the actual IO Jobs queue that
* is processed by the IO thread, we flag it as IO_LOADINPROG or
* IO_SAVEINPROG.
*
* So for every given key we always know if there is some IO operation
* scheduled, or in progress, for this key.
*
* NOTE: all this is very important in order to guarantee correctness of
* the Disk Store Cache. Jobs are always queued here. Load jobs are
* queued at the head for faster execution only in the case there is not
* already a write operation of some kind for this job.
*
* So we have ordering, but can do exceptions when there are no already
* operations for a given key. Also when we need to block load a given
* key, for an immediate lookup operation, we can check if the key can
* be accessed synchronously without race conditions (no IN PROGRESS
* operations for this key), otherwise we blocking wait for completion. */
redisLog(REDIS_DEBUG,"Scheduling key %s for saving (%s)",key->ptr,
de ? "key exists" : "key does not exist");
dk = zmalloc(sizeof(*dk));
dk->db = db;
dk->key = key;
#define REDIS_IO_LOAD 1
#define REDIS_IO_SAVE 2
#define REDIS_IO_LOADINPROG 4
#define REDIS_IO_SAVEINPROG 8
/* Set the specified IO scheduling flag (a REDIS_IO_* bit) for 'key' in the
 * per-database io_queued hash table.
 * If no entry exists for the key one is created and the key's refcount is
 * incremented (the hash table keeps a reference); otherwise the new flag is
 * OR-ed into the existing flags value stored inline in the dict entry. */
void cacheScheduleIOAddFlag(redisDb *db, robj *key, long flag) {
struct dictEntry *de = dictFind(db->io_queued,key);
if (!de) {
dictAdd(db->io_queued,key,(void*)flag);
incrRefCount(key);
return;
} else {
long flags = (long) dictGetEntryVal(de);
flags |= flag;
dictGetEntryVal(de) = (void*) flags;
}
}
/* Clear the specified IO scheduling flag for 'key' in the per-database
 * io_queued hash table.
 * Asserts that an entry exists and that the flag being cleared is currently
 * set. When the last flag is cleared the entry is deleted from the table
 * (which releases the table's reference to the key). */
void cacheScheduleIODelFlag(redisDb *db, robj *key, long flag) {
struct dictEntry *de = dictFind(db->io_queued,key);
long flags;
redisAssert(de != NULL);
flags = (long) dictGetEntryVal(de);
redisAssert(flags & flag);
flags &= ~flag;
if (flags == 0) {
dictDelete(db->io_queued,key);
} else {
dictGetEntryVal(de) = (void*) flags;
}
}
/* Return the IO scheduling flags currently set for 'key', or 0 if the key
 * has no scheduled or in-progress IO operation in the io_queued table. */
int cacheScheduleIOGetFlags(redisDb *db, robj *key) {
struct dictEntry *de = dictFind(db->io_queued,key);
return (de == NULL) ? 0 : ((long) dictGetEntryVal(de));
}
void cacheScheduleIO(redisDb *db, robj *key, int type) {
ioop *op;
long flags;
if ((flags = cacheScheduleIOGetFlags(db,key)) & type) return;
redisLog(REDIS_DEBUG,"Scheduling key %s for %s",
key->ptr, type == REDIS_IO_LOAD ? "loading" : "saving");
cacheScheduleIOAddFlag(db,key,type);
op = zmalloc(sizeof(*op));
op->type = type;
op->db = db;
op->key = key;
incrRefCount(key);
dk->ctime = time(NULL);
listAddNodeTail(server.cache_flush_queue, dk);
op->ctime = time(NULL);
/* Give priority to load operations if there are no save already
* in queue for the same key. */
if (type == REDIS_IO_LOAD && !(flags & REDIS_IO_SAVE)) {
listAddNodeHead(server.cache_io_queue, op);
} else {
/* FIXME: probably when this happens we want to at least move
* the write job about this queue on top, and set the creation time
* to a value that will force processing ASAP. */
listAddNodeTail(server.cache_io_queue, op);
}
}
void cacheCron(void) {
@ -624,36 +659,47 @@ void cacheCron(void) {
topush = 100-jobs;
if (topush < 0) topush = 0;
while((ln = listFirst(server.cache_flush_queue)) != NULL) {
dirtykey *dk = ln->value;
while((ln = listFirst(server.cache_io_queue)) != NULL) {
ioop *op = ln->value;
if (!topush) break;
topush--;
if ((now - dk->ctime) >= server.cache_flush_delay) {
if (op->type == REDIS_IO_LOAD ||
(now - op->ctime) >= server.cache_flush_delay)
{
struct dictEntry *de;
robj *val;
redisLog(REDIS_DEBUG,"Creating IO Job to save key %s",dk->key->ptr);
redisLog(REDIS_DEBUG,"Creating IO %s Job for key %s",
op->type == REDIS_IO_LOAD ? "load" : "save", op->key->ptr);
/* Lookup the key, in order to put the current value in the IO
* Job and mark it as DS_SAVING.
* Otherwise if the key does not exists we schedule a disk store
* delete operation, setting the value to NULL. */
de = dictFind(dk->db->dict,dk->key->ptr);
if (de) {
val = dictGetEntryVal(de);
redisAssert(val->storage == REDIS_DS_DIRTY);
val->storage = REDIS_DS_SAVING;
if (op->type == REDIS_IO_LOAD) {
dsCreateIOJob(REDIS_IOJOB_LOAD,op->db,op->key,NULL);
} else {
/* Setting the value to NULL tells the IO thread to delete
* the key on disk. */
val = NULL;
/* Lookup the key, in order to put the current value in the IO
* Job. Otherwise if the key does not exists we schedule a disk
* store delete operation, setting the value to NULL. */
de = dictFind(op->db->dict,op->key->ptr);
if (de) {
val = dictGetEntryVal(de);
} else {
/* Setting the value to NULL tells the IO thread to delete
* the key on disk. */
val = NULL;
}
dsCreateIOJob(REDIS_IOJOB_SAVE,op->db,op->key,val);
}
dsCreateIOJob(REDIS_IOJOB_SAVE,dk->db,dk->key,val);
listDelNode(server.cache_flush_queue,ln);
decrRefCount(dk->key);
zfree(dk);
/* Mark the operation as in progress. */
cacheScheduleIODelFlag(op->db,op->key,op->type);
cacheScheduleIOAddFlag(op->db,op->key,
(op->type == REDIS_IO_LOAD) ? REDIS_IO_LOADINPROG :
REDIS_IO_SAVEINPROG);
/* Finally remove the operation from the queue.
* But we'll have trace of it in the hash table. */
listDelNode(server.cache_io_queue,ln);
decrRefCount(op->key);
zfree(op);
} else {
break; /* too early */
}
@ -664,20 +710,14 @@ void cacheCron(void) {
server.cache_max_memory)
{
if (cacheFreeOneEntry() == REDIS_ERR) break;
/* FIXME: also free negative cache entries here. */
}
}
/* ============ Virtual Memory - Blocking clients on missing keys =========== */
/* ========== Disk store cache - Blocking clients on missing keys =========== */
/* This function makes the client 'c' wait for the key 'key' to be loaded.
* If the key is already in memory we don't need to block, regardless
* of the storage of the value object for this key:
*
* - If it's REDIS_DS_MEMORY we have the key in memory.
* - If it's REDIS_DS_DIRTY they key was modified, but still in memory.
* - if it's REDIS_DS_SAVING the key is being saved by an IO Job. When
* the client will lookup the key it will block if the key is still
* in this stage but it's more or less the best we can do.
* If the key is already in memory we don't need to block.
*
* FIXME: we should try if it's actually better to suspend the client
* accessing an object that is being saved, and awake it only when
@ -722,7 +762,7 @@ int waitForSwappedKey(redisClient *c, robj *key) {
/* Are we already loading the key from disk? If not create a job */
if (de == NULL)
dsCreateIOJob(REDIS_IOJOB_LOAD,c->db,key,NULL);
cacheScheduleIO(c->db,key,REDIS_IO_LOAD);
return 1;
}

View File

@ -167,7 +167,6 @@ void _addReplyStringToList(redisClient *c, char *s, size_t len) {
void addReply(redisClient *c, robj *obj) {
if (_installWriteEvent(c) != REDIS_OK) return;
redisAssert(!server.ds_enabled || obj->storage != REDIS_DS_SAVING);
/* This is an important place where we can avoid copy-on-write
* when there is a saving child running, avoiding touching the

View File

@ -21,7 +21,6 @@ robj *createObject(int type, void *ptr) {
/* The following is only needed if VM is active, but since the conditional
* is probably more costly than initializing the field it's better to
* have every field properly initialized anyway. */
o->storage = REDIS_DS_MEMORY;
return o;
}
@ -163,9 +162,6 @@ void decrRefCount(void *obj) {
if (o->refcount <= 0) redisPanic("decrRefCount against refcount <= 0");
if (--(o->refcount) == 0) {
/* DS_SAVING objects should always have a reference in the
* IO Job structure. So we should never reach this state. */
redisAssert(o->storage != REDIS_DS_SAVING);
switch(o->type) {
case REDIS_STRING: freeStringObject(o); break;
case REDIS_LIST: freeListObject(o); break;

View File

@ -826,7 +826,7 @@ void initServer() {
server.slaves = listCreate();
server.monitors = listCreate();
server.unblocked_clients = listCreate();
server.cache_flush_queue = listCreate();
server.cache_io_queue = listCreate();
server.cache_flush_delay = 0;
createSharedObjects();
@ -857,6 +857,7 @@ void initServer() {
if (server.ds_enabled) {
server.db[j].io_keys = dictCreate(&keylistDictType,NULL);
server.db[j].io_negcache = dictCreate(&setDictType,NULL);
server.db[j].io_queued = dictCreate(&setDictType,NULL);
}
server.db[j].id = j;
}

View File

@ -119,10 +119,11 @@
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
/* Disk store cache object->storage values */
#define REDIS_DS_MEMORY 0 /* The object is on memory */
#define REDIS_DS_DIRTY 1 /* The object was modified */
#define REDIS_DS_SAVING 2 /* There is an IO Job created for this obj. */
/* Scheduled IO opeations flags. */
#define REDIS_IO_LOAD 1
#define REDIS_IO_SAVE 2
#define REDIS_IO_LOADINPROG 4
#define REDIS_IO_SAVEINPROG 8
#define REDIS_MAX_COMPLETED_JOBS_PROCESSED 1
#define REDIS_THREAD_STACK_SIZE (1024*1024*4)
@ -220,7 +221,7 @@ void _redisPanic(char *msg, char *file, int line);
#define REDIS_LRU_CLOCK_RESOLUTION 10 /* LRU clock resolution in seconds */
typedef struct redisObject {
unsigned type:4;
unsigned storage:2; /* REDIS_VM_MEMORY or REDIS_VM_SWAPPING */
unsigned notused:2; /* Not used */
unsigned encoding:4;
unsigned lru:22; /* lru time (relative to server.lruclock) */
int refcount;
@ -261,15 +262,15 @@ typedef struct vmPointer {
_var.type = REDIS_STRING; \
_var.encoding = REDIS_ENCODING_RAW; \
_var.ptr = _ptr; \
_var.storage = REDIS_DS_MEMORY; \
} while(0);
typedef struct redisDb {
dict *dict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */
dict *io_keys; /* Keys with clients waiting for VM I/O */
dict *io_keys; /* Keys with clients waiting for DS I/O */
dict *io_negcache; /* Negative caching for disk store */
dict *io_queued; /* Queued IO operations hash table */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id;
} redisDb;
@ -433,7 +434,7 @@ struct redisServer {
unsigned int bpop_blocked_clients;
unsigned int cache_blocked_clients;
list *unblocked_clients; /* list of clients to unblock before next loop */
list *cache_flush_queue; /* keys to flush on disk */
list *cache_io_queue; /* IO operations queue */
int cache_flush_delay; /* seconds to wait before flushing keys */
/* Sort parameters - qsort_r() is only available under BSD so we
* have to take this state global, in order to pass it to sortCompare() */
@ -545,7 +546,7 @@ typedef struct zset {
zskiplist *zsl;
} zset;
/* VM threaded I/O request message */
/* Disk store threaded I/O request message */
#define REDIS_IOJOB_LOAD 0
#define REDIS_IOJOB_SAVE 1
@ -558,13 +559,13 @@ typedef struct iojob {
time_t expire; /* Expire time for this key on REDIS_IOJOB_LOAD */
} iojob;
/* When diskstore is enabled and a flush operation is requested we push
* one of this structures into server.cache_flush_queue. */
typedef struct dirtykey {
/* IO operations scheduled -- check dscache.c for more info */
typedef struct ioop {
int type;
redisDb *db;
robj *key;
time_t ctime; /* This is the creation time of the entry. */
} dirtykey;
} ioop;
/* Structure to hold list iteration abstraction. */
typedef struct {
@ -807,12 +808,14 @@ int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd);
int dontWaitForSwappedKey(redisClient *c, robj *key);
void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key);
int cacheFreeOneEntry(void);
void cacheScheduleForFlush(redisDb *db, robj *key);
void cacheScheduleIOAddFlag(redisDb *db, robj *key, long flag);
void cacheScheduleIODelFlag(redisDb *db, robj *key, long flag);
int cacheScheduleIOGetFlags(redisDb *db, robj *key);
void cacheScheduleIO(redisDb *db, robj *key, int type);
void cacheCron(void);
int cacheKeyMayExist(redisDb *db, robj *key);
void cacheSetKeyExists(redisDb *db, robj *key);
void cacheSetKeyDoesNotExist(redisDb *db, robj *key);
void cacheSetKeyDoesNotExistRemember(redisDb *db, robj *key);
/* Set data type */
robj *setTypeCreate(robj *value);