Merge branch 'unstable' of github.com:/antirez/redis into unstable

This commit is contained in:
antirez 2020-02-06 11:24:22 +01:00
commit 50d4326e3b
18 changed files with 187 additions and 217 deletions

View File

@ -35,6 +35,11 @@ It is as simple as:
% make
To build with TLS support, you'll need OpenSSL development libraries (e.g.
libssl-dev on Debian/Ubuntu) and run:
% make BUILD_TLS=yes
You can run a 32 bit Redis binary using:
% make 32bit
@ -43,6 +48,13 @@ After building Redis, it is a good idea to test it using:
% make test
If TLS is built, running the tests with TLS enabled (you will need `tcl-tls`
installed):
% ./utils/gen-test-certs.sh
% ./runtest --tls
Fixing build problems with dependencies or cached build options
---------
@ -125,6 +137,12 @@ as options using the command line. Examples:
All the options in redis.conf are also supported as options using the command
line, with exactly the same name.
Running Redis with TLS:
------------------
Please consult the [TLS.md](TLS.md) file for more information on
how to use Redis with TLS.
Playing with Redis
------------------

45
TLS.md
View File

@ -1,8 +1,5 @@
TLS Support -- Work In Progress
===============================
This is a brief note to capture current thoughts/ideas and track pending action
items.
TLS Support
===========
Getting Started
---------------
@ -69,37 +66,23 @@ probably not be so hard. For cluster keys migration it might be more difficult,
but there are probably other good reasons to improve that part anyway.
To-Do List
==========
----------
Additional TLS Features
-----------------------
- [ ] Add session caching support. Check if/how it's handled by clients to
assess how useful/important it is.
- [ ] redis-benchmark support. The current implementation is a mix of using
hiredis for parsing and basic networking (establishing connections), but
directly manipulating sockets for most actions. This will need to be cleaned
up for proper TLS support. The best approach is probably to migrate to hiredis
async mode.
- [ ] redis-cli `--slave` and `--rdb` support.
1. Add metrics to INFO?
2. Add session caching support. Check if/how it's handled by clients to assess
how useful/important it is.
redis-benchmark
---------------
The current implementation is a mix of using hiredis for parsing and basic
networking (establishing connections), but directly manipulating sockets for
most actions.
This will need to be cleaned up for proper TLS support. The best approach is
probably to migrate to hiredis async mode.
redis-cli
---------
1. Add support for TLS in --slave and --rdb modes.
Others
------
Multi-port
----------
Consider the implications of allowing TLS to be configured on a separate port,
making Redis listening on multiple ports.
making Redis listening on multiple ports:
This impacts many things, like
1. Startup banner port notification
2. Proctitle
3. How slaves announce themselves

View File

@ -155,23 +155,22 @@ tcp-keepalive 300
# tls-ca-cert-file ca.crt
# tls-ca-cert-dir /etc/ssl/certs
# If TLS/SSL clients are required to authenticate using a client side
# certificate, use this directive.
# By default, clients (including replica servers) on a TLS port are required
# to authenticate using valid client side certificates.
#
# Note: this applies to all incoming clients, including replicas.
# It is possible to disable authentication using this directive.
#
# tls-auth-clients yes
# tls-auth-clients no
# If TLS/SSL should be used when connecting as a replica to a master, enable
# this configuration directive:
# By default, a Redis replica does not attempt to establish a TLS connection
# with its master.
#
# Use the following directive to enable TLS on replication links.
#
# tls-replication yes
# If TLS/SSL should be used for the Redis Cluster bus, enable this configuration
# directive.
#
# NOTE: If TLS/SSL is enabled for Cluster Bus, mutual authentication is always
# enforced.
# By default, the Redis Cluster bus uses a plain TCP connection. To enable
# TLS for the bus protocol, use the following directive:
#
# tls-cluster yes
@ -1362,7 +1361,11 @@ latency-monitor-threshold 0
# z Sorted set commands
# x Expired events (events generated every time a key expires)
# e Evicted events (events generated when a key is evicted for maxmemory)
# A Alias for g$lshzxe, so that the "AKE" string means all the events.
# t Stream commands
# m Key-miss events (Note: It is not included in the 'A' class)
# A Alias for g$lshzxet, so that the "AKE" string means all the events
# (Except key-miss events which are excluded from 'A' due to their
# unique nature).
#
# The "notify-keyspace-events" takes as argument a string that is composed
# of zero or multiple characters. The empty string means that notifications

View File

