Modules callbacks for lazy free effort, and unlink (#7912)

Add two optional callbacks to the RedisModuleTypeMethods structure, which is `free_effort`
and `unlink`. the `free_effort` callback indicates the effort required to free a module memory.
Currently, if the effort exceeds LAZYFREE_THRESHOLD, the module memory may be released
asynchronously. the `unlink` callback indicates the key has been removed from the DB by redis, and
may soon be freed by a background thread.

Add `lazyfreed_objects` info field, which represents the number of objects that have been
lazyfreed since redis was started.

Add `RM_GetTypeMethodVersion` API, which return the current redis-server runtime value of
`REDISMODULE_TYPE_METHOD_VERSION`. You can use that when calling `RM_CreateDataType` to know
which fields of RedisModuleTypeMethods are gonna be supported and which will be ignored.
This commit is contained in:
chenyangyang 2020-11-16 16:34:04 +08:00 committed by GitHub
parent d8fd48c436
commit c1aaad06d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 338 additions and 16 deletions

View File

@ -28,4 +28,5 @@ $TCLSH tests/test_helper.tcl \
--single unit/moduleapi/keyspace_events \ --single unit/moduleapi/keyspace_events \
--single unit/moduleapi/blockedclient \ --single unit/moduleapi/blockedclient \
--single unit/moduleapi/getkeys \ --single unit/moduleapi/getkeys \
--single unit/moduleapi/test_lazyfree \
"${@}" "${@}"

View File

@ -217,10 +217,14 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) {
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
val->lru = old->lru; val->lru = old->lru;
} }
/* Although the key is not really deleted from the database, we regard
overwrite as two steps of unlink+add, so we still need to call the unlink
callback of the module. */
moduleNotifyKeyUnlink(key,val);
dictSetVal(db->dict, de, val); dictSetVal(db->dict, de, val);
if (server.lazyfree_lazy_server_del) { if (server.lazyfree_lazy_server_del) {
freeObjAsync(old); freeObjAsync(key,old);
dictSetVal(db->dict, &auxentry, NULL); dictSetVal(db->dict, &auxentry, NULL);
} }
@ -298,7 +302,12 @@ int dbSyncDelete(redisDb *db, robj *key) {
/* Deleting an entry from the expires dict will not free the sds of /* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */ * the key, because it is shared with the main dictionary. */
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr); if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
if (dictDelete(db->dict,key->ptr) == DICT_OK) { dictEntry *de = dictUnlink(db->dict,key->ptr);
if (de) {
robj *val = dictGetVal(de);
/* Tells the module that the key has been unlinked from the database. */
moduleNotifyKeyUnlink(key,val);
dictFreeUnlinkedEntry(db->dict,de);
if (server.cluster_enabled) slotToKeyDel(key->ptr); if (server.cluster_enabled) slotToKeyDel(key->ptr);
return 1; return 1;
} else { } else {

View File

@ -4,6 +4,7 @@
#include "cluster.h" #include "cluster.h"
static redisAtomic size_t lazyfree_objects = 0; static redisAtomic size_t lazyfree_objects = 0;
static redisAtomic size_t lazyfreed_objects = 0;
/* Return the number of currently pending objects to free. */ /* Return the number of currently pending objects to free. */
size_t lazyfreeGetPendingObjectsCount(void) { size_t lazyfreeGetPendingObjectsCount(void) {
@ -12,6 +13,13 @@ size_t lazyfreeGetPendingObjectsCount(void) {
return aux; return aux;
} }
/* Return the number of objects that have been freed. */
size_t lazyfreeGetFreedObjectsCount(void) {
size_t aux;
atomicGet(lazyfreed_objects,aux);
return aux;
}
/* Return the amount of work needed in order to free an object. /* Return the amount of work needed in order to free an object.
* The return value is not always the actual number of allocations the * The return value is not always the actual number of allocations the
* object is composed of, but a number proportional to it. * object is composed of, but a number proportional to it.
@ -27,7 +35,7 @@ size_t lazyfreeGetPendingObjectsCount(void) {
* *
* For lists the function returns the number of elements in the quicklist * For lists the function returns the number of elements in the quicklist
* representing the list. */ * representing the list. */
size_t lazyfreeGetFreeEffort(robj *obj) { size_t lazyfreeGetFreeEffort(robj *key, robj *obj) {
if (obj->type == OBJ_LIST) { if (obj->type == OBJ_LIST) {
quicklist *ql = obj->ptr; quicklist *ql = obj->ptr;
return ql->len; return ql->len;
@ -64,6 +72,17 @@ size_t lazyfreeGetFreeEffort(robj *obj) {
raxStop(&ri); raxStop(&ri);
} }
return effort; return effort;
} else if (obj->type == OBJ_MODULE) {
moduleValue *mv = obj->ptr;
moduleType *mt = mv->type;
if (mt->free_effort != NULL) {
size_t effort = mt->free_effort(key,mv->value);
/* If the module's free_effort returns 0, it will use asynchronous free
memory by default */
return effort == 0 ? ULONG_MAX : effort;
} else {
return 1;
}
} else { } else {
return 1; /* Everything else is a single allocation. */ return 1; /* Everything else is a single allocation. */
} }
@ -85,7 +104,11 @@ int dbAsyncDelete(redisDb *db, robj *key) {
dictEntry *de = dictUnlink(db->dict,key->ptr); dictEntry *de = dictUnlink(db->dict,key->ptr);
if (de) { if (de) {
robj *val = dictGetVal(de); robj *val = dictGetVal(de);
size_t free_effort = lazyfreeGetFreeEffort(val);
/* Tells the module that the key has been unlinked from the database. */
moduleNotifyKeyUnlink(key,val);
size_t free_effort = lazyfreeGetFreeEffort(key,val);
/* If releasing the object is too much work, do it in the background /* If releasing the object is too much work, do it in the background
* by adding the object to the lazy free list. * by adding the object to the lazy free list.
@ -114,13 +137,13 @@ int dbAsyncDelete(redisDb *db, robj *key) {
} }
/* Free an object, if the object is huge enough, free it in async way. */ /* Free an object, if the object is huge enough, free it in async way. */
void freeObjAsync(robj *o) { void freeObjAsync(robj *key, robj *obj) {
size_t free_effort = lazyfreeGetFreeEffort(o); size_t free_effort = lazyfreeGetFreeEffort(key,obj);
if (free_effort > LAZYFREE_THRESHOLD && o->refcount == 1) { if (free_effort > LAZYFREE_THRESHOLD && obj->refcount == 1) {
atomicIncr(lazyfree_objects,1); atomicIncr(lazyfree_objects,1);
bioCreateBackgroundJob(BIO_LAZY_FREE,o,NULL,NULL); bioCreateBackgroundJob(BIO_LAZY_FREE,obj,NULL,NULL);
} else { } else {
decrRefCount(o); decrRefCount(obj);
} }
} }
@ -152,6 +175,7 @@ void slotToKeyFlushAsync(void) {
void lazyfreeFreeObjectFromBioThread(robj *o) { void lazyfreeFreeObjectFromBioThread(robj *o) {
decrRefCount(o); decrRefCount(o);
atomicDecr(lazyfree_objects,1); atomicDecr(lazyfree_objects,1);
atomicIncr(lazyfreed_objects,1);
} }
/* Release a database from the lazyfree thread. The 'db' pointer is the /* Release a database from the lazyfree thread. The 'db' pointer is the
@ -164,6 +188,7 @@ void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2) {
dictRelease(ht1); dictRelease(ht1);
dictRelease(ht2); dictRelease(ht2);
atomicDecr(lazyfree_objects,numkeys); atomicDecr(lazyfree_objects,numkeys);
atomicIncr(lazyfreed_objects,numkeys);
} }
/* Release the skiplist mapping Redis Cluster keys to slots in the /* Release the skiplist mapping Redis Cluster keys to slots in the
@ -172,4 +197,5 @@ void lazyfreeFreeSlotsMapFromBioThread(rax *rt) {
size_t len = rt->numele; size_t len = rt->numele;
raxFree(rt); raxFree(rt);
atomicDecr(lazyfree_objects,len); atomicDecr(lazyfree_objects,len);
atomicIncr(lazyfreed_objects,len);
} }

View File

@ -3666,6 +3666,8 @@ void moduleTypeNameByID(char *name, uint64_t moduleid) {
* .mem_usage = myType_MemUsageCallBack, * .mem_usage = myType_MemUsageCallBack,
* .aux_load = myType_AuxRDBLoadCallBack, * .aux_load = myType_AuxRDBLoadCallBack,
* .aux_save = myType_AuxRDBSaveCallBack, * .aux_save = myType_AuxRDBSaveCallBack,
* .free_effort = myType_FreeEffortCallBack
* .unlink = myType_UnlinkCallBack
* } * }
* *
* * **rdb_load**: A callback function pointer that loads data from RDB files. * * **rdb_load**: A callback function pointer that loads data from RDB files.
@ -3677,6 +3679,14 @@ void moduleTypeNameByID(char *name, uint64_t moduleid) {
* 'when' argument is either REDISMODULE_AUX_BEFORE_RDB or REDISMODULE_AUX_AFTER_RDB. * 'when' argument is either REDISMODULE_AUX_BEFORE_RDB or REDISMODULE_AUX_AFTER_RDB.
* * **aux_load**: A callback function pointer that loads out of keyspace data from RDB files. * * **aux_load**: A callback function pointer that loads out of keyspace data from RDB files.
* Similar to aux_save, returns REDISMODULE_OK on success, and ERR otherwise. * Similar to aux_save, returns REDISMODULE_OK on success, and ERR otherwise.
* * **free_effort**: A callback function pointer that used to determine whether the module's
* memory needs to be lazy reclaimed. The module should return the complexity involved by
* freeing the value. for example: how many pointers are gonna be freed. Note that if it
* returns 0, we'll always do an async free.
* * **unlink**: A callback function pointer that used to notifies the module that the key has
* been removed from the DB by redis, and may soon be freed by a background thread. Note that
* it won't be called on FLUSHALL/FLUSHDB (both sync and async), and the module can use the
* RedisModuleEvent_FlushDB to hook into that.
* *
* The **digest** and **mem_usage** methods should currently be omitted since * The **digest** and **mem_usage** methods should currently be omitted since
* they are not yet implemented inside the Redis modules core. * they are not yet implemented inside the Redis modules core.
@ -3720,6 +3730,10 @@ moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver,
moduleTypeAuxSaveFunc aux_save; moduleTypeAuxSaveFunc aux_save;
int aux_save_triggers; int aux_save_triggers;
} v2; } v2;
struct {
moduleTypeFreeEffortFunc free_effort;
moduleTypeUnlinkFunc unlink;
} v3;
} *tms = (struct typemethods*) typemethods_ptr; } *tms = (struct typemethods*) typemethods_ptr;
moduleType *mt = zcalloc(sizeof(*mt)); moduleType *mt = zcalloc(sizeof(*mt));
@ -3736,6 +3750,10 @@ moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver,
mt->aux_save = tms->v2.aux_save; mt->aux_save = tms->v2.aux_save;
mt->aux_save_triggers = tms->v2.aux_save_triggers; mt->aux_save_triggers = tms->v2.aux_save_triggers;
} }
if (tms->version >= 3) {
mt->free_effort = tms->v3.free_effort;
mt->unlink = tms->v3.unlink;
}
memcpy(mt->name,name,sizeof(mt->name)); memcpy(mt->name,name,sizeof(mt->name));
listAddNodeTail(ctx->module->types,mt); listAddNodeTail(ctx->module->types,mt);
return mt; return mt;
@ -7489,6 +7507,18 @@ void processModuleLoadingProgressEvent(int is_aof) {
} }
} }
/* When a module key is deleted (in dbAsyncDelete/dbSyncDelete/dbOverwrite), it
* will be called to tell the module which key is about to be released. */
void moduleNotifyKeyUnlink(robj *key, robj *val) {
if (val->type == OBJ_MODULE) {
moduleValue *mv = val->ptr;
moduleType *mt = mv->type;
if (mt->unlink != NULL) {
mt->unlink(key,mv->value);
}
}
}
/* -------------------------------------------------------------------------- /* --------------------------------------------------------------------------
* Modules API internals * Modules API internals
* -------------------------------------------------------------------------- */ * -------------------------------------------------------------------------- */
@ -7990,6 +8020,15 @@ int RM_GetServerVersion() {
return REDIS_VERSION_NUM; return REDIS_VERSION_NUM;
} }
/**
* Return the current redis-server runtime value of REDISMODULE_TYPE_METHOD_VERSION.
* You can use that when calling RM_CreateDataType to know which fields of
* RedisModuleTypeMethods are gonna be supported and which will be ignored.
*/
int RM_GetTypeMethodVersion() {
return REDISMODULE_TYPE_METHOD_VERSION;
}
/* Replace the value assigned to a module type. /* Replace the value assigned to a module type.
* *
* The key must be open for writing, have an existing value, and have a moduleType * The key must be open for writing, have an existing value, and have a moduleType
@ -8329,4 +8368,5 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(GetServerVersion); REGISTER_API(GetServerVersion);
REGISTER_API(GetClientCertificate); REGISTER_API(GetClientCertificate);
REGISTER_API(GetCommandKeys); REGISTER_API(GetCommandKeys);
REGISTER_API(GetTypeMethodVersion);
} }

View File

@ -14,6 +14,10 @@
/* API versions. */ /* API versions. */
#define REDISMODULE_APIVER_1 1 #define REDISMODULE_APIVER_1 1
/* Version of the RedisModuleTypeMethods structure. Once the RedisModuleTypeMethods
* structure is changed, this version number needs to be changed synchronistically. */
#define REDISMODULE_TYPE_METHOD_VERSION 3
/* API flags and constants */ /* API flags and constants */
#define REDISMODULE_READ (1<<0) #define REDISMODULE_READ (1<<0)
#define REDISMODULE_WRITE (1<<1) #define REDISMODULE_WRITE (1<<1)
@ -485,6 +489,8 @@ typedef void (*RedisModuleTypeRewriteFunc)(RedisModuleIO *aof, RedisModuleString
typedef size_t (*RedisModuleTypeMemUsageFunc)(const void *value); typedef size_t (*RedisModuleTypeMemUsageFunc)(const void *value);
typedef void (*RedisModuleTypeDigestFunc)(RedisModuleDigest *digest, void *value); typedef void (*RedisModuleTypeDigestFunc)(RedisModuleDigest *digest, void *value);
typedef void (*RedisModuleTypeFreeFunc)(void *value); typedef void (*RedisModuleTypeFreeFunc)(void *value);
typedef size_t (*RedisModuleTypeFreeEffortFunc)(RedisModuleString *key, const void *value);
typedef void (*RedisModuleTypeUnlinkFunc)(RedisModuleString *key, const void *value);
typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, const char *sender_id, uint8_t type, const unsigned char *payload, uint32_t len); typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, const char *sender_id, uint8_t type, const unsigned char *payload, uint32_t len);
typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data); typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data);
typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter); typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter);
@ -494,7 +500,6 @@ typedef void (*RedisModuleScanCB)(RedisModuleCtx *ctx, RedisModuleString *keynam
typedef void (*RedisModuleScanKeyCB)(RedisModuleKey *key, RedisModuleString *field, RedisModuleString *value, void *privdata); typedef void (*RedisModuleScanKeyCB)(RedisModuleKey *key, RedisModuleString *field, RedisModuleString *value, void *privdata);
typedef void (*RedisModuleUserChangedFunc) (uint64_t client_id, void *privdata); typedef void (*RedisModuleUserChangedFunc) (uint64_t client_id, void *privdata);
#define REDISMODULE_TYPE_METHOD_VERSION 2
typedef struct RedisModuleTypeMethods { typedef struct RedisModuleTypeMethods {
uint64_t version; uint64_t version;
RedisModuleTypeLoadFunc rdb_load; RedisModuleTypeLoadFunc rdb_load;
@ -506,6 +511,8 @@ typedef struct RedisModuleTypeMethods {
RedisModuleTypeAuxLoadFunc aux_load; RedisModuleTypeAuxLoadFunc aux_load;
RedisModuleTypeAuxSaveFunc aux_save; RedisModuleTypeAuxSaveFunc aux_save;
int aux_save_triggers; int aux_save_triggers;
RedisModuleTypeFreeEffortFunc free_effort;
RedisModuleTypeUnlinkFunc unlink;
} RedisModuleTypeMethods; } RedisModuleTypeMethods;
#define REDISMODULE_GET_API(name) \ #define REDISMODULE_GET_API(name) \
@ -706,6 +713,7 @@ REDISMODULE_API int (*RedisModule_GetContextFlagsAll)() REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetKeyspaceNotificationFlagsAll)() REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_GetKeyspaceNotificationFlagsAll)() REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_IsSubEventSupported)(RedisModuleEvent event, uint64_t subevent) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_IsSubEventSupported)(RedisModuleEvent event, uint64_t subevent) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetServerVersion)() REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_GetServerVersion)() REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetTypeMethodVersion)() REDISMODULE_ATTR;
/* Experimental APIs */ /* Experimental APIs */
#ifdef REDISMODULE_EXPERIMENTAL_API #ifdef REDISMODULE_EXPERIMENTAL_API
@ -956,6 +964,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(GetKeyspaceNotificationFlagsAll); REDISMODULE_GET_API(GetKeyspaceNotificationFlagsAll);
REDISMODULE_GET_API(IsSubEventSupported); REDISMODULE_GET_API(IsSubEventSupported);
REDISMODULE_GET_API(GetServerVersion); REDISMODULE_GET_API(GetServerVersion);
REDISMODULE_GET_API(GetTypeMethodVersion);
#ifdef REDISMODULE_EXPERIMENTAL_API #ifdef REDISMODULE_EXPERIMENTAL_API
REDISMODULE_GET_API(GetThreadSafeContext); REDISMODULE_GET_API(GetThreadSafeContext);

