redict/src/lazyfree.c

132 lines
5.0 KiB
C

#include "server.h"
#include "bio.h"
#include "atomicvar.h"
#include "cluster.h"
static size_t lazyfree_objects = 0;
pthread_mutex_t lazyfree_objects_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Return the number of currently pending objects to free. */
size_t lazyfreeGetPendingObjectsCount(void) {
return lazyfree_objects;
}
/* Return the amount of work needed in order to free an object.
* The return value is not always the actual number of allocations the
* object is compoesd of, but a number proportional to it.
*
* For strings the function always returns 1.
*
* For aggregated objects represented by hash tables or other data structures
* the function just returns the number of elements the object is composed of.
*
* Objects composed of single allocations are always reported as having a
* single item even if they are actaully logical composed of multiple
* elements.
*
* For lists the funciton returns the number of elements in the quicklist
* representing the list. */
size_t lazyfreeGetFreeEffort(robj *obj) {
if (obj->type == OBJ_LIST) {
quicklist *ql = obj->ptr;
return ql->len;
} else if (obj->type == OBJ_SET && obj->encoding == OBJ_ENCODING_HT) {
dict *ht = obj->ptr;
return dictSize(ht);
} else if (obj->type == OBJ_ZSET && obj->encoding == OBJ_ENCODING_SKIPLIST){
zset *zs = obj->ptr;
return zs->zsl->length;
} else if (obj->type == OBJ_HASH && obj->encoding == OBJ_ENCODING_HT) {
dict *ht = obj->ptr;
return dictSize(ht);
} else {
return 1; /* Everything else is a single allocation. */
}
}
/* Delete a key, value, and associated expiration entry if any, from the DB.
* If there are enough allocations to free the value object may be put into
* a lazy free list instead of being freed synchronously. The lazy free list
* will be reclaimed in a different bio.c thread. */
#define LAZYFREE_THRESHOLD 64
int dbAsyncDelete(redisDb *db, robj *key) {
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
/* If the value is composed of a few allocations, to free in a lazy way
* is actually just slower... So under a certain limit we just free
* the object synchronously. */
dictEntry *de = dictFind(db->dict,key->ptr);
if (de) {
robj *val = dictGetVal(de);
size_t free_effort = lazyfreeGetFreeEffort(val);
/* If releasing the object is too much work, let's put it into the
* lazy free list. */
if (free_effort > LAZYFREE_THRESHOLD) {
atomicIncr(lazyfree_objects,1,&lazyfree_objects_mutex);
bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);
dictSetVal(db->dict,de,NULL);
}
}
/* Release the key-val pair, or just the key if we set the val
* field to NULL in order to lazy free it later. */
if (dictDelete(db->dict,key->ptr) == DICT_OK) {
if (server.cluster_enabled) slotToKeyDel(key);
return 1;
} else {
return 0;
}
}
/* Empty a Redis DB asynchronously. What the function does actually is to
* create a new empty set of hash tables and scheduling the old ones for
* lazy freeing. */
void emptyDbAsync(redisDb *db) {
dict *oldht1 = db->dict, *oldht2 = db->expires;
db->dict = dictCreate(&dbDictType,NULL);
db->expires = dictCreate(&keyptrDictType,NULL);
atomicIncr(lazyfree_objects,dictSize(oldht1),
&lazyfree_objects_mutex);
bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,oldht2);
}
/* Empty the slots-keys map of Redis CLuster by creating a new empty one
* and scheduiling the old for lazy freeing. */
void slotToKeyFlushAsync(void) {
zskiplist *oldsl = server.cluster->slots_to_keys;
server.cluster->slots_to_keys = zslCreate();
atomicIncr(lazyfree_objects,oldsl->length,
&lazyfree_objects_mutex);
bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,NULL,oldsl);
}
/* Release objects from the lazyfree thread. It's just decrRefCount()
* updating the count of objects to release. */
void lazyfreeFreeObjectFromBioThread(robj *o) {
decrRefCount(o);
atomicDecr(lazyfree_objects,1,&lazyfree_objects_mutex);
}
/* Release a database from the lazyfree thread. The 'db' pointer is the
* database which was substitutied with a fresh one in the main thread
* when the database was logically deleted. 'sl' is a skiplist used by
* Redis Cluster in order to take the hash slots -> keys mapping. This
* may be NULL if Redis Cluster is disabled. */
void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2) {
size_t numkeys = dictSize(ht1);
dictRelease(ht1);
dictRelease(ht2);
atomicDecr(lazyfree_objects,numkeys,&lazyfree_objects_mutex);
}
/* Release the skiplist mapping Redis Cluster keys to slots in the
* lazyfree thread. */
void lazyfreeFreeSlotsMapFromBioThread(zskiplist *sl) {
size_t len = sl->length;
zslFree(sl);
atomicDecr(lazyfree_objects,len,&lazyfree_objects_mutex);
}