@ -1592,6 +1592,7 @@ void addACLLogEntry(client *c, int reason, int keypos, sds username) {
/* ACL -- show and modify the configuration of ACL users.
* ACL HELP
* ACL LOAD
* ACL SAVE
* ACL LIST
* ACL USERS
* ACL CAT [<category>]
@ -1855,6 +1856,7 @@ void aclCommand(client *c) {
} else if (!strcasecmp(sub,"help")) {
const char *help[] = {
"LOAD -- Reload users from the ACL file.",
"SAVE -- Save the current config to the ACL file."
"LIST -- Show user details in config file format.",
"USERS -- List all the registered usernames.",
"SETUSER <username> [attribs ...] -- Create or modify a user.",

View File

@ -242,6 +242,7 @@ void stopAppendOnly(void) {
server.aof_fd = -1;
server.aof_selected_db = -1;
server.aof_state = AOF_OFF;
server.aof_rewrite_scheduled = 0;
killAppendOnlyChild();
}

View File

@ -190,8 +190,9 @@ typedef struct typeInterface {
void (*init)(typeData data);
/* Called on server start, should return 1 on success, 0 on error and should set err */
int (*load)(typeData data, sds *argc, int argv, char **err);
/* Called on CONFIG SET, returns 1 on success, 0 on error */
int (*set)(typeData data, sds value, char **err);
/* Called on server startup and CONFIG SET, returns 1 on success, 0 on error
* and can set a verbose err string, update is true when called from CONFIG SET */
int (*set)(typeData data, sds value, int update, char **err);
/* Called on CONFIG GET, required to add output to the client */
void (*get)(client *c, typeData data);
/* Called on CONFIG REWRITE, required to rewrite the config state */
@ -323,7 +324,11 @@ void loadServerConfigFromString(char *config) {
if ((!strcasecmp(argv[0],config->name) ||
(config->alias && !strcasecmp(argv[0],config->alias))))
{
if (!config->interface.load(config->data, argv, argc, &err)) {
if (argc != 2) {
err = "wrong number of arguments";
goto loaderr;
}
if (!config->interface.set(config->data, argv[1], 0, &err)) {
goto loaderr;
}
@ -344,6 +349,10 @@ void loadServerConfigFromString(char *config) {
if (addresses > CONFIG_BINDADDR_MAX) {
err = "Too many bind addresses specified"; goto loaderr;
}
/* Free old bind addresses */
for (j = 0; j < server.bindaddr_count; j++) {
zfree(server.bindaddr[j]);
}
for (j = 0; j < addresses; j++)
server.bindaddr[j] = zstrdup(argv[j+1]);
server.bindaddr_count = addresses;
@ -599,7 +608,7 @@ void configSetCommand(client *c) {
if(config->modifiable && (!strcasecmp(c->argv[2]->ptr,config->name) ||
(config->alias && !strcasecmp(c->argv[2]->ptr,config->alias))))
{
if (!config->interface.set(config->data,o->ptr, &errstr)) {
if (!config->interface.set(config->data,o->ptr,1,&errstr)) {
goto badfmt;
}
addReply(c,shared.ok);
@ -1536,9 +1545,8 @@ static char loadbuf[LOADBUF_SIZE];
.alias = (config_alias), \
.modifiable = (is_modifiable),
#define embedConfigInterface(initfn, loadfn, setfn, getfn, rewritefn) .interface = { \
#define embedConfigInterface(initfn, setfn, getfn, rewritefn) .interface = { \
.init = (initfn), \
.load = (loadfn), \
.set = (setfn), \
.get = (getfn), \
.rewrite = (rewritefn) \
@ -1561,30 +1569,17 @@ static void boolConfigInit(typeData data) {
*data.yesno.config = data.yesno.default_value;
}
static int boolConfigLoad(typeData data, sds *argv, int argc, char **err) {
int yn;
if (argc != 2) {
*err = "wrong number of arguments";
return 0;
}
if ((yn = yesnotoi(argv[1])) == -1) {
static int boolConfigSet(typeData data, sds value, int update, char **err) {
int yn = yesnotoi(value);
if (yn == -1) {
*err = "argument must be 'yes' or 'no'";
return 0;
}
if (data.yesno.is_valid_fn && !data.yesno.is_valid_fn(yn, err))
return 0;
*data.yesno.config = yn;
return 1;
}
static int boolConfigSet(typeData data, sds value, char **err) {
int yn = yesnotoi(value);
if (yn == -1) return 0;
if (data.yesno.is_valid_fn && !data.yesno.is_valid_fn(yn, err))
return 0;
int prev = *(data.yesno.config);
*(data.yesno.config) = yn;
if (data.yesno.update_fn && !data.yesno.update_fn(yn, prev, err)) {
if (update && data.yesno.update_fn && !data.yesno.update_fn(yn, prev, err)) {
*(data.yesno.config) = prev;
return 0;
}
@ -1601,7 +1596,7 @@ static void boolConfigRewrite(typeData data, const char *name, struct rewriteCon
#define createBoolConfig(name, alias, modifiable, config_addr, default, is_valid, update) { \
embedCommonConfig(name, alias, modifiable) \
embedConfigInterface(boolConfigInit, boolConfigLoad, boolConfigSet, boolConfigGet, boolConfigRewrite) \
embedConfigInterface(boolConfigInit, boolConfigSet, boolConfigGet, boolConfigRewrite) \
.data.yesno = { \
.config = &(config_addr), \
.default_value = (default), \
@ -1619,23 +1614,7 @@ static void stringConfigInit(typeData data) {
}
}
static int stringConfigLoad(typeData data, sds *argv, int argc, char **err) {
if (argc != 2) {
*err = "wrong number of arguments";
return 0;
}
if (data.string.is_valid_fn && !data.string.is_valid_fn(argv[1], err))
return 0;
zfree(*data.string.config);
if (data.string.convert_empty_to_null) {
*data.string.config = argv[1][0] ? zstrdup(argv[1]) : NULL;
} else {
*data.string.config = zstrdup(argv[1]);
}
return 1;
}
static int stringConfigSet(typeData data, sds value, char **err) {
static int stringConfigSet(typeData data, sds value, int update, char **err) {
if (data.string.is_valid_fn && !data.string.is_valid_fn(value, err))
return 0;
char *prev = *data.string.config;
@ -1644,7 +1623,7 @@ static int stringConfigSet(typeData data, sds value, char **err) {
} else {
*data.string.config = zstrdup(value);
}
if (data.string.update_fn && !data.string.update_fn(*data.string.config, prev, err)) {
if (update && data.string.update_fn && !data.string.update_fn(*data.string.config, prev, err)) {
zfree(*data.string.config);
*data.string.config = prev;
return 0;
@ -1666,7 +1645,7 @@ static void stringConfigRewrite(typeData data, const char *name, struct rewriteC
#define createStringConfig(name, alias, modifiable, empty_to_null, config_addr, default, is_valid, update) { \
embedCommonConfig(name, alias, modifiable) \
embedConfigInterface(stringConfigInit, stringConfigLoad, stringConfigSet, stringConfigGet, stringConfigRewrite) \
embedConfigInterface(stringConfigInit, stringConfigSet, stringConfigGet, stringConfigRewrite) \
.data.string = { \
.config = &(config_addr), \
.default_value = (default), \
@ -1677,17 +1656,12 @@ static void stringConfigRewrite(typeData data, const char *name, struct rewriteC
}
/* Enum configs */
static void configEnumInit(typeData data) {
static void enumConfigInit(typeData data) {
*data.enumd.config = data.enumd.default_value;
}
static int configEnumLoad(typeData data, sds *argv, int argc, char **err) {
if (argc != 2) {
*err = "wrong number of arguments";
return 0;
}
int enumval = configEnumGetValue(data.enumd.enum_value, argv[1]);
static int enumConfigSet(typeData data, sds value, int update, char **err) {
int enumval = configEnumGetValue(data.enumd.enum_value, value);
if (enumval == INT_MIN) {
sds enumerr = sdsnew("argument must be one of the following: ");
configEnum *enumNode = data.enumd.enum_value;
@ -1707,37 +1681,28 @@ static int configEnumLoad(typeData data, sds *argv, int argc, char **err) {
*err = loadbuf;
return 0;
}
if (data.enumd.is_valid_fn && !data.enumd.is_valid_fn(enumval, err))
return 0;
*(data.enumd.config) = enumval;
return 1;
}
static int configEnumSet(typeData data, sds value, char **err) {
int enumval = configEnumGetValue(data.enumd.enum_value, value);
if (enumval == INT_MIN) return 0;
if (data.enumd.is_valid_fn && !data.enumd.is_valid_fn(enumval, err))
return 0;
int prev = *(data.enumd.config);
*(data.enumd.config) = enumval;
if (data.enumd.update_fn && !data.enumd.update_fn(enumval, prev, err)) {
if (update && data.enumd.update_fn && !data.enumd.update_fn(enumval, prev, err)) {
*(data.enumd.config) = prev;
return 0;
}
return 1;
}
static void configEnumGet(client *c, typeData data) {
static void enumConfigGet(client *c, typeData data) {
addReplyBulkCString(c, configEnumGetNameOrUnknown(data.enumd.enum_value,*data.enumd.config));
}
static void configEnumRewrite(typeData data, const char *name, struct rewriteConfigState *state) {
static void enumConfigRewrite(typeData data, const char *name, struct rewriteConfigState *state) {
rewriteConfigEnumOption(state, name,*(data.enumd.config), data.enumd.enum_value, data.enumd.default_value);
}
#define createEnumConfig(name, alias, modifiable, enum, config_addr, default, is_valid, update) { \
embedCommonConfig(name, alias, modifiable) \
embedConfigInterface(configEnumInit, configEnumLoad, configEnumSet, configEnumGet, configEnumRewrite) \
embedConfigInterface(enumConfigInit, enumConfigSet, enumConfigGet, enumConfigRewrite) \
.data.enumd = { \
.config = &(config_addr), \
.default_value = (default), \
@ -1832,49 +1797,22 @@ static int numericBoundaryCheck(typeData data, long long ll, char **err) {
return 1;
}
static int numericConfigLoad(typeData data, sds *argv, int argc, char **err) {
long long ll;
if (argc != 2) {
*err = "wrong number of arguments";
return 0;
}
static int numericConfigSet(typeData data, sds value, int update, char **err) {
long long ll, prev = 0;
if (data.numeric.is_memory) {
int memerr;
ll = memtoll(argv[1], &memerr);
ll = memtoll(value, &memerr);
if (memerr || ll < 0) {
*err = "argument must be a memory value";
return 0;
}
} else {
if (!string2ll(argv[1], sdslen(argv[1]),&ll)) {
if (!string2ll(value, sdslen(value),&ll)) {
*err = "argument couldn't be parsed into an integer" ;
return 0;
}
}
if (!numericBoundaryCheck(data, ll, err))
return 0;
if (data.numeric.is_valid_fn && !data.numeric.is_valid_fn(ll, err))
return 0;
SET_NUMERIC_TYPE(ll)
return 1;
}
static int numericConfigSet(typeData data, sds value, char **err) {
long long ll, prev = 0;
if (data.numeric.is_memory) {
int memerr;
ll = memtoll(value, &memerr);
if (memerr || ll < 0) return 0;
} else {
if (!string2ll(value, sdslen(value),&ll)) return 0;
}
if (!numericBoundaryCheck(data, ll, err))
return 0;
@ -1884,7 +1822,7 @@ static int numericConfigSet(typeData data, sds value, char **err) {
GET_NUMERIC_TYPE(prev)
SET_NUMERIC_TYPE(ll)
if (data.numeric.update_fn && !data.numeric.update_fn(ll, prev, err)) {
if (update && data.numeric.update_fn && !data.numeric.update_fn(ll, prev, err)) {
SET_NUMERIC_TYPE(prev)
return 0;
}
@ -1918,7 +1856,7 @@ static void numericConfigRewrite(typeData data, const char *name, struct rewrite
#define embedCommonNumericalConfig(name, alias, modifiable, lower, upper, config_addr, default, memory, is_valid, update) { \
embedCommonConfig(name, alias, modifiable) \
embedConfigInterface(numericConfigInit, numericConfigLoad, numericConfigSet, numericConfigGet, numericConfigRewrite) \
embedConfigInterface(numericConfigInit, numericConfigSet, numericConfigGet, numericConfigRewrite) \
.data.numeric = { \
.lower_bound = (lower), \
.upper_bound = (upper), \
@ -2061,8 +1999,9 @@ static int updateMaxmemory(long long val, long long prev, char **err) {
UNUSED(prev);
UNUSED(err);
if (val) {
if ((unsigned long long)val < zmalloc_used_memory()) {
serverLog(LL_WARNING,"WARNING: the new maxmemory value set via CONFIG SET is smaller than the current memory usage. This will result in key eviction and/or the inability to accept new write commands depending on the maxmemory-policy.");
size_t used = zmalloc_used_memory()-freeMemoryGetNotCountedMemory();
if ((unsigned long long)val < used) {
serverLog(LL_WARNING,"WARNING: the new maxmemory value set via CONFIG SET (%llu) is smaller than the current memory usage (%zu). This will result in key eviction and/or the inability to accept new write commands depending on the maxmemory-policy.", server.maxmemory, used);
}
freeMemoryIfNeededAndSafe();
}

View File

@ -1522,6 +1522,22 @@ int *georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numk
return keys;
}
/* Helper function to extract keys from memory command.
* MEMORY USAGE <key> */
int *memoryGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
int *keys;
UNUSED(cmd);
if (argc >= 3 && !strcasecmp(argv[1]->ptr,"usage")) {
keys = zmalloc(sizeof(int) * 1);
keys[0] = 2;
*numkeys = 1;
return keys;
}
*numkeys = 0;
return NULL;
}
/* XREAD [BLOCK <milliseconds>] [COUNT <count>] [GROUP <groupname> <ttl>]
* STREAMS key_1 key_2 ... key_N ID_1 ID_2 ... ID_N */
int *xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {

View File

@ -355,6 +355,7 @@ void debugCommand(client *c) {
"CRASH-AND-RECOVER <milliseconds> -- Hard crash and restart after <milliseconds> delay.",
"DIGEST -- Output a hex signature representing the current DB content.",
"DIGEST-VALUE <key-1> ... <key-N>-- Output a hex signature of the values of all the specified keys.",
"DEBUG PROTOCOL [string|integer|double|bignum|null|array|set|map|attrib|push|verbatim|true|false]",
"ERROR <string> -- Return a Redis protocol error with <string> as message. Useful for clients unit tests to simulate Redis errors.",
"LOG <message> -- write message to the server log.",
"HTSTATS <dbid> -- Return hash table statistics of the specified Redis database.",
@ -586,7 +587,7 @@ NULL
}
} else if (!strcasecmp(c->argv[1]->ptr,"protocol") && c->argc == 3) {
/* DEBUG PROTOCOL [string|integer|double|bignum|null|array|set|map|
* attrib|push|verbatim|true|false|state|err|bloberr] */
* attrib|push|verbatim|true|false] */
char *name = c->argv[2]->ptr;
if (!strcasecmp(name,"string")) {
addReplyBulkCString(c,"Hello World");
@ -634,7 +635,7 @@ NULL
} else if (!strcasecmp(name,"verbatim")) {
addReplyVerbatim(c,"This is a verbatim\nstring",25,"txt");
} else {
addReplyError(c,"Wrong protocol type name. Please use one of the following: string|integer|double|bignum|null|array|set|map|attrib|push|verbatim|true|false|state|err|bloberr");
addReplyError(c,"Wrong protocol type name. Please use one of the following: string|integer|double|bignum|null|array|set|map|attrib|push|verbatim|true|false");
}
} else if (!strcasecmp(c->argv[1]->ptr,"sleep") && c->argc == 3) {
double dtime = strtod(c->argv[2]->ptr,NULL);

View File

@ -714,9 +714,9 @@ void RM_KeyAtPos(RedisModuleCtx *ctx, int pos) {
* flags into the command flags used by the Redis core.
*
* It returns the set of flags, or -1 if unknown flags are found. */
int commandFlagsFromString(char *s) {
int64_t commandFlagsFromString(char *s) {
int count, j;
int flags = 0;
int64_t flags = 0;
sds *tokens = sdssplitlen(s,strlen(s)," ",1,&count);
for (j = 0; j < count; j++) {
char *t = tokens[j];
@ -798,7 +798,7 @@ int commandFlagsFromString(char *s) {
* to authenticate a client.
*/
int RM_CreateCommand(RedisModuleCtx *ctx, const char *name, RedisModuleCmdFunc cmdfunc, const char *strflags, int firstkey, int lastkey, int keystep) {
int flags = strflags ? commandFlagsFromString((char*)strflags) : 0;
int64_t flags = strflags ? commandFlagsFromString((char*)strflags) : 0;
if (flags == -1) return REDISMODULE_ERR;
if ((flags & CMD_MODULE_NO_CLUSTER) && server.cluster_enabled)
return REDISMODULE_ERR;
@ -890,7 +890,8 @@ void RM_SetModuleOptions(RedisModuleCtx *ctx, int options) {
ctx->module->options = options;
}
/* Signals that the key is modified from user's perspective (i.e. invalidate WATCH). */
/* Signals that the key is modified from user's perspective (i.e. invalidate WATCH
* and client side caching). */
int RM_SignalModifiedKey(RedisModuleCtx *ctx, RedisModuleString *keyname) {
signalModifiedKey(ctx->client->db,keyname);
return REDISMODULE_OK;
@ -3649,14 +3650,15 @@ void moduleRDBLoadError(RedisModuleIO *io) {
io->error = 1;
return;
}
serverLog(LL_WARNING,
serverPanic(
"Error loading data from RDB (short read or EOF). "
"Read performed by module '%s' about type '%s' "
"after reading '%llu' bytes of a value.",
"after reading '%llu' bytes of a value "
"for key named: '%s'.",
io->type->module->name,
io->type->name,
(unsigned long long)io->bytes);
exit(1);
(unsigned long long)io->bytes,
io->key? (char*)io->key->ptr: "(null)");
}
/* Returns 0 if there's at least one registered data type that did not declare
@ -4805,7 +4807,8 @@ void moduleReleaseGIL(void) {
* - REDISMODULE_NOTIFY_EXPIRED: Expiration events
* - REDISMODULE_NOTIFY_EVICTED: Eviction events
* - REDISMODULE_NOTIFY_STREAM: Stream events
* - REDISMODULE_NOTIFY_ALL: All events
* - REDISMODULE_NOTIFY_KEYMISS: Key-miss events
* - REDISMODULE_NOTIFY_ALL: All events (Excluding REDISMODULE_NOTIFY_KEYMISS)
*
* We do not distinguish between key events and keyspace events, and it is up
* to the module to filter the actions taken based on the key.

View File

@ -369,9 +369,10 @@ void addReplyErrorLength(client *c, const char *s, size_t len) {
* Where the master must propagate the first change even if the second
* will produce an error. However it is useful to log such events since
* they are rare and may hint at errors in a script or a bug in Redis. */
if (c->flags & (CLIENT_MASTER|CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) {
char* to = c->flags & CLIENT_MASTER? "master": "replica";
char* from = c->flags & CLIENT_MASTER? "replica": "master";
int ctype = getClientType(c);
if (ctype == CLIENT_TYPE_MASTER || ctype == CLIENT_TYPE_SLAVE) {
char* to = ctype == CLIENT_TYPE_MASTER? "master": "replica";
char* from = ctype == CLIENT_TYPE_MASTER? "replica": "master";
char *cmdname = c->lastcmd ? c->lastcmd->name : "<unknown>";
serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error "
"to its %s: '%s' after processing the command "
@ -1074,7 +1075,7 @@ void freeClient(client *c) {
}
/* Log link disconnection with slave */
if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) {
if (getClientType(c) == CLIENT_TYPE_SLAVE) {
serverLog(LL_WARNING,"Connection with replica %s lost.",
replicationGetSlaveName(c));
}
@ -1121,7 +1122,7 @@ void freeClient(client *c) {
/* We need to remember the time when we started to have zero
* attached slaves, as after some time we'll free the replication
* backlog. */
if (c->flags & CLIENT_SLAVE && listLength(server.slaves) == 0)
if (getClientType(c) == CLIENT_TYPE_SLAVE && listLength(server.slaves) == 0)
server.repl_no_slaves_since = server.unixtime;
refreshGoodSlavesCount();
/* Fire the replica change modules event. */
@ -1162,9 +1163,14 @@ void freeClientAsync(client *c) {
* may access the list while Redis uses I/O threads. All the other accesses
* are in the context of the main thread while the other threads are
* idle. */
static pthread_mutex_t async_free_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return;
c->flags |= CLIENT_CLOSE_ASAP;
if (server.io_threads_num == 1) {
/* no need to bother with locking if there's just one thread (the main thread) */
listAddNodeTail(server.clients_to_close,c);
return;
}
static pthread_mutex_t async_free_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_lock(&async_free_queue_mutex);
listAddNodeTail(server.clients_to_close,c);
pthread_mutex_unlock(&async_free_queue_mutex);
@ -1252,8 +1258,8 @@ int writeToClient(client *c, int handler_installed) {
* just deliver as much data as it is possible to deliver.
*
* Moreover, we also send as much as possible if the client is
* a slave (otherwise, on high-speed traffic, the replication
* buffer will grow indefinitely) */
* a slave or a monitor (otherwise, on high-speed traffic, the
* replication/output buffer will grow indefinitely) */
if (totwritten > NET_MAX_WRITES_PER_EVENT &&
(server.maxmemory == 0 ||
zmalloc_used_memory() < server.maxmemory) &&
@ -1439,7 +1445,7 @@ int processInlineBuffer(client *c) {
/* Newline from slaves can be used to refresh the last ACK time.
* This is useful for a slave to ping back while loading a big
* RDB file. */
if (querylen == 0 && c->flags & CLIENT_SLAVE)
if (querylen == 0 && getClientType(c) == CLIENT_TYPE_SLAVE)
c->repl_ack_time = server.unixtime;
/* Move querybuffer position to the next query in the buffer. */
@ -2433,12 +2439,14 @@ unsigned long getClientOutputBufferMemoryUsage(client *c) {
*
* The function will return one of the following:
* CLIENT_TYPE_NORMAL -> Normal client
* CLIENT_TYPE_SLAVE -> Slave or client executing MONITOR command
* CLIENT_TYPE_SLAVE -> Slave
* CLIENT_TYPE_PUBSUB -> Client subscribed to Pub/Sub channels
* CLIENT_TYPE_MASTER -> The client representing our replication master.
*/
int getClientType(client *c) {
if (c->flags & CLIENT_MASTER) return CLIENT_TYPE_MASTER;
/* Even though MONITOR clients are marked as replicas, we
* want the expose them as normal clients. */
if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR))
return CLIENT_TYPE_SLAVE;
if (c->flags & CLIENT_PUBSUB) return CLIENT_TYPE_PUBSUB;

View File

@ -82,10 +82,10 @@ sds keyspaceEventsFlagsToString(int flags) {
if (flags & NOTIFY_EXPIRED) res = sdscatlen(res,"x",1);
if (flags & NOTIFY_EVICTED) res = sdscatlen(res,"e",1);
if (flags & NOTIFY_STREAM) res = sdscatlen(res,"t",1);
if (flags & NOTIFY_KEY_MISS) res = sdscatlen(res,"m",1);
}
if (flags & NOTIFY_KEYSPACE) res = sdscatlen(res,"K",1);
if (flags & NOTIFY_KEYEVENT) res = sdscatlen(res,"E",1);
if (flags & NOTIFY_KEY_MISS) res = sdscatlen(res,"m",1);
return res;
}

View File

@ -974,38 +974,29 @@ struct redisMemOverhead *getMemoryOverheadData(void) {
mh->repl_backlog = mem;
mem_total += mem;
mem = 0;
if (listLength(server.slaves)) {
listIter li;
listNode *ln;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
mem += getClientOutputBufferMemoryUsage(c);
mem += sdsAllocSize(c->querybuf);
mem += sizeof(client);
}
}
mh->clients_slaves = mem;
mem_total+=mem;
mem = 0;
if (listLength(server.clients)) {
listIter li;
listNode *ln;
size_t mem_normal = 0, mem_slaves = 0;
listRewind(server.clients,&li);
while((ln = listNext(&li))) {
size_t mem_curr = 0;
client *c = listNodeValue(ln);
if (c->flags & CLIENT_SLAVE && !(c->flags & CLIENT_MONITOR))
continue;
mem += getClientOutputBufferMemoryUsage(c);
mem += sdsAllocSize(c->querybuf);
mem += sizeof(client);
int type = getClientType(c);
mem_curr += getClientOutputBufferMemoryUsage(c);
mem_curr += sdsAllocSize(c->querybuf);
mem_curr += sizeof(client);
if (type == CLIENT_TYPE_SLAVE)
mem_slaves += mem_curr;
else
mem_normal += mem_curr;
}
mh->clients_slaves = mem_slaves;
mh->clients_normal = mem_normal;
mem = mem_slaves + mem_normal;
}
mh->clients_normal = mem;
mem_total+=mem;
mem = 0;

View File

@ -1871,7 +1871,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
}
} else if (rdbtype == RDB_TYPE_MODULE || rdbtype == RDB_TYPE_MODULE_2) {
uint64_t moduleid = rdbLoadLen(rdb,NULL);
if (rioGetReadError(rdb)) return NULL;
if (rioGetReadError(rdb)) {
rdbReportReadError("Short read module id");
return NULL;
}
moduleType *mt = moduleTypeLookupModuleByID(moduleid);
char name[10];

View File

@ -1595,15 +1595,15 @@ static int parseOptions(int argc, char **argv) {
#ifdef USE_OPENSSL
} else if (!strcmp(argv[i],"--tls")) {
config.tls = 1;
} else if (!strcmp(argv[i],"--sni")) {
} else if (!strcmp(argv[i],"--sni") && !lastarg) {
config.sni = argv[++i];
} else if (!strcmp(argv[i],"--cacertdir")) {
} else if (!strcmp(argv[i],"--cacertdir") && !lastarg) {
config.cacertdir = argv[++i];
} else if (!strcmp(argv[i],"--cacert")) {
} else if (!strcmp(argv[i],"--cacert") && !lastarg) {
config.cacert = argv[++i];
} else if (!strcmp(argv[i],"--cert")) {
} else if (!strcmp(argv[i],"--cert") && !lastarg) {
config.cert = argv[++i];
} else if (!strcmp(argv[i],"--key")) {
} else if (!strcmp(argv[i],"--key") && !lastarg) {
config.key = argv[++i];
#endif
} else if (!strcmp(argv[i],"-v") || !strcmp(argv[i], "--version")) {
@ -1701,12 +1701,13 @@ static void usage(void) {
" -c Enable cluster mode (follow -ASK and -MOVED redirections).\n"
#ifdef USE_OPENSSL
" --tls Establish a secure TLS connection.\n"
" --cacert CA Certificate file to verify with.\n"
" --cacertdir Directory where trusted CA certificates are stored.\n"
" --sni <host> Server name indication for TLS.\n"
" --cacert <file> CA Certificate file to verify with.\n"
" --cacertdir <dir> Directory where trusted CA certificates are stored.\n"
" If neither cacert nor cacertdir are specified, the default\n"
" system-wide trusted root certs configuration will apply.\n"
" --cert Client certificate to authenticate with.\n"
" --key Private key file to authenticate with.\n"
" --cert <file> Client certificate to authenticate with.\n"
" --key <file> Private key file to authenticate with.\n"
#endif
" --raw Use raw formatting for replies (default when STDOUT is\n"
" not a tty).\n"

View File

@ -127,8 +127,8 @@
#define REDISMODULE_NOTIFY_EXPIRED (1<<8) /* x */
#define REDISMODULE_NOTIFY_EVICTED (1<<9) /* e */
#define REDISMODULE_NOTIFY_STREAM (1<<10) /* t */
#define REDISMODULE_NOTIFY_KEY_MISS (1<<11) /* m */
#define REDISMODULE_NOTIFY_ALL (REDISMODULE_NOTIFY_GENERIC | REDISMODULE_NOTIFY_STRING | REDISMODULE_NOTIFY_LIST | REDISMODULE_NOTIFY_SET | REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_ZSET | REDISMODULE_NOTIFY_EXPIRED | REDISMODULE_NOTIFY_EVICTED | REDISMODULE_NOTIFY_STREAM | REDISMODULE_NOTIFY_KEY_MISS) /* A */
#define REDISMODULE_NOTIFY_KEY_MISS (1<<11) /* m (Note: This one is excluded from REDISMODULE_NOTIFY_ALL on purpose) */
#define REDISMODULE_NOTIFY_ALL (REDISMODULE_NOTIFY_GENERIC | REDISMODULE_NOTIFY_STRING | REDISMODULE_NOTIFY_LIST | REDISMODULE_NOTIFY_SET | REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_ZSET | REDISMODULE_NOTIFY_EXPIRED | REDISMODULE_NOTIFY_EVICTED | REDISMODULE_NOTIFY_STREAM) /* A */
/* A special pointer that we can use between the core and the module to signal

View File

@ -1354,7 +1354,7 @@ void disklessLoadRestoreBackups(redisDb *backup, int restore, int empty_db_flags
void readSyncBulkPayload(connection *conn) {
char buf[4096];
ssize_t nread, readlen, nwritten;
int use_diskless_load;
int use_diskless_load = useDisklessLoad();
redisDb *diskless_load_backup = NULL;
int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC :
EMPTYDB_NO_FLAGS;
@ -1411,19 +1411,18 @@ void readSyncBulkPayload(connection *conn) {
server.repl_transfer_size = 0;
serverLog(LL_NOTICE,
"MASTER <-> REPLICA sync: receiving streamed RDB from master with EOF %s",
useDisklessLoad()? "to parser":"to disk");
use_diskless_load? "to parser":"to disk");
} else {
usemark = 0;
server.repl_transfer_size = strtol(buf+1,NULL,10);
serverLog(LL_NOTICE,
"MASTER <-> REPLICA sync: receiving %lld bytes from master %s",
(long long) server.repl_transfer_size,
useDisklessLoad()? "to parser":"to disk");
use_diskless_load? "to parser":"to disk");
}
return;
}
use_diskless_load = useDisklessLoad();
if (!use_diskless_load) {
/* Read the data from the socket, store it to a file and search
* for the EOF. */
@ -2399,6 +2398,10 @@ void replicationUnsetMaster(void) {
moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER,
NULL);
/* Restart the AOF subsystem in case we shut it down during a sync when
* we were still a slave. */
if (server.aof_enabled && server.aof_state == AOF_OFF) restartAOFAfterSYNC();
}
/* This function is called when the slave lose the connection with the
@ -2436,9 +2439,6 @@ void replicaofCommand(client *c) {
serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",
client);
sdsfree(client);
/* Restart the AOF subsystem in case we shut it down during a sync when
* we were still a slave. */
if (server.aof_enabled && server.aof_state == AOF_OFF) restartAOFAfterSYNC();
}
} else {
long port;

View File

@ -579,7 +579,7 @@ struct redisCommand redisCommandTable[] = {
0,NULL,0,0,0,0,0,0},
{"select",selectCommand,2,
"ok-loading fast @keyspace",
"ok-loading fast ok-stale @keyspace",
0,NULL,0,0,0,0,0,0},
{"swapdb",swapdbCommand,3,
@ -660,7 +660,7 @@ struct redisCommand redisCommandTable[] = {
0,NULL,0,0,0,0,0,0},
{"lastsave",lastsaveCommand,1,
"read-only random fast @admin @dangerous",
"read-only random fast ok-loading ok-stale @admin @dangerous",
0,NULL,0,0,0,0,0,0},
{"type",typeCommand,2,
@ -708,7 +708,7 @@ struct redisCommand redisCommandTable[] = {
0,NULL,0,0,0,0,0,0},
{"monitor",monitorCommand,1,
"admin no-script",
"admin no-script ok-loading ok-stale",
0,NULL,0,0,0,0,0,0},
{"ttl",ttlCommand,2,
@ -740,7 +740,7 @@ struct redisCommand redisCommandTable[] = {
0,NULL,0,0,0,0,0,0},
{"debug",debugCommand,-2,
"admin no-script",
"admin no-script ok-loading ok-stale",
0,NULL,0,0,0,0,0,0},
{"config",configCommand,-2,
@ -817,14 +817,14 @@ struct redisCommand redisCommandTable[] = {
{"memory",memoryCommand,-2,
"random read-only",
0,NULL,0,0,0,0,0,0},
0,memoryGetKeys,0,0,0,0,0,0},
{"client",clientCommand,-2,
"admin no-script random @connection",
"admin no-script random ok-loading ok-stale @connection",
0,NULL,0,0,0,0,0,0},
{"hello",helloCommand,-2,
"no-auth no-script fast no-monitor no-slowlog @connection",
"no-auth no-script fast no-monitor ok-loading ok-stale no-slowlog @connection",
0,NULL,0,0,0,0,0,0},
/* EVAL can modify the dataset, however it is not flagged as a write
@ -838,7 +838,7 @@ struct redisCommand redisCommandTable[] = {
0,evalGetKeys,0,0,0,0,0,0},
{"slowlog",slowlogCommand,-2,
"admin random",
"admin random ok-loading ok-stale",
0,NULL,0,0,0,0,0,0},
{"script",scriptCommand,-2,
@ -846,7 +846,7 @@ struct redisCommand redisCommandTable[] = {
0,NULL,0,0,0,0,0,0},
{"time",timeCommand,1,
"read-only random fast",
"read-only random fast ok-loading ok-stale",
0,NULL,0,0,0,0,0,0},
{"bitop",bitopCommand,-4,
@ -1498,7 +1498,7 @@ int clientsCronHandleTimeout(client *c, mstime_t now_ms) {
time_t now = now_ms/1000;
if (server.maxidletime &&
!(c->flags & CLIENT_SLAVE) && /* no timeout for slaves */
!(c->flags & CLIENT_SLAVE) && /* no timeout for slaves and monitors */
!(c->flags & CLIENT_MASTER) && /* no timeout for masters */
!(c->flags & CLIENT_BLOCKED) && /* no timeout for BLPOP */
!(c->flags & CLIENT_PUBSUB) && /* no timeout for Pub/Sub clients */

View File

@ -413,8 +413,8 @@ typedef long long ustime_t; /* microsecond time type. */
#define NOTIFY_EXPIRED (1<<8) /* x */
#define NOTIFY_EVICTED (1<<9) /* e */
#define NOTIFY_STREAM (1<<10) /* t */
#define NOTIFY_KEY_MISS (1<<11) /* m */
#define NOTIFY_ALL (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | NOTIFY_EVICTED | NOTIFY_STREAM | NOTIFY_KEY_MISS) /* A flag */
#define NOTIFY_KEY_MISS (1<<11) /* m (Note: This one is excluded from NOTIFY_ALL on purpose) */
#define NOTIFY_ALL (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | NOTIFY_EVICTED | NOTIFY_STREAM) /* A flag */
/* Get the first bind addr or NULL */
#define NET_FIRST_BIND_ADDR (server.bindaddr_count ? server.bindaddr[0] : NULL)
@ -2077,6 +2077,7 @@ int *sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
int *migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
int *georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
int *xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
int *memoryGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
/* Cluster */
void clusterInit(void);