mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 08:08:53 -05:00
all: replace hiredis with hiredict
Signed-off-by: Drew DeVault <sir@cmpwn.com>
This commit is contained in:
parent
8e71e44163
commit
a589ae5f76
@ -90,14 +90,18 @@ Files: deps/jemalloc/*
|
||||
Copyright: 2009-present Facebook, Inc
|
||||
License: BSD-2-Clause
|
||||
|
||||
Files: deps/hiredis/*
|
||||
Files: deps/hiredict/*
|
||||
Copyright: 2009-2011, Salvatore Sanfilippo <antirez at gmail dot com>
|
||||
License: BSD-3-Clause
|
||||
|
||||
Files: deps/hiredis/*
|
||||
Files: deps/hiredict/*
|
||||
Copyright: 2010-2011, Pieter Noordhuis <pcnoordhuis at gmail dot com>
|
||||
License: BSD-3-Clause
|
||||
|
||||
Files: deps/hiredict/*
|
||||
Copyright: 2024 Redict Contributors
|
||||
License: LGPL-3.0-only
|
||||
|
||||
Files: deps/hdr_histogram/*
|
||||
Copyright: 2012, 2013, 2014 Gil Tene
|
||||
License: BSD-2-Clause
|
||||
|
12
deps/Makefile
vendored
12
deps/Makefile
vendored
@ -36,7 +36,7 @@ ifneq ($(shell sh -c '[ -f .make-ldflags ] && cat .make-ldflags || echo none'),
|
||||
endif
|
||||
|
||||
distclean:
|
||||
-(cd hiredis && $(MAKE) clean) > /dev/null || true
|
||||
-(cd hiredict && $(MAKE) clean) > /dev/null || true
|
||||
-(cd linenoise && $(MAKE) clean) > /dev/null || true
|
||||
-(cd lua && $(MAKE) clean) > /dev/null || true
|
||||
-(cd jemalloc && [ -f Makefile ] && $(MAKE) distclean) > /dev/null || true
|
||||
@ -47,14 +47,14 @@ distclean:
|
||||
.PHONY: distclean
|
||||
|
||||
ifneq (,$(filter $(BUILD_TLS),yes module))
|
||||
HIREDIS_MAKE_FLAGS = USE_SSL=1
|
||||
HIREDICT_MAKE_FLAGS = USE_SSL=1
|
||||
endif
|
||||
|
||||
hiredis: .make-prerequisites
|
||||
hiredict: .make-prerequisites
|
||||
@printf '%b %b\n' $(MAKECOLOR)MAKE$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR)
|
||||
cd hiredis && $(MAKE) static $(HIREDIS_MAKE_FLAGS)
|
||||
cd hiredict && $(MAKE) static $(HIREDICT_MAKE_FLAGS)
|
||||
|
||||
.PHONY: hiredis
|
||||
.PHONY: hiredict
|
||||
|
||||
linenoise: .make-prerequisites
|
||||
@printf '%b %b\n' $(MAKECOLOR)MAKE$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR)
|
||||
@ -84,7 +84,7 @@ LUA_LDFLAGS+= $(LDFLAGS)
|
||||
ifeq ($(LUA_DEBUG),yes)
|
||||
LUA_CFLAGS+= -O0 -g -DLUA_USE_APICHECK
|
||||
else
|
||||
LUA_CFLAGS+= -O2
|
||||
LUA_CFLAGS+= -O2
|
||||
endif
|
||||
ifeq ($(LUA_COVERAGE),yes)
|
||||
LUA_CFLAGS += -fprofile-arcs -ftest-coverage
|
||||
|
14
src/Makefile
14
src/Makefile
@ -24,7 +24,7 @@ endif
|
||||
ifneq ($(OPTIMIZATION),-O0)
|
||||
OPTIMIZATION+=-fno-omit-frame-pointer
|
||||
endif
|
||||
DEPENDENCY_TARGETS=hiredis linenoise lua hdr_histogram fpconv
|
||||
DEPENDENCY_TARGETS=hiredict linenoise lua hdr_histogram fpconv
|
||||
NODEPS:=clean distclean
|
||||
|
||||
# Default settings
|
||||
@ -225,7 +225,7 @@ ifdef OPENSSL_PREFIX
|
||||
endif
|
||||
|
||||
# Include paths to dependencies
|
||||
FINAL_CFLAGS+= -I../deps/hiredis -I../deps/linenoise -I../deps/lua/src -I../deps/hdr_histogram -I../deps/fpconv
|
||||
FINAL_CFLAGS+= -I../deps/hiredict -I../deps/linenoise -I../deps/lua/src -I../deps/hdr_histogram -I../deps/fpconv
|
||||
|
||||
# Determine systemd support and/or build preference (defaulting to auto-detection)
|
||||
BUILD_WITH_SYSTEMD=no
|
||||
@ -292,7 +292,7 @@ BUILD_MODULE:=2
|
||||
ifeq ($(BUILD_TLS),yes)
|
||||
FINAL_CFLAGS+=-DUSE_OPENSSL=$(BUILD_YES) $(OPENSSL_CFLAGS) -DBUILD_TLS_MODULE=$(BUILD_NO)
|
||||
FINAL_LDFLAGS+=$(OPENSSL_LDFLAGS)
|
||||
FINAL_LIBS += ../deps/hiredis/libhiredis_ssl.a $(LIBSSL_LIBS) $(LIBCRYPTO_LIBS)
|
||||
FINAL_LIBS += ../deps/hiredict/libhiredict_ssl.a $(LIBSSL_LIBS) $(LIBCRYPTO_LIBS)
|
||||
endif
|
||||
|
||||
TLS_MODULE=
|
||||
@ -300,7 +300,7 @@ TLS_MODULE_NAME:=redict-tls$(PROG_SUFFIX).so
|
||||
TLS_MODULE_CFLAGS:=$(FINAL_CFLAGS)
|
||||
ifeq ($(BUILD_TLS),module)
|
||||
FINAL_CFLAGS+=-DUSE_OPENSSL=$(BUILD_MODULE) $(OPENSSL_CFLAGS)
|
||||
TLS_CLIENT_LIBS = ../deps/hiredis/libhiredis_ssl.a $(LIBSSL_LIBS) $(LIBCRYPTO_LIBS)
|
||||
TLS_CLIENT_LIBS = ../deps/hiredict/libhiredict_ssl.a $(LIBSSL_LIBS) $(LIBCRYPTO_LIBS)
|
||||
TLS_MODULE=$(TLS_MODULE_NAME)
|
||||
TLS_MODULE_CFLAGS+=-DUSE_OPENSSL=$(BUILD_MODULE) $(OPENSSL_CFLAGS) -DBUILD_TLS_MODULE=$(BUILD_MODULE)
|
||||
endif
|
||||
@ -399,7 +399,7 @@ endif
|
||||
|
||||
# redict-server
|
||||
$(REDICT_SERVER_NAME): $(REDICT_SERVER_OBJ)
|
||||
$(REDICT_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/lua/src/liblua.a ../deps/hdr_histogram/libhdrhistogram.a ../deps/fpconv/libfpconv.a $(FINAL_LIBS)
|
||||
$(REDICT_LD) -o $@ $^ ../deps/hiredict/libhiredict.a ../deps/lua/src/liblua.a ../deps/hdr_histogram/libhdrhistogram.a ../deps/fpconv/libfpconv.a $(FINAL_LIBS)
|
||||
|
||||
# redict-sentinel
|
||||
$(REDICT_SENTINEL_NAME): $(REDICT_SERVER_NAME)
|
||||
@ -419,11 +419,11 @@ $(TLS_MODULE_NAME): $(REDICT_SERVER_NAME)
|
||||
|
||||
# redict-cli
|
||||
$(REDICT_CLI_NAME): $(REDICT_CLI_OBJ)
|
||||
$(REDICT_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/linenoise/linenoise.o $(FINAL_LIBS) $(TLS_CLIENT_LIBS)
|
||||
$(REDICT_LD) -o $@ $^ ../deps/hiredict/libhiredict.a ../deps/linenoise/linenoise.o $(FINAL_LIBS) $(TLS_CLIENT_LIBS)
|
||||
|
||||
# redict-benchmark
|
||||
$(REDICT_BENCHMARK_NAME): $(REDICT_BENCHMARK_OBJ)
|
||||
$(REDICT_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/hdr_histogram/libhdrhistogram.a $(FINAL_LIBS) $(TLS_CLIENT_LIBS)
|
||||
$(REDICT_LD) -o $@ $^ ../deps/hiredict/libhiredict.a ../deps/hdr_histogram/libhdrhistogram.a $(FINAL_LIBS) $(TLS_CLIENT_LIBS)
|
||||
|
||||
DEP = $(REDICT_SERVER_OBJ:%.o=%.d) $(REDICT_CLI_OBJ:%.o=%.d) $(REDICT_BENCHMARK_OBJ:%.o=%.d)
|
||||
-include $(DEP)
|
||||
|
@ -17,7 +17,7 @@
|
||||
/* Syntax specifications for a command argument. */
|
||||
typedef struct cliCommandArg {
|
||||
char *name;
|
||||
redisCommandArgType type;
|
||||
redictCommandArgType type;
|
||||
char *token;
|
||||
char *since;
|
||||
int flags;
|
||||
|
@ -13,16 +13,15 @@
|
||||
#include <stdlib.h>
|
||||
#include <fcntl.h>
|
||||
#include <errno.h>
|
||||
#include <hiredis.h>
|
||||
#include <sdscompat.h> /* Use hiredis' sds compat header that maps sds calls to their hi_ variants */
|
||||
#include <sds.h> /* use sds.h from hiredis, so that only one set of sds functions will be present in the binary */
|
||||
#include <hiredict.h>
|
||||
#include <sds.h>
|
||||
#include <unistd.h>
|
||||
#include <string.h>
|
||||
#include <ctype.h>
|
||||
#ifdef USE_OPENSSL
|
||||
#include <openssl/ssl.h>
|
||||
#include <openssl/err.h>
|
||||
#include <hiredis_ssl.h>
|
||||
#include <hiredict_ssl.h>
|
||||
#endif
|
||||
|
||||
#define UNUSED(V) ((void) V)
|
||||
@ -30,10 +29,10 @@
|
||||
char *redictGitSHA1(void);
|
||||
char *redictGitDirty(void);
|
||||
|
||||
/* Wrapper around redisSecureConnection to avoid hiredis_ssl dependencies if
|
||||
/* Wrapper around redictSecureConnection to avoid hiredict_ssl dependencies if
|
||||
* not building with TLS support.
|
||||
*/
|
||||
int cliSecureConnection(redisContext *c, cliSSLconfig config, const char **err) {
|
||||
int cliSecureConnection(redictContext *c, cliSSLconfig config, const char **err) {
|
||||
#ifdef USE_OPENSSL
|
||||
static SSL_CTX *ssl_ctx = NULL;
|
||||
|
||||
@ -82,32 +81,32 @@ int cliSecureConnection(redisContext *c, cliSSLconfig config, const char **err)
|
||||
SSL *ssl = SSL_new(ssl_ctx);
|
||||
if (!ssl) {
|
||||
*err = "Failed to create SSL object";
|
||||
return REDIS_ERR;
|
||||
return REDICT_ERR;
|
||||
}
|
||||
|
||||
if (config.sni && !SSL_set_tlsext_host_name(ssl, config.sni)) {
|
||||
*err = "Failed to configure SNI";
|
||||
SSL_free(ssl);
|
||||
return REDIS_ERR;
|
||||
return REDICT_ERR;
|
||||
}
|
||||
|
||||
return redisInitiateSSL(c, ssl);
|
||||
return redictInitiateSSL(c, ssl);
|
||||
|
||||
error:
|
||||
SSL_CTX_free(ssl_ctx);
|
||||
ssl_ctx = NULL;
|
||||
return REDIS_ERR;
|
||||
return REDICT_ERR;
|
||||
#else
|
||||
(void) config;
|
||||
(void) c;
|
||||
(void) err;
|
||||
return REDIS_OK;
|
||||
return REDICT_OK;
|
||||
#endif
|
||||
}
|
||||
|
||||
/* Wrapper around hiredis to allow arbitrary reads and writes.
|
||||
/* Wrapper around hiredict to allow arbitrary reads and writes.
|
||||
*
|
||||
* We piggybacks on top of hiredis to achieve transparent TLS support,
|
||||
* We piggybacks on top of hiredict to achieve transparent TLS support,
|
||||
* and use its internal buffers so it can co-exist with commands
|
||||
* previously/later issued on the connection.
|
||||
*
|
||||
@ -115,11 +114,11 @@ error:
|
||||
* work transparently.
|
||||
*/
|
||||
|
||||
/* Write a raw buffer through a redisContext. If we already have something
|
||||
* in the buffer (leftovers from hiredis operations) it will be written
|
||||
/* Write a raw buffer through a redictContext. If we already have something
|
||||
* in the buffer (leftovers from hiredict operations) it will be written
|
||||
* as well.
|
||||
*/
|
||||
ssize_t cliWriteConn(redisContext *c, const char *buf, size_t buf_len)
|
||||
ssize_t cliWriteConn(redictContext *c, const char *buf, size_t buf_len)
|
||||
{
|
||||
int done = 0;
|
||||
|
||||
@ -127,8 +126,8 @@ ssize_t cliWriteConn(redisContext *c, const char *buf, size_t buf_len)
|
||||
* but we don't assume that, and write.
|
||||
*/
|
||||
c->obuf = sdscatlen(c->obuf, buf, buf_len);
|
||||
if (redisBufferWrite(c, &done) == REDIS_ERR) {
|
||||
if (!(c->flags & REDIS_BLOCK))
|
||||
if (redictBufferWrite(c, &done) == REDICT_ERR) {
|
||||
if (!(c->flags & REDICT_BLOCK))
|
||||
errno = EAGAIN;
|
||||
|
||||
/* On error, we assume nothing was written and we roll back the
|
||||
@ -180,7 +179,7 @@ int cliSecureInit(void)
|
||||
SSL_load_error_strings();
|
||||
SSL_library_init();
|
||||
#endif
|
||||
return REDIS_OK;
|
||||
return REDICT_OK;
|
||||
}
|
||||
|
||||
/* Create an sds from stdin */
|
||||
@ -288,7 +287,7 @@ static sds percentDecode(const char *pe, size_t len) {
|
||||
* path: ["/" [<db>]]
|
||||
*
|
||||
* [1]: https://www.iana.org/assignments/uri-schemes/prov/redis */
|
||||
void parseRedisUri(const char *uri, const char* tool_name, cliConnInfo *connInfo, int *tls_flag) {
|
||||
void parseRedictUri(const char *uri, const char* tool_name, cliConnInfo *connInfo, int *tls_flag) {
|
||||
#ifdef USE_OPENSSL
|
||||
UNUSED(tool_name);
|
||||
#else
|
||||
@ -414,20 +413,20 @@ sds cliVersion(void) {
|
||||
return version;
|
||||
}
|
||||
|
||||
/* This is a wrapper to call redisConnect or redisConnectWithTimeout. */
|
||||
redisContext *redisConnectWrapper(const char *ip, int port, const struct timeval tv) {
|
||||
/* This is a wrapper to call redictConnect or redictConnectWithTimeout. */
|
||||
redictContext *redictConnectWrapper(const char *ip, int port, const struct timeval tv) {
|
||||
if (tv.tv_sec == 0 && tv.tv_usec == 0) {
|
||||
return redisConnect(ip, port);
|
||||
return redictConnect(ip, port);
|
||||
} else {
|
||||
return redisConnectWithTimeout(ip, port, tv);
|
||||
return redictConnectWithTimeout(ip, port, tv);
|
||||
}
|
||||
}
|
||||
|
||||
/* This is a wrapper to call redisConnectUnix or redisConnectUnixWithTimeout. */
|
||||
redisContext *redisConnectUnixWrapper(const char *path, const struct timeval tv) {
|
||||
/* This is a wrapper to call redictConnectUnix or redictConnectUnixWithTimeout. */
|
||||
redictContext *redictConnectUnixWrapper(const char *path, const struct timeval tv) {
|
||||
if (tv.tv_sec == 0 && tv.tv_usec == 0) {
|
||||
return redisConnectUnix(path);
|
||||
return redictConnectUnix(path);
|
||||
} else {
|
||||
return redisConnectUnixWithTimeout(path, tv);
|
||||
return redictConnectUnixWithTimeout(path, tv);
|
||||
}
|
||||
}
|
||||
|
@ -7,8 +7,8 @@
|
||||
#ifndef __CLICOMMON_H
|
||||
#define __CLICOMMON_H
|
||||
|
||||
#include <hiredis.h>
|
||||
#include <sdscompat.h> /* Use hiredis' sds compat header that maps sds calls to their hi_ variants */
|
||||
#include <hiredict.h>
|
||||
#include <sds.h>
|
||||
|
||||
typedef struct cliSSLconfig {
|
||||
/* Requested SNI, or NULL */
|
||||
@ -39,9 +39,9 @@ typedef struct cliConnInfo {
|
||||
char *user;
|
||||
} cliConnInfo;
|
||||
|
||||
int cliSecureConnection(redisContext *c, cliSSLconfig config, const char **err);
|
||||
int cliSecureConnection(redictContext *c, cliSSLconfig config, const char **err);
|
||||
|
||||
ssize_t cliWriteConn(redisContext *c, const char *buf, size_t buf_len);
|
||||
ssize_t cliWriteConn(redictContext *c, const char *buf, size_t buf_len);
|
||||
|
||||
int cliSecureInit(void);
|
||||
|
||||
@ -51,7 +51,7 @@ sds *getSdsArrayFromArgv(int argc,char **argv, int quoted);
|
||||
|
||||
sds unquoteCString(char *str);
|
||||
|
||||
void parseRedisUri(const char *uri, const char* tool_name, cliConnInfo *connInfo, int *tls_flag);
|
||||
void parseRedictUri(const char *uri, const char* tool_name, cliConnInfo *connInfo, int *tls_flag);
|
||||
|
||||
void freeCliConnInfo(cliConnInfo connInfo);
|
||||
|
||||
@ -59,7 +59,7 @@ sds escapeJsonString(sds s, const char *p, size_t len);
|
||||
|
||||
sds cliVersion(void);
|
||||
|
||||
redisContext *redisConnectWrapper(const char *ip, int port, const struct timeval tv);
|
||||
redisContext *redisConnectUnixWrapper(const char *path, const struct timeval tv);
|
||||
redictContext *redictConnectWrapper(const char *ip, int port, const struct timeval tv);
|
||||
redictContext *redictConnectUnixWrapper(const char *path, const struct timeval tv);
|
||||
|
||||
#endif /* __CLICOMMON_H */
|
||||
|
@ -18,7 +18,7 @@ typedef enum {
|
||||
ARG_TYPE_PURE_TOKEN,
|
||||
ARG_TYPE_ONEOF, /* Has subargs */
|
||||
ARG_TYPE_BLOCK /* Has subargs */
|
||||
} redisCommandArgType;
|
||||
} redictCommandArgType;
|
||||
|
||||
#define CMD_ARG_NONE (0)
|
||||
#define CMD_ARG_OPTIONAL (1<<0)
|
||||
@ -28,7 +28,7 @@ typedef enum {
|
||||
/* Must be compatible with RedictModuleCommandArg. See moduleCopyCommandArgs. */
|
||||
typedef struct redictCommandArg {
|
||||
const char *name;
|
||||
redisCommandArgType type;
|
||||
redictCommandArgType type;
|
||||
int key_spec_index;
|
||||
const char *token;
|
||||
const char *summary;
|
||||
@ -38,7 +38,7 @@ typedef struct redictCommandArg {
|
||||
int num_args;
|
||||
struct redictCommandArg *subargs;
|
||||
const char *display_text;
|
||||
} redisCommandArg;
|
||||
} redictCommandArg;
|
||||
|
||||
/* Returns the command group name by group number. */
|
||||
const char *commandGroupStr(int index);
|
||||
|
@ -456,7 +456,7 @@ static int moduleValidateCommandArgs(RedictModuleCommandArg *args,
|
||||
const RedictModuleCommandInfoVersion *version);
|
||||
static struct redictCommandArg *moduleCopyCommandArgs(RedictModuleCommandArg *args,
|
||||
const RedictModuleCommandInfoVersion *version);
|
||||
static redisCommandArgType moduleConvertArgType(RedictModuleCommandArgType type, int *error);
|
||||
static redictCommandArgType moduleConvertArgType(RedictModuleCommandArgType type, int *error);
|
||||
static int moduleConvertArgFlags(int flags);
|
||||
void moduleCreateContext(RedictModuleCtx *out_ctx, RedictModule *module, int ctx_flags);
|
||||
|
||||
@ -2157,7 +2157,7 @@ static struct redictCommandArg *moduleCopyCommandArgs(RedictModuleCommandArg *ar
|
||||
size_t count = 0;
|
||||
while (moduleCmdArgAt(version, args, count)->name) count++;
|
||||
serverAssert(count < SIZE_MAX / sizeof(struct redictCommandArg));
|
||||
struct redictCommandArg *realargs = zcalloc((count+1) * sizeof(redisCommandArg));
|
||||
struct redictCommandArg *realargs = zcalloc((count+1) * sizeof(redictCommandArg));
|
||||
|
||||
for (size_t j = 0; j < count; j++) {
|
||||
RedictModuleCommandArg *arg = moduleCmdArgAt(version, args, j);
|
||||
@ -2178,7 +2178,7 @@ static struct redictCommandArg *moduleCopyCommandArgs(RedictModuleCommandArg *ar
|
||||
return realargs;
|
||||
}
|
||||
|
||||
static redisCommandArgType moduleConvertArgType(RedictModuleCommandArgType type, int *error) {
|
||||
static redictCommandArgType moduleConvertArgType(RedictModuleCommandArgType type, int *error) {
|
||||
if (error) *error = 0;
|
||||
switch (type) {
|
||||
case REDICTMODULE_ARG_TYPE_STRING: return ARG_TYPE_STRING;
|
||||
|
@ -19,14 +19,13 @@
|
||||
#include <math.h>
|
||||
#include <pthread.h>
|
||||
|
||||
#include <sdscompat.h> /* Use hiredis' sds compat header that maps sds calls to their hi_ variants */
|
||||
#include <sds.h> /* Use hiredis sds. */
|
||||
#include <sds.h>
|
||||
#include "ae.h"
|
||||
#include <hiredis.h>
|
||||
#include <hiredict.h>
|
||||
#ifdef USE_OPENSSL
|
||||
#include <openssl/ssl.h>
|
||||
#include <openssl/err.h>
|
||||
#include <hiredis_ssl.h>
|
||||
#include <hiredict_ssl.h>
|
||||
#endif
|
||||
#include "adlist.h"
|
||||
#include "dict.h"
|
||||
@ -53,7 +52,7 @@
|
||||
|
||||
struct benchmarkThread;
|
||||
struct clusterNode;
|
||||
struct redisConfig;
|
||||
struct redictConfig;
|
||||
|
||||
static struct config {
|
||||
aeEventLoop *el;
|
||||
@ -92,7 +91,7 @@ static struct config {
|
||||
int cluster_mode;
|
||||
int cluster_node_count;
|
||||
struct clusterNode **cluster_nodes;
|
||||
struct redisConfig *redis_config;
|
||||
struct redictConfig *redict_config;
|
||||
struct hdr_histogram* latency_histogram;
|
||||
struct hdr_histogram* current_sec_latency_histogram;
|
||||
redictAtomic int is_fetching_slots;
|
||||
@ -105,7 +104,7 @@ static struct config {
|
||||
} config;
|
||||
|
||||
typedef struct _client {
|
||||
redisContext *context;
|
||||
redictContext *context;
|
||||
sds obuf;
|
||||
char **randptr; /* Pointers to :rand: strings inside the command buf */
|
||||
size_t randlen; /* Number of pointers in client->randptr */
|
||||
@ -152,13 +151,13 @@ typedef struct clusterNode {
|
||||
* strings are the source node IDs. */
|
||||
int migrating_count; /* Length of the migrating array (migrating slots*2) */
|
||||
int importing_count; /* Length of the importing array (importing slots*2) */
|
||||
struct redisConfig *redis_config;
|
||||
struct redictConfig *redict_config;
|
||||
} clusterNode;
|
||||
|
||||
typedef struct redisConfig {
|
||||
typedef struct redictConfig {
|
||||
sds save;
|
||||
sds appendonly;
|
||||
} redisConfig;
|
||||
} redictConfig;
|
||||
|
||||
/* Prototypes */
|
||||
static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask);
|
||||
@ -168,11 +167,11 @@ static void freeBenchmarkThread(benchmarkThread *thread);
|
||||
static void freeBenchmarkThreads(void);
|
||||
static void *execBenchmarkThread(void *ptr);
|
||||
static clusterNode *createClusterNode(char *ip, int port);
|
||||
static redisConfig *getRedisConfig(const char *ip, int port,
|
||||
static redictConfig *getRedictConfig(const char *ip, int port,
|
||||
const char *hostsocket);
|
||||
static redisContext *getRedisContext(const char *ip, int port,
|
||||
static redictContext *getRedictContext(const char *ip, int port,
|
||||
const char *hostsocket);
|
||||
static void freeRedisConfig(redisConfig *cfg);
|
||||
static void freeRedictConfig(redictConfig *cfg);
|
||||
static int fetchClusterSlotsConfiguration(client c);
|
||||
static void updateClusterSlotsConfiguration(void);
|
||||
int showThroughput(struct aeEventLoop *eventLoop, long long id,
|
||||
@ -212,15 +211,15 @@ static int dictSdsKeyCompare(dict *d, const void *key1, const void *key2)
|
||||
return memcmp(key1, key2, l1) == 0;
|
||||
}
|
||||
|
||||
static redisContext *getRedisContext(const char *ip, int port,
|
||||
static redictContext *getRedictContext(const char *ip, int port,
|
||||
const char *hostsocket)
|
||||
{
|
||||
redisContext *ctx = NULL;
|
||||
redisReply *reply = NULL;
|
||||
redictContext *ctx = NULL;
|
||||
redictReply *reply = NULL;
|
||||
if (hostsocket == NULL)
|
||||
ctx = redisConnect(ip, port);
|
||||
ctx = redictConnect(ip, port);
|
||||
else
|
||||
ctx = redisConnectUnix(hostsocket);
|
||||
ctx = redictConnectUnix(hostsocket);
|
||||
if (ctx == NULL || ctx->err) {
|
||||
fprintf(stderr,"Could not connect to Redict at ");
|
||||
char *err = (ctx != NULL ? ctx->errstr : "");
|
||||
@ -232,7 +231,7 @@ static redisContext *getRedisContext(const char *ip, int port,
|
||||
}
|
||||
if (config.tls==1) {
|
||||
const char *err = NULL;
|
||||
if (cliSecureConnection(ctx, config.sslconfig, &err) == REDIS_ERR && err) {
|
||||
if (cliSecureConnection(ctx, config.sslconfig, &err) == REDICT_ERR && err) {
|
||||
fprintf(stderr, "Could not negotiate a TLS connection: %s\n", err);
|
||||
goto cleanup;
|
||||
}
|
||||
@ -240,17 +239,17 @@ static redisContext *getRedisContext(const char *ip, int port,
|
||||
if (config.conn_info.auth == NULL)
|
||||
return ctx;
|
||||
if (config.conn_info.user == NULL)
|
||||
reply = redisCommand(ctx,"AUTH %s", config.conn_info.auth);
|
||||
reply = redictCommand(ctx,"AUTH %s", config.conn_info.auth);
|
||||
else
|
||||
reply = redisCommand(ctx,"AUTH %s %s", config.conn_info.user, config.conn_info.auth);
|
||||
reply = redictCommand(ctx,"AUTH %s %s", config.conn_info.user, config.conn_info.auth);
|
||||
if (reply != NULL) {
|
||||
if (reply->type == REDIS_REPLY_ERROR) {
|
||||
if (reply->type == REDICT_REPLY_ERROR) {
|
||||
if (hostsocket == NULL)
|
||||
fprintf(stderr, "Node %s:%d replied with error:\n%s\n", ip, port, reply->str);
|
||||
else
|
||||
fprintf(stderr, "Node %s replied with error:\n%s\n", hostsocket, reply->str);
|
||||
freeReplyObject(reply);
|
||||
redisFree(ctx);
|
||||
redictFree(ctx);
|
||||
exit(1);
|
||||
}
|
||||
freeReplyObject(reply);
|
||||
@ -263,38 +262,38 @@ static redisContext *getRedisContext(const char *ip, int port,
|
||||
fprintf(stderr, "%s\n", hostsocket);
|
||||
cleanup:
|
||||
freeReplyObject(reply);
|
||||
redisFree(ctx);
|
||||
redictFree(ctx);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
|
||||
static redisConfig *getRedisConfig(const char *ip, int port,
|
||||
static redictConfig *getRedictConfig(const char *ip, int port,
|
||||
const char *hostsocket)
|
||||
{
|
||||
redisConfig *cfg = zcalloc(sizeof(*cfg));
|
||||
redictConfig *cfg = zcalloc(sizeof(*cfg));
|
||||
if (!cfg) return NULL;
|
||||
redisContext *c = NULL;
|
||||
redisReply *reply = NULL, *sub_reply = NULL;
|
||||
c = getRedisContext(ip, port, hostsocket);
|
||||
redictContext *c = NULL;
|
||||
redictReply *reply = NULL, *sub_reply = NULL;
|
||||
c = getRedictContext(ip, port, hostsocket);
|
||||
if (c == NULL) {
|
||||
freeRedisConfig(cfg);
|
||||
freeRedictConfig(cfg);
|
||||
exit(1);
|
||||
}
|
||||
redisAppendCommand(c, "CONFIG GET %s", "save");
|
||||
redisAppendCommand(c, "CONFIG GET %s", "appendonly");
|
||||
redictAppendCommand(c, "CONFIG GET %s", "save");
|
||||
redictAppendCommand(c, "CONFIG GET %s", "appendonly");
|
||||
int abort_test = 0;
|
||||
int i = 0;
|
||||
void *r = NULL;
|
||||
for (; i < 2; i++) {
|
||||
int res = redisGetReply(c, &r);
|
||||
int res = redictGetReply(c, &r);
|
||||
if (reply) freeReplyObject(reply);
|
||||
reply = res == REDIS_OK ? ((redisReply *) r) : NULL;
|
||||
if (res != REDIS_OK || !r) goto fail;
|
||||
if (reply->type == REDIS_REPLY_ERROR) {
|
||||
reply = res == REDICT_OK ? ((redictReply *) r) : NULL;
|
||||
if (res != REDICT_OK || !r) goto fail;
|
||||
if (reply->type == REDICT_REPLY_ERROR) {
|
||||
goto fail;
|
||||
}
|
||||
if (reply->type != REDIS_REPLY_ARRAY || reply->elements < 2) goto fail;
|
||||
if (reply->type != REDICT_REPLY_ARRAY || reply->elements < 2) goto fail;
|
||||
sub_reply = reply->element[1];
|
||||
char *value = sub_reply->str;
|
||||
if (!value) value = "";
|
||||
@ -304,10 +303,10 @@ static redisConfig *getRedisConfig(const char *ip, int port,
|
||||
}
|
||||
}
|
||||
freeReplyObject(reply);
|
||||
redisFree(c);
|
||||
redictFree(c);
|
||||
return cfg;
|
||||
fail:
|
||||
if (reply && reply->type == REDIS_REPLY_ERROR &&
|
||||
if (reply && reply->type == REDICT_REPLY_ERROR &&
|
||||
!strncmp(reply->str,"NOAUTH",6)) {
|
||||
if (hostsocket == NULL)
|
||||
fprintf(stderr, "Node %s:%d replied with error:\n%s\n", ip, port, reply->str);
|
||||
@ -316,12 +315,12 @@ fail:
|
||||
abort_test = 1;
|
||||
}
|
||||
freeReplyObject(reply);
|
||||
redisFree(c);
|
||||
freeRedisConfig(cfg);
|
||||
redictFree(c);
|
||||
freeRedictConfig(cfg);
|
||||
if (abort_test) exit(1);
|
||||
return NULL;
|
||||
}
|
||||
static void freeRedisConfig(redisConfig *cfg) {
|
||||
static void freeRedictConfig(redictConfig *cfg) {
|
||||
if (cfg->save) sdsfree(cfg->save);
|
||||
if (cfg->appendonly) sdsfree(cfg->appendonly);
|
||||
zfree(cfg);
|
||||
@ -339,7 +338,7 @@ static void freeClient(client c) {
|
||||
aeStop(el);
|
||||
}
|
||||
}
|
||||
redisFree(c->context);
|
||||
redictFree(c->context);
|
||||
sdsfree(c->obuf);
|
||||
zfree(c->randptr);
|
||||
zfree(c->stagptr);
|
||||
@ -447,22 +446,22 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
* is not part of the latency, so calculate it only once, here. */
|
||||
if (c->latency < 0) c->latency = ustime()-(c->start);
|
||||
|
||||
if (redisBufferRead(c->context) != REDIS_OK) {
|
||||
if (redictBufferRead(c->context) != REDICT_OK) {
|
||||
fprintf(stderr,"Error: %s\n",c->context->errstr);
|
||||
exit(1);
|
||||
} else {
|
||||
while(c->pending) {
|
||||
if (redisGetReply(c->context,&reply) != REDIS_OK) {
|
||||
if (redictGetReply(c->context,&reply) != REDICT_OK) {
|
||||
fprintf(stderr,"Error: %s\n",c->context->errstr);
|
||||
exit(1);
|
||||
}
|
||||
if (reply != NULL) {
|
||||
if (reply == (void*)REDIS_REPLY_ERROR) {
|
||||
if (reply == (void*)REDICT_REPLY_ERROR) {
|
||||
fprintf(stderr,"Unexpected error reply, exiting...\n");
|
||||
exit(1);
|
||||
}
|
||||
redisReply *r = reply;
|
||||
if (r->type == REDIS_REPLY_ERROR) {
|
||||
redictReply *r = reply;
|
||||
if (r->type == REDICT_REPLY_ERROR) {
|
||||
/* Try to update slots configuration if reply error is
|
||||
* MOVED/ASK/CLUSTERDOWN and the key(s) used by the command
|
||||
* contain(s) the slot hash tag.
|
||||
@ -642,9 +641,9 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) {
|
||||
port = node->port;
|
||||
c->cluster_node = node;
|
||||
}
|
||||
c->context = redisConnectNonBlock(ip,port);
|
||||
c->context = redictConnectNonBlock(ip,port);
|
||||
} else {
|
||||
c->context = redisConnectUnixNonBlock(config.hostsocket);
|
||||
c->context = redictConnectUnixNonBlock(config.hostsocket);
|
||||
}
|
||||
if (c->context->err) {
|
||||
fprintf(stderr,"Could not connect to Redict at ");
|
||||
@ -656,13 +655,13 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) {
|
||||
}
|
||||
if (config.tls==1) {
|
||||
const char *err = NULL;
|
||||
if (cliSecureConnection(c->context, config.sslconfig, &err) == REDIS_ERR && err) {
|
||||
if (cliSecureConnection(c->context, config.sslconfig, &err) == REDICT_ERR && err) {
|
||||
fprintf(stderr, "Could not negotiate a TLS connection: %s\n", err);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
c->thread_id = thread_id;
|
||||
/* Suppress hiredis cleanup of unused buffers for max speed. */
|
||||
/* Suppress hiredict cleanup of unused buffers for max speed. */
|
||||
c->context->reader->maxbuf = 0;
|
||||
|
||||
/* Build the request buffer:
|
||||
@ -677,9 +676,9 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) {
|
||||
char *buf = NULL;
|
||||
int len;
|
||||
if (config.conn_info.user == NULL)
|
||||
len = redisFormatCommand(&buf, "AUTH %s", config.conn_info.auth);
|
||||
len = redictFormatCommand(&buf, "AUTH %s", config.conn_info.auth);
|
||||
else
|
||||
len = redisFormatCommand(&buf, "AUTH %s %s",
|
||||
len = redictFormatCommand(&buf, "AUTH %s %s",
|
||||
config.conn_info.user, config.conn_info.auth);
|
||||
c->obuf = sdscatlen(c->obuf, buf, len);
|
||||
free(buf);
|
||||
@ -688,7 +687,7 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) {
|
||||
|
||||
if (config.enable_tracking) {
|
||||
char *buf = NULL;
|
||||
int len = redisFormatCommand(&buf, "CLIENT TRACKING on");
|
||||
int len = redictFormatCommand(&buf, "CLIENT TRACKING on");
|
||||
c->obuf = sdscatlen(c->obuf, buf, len);
|
||||
free(buf);
|
||||
c->prefix_pending++;
|
||||
@ -706,7 +705,7 @@ static client createClient(char *cmd, size_t len, client from, int thread_id) {
|
||||
|
||||
if (config.resp3) {
|
||||
char *buf = NULL;
|
||||
int len = redisFormatCommand(&buf, "HELLO 3");
|
||||
int len = redictFormatCommand(&buf, "HELLO 3");
|
||||
c->obuf = sdscatlen(c->obuf, buf, len);
|
||||
free(buf);
|
||||
c->prefix_pending++;
|
||||
@ -847,7 +846,7 @@ static void showLatencyReport(void) {
|
||||
int m ;
|
||||
for (m = 0; m < config.cluster_node_count; m++) {
|
||||
clusterNode *node = config.cluster_nodes[m];
|
||||
redisConfig *cfg = node->redis_config;
|
||||
redictConfig *cfg = node->redict_config;
|
||||
if (cfg == NULL) continue;
|
||||
printf(" node [%d] configuration:\n",m );
|
||||
printf(" save: %s\n",
|
||||
@ -855,11 +854,11 @@ static void showLatencyReport(void) {
|
||||
printf(" appendonly: %s\n", cfg->appendonly);
|
||||
}
|
||||
} else {
|
||||
if (config.redis_config) {
|
||||
if (config.redict_config) {
|
||||
printf(" host configuration \"save\": %s\n",
|
||||
config.redis_config->save);
|
||||
config.redict_config->save);
|
||||
printf(" host configuration \"appendonly\": %s\n",
|
||||
config.redis_config->appendonly);
|
||||
config.redict_config->appendonly);
|
||||
}
|
||||
}
|
||||
printf(" multi-thread: %s\n", (config.num_threads ? "yes" : "no"));
|
||||
@ -1028,7 +1027,7 @@ static clusterNode *createClusterNode(char *ip, int port) {
|
||||
node->importing = NULL;
|
||||
node->migrating_count = 0;
|
||||
node->importing_count = 0;
|
||||
node->redis_config = NULL;
|
||||
node->redict_config = NULL;
|
||||
return node;
|
||||
}
|
||||
|
||||
@ -1048,7 +1047,7 @@ static void freeClusterNode(clusterNode *node) {
|
||||
* config.conn_info.hostip and config.conn_info.hostport, then the node ip has been
|
||||
* allocated by fetchClusterConfiguration, so it must be freed. */
|
||||
if (node->ip && strcmp(node->ip, config.conn_info.hostip) != 0) sdsfree(node->ip);
|
||||
if (node->redis_config != NULL) freeRedisConfig(node->redis_config);
|
||||
if (node->redict_config != NULL) freeRedictConfig(node->redict_config);
|
||||
zfree(node->slots);
|
||||
zfree(node);
|
||||
}
|
||||
@ -1077,19 +1076,19 @@ static clusterNode **addClusterNode(clusterNode *node) {
|
||||
*/
|
||||
static int fetchClusterConfiguration(void) {
|
||||
int success = 1;
|
||||
redisContext *ctx = NULL;
|
||||
redisReply *reply = NULL;
|
||||
ctx = getRedisContext(config.conn_info.hostip, config.conn_info.hostport, config.hostsocket);
|
||||
redictContext *ctx = NULL;
|
||||
redictReply *reply = NULL;
|
||||
ctx = getRedictContext(config.conn_info.hostip, config.conn_info.hostport, config.hostsocket);
|
||||
if (ctx == NULL) {
|
||||
exit(1);
|
||||
}
|
||||
clusterNode *firstNode = createClusterNode((char *) config.conn_info.hostip,
|
||||
config.conn_info.hostport);
|
||||
if (!firstNode) {success = 0; goto cleanup;}
|
||||
reply = redisCommand(ctx, "CLUSTER NODES");
|
||||
reply = redictCommand(ctx, "CLUSTER NODES");
|
||||
success = (reply != NULL);
|
||||
if (!success) goto cleanup;
|
||||
success = (reply->type != REDIS_REPLY_ERROR);
|
||||
success = (reply->type != REDICT_REPLY_ERROR);
|
||||
if (!success) {
|
||||
if (config.hostsocket == NULL) {
|
||||
fprintf(stderr, "Cluster node %s:%d replied with error:\n%s\n",
|
||||
@ -1233,7 +1232,7 @@ static int fetchClusterConfiguration(void) {
|
||||
}
|
||||
}
|
||||
cleanup:
|
||||
if (ctx) redisFree(ctx);
|
||||
if (ctx) redictFree(ctx);
|
||||
if (!success) {
|
||||
if (config.cluster_nodes) freeClusterNodes();
|
||||
}
|
||||
@ -1252,7 +1251,7 @@ static int fetchClusterSlotsConfiguration(client c) {
|
||||
c->slots_last_update = last_update;
|
||||
return -1;
|
||||
}
|
||||
redisReply *reply = NULL;
|
||||
redictReply *reply = NULL;
|
||||
atomicGetIncr(config.is_fetching_slots, is_fetching_slots, 1);
|
||||
if (is_fetching_slots) return -1; //TODO: use other codes || errno ?
|
||||
atomicSet(config.is_fetching_slots, 1);
|
||||
@ -1270,7 +1269,7 @@ static int fetchClusterSlotsConfiguration(client c) {
|
||||
};
|
||||
/* printf("[%d] fetchClusterSlotsConfiguration\n", c->thread_id); */
|
||||
dict *masters = dictCreate(&dtype);
|
||||
redisContext *ctx = NULL;
|
||||
redictContext *ctx = NULL;
|
||||
for (i = 0; i < (size_t) config.cluster_node_count; i++) {
|
||||
clusterNode *node = config.cluster_nodes[i];
|
||||
assert(node->ip != NULL);
|
||||
@ -1278,7 +1277,7 @@ static int fetchClusterSlotsConfiguration(client c) {
|
||||
assert(node->port);
|
||||
/* Use first node as entry point to connect to. */
|
||||
if (ctx == NULL) {
|
||||
ctx = getRedisContext(node->ip, node->port, NULL);
|
||||
ctx = getRedictContext(node->ip, node->port, NULL);
|
||||
if (!ctx) {
|
||||
success = 0;
|
||||
goto cleanup;
|
||||
@ -1290,23 +1289,23 @@ static int fetchClusterSlotsConfiguration(client c) {
|
||||
node->updated_slots_count = 0;
|
||||
dictReplace(masters, node->name, node) ;
|
||||
}
|
||||
reply = redisCommand(ctx, "CLUSTER SLOTS");
|
||||
if (reply == NULL || reply->type == REDIS_REPLY_ERROR) {
|
||||
reply = redictCommand(ctx, "CLUSTER SLOTS");
|
||||
if (reply == NULL || reply->type == REDICT_REPLY_ERROR) {
|
||||
success = 0;
|
||||
if (reply)
|
||||
fprintf(stderr,"%s\nCLUSTER SLOTS ERROR: %s\n",errmsg,reply->str);
|
||||
goto cleanup;
|
||||
}
|
||||
assert(reply->type == REDIS_REPLY_ARRAY);
|
||||
assert(reply->type == REDICT_REPLY_ARRAY);
|
||||
for (i = 0; i < reply->elements; i++) {
|
||||
redisReply *r = reply->element[i];
|
||||
assert(r->type == REDIS_REPLY_ARRAY);
|
||||
redictReply *r = reply->element[i];
|
||||
assert(r->type == REDICT_REPLY_ARRAY);
|
||||
assert(r->elements >= 3);
|
||||
int from, to, slot;
|
||||
from = r->element[0]->integer;
|
||||
to = r->element[1]->integer;
|
||||
redisReply *nr = r->element[2];
|
||||
assert(nr->type == REDIS_REPLY_ARRAY && nr->elements >= 3);
|
||||
redictReply *nr = r->element[2];
|
||||
assert(nr->type == REDICT_REPLY_ARRAY && nr->elements >= 3);
|
||||
assert(nr->element[2]->str != NULL);
|
||||
sds name = sdsnew(nr->element[2]->str);
|
||||
dictEntry *entry = dictFind(masters, name);
|
||||
@ -1327,7 +1326,7 @@ static int fetchClusterSlotsConfiguration(client c) {
|
||||
updateClusterSlotsConfiguration();
|
||||
cleanup:
|
||||
freeReplyObject(reply);
|
||||
redisFree(ctx);
|
||||
redictFree(ctx);
|
||||
dictRelease(masters);
|
||||
atomicSet(config.is_fetching_slots, 0);
|
||||
return success;
|
||||
@ -1354,7 +1353,7 @@ static void updateClusterSlotsConfiguration(void) {
|
||||
pthread_mutex_unlock(&config.is_updating_slots_mutex);
|
||||
}
|
||||
|
||||
/* Generate random data for redis benchmark. See #7196. */
|
||||
/* Generate random data for redict benchmark. See #7196. */
|
||||
static void genBenchmarkRandomData(char *data, int count) {
|
||||
static uint32_t state = 1234;
|
||||
int i = 0;
|
||||
@ -1412,7 +1411,7 @@ int parseOptions(int argc, char **argv) {
|
||||
if (lastarg) goto invalid;
|
||||
config.conn_info.user = sdsnew(argv[++i]);
|
||||
} else if (!strcmp(argv[i],"-u") && !lastarg) {
|
||||
parseRedisUri(argv[++i],"redict-benchmark",&config.conn_info,&config.tls);
|
||||
parseRedictUri(argv[++i],"redict-benchmark",&config.conn_info,&config.tls);
|
||||
if (config.conn_info.hostport < 0 || config.conn_info.hostport > 65535) {
|
||||
fprintf(stderr, "Invalid server port.\n");
|
||||
exit(1);
|
||||
@ -1622,7 +1621,7 @@ tls_usage,
|
||||
" Benchmark 127.0.0.1:6379 for a few commands producing CSV output:\n"
|
||||
" $ redict-benchmark -t ping,set,get -n 100000 --csv\n\n"
|
||||
" Benchmark a specific command line:\n"
|
||||
" $ redict-benchmark -r 10000 -n 10000 eval 'return redis.call(\"ping\")' 0\n\n"
|
||||
" $ redict-benchmark -r 10000 -n 10000 eval 'return redict.call(\"ping\")' 0\n\n"
|
||||
" Fill a list with 10000 random elements:\n"
|
||||
" $ redict-benchmark -r 10000 -n 10000 lpush mylist __rand_int__\n\n"
|
||||
" On user specified command lines __rand_int__ is replaced with a random integer\n"
|
||||
@ -1730,7 +1729,7 @@ int main(int argc, char **argv) {
|
||||
config.cluster_mode = 0;
|
||||
config.cluster_node_count = 0;
|
||||
config.cluster_nodes = NULL;
|
||||
config.redis_config = NULL;
|
||||
config.redict_config = NULL;
|
||||
config.is_fetching_slots = 0;
|
||||
config.is_updating_slots = 0;
|
||||
config.slots_last_update = 0;
|
||||
@ -1780,8 +1779,8 @@ int main(int argc, char **argv) {
|
||||
printf("Master %d: ", i);
|
||||
if (node->name) printf("%s ", node->name);
|
||||
printf("%s:%d\n", node->ip, node->port);
|
||||
node->redis_config = getRedisConfig(node->ip, node->port, NULL);
|
||||
if (node->redis_config == NULL) {
|
||||
node->redict_config = getRedictConfig(node->ip, node->port, NULL);
|
||||
if (node->redict_config == NULL) {
|
||||
fprintf(stderr, "WARNING: Could not fetch node CONFIG %s:%d\n",
|
||||
node->ip, node->port);
|
||||
}
|
||||
@ -1792,9 +1791,9 @@ int main(int argc, char **argv) {
|
||||
if (config.num_threads == 0)
|
||||
config.num_threads = config.cluster_node_count;
|
||||
} else {
|
||||
config.redis_config =
|
||||
getRedisConfig(config.conn_info.hostip, config.conn_info.hostport, config.hostsocket);
|
||||
if (config.redis_config == NULL) {
|
||||
config.redict_config =
|
||||
getRedictConfig(config.conn_info.hostip, config.conn_info.hostport, config.hostsocket);
|
||||
if (config.redict_config == NULL) {
|
||||
fprintf(stderr, "WARNING: Could not fetch server CONFIG\n");
|
||||
}
|
||||
}
|
||||
@ -1852,7 +1851,7 @@ int main(int argc, char **argv) {
|
||||
for (i = 0; i < argc; i++)
|
||||
argvlen[i] = sdslen(sds_args[i]);
|
||||
do {
|
||||
len = redisFormatCommandArgv(&cmd,argc,(const char**)sds_args,argvlen);
|
||||
len = redictFormatCommandArgv(&cmd,argc,(const char**)sds_args,argvlen);
|
||||
// adjust the datasize to the parsed command
|
||||
config.datasize = len;
|
||||
benchmark(title,cmd,len);
|
||||
@ -1861,7 +1860,7 @@ int main(int argc, char **argv) {
|
||||
sdsfreesplitres(sds_args, argc);
|
||||
|
||||
sdsfree(title);
|
||||
if (config.redis_config != NULL) freeRedisConfig(config.redis_config);
|
||||
if (config.redict_config != NULL) freeRedictConfig(config.redict_config);
|
||||
zfree(argvlen);
|
||||
return 0;
|
||||
}
|
||||
@ -1876,69 +1875,69 @@ int main(int argc, char **argv) {
|
||||
benchmark("PING_INLINE","PING\r\n",6);
|
||||
|
||||
if (test_is_selected("ping_mbulk") || test_is_selected("ping")) {
|
||||
len = redisFormatCommand(&cmd,"PING");
|
||||
len = redictFormatCommand(&cmd,"PING");
|
||||
benchmark("PING_MBULK",cmd,len);
|
||||
free(cmd);
|
||||
}
|
||||
|
||||
if (test_is_selected("set")) {
|
||||
len = redisFormatCommand(&cmd,"SET key%s:__rand_int__ %s",tag,data);
|
||||
len = redictFormatCommand(&cmd,"SET key%s:__rand_int__ %s",tag,data);
|
||||
benchmark("SET",cmd,len);
|
||||
free(cmd);
|
||||
}
|
||||
|
||||
if (test_is_selected("get")) {
|
||||
len = redisFormatCommand(&cmd,"GET key%s:__rand_int__",tag);
|
||||
len = redictFormatCommand(&cmd,"GET key%s:__rand_int__",tag);
|
||||
benchmark("GET",cmd,len);
|
||||
free(cmd);
|
||||
}
|
||||
|
||||
if (test_is_selected("incr")) {
|
||||
len = redisFormatCommand(&cmd,"INCR counter%s:__rand_int__",tag);
|
||||
len = redictFormatCommand(&cmd,"INCR counter%s:__rand_int__",tag);
|
||||
benchmark("INCR",cmd,len);
|
||||
free(cmd);
|
||||
}
|
||||
|
||||
if (test_is_selected("lpush")) {
|
||||
len = redisFormatCommand(&cmd,"LPUSH mylist%s %s",tag,data);
|
||||
len = redictFormatCommand(&cmd,"LPUSH mylist%s %s",tag,data);
|
||||
benchmark("LPUSH",cmd,len);
|
||||
free(cmd);
|
||||
}
|
||||
|
||||
if (test_is_selected("rpush")) {
|
||||
len = redisFormatCommand(&cmd,"RPUSH mylist%s %s",tag,data);
|
||||
len = redictFormatCommand(&cmd,"RPUSH mylist%s %s",tag,data);
|
||||
benchmark("RPUSH",cmd,len);
|
||||
free(cmd);
|
||||
}
|
||||
|
||||
if (test_is_selected("lpop")) {
|
||||
len = redisFormatCommand(&cmd,"LPOP mylist%s",tag);
|
||||
len = redictFormatCommand(&cmd,"LPOP mylist%s",tag);
|
||||
benchmark("LPOP",cmd,len);
|
||||
free(cmd);
|
||||
}
|
||||
|
||||
if (test_is_selected("rpop")) {
|
||||
len = redisFormatCommand(&cmd,"RPOP mylist%s",tag);
|
||||
len = redictFormatCommand(&cmd,"RPOP mylist%s",tag);
|
||||
benchmark("RPOP",cmd,len);
|
||||
free(cmd);
|
||||
}
|
||||
|
||||
if (test_is_selected("sadd")) {
|
||||
len = redisFormatCommand(&cmd,
|
||||
len = redictFormatCommand(&cmd,
|
||||
"SADD myset%s element:__rand_int__",tag);
|
||||
benchmark("SADD",cmd,len);
|
||||
free(cmd);
|
||||
}
|
||||
|
||||
if (test_is_selected("hset")) {
|
||||
len = redisFormatCommand(&cmd,
|
||||
len = redictFormatCommand(&cmd,
|
||||
"HSET myhash%s element:__rand_int__ %s",tag,data);
|
||||
benchmark("HSET",cmd,len);
|
||||
free(cmd);
|
||||
}
|
||||
|
||||
if (test_is_selected("spop")) {
|
||||
len = redisFormatCommand(&cmd,"SPOP myset%s",tag);
|
||||
len = redictFormatCommand(&cmd,"SPOP myset%s",tag);
|
||||
benchmark("SPOP",cmd,len);
|
||||
free(cmd);
|
||||
}
|
||||
@ -1946,14 +1945,14 @@ int main(int argc, char **argv) {
|
||||
if (test_is_selected("zadd")) {
|
||||
char *score = "0";
|
||||
if (config.randomkeys) score = "__rand_int__";
|
||||
len = redisFormatCommand(&cmd,
|
||||
len = redictFormatCommand(&cmd,
|
||||
"ZADD myzset%s %s element:__rand_int__",tag,score);
|
||||
benchmark("ZADD",cmd,len);
|
||||
free(cmd);
|
||||
}
|
||||
|
||||
if (test_is_selected("zpopmin")) {
|
||||
len = redisFormatCommand(&cmd,"ZPOPMIN myzset%s",tag);
|
||||
len = redictFormatCommand(&cmd,"ZPOPMIN myzset%s",tag);
|
||||
benchmark("ZPOPMIN",cmd,len);
|
||||
free(cmd);
|
||||
}
|
||||
@ -1964,31 +1963,31 @@ int main(int argc, char **argv) {
|
||||
test_is_selected("lrange_500") ||
|
||||
test_is_selected("lrange_600"))
|
||||
{
|
||||
len = redisFormatCommand(&cmd,"LPUSH mylist%s %s",tag,data);
|
||||
len = redictFormatCommand(&cmd,"LPUSH mylist%s %s",tag,data);
|
||||
benchmark("LPUSH (needed to benchmark LRANGE)",cmd,len);
|
||||
free(cmd);
|
||||
}
|
||||
|
||||
if (test_is_selected("lrange") || test_is_selected("lrange_100")) {
|
||||
len = redisFormatCommand(&cmd,"LRANGE mylist%s 0 99",tag);
|
||||
len = redictFormatCommand(&cmd,"LRANGE mylist%s 0 99",tag);
|
||||
benchmark("LRANGE_100 (first 100 elements)",cmd,len);
|
||||
free(cmd);
|
||||
}
|
||||
|
||||
if (test_is_selected("lrange") || test_is_selected("lrange_300")) {
|
||||
len = redisFormatCommand(&cmd,"LRANGE mylist%s 0 299",tag);
|
||||
len = redictFormatCommand(&cmd,"LRANGE mylist%s 0 299",tag);
|
||||
benchmark("LRANGE_300 (first 300 elements)",cmd,len);
|
||||
free(cmd);
|
||||
}
|
||||
|
||||
if (test_is_selected("lrange") || test_is_selected("lrange_500")) {
|
||||
len = redisFormatCommand(&cmd,"LRANGE mylist%s 0 499",tag);
|
||||
len = redictFormatCommand(&cmd,"LRANGE mylist%s 0 499",tag);
|
||||
benchmark("LRANGE_500 (first 500 elements)",cmd,len);
|
||||
free(cmd);
|
||||
}
|
||||
|
||||
if (test_is_selected("lrange") || test_is_selected("lrange_600")) {
|
||||
len = redisFormatCommand(&cmd,"LRANGE mylist%s 0 599",tag);
|
||||
len = redictFormatCommand(&cmd,"LRANGE mylist%s 0 599",tag);
|
||||
benchmark("LRANGE_600 (first 600 elements)",cmd,len);
|
||||
free(cmd);
|
||||
}
|
||||
@ -2001,24 +2000,24 @@ int main(int argc, char **argv) {
|
||||
cmd_argv[i] = key_placeholder;
|
||||
cmd_argv[i+1] = data;
|
||||
}
|
||||
len = redisFormatCommandArgv(&cmd,21,cmd_argv,NULL);
|
||||
len = redictFormatCommandArgv(&cmd,21,cmd_argv,NULL);
|
||||
benchmark("MSET (10 keys)",cmd,len);
|
||||
free(cmd);
|
||||
sdsfree(key_placeholder);
|
||||
}
|
||||
|
||||
if (test_is_selected("xadd")) {
|
||||
len = redisFormatCommand(&cmd,"XADD mystream%s * myfield %s", tag, data);
|
||||
len = redictFormatCommand(&cmd,"XADD mystream%s * myfield %s", tag, data);
|
||||
benchmark("XADD",cmd,len);
|
||||
free(cmd);
|
||||
}
|
||||
free(cmd);
|
||||
}
|
||||
|
||||
if (!config.csv) printf("\n");
|
||||
} while(config.loop);
|
||||
|
||||
zfree(data);
|
||||
freeCliConnInfo(config.conn_info);
|
||||
if (config.redis_config != NULL) freeRedisConfig(config.redis_config);
|
||||
if (config.redict_config != NULL) freeRedictConfig(config.redict_config);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
1071
src/redict-cli.c
1071
src/redict-cli.c
File diff suppressed because it is too large
Load Diff
242
src/sentinel.c
242
src/sentinel.c
@ -6,10 +6,10 @@
|
||||
// SPDX-License-Identifier: LGPL-3.0-only
|
||||
|
||||
#include "server.h"
|
||||
#include "hiredis.h"
|
||||
#include "hiredict.h"
|
||||
#if USE_OPENSSL == 1 /* BUILD_YES */
|
||||
#include "openssl/ssl.h"
|
||||
#include "hiredis_ssl.h"
|
||||
#include "hiredict_ssl.h"
|
||||
#endif
|
||||
#include "async.h"
|
||||
|
||||
@ -117,12 +117,12 @@ static mstime_t sentinel_default_failover_timeout = 60*3*1000;
|
||||
|
||||
/* The link to a sentinelRedisInstance. When we have the same set of Sentinels
|
||||
* monitoring many masters, we have different instances representing the
|
||||
* same Sentinels, one per master, and we need to share the hiredis connections
|
||||
* same Sentinels, one per master, and we need to share the hiredict connections
|
||||
* among them. Otherwise if 5 Sentinels are monitoring 100 masters we create
|
||||
* 500 outgoing connections instead of 5.
|
||||
*
|
||||
* So this structure represents a reference counted link in terms of the two
|
||||
* hiredis connections for commands and Pub/Sub, and the fields needed for
|
||||
* hiredict connections for commands and Pub/Sub, and the fields needed for
|
||||
* failure detection, since the ping/pong time are now local to the link: if
|
||||
* the link is available, the instance is available. This way we don't just
|
||||
* have 5 connections instead of 500, we also send 5 pings instead of 500.
|
||||
@ -133,8 +133,8 @@ typedef struct instanceLink {
|
||||
int refcount; /* Number of sentinelRedisInstance owners. */
|
||||
int disconnected; /* Non-zero if we need to reconnect cc or pc. */
|
||||
int pending_commands; /* Number of commands sent waiting for a reply. */
|
||||
redisAsyncContext *cc; /* Hiredis context for commands. */
|
||||
redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
|
||||
redictAsyncContext *cc; /* Hiredict context for commands. */
|
||||
redictAsyncContext *pc; /* Hiredict context for Pub / Sub. */
|
||||
mstime_t cc_conn_time; /* cc connection time. */
|
||||
mstime_t pc_conn_time; /* pc connection time. */
|
||||
mstime_t pc_last_activity; /* Last time we received any message. */
|
||||
@ -266,43 +266,43 @@ typedef struct sentinelScriptJob {
|
||||
pid_t pid; /* Script execution pid. */
|
||||
} sentinelScriptJob;
|
||||
|
||||
/* ======================= hiredis ae.c adapters =============================
|
||||
* Note: this implementation is taken from hiredis/adapters/ae.h, however
|
||||
/* ======================= hiredict ae.c adapters =============================
|
||||
* Note: this implementation is taken from hiredict/adapters/ae.h, however
|
||||
* we have our modified copy for Sentinel in order to use our allocator
|
||||
* and to have full control over how the adapter works. */
|
||||
|
||||
typedef struct redisAeEvents {
|
||||
redisAsyncContext *context;
|
||||
typedef struct redictAeEvents {
|
||||
redictAsyncContext *context;
|
||||
aeEventLoop *loop;
|
||||
int fd;
|
||||
int reading, writing;
|
||||
} redisAeEvents;
|
||||
} redictAeEvents;
|
||||
|
||||
static void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
static void redictAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
((void)el); ((void)fd); ((void)mask);
|
||||
|
||||
redisAeEvents *e = (redisAeEvents*)privdata;
|
||||
redisAsyncHandleRead(e->context);
|
||||
redictAeEvents *e = (redictAeEvents*)privdata;
|
||||
redictAsyncHandleRead(e->context);
|
||||
}
|
||||
|
||||
static void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
static void redictAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
((void)el); ((void)fd); ((void)mask);
|
||||
|
||||
redisAeEvents *e = (redisAeEvents*)privdata;
|
||||
redisAsyncHandleWrite(e->context);
|
||||
redictAeEvents *e = (redictAeEvents*)privdata;
|
||||
redictAsyncHandleWrite(e->context);
|
||||
}
|
||||
|
||||
static void redisAeAddRead(void *privdata) {
|
||||
redisAeEvents *e = (redisAeEvents*)privdata;
|
||||
static void redictAeAddRead(void *privdata) {
|
||||
redictAeEvents *e = (redictAeEvents*)privdata;
|
||||
aeEventLoop *loop = e->loop;
|
||||
if (!e->reading) {
|
||||
e->reading = 1;
|
||||
aeCreateFileEvent(loop,e->fd,AE_READABLE,redisAeReadEvent,e);
|
||||
aeCreateFileEvent(loop,e->fd,AE_READABLE,redictAeReadEvent,e);
|
||||
}
|
||||
}
|
||||
|
||||
static void redisAeDelRead(void *privdata) {
|
||||
redisAeEvents *e = (redisAeEvents*)privdata;
|
||||
static void redictAeDelRead(void *privdata) {
|
||||
redictAeEvents *e = (redictAeEvents*)privdata;
|
||||
aeEventLoop *loop = e->loop;
|
||||
if (e->reading) {
|
||||
e->reading = 0;
|
||||
@ -310,17 +310,17 @@ static void redisAeDelRead(void *privdata) {
|
||||
}
|
||||
}
|
||||
|
||||
static void redisAeAddWrite(void *privdata) {
|
||||
redisAeEvents *e = (redisAeEvents*)privdata;
|
||||
static void redictAeAddWrite(void *privdata) {
|
||||
redictAeEvents *e = (redictAeEvents*)privdata;
|
||||
aeEventLoop *loop = e->loop;
|
||||
if (!e->writing) {
|
||||
e->writing = 1;
|
||||
aeCreateFileEvent(loop,e->fd,AE_WRITABLE,redisAeWriteEvent,e);
|
||||
aeCreateFileEvent(loop,e->fd,AE_WRITABLE,redictAeWriteEvent,e);
|
||||
}
|
||||
}
|
||||
|
||||
static void redisAeDelWrite(void *privdata) {
|
||||
redisAeEvents *e = (redisAeEvents*)privdata;
|
||||
static void redictAeDelWrite(void *privdata) {
|
||||
redictAeEvents *e = (redictAeEvents*)privdata;
|
||||
aeEventLoop *loop = e->loop;
|
||||
if (e->writing) {
|
||||
e->writing = 0;
|
||||
@ -328,34 +328,34 @@ static void redisAeDelWrite(void *privdata) {
|
||||
}
|
||||
}
|
||||
|
||||
static void redisAeCleanup(void *privdata) {
|
||||
redisAeEvents *e = (redisAeEvents*)privdata;
|
||||
redisAeDelRead(privdata);
|
||||
redisAeDelWrite(privdata);
|
||||
static void redictAeCleanup(void *privdata) {
|
||||
redictAeEvents *e = (redictAeEvents*)privdata;
|
||||
redictAeDelRead(privdata);
|
||||
redictAeDelWrite(privdata);
|
||||
zfree(e);
|
||||
}
|
||||
|
||||
static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) {
|
||||
redisContext *c = &(ac->c);
|
||||
redisAeEvents *e;
|
||||
static int redictAeAttach(aeEventLoop *loop, redictAsyncContext *ac) {
|
||||
redictContext *c = &(ac->c);
|
||||
redictAeEvents *e;
|
||||
|
||||
/* Nothing should be attached when something is already attached */
|
||||
if (ac->ev.data != NULL)
|
||||
return C_ERR;
|
||||
|
||||
/* Create container for context and r/w events */
|
||||
e = (redisAeEvents*)zmalloc(sizeof(*e));
|
||||
e = (redictAeEvents*)zmalloc(sizeof(*e));
|
||||
e->context = ac;
|
||||
e->loop = loop;
|
||||
e->fd = c->fd;
|
||||
e->reading = e->writing = 0;
|
||||
|
||||
/* Register functions to start/stop listening for events */
|
||||
ac->ev.addRead = redisAeAddRead;
|
||||
ac->ev.delRead = redisAeDelRead;
|
||||
ac->ev.addWrite = redisAeAddWrite;
|
||||
ac->ev.delWrite = redisAeDelWrite;
|
||||
ac->ev.cleanup = redisAeCleanup;
|
||||
ac->ev.addRead = redictAeAddRead;
|
||||
ac->ev.delRead = redictAeDelRead;
|
||||
ac->ev.addWrite = redictAeAddWrite;
|
||||
ac->ev.delWrite = redictAeDelWrite;
|
||||
ac->ev.cleanup = redictAeCleanup;
|
||||
ac->ev.data = e;
|
||||
|
||||
return C_OK;
|
||||
@ -363,20 +363,20 @@ static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) {
|
||||
|
||||
/* ============================= Prototypes ================================= */
|
||||
|
||||
void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status);
|
||||
void sentinelDisconnectCallback(const redisAsyncContext *c, int status);
|
||||
void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata);
|
||||
void sentinelLinkEstablishedCallback(const redictAsyncContext *c, int status);
|
||||
void sentinelDisconnectCallback(const redictAsyncContext *c, int status);
|
||||
void sentinelReceiveHelloMessages(redictAsyncContext *c, void *reply, void *privdata);
|
||||
sentinelRedisInstance *sentinelGetMasterByName(char *name);
|
||||
char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master);
|
||||
char *sentinelGetObjectiveLeader(sentinelRedisInstance *master);
|
||||
void instanceLinkConnectionError(const redisAsyncContext *c);
|
||||
void instanceLinkConnectionError(const redictAsyncContext *c);
|
||||
const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri);
|
||||
void sentinelAbortFailover(sentinelRedisInstance *ri);
|
||||
void sentinelEvent(int level, char *type, sentinelRedisInstance *ri, const char *fmt, ...);
|
||||
sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master);
|
||||
void sentinelScheduleScriptExecution(char *path, ...);
|
||||
void sentinelStartFailover(sentinelRedisInstance *master);
|
||||
void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata);
|
||||
void sentinelDiscardReplyCallback(redictAsyncContext *c, void *reply, void *privdata);
|
||||
int sentinelSendSlaveOf(sentinelRedisInstance *ri, const sentinelAddr *addr);
|
||||
char *sentinelVoteLeader(sentinelRedisInstance *master, uint64_t req_epoch, char *req_runid, uint64_t *leader_epoch);
|
||||
int sentinelFlushConfig(void);
|
||||
@ -442,7 +442,7 @@ void sentinelConfigSetCommand(client *c);
|
||||
|
||||
/* this array is used for sentinel config lookup, which need to be loaded
|
||||
* before monitoring masters config to avoid dependency issues */
|
||||
const char *preMonitorCfgName[] = {
|
||||
const char *preMonitorCfgName[] = {
|
||||
"announce-ip",
|
||||
"announce-port",
|
||||
"deny-scripts-reconfig",
|
||||
@ -594,7 +594,7 @@ int sentinelAddrEqualsHostname(sentinelAddr *a, char *hostname) {
|
||||
sentinel.resolve_hostnames ? ANET_NONE : ANET_IP_ONLY) == ANET_ERR) {
|
||||
|
||||
/* If failed resolve then compare based on hostnames. That is our best effort as
|
||||
* long as the server is unavailable for some reason. It is fine since Redis
|
||||
* long as the server is unavailable for some reason. It is fine since Redis
|
||||
* instance cannot have multiple hostnames for a given setup */
|
||||
return !strcasecmp(sentinel.resolve_hostnames ? a->hostname : a->ip, hostname);
|
||||
}
|
||||
@ -1015,8 +1015,8 @@ instanceLink *createInstanceLink(void) {
|
||||
return link;
|
||||
}
|
||||
|
||||
/* Disconnect a hiredis connection in the context of an instance link. */
|
||||
void instanceLinkCloseConnection(instanceLink *link, redisAsyncContext *c) {
|
||||
/* Disconnect a hiredict connection in the context of an instance link. */
|
||||
void instanceLinkCloseConnection(instanceLink *link, redictAsyncContext *c) {
|
||||
if (c == NULL) return;
|
||||
|
||||
if (link->cc == c) {
|
||||
@ -1026,7 +1026,7 @@ void instanceLinkCloseConnection(instanceLink *link, redisAsyncContext *c) {
|
||||
if (link->pc == c) link->pc = NULL;
|
||||
c->data = NULL;
|
||||
link->disconnected = 1;
|
||||
redisAsyncFree(c);
|
||||
redictAsyncFree(c);
|
||||
}
|
||||
|
||||
/* Decrement the refcount of a link object, if it drops to zero, actually
|
||||
@ -1034,7 +1034,7 @@ void instanceLinkCloseConnection(instanceLink *link, redisAsyncContext *c) {
|
||||
* to the object.
|
||||
*
|
||||
* If we are not going to free the link and ri is not NULL, we rebind all the
|
||||
* pending requests in link->cc (hiredis connection for commands) to a
|
||||
* pending requests in link->cc (hiredict connection for commands) to a
|
||||
* callback that will just ignore them. This is useful to avoid processing
|
||||
* replies for an instance that no longer exists. */
|
||||
instanceLink *releaseInstanceLink(instanceLink *link, sentinelRedisInstance *ri)
|
||||
@ -1043,13 +1043,13 @@ instanceLink *releaseInstanceLink(instanceLink *link, sentinelRedisInstance *ri)
|
||||
link->refcount--;
|
||||
if (link->refcount != 0) {
|
||||
if (ri && ri->link->cc) {
|
||||
/* This instance may have pending callbacks in the hiredis async
|
||||
/* This instance may have pending callbacks in the hiredict async
|
||||
* context, having as 'privdata' the instance that we are going to
|
||||
* free. Let's rewrite the callback list, directly exploiting
|
||||
* hiredis internal data structures, in order to bind them with
|
||||
* hiredict internal data structures, in order to bind them with
|
||||
* a callback that will ignore the reply at all. */
|
||||
redisCallback *cb;
|
||||
redisCallbackList *callbacks = &link->cc->replies;
|
||||
redictCallback *cb;
|
||||
redictCallbackList *callbacks = &link->cc->replies;
|
||||
|
||||
cb = callbacks->head;
|
||||
while(cb) {
|
||||
@ -1119,7 +1119,7 @@ void dropInstanceConnections(sentinelRedisInstance *ri) {
|
||||
/* Disconnect with the master. */
|
||||
instanceLinkCloseConnection(ri->link, ri->link->cc);
|
||||
instanceLinkCloseConnection(ri->link, ri->link->pc);
|
||||
|
||||
|
||||
/* Disconnect with all replicas. */
|
||||
dictIterator *di;
|
||||
dictEntry *de;
|
||||
@ -1204,13 +1204,13 @@ int sentinelUpdateSentinelAddressInAllMasters(sentinelRedisInstance *ri) {
|
||||
return reconfigured;
|
||||
}
|
||||
|
||||
/* This function is called when a hiredis connection reported an error.
|
||||
/* This function is called when a hiredict connection reported an error.
|
||||
* We set it to NULL and mark the link as disconnected so that it will be
|
||||
* reconnected again.
|
||||
*
|
||||
* Note: we don't free the hiredis context as hiredis will do it for us
|
||||
* Note: we don't free the hiredict context as hiredict will do it for us
|
||||
* for async connections. */
|
||||
void instanceLinkConnectionError(const redisAsyncContext *c) {
|
||||
void instanceLinkConnectionError(const redictAsyncContext *c) {
|
||||
instanceLink *link = c->data;
|
||||
int pubsub;
|
||||
|
||||
@ -1224,13 +1224,13 @@ void instanceLinkConnectionError(const redisAsyncContext *c) {
|
||||
link->disconnected = 1;
|
||||
}
|
||||
|
||||
/* Hiredis connection established / disconnected callbacks. We need them
|
||||
/* Hiredict connection established / disconnected callbacks. We need them
|
||||
* just to cleanup our link state. */
|
||||
void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status) {
|
||||
void sentinelLinkEstablishedCallback(const redictAsyncContext *c, int status) {
|
||||
if (status != C_OK) instanceLinkConnectionError(c);
|
||||
}
|
||||
|
||||
void sentinelDisconnectCallback(const redisAsyncContext *c, int status) {
|
||||
void sentinelDisconnectCallback(const redictAsyncContext *c, int status) {
|
||||
UNUSED(status);
|
||||
instanceLinkConnectionError(c);
|
||||
}
|
||||
@ -1351,7 +1351,7 @@ sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *
|
||||
return ri;
|
||||
}
|
||||
|
||||
/* Release this instance and all its slaves, sentinels, hiredis connections.
|
||||
/* Release this instance and all its slaves, sentinels, hiredict connections.
|
||||
* This function does not take care of unlinking the instance from the main
|
||||
* masters table (if it is a master) or from its master sentinels/slaves table
|
||||
* if it is a slave or sentinel. */
|
||||
@ -1580,10 +1580,10 @@ int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *hos
|
||||
if (newaddr == NULL) return C_ERR;
|
||||
|
||||
/* There can be only 0 or 1 slave that has the newaddr.
|
||||
* and It can add old master 1 more slave.
|
||||
* and It can add old master 1 more slave.
|
||||
* so It allocates dictSize(master->slaves) + 1 */
|
||||
slaves = zmalloc(sizeof(sentinelAddr*)*(dictSize(master->slaves) + 1));
|
||||
|
||||
|
||||
/* Don't include the one having the address we are switching to. */
|
||||
di = dictGetIterator(master->slaves);
|
||||
while((de = dictNext(di)) != NULL) {
|
||||
@ -2226,14 +2226,14 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) {
|
||||
line = sdscatprintf(sdsempty(), "sentinel sentinel-pass %s", sentinel.sentinel_auth_pass);
|
||||
rewriteConfigRewriteLine(state,"sentinel sentinel-pass",line,1);
|
||||
} else {
|
||||
rewriteConfigMarkAsProcessed(state,"sentinel sentinel-pass");
|
||||
rewriteConfigMarkAsProcessed(state,"sentinel sentinel-pass");
|
||||
}
|
||||
|
||||
dictReleaseIterator(di);
|
||||
|
||||
/* NOTE: the purpose here is in case due to the state change, the config rewrite
|
||||
does not handle the configs, however, previously the config was set in the config file,
|
||||
rewriteConfigMarkAsProcessed should be put here to mark it as processed in order to
|
||||
/* NOTE: the purpose here is in case due to the state change, the config rewrite
|
||||
does not handle the configs, however, previously the config was set in the config file,
|
||||
rewriteConfigMarkAsProcessed should be put here to mark it as processed in order to
|
||||
delete the old config entry.
|
||||
*/
|
||||
rewriteConfigMarkAsProcessed(state,"sentinel monitor");
|
||||
@ -2283,7 +2283,7 @@ static void sentinelFlushConfigAndReply(client *c) {
|
||||
addReply(c, shared.ok);
|
||||
}
|
||||
|
||||
/* ====================== hiredis connection handling ======================= */
|
||||
/* ====================== hiredict connection handling ======================= */
|
||||
|
||||
/* Send the AUTH command with the specified master password if needed.
|
||||
* Note that for slaves the password set for the master is used.
|
||||
@ -2297,7 +2297,7 @@ static void sentinelFlushConfigAndReply(client *c) {
|
||||
* We don't check at all if the command was successfully transmitted
|
||||
* to the instance as if it fails Sentinel will detect the instance down,
|
||||
* will disconnect and reconnect the link and so forth. */
|
||||
void sentinelSendAuthIfNeeded(sentinelRedisInstance *ri, redisAsyncContext *c) {
|
||||
void sentinelSendAuthIfNeeded(sentinelRedisInstance *ri, redictAsyncContext *c) {
|
||||
char *auth_pass = NULL;
|
||||
char *auth_user = NULL;
|
||||
|
||||
@ -2322,13 +2322,13 @@ void sentinelSendAuthIfNeeded(sentinelRedisInstance *ri, redisAsyncContext *c) {
|
||||
}
|
||||
|
||||
if (auth_pass && auth_user == NULL) {
|
||||
if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri, "%s %s",
|
||||
if (redictAsyncCommand(c, sentinelDiscardReplyCallback, ri, "%s %s",
|
||||
sentinelInstanceMapCommand(ri,"AUTH"),
|
||||
auth_pass) == C_OK) ri->link->pending_commands++;
|
||||
} else if (auth_pass && auth_user) {
|
||||
/* If we also have an username, use the ACL-style AUTH command
|
||||
* with two arguments, username and password. */
|
||||
if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri, "%s %s %s",
|
||||
if (redictAsyncCommand(c, sentinelDiscardReplyCallback, ri, "%s %s %s",
|
||||
sentinelInstanceMapCommand(ri,"AUTH"),
|
||||
auth_user, auth_pass) == C_OK) ri->link->pending_commands++;
|
||||
}
|
||||
@ -2340,11 +2340,11 @@ void sentinelSendAuthIfNeeded(sentinelRedisInstance *ri, redisAsyncContext *c) {
|
||||
*
|
||||
* This makes it possible to list all the sentinel instances connected
|
||||
* to a Redis server with CLIENT LIST, grepping for a specific name format. */
|
||||
void sentinelSetClientName(sentinelRedisInstance *ri, redisAsyncContext *c, char *type) {
|
||||
void sentinelSetClientName(sentinelRedisInstance *ri, redictAsyncContext *c, char *type) {
|
||||
char name[64];
|
||||
|
||||
snprintf(name,sizeof(name),"sentinel-%.8s-%s",sentinel.myid,type);
|
||||
if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri,
|
||||
if (redictAsyncCommand(c, sentinelDiscardReplyCallback, ri,
|
||||
"%s SETNAME %s",
|
||||
sentinelInstanceMapCommand(ri,"CLIENT"),
|
||||
name) == C_OK)
|
||||
@ -2353,13 +2353,13 @@ void sentinelSetClientName(sentinelRedisInstance *ri, redisAsyncContext *c, char
|
||||
}
|
||||
}
|
||||
|
||||
static int instanceLinkNegotiateTLS(redisAsyncContext *context) {
|
||||
static int instanceLinkNegotiateTLS(redictAsyncContext *context) {
|
||||
#if USE_OPENSSL == 1 /* BUILD_YES */
|
||||
if (!redict_tls_ctx) return C_ERR;
|
||||
SSL *ssl = SSL_new(redict_tls_client_ctx ? redict_tls_client_ctx : redict_tls_ctx);
|
||||
if (!ssl) return C_ERR;
|
||||
|
||||
if (redisInitiateSSL(&context->c, ssl) == REDIS_ERR) {
|
||||
if (redictInitiateSSL(&context->c, ssl) == REDICT_ERR) {
|
||||
SSL_free(ssl);
|
||||
return C_ERR;
|
||||
}
|
||||
@ -2397,7 +2397,7 @@ void sentinelReconnectInstance(sentinelRedisInstance *ri) {
|
||||
}
|
||||
}
|
||||
|
||||
link->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,server.bind_source_addr);
|
||||
link->cc = redictAsyncConnectBind(ri->addr->ip,ri->addr->port,server.bind_source_addr);
|
||||
|
||||
if (link->cc && !link->cc->err) anetCloexec(link->cc->c.fd);
|
||||
if (!link->cc) {
|
||||
@ -2414,10 +2414,10 @@ void sentinelReconnectInstance(sentinelRedisInstance *ri) {
|
||||
link->pending_commands = 0;
|
||||
link->cc_conn_time = mstime();
|
||||
link->cc->data = link;
|
||||
redisAeAttach(server.el,link->cc);
|
||||
redisAsyncSetConnectCallback(link->cc,
|
||||
redictAeAttach(server.el,link->cc);
|
||||
redictAsyncSetConnectCallback(link->cc,
|
||||
sentinelLinkEstablishedCallback);
|
||||
redisAsyncSetDisconnectCallback(link->cc,
|
||||
redictAsyncSetDisconnectCallback(link->cc,
|
||||
sentinelDisconnectCallback);
|
||||
sentinelSendAuthIfNeeded(ri,link->cc);
|
||||
sentinelSetClientName(ri,link->cc,"cmd");
|
||||
@ -2428,7 +2428,7 @@ void sentinelReconnectInstance(sentinelRedisInstance *ri) {
|
||||
}
|
||||
/* Pub / Sub */
|
||||
if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL) {
|
||||
link->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,server.bind_source_addr);
|
||||
link->pc = redictAsyncConnectBind(ri->addr->ip,ri->addr->port,server.bind_source_addr);
|
||||
if (link->pc && !link->pc->err) anetCloexec(link->pc->c.fd);
|
||||
if (!link->pc) {
|
||||
sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #Failed to establish connection");
|
||||
@ -2443,15 +2443,15 @@ void sentinelReconnectInstance(sentinelRedisInstance *ri) {
|
||||
int retval;
|
||||
link->pc_conn_time = mstime();
|
||||
link->pc->data = link;
|
||||
redisAeAttach(server.el,link->pc);
|
||||
redisAsyncSetConnectCallback(link->pc,
|
||||
redictAeAttach(server.el,link->pc);
|
||||
redictAsyncSetConnectCallback(link->pc,
|
||||
sentinelLinkEstablishedCallback);
|
||||
redisAsyncSetDisconnectCallback(link->pc,
|
||||
redictAsyncSetDisconnectCallback(link->pc,
|
||||
sentinelDisconnectCallback);
|
||||
sentinelSendAuthIfNeeded(ri,link->pc);
|
||||
sentinelSetClientName(ri,link->pc,"pubsub");
|
||||
/* Now we subscribe to the Sentinels "Hello" channel. */
|
||||
retval = redisAsyncCommand(link->pc,
|
||||
retval = redictAsyncCommand(link->pc,
|
||||
sentinelReceiveHelloMessages, ri, "%s %s",
|
||||
sentinelInstanceMapCommand(ri,"SUBSCRIBE"),
|
||||
SENTINEL_HELLO_CHANNEL);
|
||||
@ -2742,10 +2742,10 @@ void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
|
||||
}
|
||||
}
|
||||
|
||||
void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
|
||||
void sentinelInfoReplyCallback(redictAsyncContext *c, void *reply, void *privdata) {
|
||||
sentinelRedisInstance *ri = privdata;
|
||||
instanceLink *link = c->data;
|
||||
redisReply *r;
|
||||
redictReply *r;
|
||||
|
||||
if (!reply || !link) return;
|
||||
link->pending_commands--;
|
||||
@ -2753,13 +2753,13 @@ void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata
|
||||
|
||||
/* INFO reply type is verbatim in resp3. Normally, sentinel will not use
|
||||
* resp3 but this is required for testing (see logreqres.c). */
|
||||
if (r->type == REDIS_REPLY_STRING || r->type == REDIS_REPLY_VERB)
|
||||
if (r->type == REDICT_REPLY_STRING || r->type == REDICT_REPLY_VERB)
|
||||
sentinelRefreshInstanceInfo(ri,r->str);
|
||||
}
|
||||
|
||||
/* Just discard the reply. We use this when we are not monitoring the return
|
||||
* value of the command but its effects directly. */
|
||||
void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
|
||||
void sentinelDiscardReplyCallback(redictAsyncContext *c, void *reply, void *privdata) {
|
||||
instanceLink *link = c->data;
|
||||
UNUSED(reply);
|
||||
UNUSED(privdata);
|
||||
@ -2767,17 +2767,17 @@ void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privd
|
||||
if (link) link->pending_commands--;
|
||||
}
|
||||
|
||||
void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
|
||||
void sentinelPingReplyCallback(redictAsyncContext *c, void *reply, void *privdata) {
|
||||
sentinelRedisInstance *ri = privdata;
|
||||
instanceLink *link = c->data;
|
||||
redisReply *r;
|
||||
redictReply *r;
|
||||
|
||||
if (!reply || !link) return;
|
||||
link->pending_commands--;
|
||||
r = reply;
|
||||
|
||||
if (r->type == REDIS_REPLY_STATUS ||
|
||||
r->type == REDIS_REPLY_ERROR) {
|
||||
if (r->type == REDICT_REPLY_STATUS ||
|
||||
r->type == REDICT_REPLY_ERROR) {
|
||||
/* Update the "instance available" field only if this is an
|
||||
* acceptable reply. */
|
||||
if (strncmp(r->str,"PONG",4) == 0 ||
|
||||
@ -2797,7 +2797,7 @@ void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata
|
||||
(ri->flags & SRI_S_DOWN) &&
|
||||
!(ri->flags & SRI_SCRIPT_KILL_SENT))
|
||||
{
|
||||
if (redisAsyncCommand(ri->link->cc,
|
||||
if (redictAsyncCommand(ri->link->cc,
|
||||
sentinelDiscardReplyCallback, ri,
|
||||
"%s KILL",
|
||||
sentinelInstanceMapCommand(ri,"SCRIPT")) == C_OK)
|
||||
@ -2813,10 +2813,10 @@ void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata
|
||||
|
||||
/* This is called when we get the reply about the PUBLISH command we send
|
||||
* to the master to advertise this sentinel. */
|
||||
void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
|
||||
void sentinelPublishReplyCallback(redictAsyncContext *c, void *reply, void *privdata) {
|
||||
sentinelRedisInstance *ri = privdata;
|
||||
instanceLink *link = c->data;
|
||||
redisReply *r;
|
||||
redictReply *r;
|
||||
|
||||
if (!reply || !link) return;
|
||||
link->pending_commands--;
|
||||
@ -2824,7 +2824,7 @@ void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privd
|
||||
|
||||
/* Only update pub_time if we actually published our message. Otherwise
|
||||
* we'll retry again in 100 milliseconds. */
|
||||
if (r->type != REDIS_REPLY_ERROR)
|
||||
if (r->type != REDICT_REPLY_ERROR)
|
||||
ri->last_pub_time = mstime();
|
||||
}
|
||||
|
||||
@ -2950,9 +2950,9 @@ cleanup:
|
||||
|
||||
/* This is our Pub/Sub callback for the Hello channel. It's useful in order
|
||||
* to discover other sentinels attached at the same master. */
|
||||
void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) {
|
||||
void sentinelReceiveHelloMessages(redictAsyncContext *c, void *reply, void *privdata) {
|
||||
sentinelRedisInstance *ri = privdata;
|
||||
redisReply *r;
|
||||
redictReply *r;
|
||||
UNUSED(c);
|
||||
|
||||
if (!reply || !ri) return;
|
||||
@ -2967,11 +2967,11 @@ void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privd
|
||||
* can avoid to check for details.
|
||||
* Note: Reply type is PUSH in resp3. Normally, sentinel will not use
|
||||
* resp3 but this is required for testing (see logreqres.c). */
|
||||
if ((r->type != REDIS_REPLY_ARRAY && r->type != REDIS_REPLY_PUSH) ||
|
||||
if ((r->type != REDICT_REPLY_ARRAY && r->type != REDICT_REPLY_PUSH) ||
|
||||
r->elements != 3 ||
|
||||
r->element[0]->type != REDIS_REPLY_STRING ||
|
||||
r->element[1]->type != REDIS_REPLY_STRING ||
|
||||
r->element[2]->type != REDIS_REPLY_STRING ||
|
||||
r->element[0]->type != REDICT_REPLY_STRING ||
|
||||
r->element[1]->type != REDICT_REPLY_STRING ||
|
||||
r->element[2]->type != REDICT_REPLY_STRING ||
|
||||
strcmp(r->element[0]->str,"message") != 0) return;
|
||||
|
||||
/* We are not interested in meeting ourselves */
|
||||
@ -3024,7 +3024,7 @@ int sentinelSendHello(sentinelRedisInstance *ri) {
|
||||
/* --- */
|
||||
master->name,announceSentinelAddr(master_addr),master_addr->port,
|
||||
(unsigned long long) master->config_epoch);
|
||||
retval = redisAsyncCommand(ri->link->cc,
|
||||
retval = redictAsyncCommand(ri->link->cc,
|
||||
sentinelPublishReplyCallback, ri, "%s %s %s",
|
||||
sentinelInstanceMapCommand(ri,"PUBLISH"),
|
||||
SENTINEL_HELLO_CHANNEL,payload);
|
||||
@ -3071,7 +3071,7 @@ int sentinelForceHelloUpdateForMaster(sentinelRedisInstance *master) {
|
||||
* On error zero is returned, and we can't consider the PING command
|
||||
* queued in the connection. */
|
||||
int sentinelSendPing(sentinelRedisInstance *ri) {
|
||||
int retval = redisAsyncCommand(ri->link->cc,
|
||||
int retval = redictAsyncCommand(ri->link->cc,
|
||||
sentinelPingReplyCallback, ri, "%s",
|
||||
sentinelInstanceMapCommand(ri,"PING"));
|
||||
if (retval == C_OK) {
|
||||
@ -3136,7 +3136,7 @@ void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
|
||||
(ri->info_refresh == 0 ||
|
||||
(now - ri->info_refresh) > info_period))
|
||||
{
|
||||
retval = redisAsyncCommand(ri->link->cc,
|
||||
retval = redictAsyncCommand(ri->link->cc,
|
||||
sentinelInfoReplyCallback, ri, "%s",
|
||||
sentinelInstanceMapCommand(ri,"INFO"));
|
||||
if (retval == C_OK) ri->link->pending_commands++;
|
||||
@ -4207,7 +4207,7 @@ NULL
|
||||
addReplySentinelDebugInfo(c);
|
||||
else
|
||||
sentinelSetDebugConfigParameters(c);
|
||||
}
|
||||
}
|
||||
else {
|
||||
addReplySubcommandSyntaxError(c);
|
||||
}
|
||||
@ -4561,7 +4561,7 @@ void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
|
||||
ri->role_reported == SRI_SLAVE &&
|
||||
mstime() - ri->role_reported_time >
|
||||
(ri->down_after_period+sentinel_info_period*2)) ||
|
||||
(ri->flags & SRI_MASTER_REBOOT &&
|
||||
(ri->flags & SRI_MASTER_REBOOT &&
|
||||
mstime()-ri->master_reboot_since_time > ri->master_reboot_down_after_period))
|
||||
{
|
||||
/* Is subjectively down */
|
||||
@ -4622,10 +4622,10 @@ void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
|
||||
|
||||
/* Receive the SENTINEL is-master-down-by-addr reply, see the
|
||||
* sentinelAskMasterStateToOtherSentinels() function for more information. */
|
||||
void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
|
||||
void sentinelReceiveIsMasterDownReply(redictAsyncContext *c, void *reply, void *privdata) {
|
||||
sentinelRedisInstance *ri = privdata;
|
||||
instanceLink *link = c->data;
|
||||
redisReply *r;
|
||||
redictReply *r;
|
||||
|
||||
if (!reply || !link) return;
|
||||
link->pending_commands--;
|
||||
@ -4634,10 +4634,10 @@ void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *p
|
||||
/* Ignore every error or unexpected reply.
|
||||
* Note that if the command returns an error for any reason we'll
|
||||
* end clearing the SRI_MASTER_DOWN flag for timeout anyway. */
|
||||
if (r->type == REDIS_REPLY_ARRAY && r->elements == 3 &&
|
||||
r->element[0]->type == REDIS_REPLY_INTEGER &&
|
||||
r->element[1]->type == REDIS_REPLY_STRING &&
|
||||
r->element[2]->type == REDIS_REPLY_INTEGER)
|
||||
if (r->type == REDICT_REPLY_ARRAY && r->elements == 3 &&
|
||||
r->element[0]->type == REDICT_REPLY_INTEGER &&
|
||||
r->element[1]->type == REDICT_REPLY_STRING &&
|
||||
r->element[2]->type == REDICT_REPLY_INTEGER)
|
||||
{
|
||||
ri->last_master_down_reply_time = mstime();
|
||||
if (r->element[0]->integer == 1) {
|
||||
@ -4696,7 +4696,7 @@ void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int f
|
||||
|
||||
/* Ask */
|
||||
ll2string(port,sizeof(port),master->addr->port);
|
||||
retval = redisAsyncCommand(ri->link->cc,
|
||||
retval = redictAsyncCommand(ri->link->cc,
|
||||
sentinelReceiveIsMasterDownReply, ri,
|
||||
"%s is-master-down-by-addr %s %s %llu %s",
|
||||
sentinelInstanceMapCommand(ri,"SENTINEL"),
|
||||
@ -4879,20 +4879,20 @@ int sentinelSendSlaveOf(sentinelRedisInstance *ri, const sentinelAddr *addr) {
|
||||
*
|
||||
* Note that we don't check the replies returned by commands, since we
|
||||
* will observe instead the effects in the next INFO output. */
|
||||
retval = redisAsyncCommand(ri->link->cc,
|
||||
retval = redictAsyncCommand(ri->link->cc,
|
||||
sentinelDiscardReplyCallback, ri, "%s",
|
||||
sentinelInstanceMapCommand(ri,"MULTI"));
|
||||
if (retval == C_ERR) return retval;
|
||||
ri->link->pending_commands++;
|
||||
|
||||
retval = redisAsyncCommand(ri->link->cc,
|
||||
retval = redictAsyncCommand(ri->link->cc,
|
||||
sentinelDiscardReplyCallback, ri, "%s %s %s",
|
||||
sentinelInstanceMapCommand(ri,"SLAVEOF"),
|
||||
host, portstr);
|
||||
if (retval == C_ERR) return retval;
|
||||
ri->link->pending_commands++;
|
||||
|
||||
retval = redisAsyncCommand(ri->link->cc,
|
||||
retval = redictAsyncCommand(ri->link->cc,
|
||||
sentinelDiscardReplyCallback, ri, "%s REWRITE",
|
||||
sentinelInstanceMapCommand(ri,"CONFIG"));
|
||||
if (retval == C_ERR) return retval;
|
||||
@ -4904,7 +4904,7 @@ int sentinelSendSlaveOf(sentinelRedisInstance *ri, const sentinelAddr *addr) {
|
||||
* recognized as a syntax error, and the transaction will not fail (but
|
||||
* only the unsupported command will fail). */
|
||||
for (int type = 0; type < 2; type++) {
|
||||
retval = redisAsyncCommand(ri->link->cc,
|
||||
retval = redictAsyncCommand(ri->link->cc,
|
||||
sentinelDiscardReplyCallback, ri, "%s KILL TYPE %s",
|
||||
sentinelInstanceMapCommand(ri,"CLIENT"),
|
||||
type == 0 ? "normal" : "pubsub");
|
||||
@ -4912,7 +4912,7 @@ int sentinelSendSlaveOf(sentinelRedisInstance *ri, const sentinelAddr *addr) {
|
||||
ri->link->pending_commands++;
|
||||
}
|
||||
|
||||
retval = redisAsyncCommand(ri->link->cc,
|
||||
retval = redictAsyncCommand(ri->link->cc,
|
||||
sentinelDiscardReplyCallback, ri, "%s",
|
||||
sentinelInstanceMapCommand(ri,"EXEC"));
|
||||
if (retval == C_ERR) return retval;
|
||||
|
Loading…
Reference in New Issue
Block a user