#define REDISMODULE_EXPERIMENTAL_API #include "redismodule.h" #include #include #include #define LIST_SIZE 1024 typedef struct { long long list[LIST_SIZE]; long long length; } fsl_t; /* Fixed-size list */ static RedisModuleType *fsltype = NULL; fsl_t *fsl_type_create() { fsl_t *o; o = RedisModule_Alloc(sizeof(*o)); o->length = 0; return o; } void fsl_type_free(fsl_t *o) { RedisModule_Free(o); } /* ========================== "fsltype" type methods ======================= */ void *fsl_rdb_load(RedisModuleIO *rdb, int encver) { if (encver != 0) { return NULL; } fsl_t *fsl = fsl_type_create(); fsl->length = RedisModule_LoadUnsigned(rdb); for (long long i = 0; i < fsl->length; i++) fsl->list[i] = RedisModule_LoadSigned(rdb); return fsl; } void fsl_rdb_save(RedisModuleIO *rdb, void *value) { fsl_t *fsl = value; RedisModule_SaveUnsigned(rdb,fsl->length); for (long long i = 0; i < fsl->length; i++) RedisModule_SaveSigned(rdb, fsl->list[i]); } void fsl_aofrw(RedisModuleIO *aof, RedisModuleString *key, void *value) { fsl_t *fsl = value; for (long long i = 0; i < fsl->length; i++) RedisModule_EmitAOF(aof, "FSL.PUSH","sl", key, fsl->list[i]); } void fsl_free(void *value) { fsl_type_free(value); } /* ========================== helper methods ======================= */ int get_fsl(RedisModuleCtx *ctx, RedisModuleString *keyname, int mode, int create, fsl_t **fsl, int reply_on_failure) { RedisModuleKey *key = RedisModule_OpenKey(ctx, keyname, mode); int type = RedisModule_KeyType(key); if (type != REDISMODULE_KEYTYPE_EMPTY && RedisModule_ModuleTypeGetType(key) != fsltype) { RedisModule_CloseKey(key); if (reply_on_failure) RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE); return 0; } /* Create an empty value object if the key is currently empty. */ if (type == REDISMODULE_KEYTYPE_EMPTY) { if (!create) { /* Key is empty but we cannot create */ RedisModule_CloseKey(key); *fsl = NULL; return 1; } *fsl = fsl_type_create(); RedisModule_ModuleTypeSetValue(key, fsltype, *fsl); } else { *fsl = RedisModule_ModuleTypeGetValue(key); } RedisModule_CloseKey(key); return 1; } /* ========================== commands ======================= */ /* FSL.PUSH - Push an integer to the fixed-size list (to the right). * It must be greater than the element in the head of the list. */ int fsl_push(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (argc != 3) return RedisModule_WrongArity(ctx); long long ele; if (RedisModule_StringToLongLong(argv[2],&ele) != REDISMODULE_OK) return RedisModule_ReplyWithError(ctx,"ERR invalid integer"); fsl_t *fsl; if (!get_fsl(ctx, argv[1], REDISMODULE_WRITE, 1, &fsl, 1)) return REDISMODULE_OK; if (fsl->length == LIST_SIZE) return RedisModule_ReplyWithError(ctx,"ERR list is full"); if (fsl->length != 0 && fsl->list[fsl->length-1] >= ele) return RedisModule_ReplyWithError(ctx,"ERR new element has to be greater than the head element"); fsl->list[fsl->length++] = ele; if (fsl->length >= 2) RedisModule_SignalKeyAsReady(ctx, argv[1]); return RedisModule_ReplyWithSimpleString(ctx, "OK"); } int bpop2_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx); fsl_t *fsl; if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0)) return REDISMODULE_ERR; if (!fsl || fsl->length < 2) return REDISMODULE_ERR; RedisModule_ReplyWithArray(ctx, 2); RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); return REDISMODULE_OK; } int bpop2_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); return RedisModule_ReplyWithSimpleString(ctx, "Request timedout"); } /* FSL.BPOP2 - Block clients until list has two or more elements. * When that happens, unblock client and pop the last two elements (from the right). */ int fsl_bpop2(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (argc != 3) return RedisModule_WrongArity(ctx); long long timeout; if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK || timeout < 0) return RedisModule_ReplyWithError(ctx,"ERR invalid timeout"); fsl_t *fsl; if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1)) return REDISMODULE_OK; if (!fsl || fsl->length < 2) { /* Key is empty or has <2 elements, we must block */ RedisModule_BlockClientOnKeys(ctx, bpop2_reply_callback, bpop2_timeout_callback, NULL, timeout, &argv[1], 1, NULL); } else { RedisModule_ReplyWithArray(ctx, 2); RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); } return REDISMODULE_OK; } int bpopgt_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx); long long gt = (long long)RedisModule_GetBlockedClientPrivateData(ctx); fsl_t *fsl; if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0)) return REDISMODULE_ERR; if (!fsl || fsl->list[fsl->length-1] <= gt) return REDISMODULE_ERR; RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); return REDISMODULE_OK; } int bpopgt_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); return RedisModule_ReplyWithSimpleString(ctx, "Request timedout"); } void bpopgt_free_privdata(RedisModuleCtx *ctx, void *privdata) { /* Nothing to do because privdata is actually a 'long long', * not a pointer to the heap */ REDISMODULE_NOT_USED(ctx); REDISMODULE_NOT_USED(privdata); } /* FSL.BPOPGT - Block clients until list has an element greater than . * When that happens, unblock client and pop the last element (from the right). */ int fsl_bpopgt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (argc != 4) return RedisModule_WrongArity(ctx); long long gt; if (RedisModule_StringToLongLong(argv[2],>) != REDISMODULE_OK) return RedisModule_ReplyWithError(ctx,"ERR invalid integer"); long long timeout; if (RedisModule_StringToLongLong(argv[3],&timeout) != REDISMODULE_OK || timeout < 0) return RedisModule_ReplyWithError(ctx,"ERR invalid timeout"); fsl_t *fsl; if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1)) return REDISMODULE_OK; if (!fsl || fsl->list[fsl->length-1] <= gt) { /* Key is empty or has <2 elements, we must block */ RedisModule_BlockClientOnKeys(ctx, bpopgt_reply_callback, bpopgt_timeout_callback, bpopgt_free_privdata, timeout, &argv[1], 1, (void*)gt); } else { RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); } return REDISMODULE_OK; } int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); if (RedisModule_Init(ctx, "blockonkeys", 1, REDISMODULE_APIVER_1)== REDISMODULE_ERR) return REDISMODULE_ERR; RedisModuleTypeMethods tm = { .version = REDISMODULE_TYPE_METHOD_VERSION, .rdb_load = fsl_rdb_load, .rdb_save = fsl_rdb_save, .aof_rewrite = fsl_aofrw, .mem_usage = NULL, .free = fsl_free, .digest = NULL }; fsltype = RedisModule_CreateDataType(ctx, "fsltype_t", 0, &tm); if (fsltype == NULL) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,"fsl.push",fsl_push,"",0,0,0) == REDISMODULE_ERR) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,"fsl.bpop2",fsl_bpop2,"",0,0,0) == REDISMODULE_ERR) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,"fsl.bpopgt",fsl_bpopgt,"",0,0,0) == REDISMODULE_ERR) return REDISMODULE_ERR; return REDISMODULE_OK; }