// Copyright (c) 2020, Meir Shpilraien // SPDX-FileCopyrightText: 2024 Redict Contributors // SPDX-FileCopyrightText: 2024 Salvatore Sanfilippo // // SPDX-License-Identifier: BSD-3-Clause // SPDX-License-Identifier: LGPL-3.0-only /* This module allow to verify 'RedictModule_AddPostNotificationJob' by registering to 3 * key space event: * * STRINGS - the module register to all strings notifications and set post notification job * that increase a counter indicating how many times the string key was changed. * In addition, it increase another counter that counts the total changes that * was made on all strings keys. * * EXPIRED - the module register to expired event and set post notification job that that * counts the total number of expired events. * * EVICTED - the module register to evicted event and set post notification job that that * counts the total number of evicted events. * * In addition, the module register a new command, 'postnotification.async_set', that performs a set * command from a background thread. This allows to check the 'RedictModule_AddPostNotificationJob' on * notifications that was triggered on a background thread. */ #define _BSD_SOURCE #define _DEFAULT_SOURCE /* For usleep */ #include "redictmodule.h" #include #include #include #include static void KeySpace_PostNotificationStringFreePD(void *pd) { RedictModule_FreeString(NULL, pd); } static void KeySpace_PostNotificationReadKey(RedictModuleCtx *ctx, void *pd) { RedictModuleCallReply* rep = RedictModule_Call(ctx, "get", "!s", pd); RedictModule_FreeCallReply(rep); } static void KeySpace_PostNotificationString(RedictModuleCtx *ctx, void *pd) { REDICTMODULE_NOT_USED(ctx); RedictModuleCallReply* rep = RedictModule_Call(ctx, "incr", "!s", pd); RedictModule_FreeCallReply(rep); } static int KeySpace_NotificationExpired(RedictModuleCtx *ctx, int type, const char *event, RedictModuleString *key){ REDICTMODULE_NOT_USED(type); REDICTMODULE_NOT_USED(event); REDICTMODULE_NOT_USED(key); RedictModuleString *new_key = RedictModule_CreateString(NULL, "expired", 7); int res = RedictModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, new_key, KeySpace_PostNotificationStringFreePD); if (res == REDICTMODULE_ERR) KeySpace_PostNotificationStringFreePD(new_key); return REDICTMODULE_OK; } static int KeySpace_NotificationEvicted(RedictModuleCtx *ctx, int type, const char *event, RedictModuleString *key){ REDICTMODULE_NOT_USED(type); REDICTMODULE_NOT_USED(event); REDICTMODULE_NOT_USED(key); const char *key_str = RedictModule_StringPtrLen(key, NULL); if (strncmp(key_str, "evicted", 7) == 0) { return REDICTMODULE_OK; /* do not count the evicted key */ } if (strncmp(key_str, "before_evicted", 14) == 0) { return REDICTMODULE_OK; /* do not count the before_evicted key */ } RedictModuleString *new_key = RedictModule_CreateString(NULL, "evicted", 7); int res = RedictModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, new_key, KeySpace_PostNotificationStringFreePD); if (res == REDICTMODULE_ERR) KeySpace_PostNotificationStringFreePD(new_key); return REDICTMODULE_OK; } static int KeySpace_NotificationString(RedictModuleCtx *ctx, int type, const char *event, RedictModuleString *key){ REDICTMODULE_NOT_USED(ctx); REDICTMODULE_NOT_USED(type); REDICTMODULE_NOT_USED(event); const char *key_str = RedictModule_StringPtrLen(key, NULL); if (strncmp(key_str, "string_", 7) != 0) { return REDICTMODULE_OK; } if (strcmp(key_str, "string_total") == 0) { return REDICTMODULE_OK; } RedictModuleString *new_key; if (strncmp(key_str, "string_changed{", 15) == 0) { new_key = RedictModule_CreateString(NULL, "string_total", 12); } else { new_key = RedictModule_CreateStringPrintf(NULL, "string_changed{%s}", key_str); } int res = RedictModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, new_key, KeySpace_PostNotificationStringFreePD); if (res == REDICTMODULE_ERR) KeySpace_PostNotificationStringFreePD(new_key); return REDICTMODULE_OK; } static int KeySpace_LazyExpireInsidePostNotificationJob(RedictModuleCtx *ctx, int type, const char *event, RedictModuleString *key){ REDICTMODULE_NOT_USED(ctx); REDICTMODULE_NOT_USED(type); REDICTMODULE_NOT_USED(event); const char *key_str = RedictModule_StringPtrLen(key, NULL); if (strncmp(key_str, "read_", 5) != 0) { return REDICTMODULE_OK; } RedictModuleString *new_key = RedictModule_CreateString(NULL, key_str + 5, strlen(key_str) - 5);; int res = RedictModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationReadKey, new_key, KeySpace_PostNotificationStringFreePD); if (res == REDICTMODULE_ERR) KeySpace_PostNotificationStringFreePD(new_key); return REDICTMODULE_OK; } static int KeySpace_NestedNotification(RedictModuleCtx *ctx, int type, const char *event, RedictModuleString *key){ REDICTMODULE_NOT_USED(ctx); REDICTMODULE_NOT_USED(type); REDICTMODULE_NOT_USED(event); const char *key_str = RedictModule_StringPtrLen(key, NULL); if (strncmp(key_str, "write_sync_", 11) != 0) { return REDICTMODULE_OK; } /* This test was only meant to check REDICTMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS. * In general it is wrong and discourage to perform any writes inside a notification callback. */ RedictModuleString *new_key = RedictModule_CreateString(NULL, key_str + 11, strlen(key_str) - 11);; RedictModuleCallReply* rep = RedictModule_Call(ctx, "set", "!sc", new_key, "1"); RedictModule_FreeCallReply(rep); RedictModule_FreeString(NULL, new_key); return REDICTMODULE_OK; } static void *KeySpace_PostNotificationsAsyncSetInner(void *arg) { RedictModuleBlockedClient *bc = arg; RedictModuleCtx *ctx = RedictModule_GetThreadSafeContext(bc); RedictModule_ThreadSafeContextLock(ctx); RedictModuleCallReply* rep = RedictModule_Call(ctx, "set", "!cc", "string_x", "1"); RedictModule_ThreadSafeContextUnlock(ctx); RedictModule_ReplyWithCallReply(ctx, rep); RedictModule_FreeCallReply(rep); RedictModule_UnblockClient(bc, NULL); RedictModule_FreeThreadSafeContext(ctx); return NULL; } static int KeySpace_PostNotificationsAsyncSet(RedictModuleCtx *ctx, RedictModuleString **argv, int argc) { REDICTMODULE_NOT_USED(argv); if (argc != 1) return RedictModule_WrongArity(ctx); pthread_t tid; RedictModuleBlockedClient *bc = RedictModule_BlockClient(ctx,NULL,NULL,NULL,0); if (pthread_create(&tid,NULL,KeySpace_PostNotificationsAsyncSetInner,bc) != 0) { RedictModule_AbortBlock(bc); return RedictModule_ReplyWithError(ctx,"-ERR Can't start thread"); } return REDICTMODULE_OK; } typedef struct KeySpace_EventPostNotificationCtx { RedictModuleString *triggered_on; RedictModuleString *new_key; } KeySpace_EventPostNotificationCtx; static void KeySpace_ServerEventPostNotificationFree(void *pd) { KeySpace_EventPostNotificationCtx *pn_ctx = pd; RedictModule_FreeString(NULL, pn_ctx->new_key); RedictModule_FreeString(NULL, pn_ctx->triggered_on); RedictModule_Free(pn_ctx); } static void KeySpace_ServerEventPostNotification(RedictModuleCtx *ctx, void *pd) { REDICTMODULE_NOT_USED(ctx); KeySpace_EventPostNotificationCtx *pn_ctx = pd; RedictModuleCallReply* rep = RedictModule_Call(ctx, "lpush", "!ss", pn_ctx->new_key, pn_ctx->triggered_on); RedictModule_FreeCallReply(rep); } static void KeySpace_ServerEventCallback(RedictModuleCtx *ctx, RedictModuleEvent eid, uint64_t subevent, void *data) { REDICTMODULE_NOT_USED(eid); REDICTMODULE_NOT_USED(data); if (subevent > 3) { RedictModule_Log(ctx, "warning", "Got an unexpected subevent '%llu'", (unsigned long long)subevent); return; } static const char* events[] = { "before_deleted", "before_expired", "before_evicted", "before_overwritten", }; const RedictModuleString *key_name = RedictModule_GetKeyNameFromModuleKey(((RedictModuleKeyInfo*)data)->key); const char *key_str = RedictModule_StringPtrLen(key_name, NULL); for (int i = 0 ; i < 4 ; ++i) { const char *event = events[i]; if (strncmp(key_str, event , strlen(event)) == 0) { return; /* don't log any event on our tracking keys */ } } KeySpace_EventPostNotificationCtx *pn_ctx = RedictModule_Alloc(sizeof(*pn_ctx)); pn_ctx->triggered_on = RedictModule_HoldString(NULL, (RedictModuleString*)key_name); pn_ctx->new_key = RedictModule_CreateString(NULL, events[subevent], strlen(events[subevent])); int res = RedictModule_AddPostNotificationJob(ctx, KeySpace_ServerEventPostNotification, pn_ctx, KeySpace_ServerEventPostNotificationFree); if (res == REDICTMODULE_ERR) KeySpace_ServerEventPostNotificationFree(pn_ctx); } /* This function must be present on each Redict module. It is used in order to * register the commands into the Redict server. */ int RedictModule_OnLoad(RedictModuleCtx *ctx, RedictModuleString **argv, int argc) { REDICTMODULE_NOT_USED(argv); REDICTMODULE_NOT_USED(argc); if (RedictModule_Init(ctx,"postnotifications",1,REDICTMODULE_APIVER_1) == REDICTMODULE_ERR){ return REDICTMODULE_ERR; } if (!(RedictModule_GetModuleOptionsAll() & REDICTMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS)) { return REDICTMODULE_ERR; } int with_key_events = 0; if (argc >= 1) { const char *arg = RedictModule_StringPtrLen(argv[0], 0); if (strcmp(arg, "with_key_events") == 0) { with_key_events = 1; } } RedictModule_SetModuleOptions(ctx, REDICTMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS); if(RedictModule_SubscribeToKeyspaceEvents(ctx, REDICTMODULE_NOTIFY_STRING, KeySpace_NotificationString) != REDICTMODULE_OK){ return REDICTMODULE_ERR; } if(RedictModule_SubscribeToKeyspaceEvents(ctx, REDICTMODULE_NOTIFY_STRING, KeySpace_LazyExpireInsidePostNotificationJob) != REDICTMODULE_OK){ return REDICTMODULE_ERR; } if(RedictModule_SubscribeToKeyspaceEvents(ctx, REDICTMODULE_NOTIFY_STRING, KeySpace_NestedNotification) != REDICTMODULE_OK){ return REDICTMODULE_ERR; } if(RedictModule_SubscribeToKeyspaceEvents(ctx, REDICTMODULE_NOTIFY_EXPIRED, KeySpace_NotificationExpired) != REDICTMODULE_OK){ return REDICTMODULE_ERR; } if(RedictModule_SubscribeToKeyspaceEvents(ctx, REDICTMODULE_NOTIFY_EVICTED, KeySpace_NotificationEvicted) != REDICTMODULE_OK){ return REDICTMODULE_ERR; } if (with_key_events) { if(RedictModule_SubscribeToServerEvent(ctx, RedictModuleEvent_Key, KeySpace_ServerEventCallback) != REDICTMODULE_OK){ return REDICTMODULE_ERR; } } if (RedictModule_CreateCommand(ctx, "postnotification.async_set", KeySpace_PostNotificationsAsyncSet, "write", 0, 0, 0) == REDICTMODULE_ERR){ return REDICTMODULE_ERR; } return REDICTMODULE_OK; } int RedictModule_OnUnload(RedictModuleCtx *ctx) { REDICTMODULE_NOT_USED(ctx); return REDICTMODULE_OK; }