View File

@ -4404,7 +4404,8 @@ sds genRedisInfoString(const char *section) {
"mem_aof_buffer:%zu\r\n" "mem_aof_buffer:%zu\r\n"
"mem_allocator:%s\r\n" "mem_allocator:%s\r\n"
"active_defrag_running:%d\r\n" "active_defrag_running:%d\r\n"
"lazyfree_pending_objects:%zu\r\n", "lazyfree_pending_objects:%zu\r\n"
"lazyfreed_objects:%zu\r\n",
zmalloc_used, zmalloc_used,
hmem, hmem,
server.cron_malloc_stats.process_rss, server.cron_malloc_stats.process_rss,
@ -4447,7 +4448,8 @@ sds genRedisInfoString(const char *section) {
mh->aof_buffer, mh->aof_buffer,
ZMALLOC_LIB, ZMALLOC_LIB,
server.active_defrag_running, server.active_defrag_running,
lazyfreeGetPendingObjectsCount() lazyfreeGetPendingObjectsCount(),
lazyfreeGetFreedObjectsCount()
); );
freeMemoryOverheadData(mh); freeMemoryOverheadData(mh);
} }

View File

@ -513,6 +513,8 @@ typedef void (*moduleTypeRewriteFunc)(struct RedisModuleIO *io, struct redisObje
typedef void (*moduleTypeDigestFunc)(struct RedisModuleDigest *digest, void *value); typedef void (*moduleTypeDigestFunc)(struct RedisModuleDigest *digest, void *value);
typedef size_t (*moduleTypeMemUsageFunc)(const void *value); typedef size_t (*moduleTypeMemUsageFunc)(const void *value);
typedef void (*moduleTypeFreeFunc)(void *value); typedef void (*moduleTypeFreeFunc)(void *value);
typedef size_t (*moduleTypeFreeEffortFunc)(struct redisObject *key, const void *value);
typedef void (*moduleTypeUnlinkFunc)(struct redisObject *key, void *value);
/* This callback type is called by moduleNotifyUserChanged() every time /* This callback type is called by moduleNotifyUserChanged() every time
* a user authenticated via the module API is associated with a different * a user authenticated via the module API is associated with a different
@ -532,6 +534,8 @@ typedef struct RedisModuleType {
moduleTypeMemUsageFunc mem_usage; moduleTypeMemUsageFunc mem_usage;
moduleTypeDigestFunc digest; moduleTypeDigestFunc digest;
moduleTypeFreeFunc free; moduleTypeFreeFunc free;
moduleTypeFreeEffortFunc free_effort;
moduleTypeUnlinkFunc unlink;
moduleTypeAuxLoadFunc aux_load; moduleTypeAuxLoadFunc aux_load;
moduleTypeAuxSaveFunc aux_save; moduleTypeAuxSaveFunc aux_save;
int aux_save_triggers; int aux_save_triggers;
@ -1651,6 +1655,7 @@ int moduleTryServeClientBlockedOnKey(client *c, robj *key);
void moduleUnblockClient(client *c); void moduleUnblockClient(client *c);
int moduleClientIsBlockedOnKeys(client *c); int moduleClientIsBlockedOnKeys(client *c);
void moduleNotifyUserChanged(client *c); void moduleNotifyUserChanged(client *c);
void moduleNotifyKeyUnlink(robj *key, robj *val);
/* Utils */ /* Utils */
long long ustime(void); long long ustime(void);
@ -2201,7 +2206,8 @@ int dbAsyncDelete(redisDb *db, robj *key);
void emptyDbAsync(redisDb *db); void emptyDbAsync(redisDb *db);
void slotToKeyFlushAsync(void); void slotToKeyFlushAsync(void);
size_t lazyfreeGetPendingObjectsCount(void); size_t lazyfreeGetPendingObjectsCount(void);
void freeObjAsync(robj *o); size_t lazyfreeGetFreedObjectsCount(void);
void freeObjAsync(robj *key, robj *obj);
/* API to get key arguments from commands */ /* API to get key arguments from commands */
int *getKeysPrepareResult(getKeysResult *result, int numkeys); int *getKeysPrepareResult(getKeysResult *result, int numkeys);

View File

@ -1080,7 +1080,7 @@ void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum,
sdsfree(ele); sdsfree(ele);
} }
setTypeReleaseIterator(si); setTypeReleaseIterator(si);
server.lazyfree_lazy_server_del ? freeObjAsync(dstset) : server.lazyfree_lazy_server_del ? freeObjAsync(NULL, dstset) :
decrRefCount(dstset); decrRefCount(dstset);
} else { } else {
/* If we have a target key where to store the resulting set /* If we have a target key where to store the resulting set

View File

@ -26,7 +26,8 @@ TEST_MODULES = \
keyspace_events.so \ keyspace_events.so \
blockedclient.so \ blockedclient.so \
getkeys.so \ getkeys.so \
timer.so test_lazyfree.so \
timer.so \
.PHONY: all .PHONY: all

View File

@ -0,0 +1,196 @@
/* This module emulates a linked list for lazyfree testing of modules, which
is a simplified version of 'hellotype.c'
*/
#include "redismodule.h"
#include <stdio.h>
#include <stdlib.h>
#include <ctype.h>
#include <string.h>
#include <stdint.h>
static RedisModuleType *LazyFreeLinkType;
struct LazyFreeLinkNode {
int64_t value;
struct LazyFreeLinkNode *next;
};
struct LazyFreeLinkObject {
struct LazyFreeLinkNode *head;
size_t len; /* Number of elements added. */
};
struct LazyFreeLinkObject *createLazyFreeLinkObject(void) {
struct LazyFreeLinkObject *o;
o = RedisModule_Alloc(sizeof(*o));
o->head = NULL;
o->len = 0;
return o;
}
void LazyFreeLinkInsert(struct LazyFreeLinkObject *o, int64_t ele) {
struct LazyFreeLinkNode *next = o->head, *newnode, *prev = NULL;
while(next && next->value < ele) {
prev = next;
next = next->next;
}
newnode = RedisModule_Alloc(sizeof(*newnode));
newnode->value = ele;
newnode->next = next;
if (prev) {
prev->next = newnode;
} else {
o->head = newnode;
}
o->len++;
}
void LazyFreeLinkReleaseObject(struct LazyFreeLinkObject *o) {
struct LazyFreeLinkNode *cur, *next;
cur = o->head;
while(cur) {
next = cur->next;
RedisModule_Free(cur);
cur = next;
}
RedisModule_Free(o);
}
/* LAZYFREELINK.INSERT key value */
int LazyFreeLinkInsert_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModule_AutoMemory(ctx); /* Use automatic memory management. */
if (argc != 3) return RedisModule_WrongArity(ctx);
RedisModuleKey *key = RedisModule_OpenKey(ctx,argv[1],
REDISMODULE_READ|REDISMODULE_WRITE);
int type = RedisModule_KeyType(key);
if (type != REDISMODULE_KEYTYPE_EMPTY &&
RedisModule_ModuleTypeGetType(key) != LazyFreeLinkType)
{
return RedisModule_ReplyWithError(ctx,REDISMODULE_ERRORMSG_WRONGTYPE);
}
long long value;
if ((RedisModule_StringToLongLong(argv[2],&value) != REDISMODULE_OK)) {
return RedisModule_ReplyWithError(ctx,"ERR invalid value: must be a signed 64 bit integer");
}
struct LazyFreeLinkObject *hto;
if (type == REDISMODULE_KEYTYPE_EMPTY) {
hto = createLazyFreeLinkObject();
RedisModule_ModuleTypeSetValue(key,LazyFreeLinkType,hto);
} else {
hto = RedisModule_ModuleTypeGetValue(key);
}
LazyFreeLinkInsert(hto,value);
RedisModule_SignalKeyAsReady(ctx,argv[1]);
RedisModule_ReplyWithLongLong(ctx,hto->len);
RedisModule_ReplicateVerbatim(ctx);
return REDISMODULE_OK;
}
/* LAZYFREELINK.LEN key */
int LazyFreeLinkLen_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModule_AutoMemory(ctx); /* Use automatic memory management. */
if (argc != 2) return RedisModule_WrongArity(ctx);
RedisModuleKey *key = RedisModule_OpenKey(ctx,argv[1],
REDISMODULE_READ|REDISMODULE_WRITE);
int type = RedisModule_KeyType(key);
if (type != REDISMODULE_KEYTYPE_EMPTY &&
RedisModule_ModuleTypeGetType(key) != LazyFreeLinkType)
{
return RedisModule_ReplyWithError(ctx,REDISMODULE_ERRORMSG_WRONGTYPE);
}
struct LazyFreeLinkObject *hto = RedisModule_ModuleTypeGetValue(key);
RedisModule_ReplyWithLongLong(ctx,hto ? hto->len : 0);
return REDISMODULE_OK;
}
void *LazyFreeLinkRdbLoad(RedisModuleIO *rdb, int encver) {
if (encver != 0) {
return NULL;
}
uint64_t elements = RedisModule_LoadUnsigned(rdb);
struct LazyFreeLinkObject *hto = createLazyFreeLinkObject();
while(elements--) {
int64_t ele = RedisModule_LoadSigned(rdb);
LazyFreeLinkInsert(hto,ele);
}
return hto;
}
void LazyFreeLinkRdbSave(RedisModuleIO *rdb, void *value) {
struct LazyFreeLinkObject *hto = value;
struct LazyFreeLinkNode *node = hto->head;
RedisModule_SaveUnsigned(rdb,hto->len);
while(node) {
RedisModule_SaveSigned(rdb,node->value);
node = node->next;
}
}
void LazyFreeLinkAofRewrite(RedisModuleIO *aof, RedisModuleString *key, void *value) {
struct LazyFreeLinkObject *hto = value;
struct LazyFreeLinkNode *node = hto->head;
while(node) {
RedisModule_EmitAOF(aof,"LAZYFREELINK.INSERT","sl",key,node->value);
node = node->next;
}
}
void LazyFreeLinkFree(void *value) {
LazyFreeLinkReleaseObject(value);
}
size_t LazyFreeLinkFreeEffort(RedisModuleString *key, const void *value) {
REDISMODULE_NOT_USED(key);
const struct LazyFreeLinkObject *hto = value;
return hto->len;
}
void LazyFreeLinkUnlink(RedisModuleString *key, const void *value) {
REDISMODULE_NOT_USED(key);
REDISMODULE_NOT_USED(value);
/* Here you can know which key and value is about to be freed. */
}
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
if (RedisModule_Init(ctx,"lazyfreetest",1,REDISMODULE_APIVER_1)
== REDISMODULE_ERR) return REDISMODULE_ERR;
/* We only allow our module to be loaded when the redis core version is greater than the version of my module */
if (RedisModule_GetTypeMethodVersion() < REDISMODULE_TYPE_METHOD_VERSION) {
return REDISMODULE_ERR;
}
RedisModuleTypeMethods tm = {
.version = REDISMODULE_TYPE_METHOD_VERSION,
.rdb_load = LazyFreeLinkRdbLoad,
.rdb_save = LazyFreeLinkRdbSave,
.aof_rewrite = LazyFreeLinkAofRewrite,
.free = LazyFreeLinkFree,
.free_effort = LazyFreeLinkFreeEffort,
.unlink = LazyFreeLinkUnlink,
};
LazyFreeLinkType = RedisModule_CreateDataType(ctx,"test_lazy",0,&tm);
if (LazyFreeLinkType == NULL) return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx,"lazyfreelink.insert",
LazyFreeLinkInsert_RedisCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx,"lazyfreelink.len",
LazyFreeLinkLen_RedisCommand,"readonly",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
return REDISMODULE_OK;
}

View File

@ -0,0 +1,32 @@
set testmodule [file normalize tests/modules/test_lazyfree.so]
start_server {tags {"modules"}} {
r module load $testmodule
test "modules allocated memory can be reclaimed in the background" {
set orig_mem [s used_memory]
set rd [redis_deferring_client]
# LAZYFREE_THRESHOLD is 64
for {set i 0} {$i < 10000} {incr i} {
$rd lazyfreelink.insert lazykey $i
}
for {set j 0} {$j < 10000} {incr j} {
$rd read
}
assert {[r lazyfreelink.len lazykey] == 10000}
set peak_mem [s used_memory]
assert {[r unlink lazykey] == 1}
assert {$peak_mem > $orig_mem+10000}
wait_for_condition 50 100 {
[s used_memory] < $peak_mem &&
[s used_memory] < $orig_mem*2 &&
[string match {*lazyfreed_objects:1*} [r info Memory]]
} else {
fail "Module memory is not reclaimed by UNLINK"
}
}
}