From 11c6ce812aa32cf6a6011697cbfe8881ff9450fa Mon Sep 17 00:00:00 2001 From: "meir@redislabs.com" Date: Thu, 17 Oct 2019 15:37:01 +0300 Subject: [PATCH] Added scan implementation to module api. The implementation expose the following new functions: 1. RedisModule_CursorCreate - allow to create a new cursor object for keys scanning 2. RedisModule_CursorRestart - restart an existing cursor to restart the scan 3. RedisModule_CursorDestroy - destroy an existing cursor 4. RedisModule_Scan - scan keys The RedisModule_Scan function gets a cursor object, a callback and void* (used as user private data). The callback will be called for each key in the database proving the key name and the value as RedisModuleKey. --- runtest-moduleapi | 1 + src/module.c | 133 +++++++++++++++++++++++++++++++--- src/redismodule.h | 10 +++ tests/modules/Makefile | 3 +- tests/modules/scan.c | 62 ++++++++++++++++ tests/unit/moduleapi/scan.tcl | 18 +++++ 6 files changed, 215 insertions(+), 12 deletions(-) create mode 100644 tests/modules/scan.c create mode 100644 tests/unit/moduleapi/scan.tcl diff --git a/runtest-moduleapi b/runtest-moduleapi index e48535126..3eb6b21b2 100755 --- a/runtest-moduleapi +++ b/runtest-moduleapi @@ -22,4 +22,5 @@ $TCLSH tests/test_helper.tcl \ --single unit/moduleapi/hooks \ --single unit/moduleapi/misc \ --single unit/moduleapi/blockonkeys \ +--single unit/moduleapi/scan \ "${@}" diff --git a/src/module.c b/src/module.c index ad34e7b64..5758abbb6 100644 --- a/src/module.c +++ b/src/module.c @@ -1848,6 +1848,17 @@ int RM_SelectDb(RedisModuleCtx *ctx, int newid) { return (retval == C_OK) ? REDISMODULE_OK : REDISMODULE_ERR; } +static void initializeKey(RedisModuleKey *kp, RedisModuleCtx *ctx, robj *keyname, robj *value, int mode){ + kp->ctx = ctx; + kp->db = ctx->client->db; + kp->key = keyname; + incrRefCount(keyname); + kp->value = value; + kp->iter = NULL; + kp->mode = mode; + zsetKeyReset(kp); +} + /* Return an handle representing a Redis key, so that it is possible * to call other APIs with the key handle as argument to perform * operations on the key. @@ -1878,27 +1889,24 @@ void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) { /* Setup the key handle. */ kp = zmalloc(sizeof(*kp)); - kp->ctx = ctx; - kp->db = ctx->client->db; - kp->key = keyname; - incrRefCount(keyname); - kp->value = value; - kp->iter = NULL; - kp->mode = mode; - zsetKeyReset(kp); + initializeKey(kp, ctx, keyname, value, mode); autoMemoryAdd(ctx,REDISMODULE_AM_KEY,kp); return (void*)kp; } -/* Close a key handle. */ -void RM_CloseKey(RedisModuleKey *key) { - if (key == NULL) return; +static void closeKeyInternal(RedisModuleKey *key) { int signal = SHOULD_SIGNAL_MODIFIED_KEYS(key->ctx); if ((key->mode & REDISMODULE_WRITE) && signal) signalModifiedKey(key->db,key->key); /* TODO: if (key->iter) RM_KeyIteratorStop(kp); */ RM_ZsetRangeStop(key); decrRefCount(key->key); +} + +/* Close a key handle. */ +void RM_CloseKey(RedisModuleKey *key) { + if (key == NULL) return; + closeKeyInternal(key); autoMemoryFreed(key->ctx,REDISMODULE_AM_KEY,key); zfree(key); } @@ -5891,6 +5899,105 @@ int RM_CommandFilterArgDelete(RedisModuleCommandFilterCtx *fctx, int pos) return REDISMODULE_OK; } +/** + * Callback for scan implementation. + * + * The keyname is owned by the caller and need to be retained if used after this function. + * + * The kp is the data and provide using the best efforts approach, in some cases it might + * not be available (in such case it will be set to NULL) and it is the user responsibility + * to handle it. + * + * The kp (if given) is owned by the caller and will be free when the callback returns + * + */ +typedef void (*RedisModuleScanCB)(void *privdata, RedisModuleString* keyname, RedisModuleKey* key); + +typedef struct { + RedisModuleCtx *ctx; + void* user_data; + RedisModuleScanCB fn; +} ScanCBData; + +typedef struct RedisModuleCursor{ + int cursor; +}RedisModuleCursor; + +void ScanCallback(void *privdata, const dictEntry *de) { + ScanCBData *data = privdata; + sds key = dictGetKey(de); + robj* val = dictGetVal(de); + RedisModuleString *keyname = createObject(OBJ_STRING,sdsdup(key)); + + /* Setup the key handle. */ + RedisModuleKey kp = {0}; + initializeKey(&kp, data->ctx, keyname, val, REDISMODULE_READ); + + data->fn(data->user_data, keyname, &kp); + + closeKeyInternal(&kp); + decrRefCount(keyname); +} + +/** + * Create a new cursor to scan keys. + */ +RedisModuleCursor* RM_CursorCreate() { + RedisModuleCursor* cursor = zmalloc(sizeof(*cursor)); + cursor->cursor = 0; + return cursor; +} + +/** + * Restart an existing cursor. The keys will be rescanned. + */ +void RM_CursorRestart(RedisModuleCursor* cursor) { + cursor->cursor = 0; +} + +/** + * Destroy the cursor struct. + */ +void RM_CursorDestroy(RedisModuleCursor* cursor) { + zfree(cursor); +} + +/** + * Scan api that allows module writer to scan all the keys and value in redis. + * The way it should be used: + * Cursor* c = RedisModule_CursorCreate(); + * while(RedisModule_Scan(ctx, c, callback, privateData)); + * RedisModule_CursorDestroy(c); + * + * It is also possible to use this api from another thread such that the GIL only have to + * be acquired durring the actuall call to RM_Scan: + * Cursor* c = RedisModule_CursorCreate(); + * RedisModule_ThreadSafeCtxLock(ctx); + * while(RedisModule_Scan(ctx, c, callback, privateData)){ + * RedisModule_ThreadSafeCtxUnlock(ctx); + * // do some background job + * RedisModule_ThreadSafeCtxLock(ctx); + * } + * RedisModule_CursorDestroy(c); + * + * The function will return 1 if there is more elements to scan and 0 otherwise. + * It is also possible to restart and existing cursor using RM_CursorRestart + */ +int RM_Scan(RedisModuleCtx *ctx, RedisModuleCursor* cursor, RedisModuleScanCB fn, void* privdata) { + if(cursor->cursor == -1){ + return 0; + } + int ret = 1; + ScanCBData data = { ctx, privdata, fn }; + cursor->cursor = dictScan(ctx->client->db->dict, cursor->cursor, ScanCallback, NULL, &data); + if (cursor->cursor == 0){ + cursor->cursor = -1; + ret = 0; + } + return ret; +} + + /* -------------------------------------------------------------------------- * Module fork API * -------------------------------------------------------------------------- */ @@ -6969,6 +7076,10 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(SetLRUOrLFU); REGISTER_API(GetLRUOrLFU); REGISTER_API(BlockClientOnKeys); + REGISTER_API(Scan); + REGISTER_API(CursorCreate); + REGISTER_API(CursorDestroy); + REGISTER_API(CursorRestart); REGISTER_API(SignalKeyAsReady); REGISTER_API(GetBlockedClientReadyKey); } diff --git a/src/redismodule.h b/src/redismodule.h index 728c7f584..c74772d0f 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -392,6 +392,7 @@ typedef struct RedisModuleDictIter RedisModuleDictIter; typedef struct RedisModuleCommandFilterCtx RedisModuleCommandFilterCtx; typedef struct RedisModuleCommandFilter RedisModuleCommandFilter; typedef struct RedisModuleInfoCtx RedisModuleInfoCtx; +typedef struct RedisModuleCursor RedisModuleCursor; typedef int (*RedisModuleCmdFunc)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc); @@ -409,6 +410,7 @@ typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data); typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter); typedef void (*RedisModuleForkDoneHandler) (int exitcode, int bysignal, void *user_data); typedef void (*RedisModuleInfoFunc)(RedisModuleInfoCtx *ctx, int for_crash_report); +typedef void (*RedisModuleScanCB)(void *privdata, RedisModuleString* keyname, RedisModuleKey* key); #define REDISMODULE_TYPE_METHOD_VERSION 2 typedef struct RedisModuleTypeMethods { @@ -633,6 +635,10 @@ int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgDelete)(RedisModuleCommandF int REDISMODULE_API_FUNC(RedisModule_Fork)(RedisModuleForkDoneHandler cb, void *user_data); int REDISMODULE_API_FUNC(RedisModule_ExitFromChild)(int retcode); int REDISMODULE_API_FUNC(RedisModule_KillForkChild)(int child_pid); +RedisModuleCursor* REDISMODULE_API_FUNC(RedisModule_CursorCreate)(); +void REDISMODULE_API_FUNC(RedisModule_CursorRestart)(RedisModuleCursor* cursor); +void REDISMODULE_API_FUNC(RedisModule_CursorDestroy)(RedisModuleCursor* cursor); +int REDISMODULE_API_FUNC(RedisModule_Scan)(RedisModuleCtx *ctx, RedisModuleCursor* cursor, RedisModuleScanCB fn, void* privdata); #endif #define RedisModule_IsAOFClient(id) ((id) == UINT64_MAX) @@ -842,6 +848,10 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(Fork); REDISMODULE_GET_API(ExitFromChild); REDISMODULE_GET_API(KillForkChild); + REDISMODULE_GET_API(Scan); + REDISMODULE_GET_API(CursorCreate); + REDISMODULE_GET_API(CursorRestart); + REDISMODULE_GET_API(CursorDestroy); #endif if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR; diff --git a/tests/modules/Makefile b/tests/modules/Makefile index 9e27758a2..07c3cb829 100644 --- a/tests/modules/Makefile +++ b/tests/modules/Makefile @@ -19,7 +19,8 @@ TEST_MODULES = \ propagate.so \ misc.so \ hooks.so \ - blockonkeys.so + blockonkeys.so \ + scan.so .PHONY: all diff --git a/tests/modules/scan.c b/tests/modules/scan.c new file mode 100644 index 000000000..21071720a --- /dev/null +++ b/tests/modules/scan.c @@ -0,0 +1,62 @@ +#define REDISMODULE_EXPERIMENTAL_API +#include "redismodule.h" + +#include +#include +#include + +#define UNUSED(V) ((void) V) + +typedef struct scan_pd{ + size_t nkeys; + RedisModuleCtx *ctx; +} scan_pd; + +void scan_callback(void *privdata, RedisModuleString* keyname, RedisModuleKey* key){ + scan_pd* pd = privdata; + RedisModule_ReplyWithArray(pd->ctx, 2); + + RedisModule_ReplyWithString(pd->ctx, keyname); + if(key && RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_STRING){ + size_t len; + char * data = RedisModule_StringDMA(key, &len, REDISMODULE_READ); + RedisModule_ReplyWithStringBuffer(pd->ctx, data, len); + }else{ + RedisModule_ReplyWithNull(pd->ctx); + } + pd->nkeys++; +} + +int scan_keys_values(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + scan_pd pd = { + .nkeys = 0, + .ctx = ctx, + }; + + RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN); + + RedisModuleCursor* cursor = RedisModule_CursorCreate(); + while(RedisModule_Scan(ctx, cursor, scan_callback, &pd)); + RedisModule_CursorDestroy(cursor); + + RedisModule_ReplySetArrayLength(ctx, pd.nkeys); + return 0; +} + +int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + UNUSED(argv); + UNUSED(argc); + if (RedisModule_Init(ctx, "scan", 1, REDISMODULE_APIVER_1)== REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "scan.scankeysvalues", scan_keys_values, "", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + return REDISMODULE_OK; +} + + + + + diff --git a/tests/unit/moduleapi/scan.tcl b/tests/unit/moduleapi/scan.tcl new file mode 100644 index 000000000..5a77e8195 --- /dev/null +++ b/tests/unit/moduleapi/scan.tcl @@ -0,0 +1,18 @@ +set testmodule [file normalize tests/modules/scan.so] + +proc count_log_message {pattern} { + set result [exec grep -c $pattern < [srv 0 stdout]] +} + +start_server {tags {"modules"}} { + r module load $testmodule + + test {Module scan} { + # the module create a scan command which also return values + r set x 1 + r set y 2 + r set z 3 + lsort [r scan.scankeysvalues] + } {{x 1} {y 2} {z 3}} + +} \ No newline at end of file