diff --git a/CONTRIBUTING b/CONTRIBUTING index f57de3fd9..7dee24c74 100644 --- a/CONTRIBUTING +++ b/CONTRIBUTING @@ -8,7 +8,9 @@ each source file that you contribute. # IMPORTANT: HOW TO USE REDIS GITHUB ISSUES * Github issues SHOULD ONLY BE USED to report bugs, and for DETAILED feature - requests. Everything else belongs to the Redis Google Group. + requests. Everything else belongs to the Redis Google Group: + + https://groups.google.com/forum/m/#!forum/Redis-db PLEASE DO NOT POST GENERAL QUESTIONS that are not about bugs or suspected bugs in the Github issues system. We'll be very happy to help you and provide @@ -30,7 +32,7 @@ each source file that you contribute. a. Fork Redis on github ( http://help.github.com/fork-a-repo/ ) b. Create a topic branch (git checkout -b my_branch) c. Push to your branch (git push origin my_branch) - d. Initiate a pull request on github ( http://help.github.com/send-pull-requests/ ) + d. Initiate a pull request on github ( https://help.github.com/articles/creating-a-pull-request/ ) e. Done :) For minor fixes just open a pull request on Github. diff --git a/README.md b/README.md index 42ab47853..8dbad7dbf 100644 --- a/README.md +++ b/README.md @@ -435,7 +435,7 @@ top comment inside `server.c`. After the command operates in some way, it returns a reply to the client, usually using `addReply()` or a similar function defined inside `networking.c`. -There are tons of commands implementations inside th Redis source code +There are tons of commands implementations inside the Redis source code that can serve as examples of actual commands implementations. To write a few toy commands can be a good exercise to familiarize with the code base. diff --git a/deps/README.md b/deps/README.md index 6b1f019dd..367ee1627 100644 --- a/deps/README.md +++ b/deps/README.md @@ -22,7 +22,7 @@ just following tose steps: 1. Remove the jemalloc directory. 2. Substitute it with the new jemalloc source tree. -3. Edit the Makefile localted in the same directoy as the README you are +3. Edit the Makefile localted in the same directory as the README you are reading, and change the --with-version in the Jemalloc configure script options with the version you are using. This is required because otherwise Jemalloc configuration script is broken and will not work nested in another @@ -50,7 +50,7 @@ This is never upgraded since it's part of the Redis project. If there are change Hiredis --- -Hiredis uses the SDS string library, that must be the same version used inside Redis itself. Hiredis is also very critical for Sentinel. Historically Redis often used forked versions of hiredis in a way or the other. In order to upgrade it is adviced to take a lot of care: +Hiredis uses the SDS string library, that must be the same version used inside Redis itself. Hiredis is also very critical for Sentinel. Historically Redis often used forked versions of hiredis in a way or the other. In order to upgrade it is advised to take a lot of care: 1. Check with diff if hiredis API changed and what impact it could have in Redis. 2. Make sure thet the SDS library inside Hiredis and inside Redis are compatible. @@ -83,6 +83,6 @@ and our version: 1. Makefile is modified to allow a different compiler than GCC. 2. We have the implementation source code, and directly link to the following external libraries: `lua_cjson.o`, `lua_struct.o`, `lua_cmsgpack.o` and `lua_bit.o`. -3. There is a security fix in `ldo.c`, line 498: The check for `LUA_SIGNATURE[0]` is removed in order toa void direct bytecode exectuion. +3. There is a security fix in `ldo.c`, line 498: The check for `LUA_SIGNATURE[0]` is removed in order toa void direct bytecode execution. diff --git a/deps/jemalloc/include/jemalloc/internal/jemalloc_internal_inlines_c.h b/deps/jemalloc/include/jemalloc/internal/jemalloc_internal_inlines_c.h index c829ac60c..290e5cf99 100644 --- a/deps/jemalloc/include/jemalloc/internal/jemalloc_internal_inlines_c.h +++ b/deps/jemalloc/include/jemalloc/internal/jemalloc_internal_inlines_c.h @@ -215,4 +215,32 @@ ixalloc(tsdn_t *tsdn, void *ptr, size_t oldsize, size_t size, size_t extra, return arena_ralloc_no_move(tsdn, ptr, oldsize, size, extra, zero); } +JEMALLOC_ALWAYS_INLINE int +iget_defrag_hint(tsdn_t *tsdn, void* ptr, int *bin_util, int *run_util) { + int defrag = 0; + rtree_ctx_t rtree_ctx_fallback; + rtree_ctx_t *rtree_ctx = tsdn_rtree_ctx(tsdn, &rtree_ctx_fallback); + szind_t szind; + bool is_slab; + rtree_szind_slab_read(tsdn, &extents_rtree, rtree_ctx, (uintptr_t)ptr, true, &szind, &is_slab); + if (likely(is_slab)) { + /* Small allocation. */ + extent_t *slab = iealloc(tsdn, ptr); + arena_t *arena = extent_arena_get(slab); + szind_t binind = extent_szind_get(slab); + bin_t *bin = &arena->bins[binind]; + malloc_mutex_lock(tsdn, &bin->lock); + /* don't bother moving allocations from the slab currently used for new allocations */ + if (slab != bin->slabcur) { + const bin_info_t *bin_info = &bin_infos[binind]; + size_t availregs = bin_info->nregs * bin->stats.curslabs; + *bin_util = ((long long)bin->stats.curregs<<16) / availregs; + *run_util = ((long long)(bin_info->nregs - extent_nfree_get(slab))<<16) / bin_info->nregs; + defrag = 1; + } + malloc_mutex_unlock(tsdn, &bin->lock); + } + return defrag; +} + #endif /* JEMALLOC_INTERNAL_INLINES_C_H */ diff --git a/deps/jemalloc/include/jemalloc/jemalloc_macros.h.in b/deps/jemalloc/include/jemalloc/jemalloc_macros.h.in index aee55438c..daf9e571b 100644 --- a/deps/jemalloc/include/jemalloc/jemalloc_macros.h.in +++ b/deps/jemalloc/include/jemalloc/jemalloc_macros.h.in @@ -120,3 +120,7 @@ # define JEMALLOC_RESTRICT_RETURN # define JEMALLOC_ALLOCATOR #endif + +/* This version of Jemalloc, modified for Redis, has the je_get_defrag_hint() + * function. */ +#define JEMALLOC_FRAG_HINT diff --git a/deps/jemalloc/src/jemalloc.c b/deps/jemalloc/src/jemalloc.c index f93c16fa3..5b936cb48 100644 --- a/deps/jemalloc/src/jemalloc.c +++ b/deps/jemalloc/src/jemalloc.c @@ -3324,3 +3324,14 @@ jemalloc_postfork_child(void) { } /******************************************************************************/ + +/* Helps the application decide if a pointer is worth re-allocating in order to reduce fragmentation. + * returns 0 if the allocation is in the currently active run, + * or when it is not causing any frag issue (large or huge bin) + * returns the bin utilization and run utilization both in fixed point 16:16. + * If the application decides to re-allocate it should use MALLOCX_TCACHE_NONE when doing so. */ +JEMALLOC_EXPORT int JEMALLOC_NOTHROW +get_defrag_hint(void* ptr, int *bin_util, int *run_util) { + assert(ptr != NULL); + return iget_defrag_hint(TSDN_NULL, ptr, bin_util, run_util); +} diff --git a/deps/lua/src/lua_cmsgpack.c b/deps/lua/src/lua_cmsgpack.c index 90a388f3f..892154793 100644 --- a/deps/lua/src/lua_cmsgpack.c +++ b/deps/lua/src/lua_cmsgpack.c @@ -385,6 +385,7 @@ void mp_encode_lua_table_as_array(lua_State *L, mp_buf *buf, int level) { #endif mp_encode_array(L,buf,len); + luaL_checkstack(L, 1, "in function mp_encode_lua_table_as_array"); for (j = 1; j <= len; j++) { lua_pushnumber(L,j); lua_gettable(L,-2); @@ -400,6 +401,7 @@ void mp_encode_lua_table_as_map(lua_State *L, mp_buf *buf, int level) { * Lua API, we need to iterate a first time. Note that an alternative * would be to do a single run, and then hack the buffer to insert the * map opcodes for message pack. Too hackish for this lib. */ + luaL_checkstack(L, 3, "in function mp_encode_lua_table_as_map"); lua_pushnil(L); while(lua_next(L,-2)) { lua_pop(L,1); /* remove value, keep key for next iteration. */ @@ -515,10 +517,14 @@ int mp_pack(lua_State *L) { if (nargs == 0) return luaL_argerror(L, 0, "MessagePack pack needs input."); + if (!lua_checkstack(L, nargs)) + return luaL_argerror(L, 0, "Too many arguments for MessagePack pack."); + buf = mp_buf_new(L); for(i = 1; i <= nargs; i++) { /* Copy argument i to top of stack for _encode processing; * the encode function pops it from the stack when complete. */ + luaL_checkstack(L, 1, "in function mp_check"); lua_pushvalue(L, i); mp_encode_lua_type(L,buf,0); @@ -547,6 +553,7 @@ void mp_decode_to_lua_array(lua_State *L, mp_cur *c, size_t len) { int index = 1; lua_newtable(L); + luaL_checkstack(L, 1, "in function mp_decode_to_lua_array"); while(len--) { lua_pushnumber(L,index++); mp_decode_to_lua_type(L,c); @@ -821,6 +828,9 @@ int mp_unpack_full(lua_State *L, int limit, int offset) { * subtract the entire buffer size from the unprocessed size * to get our next start offset */ int offset = len - c.left; + + luaL_checkstack(L, 1, "in function mp_unpack_full"); + /* Return offset -1 when we have have processed the entire buffer. */ lua_pushinteger(L, c.left == 0 ? -1 : offset); /* Results are returned with the arg elements still diff --git a/deps/lua/src/lua_struct.c b/deps/lua/src/lua_struct.c index a602bb430..4d5f027b8 100644 --- a/deps/lua/src/lua_struct.c +++ b/deps/lua/src/lua_struct.c @@ -1,7 +1,7 @@ /* ** {====================================================== ** Library for packing/unpacking structures. -** $Id: struct.c,v 1.4 2012/07/04 18:54:29 roberto Exp $ +** $Id: struct.c,v 1.7 2018/05/11 22:04:31 roberto Exp $ ** See Copyright Notice at the end of this file ** ======================================================= */ @@ -15,8 +15,8 @@ ** h/H - signed/unsigned short ** l/L - signed/unsigned long ** T - size_t -** i/In - signed/unsigned integer with size `n' (default is size of int) -** cn - sequence of `n' chars (from/to a string); when packing, n==0 means +** i/In - signed/unsigned integer with size 'n' (default is size of int) +** cn - sequence of 'n' chars (from/to a string); when packing, n==0 means the whole string; when unpacking, n==0 means use the previous read number as the string length ** s - zero-terminated string @@ -89,14 +89,12 @@ typedef struct Header { } Header; -static int getnum (lua_State *L, const char **fmt, int df) { +static int getnum (const char **fmt, int df) { if (!isdigit(**fmt)) /* no number? */ return df; /* return default value */ else { int a = 0; do { - if (a > (INT_MAX / 10) || a * 10 > (INT_MAX - (**fmt - '0'))) - luaL_error(L, "integral size overflow"); a = a*10 + *((*fmt)++) - '0'; } while (isdigit(**fmt)); return a; @@ -117,9 +115,9 @@ static size_t optsize (lua_State *L, char opt, const char **fmt) { case 'f': return sizeof(float); case 'd': return sizeof(double); case 'x': return 1; - case 'c': return getnum(L, fmt, 1); + case 'c': return getnum(fmt, 1); case 'i': case 'I': { - int sz = getnum(L, fmt, sizeof(int)); + int sz = getnum(fmt, sizeof(int)); if (sz > MAXINTSIZE) luaL_error(L, "integral size %d is larger than limit of %d", sz, MAXINTSIZE); @@ -152,7 +150,7 @@ static void controloptions (lua_State *L, int opt, const char **fmt, case '>': h->endian = BIG; return; case '<': h->endian = LITTLE; return; case '!': { - int a = getnum(L, fmt, MAXALIGN); + int a = getnum(fmt, MAXALIGN); if (!isp2(a)) luaL_error(L, "alignment %d is not a power of 2", a); h->align = a; @@ -295,21 +293,26 @@ static int b_unpack (lua_State *L) { const char *fmt = luaL_checkstring(L, 1); size_t ld; const char *data = luaL_checklstring(L, 2, &ld); - size_t pos = luaL_optinteger(L, 3, 1) - 1; + size_t pos = luaL_optinteger(L, 3, 1); + luaL_argcheck(L, pos > 0, 3, "offset must be 1 or greater"); + pos--; /* Lua indexes are 1-based, but here we want 0-based for C + * pointer math. */ + int n = 0; /* number of results */ defaultoptions(&h); - lua_settop(L, 2); while (*fmt) { int opt = *fmt++; size_t size = optsize(L, opt, &fmt); pos += gettoalign(pos, &h, opt, size); - luaL_argcheck(L, pos+size <= ld, 2, "data string too short"); - luaL_checkstack(L, 1, "too many results"); + luaL_argcheck(L, size <= ld && pos <= ld - size, + 2, "data string too short"); + /* stack space for item + next position */ + luaL_checkstack(L, 2, "too many results"); switch (opt) { case 'b': case 'B': case 'h': case 'H': case 'l': case 'L': case 'T': case 'i': case 'I': { /* integer types */ int issigned = islower(opt); lua_Number res = getinteger(data+pos, h.endian, issigned, size); - lua_pushnumber(L, res); + lua_pushnumber(L, res); n++; break; } case 'x': { @@ -319,25 +322,26 @@ static int b_unpack (lua_State *L) { float f; memcpy(&f, data+pos, size); correctbytes((char *)&f, sizeof(f), h.endian); - lua_pushnumber(L, f); + lua_pushnumber(L, f); n++; break; } case 'd': { double d; memcpy(&d, data+pos, size); correctbytes((char *)&d, sizeof(d), h.endian); - lua_pushnumber(L, d); + lua_pushnumber(L, d); n++; break; } case 'c': { if (size == 0) { - if (!lua_isnumber(L, -1)) - luaL_error(L, "format `c0' needs a previous size"); + if (n == 0 || !lua_isnumber(L, -1)) + luaL_error(L, "format 'c0' needs a previous size"); size = lua_tonumber(L, -1); - lua_pop(L, 1); - luaL_argcheck(L, pos+size <= ld, 2, "data string too short"); + lua_pop(L, 1); n--; + luaL_argcheck(L, size <= ld && pos <= ld - size, + 2, "data string too short"); } - lua_pushlstring(L, data+pos, size); + lua_pushlstring(L, data+pos, size); n++; break; } case 's': { @@ -345,15 +349,15 @@ static int b_unpack (lua_State *L) { if (e == NULL) luaL_error(L, "unfinished string in data"); size = (e - (data+pos)) + 1; - lua_pushlstring(L, data+pos, size - 1); + lua_pushlstring(L, data+pos, size - 1); n++; break; } default: controloptions(L, opt, &fmt, &h); } pos += size; } - lua_pushinteger(L, pos + 1); - return lua_gettop(L) - 2; + lua_pushinteger(L, pos + 1); /* next position */ + return n + 1; } @@ -399,7 +403,7 @@ LUALIB_API int luaopen_struct (lua_State *L) { /****************************************************************************** -* Copyright (C) 2010-2012 Lua.org, PUC-Rio. All rights reserved. +* Copyright (C) 2010-2018 Lua.org, PUC-Rio. All rights reserved. * * Permission is hereby granted, free of charge, to any person obtaining * a copy of this software and associated documentation files (the diff --git a/redis.conf b/redis.conf index f5b7d5fed..42d24f26e 100644 --- a/redis.conf +++ b/redis.conf @@ -639,7 +639,7 @@ slave-priority 100 # it with the specified string. # 4) During replication, when a slave performs a full resynchronization with # its master, the content of the whole database is removed in order to -# load the RDB file just transfered. +# load the RDB file just transferred. # # In all the above cases the default is to delete objects in a blocking way, # like if DEL was called. However you can configure each case specifically @@ -1106,6 +1106,17 @@ zset-max-ziplist-value 64 # composed of many HyperLogLogs with cardinality in the 0 - 15000 range. hll-sparse-max-bytes 3000 +# Streams macro node max size / items. The stream data structure is a radix +# tree of big nodes that encode multiple items inside. Using this configuration +# it is possible to configure how big a single node can be in bytes, and the +# maximum number of items it may contain before switching to a new node when +# appending new stream entries. If any of the following settings are set to +# zero, the limit is ignored, so for instance it is possible to set just a +# max entires limit by setting max-bytes to 0 and max-entries to the desired +# value. +stream-node-max-bytes 4096 +stream-node-max-entries 100 + # Active rehashing uses 1 millisecond every 100 milliseconds of CPU time in # order to help rehashing the main Redis hash table (the one mapping top-level # keys to values). The hash table implementation Redis uses (see dict.c) @@ -1200,6 +1211,12 @@ hz 10 # big latency spikes. aof-rewrite-incremental-fsync yes +# When redis saves RDB file, if the following option is enabled +# the file will be fsync-ed every 32 MB of data generated. This is useful +# in order to commit the file to the disk more incrementally and avoid +# big latency spikes. +rdb-save-incremental-fsync yes + # Redis LFU eviction (see maxmemory setting) can be tuned. However it is a good # idea to start with the default settings and only change them after investigating # how to improve the performances and how the keys LFU change over time, which diff --git a/sentinel.conf b/sentinel.conf index 0e1b266ed..3703c7394 100644 --- a/sentinel.conf +++ b/sentinel.conf @@ -194,3 +194,31 @@ sentinel failover-timeout mymaster 180000 # # sentinel client-reconfig-script mymaster /var/redis/reconfig.sh +# SECURITY +# +# By default SENTINEL SET will not be able to change the notification-script +# and client-reconfig-script at runtime. This avoids a trivial security issue +# where clients can set the script to anything and trigger a failover in order +# to get the program executed. + +sentinel deny-scripts-reconfig yes + +# REDIS COMMANDS RENAMING +# +# Sometimes the Redis server has certain commands, that are needed for Sentinel +# to work correctly, renamed to unguessable strings. This is often the case +# of CONFIG and SLAVEOF in the context of providers that provide Redis as +# a service, and don't want the customers to reconfigure the instances outside +# of the administration console. +# +# In such case it is possible to tell Sentinel to use different command names +# instead of the normal ones. For example if the master "mymaster", and the +# associated slaves, have "CONFIG" all renamed to "GUESSME", I could use: +# +# sentinel rename-command mymaster CONFIG GUESSME +# +# After such configuration is set, every time Sentinel would use CONFIG it will +# use GUESSME instead. Note that there is no actual need to respect the command +# case, so writing "config guessme" is the same in the example above. +# +# SENTINEL SET can also be used in order to perform this configuration at runtime. diff --git a/src/Makefile b/src/Makefile index 8cd7afe5e..f5525bd6d 100644 --- a/src/Makefile +++ b/src/Makefile @@ -144,7 +144,7 @@ endif REDIS_SERVER_NAME=redis-server REDIS_SENTINEL_NAME=redis-sentinel -REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o +REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o REDIS_CLI_NAME=redis-cli REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o anet.o ae.o crc64.o siphash.o crc16.o REDIS_BENCHMARK_NAME=redis-benchmark diff --git a/src/ae.c b/src/ae.c index 65adb2ab8..1ea671569 100644 --- a/src/ae.c +++ b/src/ae.c @@ -433,7 +433,7 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) * before replying to a client. */ int invert = fe->mask & AE_BARRIER; - /* Note the "fe->mask & mask & ..." code: maybe an already + /* Note the "fe->mask & mask & ..." code: maybe an already * processed event removed an element that fired and we still * didn't processed, so we check if the event is still valid. * @@ -485,7 +485,7 @@ int aeWait(int fd, int mask, long long milliseconds) { if ((retval = poll(&pfd, 1, milliseconds))== 1) { if (pfd.revents & POLLIN) retmask |= AE_READABLE; if (pfd.revents & POLLOUT) retmask |= AE_WRITABLE; - if (pfd.revents & POLLERR) retmask |= AE_WRITABLE; + if (pfd.revents & POLLERR) retmask |= AE_WRITABLE; if (pfd.revents & POLLHUP) retmask |= AE_WRITABLE; return retmask; } else { diff --git a/src/aof.c b/src/aof.c index 0aa081f00..be416ec4e 100644 --- a/src/aof.c +++ b/src/aof.c @@ -228,7 +228,7 @@ static void killAppendOnlyChild(void) { void stopAppendOnly(void) { serverAssert(server.aof_state != AOF_OFF); flushAppendOnlyFile(1); - aof_fsync(server.aof_fd); + redis_fsync(server.aof_fd); close(server.aof_fd); server.aof_fd = -1; @@ -261,7 +261,7 @@ int startAppendOnly(void) { serverLog(LL_WARNING,"AOF was enabled but there is already a child process saving an RDB file on disk. An AOF background was scheduled to start when possible."); } else { /* If there is a pending AOF rewrite, we need to switch it off and - * start a new one: the old one cannot be reused becuase it is not + * start a new one: the old one cannot be reused because it is not * accumulating the AOF buffer. */ if (server.aof_child_pid != -1) { serverLog(LL_WARNING,"AOF was enabled but there is already an AOF rewriting in background. Stopping background AOF and starting a rewrite now."); @@ -476,10 +476,10 @@ void flushAppendOnlyFile(int force) { /* Perform the fsync if needed. */ if (server.aof_fsync == AOF_FSYNC_ALWAYS) { - /* aof_fsync is defined as fdatasync() for Linux in order to avoid + /* redis_fsync is defined as fdatasync() for Linux in order to avoid * flushing metadata. */ latencyStartMonitor(latency); - aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */ + redis_fsync(server.aof_fd); /* Let's try to get this data on the disk */ latencyEndMonitor(latency); latencyAddSampleIfNeeded("aof-fsync-always",latency); server.aof_last_fsync = server.unixtime; @@ -645,7 +645,7 @@ struct client *createFakeClient(void) { c->obuf_soft_limit_reached_time = 0; c->watched_keys = listCreate(); c->peerid = NULL; - listSetFreeMethod(c->reply,decrRefCountVoid); + listSetFreeMethod(c->reply,freeClientReplyValue); listSetDupMethod(c->reply,dupClientReplyValue); initClientMultiState(c); return c; @@ -683,7 +683,7 @@ int loadAppendOnlyFile(char *filename) { exit(1); } - /* Handle a zero-length AOF file as a special case. An emtpy AOF file + /* Handle a zero-length AOF file as a special case. An empty AOF file * is a valid AOF because an empty server with AOF enabled will create * a zero length file at startup, that will remain like that if no write * operation is received. */ @@ -1221,7 +1221,6 @@ int rewriteAppendOnlyFileRio(rio *aof) { dictIterator *di = NULL; dictEntry *de; size_t processed = 0; - long long now = mstime(); int j; for (j = 0; j < server.dbnum; j++) { @@ -1247,9 +1246,6 @@ int rewriteAppendOnlyFileRio(rio *aof) { expiretime = getExpire(db,&key); - /* If this key is already expired skip it */ - if (expiretime != -1 && expiretime < now) continue; - /* Save the key and associated value */ if (o->type == OBJ_STRING) { /* Emit a SET command */ @@ -1322,7 +1318,7 @@ int rewriteAppendOnlyFile(char *filename) { rioInitWithFile(&aof,fp); if (server.aof_rewrite_incremental_fsync) - rioSetAutoSync(&aof,AOF_AUTOSYNC_BYTES); + rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES); if (server.aof_use_rdb_preamble) { int error; @@ -1690,7 +1686,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { oldfd = server.aof_fd; server.aof_fd = newfd; if (server.aof_fsync == AOF_FSYNC_ALWAYS) - aof_fsync(newfd); + redis_fsync(newfd); else if (server.aof_fsync == AOF_FSYNC_EVERYSEC) aof_background_fsync(newfd); server.aof_selected_db = -1; /* Make sure SELECT is re-issued */ @@ -1717,7 +1713,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { "Background AOF rewrite signal handler took %lldus", ustime()-now); } else if (!bysignal && exitcode != 0) { /* SIGUSR1 is whitelisted, so we have a way to kill a child without - * tirggering an error conditon. */ + * tirggering an error condition. */ if (bysignal != SIGUSR1) server.aof_lastbgrewrite_status = C_ERR; serverLog(LL_WARNING, diff --git a/src/atomicvar.h b/src/atomicvar.h index 84a5bbc5c..173b045fc 100644 --- a/src/atomicvar.h +++ b/src/atomicvar.h @@ -16,7 +16,7 @@ * pthread_mutex_t myvar_mutex; * atomicSet(myvar,12345); * - * If atomic primitives are availble (tested in config.h) the mutex + * If atomic primitives are available (tested in config.h) the mutex * is not used. * * Never use return value from the macros, instead use the AtomicGetIncr() diff --git a/src/bio.c b/src/bio.c index da11f7b86..0c92d053b 100644 --- a/src/bio.c +++ b/src/bio.c @@ -187,7 +187,7 @@ void *bioProcessBackgroundJobs(void *arg) { if (type == BIO_CLOSE_FILE) { close((long)job->arg1); } else if (type == BIO_AOF_FSYNC) { - aof_fsync((long)job->arg1); + redis_fsync((long)job->arg1); } else if (type == BIO_LAZY_FREE) { /* What we free changes depending on what arguments are set: * arg1 -> free the object at pointer. diff --git a/src/bitops.c b/src/bitops.c index 43450fca3..23f2266a7 100644 --- a/src/bitops.c +++ b/src/bitops.c @@ -918,7 +918,7 @@ void bitfieldCommand(client *c) { struct bitfieldOp *ops = NULL; /* Array of ops to execute at end. */ int owtype = BFOVERFLOW_WRAP; /* Overflow type. */ int readonly = 1; - size_t higest_write_offset = 0; + size_t highest_write_offset = 0; for (j = 2; j < c->argc; j++) { int remargs = c->argc-j-1; /* Remaining args other than current. */ @@ -968,8 +968,8 @@ void bitfieldCommand(client *c) { if (opcode != BITFIELDOP_GET) { readonly = 0; - if (higest_write_offset < bitoffset + bits - 1) - higest_write_offset = bitoffset + bits - 1; + if (highest_write_offset < bitoffset + bits - 1) + highest_write_offset = bitoffset + bits - 1; /* INCRBY and SET require another argument. */ if (getLongLongFromObjectOrReply(c,c->argv[j+3],&i64,NULL) != C_OK){ zfree(ops); @@ -999,7 +999,7 @@ void bitfieldCommand(client *c) { /* Lookup by making room up to the farest bit reached by * this operation. */ if ((o = lookupStringForBitCommand(c, - higest_write_offset)) == NULL) return; + highest_write_offset)) == NULL) return; } addReplyMultiBulkLen(c,numops); diff --git a/src/blocked.c b/src/blocked.c index 023fba0cd..4a667501f 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -314,8 +314,9 @@ void handleClientsBlockedOnKeys(void) { if (de) { list *clients = dictGetVal(de); int numclients = listLength(clients); + unsigned long zcard = zsetLength(o); - while(numclients--) { + while(numclients-- && zcard) { listNode *clientnode = listFirst(clients); client *receiver = clientnode->value; @@ -332,6 +333,7 @@ void handleClientsBlockedOnKeys(void) { ? ZSET_MIN : ZSET_MAX; unblockClient(receiver); genericZpopCommand(receiver,&rl->key,1,where,1,NULL); + zcard--; /* Replicate the command. */ robj *argv[2]; @@ -368,40 +370,48 @@ void handleClientsBlockedOnKeys(void) { if (receiver->btype != BLOCKED_STREAM) continue; streamID *gt = dictFetchValue(receiver->bpop.keys, rl->key); - if (s->last_id.ms > gt->ms || - (s->last_id.ms == gt->ms && - s->last_id.seq > gt->seq)) - { + + /* If we blocked in the context of a consumer + * group, we need to resolve the group and update the + * last ID the client is blocked for: this is needed + * because serving other clients in the same consumer + * group will alter the "last ID" of the consumer + * group, and clients blocked in a consumer group are + * always blocked for the ">" ID: we need to deliver + * only new messages and avoid unblocking the client + * otherwise. */ + streamCG *group = NULL; + if (receiver->bpop.xread_group) { + group = streamLookupCG(s, + receiver->bpop.xread_group->ptr); + /* If the group was not found, send an error + * to the consumer. */ + if (!group) { + addReplyError(receiver, + "-NOGROUP the consumer group this client " + "was blocked on no longer exists"); + unblockClient(receiver); + continue; + } else { + *gt = group->last_id; + } + } + + if (streamCompareID(&s->last_id, gt) > 0) { streamID start = *gt; start.seq++; /* Can't overflow, it's an uint64_t */ - /* If we blocked in the context of a consumer - * group, we need to resolve the group and - * consumer here. */ - streamCG *group = NULL; + /* Lookup the consumer for the group, if any. */ streamConsumer *consumer = NULL; - if (receiver->bpop.xread_group) { - group = streamLookupCG(s, - receiver->bpop.xread_group->ptr); - /* In theory if the group is not found we - * just perform the read without the group, - * but actually when the group, or the key - * itself is deleted (triggering the removal - * of the group), we check for blocked clients - * and send them an error. */ - } + int noack = 0; + if (group) { consumer = streamLookupConsumer(group, receiver->bpop.xread_consumer->ptr, 1); + noack = receiver->bpop.xread_group_noack; } - /* Note that after we unblock the client, 'gt' - * and other receiver->bpop stuff are no longer - * valid, so we must do the setup above before - * this call. */ - unblockClient(receiver); - /* Emit the two elements sub-array consisting of * the name of the stream and the data we * extracted from it. Wrapped in a single-item @@ -416,7 +426,13 @@ void handleClientsBlockedOnKeys(void) { }; streamReplyWithRange(receiver,s,&start,NULL, receiver->bpop.xread_count, - 0, group, consumer, 0, &pi); + 0, group, consumer, noack, &pi); + + /* Note that after we unblock the client, 'gt' + * and other receiver->bpop stuff are no longer + * valid, so we must do the setup above before + * this call. */ + unblockClient(receiver); } } } diff --git a/src/cluster.c b/src/cluster.c index 0635d7c07..e568f68a6 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -2120,7 +2120,7 @@ void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) { nwritten = write(fd, link->sndbuf, sdslen(link->sndbuf)); if (nwritten <= 0) { serverLog(LL_DEBUG,"I/O error writing to node link: %s", - strerror(errno)); + (nwritten == -1) ? strerror(errno) : "short write"); handleLinkIOError(link); return; } @@ -2377,7 +2377,7 @@ void clusterSendPing(clusterLink *link, int type) { * same time. * * Since we have non-voting slaves that lower the probability of an entry - * to feature our node, we set the number of entires per packet as + * to feature our node, we set the number of entries per packet as * 10% of the total nodes we have. */ wanted = floor(dictSize(server.cluster->nodes)/10); if (wanted < 3) wanted = 3; @@ -3100,7 +3100,7 @@ void clusterHandleSlaveFailover(void) { (unsigned long long) myself->configEpoch); } - /* Take responsability for the cluster slots. */ + /* Take responsibility for the cluster slots. */ clusterFailoverReplaceYourMaster(); } else { clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_VOTES); @@ -3151,11 +3151,11 @@ void clusterHandleSlaveMigration(int max_slaves) { !nodeTimedOut(mymaster->slaves[j])) okslaves++; if (okslaves <= server.cluster_migration_barrier) return; - /* Step 3: Idenitfy a candidate for migration, and check if among the + /* Step 3: Identify a candidate for migration, and check if among the * masters with the greatest number of ok slaves, I'm the one with the * smallest node ID (the "candidate slave"). * - * Note: this means that eventually a replica migration will occurr + * Note: this means that eventually a replica migration will occur * since slaves that are reachable again always have their FAIL flag * cleared, so eventually there must be a candidate. At the same time * this does not mean that there are no race conditions possible (two @@ -3736,7 +3736,7 @@ void clusterCloseAllSlots(void) { * -------------------------------------------------------------------------- */ /* The following are defines that are only used in the evaluation function - * and are based on heuristics. Actaully the main point about the rejoin and + * and are based on heuristics. Actually the main point about the rejoin and * writable delay is that they should be a few orders of magnitude larger * than the network latency. */ #define CLUSTER_MAX_REJOIN_DELAY 5000 @@ -4178,27 +4178,27 @@ void clusterCommand(client *c) { if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) { const char *help[] = { -"addslots [slot ...] -- Assign slots to current node.", -"bumpepoch -- Advance the cluster config epoch.", -"count-failure-reports -- Return number of failure reports for .", -"countkeysinslot - Return the number of keys in .", -"delslots [slot ...] -- Delete slots information from current node.", -"failover [force|takeover] -- Promote current slave node to being a master.", -"forget -- Remove a node from the cluster.", -"getkeysinslot -- Return key names stored by current node in a slot.", -"flushslots -- Delete current node own slots information.", -"info - Return onformation about the cluster.", -"keyslot -- Return the hash slot for .", -"meet [bus-port] -- Connect nodes into a working cluster.", -"myid -- Return the node id.", -"nodes -- Return cluster configuration seen by node. Output format:", +"ADDSLOTS [slot ...] -- Assign slots to current node.", +"BUMPEPOCH -- Advance the cluster config epoch.", +"COUNT-failure-reports -- Return number of failure reports for .", +"COUNTKEYSINSLOT - Return the number of keys in .", +"DELSLOTS [slot ...] -- Delete slots information from current node.", +"FAILOVER [force|takeover] -- Promote current slave node to being a master.", +"FORGET -- Remove a node from the cluster.", +"GETKEYSINSLOT -- Return key names stored by current node in a slot.", +"FLUSHSLOTS -- Delete current node own slots information.", +"INFO - Return onformation about the cluster.", +"KEYSLOT -- Return the hash slot for .", +"MEET [bus-port] -- Connect nodes into a working cluster.", +"MYID -- Return the node id.", +"NODES -- Return cluster configuration seen by node. Output format:", " ... ", -"replicate -- Configure current node as slave to .", -"reset [hard|soft] -- Reset current node (default: soft).", -"set-config-epoch - Set config epoch of current node.", -"setslot (importing|migrating|stable|node ) -- Set slot state.", -"slaves -- Return slaves.", -"slots -- Return information about slots range mappings. Each range is made of:", +"REPLICATE -- Configure current node as slave to .", +"RESET [hard|soft] -- Reset current node (default: soft).", +"SET-config-epoch - Set config epoch of current node.", +"SETSLOT (importing|migrating|stable|node ) -- Set slot state.", +"SLAVES -- Return slaves.", +"SLOTS -- Return information about slots range mappings. Each range is made of:", " start, end, master and replicas IP addresses, ports and ids", NULL }; @@ -4746,8 +4746,7 @@ NULL clusterReset(hard); addReply(c,shared.ok); } else { - addReplyErrorFormat(c, "Unknown subcommand or wrong number of arguments for '%s'. Try CLUSTER HELP", - (char*)c->argv[1]->ptr); + addReplySubcommandSyntaxError(c); return; } } @@ -4835,15 +4834,39 @@ void dumpCommand(client *c) { /* RESTORE key ttl serialized-value [REPLACE] */ void restoreCommand(client *c) { - long long ttl; + long long ttl, lfu_freq = -1, lru_idle = -1, lru_clock = -1; rio payload; - int j, type, replace = 0; + int j, type, replace = 0, absttl = 0; robj *obj; /* Parse additional options */ for (j = 4; j < c->argc; j++) { + int additional = c->argc-j-1; if (!strcasecmp(c->argv[j]->ptr,"replace")) { replace = 1; + } else if (!strcasecmp(c->argv[j]->ptr,"absttl")) { + absttl = 1; + } else if (!strcasecmp(c->argv[j]->ptr,"idletime") && additional >= 1 && + lfu_freq == -1) + { + if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lru_idle,NULL) + != C_OK) return; + if (lru_idle < 0) { + addReplyError(c,"Invalid IDLETIME value, must be >= 0"); + return; + } + lru_clock = LRU_CLOCK(); + j++; /* Consume additional arg. */ + } else if (!strcasecmp(c->argv[j]->ptr,"freq") && additional >= 1 && + lru_idle == -1) + { + if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lfu_freq,NULL) + != C_OK) return; + if (lfu_freq < 0 || lfu_freq > 255) { + addReplyError(c,"Invalid FREQ value, must be >= 0 and <= 255"); + return; + } + j++; /* Consume additional arg. */ } else { addReply(c,shared.syntaxerr); return; @@ -4884,7 +4907,11 @@ void restoreCommand(client *c) { /* Create the key and set the TTL if any */ dbAdd(c->db,c->argv[1],obj); - if (ttl) setExpire(c,c->db,c->argv[1],mstime()+ttl); + if (ttl) { + if (!absttl) ttl+=mstime(); + setExpire(c,c->db,c->argv[1],ttl); + } + objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock); signalModifiedKey(c->db,c->argv[1]); addReply(c,shared.ok); server.dirty++; @@ -5557,7 +5584,7 @@ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_co if (error_code == CLUSTER_REDIR_CROSS_SLOT) { addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n")); } else if (error_code == CLUSTER_REDIR_UNSTABLE) { - /* The request spawns mutliple keys in the same slot, + /* The request spawns multiple keys in the same slot, * but the slot is not "stable" currently as there is * a migration or import in progress. */ addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n")); @@ -5589,7 +5616,11 @@ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_co * longer handles, the client is sent a redirection error, and the function * returns 1. Otherwise 0 is returned and no operation is performed. */ int clusterRedirectBlockedClientIfNeeded(client *c) { - if (c->flags & CLIENT_BLOCKED && c->btype == BLOCKED_LIST) { + if (c->flags & CLIENT_BLOCKED && + (c->btype == BLOCKED_LIST || + c->btype == BLOCKED_ZSET || + c->btype == BLOCKED_STREAM)) + { dictEntry *de; dictIterator *di; diff --git a/src/cluster.h b/src/cluster.h index 4d4a4d60e..6f9954d24 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -243,7 +243,7 @@ union clusterMsgData { #define CLUSTER_PROTO_VER 1 /* Cluster bus protocol version. */ typedef struct { - char sig[4]; /* Siganture "RCmb" (Redis Cluster message bus). */ + char sig[4]; /* Signature "RCmb" (Redis Cluster message bus). */ uint32_t totlen; /* Total length of this message */ uint16_t ver; /* Protocol version, currently set to 1. */ uint16_t port; /* TCP base port number. */ diff --git a/src/config.c b/src/config.c index 5bc6aa2ed..54494c8e1 100644 --- a/src/config.c +++ b/src/config.c @@ -390,7 +390,7 @@ void loadServerConfigFromString(char *config) { } } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) { zfree(server.masterauth); - server.masterauth = zstrdup(argv[1]); + server.masterauth = argv[1][0] ? zstrdup(argv[1]) : NULL; } else if (!strcasecmp(argv[0],"slave-serve-stale-data") && argc == 2) { if ((server.repl_serve_stale_data = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; @@ -431,6 +431,11 @@ void loadServerConfigFromString(char *config) { if ((server.active_defrag_enabled = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; } + if (server.active_defrag_enabled) { +#ifndef HAVE_DEFRAG + err = "active defrag can't be enabled without proper jemalloc support"; goto loaderr; +#endif + } } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) { if ((server.daemonize = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; @@ -483,6 +488,13 @@ void loadServerConfigFromString(char *config) { yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; } + } else if (!strcasecmp(argv[0],"rdb-save-incremental-fsync") && + argc == 2) + { + if ((server.rdb_save_incremental_fsync = + yesnotoi(argv[1])) == -1) { + err = "argument must be 'yes' or 'no'"; goto loaderr; + } } else if (!strcasecmp(argv[0],"aof-load-truncated") && argc == 2) { if ((server.aof_load_truncated = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; @@ -496,7 +508,7 @@ void loadServerConfigFromString(char *config) { err = "Password is longer than CONFIG_AUTHPASS_MAX_LEN"; goto loaderr; } - server.requirepass = zstrdup(argv[1]); + server.requirepass = argv[1][0] ? zstrdup(argv[1]) : NULL; } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) { zfree(server.pidfile); server.pidfile = zstrdup(argv[1]); @@ -509,14 +521,16 @@ void loadServerConfigFromString(char *config) { server.rdb_filename = zstrdup(argv[1]); } else if (!strcasecmp(argv[0],"active-defrag-threshold-lower") && argc == 2) { server.active_defrag_threshold_lower = atoi(argv[1]); - if (server.active_defrag_threshold_lower < 0) { - err = "active-defrag-threshold-lower must be 0 or greater"; + if (server.active_defrag_threshold_lower < 0 || + server.active_defrag_threshold_lower > 1000) { + err = "active-defrag-threshold-lower must be between 0 and 1000"; goto loaderr; } } else if (!strcasecmp(argv[0],"active-defrag-threshold-upper") && argc == 2) { server.active_defrag_threshold_upper = atoi(argv[1]); - if (server.active_defrag_threshold_upper < 0) { - err = "active-defrag-threshold-upper must be 0 or greater"; + if (server.active_defrag_threshold_upper < 0 || + server.active_defrag_threshold_upper > 1000) { + err = "active-defrag-threshold-upper must be between 0 and 1000"; goto loaderr; } } else if (!strcasecmp(argv[0],"active-defrag-ignore-bytes") && argc == 2) { @@ -547,6 +561,10 @@ void loadServerConfigFromString(char *config) { server.hash_max_ziplist_entries = memtoll(argv[1], NULL); } else if (!strcasecmp(argv[0],"hash-max-ziplist-value") && argc == 2) { server.hash_max_ziplist_value = memtoll(argv[1], NULL); + } else if (!strcasecmp(argv[0],"stream-node-max-bytes") && argc == 2) { + server.stream_node_max_bytes = memtoll(argv[1], NULL); + } else if (!strcasecmp(argv[0],"stream-node-max-entries") && argc == 2) { + server.stream_node_max_entries = atoi(argv[1]); } else if (!strcasecmp(argv[0],"list-max-ziplist-entries") && argc == 2){ /* DEAD OPTION */ } else if (!strcasecmp(argv[0],"list-max-ziplist-value") && argc == 2) { @@ -1015,6 +1033,8 @@ void configSetCommand(client *c) { "cluster-slave-no-failover",server.cluster_slave_no_failover) { } config_set_bool_field( "aof-rewrite-incremental-fsync",server.aof_rewrite_incremental_fsync) { + } config_set_bool_field( + "rdb-save-incremental-fsync",server.rdb_save_incremental_fsync) { } config_set_bool_field( "aof-load-truncated",server.aof_load_truncated) { } config_set_bool_field( @@ -1056,15 +1076,15 @@ void configSetCommand(client *c) { /* Numerical fields. * config_set_numerical_field(name,var,min,max) */ } config_set_numerical_field( - "tcp-keepalive",server.tcpkeepalive,0,LLONG_MAX) { + "tcp-keepalive",server.tcpkeepalive,0,INT_MAX) { } config_set_numerical_field( - "maxmemory-samples",server.maxmemory_samples,1,LLONG_MAX) { + "maxmemory-samples",server.maxmemory_samples,1,INT_MAX) { } config_set_numerical_field( - "lfu-log-factor",server.lfu_log_factor,0,LLONG_MAX) { + "lfu-log-factor",server.lfu_log_factor,0,INT_MAX) { } config_set_numerical_field( - "lfu-decay-time",server.lfu_decay_time,0,LLONG_MAX) { + "lfu-decay-time",server.lfu_decay_time,0,INT_MAX) { } config_set_numerical_field( - "timeout",server.maxidletime,0,LONG_MAX) { + "timeout",server.maxidletime,0,INT_MAX) { } config_set_numerical_field( "active-defrag-threshold-lower",server.active_defrag_threshold_lower,0,1000) { } config_set_numerical_field( @@ -1076,52 +1096,56 @@ void configSetCommand(client *c) { } config_set_numerical_field( "active-defrag-cycle-max",server.active_defrag_cycle_max,1,99) { } config_set_numerical_field( - "active-defrag-max-scan-fields",server.active_defrag_max_scan_fields,1,LLONG_MAX) { + "active-defrag-max-scan-fields",server.active_defrag_max_scan_fields,1,LONG_MAX) { } config_set_numerical_field( - "auto-aof-rewrite-percentage",server.aof_rewrite_perc,0,LLONG_MAX){ + "auto-aof-rewrite-percentage",server.aof_rewrite_perc,0,INT_MAX){ } config_set_numerical_field( - "hash-max-ziplist-entries",server.hash_max_ziplist_entries,0,LLONG_MAX) { + "hash-max-ziplist-entries",server.hash_max_ziplist_entries,0,LONG_MAX) { } config_set_numerical_field( - "hash-max-ziplist-value",server.hash_max_ziplist_value,0,LLONG_MAX) { + "hash-max-ziplist-value",server.hash_max_ziplist_value,0,LONG_MAX) { + } config_set_numerical_field( + "stream-node-max-bytes",server.stream_node_max_bytes,0,LONG_MAX) { + } config_set_numerical_field( + "stream-node-max-entries",server.stream_node_max_entries,0,LLONG_MAX) { } config_set_numerical_field( "list-max-ziplist-size",server.list_max_ziplist_size,INT_MIN,INT_MAX) { } config_set_numerical_field( "list-compress-depth",server.list_compress_depth,0,INT_MAX) { } config_set_numerical_field( - "set-max-intset-entries",server.set_max_intset_entries,0,LLONG_MAX) { + "set-max-intset-entries",server.set_max_intset_entries,0,LONG_MAX) { } config_set_numerical_field( - "zset-max-ziplist-entries",server.zset_max_ziplist_entries,0,LLONG_MAX) { + "zset-max-ziplist-entries",server.zset_max_ziplist_entries,0,LONG_MAX) { } config_set_numerical_field( - "zset-max-ziplist-value",server.zset_max_ziplist_value,0,LLONG_MAX) { + "zset-max-ziplist-value",server.zset_max_ziplist_value,0,LONG_MAX) { } config_set_numerical_field( - "hll-sparse-max-bytes",server.hll_sparse_max_bytes,0,LLONG_MAX) { + "hll-sparse-max-bytes",server.hll_sparse_max_bytes,0,LONG_MAX) { } config_set_numerical_field( - "lua-time-limit",server.lua_time_limit,0,LLONG_MAX) { + "lua-time-limit",server.lua_time_limit,0,LONG_MAX) { } config_set_numerical_field( - "slowlog-log-slower-than",server.slowlog_log_slower_than,0,LLONG_MAX) { + "slowlog-log-slower-than",server.slowlog_log_slower_than,-1,LLONG_MAX) { } config_set_numerical_field( - "slowlog-max-len",ll,0,LLONG_MAX) { + "slowlog-max-len",ll,0,LONG_MAX) { /* Cast to unsigned. */ - server.slowlog_max_len = (unsigned)ll; + server.slowlog_max_len = (unsigned long)ll; } config_set_numerical_field( "latency-monitor-threshold",server.latency_monitor_threshold,0,LLONG_MAX){ } config_set_numerical_field( - "repl-ping-slave-period",server.repl_ping_slave_period,1,LLONG_MAX) { + "repl-ping-slave-period",server.repl_ping_slave_period,1,INT_MAX) { } config_set_numerical_field( - "repl-timeout",server.repl_timeout,1,LLONG_MAX) { + "repl-timeout",server.repl_timeout,1,INT_MAX) { } config_set_numerical_field( - "repl-backlog-ttl",server.repl_backlog_time_limit,0,LLONG_MAX) { + "repl-backlog-ttl",server.repl_backlog_time_limit,0,LONG_MAX) { } config_set_numerical_field( - "repl-diskless-sync-delay",server.repl_diskless_sync_delay,0,LLONG_MAX) { + "repl-diskless-sync-delay",server.repl_diskless_sync_delay,0,INT_MAX) { } config_set_numerical_field( - "slave-priority",server.slave_priority,0,LLONG_MAX) { + "slave-priority",server.slave_priority,0,INT_MAX) { } config_set_numerical_field( "slave-announce-port",server.slave_announce_port,0,65535) { } config_set_numerical_field( - "min-slaves-to-write",server.repl_min_slaves_to_write,0,LLONG_MAX) { + "min-slaves-to-write",server.repl_min_slaves_to_write,0,INT_MAX) { refreshGoodSlavesCount(); } config_set_numerical_field( - "min-slaves-max-lag",server.repl_min_slaves_max_lag,0,LLONG_MAX) { + "min-slaves-max-lag",server.repl_min_slaves_max_lag,0,INT_MAX) { refreshGoodSlavesCount(); } config_set_numerical_field( "cluster-node-timeout",server.cluster_node_timeout,0,LLONG_MAX) { @@ -1130,17 +1154,17 @@ void configSetCommand(client *c) { } config_set_numerical_field( "cluster-announce-bus-port",server.cluster_announce_bus_port,0,65535) { } config_set_numerical_field( - "cluster-migration-barrier",server.cluster_migration_barrier,0,LLONG_MAX){ + "cluster-migration-barrier",server.cluster_migration_barrier,0,INT_MAX){ } config_set_numerical_field( - "cluster-slave-validity-factor",server.cluster_slave_validity_factor,0,LLONG_MAX) { + "cluster-slave-validity-factor",server.cluster_slave_validity_factor,0,INT_MAX) { } config_set_numerical_field( - "hz",server.hz,0,LLONG_MAX) { + "hz",server.hz,0,INT_MAX) { /* Hz is more an hint from the user, so we accept values out of range * but cap them to reasonable values. */ if (server.hz < CONFIG_MIN_HZ) server.hz = CONFIG_MIN_HZ; if (server.hz > CONFIG_MAX_HZ) server.hz = CONFIG_MAX_HZ; } config_set_numerical_field( - "watchdog-period",ll,0,LLONG_MAX) { + "watchdog-period",ll,0,INT_MAX) { if (ll) enableWatchdog(ll); else @@ -1267,6 +1291,10 @@ void configGetCommand(client *c) { server.hash_max_ziplist_entries); config_get_numerical_field("hash-max-ziplist-value", server.hash_max_ziplist_value); + config_get_numerical_field("stream-node-max-bytes", + server.stream_node_max_bytes); + config_get_numerical_field("stream-node-max-entries", + server.stream_node_max_entries); config_get_numerical_field("list-max-ziplist-size", server.list_max_ziplist_size); config_get_numerical_field("list-compress-depth", @@ -1333,6 +1361,8 @@ void configGetCommand(client *c) { server.repl_diskless_sync); config_get_bool_field("aof-rewrite-incremental-fsync", server.aof_rewrite_incremental_fsync); + config_get_bool_field("rdb-save-incremental-fsync", + server.rdb_save_incremental_fsync); config_get_bool_field("aof-load-truncated", server.aof_load_truncated); config_get_bool_field("aof-use-rdb-preamble", @@ -2056,6 +2086,8 @@ int rewriteConfig(char *path) { rewriteConfigNotifykeyspaceeventsOption(state); rewriteConfigNumericalOption(state,"hash-max-ziplist-entries",server.hash_max_ziplist_entries,OBJ_HASH_MAX_ZIPLIST_ENTRIES); rewriteConfigNumericalOption(state,"hash-max-ziplist-value",server.hash_max_ziplist_value,OBJ_HASH_MAX_ZIPLIST_VALUE); + rewriteConfigNumericalOption(state,"stream-node-max-bytes",server.stream_node_max_bytes,OBJ_STREAM_NODE_MAX_BYTES); + rewriteConfigNumericalOption(state,"stream-node-max-entries",server.stream_node_max_entries,OBJ_STREAM_NODE_MAX_ENTRIES); rewriteConfigNumericalOption(state,"list-max-ziplist-size",server.list_max_ziplist_size,OBJ_LIST_MAX_ZIPLIST_SIZE); rewriteConfigNumericalOption(state,"list-compress-depth",server.list_compress_depth,OBJ_LIST_COMPRESS_DEPTH); rewriteConfigNumericalOption(state,"set-max-intset-entries",server.set_max_intset_entries,OBJ_SET_MAX_INTSET_ENTRIES); @@ -2068,6 +2100,7 @@ int rewriteConfig(char *path) { rewriteConfigClientoutputbufferlimitOption(state); rewriteConfigNumericalOption(state,"hz",server.hz,CONFIG_DEFAULT_HZ); rewriteConfigYesNoOption(state,"aof-rewrite-incremental-fsync",server.aof_rewrite_incremental_fsync,CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC); + rewriteConfigYesNoOption(state,"rdb-save-incremental-fsync",server.rdb_save_incremental_fsync,CONFIG_DEFAULT_RDB_SAVE_INCREMENTAL_FSYNC); rewriteConfigYesNoOption(state,"aof-load-truncated",server.aof_load_truncated,CONFIG_DEFAULT_AOF_LOAD_TRUNCATED); rewriteConfigYesNoOption(state,"aof-use-rdb-preamble",server.aof_use_rdb_preamble,CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE); rewriteConfigEnumOption(state,"supervised",server.supervised_mode,supervised_mode_enum,SUPERVISED_NONE); @@ -2107,10 +2140,10 @@ void configCommand(client *c) { if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) { const char *help[] = { -"get -- Return parameters matching the glob-like and their values.", -"set -- Set parameter to value.", -"resetstat -- Reset statistics reported by INFO.", -"rewrite -- Rewrite the configuration file.", +"GET -- Return parameters matching the glob-like and their values.", +"SET -- Set parameter to value.", +"RESETSTAT -- Reset statistics reported by INFO.", +"REWRITE -- Rewrite the configuration file.", NULL }; addReplyHelp(c, help); @@ -2135,8 +2168,7 @@ NULL addReply(c,shared.ok); } } else { - addReplyErrorFormat(c, "Unknown subcommand or wrong number of arguments for '%s'. Try CONFIG HELP", - (char*)c->argv[1]->ptr); + addReplySubcommandSyntaxError(c); return; } } diff --git a/src/config.h b/src/config.h index c23f1c789..ee3ad508e 100644 --- a/src/config.h +++ b/src/config.h @@ -87,11 +87,11 @@ #endif #endif -/* Define aof_fsync to fdatasync() in Linux and fsync() for all the rest */ +/* Define redis_fsync to fdatasync() in Linux and fsync() for all the rest */ #ifdef __linux__ -#define aof_fsync fdatasync +#define redis_fsync fdatasync #else -#define aof_fsync fsync +#define redis_fsync fsync #endif /* Define rdb_fsync_range to sync_file_range() on Linux, otherwise we use diff --git a/src/db.c b/src/db.c index 6c039e129..055af71be 100644 --- a/src/db.c +++ b/src/db.c @@ -90,7 +90,7 @@ robj *lookupKey(redisDb *db, robj *key, int flags) { * LOOKUP_NONE (or zero): no special flags are passed. * LOOKUP_NOTOUCH: don't alter the last access time of the key. * - * Note: this function also returns NULL is the key is logically expired + * Note: this function also returns NULL if the key is logically expired * but still existing, in case this is a slave, since this API is called only * for read operations. Even if the key expiry is master-driven, we can * correctly report a key is expired on slaves even if the master is lagging @@ -113,7 +113,7 @@ robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) { * safety measure, the command invoked is a read-only command, we can * safely return NULL here, and provide a more consistent behavior * to clients accessign expired values in a read-only fashion, that - * will say the key as non exisitng. + * will say the key as non existing. * * Notably this covers GETs when slaves are used to scale reads. */ if (server.current_client && @@ -223,6 +223,8 @@ int dbExists(redisDb *db, robj *key) { * The function makes sure to return keys not already expired. */ robj *dbRandomKey(redisDb *db) { dictEntry *de; + int maxtries = 100; + int allvolatile = dictSize(db->dict) == dictSize(db->expires); while(1) { sds key; @@ -234,6 +236,17 @@ robj *dbRandomKey(redisDb *db) { key = dictGetKey(de); keyobj = createStringObject(key,sdslen(key)); if (dictFind(db->expires,key)) { + if (allvolatile && server.masterhost && --maxtries == 0) { + /* If the DB is composed only of keys with an expire set, + * it could happen that all the keys are already logically + * expired in the slave, so the function cannot stop because + * expireIfNeeded() is false, nor it can stop because + * dictGetRandomKey() returns NULL (there are keys to return). + * To prevent the infinite loop we do some tries, but if there + * are the conditions for an infinite loop, eventually we + * return a key name that may be already expired. */ + return keyobj; + } if (expireIfNeeded(db,keyobj)) { decrRefCount(keyobj); continue; /* search for another key. This expired. */ @@ -305,7 +318,7 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) { * If callback is given the function is called from time to time to * signal that work is in progress. * - * The dbnum can be -1 if all teh DBs should be flushed, or the specified + * The dbnum can be -1 if all the DBs should be flushed, or the specified * DB number if we want to flush only a single Redis database number. * * Flags are be EMPTYDB_NO_FLAGS if no special flags are specified or @@ -467,8 +480,7 @@ void existsCommand(client *c) { int j; for (j = 1; j < c->argc; j++) { - expireIfNeeded(c->db,c->argv[j]); - if (dbExists(c->db,c->argv[j])) count++; + if (lookupKeyRead(c->db,c->argv[j])) count++; } addReplyLongLong(c,count); } @@ -942,16 +954,18 @@ void moveCommand(client *c) { } /* Helper function for dbSwapDatabases(): scans the list of keys that have - * one or more blocked clients for B[LR]POP or other list blocking commands - * and signal the keys are ready if they are lists. See the comment where - * the function is used for more info. */ + * one or more blocked clients for B[LR]POP or other blocking commands + * and signal the keys as ready if they are of the right type. See the comment + * where the function is used for more info. */ void scanDatabaseForReadyLists(redisDb *db) { dictEntry *de; dictIterator *di = dictGetSafeIterator(db->blocking_keys); while((de = dictNext(di)) != NULL) { robj *key = dictGetKey(de); robj *value = lookupKey(db,key,LOOKUP_NOTOUCH); - if (value && (value->type == OBJ_LIST || value->type == OBJ_STREAM)) + if (value && (value->type == OBJ_LIST || + value->type == OBJ_STREAM || + value->type == OBJ_ZSET)) signalKeyAsReady(db, key); } dictReleaseIterator(di); @@ -1171,7 +1185,7 @@ int *getKeysUsingCommandTable(struct redisCommand *cmd,robj **argv, int argc, in for (j = cmd->firstkey; j <= last; j += cmd->keystep) { if (j >= argc) { /* Modules commands, and standard commands with a not fixed number - * of arugments (negative arity parameter) do not have dispatch + * of arguments (negative arity parameter) do not have dispatch * time arity checks, so we need to handle the case where the user * passed an invalid number of arguments here. In this case we * return no keys and expect the command implementation to report @@ -1226,7 +1240,7 @@ int *zunionInterGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *nu num = atoi(argv[2]->ptr); /* Sanity check. Don't return any key if the command is going to * reply with syntax error. */ - if (num > (argc-3)) { + if (num < 1 || num > (argc-3)) { *numkeys = 0; return NULL; } @@ -1255,7 +1269,7 @@ int *evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) num = atoi(argv[2]->ptr); /* Sanity check. Don't return any key if the command is going to * reply with syntax error. */ - if (num > (argc-3)) { + if (num <= 0 || num > (argc-3)) { *numkeys = 0; return NULL; } @@ -1384,23 +1398,37 @@ int *georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numk } /* XREAD [BLOCK ] [COUNT ] [GROUP ] - * [RETRY ] STREAMS key_1 key_2 ... key_N - * ID_1 ID_2 ... ID_N */ + * STREAMS key_1 key_2 ... key_N ID_1 ID_2 ... ID_N */ int *xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) { - int i, num, *keys; + int i, num = 0, *keys; UNUSED(cmd); - /* We need to seek the last argument that contains "STREAMS", because other - * arguments before may contain it (for example the group name). */ + /* We need to parse the options of the command in order to seek the first + * "STREAMS" string which is actually the option. This is needed because + * "STREAMS" could also be the name of the consumer group and even the + * name of the stream key. */ int streams_pos = -1; for (i = 1; i < argc; i++) { char *arg = argv[i]->ptr; - if (!strcasecmp(arg, "streams")) streams_pos = i; + if (!strcasecmp(arg, "block")) { + i++; /* Skip option argument. */ + } else if (!strcasecmp(arg, "count")) { + i++; /* Skip option argument. */ + } else if (!strcasecmp(arg, "group")) { + i += 2; /* Skip option argument. */ + } else if (!strcasecmp(arg, "noack")) { + /* Nothing to do. */ + } else if (!strcasecmp(arg, "streams")) { + streams_pos = i; + break; + } else { + break; /* Syntax error. */ + } } if (streams_pos != -1) num = argc - streams_pos - 1; /* Syntax error. */ - if (streams_pos == -1 || num % 2 != 0) { + if (streams_pos == -1 || num == 0 || num % 2 != 0) { *numkeys = 0; return NULL; } @@ -1408,7 +1436,7 @@ int *xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) there are also the IDs, one per key. */ keys = zmalloc(sizeof(int) * num); - for (i = streams_pos+1; i < argc; i++) keys[i-streams_pos-1] = i; + for (i = streams_pos+1; i < argc-num; i++) keys[i-streams_pos-1] = i; *numkeys = num; return keys; } diff --git a/src/debug.c b/src/debug.c index f239eea5a..32be3c59c 100644 --- a/src/debug.c +++ b/src/debug.c @@ -285,25 +285,26 @@ void computeDatasetDigest(unsigned char *final) { void debugCommand(client *c) { if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) { const char *help[] = { -"assert -- Crash by assertion failed.", -"change-repl-id -- Change the replication IDs of the instance. Dangerous, should be used only for testing the replication subsystem.", -"crash-and-recovery -- Hard crash and restart after delay.", -"digest -- Outputs an hex signature representing the current DB content.", -"htstats -- Return hash table statistics of the specified Redis database.", -"loadaof -- Flush the AOF buffers on disk and reload the AOF in memory.", -"lua-always-replicate-commands (0|1) -- Setting it to 1 makes Lua replication defaulting to replicating single commands, without the script having to enable effects replication.", -"object -- Show low level info about key and associated value.", -"panic -- Crash the server simulating a panic.", -"populate [prefix] [size] -- Create string keys named key:. If a prefix is specified is used instead of the 'key' prefix.", -"reload -- Save the RDB on disk and reload it back in memory.", -"restart -- Graceful restart: save config, db, restart.", -"sdslen -- Show low level SDS string info representing key and value.", -"segfault -- Crash the server with sigsegv.", -"set-active-expire (0|1) -- Setting it to 0 disables expiring keys in background when they are not accessed (otherwise the Redis behavior). Setting it to 1 reenables back the default.", -"sleep -- Stop the server for . Decimals allowed.", -"structsize -- Return the size of different Redis core C structures.", -"ziplist -- Show low level info about the ziplist encoding.", -"error -- Return a Redis protocol error with as message. Useful for clients unit tests to simulate Redis errors.", +"ASSERT -- Crash by assertion failed.", +"CHANGE-REPL-ID -- Change the replication IDs of the instance. Dangerous, should be used only for testing the replication subsystem.", +"CRASH-AND-RECOVER -- Hard crash and restart after delay.", +"DIGEST -- Output a hex signature representing the current DB content.", +"ERROR -- Return a Redis protocol error with as message. Useful for clients unit tests to simulate Redis errors.", +"HTSTATS -- Return hash table statistics of the specified Redis database.", +"HTSTATS-KEY -- Like htstats but for the hash table stored as key's value.", +"LOADAOF -- Flush the AOF buffers on disk and reload the AOF in memory.", +"LUA-ALWAYS-REPLICATE-COMMANDS <0|1> -- Setting it to 1 makes Lua replication defaulting to replicating single commands, without the script having to enable effects replication.", +"OBJECT -- Show low level info about key and associated value.", +"PANIC -- Crash the server simulating a panic.", +"POPULATE [prefix] [size] -- Create string keys named key:. If a prefix is specified is used instead of the 'key' prefix.", +"RELOAD -- Save the RDB on disk and reload it back in memory.", +"RESTART -- Graceful restart: save config, db, restart.", +"SDSLEN -- Show low level SDS string info representing key and value.", +"SEGFAULT -- Crash the server with sigsegv.", +"SET-ACTIVE-EXPIRE <0|1> -- Setting it to 0 disables expiring keys in background when they are not accessed (otherwise the Redis behavior). Setting it to 1 reenables back the default.", +"SLEEP -- Stop the server for . Decimals allowed.", +"STRUCTSIZE -- Return the size of different Redis core C structures.", +"ZIPLIST -- Show low level info about the ziplist encoding.", NULL }; addReplyHelp(c, help); @@ -347,7 +348,7 @@ NULL serverLog(LL_WARNING,"DB reloaded by DEBUG RELOAD"); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"loadaof")) { - if (server.aof_state == AOF_ON) flushAppendOnlyFile(1); + if (server.aof_state != AOF_OFF) flushAppendOnlyFile(1); emptyDb(-1,EMPTYDB_NO_FLAGS,NULL); if (loadAppendOnlyFile(server.aof_filename) != C_OK) { addReply(c,shared.err); @@ -547,14 +548,41 @@ NULL stats = sdscat(stats,buf); addReplyBulkSds(c,stats); + } else if (!strcasecmp(c->argv[1]->ptr,"htstats-key") && c->argc == 3) { + robj *o; + dict *ht = NULL; + + if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nokeyerr)) + == NULL) return; + + /* Get the hash table reference from the object, if possible. */ + switch (o->encoding) { + case OBJ_ENCODING_SKIPLIST: + { + zset *zs = o->ptr; + ht = zs->dict; + } + break; + case OBJ_ENCODING_HT: + ht = o->ptr; + break; + } + + if (ht == NULL) { + addReplyError(c,"The value stored at the specified key is not " + "represented using an hash table"); + } else { + char buf[4096]; + dictGetStats(buf,sizeof(buf),ht); + addReplyBulkCString(c,buf); + } } else if (!strcasecmp(c->argv[1]->ptr,"change-repl-id") && c->argc == 2) { serverLog(LL_WARNING,"Changing replication IDs after receiving DEBUG change-repl-id"); changeReplicationId(); clearReplicationId2(); addReply(c,shared.ok); } else { - addReplyErrorFormat(c, "Unknown subcommand or wrong number of arguments for '%s'. Try DEBUG HELP", - (char*)c->argv[1]->ptr); + addReplySubcommandSyntaxError(c); return; } } @@ -1048,7 +1076,7 @@ void sigsegvHandler(int sig, siginfo_t *info, void *secret) { infostring = genRedisInfoString("all"); serverLogRaw(LL_WARNING|LL_RAW, infostring); serverLogRaw(LL_WARNING|LL_RAW, "\n------ CLIENT LIST OUTPUT ------\n"); - clients = getAllClientsInfoString(); + clients = getAllClientsInfoString(-1); serverLogRaw(LL_WARNING|LL_RAW, clients); sdsfree(infostring); sdsfree(clients); diff --git a/src/defrag.c b/src/defrag.c index aae72adcb..d67b6e253 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -592,6 +592,171 @@ long defragSet(redisDb *db, dictEntry *kde) { return defragged; } +/* Defrag callback for radix tree iterator, called for each node, + * used in order to defrag the nodes allocations. */ +int defragRaxNode(raxNode **noderef) { + raxNode *newnode = activeDefragAlloc(*noderef); + if (newnode) { + *noderef = newnode; + return 1; + } + return 0; +} + +/* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */ +int scanLaterStraemListpacks(robj *ob, unsigned long *cursor, long long endtime, long long *defragged) { + static unsigned char last[sizeof(streamID)]; + raxIterator ri; + long iterations = 0; + if (ob->type != OBJ_STREAM || ob->encoding != OBJ_ENCODING_STREAM) { + *cursor = 0; + return 0; + } + + stream *s = ob->ptr; + raxStart(&ri,s->rax); + if (*cursor == 0) { + /* if cursor is 0, we start new iteration */ + defragRaxNode(&s->rax->head); + /* assign the iterator node callback before the seek, so that the + * initial nodes that are processed till the first item are covered */ + ri.node_cb = defragRaxNode; + raxSeek(&ri,"^",NULL,0); + } else { + /* if cursor is non-zero, we seek to the static 'last' */ + if (!raxSeek(&ri,">", last, sizeof(last))) { + *cursor = 0; + return 0; + } + /* assign the iterator node callback after the seek, so that the + * initial nodes that are processed till now aren't covered */ + ri.node_cb = defragRaxNode; + } + + (*cursor)++; + while (raxNext(&ri)) { + void *newdata = activeDefragAlloc(ri.data); + if (newdata) + raxSetData(ri.node, ri.data=newdata), (*defragged)++; + if (++iterations > 16) { + if (ustime() > endtime) { + serverAssert(ri.key_len==sizeof(last)); + memcpy(last,ri.key,ri.key_len); + raxStop(&ri); + return 1; + } + iterations = 0; + } + } + raxStop(&ri); + *cursor = 0; + return 0; +} + +/* optional callback used defrag each rax element (not including the element pointer itself) */ +typedef void *(raxDefragFunction)(raxIterator *ri, void *privdata, long *defragged); + +/* defrag radix tree including: + * 1) rax struct + * 2) rax nodes + * 3) rax entry data (only if defrag_data is specified) + * 4) call a callback per element, and allow the callback to return a new pointer for the element */ +long defragRadixTree(rax **raxref, int defrag_data, raxDefragFunction *element_cb, void *element_cb_data) { + long defragged = 0; + raxIterator ri; + rax* rax; + if ((rax = activeDefragAlloc(*raxref))) + defragged++, *raxref = rax; + rax = *raxref; + raxStart(&ri,rax); + ri.node_cb = defragRaxNode; + defragRaxNode(&rax->head); + raxSeek(&ri,"^",NULL,0); + while (raxNext(&ri)) { + void *newdata = NULL; + if (element_cb) + newdata = element_cb(&ri, element_cb_data, &defragged); + if (defrag_data && !newdata) + newdata = activeDefragAlloc(ri.data); + if (newdata) + raxSetData(ri.node, ri.data=newdata), defragged++; + } + raxStop(&ri); + return defragged; +} + +typedef struct { + streamCG *cg; + streamConsumer *c; +} PendingEntryContext; + +void* defragStreamConsumerPendingEntry(raxIterator *ri, void *privdata, long *defragged) { + UNUSED(defragged); + PendingEntryContext *ctx = privdata; + streamNACK *nack = ri->data, *newnack; + nack->consumer = ctx->c; /* update nack pointer to consumer */ + newnack = activeDefragAlloc(nack); + if (newnack) { + /* update consumer group pointer to the nack */ + void *prev; + raxInsert(ctx->cg->pel, ri->key, ri->key_len, newnack, &prev); + serverAssert(prev==nack); + /* note: we don't increment 'defragged' that's done by the caller */ + } + return newnack; +} + +void* defragStreamConsumer(raxIterator *ri, void *privdata, long *defragged) { + streamConsumer *c = ri->data; + streamCG *cg = privdata; + void *newc = activeDefragAlloc(c); + if (newc) { + /* note: we don't increment 'defragged' that's done by the caller */ + c = newc; + } + sds newsds = activeDefragSds(c->name); + if (newsds) + (*defragged)++, c->name = newsds; + if (c->pel) { + PendingEntryContext pel_ctx = {cg, c}; + *defragged += defragRadixTree(&c->pel, 0, defragStreamConsumerPendingEntry, &pel_ctx); + } + return newc; /* returns NULL if c was not defragged */ +} + +void* defragStreamConsumerGroup(raxIterator *ri, void *privdata, long *defragged) { + streamCG *cg = ri->data; + UNUSED(privdata); + if (cg->consumers) + *defragged += defragRadixTree(&cg->consumers, 0, defragStreamConsumer, cg); + if (cg->pel) + *defragged += defragRadixTree(&cg->pel, 0, NULL, NULL); + return NULL; +} + +long defragStream(redisDb *db, dictEntry *kde) { + long defragged = 0; + robj *ob = dictGetVal(kde); + serverAssert(ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM); + stream *s = ob->ptr, *news; + + /* handle the main struct */ + if ((news = activeDefragAlloc(s))) + defragged++, ob->ptr = s = news; + + if (raxSize(s->rax) > server.active_defrag_max_scan_fields) { + rax *newrax = activeDefragAlloc(s->rax); + if (newrax) + defragged++, s->rax = newrax; + defragLater(db, kde); + } else + defragged += defragRadixTree(&s->rax, 1, NULL, NULL); + + if (s->cgroups) + defragged += defragRadixTree(&s->cgroups, 1, defragStreamConsumerGroup, NULL); + return defragged; +} + /* for each key we scan in the main dict, this function will attempt to defrag * all the various pointers it has. Returns a stat of how many pointers were * moved. */ @@ -660,6 +825,8 @@ long defragKey(redisDb *db, dictEntry *de) { } else { serverPanic("Unknown hash encoding"); } + } else if (ob->type == OBJ_STREAM) { + defragged += defragStream(db, de); } else if (ob->type == OBJ_MODULE) { /* Currently defragmenting modules private data types * is not supported. */ @@ -680,7 +847,7 @@ void defragScanCallback(void *privdata, const dictEntry *de) { server.stat_active_defrag_scanned++; } -/* Defrag scan callback for for each hash table bicket, +/* Defrag scan callback for each hash table bicket, * used in order to defrag the dictEntry allocations. */ void defragDictBucketCallback(void *privdata, dictEntry **bucketref) { UNUSED(privdata); /* NOTE: this function is also used by both activeDefragCycle and scanLaterHash, etc. don't use privdata */ @@ -700,9 +867,8 @@ void defragDictBucketCallback(void *privdata, dictEntry **bucketref) { * or not, a false detection can cause the defragmenter to waste a lot of CPU * without the possibility of getting any results. */ float getAllocatorFragmentation(size_t *out_frag_bytes) { - size_t resident = server.cron_malloc_stats.allocator_resident; - size_t active = server.cron_malloc_stats.allocator_active; - size_t allocated = server.cron_malloc_stats.allocator_allocated; + size_t resident, active, allocated; + zmalloc_get_allocator_info(&allocated, &active, &resident); float frag_pct = ((float)active / allocated)*100 - 100; size_t frag_bytes = active - allocated; float rss_pct = ((float)resident / allocated)*100 - 100; @@ -728,27 +894,29 @@ long defragOtherGlobals() { return defragged; } -unsigned long defragLaterItem(dictEntry *de, unsigned long cursor) { - long defragged = 0; +/* returns 0 more work may or may not be needed (see non-zero cursor), + * and 1 if time is up and more work is needed. */ +int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime) { if (de) { robj *ob = dictGetVal(de); if (ob->type == OBJ_LIST) { - defragged += scanLaterList(ob); - cursor = 0; /* list has no scan, we must finish it in one go */ + server.stat_active_defrag_hits += scanLaterList(ob); + *cursor = 0; /* list has no scan, we must finish it in one go */ } else if (ob->type == OBJ_SET) { - defragged += scanLaterSet(ob, &cursor); + server.stat_active_defrag_hits += scanLaterSet(ob, cursor); } else if (ob->type == OBJ_ZSET) { - defragged += scanLaterZset(ob, &cursor); + server.stat_active_defrag_hits += scanLaterZset(ob, cursor); } else if (ob->type == OBJ_HASH) { - defragged += scanLaterHash(ob, &cursor); + server.stat_active_defrag_hits += scanLaterHash(ob, cursor); + } else if (ob->type == OBJ_STREAM) { + return scanLaterStraemListpacks(ob, cursor, endtime, &server.stat_active_defrag_hits); } else { - cursor = 0; /* object type may have changed since we schedule it for later */ + *cursor = 0; /* object type may have changed since we schedule it for later */ } } else { - cursor = 0; /* object may have been deleted already */ + *cursor = 0; /* object may have been deleted already */ } - server.stat_active_defrag_hits += defragged; - return cursor; + return 0; } /* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */ @@ -788,17 +956,22 @@ int defragLaterStep(redisDb *db, long long endtime) { dictEntry *de = dictFind(db->dict, current_key); key_defragged = server.stat_active_defrag_hits; do { - cursor = defragLaterItem(de, cursor); + int quit = 0; + if (defragLaterItem(de, &cursor, endtime)) + quit = 1; /* time is up, we didn't finish all the work */ + + /* Don't start a new BIG key in this loop, this is because the + * next key can be a list, and scanLaterList must be done in once cycle */ + if (!cursor) + quit = 1; /* Once in 16 scan iterations, 512 pointer reallocations, or 64 fields * (if we have a lot of pointers in one hash bucket, or rehashing), - * check if we reached the time limit. - * But regardless, don't start a new BIG key in this loop, this is because the - * next key can be a list, and scanLaterList must be done in once cycle */ - if (!cursor || (++iterations > 16 || + * check if we reached the time limit. */ + if (quit || (++iterations > 16 || server.stat_active_defrag_hits - prev_defragged > 512 || server.stat_active_defrag_scanned - prev_scanned > 64)) { - if (!cursor || ustime() > endtime) { + if (quit || ustime() > endtime) { if(key_defragged != server.stat_active_defrag_hits) server.stat_active_defrag_key_hits++; else diff --git a/src/dict.c b/src/dict.c index 97e636805..2cf9d4839 100644 --- a/src/dict.c +++ b/src/dict.c @@ -146,14 +146,14 @@ int dictResize(dict *d) /* Expand or create the hash table */ int dictExpand(dict *d, unsigned long size) { - dictht n; /* the new hash table */ - unsigned long realsize = _dictNextPower(size); - /* the size is invalid if it is smaller than the number of * elements already inside the hash table */ if (dictIsRehashing(d) || d->ht[0].used > size) return DICT_ERR; + dictht n; /* the new hash table */ + unsigned long realsize = _dictNextPower(size); + /* Rehashing to the same table size is not useful. */ if (realsize == d->ht[0].size) return DICT_ERR; @@ -327,7 +327,7 @@ int dictReplace(dict *d, void *key, void *val) dictEntry *entry, *existing, auxentry; /* Try to add the element. If the key - * does not exists dictAdd will suceed. */ + * does not exists dictAdd will succeed. */ entry = dictAddRaw(d,key,&existing); if (entry) { dictSetVal(d, entry, val); @@ -705,8 +705,10 @@ unsigned int dictGetSomeKeys(dict *d, dictEntry **des, unsigned int count) { * table, there will be no elements in both tables up to * the current rehashing index, so we jump if possible. * (this happens when going from big to small table). */ - if (i >= d->ht[1].size) i = d->rehashidx; - continue; + if (i >= d->ht[1].size) + i = d->rehashidx; + else + continue; } if (i >= d->ht[j].size) continue; /* Out of range for this table. */ dictEntry *he = d->ht[j].table[i]; @@ -858,6 +860,15 @@ unsigned long dictScan(dict *d, de = next; } + /* Set unmasked bits so incrementing the reversed cursor + * operates on the masked bits */ + v |= ~m0; + + /* Increment the reverse cursor */ + v = rev(v); + v++; + v = rev(v); + } else { t0 = &d->ht[0]; t1 = &d->ht[1]; @@ -892,22 +903,16 @@ unsigned long dictScan(dict *d, de = next; } - /* Increment bits not covered by the smaller mask */ - v = (((v | m0) + 1) & ~m0) | (v & m0); + /* Increment the reverse cursor not covered by the smaller mask.*/ + v |= ~m1; + v = rev(v); + v++; + v = rev(v); /* Continue while bits covered by mask difference is non-zero */ } while (v & (m0 ^ m1)); } - /* Set unmasked bits so incrementing the reversed cursor - * operates on the masked bits of the smaller table */ - v |= ~m0; - - /* Increment the reverse cursor */ - v = rev(v); - v++; - v = rev(v); - return v; } diff --git a/src/endianconv.h b/src/endianconv.h index 08f553136..475f72b08 100644 --- a/src/endianconv.h +++ b/src/endianconv.h @@ -43,12 +43,12 @@ uint16_t intrev16(uint16_t v); uint32_t intrev32(uint32_t v); uint64_t intrev64(uint64_t v); -/* variants of the function doing the actual convertion only if the target +/* variants of the function doing the actual conversion only if the target * host is big endian */ #if (BYTE_ORDER == LITTLE_ENDIAN) -#define memrev16ifbe(p) -#define memrev32ifbe(p) -#define memrev64ifbe(p) +#define memrev16ifbe(p) ((void)(0)) +#define memrev32ifbe(p) ((void)(0)) +#define memrev64ifbe(p) ((void)(0)) #define intrev16ifbe(v) (v) #define intrev32ifbe(v) (v) #define intrev64ifbe(v) (v) diff --git a/src/expire.c b/src/expire.c index ce7882e4c..0b92ee3fe 100644 --- a/src/expire.c +++ b/src/expire.c @@ -112,7 +112,7 @@ void activeExpireCycle(int type) { if (type == ACTIVE_EXPIRE_CYCLE_FAST) { /* Don't start a fast cycle if the previous cycle did not exit - * for time limt. Also don't repeat a fast cycle for the same period + * for time limit. Also don't repeat a fast cycle for the same period * as the fast cycle total duration itself. */ if (!timelimit_exit) return; if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) return; diff --git a/src/geo.c b/src/geo.c index 90216e7dd..c78fadfcf 100644 --- a/src/geo.c +++ b/src/geo.c @@ -145,7 +145,7 @@ double extractUnitOrReply(client *c, robj *unit) { /* Input Argument Helper. * Extract the dinstance from the specified two arguments starting at 'argv' * that shouldbe in the form: and return the dinstance in the - * specified unit on success. *conversino is populated with the coefficient + * specified unit on success. *conversions is populated with the coefficient * to use in order to convert meters to the unit. * * On error a value less than zero is returned. */ diff --git a/src/geohash.c b/src/geohash.c index 1ae7a7e05..b40282e76 100644 --- a/src/geohash.c +++ b/src/geohash.c @@ -144,8 +144,8 @@ int geohashEncode(const GeoHashRange *long_range, const GeoHashRange *lat_range, (longitude - long_range->min) / (long_range->max - long_range->min); /* convert to fixed point based on the step size */ - lat_offset *= (1 << step); - long_offset *= (1 << step); + lat_offset *= (1ULL << step); + long_offset *= (1ULL << step); hash->bits = interleave64(lat_offset, long_offset); return 1; } diff --git a/src/help.h b/src/help.h index 5f927c303..c89f1f44b 100644 --- a/src/help.h +++ b/src/help.h @@ -1,4 +1,4 @@ -/* Automatically generated by utils/generate-command-help.rb, do not edit. */ +/* Automatically generated by generate-command-help.rb, do not edit. */ #ifndef __REDIS_HELP_H #define __REDIS_HELP_H @@ -17,7 +17,8 @@ static char *commandGroups[] = { "scripting", "hyperloglog", "cluster", - "geo" + "geo", + "stream" }; struct commandHelp { @@ -82,6 +83,16 @@ struct commandHelp { "Pop a value from a list, push it to another list and return it; or block until one is available", 2, "2.2.0" }, + { "BZPOPMAX", + "key [key ...] timeout", + "Remove and return the member with the highest score from one or more sorted sets, or block until one is available", + 4, + "5.0.0" }, + { "BZPOPMIN", + "key [key ...] timeout", + "Remove and return the member with the lowest score from one or more sorted sets, or block until one is available", + 4, + "5.0.0" }, { "CLIENT GETNAME", "-", "Get the current connection name", @@ -318,12 +329,12 @@ struct commandHelp { 0, "1.2.0" }, { "FLUSHALL", - "-", + "[ASYNC]", "Remove all keys from all databases", 9, "1.0.0" }, { "FLUSHDB", - "-", + "[ASYNC]", "Remove all keys from the current database", 9, "1.0.0" }, @@ -532,6 +543,36 @@ struct commandHelp { "Trim a list to the specified range", 2, "1.0.0" }, + { "MEMORY DOCTOR", + "-", + "Outputs memory problems report", + 9, + "4.0.0" }, + { "MEMORY HELP", + "-", + "Show helpful text about the different subcommands", + 9, + "4.0.0" }, + { "MEMORY MALLOC-STATS", + "-", + "Show allocator internal stats", + 9, + "4.0.0" }, + { "MEMORY PURGE", + "-", + "Ask the allocator to release memory", + 9, + "4.0.0" }, + { "MEMORY STATS", + "-", + "Show memory usage details", + 9, + "4.0.0" }, + { "MEMORY USAGE", + "key [SAMPLES count]", + "Estimate the memory usage of a key", + 9, + "4.0.0" }, { "MGET", "key [key ...]", "Get the values of all the given keys", @@ -723,7 +764,7 @@ struct commandHelp { 10, "3.2.0" }, { "SCRIPT EXISTS", - "script [script ...]", + "sha1 [sha1 ...]", "Check existence of scripts in the script cache.", 10, "2.6.0" }, @@ -758,7 +799,7 @@ struct commandHelp { 8, "1.0.0" }, { "SET", - "key value [EX seconds] [PX milliseconds] [NX|XX]", + "key value [expiration EX seconds|PX milliseconds] [NX|XX]", "Set the string value of a key", 1, "1.0.0" }, @@ -867,6 +908,11 @@ struct commandHelp { "Add multiple sets and store the resulting set in a key", 3, "1.0.0" }, + { "SWAPDB", + "index index", + "Swaps two Redis databases", + 8, + "4.0.0" }, { "SYNC", "-", "Internal command used for replication", @@ -877,6 +923,11 @@ struct commandHelp { "Return the current server time", 9, "2.6.0" }, + { "TOUCH", + "key [key ...]", + "Alters the last access time of a key(s). Returns the number of existing keys specified.", + 0, + "3.2.1" }, { "TTL", "key", "Get the time to live for a key", @@ -887,6 +938,11 @@ struct commandHelp { "Determine the type stored at key", 0, "1.0.0" }, + { "UNLINK", + "key [key ...]", + "Delete a key asynchronously in another thread. Otherwise it is just as DEL, but non blocking.", + 0, + "4.0.0" }, { "UNSUBSCRIBE", "[channel [channel ...]]", "Stop listening for messages posted to the given channels", @@ -907,6 +963,41 @@ struct commandHelp { "Watch the given keys to determine execution of the MULTI/EXEC block", 7, "2.2.0" }, + { "XADD", + "key ID field string [field string ...]", + "Appends a new entry to a stream", + 14, + "5.0.0" }, + { "XLEN", + "key", + "Return the number of entires in a stream", + 14, + "5.0.0" }, + { "XPENDING", + "key group [start end count] [consumer]", + "Return information and entries from a stream consumer group pending entries list, that are messages fetched but never acknowledged.", + 14, + "5.0.0" }, + { "XRANGE", + "key start end [COUNT count]", + "Return a range of elements in a stream, with IDs matching the specified IDs interval", + 14, + "5.0.0" }, + { "XREAD", + "[COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]", + "Return never seen elements in multiple streams, with IDs greater than the ones reported by the caller for each stream. Can block.", + 14, + "5.0.0" }, + { "XREADGROUP", + "GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]", + "Return new entries from a stream using a consumer group, or access the history of the pending entries for a given consumer. Can block.", + 14, + "5.0.0" }, + { "XREVRANGE", + "key end start [COUNT count]", + "Return a range of elements in a stream, with IDs matching the specified IDs interval, in reverse order (from greater to smaller IDs) compared to XRANGE", + 14, + "5.0.0" }, { "ZADD", "key [NX|XX] [CH] [INCR] score member [score member ...]", "Add one or more members to a sorted set, or update its score if it already exists", @@ -937,6 +1028,16 @@ struct commandHelp { "Count the number of members in a sorted set between a given lexicographical range", 4, "2.8.9" }, + { "ZPOPMAX", + "key [count]", + "Remove and return members with the highest scores in a sorted set", + 4, + "5.0.0" }, + { "ZPOPMIN", + "key [count]", + "Remove and return members with the lowest scores in a sorted set", + 4, + "5.0.0" }, { "ZRANGE", "key start stop [WITHSCORES]", "Return a range of members in a sorted set, by index", diff --git a/src/hyperloglog.c b/src/hyperloglog.c index 0670c1cf5..ba3a3ab60 100644 --- a/src/hyperloglog.c +++ b/src/hyperloglog.c @@ -429,14 +429,14 @@ uint64_t MurmurHash64A (const void * key, int len, unsigned int seed) { } switch(len & 7) { - case 7: h ^= (uint64_t)data[6] << 48; - case 6: h ^= (uint64_t)data[5] << 40; - case 5: h ^= (uint64_t)data[4] << 32; - case 4: h ^= (uint64_t)data[3] << 24; - case 3: h ^= (uint64_t)data[2] << 16; - case 2: h ^= (uint64_t)data[1] << 8; + case 7: h ^= (uint64_t)data[6] << 48; /* fall-thru */ + case 6: h ^= (uint64_t)data[5] << 40; /* fall-thru */ + case 5: h ^= (uint64_t)data[4] << 32; /* fall-thru */ + case 4: h ^= (uint64_t)data[3] << 24; /* fall-thru */ + case 3: h ^= (uint64_t)data[2] << 16; /* fall-thru */ + case 2: h ^= (uint64_t)data[1] << 8; /* fall-thru */ case 1: h ^= (uint64_t)data[0]; - h *= m; + h *= m; /* fall-thru */ }; h ^= h >> r; @@ -673,7 +673,7 @@ int hllSparseSet(robj *o, long index, uint8_t count) { end = p + sdslen(o->ptr) - HLL_HDR_SIZE; first = 0; - prev = NULL; /* Points to previos opcode at the end of the loop. */ + prev = NULL; /* Points to previous opcode at the end of the loop. */ next = NULL; /* Points to the next opcode at the end of the loop. */ span = 0; while(p < end) { @@ -764,7 +764,7 @@ int hllSparseSet(robj *o, long index, uint8_t count) { * and is either currently represented by a VAL opcode with len > 1, * by a ZERO opcode with len > 1, or by an XZERO opcode. * - * In those cases the original opcode must be split into muliple + * In those cases the original opcode must be split into multiple * opcodes. The worst case is an XZERO split in the middle resuling into * XZERO - VAL - XZERO, so the resulting sequence max length is * 5 bytes. @@ -887,7 +887,7 @@ promote: /* Promote to dense representation. */ * * Note that this in turn means that PFADD will make sure the command * is propagated to slaves / AOF, so if there is a sparse -> dense - * convertion, it will be performed in all the slaves as well. */ + * conversion, it will be performed in all the slaves as well. */ int dense_retval = hllDenseSet(hdr->registers,index,count); serverAssert(dense_retval == 1); return dense_retval; diff --git a/src/latency.c b/src/latency.c index 292720aa0..e8d2af306 100644 --- a/src/latency.c +++ b/src/latency.c @@ -152,7 +152,7 @@ int latencyResetEvent(char *event_to_reset) { /* ------------------------ Latency reporting (doctor) ---------------------- */ -/* Analyze the samples avaialble for a given event and return a structure +/* Analyze the samples available for a given event and return a structure * populate with different metrics, average, MAD, min, max, and so forth. * Check latency.h definition of struct latenctStat for more info. * If the specified event has no elements the structure is populate with @@ -294,7 +294,7 @@ sds createLatencyReport(void) { /* Potentially commands. */ if (!strcasecmp(event,"command")) { - if (server.slowlog_log_slower_than == 0) { + if (server.slowlog_log_slower_than < 0) { advise_slowlog_enabled = 1; advices++; } else if (server.slowlog_log_slower_than/1000 > diff --git a/src/lazyfree.c b/src/lazyfree.c index f1de0c898..ac8a6bee9 100644 --- a/src/lazyfree.c +++ b/src/lazyfree.c @@ -23,10 +23,10 @@ size_t lazyfreeGetPendingObjectsCount(void) { * the function just returns the number of elements the object is composed of. * * Objects composed of single allocations are always reported as having a - * single item even if they are actaully logical composed of multiple + * single item even if they are actually logical composed of multiple * elements. * - * For lists the funciton returns the number of elements in the quicklist + * For lists the function returns the number of elements in the quicklist * representing the list. */ size_t lazyfreeGetFreeEffort(robj *obj) { if (obj->type == OBJ_LIST) { diff --git a/src/listpack.c b/src/listpack.c index 30ea34690..c3070db6d 100644 --- a/src/listpack.c +++ b/src/listpack.c @@ -291,7 +291,7 @@ int lpEncodeGetType(unsigned char *ele, uint32_t size, unsigned char *intenc, ui /* Store a reverse-encoded variable length field, representing the length * of the previous element of size 'l', in the target buffer 'buf'. * The function returns the number of bytes used to encode it, from - * 1 to 5. If 'buf' is NULL the funciton just returns the number of bytes + * 1 to 5. If 'buf' is NULL the function just returns the number of bytes * needed in order to encode the backlen. */ unsigned long lpEncodeBacklen(unsigned char *buf, uint64_t l) { if (l <= 127) { @@ -568,7 +568,7 @@ unsigned char *lpGet(unsigned char *p, int64_t *count, unsigned char *intbuf) { } } -/* Insert, delete or replace the specified element 'ele' of lenght 'len' at +/* Insert, delete or replace the specified element 'ele' of length 'len' at * the specified position 'p', with 'p' being a listpack element pointer * obtained with lpFirst(), lpLast(), lpIndex(), lpNext(), lpPrev() or * lpSeek(). @@ -710,7 +710,7 @@ unsigned char *lpInsert(unsigned char *lp, unsigned char *ele, uint32_t size, un return lp; } -/* Append the specified element 'ele' of lenght 'len' at the end of the +/* Append the specified element 'ele' of length 'len' at the end of the * listpack. It is implemented in terms of lpInsert(), so the return value is * the same as lpInsert(). */ unsigned char *lpAppend(unsigned char *lp, unsigned char *ele, uint32_t size) { diff --git a/src/localtime.c b/src/localtime.c new file mode 100644 index 000000000..3f59a3331 --- /dev/null +++ b/src/localtime.c @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2018, Salvatore Sanfilippo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include + +/* This is a safe version of localtime() which contains no locks and is + * fork() friendly. Even the _r version of localtime() cannot be used safely + * in Redis. Another thread may be calling localtime() while the main thread + * forks(). Later when the child process calls localtime() again, for instance + * in order to log something to the Redis log, it may deadlock: in the copy + * of the address space of the forked process the lock will never be released. + * + * This function takes the timezone 'tz' as argument, and the 'dst' flag is + * used to check if daylight saving time is currently in effect. The caller + * of this function should obtain such information calling tzset() ASAP in the + * main() function to obtain the timezone offset from the 'timezone' global + * variable. To obtain the daylight information, if it is currently active or not, + * one trick is to call localtime() in main() ASAP as well, and get the + * information from the tm_isdst field of the tm structure. However the daylight + * time may switch in the future for long running processes, so this information + * should be refreshed at safe times. + * + * Note that this function does not work for dates < 1/1/1970, it is solely + * designed to work with what time(NULL) may return, and to support Redis + * logging of the dates, it's not really a complete implementation. */ +static int is_leap_year(time_t year) { + if (year % 4) return 0; /* A year not divisible by 4 is not leap. */ + else if (year % 100) return 1; /* If div by 4 and not 100 is surely leap. */ + else if (year % 400) return 0; /* If div by 100 *and* 400 is not leap. */ + else return 1; /* If div by 100 and not by 400 is leap. */ +} + +void nolocks_localtime(struct tm *tmp, time_t t, time_t tz, int dst) { + const time_t secs_min = 60; + const time_t secs_hour = 3600; + const time_t secs_day = 3600*24; + + t -= tz; /* Adjust for timezone. */ + t += 3600*dst; /* Adjust for daylight time. */ + time_t days = t / secs_day; /* Days passed since epoch. */ + time_t seconds = t % secs_day; /* Remaining seconds. */ + + tmp->tm_isdst = dst; + tmp->tm_hour = seconds / secs_hour; + tmp->tm_min = (seconds % secs_hour) / secs_min; + tmp->tm_sec = (seconds % secs_hour) % secs_min; + + /* 1/1/1970 was a Thursday, that is, day 4 from the POV of the tm structure + * where sunday = 0, so to calculate the day of the week we have to add 4 + * and take the modulo by 7. */ + tmp->tm_wday = (days+4)%7; + + /* Calculate the current year. */ + tmp->tm_year = 1970; + while(1) { + /* Leap years have one day more. */ + time_t days_this_year = 365 + is_leap_year(tmp->tm_year); + if (days_this_year > days) break; + days -= days_this_year; + tmp->tm_year++; + } + tmp->tm_yday = days; /* Number of day of the current year. */ + + /* We need to calculate in which month and day of the month we are. To do + * so we need to skip days according to how many days there are in each + * month, and adjust for the leap year that has one more day in February. */ + int mdays[12] = {31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31}; + mdays[1] += is_leap_year(tmp->tm_year); + + tmp->tm_mon = 0; + while(days >= mdays[tmp->tm_mon]) { + days -= mdays[tmp->tm_mon]; + tmp->tm_mon++; + } + + tmp->tm_mday = days+1; /* Add 1 since our 'days' is zero-based. */ + tmp->tm_year -= 1900; /* Surprisingly tm_year is year-1900. */ +} + +#ifdef LOCALTIME_TEST_MAIN +#include + +int main(void) { + /* Obtain timezone and daylight info. */ + tzset(); /* Now 'timezome' global is populated. */ + time_t t = time(NULL); + struct tm *aux = localtime(&t); + int daylight_active = aux->tm_isdst; + + struct tm tm; + char buf[1024]; + + nolocks_localtime(&tm,t,timezone,daylight_active); + strftime(buf,sizeof(buf),"%d %b %H:%M:%S",&tm); + printf("[timezone: %d, dl: %d] %s\n", (int)timezone, (int)daylight_active, buf); +} +#endif diff --git a/src/lzf_d.c b/src/lzf_d.c index c32be8e87..93f43c27c 100644 --- a/src/lzf_d.c +++ b/src/lzf_d.c @@ -86,6 +86,8 @@ lzf_decompress (const void *const in_data, unsigned int in_len, #ifdef lzf_movsb lzf_movsb (op, ip, ctrl); #else +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wimplicit-fallthrough" switch (ctrl) { case 32: *op++ = *ip++; case 31: *op++ = *ip++; case 30: *op++ = *ip++; case 29: *op++ = *ip++; @@ -97,6 +99,7 @@ lzf_decompress (const void *const in_data, unsigned int in_len, case 8: *op++ = *ip++; case 7: *op++ = *ip++; case 6: *op++ = *ip++; case 5: *op++ = *ip++; case 4: *op++ = *ip++; case 3: *op++ = *ip++; case 2: *op++ = *ip++; case 1: *op++ = *ip++; } +#pragma GCC diagnostic pop #endif } else /* back reference */ @@ -163,17 +166,17 @@ lzf_decompress (const void *const in_data, unsigned int in_len, break; - case 9: *op++ = *ref++; - case 8: *op++ = *ref++; - case 7: *op++ = *ref++; - case 6: *op++ = *ref++; - case 5: *op++ = *ref++; - case 4: *op++ = *ref++; - case 3: *op++ = *ref++; - case 2: *op++ = *ref++; - case 1: *op++ = *ref++; + case 9: *op++ = *ref++; /* fall-thru */ + case 8: *op++ = *ref++; /* fall-thru */ + case 7: *op++ = *ref++; /* fall-thru */ + case 6: *op++ = *ref++; /* fall-thru */ + case 5: *op++ = *ref++; /* fall-thru */ + case 4: *op++ = *ref++; /* fall-thru */ + case 3: *op++ = *ref++; /* fall-thru */ + case 2: *op++ = *ref++; /* fall-thru */ + case 1: *op++ = *ref++; /* fall-thru */ case 0: *op++ = *ref++; /* two octets more */ - *op++ = *ref++; + *op++ = *ref++; /* fall-thru */ } #endif } diff --git a/src/module.c b/src/module.c index cb03ad2cd..9809cd74e 100644 --- a/src/module.c +++ b/src/module.c @@ -2239,6 +2239,9 @@ int RM_HashSet(RedisModuleKey *key, int flags, ...) { * to avoid a useless copy. */ if (flags & REDISMODULE_HASH_CFIELDS) low_flags |= HASH_SET_TAKE_FIELD; + + robj *argv[2] = {field,value}; + hashTypeTryConversion(key->value,argv,0,1); updated += hashTypeSet(key->value, field->ptr, value->ptr, low_flags); /* If CFIELDS is active, SDS string ownership is now of hashTypeSet(), @@ -2709,9 +2712,9 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch sds proto = sdsnewlen(c->buf,c->bufpos); c->bufpos = 0; while(listLength(c->reply)) { - sds o = listNodeValue(listFirst(c->reply)); + clientReplyBlock *o = listNodeValue(listFirst(c->reply)); - proto = sdscatsds(proto,o); + proto = sdscatlen(proto,o->buf,o->used); listDelNode(c->reply,listFirst(c->reply)); } reply = moduleCreateCallReplyFromProto(ctx,proto); @@ -3396,7 +3399,7 @@ void RM_LogRaw(RedisModule *module, const char *levelstr, const char *fmt, va_li * * If the specified log level is invalid, verbose is used by default. * There is a fixed limit to the length of the log line this function is able - * to emit, this limti is not specified but is guaranteed to be more than + * to emit, this limit is not specified but is guaranteed to be more than * a few lines of text. */ void RM_Log(RedisModuleCtx *ctx, const char *levelstr, const char *fmt, ...) { @@ -3827,7 +3830,7 @@ void moduleReleaseGIL(void) { * * Notification callback gets executed with a redis context that can not be * used to send anything to the client, and has the db number where the event - * occured as its selected db number. + * occurred as its selected db number. * * Notice that it is not necessary to enable norifications in redis.conf for * module notifications to work. @@ -3884,7 +3887,7 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid) } } -/* Unsubscribe any notification subscirbers this module has upon unloading */ +/* Unsubscribe any notification subscribers this module has upon unloading */ void moduleUnsubscribeNotifications(RedisModule *module) { listIter li; listNode *ln; @@ -4362,7 +4365,7 @@ void moduleInitModulesSystem(void) { * because the server must be fully initialized before loading modules. * * The function aborts the server on errors, since to start with missing - * modules is not considered sane: clients may rely on the existance of + * modules is not considered sane: clients may rely on the existence of * given commands, loading AOF also may need some modules to exist, and * if this instance is a slave, it must understand commands from master. */ void moduleLoadFromQueue(void) { @@ -4499,7 +4502,15 @@ int moduleUnload(sds name) { * MODULE LOAD [args...] */ void moduleCommand(client *c) { char *subcmd = c->argv[1]->ptr; - + if (c->argc == 2 && !strcasecmp(subcmd,"help")) { + const char *help[] = { +"LIST -- Return a list of loaded modules.", +"LOAD [arg ...] -- Load a module library from .", +"UNLOAD -- Unload a module.", +NULL + }; + addReplyHelp(c, help); + } else if (!strcasecmp(subcmd,"load") && c->argc >= 3) { robj **argv = NULL; int argc = 0; @@ -4548,7 +4559,8 @@ void moduleCommand(client *c) { } dictReleaseIterator(di); } else { - addReply(c,shared.syntaxerr); + addReplySubcommandSyntaxError(c); + return; } } diff --git a/src/modules/gendoc.rb b/src/modules/gendoc.rb index 516f5d795..ee6572884 100644 --- a/src/modules/gendoc.rb +++ b/src/modules/gendoc.rb @@ -1,5 +1,5 @@ # gendoc.rb -- Converts the top-comments inside module.c to modules API -# reference documentaiton in markdown format. +# reference documentation in markdown format. # Convert the C comment to markdown def markdown(s) diff --git a/src/networking.c b/src/networking.c index 00558974e..af7422178 100644 --- a/src/networking.c +++ b/src/networking.c @@ -56,11 +56,14 @@ size_t getStringObjectSdsUsedMemory(robj *o) { /* Client.reply list dup and free methods. */ void *dupClientReplyValue(void *o) { - return sdsdup(o); + clientReplyBlock *old = o; + clientReplyBlock *buf = zmalloc(sizeof(clientReplyBlock) + old->size); + memcpy(buf, o, sizeof(clientReplyBlock) + old->size); + return buf; } void freeClientReplyValue(void *o) { - sdsfree(o); + zfree(o); } int listMatchObjects(void *a, void *b) { @@ -75,6 +78,8 @@ void linkClient(client *c) { * this way removing the client in unlinkClient() will not require * a linear scan, but just a constant time operation. */ c->client_list_node = listLast(server.clients); + uint64_t id = htonu64(c->id); + raxInsert(server.clients_index,(unsigned char*)&id,sizeof(id),c,NULL); } client *createClient(int fd) { @@ -138,6 +143,7 @@ client *createClient(int fd) { c->bpop.target = NULL; c->bpop.xread_group = NULL; c->bpop.xread_consumer = NULL; + c->bpop.xread_group_noack = 0; c->bpop.numreplicas = 0; c->bpop.reploffset = 0; c->woff = 0; @@ -237,25 +243,35 @@ int _addReplyToBuffer(client *c, const char *s, size_t len) { void _addReplyStringToList(client *c, const char *s, size_t len) { if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return; - if (listLength(c->reply) == 0) { - sds node = sdsnewlen(s,len); - listAddNodeTail(c->reply,node); - c->reply_bytes += len; - } else { - listNode *ln = listLast(c->reply); - sds tail = listNodeValue(ln); + listNode *ln = listLast(c->reply); + clientReplyBlock *tail = ln? listNodeValue(ln): NULL; - /* Append to this object when possible. If tail == NULL it was - * set via addDeferredMultiBulkLength(). */ - if (tail && sdslen(tail)+len <= PROTO_REPLY_CHUNK_BYTES) { - tail = sdscatlen(tail,s,len); - listNodeValue(ln) = tail; - c->reply_bytes += len; - } else { - sds node = sdsnewlen(s,len); - listAddNodeTail(c->reply,node); - c->reply_bytes += len; - } + /* Note that 'tail' may be NULL even if we have a tail node, becuase when + * addDeferredMultiBulkLength() is used, it sets a dummy node to NULL just + * fo fill it later, when the size of the bulk length is set. */ + + /* Append to tail string when possible. */ + if (tail) { + /* Copy the part we can fit into the tail, and leave the rest for a + * new node */ + size_t avail = tail->size - tail->used; + size_t copy = avail >= len? len: avail; + memcpy(tail->buf + tail->used, s, copy); + tail->used += copy; + s += copy; + len -= copy; + } + if (len) { + /* Create a new node, make sure it is allocated to at + * least PROTO_REPLY_CHUNK_BYTES */ + size_t size = len < PROTO_REPLY_CHUNK_BYTES? PROTO_REPLY_CHUNK_BYTES: len; + tail = zmalloc(size + sizeof(clientReplyBlock)); + /* take over the allocation's internal fragmentation */ + tail->size = zmalloc_usable(tail) - sizeof(clientReplyBlock); + tail->used = len; + memcpy(tail->buf, s, len); + listAddNodeTail(c->reply, tail); + c->reply_bytes += tail->size; } asyncCloseClientOnOutputBufferLimitReached(c); } @@ -326,11 +342,30 @@ void addReplyErrorLength(client *c, const char *s, size_t len) { if (!len || s[0] != '-') addReplyString(c,"-ERR ",5); addReplyString(c,s,len); addReplyString(c,"\r\n",2); - if (c->flags & CLIENT_MASTER) { + + /* Sometimes it could be normal that a slave replies to a master with + * an error and this function gets called. Actually the error will never + * be sent because addReply*() against master clients has no effect... + * A notable example is: + * + * EVAL 'redis.call("incr",KEYS[1]); redis.call("nonexisting")' 1 x + * + * Where the master must propagate the first change even if the second + * will produce an error. However it is useful to log such events since + * they are rare and may hint at errors in a script or a bug in Redis. */ + if (c->flags & (CLIENT_MASTER|CLIENT_SLAVE)) { + char* to = c->flags & CLIENT_MASTER? "master": "slave"; + char* from = c->flags & CLIENT_MASTER? "slave": "master"; char *cmdname = c->lastcmd ? c->lastcmd->name : ""; - serverLog(LL_WARNING,"== CRITICAL == This slave is sending an error " - "to its master: '%s' after processing the command " - "'%s'", s, cmdname); + serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error " + "to its %s: '%s' after processing the command " + "'%s'", from, to, s, cmdname); + /* Here we want to panic because when a master is sending an + * error to some slave in the context of replication, this can + * only create some kind of offset or data desynchronization. Better + * to catch it ASAP and crash instead of continuing. */ + if (c->flags & CLIENT_SLAVE) + serverPanic("Continuing is unsafe: replication protocol violation."); } } @@ -387,26 +422,41 @@ void *addDeferredMultiBulkLength(client *c) { /* Populate the length object and try gluing it to the next chunk. */ void setDeferredMultiBulkLength(client *c, void *node, long length) { listNode *ln = (listNode*)node; - sds len, next; + clientReplyBlock *next; + char lenstr[128]; + size_t lenstr_len = sprintf(lenstr, "*%ld\r\n", length); /* Abort when *node is NULL: when the client should not accept writes * we return NULL in addDeferredMultiBulkLength() */ if (node == NULL) return; + serverAssert(!listNodeValue(ln)); - len = sdscatprintf(sdsnewlen("*",1),"%ld\r\n",length); - listNodeValue(ln) = len; - c->reply_bytes += sdslen(len); - if (ln->next != NULL) { - next = listNodeValue(ln->next); - - /* Only glue when the next node is non-NULL (an sds in this case) */ - if (next != NULL) { - len = sdscatsds(len,next); - listDelNode(c->reply,ln->next); - listNodeValue(ln) = len; - /* No need to update c->reply_bytes: we are just moving the same - * amount of bytes from one node to another. */ - } + /* Normally we fill this dummy NULL node, added by addDeferredMultiBulkLength(), + * with a new buffer structure containing the protocol needed to specify + * the length of the array following. However sometimes when there is + * little memory to move, we may instead remove this NULL node, and prefix + * our protocol in the node immediately after to it, in order to save a + * write(2) syscall later. Conditions needed to do it: + * + * - The next node is non-NULL, + * - It has enough room already allocated + * - And not too large (avoid large memmove) */ + if (ln->next != NULL && (next = listNodeValue(ln->next)) && + next->size - next->used >= lenstr_len && + next->used < PROTO_REPLY_CHUNK_BYTES * 4) { + memmove(next->buf + lenstr_len, next->buf, next->used); + memcpy(next->buf, lenstr, lenstr_len); + next->used += lenstr_len; + listDelNode(c->reply,ln); + } else { + /* Create a new node */ + clientReplyBlock *buf = zmalloc(lenstr_len + sizeof(clientReplyBlock)); + /* Take over the allocation's internal fragmentation */ + buf->size = zmalloc_usable(buf) - sizeof(clientReplyBlock); + buf->used = lenstr_len; + memcpy(buf->buf, lenstr, lenstr_len); + listNodeValue(ln) = buf; + c->reply_bytes += buf->size; } asyncCloseClientOnOutputBufferLimitReached(c); } @@ -560,11 +610,24 @@ void addReplyHelp(client *c, const char **help) { setDeferredMultiBulkLength(c,blenp,blen); } +/* Add a suggestive error reply. + * This function is typically invoked by from commands that support + * subcommands in response to an unknown subcommand or argument error. */ +void addReplySubcommandSyntaxError(client *c) { + sds cmd = sdsnew((char*) c->argv[0]->ptr); + sdstoupper(cmd); + addReplyErrorFormat(c, + "Unknown subcommand or wrong number of arguments for '%s'. Try %s HELP.", + (char*)c->argv[1]->ptr,cmd); + sdsfree(cmd); +} + /* Copy 'src' client output buffers into 'dst' client output buffers. * The function takes care of freeing the old output buffers of the * destination client. */ void copyClientOutputBuffer(client *dst, client *src) { listRelease(dst->reply); + dst->sentlen = 0; dst->reply = listDup(src->reply); memcpy(dst->buf,src->buf,src->bufpos); dst->bufpos = src->bufpos; @@ -720,6 +783,8 @@ void unlinkClient(client *c) { if (c->fd != -1) { /* Remove from the list of active clients. */ if (c->client_list_node) { + uint64_t id = htonu64(c->id); + raxRemove(server.clients_index,(unsigned char*)&id,sizeof(id),NULL); listDelNode(server.clients,c->client_list_node); c->client_list_node = NULL; } @@ -864,12 +929,21 @@ void freeClientsInAsyncFreeQueue(void) { } } +/* Return a client by ID, or NULL if the client ID is not in the set + * of registered clients. Note that "fake clients", created with -1 as FD, + * are not registered clients. */ +client *lookupClientByID(uint64_t id) { + id = htonu64(id); + client *c = raxFind(server.clients_index,(unsigned char*)&id,sizeof(id)); + return (c == raxNotFound) ? NULL : c; +} + /* Write data in output buffers to client. Return C_OK if the client * is still valid after the call, C_ERR if it was freed. */ int writeToClient(int fd, client *c, int handler_installed) { ssize_t nwritten = 0, totwritten = 0; size_t objlen; - sds o; + clientReplyBlock *o; while(clientHasPendingReplies(c)) { if (c->bufpos > 0) { @@ -886,23 +960,24 @@ int writeToClient(int fd, client *c, int handler_installed) { } } else { o = listNodeValue(listFirst(c->reply)); - objlen = sdslen(o); + objlen = o->used; if (objlen == 0) { + c->reply_bytes -= o->size; listDelNode(c->reply,listFirst(c->reply)); continue; } - nwritten = write(fd, o + c->sentlen, objlen - c->sentlen); + nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen); if (nwritten <= 0) break; c->sentlen += nwritten; totwritten += nwritten; /* If we fully sent the object on head go to the next one */ if (c->sentlen == objlen) { + c->reply_bytes -= o->size; listDelNode(c->reply,listFirst(c->reply)); c->sentlen = 0; - c->reply_bytes -= objlen; /* If there are no longer objects in the list, we expect * the count of reply bytes to be exactly zero. */ if (listLength(c->reply) == 0) @@ -1039,7 +1114,7 @@ void resetClient(client *c) { * with the error and close the connection. */ int processInlineBuffer(client *c) { char *newline; - int argc, j; + int argc, j, linefeed_chars = 1; sds *argv, aux; size_t querylen; @@ -1057,7 +1132,7 @@ int processInlineBuffer(client *c) { /* Handle the \r\n case. */ if (newline && newline != c->querybuf && *(newline-1) == '\r') - newline--; + newline--, linefeed_chars++; /* Split the input buffer up to the \r\n */ querylen = newline-(c->querybuf); @@ -1077,7 +1152,7 @@ int processInlineBuffer(client *c) { c->repl_ack_time = server.unixtime; /* Leave data after the first line of the query in the buffer */ - sdsrange(c->querybuf,querylen+2,-1); + sdsrange(c->querybuf,querylen+linefeed_chars,-1); /* Setup argv array on client structure */ if (argc) { @@ -1493,6 +1568,7 @@ sds catClientInfoString(sds s, client *client) { *p++ = 'S'; } if (client->flags & CLIENT_MASTER) *p++ = 'M'; + if (client->flags & CLIENT_PUBSUB) *p++ = 'P'; if (client->flags & CLIENT_MULTI) *p++ = 'x'; if (client->flags & CLIENT_BLOCKED) *p++ = 'b'; if (client->flags & CLIENT_DIRTY_CAS) *p++ = 'd'; @@ -1531,7 +1607,7 @@ sds catClientInfoString(sds s, client *client) { client->lastcmd ? client->lastcmd->name : "NULL"); } -sds getAllClientsInfoString(void) { +sds getAllClientsInfoString(int type) { listNode *ln; listIter li; client *client; @@ -1540,6 +1616,7 @@ sds getAllClientsInfoString(void) { listRewind(server.clients,&li); while ((ln = listNext(&li)) != NULL) { client = listNodeValue(ln); + if (type != -1 && getClientType(client) != type) continue; o = catClientInfoString(o,client); o = sdscatlen(o,"\n",1); } @@ -1553,22 +1630,40 @@ void clientCommand(client *c) { if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) { const char *help[] = { -"getname -- Return the name of the current connection.", -"kill -- Kill connection made from .", +"id -- Return the ID of the current connection.", +"getname -- Return the name of the current connection.", +"kill -- Kill connection made from .", "kill