mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 08:08:53 -05:00
Added scan implementation to module api.
The implementation expose the following new functions: 1. RedisModule_CursorCreate - allow to create a new cursor object for keys scanning 2. RedisModule_CursorRestart - restart an existing cursor to restart the scan 3. RedisModule_CursorDestroy - destroy an existing cursor 4. RedisModule_Scan - scan keys The RedisModule_Scan function gets a cursor object, a callback and void* (used as user private data). The callback will be called for each key in the database proving the key name and the value as RedisModuleKey.
This commit is contained in:
parent
0f026af185
commit
11c6ce812a
@ -22,4 +22,5 @@ $TCLSH tests/test_helper.tcl \
|
|||||||
--single unit/moduleapi/hooks \
|
--single unit/moduleapi/hooks \
|
||||||
--single unit/moduleapi/misc \
|
--single unit/moduleapi/misc \
|
||||||
--single unit/moduleapi/blockonkeys \
|
--single unit/moduleapi/blockonkeys \
|
||||||
|
--single unit/moduleapi/scan \
|
||||||
"${@}"
|
"${@}"
|
||||||
|
133
src/module.c
133
src/module.c
@ -1848,6 +1848,17 @@ int RM_SelectDb(RedisModuleCtx *ctx, int newid) {
|
|||||||
return (retval == C_OK) ? REDISMODULE_OK : REDISMODULE_ERR;
|
return (retval == C_OK) ? REDISMODULE_OK : REDISMODULE_ERR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void initializeKey(RedisModuleKey *kp, RedisModuleCtx *ctx, robj *keyname, robj *value, int mode){
|
||||||
|
kp->ctx = ctx;
|
||||||
|
kp->db = ctx->client->db;
|
||||||
|
kp->key = keyname;
|
||||||
|
incrRefCount(keyname);
|
||||||
|
kp->value = value;
|
||||||
|
kp->iter = NULL;
|
||||||
|
kp->mode = mode;
|
||||||
|
zsetKeyReset(kp);
|
||||||
|
}
|
||||||
|
|
||||||
/* Return an handle representing a Redis key, so that it is possible
|
/* Return an handle representing a Redis key, so that it is possible
|
||||||
* to call other APIs with the key handle as argument to perform
|
* to call other APIs with the key handle as argument to perform
|
||||||
* operations on the key.
|
* operations on the key.
|
||||||
@ -1878,27 +1889,24 @@ void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) {
|
|||||||
|
|
||||||
/* Setup the key handle. */
|
/* Setup the key handle. */
|
||||||
kp = zmalloc(sizeof(*kp));
|
kp = zmalloc(sizeof(*kp));
|
||||||
kp->ctx = ctx;
|
initializeKey(kp, ctx, keyname, value, mode);
|
||||||
kp->db = ctx->client->db;
|
|
||||||
kp->key = keyname;
|
|
||||||
incrRefCount(keyname);
|
|
||||||
kp->value = value;
|
|
||||||
kp->iter = NULL;
|
|
||||||
kp->mode = mode;
|
|
||||||
zsetKeyReset(kp);
|
|
||||||
autoMemoryAdd(ctx,REDISMODULE_AM_KEY,kp);
|
autoMemoryAdd(ctx,REDISMODULE_AM_KEY,kp);
|
||||||
return (void*)kp;
|
return (void*)kp;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Close a key handle. */
|
static void closeKeyInternal(RedisModuleKey *key) {
|
||||||
void RM_CloseKey(RedisModuleKey *key) {
|
|
||||||
if (key == NULL) return;
|
|
||||||
int signal = SHOULD_SIGNAL_MODIFIED_KEYS(key->ctx);
|
int signal = SHOULD_SIGNAL_MODIFIED_KEYS(key->ctx);
|
||||||
if ((key->mode & REDISMODULE_WRITE) && signal)
|
if ((key->mode & REDISMODULE_WRITE) && signal)
|
||||||
signalModifiedKey(key->db,key->key);
|
signalModifiedKey(key->db,key->key);
|
||||||
/* TODO: if (key->iter) RM_KeyIteratorStop(kp); */
|
/* TODO: if (key->iter) RM_KeyIteratorStop(kp); */
|
||||||
RM_ZsetRangeStop(key);
|
RM_ZsetRangeStop(key);
|
||||||
decrRefCount(key->key);
|
decrRefCount(key->key);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Close a key handle. */
|
||||||
|
void RM_CloseKey(RedisModuleKey *key) {
|
||||||
|
if (key == NULL) return;
|
||||||
|
closeKeyInternal(key);
|
||||||
autoMemoryFreed(key->ctx,REDISMODULE_AM_KEY,key);
|
autoMemoryFreed(key->ctx,REDISMODULE_AM_KEY,key);
|
||||||
zfree(key);
|
zfree(key);
|
||||||
}
|
}
|
||||||
@ -5891,6 +5899,105 @@ int RM_CommandFilterArgDelete(RedisModuleCommandFilterCtx *fctx, int pos)
|
|||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Callback for scan implementation.
|
||||||
|
*
|
||||||
|
* The keyname is owned by the caller and need to be retained if used after this function.
|
||||||
|
*
|
||||||
|
* The kp is the data and provide using the best efforts approach, in some cases it might
|
||||||
|
* not be available (in such case it will be set to NULL) and it is the user responsibility
|
||||||
|
* to handle it.
|
||||||
|
*
|
||||||
|
* The kp (if given) is owned by the caller and will be free when the callback returns
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
typedef void (*RedisModuleScanCB)(void *privdata, RedisModuleString* keyname, RedisModuleKey* key);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
RedisModuleCtx *ctx;
|
||||||
|
void* user_data;
|
||||||
|
RedisModuleScanCB fn;
|
||||||
|
} ScanCBData;
|
||||||
|
|
||||||
|
typedef struct RedisModuleCursor{
|
||||||
|
int cursor;
|
||||||
|
}RedisModuleCursor;
|
||||||
|
|
||||||
|
void ScanCallback(void *privdata, const dictEntry *de) {
|
||||||
|
ScanCBData *data = privdata;
|
||||||
|
sds key = dictGetKey(de);
|
||||||
|
robj* val = dictGetVal(de);
|
||||||
|
RedisModuleString *keyname = createObject(OBJ_STRING,sdsdup(key));
|
||||||
|
|
||||||
|
/* Setup the key handle. */
|
||||||
|
RedisModuleKey kp = {0};
|
||||||
|
initializeKey(&kp, data->ctx, keyname, val, REDISMODULE_READ);
|
||||||
|
|
||||||
|
data->fn(data->user_data, keyname, &kp);
|
||||||
|
|
||||||
|
closeKeyInternal(&kp);
|
||||||
|
decrRefCount(keyname);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new cursor to scan keys.
|
||||||
|
*/
|
||||||
|
RedisModuleCursor* RM_CursorCreate() {
|
||||||
|
RedisModuleCursor* cursor = zmalloc(sizeof(*cursor));
|
||||||
|
cursor->cursor = 0;
|
||||||
|
return cursor;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Restart an existing cursor. The keys will be rescanned.
|
||||||
|
*/
|
||||||
|
void RM_CursorRestart(RedisModuleCursor* cursor) {
|
||||||
|
cursor->cursor = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Destroy the cursor struct.
|
||||||
|
*/
|
||||||
|
void RM_CursorDestroy(RedisModuleCursor* cursor) {
|
||||||
|
zfree(cursor);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Scan api that allows module writer to scan all the keys and value in redis.
|
||||||
|
* The way it should be used:
|
||||||
|
* Cursor* c = RedisModule_CursorCreate();
|
||||||
|
* while(RedisModule_Scan(ctx, c, callback, privateData));
|
||||||
|
* RedisModule_CursorDestroy(c);
|
||||||
|
*
|
||||||
|
* It is also possible to use this api from another thread such that the GIL only have to
|
||||||
|
* be acquired durring the actuall call to RM_Scan:
|
||||||
|
* Cursor* c = RedisModule_CursorCreate();
|
||||||
|
* RedisModule_ThreadSafeCtxLock(ctx);
|
||||||
|
* while(RedisModule_Scan(ctx, c, callback, privateData)){
|
||||||
|
* RedisModule_ThreadSafeCtxUnlock(ctx);
|
||||||
|
* // do some background job
|
||||||
|
* RedisModule_ThreadSafeCtxLock(ctx);
|
||||||
|
* }
|
||||||
|
* RedisModule_CursorDestroy(c);
|
||||||
|
*
|
||||||
|
* The function will return 1 if there is more elements to scan and 0 otherwise.
|
||||||
|
* It is also possible to restart and existing cursor using RM_CursorRestart
|
||||||
|
*/
|
||||||
|
int RM_Scan(RedisModuleCtx *ctx, RedisModuleCursor* cursor, RedisModuleScanCB fn, void* privdata) {
|
||||||
|
if(cursor->cursor == -1){
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
int ret = 1;
|
||||||
|
ScanCBData data = { ctx, privdata, fn };
|
||||||
|
cursor->cursor = dictScan(ctx->client->db->dict, cursor->cursor, ScanCallback, NULL, &data);
|
||||||
|
if (cursor->cursor == 0){
|
||||||
|
cursor->cursor = -1;
|
||||||
|
ret = 0;
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/* --------------------------------------------------------------------------
|
/* --------------------------------------------------------------------------
|
||||||
* Module fork API
|
* Module fork API
|
||||||
* -------------------------------------------------------------------------- */
|
* -------------------------------------------------------------------------- */
|
||||||
@ -6969,6 +7076,10 @@ void moduleRegisterCoreAPI(void) {
|
|||||||
REGISTER_API(SetLRUOrLFU);
|
REGISTER_API(SetLRUOrLFU);
|
||||||
REGISTER_API(GetLRUOrLFU);
|
REGISTER_API(GetLRUOrLFU);
|
||||||
REGISTER_API(BlockClientOnKeys);
|
REGISTER_API(BlockClientOnKeys);
|
||||||
|
REGISTER_API(Scan);
|
||||||
|
REGISTER_API(CursorCreate);
|
||||||
|
REGISTER_API(CursorDestroy);
|
||||||
|
REGISTER_API(CursorRestart);
|
||||||
REGISTER_API(SignalKeyAsReady);
|
REGISTER_API(SignalKeyAsReady);
|
||||||
REGISTER_API(GetBlockedClientReadyKey);
|
REGISTER_API(GetBlockedClientReadyKey);
|
||||||
}
|
}
|
||||||
|
@ -392,6 +392,7 @@ typedef struct RedisModuleDictIter RedisModuleDictIter;
|
|||||||
typedef struct RedisModuleCommandFilterCtx RedisModuleCommandFilterCtx;
|
typedef struct RedisModuleCommandFilterCtx RedisModuleCommandFilterCtx;
|
||||||
typedef struct RedisModuleCommandFilter RedisModuleCommandFilter;
|
typedef struct RedisModuleCommandFilter RedisModuleCommandFilter;
|
||||||
typedef struct RedisModuleInfoCtx RedisModuleInfoCtx;
|
typedef struct RedisModuleInfoCtx RedisModuleInfoCtx;
|
||||||
|
typedef struct RedisModuleCursor RedisModuleCursor;
|
||||||
|
|
||||||
typedef int (*RedisModuleCmdFunc)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
|
typedef int (*RedisModuleCmdFunc)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
|
||||||
typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc);
|
typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc);
|
||||||
@ -409,6 +410,7 @@ typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data);
|
|||||||
typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter);
|
typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter);
|
||||||
typedef void (*RedisModuleForkDoneHandler) (int exitcode, int bysignal, void *user_data);
|
typedef void (*RedisModuleForkDoneHandler) (int exitcode, int bysignal, void *user_data);
|
||||||
typedef void (*RedisModuleInfoFunc)(RedisModuleInfoCtx *ctx, int for_crash_report);
|
typedef void (*RedisModuleInfoFunc)(RedisModuleInfoCtx *ctx, int for_crash_report);
|
||||||
|
typedef void (*RedisModuleScanCB)(void *privdata, RedisModuleString* keyname, RedisModuleKey* key);
|
||||||
|
|
||||||
#define REDISMODULE_TYPE_METHOD_VERSION 2
|
#define REDISMODULE_TYPE_METHOD_VERSION 2
|
||||||
typedef struct RedisModuleTypeMethods {
|
typedef struct RedisModuleTypeMethods {
|
||||||
@ -633,6 +635,10 @@ int REDISMODULE_API_FUNC(RedisModule_CommandFilterArgDelete)(RedisModuleCommandF
|
|||||||
int REDISMODULE_API_FUNC(RedisModule_Fork)(RedisModuleForkDoneHandler cb, void *user_data);
|
int REDISMODULE_API_FUNC(RedisModule_Fork)(RedisModuleForkDoneHandler cb, void *user_data);
|
||||||
int REDISMODULE_API_FUNC(RedisModule_ExitFromChild)(int retcode);
|
int REDISMODULE_API_FUNC(RedisModule_ExitFromChild)(int retcode);
|
||||||
int REDISMODULE_API_FUNC(RedisModule_KillForkChild)(int child_pid);
|
int REDISMODULE_API_FUNC(RedisModule_KillForkChild)(int child_pid);
|
||||||
|
RedisModuleCursor* REDISMODULE_API_FUNC(RedisModule_CursorCreate)();
|
||||||
|
void REDISMODULE_API_FUNC(RedisModule_CursorRestart)(RedisModuleCursor* cursor);
|
||||||
|
void REDISMODULE_API_FUNC(RedisModule_CursorDestroy)(RedisModuleCursor* cursor);
|
||||||
|
int REDISMODULE_API_FUNC(RedisModule_Scan)(RedisModuleCtx *ctx, RedisModuleCursor* cursor, RedisModuleScanCB fn, void* privdata);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#define RedisModule_IsAOFClient(id) ((id) == UINT64_MAX)
|
#define RedisModule_IsAOFClient(id) ((id) == UINT64_MAX)
|
||||||
@ -842,6 +848,10 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
|
|||||||
REDISMODULE_GET_API(Fork);
|
REDISMODULE_GET_API(Fork);
|
||||||
REDISMODULE_GET_API(ExitFromChild);
|
REDISMODULE_GET_API(ExitFromChild);
|
||||||
REDISMODULE_GET_API(KillForkChild);
|
REDISMODULE_GET_API(KillForkChild);
|
||||||
|
REDISMODULE_GET_API(Scan);
|
||||||
|
REDISMODULE_GET_API(CursorCreate);
|
||||||
|
REDISMODULE_GET_API(CursorRestart);
|
||||||
|
REDISMODULE_GET_API(CursorDestroy);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR;
|
if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR;
|
||||||
|
@ -19,7 +19,8 @@ TEST_MODULES = \
|
|||||||
propagate.so \
|
propagate.so \
|
||||||
misc.so \
|
misc.so \
|
||||||
hooks.so \
|
hooks.so \
|
||||||
blockonkeys.so
|
blockonkeys.so \
|
||||||
|
scan.so
|
||||||
|
|
||||||
.PHONY: all
|
.PHONY: all
|
||||||
|
|
||||||
|
62
tests/modules/scan.c
Normal file
62
tests/modules/scan.c
Normal file
@ -0,0 +1,62 @@
|
|||||||
|
#define REDISMODULE_EXPERIMENTAL_API
|
||||||
|
#include "redismodule.h"
|
||||||
|
|
||||||
|
#include <string.h>
|
||||||
|
#include <assert.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
#define UNUSED(V) ((void) V)
|
||||||
|
|
||||||
|
typedef struct scan_pd{
|
||||||
|
size_t nkeys;
|
||||||
|
RedisModuleCtx *ctx;
|
||||||
|
} scan_pd;
|
||||||
|
|
||||||
|
void scan_callback(void *privdata, RedisModuleString* keyname, RedisModuleKey* key){
|
||||||
|
scan_pd* pd = privdata;
|
||||||
|
RedisModule_ReplyWithArray(pd->ctx, 2);
|
||||||
|
|
||||||
|
RedisModule_ReplyWithString(pd->ctx, keyname);
|
||||||
|
if(key && RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_STRING){
|
||||||
|
size_t len;
|
||||||
|
char * data = RedisModule_StringDMA(key, &len, REDISMODULE_READ);
|
||||||
|
RedisModule_ReplyWithStringBuffer(pd->ctx, data, len);
|
||||||
|
}else{
|
||||||
|
RedisModule_ReplyWithNull(pd->ctx);
|
||||||
|
}
|
||||||
|
pd->nkeys++;
|
||||||
|
}
|
||||||
|
|
||||||
|
int scan_keys_values(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||||
|
{
|
||||||
|
scan_pd pd = {
|
||||||
|
.nkeys = 0,
|
||||||
|
.ctx = ctx,
|
||||||
|
};
|
||||||
|
|
||||||
|
RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN);
|
||||||
|
|
||||||
|
RedisModuleCursor* cursor = RedisModule_CursorCreate();
|
||||||
|
while(RedisModule_Scan(ctx, cursor, scan_callback, &pd));
|
||||||
|
RedisModule_CursorDestroy(cursor);
|
||||||
|
|
||||||
|
RedisModule_ReplySetArrayLength(ctx, pd.nkeys);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||||
|
UNUSED(argv);
|
||||||
|
UNUSED(argc);
|
||||||
|
if (RedisModule_Init(ctx, "scan", 1, REDISMODULE_APIVER_1)== REDISMODULE_ERR)
|
||||||
|
return REDISMODULE_ERR;
|
||||||
|
|
||||||
|
if (RedisModule_CreateCommand(ctx, "scan.scankeysvalues", scan_keys_values, "", 0, 0, 0) == REDISMODULE_ERR)
|
||||||
|
return REDISMODULE_ERR;
|
||||||
|
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
18
tests/unit/moduleapi/scan.tcl
Normal file
18
tests/unit/moduleapi/scan.tcl
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
set testmodule [file normalize tests/modules/scan.so]
|
||||||
|
|
||||||
|
proc count_log_message {pattern} {
|
||||||
|
set result [exec grep -c $pattern < [srv 0 stdout]]
|
||||||
|
}
|
||||||
|
|
||||||
|
start_server {tags {"modules"}} {
|
||||||
|
r module load $testmodule
|
||||||
|
|
||||||
|
test {Module scan} {
|
||||||
|
# the module create a scan command which also return values
|
||||||
|
r set x 1
|
||||||
|
r set y 2
|
||||||
|
r set z 3
|
||||||
|
lsort [r scan.scankeysvalues]
|
||||||
|
} {{x 1} {y 2} {z 3}}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user