diff --git a/runtest-moduleapi b/runtest-moduleapi index 11d496a27..a9b6e50c7 100755 --- a/runtest-moduleapi +++ b/runtest-moduleapi @@ -36,4 +36,5 @@ $TCLSH tests/test_helper.tcl \ --single unit/moduleapi/hash \ --single unit/moduleapi/zset \ --single unit/moduleapi/stream \ +--single unit/moduleapi/datatype2 \ "${@}" diff --git a/src/aof.c b/src/aof.c index 1d04b5223..30474b67d 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1386,11 +1386,11 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) { /* Call the module type callback in order to rewrite a data type * that is exported by a module and is not handled by Redis itself. * The function returns 0 on error, 1 on success. */ -int rewriteModuleObject(rio *r, robj *key, robj *o) { +int rewriteModuleObject(rio *r, robj *key, robj *o, int dbid) { RedisModuleIO io; moduleValue *mv = o->ptr; moduleType *mt = mv->type; - moduleInitIOContext(io,mt,r,key); + moduleInitIOContext(io,mt,r,key,dbid); mt->aof_rewrite(&io,key,mv->value); if (io.ctx) { moduleFreeContext(io.ctx); @@ -1464,7 +1464,7 @@ int rewriteAppendOnlyFileRio(rio *aof) { } else if (o->type == OBJ_STREAM) { if (rewriteStreamObject(aof,&key,o) == 0) goto werr; } else if (o->type == OBJ_MODULE) { - if (rewriteModuleObject(aof,&key,o) == 0) goto werr; + if (rewriteModuleObject(aof,&key,o,j) == 0) goto werr; } else { serverPanic("Unknown object type"); } diff --git a/src/cluster.c b/src/cluster.c index aaecd5aea..1dce37a2f 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -5062,7 +5062,7 @@ NULL /* Generates a DUMP-format representation of the object 'o', adding it to the * io stream pointed by 'rio'. This function can't fail. */ -void createDumpPayload(rio *payload, robj *o, robj *key) { +void createDumpPayload(rio *payload, robj *o, robj *key, int dbid) { unsigned char buf[2]; uint64_t crc; @@ -5070,7 +5070,7 @@ void createDumpPayload(rio *payload, robj *o, robj *key) { * byte followed by the serialized object. This is understood by RESTORE. */ rioInitWithBuffer(payload,sdsempty()); serverAssert(rdbSaveObjectType(payload,o)); - serverAssert(rdbSaveObject(payload,o,key)); + serverAssert(rdbSaveObject(payload,o,key,dbid)); /* Write the footer, this is how it looks like: * ----------------+---------------------+---------------+ @@ -5131,7 +5131,7 @@ void dumpCommand(client *c) { } /* Create the DUMP encoded representation. */ - createDumpPayload(&payload,o,c->argv[1]); + createDumpPayload(&payload,o,c->argv[1],c->db->id); /* Transfer to the client */ addReplyBulkSds(c,payload.io.buffer.ptr); @@ -5203,7 +5203,7 @@ void restoreCommand(client *c) { rioInitWithBuffer(&payload,c->argv[3]->ptr); if (((type = rdbLoadObjectType(&payload)) == -1) || - ((obj = rdbLoadObject(type,&payload,key->ptr)) == NULL)) + ((obj = rdbLoadObject(type,&payload,key->ptr,c->db->id)) == NULL)) { addReplyError(c,"Bad data format"); return; @@ -5523,7 +5523,7 @@ try_again: /* Emit the payload argument, that is the serialized object using * the DUMP format. */ - createDumpPayload(&payload,ov[j],kv[j]); + createDumpPayload(&payload,ov[j],kv[j],dbid); serverAssertWithInfo(c,NULL, rioWriteBulkString(&cmd,payload.io.buffer.ptr, sdslen(payload.io.buffer.ptr))); diff --git a/src/db.c b/src/db.c index 5a71d7fd1..79d482ab9 100644 --- a/src/db.c +++ b/src/db.c @@ -233,11 +233,11 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) { /* Although the key is not really deleted from the database, we regard overwrite as two steps of unlink+add, so we still need to call the unlink callback of the module. */ - moduleNotifyKeyUnlink(key,old); + moduleNotifyKeyUnlink(key,old,db->id); dictSetVal(db->dict, de, val); if (server.lazyfree_lazy_server_del) { - freeObjAsync(key,old); + freeObjAsync(key,old,db->id); dictSetVal(db->dict, &auxentry, NULL); } @@ -319,7 +319,7 @@ int dbSyncDelete(redisDb *db, robj *key) { if (de) { robj *val = dictGetVal(de); /* Tells the module that the key has been unlinked from the database. */ - moduleNotifyKeyUnlink(key,val); + moduleNotifyKeyUnlink(key,val,db->id); dictFreeUnlinkedEntry(db->dict,de); if (server.cluster_enabled) slotToKeyDel(key->ptr); return 1; @@ -1277,7 +1277,7 @@ void copyCommand(client *c) { case OBJ_HASH: newobj = hashTypeDup(o); break; case OBJ_STREAM: newobj = streamDup(o); break; case OBJ_MODULE: - newobj = moduleTypeDupOrReply(c, key, newkey, o); + newobj = moduleTypeDupOrReply(c, key, newkey, dst->id, o); if (!newobj) return; break; default: diff --git a/src/debug.c b/src/debug.c index 53affc015..c85bdb2dd 100644 --- a/src/debug.c +++ b/src/debug.c @@ -249,7 +249,7 @@ void xorObjectDigest(redisDb *db, robj *keyobj, unsigned char *digest, robj *o) } streamIteratorStop(&si); } else if (o->type == OBJ_MODULE) { - RedisModuleDigest md = {{0},{0}}; + RedisModuleDigest md = {{0},{0},keyobj,db->id}; moduleValue *mv = o->ptr; moduleType *mt = mv->type; moduleInitDigestContext(md); @@ -607,7 +607,7 @@ NULL "encoding:%s serializedlength:%zu " "lru:%d lru_seconds_idle:%llu%s", (void*)val, val->refcount, - strenc, rdbSavedObjectLen(val, c->argv[2]), + strenc, rdbSavedObjectLen(val, c->argv[2], c->db->id), val->lru, estimateObjectIdleTime(val)/1000, extra); } else if (!strcasecmp(c->argv[1]->ptr,"sdslen") && c->argc == 3) { dictEntry *de; diff --git a/src/defrag.c b/src/defrag.c index 4131d3349..15230b5d2 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -803,7 +803,7 @@ long defragModule(redisDb *db, dictEntry *kde) { serverAssert(obj->type == OBJ_MODULE); long defragged = 0; - if (!moduleDefragValue(dictGetKey(kde), obj, &defragged)) + if (!moduleDefragValue(dictGetKey(kde), obj, &defragged, db->id)) defragLater(db, kde); return defragged; @@ -945,7 +945,7 @@ long defragOtherGlobals() { /* returns 0 more work may or may not be needed (see non-zero cursor), * and 1 if time is up and more work is needed. */ -int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime) { +int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime, int dbid) { if (de) { robj *ob = dictGetVal(de); if (ob->type == OBJ_LIST) { @@ -959,7 +959,7 @@ int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime) { } else if (ob->type == OBJ_STREAM) { return scanLaterStreamListpacks(ob, cursor, endtime, &server.stat_active_defrag_hits); } else if (ob->type == OBJ_MODULE) { - return moduleLateDefrag(dictGetKey(de), ob, cursor, endtime, &server.stat_active_defrag_hits); + return moduleLateDefrag(dictGetKey(de), ob, cursor, endtime, &server.stat_active_defrag_hits, dbid); } else { *cursor = 0; /* object type may have changed since we schedule it for later */ } @@ -1008,7 +1008,7 @@ int defragLaterStep(redisDb *db, long long endtime) { key_defragged = server.stat_active_defrag_hits; do { int quit = 0; - if (defragLaterItem(de, &defrag_later_cursor, endtime)) + if (defragLaterItem(de, &defrag_later_cursor, endtime,db->id)) quit = 1; /* time is up, we didn't finish all the work */ /* Once in 16 scan iterations, 512 pointer reallocations, or 64 fields diff --git a/src/lazyfree.c b/src/lazyfree.c index bb966e7f5..398ebd194 100644 --- a/src/lazyfree.c +++ b/src/lazyfree.c @@ -90,7 +90,7 @@ void lazyfreeResetStats() { * * For lists the function returns the number of elements in the quicklist * representing the list. */ -size_t lazyfreeGetFreeEffort(robj *key, robj *obj) { +size_t lazyfreeGetFreeEffort(robj *key, robj *obj, int dbid) { if (obj->type == OBJ_LIST) { quicklist *ql = obj->ptr; return ql->len; @@ -128,16 +128,10 @@ size_t lazyfreeGetFreeEffort(robj *key, robj *obj) { } return effort; } else if (obj->type == OBJ_MODULE) { - moduleValue *mv = obj->ptr; - moduleType *mt = mv->type; - if (mt->free_effort != NULL) { - size_t effort = mt->free_effort(key,mv->value); - /* If the module's free_effort returns 0, it will use asynchronous free - memory by default */ - return effort == 0 ? ULONG_MAX : effort; - } else { - return 1; - } + size_t effort = moduleGetFreeEffort(key, obj, dbid); + /* If the module's free_effort returns 0, we will use asynchronous free + * memory by default. */ + return effort == 0 ? ULONG_MAX : effort; } else { return 1; /* Everything else is a single allocation. */ } @@ -161,9 +155,9 @@ int dbAsyncDelete(redisDb *db, robj *key) { robj *val = dictGetVal(de); /* Tells the module that the key has been unlinked from the database. */ - moduleNotifyKeyUnlink(key,val); + moduleNotifyKeyUnlink(key,val,db->id); - size_t free_effort = lazyfreeGetFreeEffort(key,val); + size_t free_effort = lazyfreeGetFreeEffort(key,val,db->id); /* If releasing the object is too much work, do it in the background * by adding the object to the lazy free list. @@ -192,8 +186,8 @@ int dbAsyncDelete(redisDb *db, robj *key) { } /* Free an object, if the object is huge enough, free it in async way. */ -void freeObjAsync(robj *key, robj *obj) { - size_t free_effort = lazyfreeGetFreeEffort(key,obj); +void freeObjAsync(robj *key, robj *obj, int dbid) { + size_t free_effort = lazyfreeGetFreeEffort(key,obj,dbid); if (free_effort > LAZYFREE_THRESHOLD && obj->refcount == 1) { atomicIncr(lazyfree_objects,1); bioCreateLazyFreeJob(lazyfreeFreeObject,1,obj); diff --git a/src/module.c b/src/module.c index fd2f69948..74a53fe8a 100644 --- a/src/module.c +++ b/src/module.c @@ -382,7 +382,15 @@ typedef struct RedisModuleUser { user *user; /* Reference to the real redis user */ } RedisModuleUser; - +/* This is a structure used to export some meta-information such as dbid to the module. */ +typedef struct RedisModuleKeyOptCtx { + struct redisObject *from_key, *to_key; /* Optional name of key processed, NULL when unknown. + In most cases, only 'from_key' is valid, but in callbacks + such as `copy2`, both 'from_key' and 'to_key' are valid. */ + int from_dbid, to_dbid; /* The dbid of the key being processed, -1 when unknown. + In most cases, only 'from_dbid' is valid, but in callbacks such + as `copy2`, 'from_dbid' and 'to_dbid' are both valid. */ +} RedisModuleKeyOptCtx; /* -------------------------------------------------------------------------- * Prototypes * -------------------------------------------------------------------------- */ @@ -2433,6 +2441,25 @@ RedisModuleString *RM_RandomKey(RedisModuleCtx *ctx) { return key; } +/* Returns the name of the key currently being processed. */ +const RedisModuleString *RM_GetKeyNameFromOptCtx(RedisModuleKeyOptCtx *ctx) { + return ctx->from_key; +} + +/* Returns the name of the target key currently being processed. */ +const RedisModuleString *RM_GetToKeyNameFromOptCtx(RedisModuleKeyOptCtx *ctx) { + return ctx->to_key; +} + +/* Returns the dbid currently being processed. */ +int RM_GetDbIdFromOptCtx(RedisModuleKeyOptCtx *ctx) { + return ctx->from_dbid; +} + +/* Returns the target dbid currently being processed. */ +int RM_GetToDbIdFromOptCtx(RedisModuleKeyOptCtx *ctx) { + return ctx->to_dbid; +} /* -------------------------------------------------------------------------- * ## Key API for String type * @@ -4374,14 +4401,21 @@ const char *moduleTypeModuleName(moduleType *mt) { /* Create a copy of a module type value using the copy callback. If failed * or not supported, produce an error reply and return NULL. */ -robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, robj *value) { +robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, int todb, robj *value) { moduleValue *mv = value->ptr; moduleType *mt = mv->type; - if (!mt->copy) { + if (!mt->copy && !mt->copy2) { addReplyError(c, "not supported for this module key"); return NULL; } - void *newval = mt->copy(fromkey, tokey, mv->value); + void *newval = NULL; + if (mt->copy2 != NULL) { + RedisModuleKeyOptCtx ctx = {fromkey, tokey, c->db->id, todb}; + newval = mt->copy2(&ctx, mv->value); + } else { + newval = mt->copy(fromkey, tokey, mv->value); + } + if (!newval) { addReplyError(c, "module key failed to copy"); return NULL; @@ -4431,6 +4465,12 @@ robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, robj *value) { * .unlink = myType_UnlinkCallBack, * .copy = myType_CopyCallback, * .defrag = myType_DefragCallback + * + * // Enhanced optional fields + * .mem_usage2 = myType_MemUsageCallBack2, + * .free_effort2 = myType_FreeEffortCallBack2, + * .unlink2 = myType_UnlinkCallBack2, + * .copy2 = myType_CopyCallback2, * } * * * **rdb_load**: A callback function pointer that loads data from RDB files. @@ -4472,6 +4512,15 @@ robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, robj *value) { * NOTE: The value is passed as a `void**` and the function is expected to update the * pointer if the top-level value pointer is defragmented and consequently changes. * + * * **mem_usage2**: Similar to `mem_usage`, but provides the `RedisModuleKeyOptCtx` parameter + * so that meta information such as key name and db id can be obtained. + * * **free_effort2**: Similar to `free_effort`, but provides the `RedisModuleKeyOptCtx` parameter + * so that meta information such as key name and db id can be obtained. + * * **unlink2**: Similar to `unlink`, but provides the `RedisModuleKeyOptCtx` parameter + * so that meta information such as key name and db id can be obtained. + * * **copy2**: Similar to `copy`, but provides the `RedisModuleKeyOptCtx` parameter + * so that meta information such as key names and db ids can be obtained. + * * Note: the module name "AAAAAAAAA" is reserved and produces an error, it * happens to be pretty lame as well. * @@ -4517,6 +4566,12 @@ moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver, moduleTypeCopyFunc copy; moduleTypeDefragFunc defrag; } v3; + struct { + moduleTypeMemUsageFunc2 mem_usage2; + moduleTypeFreeEffortFunc2 free_effort2; + moduleTypeUnlinkFunc2 unlink2; + moduleTypeCopyFunc2 copy2; + } v4; } *tms = (struct typemethods*) typemethods_ptr; moduleType *mt = zcalloc(sizeof(*mt)); @@ -4539,6 +4594,12 @@ moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver, mt->copy = tms->v3.copy; mt->defrag = tms->v3.defrag; } + if (tms->version >= 4) { + mt->mem_usage2 = tms->v4.mem_usage2; + mt->unlink2 = tms->v4.unlink2; + mt->free_effort2 = tms->v4.free_effort2; + mt->copy2 = tms->v4.copy2; + } memcpy(mt->name,name,sizeof(mt->name)); listAddNodeTail(ctx->module->types,mt); return mt; @@ -4977,7 +5038,7 @@ void *RM_LoadDataTypeFromString(const RedisModuleString *str, const moduleType * void *ret; rioInitWithBuffer(&payload, str->ptr); - moduleInitIOContext(io,(moduleType *)mt,&payload,NULL); + moduleInitIOContext(io,(moduleType *)mt,&payload,NULL,-1); /* All RM_Save*() calls always write a version 2 compatible format, so we * need to make sure we read the same. @@ -5003,7 +5064,7 @@ RedisModuleString *RM_SaveDataTypeToString(RedisModuleCtx *ctx, void *data, cons RedisModuleIO io; rioInitWithBuffer(&payload,sdsempty()); - moduleInitIOContext(io,(moduleType *)mt,&payload,NULL); + moduleInitIOContext(io,(moduleType *)mt,&payload,NULL,-1); mt->rdb_save(&io,data); if (io.ctx) { moduleFreeContext(io.ctx); @@ -5018,6 +5079,15 @@ RedisModuleString *RM_SaveDataTypeToString(RedisModuleCtx *ctx, void *data, cons } } +/* Returns the name of the key currently being processed. */ +const RedisModuleString *RM_GetKeyNameFromDigest(RedisModuleDigest *dig) { + return dig->key; +} + +/* Returns the database id of the key currently being processed. */ +int RM_GetDbIdFromDigest(RedisModuleDigest *dig) { + return dig->dbid; +} /* -------------------------------------------------------------------------- * ## AOF API for modules data types * -------------------------------------------------------------------------- */ @@ -5087,9 +5157,8 @@ RedisModuleCtx *RM_GetContextFromIO(RedisModuleIO *io) { return io->ctx; } -/* Returns a RedisModuleString with the name of the key currently saving or - * loading, when an IO data type callback is called. There is no guarantee - * that the key name is always available, so this may return NULL. +/* Returns the name of the key currently being processed. + * There is no guarantee that the key name is always available, so this may return NULL. */ const RedisModuleString *RM_GetKeyNameFromIO(RedisModuleIO *io) { return io->key; @@ -5100,6 +5169,18 @@ const RedisModuleString *RM_GetKeyNameFromModuleKey(RedisModuleKey *key) { return key ? key->key : NULL; } +/* Returns a database id of the key from RedisModuleKey. */ +int RM_GetDbIdFromModuleKey(RedisModuleKey *key) { + return key ? key->db->id : -1; +} + +/* Returns the database id of the key currently being processed. + * There is no guarantee that this info is always available, so this may return -1. + */ +int RM_GetDbIdFromIO(RedisModuleIO *io) { + return io->dbid; +} + /* -------------------------------------------------------------------------- * ## Logging * -------------------------------------------------------------------------- */ @@ -8346,16 +8427,55 @@ void processModuleLoadingProgressEvent(int is_aof) { /* When a module key is deleted (in dbAsyncDelete/dbSyncDelete/dbOverwrite), it * will be called to tell the module which key is about to be released. */ -void moduleNotifyKeyUnlink(robj *key, robj *val) { +void moduleNotifyKeyUnlink(robj *key, robj *val, int dbid) { if (val->type == OBJ_MODULE) { moduleValue *mv = val->ptr; moduleType *mt = mv->type; - if (mt->unlink != NULL) { + /* We prefer to use the enhanced version. */ + if (mt->unlink2 != NULL) { + RedisModuleKeyOptCtx ctx = {key, NULL, dbid, -1}; + mt->unlink2(&ctx,mv->value); + } else if (mt->unlink != NULL) { mt->unlink(key,mv->value); } } } +/* Return the free_effort of the module, it will automatically choose to call + * `free_effort` or `free_effort2`, and the default return value is 1. + * value of 0 means very high effort (always asynchronous freeing). */ +size_t moduleGetFreeEffort(robj *key, robj *val, int dbid) { + moduleValue *mv = val->ptr; + moduleType *mt = mv->type; + size_t effort = 1; + /* We prefer to use the enhanced version. */ + if (mt->free_effort2 != NULL) { + RedisModuleKeyOptCtx ctx = {key, NULL, dbid, -1}; + effort = mt->free_effort2(&ctx,mv->value); + } else if (mt->free_effort != NULL) { + effort = mt->free_effort(key,mv->value); + } + + return effort; +} + +/* Return the memory usage of the module, it will automatically choose to call + * `mem_usage` or `mem_usage2`, and the default return value is 0. */ +size_t moduleGetMemUsage(robj *key, robj *val, int dbid) { + moduleValue *mv = val->ptr; + moduleType *mt = mv->type; + size_t size = 0; + /* We prefer to use the enhanced version. */ + if (mt->mem_usage2 != NULL) { + RedisModuleKeyOptCtx ctx = {key, NULL, dbid, -1}; + size = mt->mem_usage2(&ctx,mv->value); + } else if (mt->mem_usage != NULL) { + size = mt->mem_usage(mv->value); + } + + return size; +} + /* -------------------------------------------------------------------------- * Modules API internals * -------------------------------------------------------------------------- */ @@ -9041,6 +9161,8 @@ typedef struct RedisModuleDefragCtx { long defragged; long long int endtime; unsigned long *cursor; + struct redisObject *key; /* Optional name of key processed, NULL when unknown. */ + int dbid; /* The dbid of the key being processed, -1 when unknown. */ } RedisModuleDefragCtx; /* Register a defrag callback for global data, i.e. anything that the module @@ -9152,11 +9274,11 @@ RedisModuleString *RM_DefragRedisModuleString(RedisModuleDefragCtx *ctx, RedisMo * Returns a zero value (and initializes the cursor) if no more needs to be done, * or a non-zero value otherwise. */ -int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long endtime, long long *defragged) { +int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long endtime, long long *defragged, int dbid) { moduleValue *mv = value->ptr; moduleType *mt = mv->type; - RedisModuleDefragCtx defrag_ctx = { 0, endtime, cursor }; + RedisModuleDefragCtx defrag_ctx = { 0, endtime, cursor, key, dbid}; /* Invoke callback. Note that the callback may be missing if the key has been * replaced with a different type since our last visit. @@ -9180,7 +9302,7 @@ int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long en * Returns 1 if the operation has been completed or 0 if it needs to * be scheduled for late defrag. */ -int moduleDefragValue(robj *key, robj *value, long *defragged) { +int moduleDefragValue(robj *key, robj *value, long *defragged, int dbid) { moduleValue *mv = value->ptr; moduleType *mt = mv->type; @@ -9200,16 +9322,14 @@ int moduleDefragValue(robj *key, robj *value, long *defragged) { * necessary schedule it for defragLater instead of quick immediate * defrag. */ - if (mt->free_effort) { - size_t effort = mt->free_effort(key, mv->value); - if (!effort) - effort = SIZE_MAX; - if (effort > server.active_defrag_max_scan_fields) { - return 0; /* Defrag later */ - } + size_t effort = moduleGetFreeEffort(key, value, dbid); + if (!effort) + effort = SIZE_MAX; + if (effort > server.active_defrag_max_scan_fields) { + return 0; /* Defrag later */ } - RedisModuleDefragCtx defrag_ctx = { 0, 0, NULL }; + RedisModuleDefragCtx defrag_ctx = { 0, 0, NULL, key, dbid}; mt->defrag(&defrag_ctx, key, &mv->value); (*defragged) += defrag_ctx.defragged; return 1; @@ -9225,7 +9345,7 @@ long moduleDefragGlobals(void) { struct RedisModule *module = dictGetVal(de); if (!module->defrag_cb) continue; - RedisModuleDefragCtx defrag_ctx = { 0, 0, NULL }; + RedisModuleDefragCtx defrag_ctx = { 0, 0, NULL, NULL, -1}; module->defrag_cb(&defrag_ctx); defragged += defrag_ctx.defragged; } @@ -9234,6 +9354,20 @@ long moduleDefragGlobals(void) { return defragged; } +/* Returns the name of the key currently being processed. + * There is no guarantee that the key name is always available, so this may return NULL. + */ +const RedisModuleString *RM_GetKeyNameFromDefragCtx(RedisModuleDefragCtx *ctx) { + return ctx->key; +} + +/* Returns the database id of the key currently being processed. + * There is no guarantee that this info is always available, so this may return -1. + */ +int RM_GetDbIdFromDefragCtx(RedisModuleDefragCtx *ctx) { + return ctx->dbid; +} + /* Register all the APIs we export. Keep this function at the end of the * file so that's easy to seek it to add new entries. */ void moduleRegisterCoreAPI(void) { @@ -9374,6 +9508,16 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(GetContextFromIO); REGISTER_API(GetKeyNameFromIO); REGISTER_API(GetKeyNameFromModuleKey); + REGISTER_API(GetDbIdFromModuleKey); + REGISTER_API(GetDbIdFromIO); + REGISTER_API(GetKeyNameFromOptCtx); + REGISTER_API(GetToKeyNameFromOptCtx); + REGISTER_API(GetDbIdFromOptCtx); + REGISTER_API(GetToDbIdFromOptCtx); + REGISTER_API(GetKeyNameFromDefragCtx); + REGISTER_API(GetDbIdFromDefragCtx); + REGISTER_API(GetKeyNameFromDigest); + REGISTER_API(GetDbIdFromDigest); REGISTER_API(BlockClient); REGISTER_API(UnblockClient); REGISTER_API(IsBlockedReplyRequest); diff --git a/src/object.c b/src/object.c index 0f63d980a..31a4fcf8c 100644 --- a/src/object.c +++ b/src/object.c @@ -790,7 +790,7 @@ size_t streamRadixTreeMemoryUsage(rax *rax) { * case of aggregated data types where only "sample_size" elements * are checked and averaged to estimate the total size. */ #define OBJ_COMPUTE_SIZE_DEF_SAMPLES 5 /* Default sample size. */ -size_t objectComputeSize(robj *o, size_t sample_size) { +size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) { sds ele, ele2; dict *d; dictIterator *di; @@ -941,13 +941,7 @@ size_t objectComputeSize(robj *o, size_t sample_size) { raxStop(&ri); } } else if (o->type == OBJ_MODULE) { - moduleValue *mv = o->ptr; - moduleType *mt = mv->type; - if (mt->mem_usage != NULL) { - asize = mt->mem_usage(mv->value); - } else { - asize = 0; - } + asize = moduleGetMemUsage(key, o, dbid); } else { serverPanic("Unknown object type"); } @@ -1336,7 +1330,7 @@ NULL addReplyNull(c); return; } - size_t usage = objectComputeSize(dictGetVal(de),samples); + size_t usage = objectComputeSize(c->argv[2],dictGetVal(de),samples,c->db->id); usage += sdsZmallocSize(dictGetKey(de)); usage += sizeof(dictEntry); addReplyLongLong(c,usage); diff --git a/src/rdb.c b/src/rdb.c index dfc08afcd..576c73c4f 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -792,7 +792,7 @@ size_t rdbSaveStreamConsumers(rio *rdb, streamCG *cg) { /* Save a Redis object. * Returns -1 on error, number of bytes written on success. */ -ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) { +ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) { ssize_t n = 0, nwritten = 0; if (o->type == OBJ_STRING) { @@ -1032,7 +1032,7 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) { * to call the right module during loading. */ int retval = rdbSaveLen(rdb,mt->id); if (retval == -1) return -1; - moduleInitIOContext(io,mt,rdb,key); + moduleInitIOContext(io,mt,rdb,key,dbid); io.bytes += retval; /* Then write the module-specific representation + EOF marker. */ @@ -1058,8 +1058,8 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) { * the rdbSaveObject() function. Currently we use a trick to get * this length with very little changes to the code. In the future * we could switch to a faster solution. */ -size_t rdbSavedObjectLen(robj *o, robj *key) { - ssize_t len = rdbSaveObject(NULL,o,key); +size_t rdbSavedObjectLen(robj *o, robj *key, int dbid) { + ssize_t len = rdbSaveObject(NULL,o,key,dbid); serverAssertWithInfo(NULL,o,len != -1); return len; } @@ -1067,7 +1067,7 @@ size_t rdbSavedObjectLen(robj *o, robj *key) { /* Save a key-value pair, with expire time, type, key, value. * On error -1 is returned. * On success if the key was actually saved 1 is returned. */ -int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) { +int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, int dbid) { int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU; int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU; @@ -1100,7 +1100,7 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) { /* Save type, key, value */ if (rdbSaveObjectType(rdb,val) == -1) return -1; if (rdbSaveStringObject(rdb,key) == -1) return -1; - if (rdbSaveObject(rdb,val,key) == -1) return -1; + if (rdbSaveObject(rdb,val,key,dbid) == -1) return -1; /* Delay return if required (for testing) */ if (server.rdb_key_save_delay) @@ -1163,7 +1163,7 @@ ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt) { RedisModuleIO io; int retval = rdbSaveType(rdb, RDB_OPCODE_MODULE_AUX); if (retval == -1) return -1; - moduleInitIOContext(io,mt,rdb,NULL); + moduleInitIOContext(io,mt,rdb,NULL,-1); io.bytes += retval; /* Write the "module" identifier as prefix, so that we'll be able @@ -1251,7 +1251,7 @@ int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) { initStaticStringObject(key,keystr); expire = getExpire(db,&key); - if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr; + if (rdbSaveKeyValuePair(rdb,&key,o,expire,j) == -1) goto werr; /* When this RDB is produced as part of an AOF rewrite, move * accumulated diff from parent to child while rewriting in @@ -1510,7 +1510,7 @@ robj *rdbLoadCheckModuleValue(rio *rdb, char *modulename) { /* Load a Redis object of the specified type from the specified file. * On success a newly allocated object is returned, otherwise NULL. */ -robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) { +robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid) { robj *o = NULL, *ele, *dec; uint64_t len; unsigned int i; @@ -2184,7 +2184,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) { RedisModuleIO io; robj keyobj; initStaticStringObject(keyobj,key); - moduleInitIOContext(io,mt,rdb,&keyobj); + moduleInitIOContext(io,mt,rdb,&keyobj,dbid); io.ver = (rdbtype == RDB_TYPE_MODULE) ? 1 : 2; /* Call the rdb_load method of the module providing the 10 bit * encoding version in the lower 10 bits of the module ID. */ @@ -2495,7 +2495,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { } RedisModuleIO io; - moduleInitIOContext(io,mt,rdb,NULL); + moduleInitIOContext(io,mt,rdb,NULL,-1); io.ver = 2; /* Call the rdb_load method of the module providing the 10 bit * encoding version in the lower 10 bits of the module ID. */ @@ -2526,7 +2526,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { if ((key = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) goto eoferr; /* Read value */ - if ((val = rdbLoadObject(type,rdb,key)) == NULL) { + if ((val = rdbLoadObject(type,rdb,key,db->id)) == NULL) { sdsfree(key); goto eoferr; } diff --git a/src/rdb.h b/src/rdb.h index 00ed5297c..aab23fbe2 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -143,11 +143,11 @@ int rdbSaveBackground(char *filename, rdbSaveInfo *rsi); int rdbSaveToSlavesSockets(rdbSaveInfo *rsi); void rdbRemoveTempFile(pid_t childpid, int from_signal); int rdbSave(char *filename, rdbSaveInfo *rsi); -ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key); -size_t rdbSavedObjectLen(robj *o, robj *key); -robj *rdbLoadObject(int type, rio *rdb, sds key); +ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid); +size_t rdbSavedObjectLen(robj *o, robj *key, int dbid); +robj *rdbLoadObject(int type, rio *rdb, sds key, int dbid); void backgroundSaveDoneHandler(int exitcode, int bysignal); -int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime); +int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime,int dbid); ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt); robj *rdbLoadCheckModuleValue(rio *rdb, char *modulename); robj *rdbLoadStringObject(rio *rdb); diff --git a/src/redis-check-rdb.c b/src/redis-check-rdb.c index 8f57fa4d4..b2cc6a5b4 100644 --- a/src/redis-check-rdb.c +++ b/src/redis-check-rdb.c @@ -184,6 +184,7 @@ void rdbCheckSetupSignals(void) { * otherwise the already open file 'fp' is checked. */ int redis_check_rdb(char *rdbfilename, FILE *fp) { uint64_t dbid; + int selected_dbid = -1; int type, rdbver; char buf[1024]; long long expiretime, now = mstime(); @@ -251,6 +252,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) { if ((dbid = rdbLoadLen(&rdb,NULL)) == RDB_LENERR) goto eoferr; rdbCheckInfo("Selecting DB ID %llu", (unsigned long long)dbid); + selected_dbid = dbid; continue; /* Read type again. */ } else if (type == RDB_OPCODE_RESIZEDB) { /* RESIZEDB: Hint about the size of the keys in the currently @@ -308,7 +310,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) { rdbstate.keys++; /* Read value */ rdbstate.doing = RDB_CHECK_DOING_READ_OBJECT_VALUE; - if ((val = rdbLoadObject(type,&rdb,key->ptr)) == NULL) goto eoferr; + if ((val = rdbLoadObject(type,&rdb,key->ptr,selected_dbid)) == NULL) goto eoferr; /* Check if the key already expired. */ if (expiretime != -1 && expiretime < now) rdbstate.already_expired++; diff --git a/src/redismodule.h b/src/redismodule.h index 48a3a9df1..6f52818a5 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -16,7 +16,7 @@ /* Version of the RedisModuleTypeMethods structure. Once the RedisModuleTypeMethods * structure is changed, this version number needs to be changed synchronistically. */ -#define REDISMODULE_TYPE_METHOD_VERSION 3 +#define REDISMODULE_TYPE_METHOD_VERSION 4 /* API flags and constants */ #define REDISMODULE_READ (1<<0) @@ -520,6 +520,7 @@ typedef struct RedisModuleServerInfoData RedisModuleServerInfoData; typedef struct RedisModuleScanCursor RedisModuleScanCursor; typedef struct RedisModuleDefragCtx RedisModuleDefragCtx; typedef struct RedisModuleUser RedisModuleUser; +typedef struct RedisModuleKeyOptCtx RedisModuleKeyOptCtx; typedef int (*RedisModuleCmdFunc)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc); @@ -530,11 +531,15 @@ typedef int (*RedisModuleTypeAuxLoadFunc)(RedisModuleIO *rdb, int encver, int wh typedef void (*RedisModuleTypeAuxSaveFunc)(RedisModuleIO *rdb, int when); typedef void (*RedisModuleTypeRewriteFunc)(RedisModuleIO *aof, RedisModuleString *key, void *value); typedef size_t (*RedisModuleTypeMemUsageFunc)(const void *value); +typedef size_t (*RedisModuleTypeMemUsageFunc2)(RedisModuleKeyOptCtx *ctx, const void *value); typedef void (*RedisModuleTypeDigestFunc)(RedisModuleDigest *digest, void *value); typedef void (*RedisModuleTypeFreeFunc)(void *value); typedef size_t (*RedisModuleTypeFreeEffortFunc)(RedisModuleString *key, const void *value); +typedef size_t (*RedisModuleTypeFreeEffortFunc2)(RedisModuleKeyOptCtx *ctx, const void *value); typedef void (*RedisModuleTypeUnlinkFunc)(RedisModuleString *key, const void *value); +typedef void (*RedisModuleTypeUnlinkFunc2)(RedisModuleKeyOptCtx *ctx, const void *value); typedef void *(*RedisModuleTypeCopyFunc)(RedisModuleString *fromkey, RedisModuleString *tokey, const void *value); +typedef void *(*RedisModuleTypeCopyFunc2)(RedisModuleKeyOptCtx *ctx, const void *value); typedef int (*RedisModuleTypeDefragFunc)(RedisModuleDefragCtx *ctx, RedisModuleString *key, void **value); typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, const char *sender_id, uint8_t type, const unsigned char *payload, uint32_t len); typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data); @@ -561,6 +566,10 @@ typedef struct RedisModuleTypeMethods { RedisModuleTypeUnlinkFunc unlink; RedisModuleTypeCopyFunc copy; RedisModuleTypeDefragFunc defrag; + RedisModuleTypeMemUsageFunc2 mem_usage2; + RedisModuleTypeFreeEffortFunc2 free_effort2; + RedisModuleTypeUnlinkFunc2 unlink2; + RedisModuleTypeCopyFunc2 copy2; } RedisModuleTypeMethods; #define REDISMODULE_GET_API(name) \ @@ -716,10 +725,18 @@ REDISMODULE_API int (*RedisModule_StringCompare)(RedisModuleString *a, RedisModu REDISMODULE_API RedisModuleCtx * (*RedisModule_GetContextFromIO)(RedisModuleIO *io) REDISMODULE_ATTR; REDISMODULE_API const RedisModuleString * (*RedisModule_GetKeyNameFromIO)(RedisModuleIO *io) REDISMODULE_ATTR; REDISMODULE_API const RedisModuleString * (*RedisModule_GetKeyNameFromModuleKey)(RedisModuleKey *key) REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_GetDbIdFromModuleKey)(RedisModuleKey *key) REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_GetDbIdFromIO)(RedisModuleIO *io) REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_GetDbIdFromOptCtx)(RedisModuleKeyOptCtx *ctx) REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_GetToDbIdFromOptCtx)(RedisModuleKeyOptCtx *ctx) REDISMODULE_ATTR; +REDISMODULE_API const RedisModuleString * (*RedisModule_GetKeyNameFromOptCtx)(RedisModuleKeyOptCtx *ctx) REDISMODULE_ATTR; +REDISMODULE_API const RedisModuleString * (*RedisModule_GetToKeyNameFromOptCtx)(RedisModuleKeyOptCtx *ctx) REDISMODULE_ATTR; REDISMODULE_API long long (*RedisModule_Milliseconds)(void) REDISMODULE_ATTR; REDISMODULE_API void (*RedisModule_DigestAddStringBuffer)(RedisModuleDigest *md, unsigned char *ele, size_t len) REDISMODULE_ATTR; REDISMODULE_API void (*RedisModule_DigestAddLongLong)(RedisModuleDigest *md, long long ele) REDISMODULE_ATTR; REDISMODULE_API void (*RedisModule_DigestEndSequence)(RedisModuleDigest *md) REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_GetDbIdFromDigest)(RedisModuleDigest *dig) REDISMODULE_ATTR; +REDISMODULE_API const RedisModuleString * (*RedisModule_GetKeyNameFromDigest)(RedisModuleDigest *dig) REDISMODULE_ATTR; REDISMODULE_API RedisModuleDict * (*RedisModule_CreateDict)(RedisModuleCtx *ctx) REDISMODULE_ATTR; REDISMODULE_API void (*RedisModule_FreeDict)(RedisModuleCtx *ctx, RedisModuleDict *d) REDISMODULE_ATTR; REDISMODULE_API uint64_t (*RedisModule_DictSize)(RedisModuleDict *d) REDISMODULE_ATTR; @@ -843,6 +860,8 @@ REDISMODULE_API RedisModuleString *(*RedisModule_DefragRedisModuleString)(RedisM REDISMODULE_API int (*RedisModule_DefragShouldStop)(RedisModuleDefragCtx *ctx) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_DefragCursorSet)(RedisModuleDefragCtx *ctx, unsigned long cursor) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_DefragCursorGet)(RedisModuleDefragCtx *ctx, unsigned long *cursor) REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_GetDbIdFromDefragCtx)(RedisModuleDefragCtx *ctx) REDISMODULE_ATTR; +REDISMODULE_API const RedisModuleString * (*RedisModule_GetKeyNameFromDefragCtx)(RedisModuleDefragCtx *ctx) REDISMODULE_ATTR; #endif #define RedisModule_IsAOFClient(id) ((id) == UINT64_MAX) @@ -989,10 +1008,18 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(GetContextFromIO); REDISMODULE_GET_API(GetKeyNameFromIO); REDISMODULE_GET_API(GetKeyNameFromModuleKey); + REDISMODULE_GET_API(GetDbIdFromModuleKey); + REDISMODULE_GET_API(GetDbIdFromIO); + REDISMODULE_GET_API(GetKeyNameFromOptCtx); + REDISMODULE_GET_API(GetToKeyNameFromOptCtx); + REDISMODULE_GET_API(GetDbIdFromOptCtx); + REDISMODULE_GET_API(GetToDbIdFromOptCtx); REDISMODULE_GET_API(Milliseconds); REDISMODULE_GET_API(DigestAddStringBuffer); REDISMODULE_GET_API(DigestAddLongLong); REDISMODULE_GET_API(DigestEndSequence); + REDISMODULE_GET_API(GetKeyNameFromDigest); + REDISMODULE_GET_API(GetDbIdFromDigest); REDISMODULE_GET_API(CreateDict); REDISMODULE_GET_API(FreeDict); REDISMODULE_GET_API(DictSize); @@ -1116,6 +1143,8 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(DefragShouldStop); REDISMODULE_GET_API(DefragCursorSet); REDISMODULE_GET_API(DefragCursorGet); + REDISMODULE_GET_API(GetKeyNameFromDefragCtx); + REDISMODULE_GET_API(GetDbIdFromDefragCtx); #endif if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR; diff --git a/src/server.h b/src/server.h index 367a6d0bc..8180df9b3 100644 --- a/src/server.h +++ b/src/server.h @@ -550,6 +550,7 @@ struct moduleLoadQueueEntry; struct redisObject; struct RedisModuleDefragCtx; struct RedisModuleInfoCtx; +struct RedisModuleKeyOptCtx; /* Each module type implementation should export a set of methods in order * to serialize and deserialize the value in the RDB file, rewrite the AOF @@ -569,6 +570,11 @@ typedef void *(*moduleTypeCopyFunc)(struct redisObject *fromkey, struct redisObj typedef int (*moduleTypeDefragFunc)(struct RedisModuleDefragCtx *ctx, struct redisObject *key, void **value); typedef void (*RedisModuleInfoFunc)(struct RedisModuleInfoCtx *ctx, int for_crash_report); typedef void (*RedisModuleDefragFunc)(struct RedisModuleDefragCtx *ctx); +typedef size_t (*moduleTypeMemUsageFunc2)(struct RedisModuleKeyOptCtx *ctx, const void *value); +typedef void (*moduleTypeFreeFunc2)(struct RedisModuleKeyOptCtx *ctx, void *value); +typedef size_t (*moduleTypeFreeEffortFunc2)(struct RedisModuleKeyOptCtx *ctx, const void *value); +typedef void (*moduleTypeUnlinkFunc2)(struct RedisModuleKeyOptCtx *ctx, void *value); +typedef void *(*moduleTypeCopyFunc2)(struct RedisModuleKeyOptCtx *ctx, const void *value); /* This callback type is called by moduleNotifyUserChanged() every time * a user authenticated via the module API is associated with a different @@ -594,6 +600,10 @@ typedef struct RedisModuleType { moduleTypeDefragFunc defrag; moduleTypeAuxLoadFunc aux_load; moduleTypeAuxSaveFunc aux_save; + moduleTypeMemUsageFunc2 mem_usage2; + moduleTypeFreeEffortFunc2 free_effort2; + moduleTypeUnlinkFunc2 unlink2; + moduleTypeCopyFunc2 copy2; int aux_save_triggers; char name[10]; /* 9 bytes name + null term. Charset: A-Z a-z 0-9 _- */ } moduleType; @@ -650,17 +660,19 @@ typedef struct RedisModuleIO { * 2 (current version with opcodes annotation). */ struct RedisModuleCtx *ctx; /* Optional context, see RM_GetContextFromIO()*/ struct redisObject *key; /* Optional name of key processed */ -} RedisModuleIO; + int dbid; /* The dbid of the key being processed, -1 when unknown. */ +} RedisModuleIO; /* Macro to initialize an IO context. Note that the 'ver' field is populated * inside rdb.c according to the version of the value to load. */ -#define moduleInitIOContext(iovar,mtype,rioptr,keyptr) do { \ +#define moduleInitIOContext(iovar,mtype,rioptr,keyptr,db) do { \ iovar.rio = rioptr; \ iovar.type = mtype; \ iovar.bytes = 0; \ iovar.error = 0; \ iovar.ver = 0; \ iovar.key = keyptr; \ + iovar.dbid = db; \ iovar.ctx = NULL; \ } while(0) @@ -672,6 +684,8 @@ typedef struct RedisModuleIO { typedef struct RedisModuleDigest { unsigned char o[20]; /* Ordered elements. */ unsigned char x[20]; /* Xored elements. */ + struct redisObject *key; /* Optional name of key processed */ + int dbid; /* The dbid of the key being processed */ } RedisModuleDigest; /* Just start with a digest composed of all zero bytes. */ @@ -1817,10 +1831,12 @@ int moduleTryServeClientBlockedOnKey(client *c, robj *key); void moduleUnblockClient(client *c); int moduleClientIsBlockedOnKeys(client *c); void moduleNotifyUserChanged(client *c); -void moduleNotifyKeyUnlink(robj *key, robj *val); -robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, robj *value); -int moduleDefragValue(robj *key, robj *obj, long *defragged); -int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long endtime, long long *defragged); +void moduleNotifyKeyUnlink(robj *key, robj *val, int dbid); +size_t moduleGetFreeEffort(robj *key, robj *val, int dbid); +size_t moduleGetMemUsage(robj *key, robj *val, int dbid); +robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, int todb, robj *value); +int moduleDefragValue(robj *key, robj *obj, long *defragged, int dbid); +int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long endtime, long long *defragged, int dbid); long moduleDefragGlobals(void); /* Utils */ @@ -2406,7 +2422,7 @@ void slotToKeyFlush(int async); size_t lazyfreeGetPendingObjectsCount(void); size_t lazyfreeGetFreedObjectsCount(void); void lazyfreeResetStats(void); -void freeObjAsync(robj *key, robj *obj); +void freeObjAsync(robj *key, robj *obj, int dbid); void freeSlotsToKeysMapAsync(rax *rt); void freeSlotsToKeysMap(rax *rt, int async); diff --git a/src/t_set.c b/src/t_set.c index d0c54848e..b58bb6fc0 100644 --- a/src/t_set.c +++ b/src/t_set.c @@ -1137,7 +1137,7 @@ void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum, sdsfree(ele); } setTypeReleaseIterator(si); - server.lazyfree_lazy_server_del ? freeObjAsync(NULL, dstset) : + server.lazyfree_lazy_server_del ? freeObjAsync(NULL, dstset, -1) : decrRefCount(dstset); } else { /* If we have a target key where to store the resulting set diff --git a/tests/modules/Makefile b/tests/modules/Makefile index f56313964..947706ec8 100644 --- a/tests/modules/Makefile +++ b/tests/modules/Makefile @@ -28,6 +28,7 @@ TEST_MODULES = \ blockonbackground.so \ scan.so \ datatype.so \ + datatype2.so \ auth.so \ keyspace_events.so \ blockedclient.so \ @@ -37,7 +38,7 @@ TEST_MODULES = \ defragtest.so \ hash.so \ zset.so \ - stream.so + stream.so \ .PHONY: all diff --git a/tests/modules/datatype2.c b/tests/modules/datatype2.c new file mode 100644 index 000000000..aba66b124 --- /dev/null +++ b/tests/modules/datatype2.c @@ -0,0 +1,735 @@ +/* This module is used to test a use case of a module that stores information + * about keys in global memory, and relies on the enhanced data type callbacks to + * get key name and dbid on various operations. + * + * it simulates a simple memory allocator. The smallest allocation unit of + * the allocator is a mem block with a size of 4KB. Multiple mem blocks are combined + * using a linked list. These linked lists are placed in a global dict named 'mem_pool'. + * Each db has a 'mem_pool'. You can use the 'mem.alloc' command to allocate a specified + * number of mem blocks, and use 'mem.free' to release the memory. Use 'mem.write', 'mem.read' + * to write and read the specified mem block (note that each mem block can only be written once). + * Use 'mem.usage' to get the memory usage under different dbs, and it will return the size + * mem blocks and used mem blocks under the db. + * The specific structure diagram is as follows: + * + * + * Global variables of the module: + * + * mem blocks link + * ┌─────┬─────┐ + * │ │ │ ┌───┐ ┌───┐ ┌───┐ + * │ k1 │ ───┼───►│4KB├───►│4KB├───►│4KB│ + * │ │ │ └───┘ └───┘ └───┘ + * ├─────┼─────┤ + * ┌───────┐ ┌────► │ │ │ ┌───┐ ┌───┐ + * │ │ │ │ k2 │ ───┼───►│4KB├───►│4KB│ + * │ db0 ├──────┘ │ │ │ └───┘ └───┘ + * │ │ ├─────┼─────┤ + * ├───────┤ │ │ │ ┌───┐ ┌───┐ ┌───┐ + * │ │ │ k3 │ ───┼───►│4KB├───►│4KB├───►│4KB│ + * │ db1 ├──►null │ │ │ └───┘ └───┘ └───┘ + * │ │ └─────┴─────┘ + * ├───────┤ dict + * │ │ + * │ db2 ├─────────┐ + * │ │ │ + * ├───────┤ │ ┌─────┬─────┐ + * │ │ │ │ │ │ ┌───┐ ┌───┐ ┌───┐ + * │ db3 ├──►null │ │ k1 │ ───┼───►│4KB├───►│4KB├───►│4KB│ + * │ │ │ │ │ │ └───┘ └───┘ └───┘ + * └───────┘ │ ├─────┼─────┤ + * mem_pool[MAX_DB] │ │ │ │ ┌───┐ ┌───┐ + * └──►│ k2 │ ───┼───►│4KB├───►│4KB│ + * │ │ │ └───┘ └───┘ + * └─────┴─────┘ + * dict + * + * + * Keys in redis database: + * + * ┌───────┐ + * │ size │ + * ┌───────────►│ used │ + * │ │ mask │ + * ┌─────┬─────┐ │ └───────┘ ┌───────┐ + * │ │ │ │ MemAllocObject │ size │ + * │ k1 │ ───┼─┘ ┌───────────►│ used │ + * │ │ │ │ │ mask │ + * ├─────┼─────┤ ┌───────┐ ┌─────┬─────┐ │ └───────┘ + * │ │ │ │ size │ │ │ │ │ MemAllocObject + * │ k2 │ ───┼─────────────►│ used │ │ k1 │ ───┼─┘ + * │ │ │ │ mask │ │ │ │ + * ├─────┼─────┤ └───────┘ ├─────┼─────┤ + * │ │ │ MemAllocObject │ │ │ + * │ k3 │ ───┼─┐ │ k2 │ ───┼─┐ + * │ │ │ │ │ │ │ │ + * └─────┴─────┘ │ ┌───────┐ └─────┴─────┘ │ ┌───────┐ + * redis db[0] │ │ size │ redis db[1] │ │ size │ + * └───────────►│ used │ └───────────►│ used │ + * │ mask │ │ mask │ + * └───────┘ └───────┘ + * MemAllocObject MemAllocObject + * + **/ + +#include "redismodule.h" +#include +#include +#include +#include +#include + +static RedisModuleType *MemAllocType; + +#define MAX_DB 16 +RedisModuleDict *mem_pool[MAX_DB]; +typedef struct MemAllocObject { + long long size; + long long used; + uint64_t mask; +} MemAllocObject; + +MemAllocObject *createMemAllocObject(void) { + MemAllocObject *o = RedisModule_Calloc(1, sizeof(*o)); + return o; +} + +/*---------------------------- mem block apis ------------------------------------*/ +#define BLOCK_SIZE 4096 +struct MemBlock { + char block[BLOCK_SIZE]; + struct MemBlock *next; +}; + +void MemBlockFree(struct MemBlock *head) { + if (head) { + struct MemBlock *block = head->next, *next; + RedisModule_Free(head); + while (block) { + next = block->next; + RedisModule_Free(block); + block = next; + } + } +} +struct MemBlock *MemBlockCreate(long long num) { + if (num <= 0) { + return NULL; + } + + struct MemBlock *head = RedisModule_Calloc(1, sizeof(struct MemBlock)); + struct MemBlock *block = head; + while (--num) { + block->next = RedisModule_Calloc(1, sizeof(struct MemBlock)); + block = block->next; + } + + return head; +} + +long long MemBlockNum(const struct MemBlock *head) { + long long num = 0; + const struct MemBlock *block = head; + while (block) { + num++; + block = block->next; + } + + return num; +} + +size_t MemBlockWrite(struct MemBlock *head, long long block_index, const char *data, size_t size) { + size_t w_size = 0; + struct MemBlock *block = head; + while (block_index-- && block) { + block = block->next; + } + + if (block) { + size = size > BLOCK_SIZE ? BLOCK_SIZE:size; + memcpy(block->block, data, size); + w_size += size; + } + + return w_size; +} + +int MemBlockRead(struct MemBlock *head, long long block_index, char *data, size_t size) { + size_t r_size = 0; + struct MemBlock *block = head; + while (block_index-- && block) { + block = block->next; + } + + if (block) { + size = size > BLOCK_SIZE ? BLOCK_SIZE:size; + memcpy(data, block->block, size); + r_size += size; + } + + return r_size; +} + +void MemPoolFreeDb(RedisModuleCtx *ctx, int dbid) { + RedisModuleString *key; + void *tdata; + RedisModuleDictIter *iter = RedisModule_DictIteratorStartC(mem_pool[dbid], "^", NULL, 0); + while((key = RedisModule_DictNext(ctx, iter, &tdata)) != NULL) { + MemBlockFree((struct MemBlock *)tdata); + } + RedisModule_DictIteratorStop(iter); + RedisModule_FreeDict(NULL, mem_pool[dbid]); + mem_pool[dbid] = RedisModule_CreateDict(NULL); +} + +struct MemBlock *MemBlockClone(const struct MemBlock *head) { + struct MemBlock *newhead = NULL; + if (head) { + newhead = RedisModule_Calloc(1, sizeof(struct MemBlock)); + memcpy(newhead->block, head->block, BLOCK_SIZE); + struct MemBlock *newblock = newhead; + const struct MemBlock *oldblock = head->next; + while (oldblock) { + newblock->next = RedisModule_Calloc(1, sizeof(struct MemBlock)); + newblock = newblock->next; + memcpy(newblock->block, oldblock->block, BLOCK_SIZE); + oldblock = oldblock->next; + } + } + + return newhead; +} + +/*---------------------------- event handler ------------------------------------*/ +void swapDbCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) { + REDISMODULE_NOT_USED(ctx); + REDISMODULE_NOT_USED(e); + REDISMODULE_NOT_USED(sub); + + RedisModuleSwapDbInfo *ei = data; + + // swap + RedisModuleDict *tmp = mem_pool[ei->dbnum_first]; + mem_pool[ei->dbnum_first] = mem_pool[ei->dbnum_second]; + mem_pool[ei->dbnum_second] = tmp; +} + +void flushdbCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) { + REDISMODULE_NOT_USED(ctx); + REDISMODULE_NOT_USED(e); + int i; + RedisModuleFlushInfo *fi = data; + + if (sub == REDISMODULE_SUBEVENT_FLUSHDB_START) { + if (fi->dbnum != -1) { + MemPoolFreeDb(ctx, fi->dbnum); + } else { + for (i = 0; i < MAX_DB; i++) { + MemPoolFreeDb(ctx, i); + } + } + } +} + +/*---------------------------- command implementation ------------------------------------*/ + +/* MEM.ALLOC key block_num */ +int MemAlloc_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + RedisModule_AutoMemory(ctx); + + if (argc != 3) { + return RedisModule_WrongArity(ctx); + } + + long long block_num; + if ((RedisModule_StringToLongLong(argv[2], &block_num) != REDISMODULE_OK) || block_num <= 0) { + return RedisModule_ReplyWithError(ctx, "ERR invalid block_num: must be a value greater than 0"); + } + + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ | REDISMODULE_WRITE); + int type = RedisModule_KeyType(key); + if (type != REDISMODULE_KEYTYPE_EMPTY && RedisModule_ModuleTypeGetType(key) != MemAllocType) { + return RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE); + } + + MemAllocObject *o; + if (type == REDISMODULE_KEYTYPE_EMPTY) { + o = createMemAllocObject(); + RedisModule_ModuleTypeSetValue(key, MemAllocType, o); + } else { + o = RedisModule_ModuleTypeGetValue(key); + } + + struct MemBlock *mem = MemBlockCreate(block_num); + RedisModule_Assert(mem != NULL); + RedisModule_DictSet(mem_pool[RedisModule_GetSelectedDb(ctx)], argv[1], mem); + o->size = block_num; + o->used = 0; + o->mask = 0; + + RedisModule_ReplyWithLongLong(ctx, block_num); + RedisModule_ReplicateVerbatim(ctx); + return REDISMODULE_OK; +} + +/* MEM.FREE key */ +int MemFree_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + RedisModule_AutoMemory(ctx); + + if (argc != 2) { + return RedisModule_WrongArity(ctx); + } + + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ); + int type = RedisModule_KeyType(key); + if (type != REDISMODULE_KEYTYPE_EMPTY && RedisModule_ModuleTypeGetType(key) != MemAllocType) { + return RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE); + } + + int ret = 0; + MemAllocObject *o; + if (type == REDISMODULE_KEYTYPE_EMPTY) { + RedisModule_ReplyWithLongLong(ctx, ret); + return REDISMODULE_OK; + } else { + o = RedisModule_ModuleTypeGetValue(key); + } + + int nokey; + struct MemBlock *mem = (struct MemBlock *)RedisModule_DictGet(mem_pool[RedisModule_GetSelectedDb(ctx)], argv[1], &nokey); + if (!nokey && mem) { + MemBlockFree(mem); + o->used = 0; + o->size = 0; + o->mask = 0; + ret = 1; + } + + RedisModule_ReplyWithLongLong(ctx, ret); + RedisModule_ReplicateVerbatim(ctx); + return REDISMODULE_OK; +} + +/* MEM.WRITE key block_index data */ +int MemWrite_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + RedisModule_AutoMemory(ctx); + + if (argc != 4) { + return RedisModule_WrongArity(ctx); + } + + long long block_index; + if ((RedisModule_StringToLongLong(argv[2], &block_index) != REDISMODULE_OK) || block_index < 0) { + return RedisModule_ReplyWithError(ctx, "ERR invalid block_index: must be a value greater than 0"); + } + + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ | REDISMODULE_WRITE); + int type = RedisModule_KeyType(key); + if (type != REDISMODULE_KEYTYPE_EMPTY && RedisModule_ModuleTypeGetType(key) != MemAllocType) { + return RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE); + } + + MemAllocObject *o; + if (type == REDISMODULE_KEYTYPE_EMPTY) { + return RedisModule_ReplyWithError(ctx, "ERR Memory has not been allocated"); + } else { + o = RedisModule_ModuleTypeGetValue(key); + } + + if (o->mask & (1UL << block_index)) { + return RedisModule_ReplyWithError(ctx, "ERR block is busy"); + } + + int ret = 0; + int nokey; + struct MemBlock *mem = (struct MemBlock *)RedisModule_DictGet(mem_pool[RedisModule_GetSelectedDb(ctx)], argv[1], &nokey); + if (!nokey && mem) { + size_t len; + const char *buf = RedisModule_StringPtrLen(argv[3], &len); + ret = MemBlockWrite(mem, block_index, buf, len); + o->mask |= (1UL << block_index); + o->used++; + } + + RedisModule_ReplyWithLongLong(ctx, ret); + RedisModule_ReplicateVerbatim(ctx); + return REDISMODULE_OK; +} + +/* MEM.READ key block_index */ +int MemRead_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + RedisModule_AutoMemory(ctx); + + if (argc != 3) { + return RedisModule_WrongArity(ctx); + } + + long long block_index; + if ((RedisModule_StringToLongLong(argv[2], &block_index) != REDISMODULE_OK) || block_index < 0) { + return RedisModule_ReplyWithError(ctx, "ERR invalid block_index: must be a value greater than 0"); + } + + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ); + int type = RedisModule_KeyType(key); + if (type != REDISMODULE_KEYTYPE_EMPTY && RedisModule_ModuleTypeGetType(key) != MemAllocType) { + return RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE); + } + + MemAllocObject *o; + if (type == REDISMODULE_KEYTYPE_EMPTY) { + return RedisModule_ReplyWithError(ctx, "ERR Memory has not been allocated"); + } else { + o = RedisModule_ModuleTypeGetValue(key); + } + + if (!(o->mask & (1UL << block_index))) { + return RedisModule_ReplyWithNull(ctx); + } + + int nokey; + struct MemBlock *mem = (struct MemBlock *)RedisModule_DictGet(mem_pool[RedisModule_GetSelectedDb(ctx)], argv[1], &nokey); + RedisModule_Assert(nokey == 0 && mem != NULL); + + char buf[BLOCK_SIZE]; + MemBlockRead(mem, block_index, buf, sizeof(buf)); + + /* Assuming that the contents are all c-style strings */ + RedisModule_ReplyWithStringBuffer(ctx, buf, strlen(buf)); + return REDISMODULE_OK; +} + +/* MEM.USAGE dbid */ +int MemUsage_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + RedisModule_AutoMemory(ctx); + + if (argc != 2) { + return RedisModule_WrongArity(ctx); + } + + long long dbid; + if ((RedisModule_StringToLongLong(argv[1], (long long *)&dbid) != REDISMODULE_OK)) { + return RedisModule_ReplyWithError(ctx, "ERR invalid value: must be a integer"); + } + + if (dbid < 0 || dbid >= MAX_DB) { + return RedisModule_ReplyWithError(ctx, "ERR dbid out of range"); + } + + + long long size = 0, used = 0; + + void *data; + RedisModuleString *key; + RedisModuleDictIter *iter = RedisModule_DictIteratorStartC(mem_pool[dbid], "^", NULL, 0); + while((key = RedisModule_DictNext(ctx, iter, &data)) != NULL) { + int dbbackup = RedisModule_GetSelectedDb(ctx); + RedisModule_SelectDb(ctx, dbid); + RedisModuleKey *openkey = RedisModule_OpenKey(ctx, key, REDISMODULE_READ); + int type = RedisModule_KeyType(openkey); + RedisModule_Assert(type != REDISMODULE_KEYTYPE_EMPTY && RedisModule_ModuleTypeGetType(openkey) == MemAllocType); + MemAllocObject *o = RedisModule_ModuleTypeGetValue(openkey); + used += o->used; + size += o->size; + RedisModule_CloseKey(openkey); + RedisModule_SelectDb(ctx, dbbackup); + } + RedisModule_DictIteratorStop(iter); + + RedisModule_ReplyWithArray(ctx, 4); + RedisModule_ReplyWithSimpleString(ctx, "total"); + RedisModule_ReplyWithLongLong(ctx, size); + RedisModule_ReplyWithSimpleString(ctx, "used"); + RedisModule_ReplyWithLongLong(ctx, used); + return REDISMODULE_OK; +} + +/* MEM.ALLOCANDWRITE key block_num block_index data block_index data ... */ +int MemAllocAndWrite_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + RedisModule_AutoMemory(ctx); + + if (argc < 3) { + return RedisModule_WrongArity(ctx); + } + + long long block_num; + if ((RedisModule_StringToLongLong(argv[2], &block_num) != REDISMODULE_OK) || block_num <= 0) { + return RedisModule_ReplyWithError(ctx, "ERR invalid block_num: must be a value greater than 0"); + } + + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ | REDISMODULE_WRITE); + int type = RedisModule_KeyType(key); + if (type != REDISMODULE_KEYTYPE_EMPTY && RedisModule_ModuleTypeGetType(key) != MemAllocType) { + return RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE); + } + + MemAllocObject *o; + if (type == REDISMODULE_KEYTYPE_EMPTY) { + o = createMemAllocObject(); + RedisModule_ModuleTypeSetValue(key, MemAllocType, o); + } else { + o = RedisModule_ModuleTypeGetValue(key); + } + + struct MemBlock *mem = MemBlockCreate(block_num); + RedisModule_Assert(mem != NULL); + RedisModule_DictSet(mem_pool[RedisModule_GetSelectedDb(ctx)], argv[1], mem); + o->used = 0; + o->mask = 0; + o->size = block_num; + + int i = 3; + long long block_index; + for (; i < argc; i++) { + /* Security is guaranteed internally, so no security check. */ + RedisModule_StringToLongLong(argv[i], &block_index); + size_t len; + const char * buf = RedisModule_StringPtrLen(argv[i + 1], &len); + MemBlockWrite(mem, block_index, buf, len); + o->used++; + o->mask |= (1UL << block_index); + } + + RedisModule_ReplyWithSimpleString(ctx, "OK"); + RedisModule_ReplicateVerbatim(ctx); + return REDISMODULE_OK; +} + +/*---------------------------- type callbacks ------------------------------------*/ + +void *MemAllocRdbLoad(RedisModuleIO *rdb, int encver) { + if (encver != 0) { + return NULL; + } + + MemAllocObject *o = createMemAllocObject(); + o->size = RedisModule_LoadSigned(rdb); + o->used = RedisModule_LoadSigned(rdb); + o->mask = RedisModule_LoadUnsigned(rdb); + + const RedisModuleString *key = RedisModule_GetKeyNameFromIO(rdb); + int dbid = RedisModule_GetDbIdFromIO(rdb); + + if (o->size) { + size_t size; + char *tmpbuf; + long long num = o->size; + struct MemBlock *head = RedisModule_Calloc(1, sizeof(struct MemBlock)); + tmpbuf = RedisModule_LoadStringBuffer(rdb, &size); + memcpy(head->block, tmpbuf, size > BLOCK_SIZE ? BLOCK_SIZE:size); + RedisModule_Free(tmpbuf); + struct MemBlock *block = head; + while (--num) { + block->next = RedisModule_Calloc(1, sizeof(struct MemBlock)); + block = block->next; + + tmpbuf = RedisModule_LoadStringBuffer(rdb, &size); + memcpy(block->block, tmpbuf, size > BLOCK_SIZE ? BLOCK_SIZE:size); + RedisModule_Free(tmpbuf); + } + + RedisModule_DictSet(mem_pool[dbid], (RedisModuleString *)key, head); + } + + return o; +} + +void MemAllocRdbSave(RedisModuleIO *rdb, void *value) { + MemAllocObject *o = value; + RedisModule_SaveSigned(rdb, o->size); + RedisModule_SaveSigned(rdb, o->used); + RedisModule_SaveUnsigned(rdb, o->mask); + + const RedisModuleString *key = RedisModule_GetKeyNameFromIO(rdb); + int dbid = RedisModule_GetDbIdFromIO(rdb); + + if (o->size) { + int nokey; + struct MemBlock *mem = (struct MemBlock *)RedisModule_DictGet(mem_pool[dbid], (RedisModuleString *)key, &nokey); + RedisModule_Assert(nokey == 0 && mem != NULL); + + struct MemBlock *block = mem; + while (block) { + RedisModule_SaveStringBuffer(rdb, block->block, BLOCK_SIZE); + block = block->next; + } + } +} + +void MemAllocAofRewrite(RedisModuleIO *aof, RedisModuleString *key, void *value) { + MemAllocObject *o = (MemAllocObject *)value; + if (o->size) { + int dbid = RedisModule_GetDbIdFromIO(aof); + int nokey; + size_t i = 0, j = 0; + struct MemBlock *mem = (struct MemBlock *)RedisModule_DictGet(mem_pool[dbid], (RedisModuleString *)key, &nokey); + RedisModule_Assert(nokey == 0 && mem != NULL); + size_t array_size = o->size * 2; + RedisModuleString ** string_array = RedisModule_Calloc(array_size, sizeof(RedisModuleString *)); + while (mem) { + string_array[i] = RedisModule_CreateStringFromLongLong(NULL, j); + string_array[i + 1] = RedisModule_CreateString(NULL, mem->block, BLOCK_SIZE); + mem = mem->next; + i += 2; + j++; + } + RedisModule_EmitAOF(aof, "mem.allocandwrite", "slv", key, o->size, string_array, array_size); + for (i = 0; i < array_size; i++) { + RedisModule_FreeString(NULL, string_array[i]); + } + RedisModule_Free(string_array); + } else { + RedisModule_EmitAOF(aof, "mem.allocandwrite", "sl", key, o->size); + } +} + +void MemAllocFree(void *value) { + RedisModule_Free(value); +} + +void MemAllocUnlink(RedisModuleString *key, const void *value) { + REDISMODULE_NOT_USED(key); + REDISMODULE_NOT_USED(value); + + /* When unlink and unlink2 exist at the same time, we will only call unlink2. */ + RedisModule_Assert(0); +} + +void MemAllocUnlink2(RedisModuleKeyOptCtx *ctx, const void *value) { + MemAllocObject *o = (MemAllocObject *)value; + + const RedisModuleString *key = RedisModule_GetKeyNameFromOptCtx(ctx); + int dbid = RedisModule_GetDbIdFromOptCtx(ctx); + + if (o->size) { + void *oldval; + RedisModule_DictDel(mem_pool[dbid], (RedisModuleString *)key, &oldval); + RedisModule_Assert(oldval != NULL); + MemBlockFree((struct MemBlock *)oldval); + } +} + +void MemAllocDigest(RedisModuleDigest *md, void *value) { + MemAllocObject *o = (MemAllocObject *)value; + RedisModule_DigestAddLongLong(md, o->size); + RedisModule_DigestAddLongLong(md, o->used); + RedisModule_DigestAddLongLong(md, o->mask); + + int dbid = RedisModule_GetDbIdFromDigest(md); + const RedisModuleString *key = RedisModule_GetKeyNameFromDigest(md); + + if (o->size) { + int nokey; + struct MemBlock *mem = (struct MemBlock *)RedisModule_DictGet(mem_pool[dbid], (RedisModuleString *)key, &nokey); + RedisModule_Assert(nokey == 0 && mem != NULL); + + struct MemBlock *block = mem; + while (block) { + RedisModule_DigestAddStringBuffer(md, (unsigned char *)block->block, BLOCK_SIZE); + block = block->next; + } + } +} + +void *MemAllocCopy2(RedisModuleKeyOptCtx *ctx, const void *value) { + const MemAllocObject *old = value; + MemAllocObject *new = createMemAllocObject(); + new->size = old->size; + new->used = old->used; + new->mask = old->mask; + + int from_dbid = RedisModule_GetDbIdFromOptCtx(ctx); + int to_dbid = RedisModule_GetToDbIdFromOptCtx(ctx); + const RedisModuleString *fromkey = RedisModule_GetKeyNameFromOptCtx(ctx); + const RedisModuleString *tokey = RedisModule_GetToKeyNameFromOptCtx(ctx); + + if (old->size) { + int nokey; + struct MemBlock *oldmem = (struct MemBlock *)RedisModule_DictGet(mem_pool[from_dbid], (RedisModuleString *)fromkey, &nokey); + RedisModule_Assert(nokey == 0 && oldmem != NULL); + struct MemBlock *newmem = MemBlockClone(oldmem); + RedisModule_Assert(newmem != NULL); + RedisModule_DictSet(mem_pool[to_dbid], (RedisModuleString *)tokey, newmem); + } + + return new; +} + +size_t MemAllocMemUsage2(RedisModuleKeyOptCtx *ctx, const void *value) { + REDISMODULE_NOT_USED(ctx); + uint64_t size = 0; + MemAllocObject *o = (MemAllocObject *)value; + + size += sizeof(*o); + size += o->size * sizeof(struct MemBlock); + + return size; +} + +size_t MemAllocMemFreeEffort2(RedisModuleKeyOptCtx *ctx, const void *value) { + REDISMODULE_NOT_USED(ctx); + MemAllocObject *o = (MemAllocObject *)value; + return o->size; +} + +int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + + if (RedisModule_Init(ctx, "datatype2", 1,REDISMODULE_APIVER_1) == REDISMODULE_ERR) { + return REDISMODULE_ERR; + } + + RedisModuleTypeMethods tm = { + .version = REDISMODULE_TYPE_METHOD_VERSION, + .rdb_load = MemAllocRdbLoad, + .rdb_save = MemAllocRdbSave, + .aof_rewrite = MemAllocAofRewrite, + .free = MemAllocFree, + .digest = MemAllocDigest, + .unlink = MemAllocUnlink, + // .defrag = MemAllocDefrag, // Tested in defragtest.c + .unlink2 = MemAllocUnlink2, + .copy2 = MemAllocCopy2, + .mem_usage2 = MemAllocMemUsage2, + .free_effort2 = MemAllocMemFreeEffort2, + }; + + MemAllocType = RedisModule_CreateDataType(ctx, "mem_alloc", 0, &tm); + if (MemAllocType == NULL) { + return REDISMODULE_ERR; + } + + if (RedisModule_CreateCommand(ctx, "mem.alloc", MemAlloc_RedisCommand, "write deny-oom", 1, 1, 1) == REDISMODULE_ERR) { + return REDISMODULE_ERR; + } + + if (RedisModule_CreateCommand(ctx, "mem.free", MemFree_RedisCommand, "write deny-oom", 1, 1, 1) == REDISMODULE_ERR) { + return REDISMODULE_ERR; + } + + if (RedisModule_CreateCommand(ctx, "mem.write", MemWrite_RedisCommand, "write deny-oom", 1, 1, 1) == REDISMODULE_ERR) { + return REDISMODULE_ERR; + } + + if (RedisModule_CreateCommand(ctx, "mem.read", MemRead_RedisCommand, "readonly", 1, 1, 1) == REDISMODULE_ERR) { + return REDISMODULE_ERR; + } + + if (RedisModule_CreateCommand(ctx, "mem.usage", MemUsage_RedisCommand, "readonly", 1, 1, 1) == REDISMODULE_ERR) { + return REDISMODULE_ERR; + } + + /* used for internal aof rewrite */ + if (RedisModule_CreateCommand(ctx, "mem.allocandwrite", MemAllocAndWrite_RedisCommand, "write deny-oom", 1, 1, 1) == REDISMODULE_ERR) { + return REDISMODULE_ERR; + } + + for(int i = 0; i < MAX_DB; i++){ + mem_pool[i] = RedisModule_CreateDict(NULL); + } + + RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_FlushDB, flushdbCallback); + RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_SwapDB, swapDbCallback); + + return REDISMODULE_OK; +} diff --git a/tests/modules/defragtest.c b/tests/modules/defragtest.c index b63680c63..1e4c810da 100644 --- a/tests/modules/defragtest.c +++ b/tests/modules/defragtest.c @@ -3,6 +3,7 @@ #define REDISMODULE_EXPERIMENTAL_API #include "redismodule.h" +#include static RedisModuleType *FragType; @@ -147,6 +148,9 @@ int FragDefrag(RedisModuleDefragCtx *ctx, RedisModuleString *key, void **value) unsigned long i = 0; int steps = 0; + int dbid = RedisModule_GetDbIdFromDefragCtx(ctx); + RedisModule_Assert(dbid != -1); + /* Attempt to get cursor, validate it's what we're exepcting */ if (RedisModule_DefragCursorGet(ctx, &i) == REDISMODULE_OK) { if (i > 0) datatype_resumes++; diff --git a/tests/unit/moduleapi/datatype2.tcl b/tests/unit/moduleapi/datatype2.tcl new file mode 100644 index 000000000..cccbb4d73 --- /dev/null +++ b/tests/unit/moduleapi/datatype2.tcl @@ -0,0 +1,243 @@ +set testmodule [file normalize tests/modules/datatype2.so] + +start_server {tags {"modules"}} { + r module load $testmodule + + test "datatype2: test mem alloc and free" { + r flushall + r select 0 + assert_equal 3 [r mem.alloc k1 3] + assert_equal 2 [r mem.alloc k2 2] + + r select 1 + + assert_equal 1 [r mem.alloc k1 1] + assert_equal 5 [r mem.alloc k2 5] + + r select 0 + assert_equal 1 [r mem.free k1] + assert_equal 1 [r mem.free k2] + + r select 1 + assert_equal 1 [r mem.free k1] + assert_equal 1 [r mem.free k2] + } + + test "datatype2: test del and unlink" { + r flushall + assert_equal 100 [r mem.alloc k1 100] + assert_equal 60 [r mem.alloc k2 60] + + assert_equal 1 [r unlink k1] + assert_equal 1 [r del k2] + } + + test "datatype2: test read and write" { + r flushall + + assert_equal 3 [r mem.alloc k1 3] + + set data datatype2 + assert_equal [string length $data] [r mem.write k1 0 $data] + assert_equal $data [r mem.read k1 0] + } + + test "datatype2: test rdb save and load" { + r flushall + r select 0 + + set data k1 + assert_equal 3 [r mem.alloc k1 3] + assert_equal [string length $data] [r mem.write k1 1 $data] + + set data k2 + assert_equal 2 [r mem.alloc k2 2] + assert_equal [string length $data] [r mem.write k2 0 $data] + + r select 1 + + set data k3 + assert_equal 3 [r mem.alloc k3 3] + assert_equal [string length $data] [r mem.write k3 1 $data] + + set data k4 + assert_equal 2 [r mem.alloc k4 2] + assert_equal [string length $data] [r mem.write k4 0 $data] + + r bgsave + waitForBgsave r + r debug reload + + r select 0 + assert_equal k1 [r mem.read k1 1] + assert_equal k2 [r mem.read k2 0] + + r select 1 + assert_equal k3 [r mem.read k3 1] + assert_equal k4 [r mem.read k4 0] + } + + test "datatype2: test aof rewrite" { + r flushall + r select 0 + + set data k1 + assert_equal 3 [r mem.alloc k1 3] + assert_equal [string length $data] [r mem.write k1 1 $data] + + set data k2 + assert_equal 2 [r mem.alloc k2 2] + assert_equal [string length $data] [r mem.write k2 0 $data] + + r select 1 + + set data k3 + assert_equal 3 [r mem.alloc k3 3] + assert_equal [string length $data] [r mem.write k3 1 $data] + + set data k4 + assert_equal 2 [r mem.alloc k4 2] + assert_equal [string length $data] [r mem.write k4 0 $data] + + r bgrewriteaof + waitForBgrewriteaof r + r debug loadaof + + r select 0 + assert_equal k1 [r mem.read k1 1] + assert_equal k2 [r mem.read k2 0] + + r select 1 + assert_equal k3 [r mem.read k3 1] + assert_equal k4 [r mem.read k4 0] + } + + test "datatype2: test copy" { + r flushall + r select 0 + + set data k1 + assert_equal 3 [r mem.alloc k1 3] + assert_equal [string length $data] [r mem.write k1 1 $data] + assert_equal $data [r mem.read k1 1] + + set data k2 + assert_equal 2 [r mem.alloc k2 2] + assert_equal [string length $data] [r mem.write k2 0 $data] + assert_equal $data [r mem.read k2 0] + + + r select 1 + + set data k3 + assert_equal 3 [r mem.alloc k3 3] + assert_equal [string length $data] [r mem.write k3 1 $data] + + set data k4 + assert_equal 2 [r mem.alloc k4 2] + assert_equal [string length $data] [r mem.write k4 0 $data] + + assert_equal {total 5 used 2} [r mem.usage 0] + assert_equal {total 5 used 2} [r mem.usage 1] + + r select 0 + + assert_equal 1 [r copy k1 k3] + assert_equal k1 [r mem.read k3 1] + + assert_equal {total 8 used 3} [r mem.usage 0] + + assert_equal 1 [r copy k2 k1 db 1] + r select 1 + assert_equal k2 [r mem.read k1 0] + + assert_equal {total 8 used 3} [r mem.usage 0] + assert_equal {total 7 used 3} [r mem.usage 1] + } + + test "datatype2: test swapdb" { + r flushall + r select 0 + + set data k1 + assert_equal 5 [r mem.alloc k1 5] + assert_equal [string length $data] [r mem.write k1 1 $data] + assert_equal $data [r mem.read k1 1] + + set data k2 + assert_equal 4 [r mem.alloc k2 4] + assert_equal [string length $data] [r mem.write k2 0 $data] + assert_equal $data [r mem.read k2 0] + + + r select 1 + + set data k1 + assert_equal 3 [r mem.alloc k3 3] + assert_equal [string length $data] [r mem.write k3 1 $data] + + set data k2 + assert_equal 2 [r mem.alloc k4 2] + assert_equal [string length $data] [r mem.write k4 0 $data] + + assert_equal {total 9 used 2} [r mem.usage 0] + assert_equal {total 5 used 2} [r mem.usage 1] + + assert_equal OK [r swapdb 0 1] + + assert_equal {total 9 used 2} [r mem.usage 1] + assert_equal {total 5 used 2} [r mem.usage 0] + } + + test "datatype2: test digest" { + r flushall + r select 0 + + set data k1 + assert_equal 3 [r mem.alloc k1 3] + assert_equal [string length $data] [r mem.write k1 1 $data] + assert_equal $data [r mem.read k1 1] + + set data k2 + assert_equal 2 [r mem.alloc k2 2] + assert_equal [string length $data] [r mem.write k2 0 $data] + assert_equal $data [r mem.read k2 0] + + r select 1 + + set data k1 + assert_equal 3 [r mem.alloc k1 3] + assert_equal [string length $data] [r mem.write k1 1 $data] + assert_equal $data [r mem.read k1 1] + + set data k2 + assert_equal 2 [r mem.alloc k2 2] + assert_equal [string length $data] [r mem.write k2 0 $data] + assert_equal $data [r mem.read k2 0] + + + r select 0 + set digest0 [r debug digest] + + r select 1 + set digest1 [r debug digest] + + assert_equal $digest0 $digest1 + } + + test "datatype2: test memusage" { + r flushall + + set data k1 + assert_equal 3 [r mem.alloc k1 3] + assert_equal [string length $data] [r mem.write k1 1 $data] + assert_equal $data [r mem.read k1 1] + + set data k2 + assert_equal 3 [r mem.alloc k2 3] + assert_equal [string length $data] [r mem.write k2 0 $data] + assert_equal $data [r mem.read k2 0] + + assert_equal [r memory usage k1] [r memory usage k2] + } +} \ No newline at end of file