Implement redisAtomic to replace _Atomic C11 builtin (#7707)

Redis 6.0 introduces I/O threads, it is so cool and efficient, we use C11
_Atomic to establish inter-thread synchronization without mutex. But the
compiler that must supports C11 _Atomic can compile redis code, that brings a
lot of inconvenience since some common platforms can't support by default such
as CentOS7, so we want to implement redis atomic type to make it more portable.

We have implemented our atomic variable for redis that only has 'relaxed'
operations in src/atomicvar.h, so we implement some operations with
'sequentially-consistent', just like the default behavior of C11 _Atomic that
can establish inter-thread synchronization. And we replace all uses of C11
_Atomic with redis atomic variable.

Our implementation of redis atomic variable uses C11 _Atomic, __atomic or
__sync macros if available, it supports most common platforms, and we will
detect automatically which feature we use. In Makefile we use a dummy file to
detect if the compiler supports C11 _Atomic. Now for gcc, we can compile redis
code theoretically if your gcc version is not less than 4.1.2(starts to support
__sync_xxx operations). Otherwise, we remove use mutex fallback to implement
redis atomic variable for performance and test. You will get compiling errors
if your compiler doesn't support all features of above.

For cover redis atomic variable tests, we add other CI jobs that build redis on
CentOS6 and CentOS7 and workflow daily jobs that run the tests on them.
For them, we just install gcc by default in order to cover different compiler
versions, gcc is 4.4.7 by default installation on CentOS6 and 4.8.5 on CentOS7.

We restore the feature that we can test redis with Helgrind to find data race
errors. But you need install Valgrind in the default path configuration firstly
before running your tests, since we use macros in helgrind.h to tell Helgrind
inter-thread happens-before relationship explicitly for avoiding false positives.
Please open an issue on github if you find data race errors relate to this commit.

Unrelated:
- Fix redefinition of typedef 'RedisModuleUserChangedFunc'
  For some old version compilers, they will report errors or warnings, if we
  re-define function type.
This commit is contained in:
Wang Yuan 2020-09-17 21:01:45 +08:00 committed by GitHub
parent 092cfca522
commit 445a4b669a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 207 additions and 120 deletions

View File

@ -47,3 +47,22 @@ jobs:
- name: make
run: make MALLOC=libc
build-centos7-jemalloc:
runs-on: ubuntu-latest
container: centos:7
steps:
- uses: actions/checkout@v2
- name: make
run: |
yum -y install gcc make
make
build-centos6-jemalloc:
runs-on: ubuntu-latest
container: centos:6
steps:
- uses: actions/checkout@v1
- name: make
run: |
yum -y install gcc make
make

View File

@ -124,12 +124,33 @@ jobs:
- uses: actions/checkout@v2
- name: make
run: |
yum -y install centos-release-scl
yum -y install devtoolset-7
scl enable devtoolset-7 "make"
yum -y install gcc make
make
- name: test
run: |
yum -y install tcl
yum -y install which tcl
./runtest --accurate --verbose
- name: module api test
run: ./runtest-moduleapi --verbose
- name: sentinel tests
run: ./runtest-sentinel
- name: cluster tests
run: ./runtest-cluster
test-centos6-jemalloc:
runs-on: ubuntu-latest
if: github.repository == 'redis/redis'
container: centos:6
timeout-minutes: 14400
steps:
- uses: actions/checkout@v1
- name: make
run: |
yum -y install gcc make
make
- name: test
run: |
yum -y install which tcl util-linux-ng
./runtest --accurate --verbose
- name: module api test
run: ./runtest-moduleapi --verbose

View File

@ -20,7 +20,7 @@ DEPENDENCY_TARGETS=hiredis linenoise lua hdr_histogram
NODEPS:=clean distclean
# Default settings
STD=-std=c11 -pedantic -DREDIS_STATIC=''
STD=-pedantic -DREDIS_STATIC=''
ifneq (,$(findstring clang,$(CC)))
ifneq (,$(findstring FreeBSD,$(uname_S)))
STD+=-Wno-c11-extensions
@ -29,6 +29,16 @@ endif
WARN=-Wall -W -Wno-missing-field-initializers
OPT=$(OPTIMIZATION)
# Detect if the compiler supports C11 _Atomic
C11_ATOMIC := $(shell sh -c 'echo "\#include <stdatomic.h>" > foo.c; \
$(CC) -std=c11 -c foo.c -o foo.o &> /dev/null; \
if [ -f foo.o ]; then echo "yes"; rm foo.o; fi; rm foo.c')
ifeq ($(C11_ATOMIC),yes)
STD+=-std=c11
else
STD+=-std=c99
endif
PREFIX?=/usr/local
INSTALL_BIN=$(PREFIX)/bin
INSTALL=install
@ -367,7 +377,7 @@ valgrind:
$(MAKE) OPTIMIZATION="-O0" MALLOC="libc"
helgrind:
$(MAKE) OPTIMIZATION="-O0" MALLOC="libc" CFLAGS="-D__ATOMIC_VAR_FORCE_SYNC_MACROS"
$(MAKE) OPTIMIZATION="-O0" MALLOC="libc" CFLAGS="-D__ATOMIC_VAR_FORCE_SYNC_MACROS" REDIS_CFLAGS="-I/usr/local/include" REDIS_LDFLAGS="-L/usr/local/lib"
src/help.h:
@../utils/generate-command-help.rb > help.h

View File

@ -1,5 +1,5 @@
/* This file implements atomic counters using __atomic or __sync macros if
* available, otherwise synchronizing different threads using a mutex.
/* This file implements atomic counters using c11 _Atomic, __atomic or __sync
* macros if available, otherwise we will throw an error when compile.
*
* The exported interface is composed of three macros:
*
@ -8,16 +8,8 @@
* atomicDecr(var,count) -- Decrement the atomic counter
* atomicGet(var,dstvar) -- Fetch the atomic counter value
* atomicSet(var,value) -- Set the atomic counter value
*
* The variable 'var' should also have a declared mutex with the same
* name and the "_mutex" postfix, for instance:
*
* long myvar;
* pthread_mutex_t myvar_mutex;
* atomicSet(myvar,12345);
*
* If atomic primitives are available (tested in config.h) the mutex
* is not used.
* atomicGetWithSync(var,value) -- 'atomicGet' with inter-thread synchronization
* atomicSetWithSync(var,value) -- 'atomicSet' with inter-thread synchronization
*
* Never use return value from the macros, instead use the AtomicGetIncr()
* if you need to get the current value and increment it atomically, like
@ -58,17 +50,64 @@
*/
#include <pthread.h>
#include "config.h"
#ifndef __ATOMIC_VAR_H
#define __ATOMIC_VAR_H
/* Define redisAtomic for atomic variable. */
#define redisAtomic
/* To test Redis with Helgrind (a Valgrind tool) it is useful to define
* the following macro, so that __sync macros are used: those can be detected
* by Helgrind (even if they are less efficient) so that no false positive
* is reported. */
// #define __ATOMIC_VAR_FORCE_SYNC_MACROS
#if !defined(__ATOMIC_VAR_FORCE_SYNC_MACROS) && defined(__ATOMIC_RELAXED) && !defined(__sun) && (!defined(__clang__) || !defined(__APPLE__) || __apple_build_version__ > 4210057)
/* There will be many false positives if we test Redis with Helgrind, since
* Helgrind can't understand we have imposed ordering on the program, so
* we use macros in helgrind.h to tell Helgrind inter-thread happens-before
* relationship explicitly for avoiding false positives.
*
* For more details, please see: valgrind/helgrind.h and
* https://www.valgrind.org/docs/manual/hg-manual.html#hg-manual.effective-use
*
* These macros take effect only when 'make helgrind', and you must first
* install Valgrind in the default path configuration. */
#ifdef __ATOMIC_VAR_FORCE_SYNC_MACROS
#include <valgrind/helgrind.h>
#else
#define ANNOTATE_HAPPENS_BEFORE(v) ((void) v)
#define ANNOTATE_HAPPENS_AFTER(v) ((void) v)
#endif
#if !defined(__ATOMIC_VAR_FORCE_SYNC_MACROS) && defined(__STDC_VERSION__) && \
(__STDC_VERSION__ >= 201112L) && !defined(__STDC_NO_ATOMICS__)
/* Use '_Atomic' keyword if the compiler supports. */
#undef redisAtomic
#define redisAtomic _Atomic
/* Implementation using _Atomic in C11. */
#include <stdatomic.h>
#define atomicIncr(var,count) atomic_fetch_add_explicit(&var,(count),memory_order_relaxed)
#define atomicGetIncr(var,oldvalue_var,count) do { \
oldvalue_var = atomic_fetch_add_explicit(&var,(count),memory_order_relaxed); \
} while(0)
#define atomicDecr(var,count) atomic_fetch_sub_explicit(&var,(count),memory_order_relaxed)
#define atomicGet(var,dstvar) do { \
dstvar = atomic_load_explicit(&var,memory_order_relaxed); \
} while(0)
#define atomicSet(var,value) atomic_store_explicit(&var,value,memory_order_relaxed)
#define atomicGetWithSync(var,dstvar) do { \
dstvar = atomic_load_explicit(&var,memory_order_seq_cst); \
} while(0)
#define atomicSetWithSync(var,value) \
atomic_store_explicit(&var,value,memory_order_seq_cst)
#define REDIS_ATOMIC_API "c11-builtin"
#elif !defined(__ATOMIC_VAR_FORCE_SYNC_MACROS) && !defined(__sun) && \
(!defined(__clang__) || !defined(__APPLE__) || __apple_build_version__ > 4210057) && \
defined(__ATOMIC_RELAXED) && defined(__ATOMIC_SEQ_CST)
/* Implementation using __atomic macros. */
#define atomicIncr(var,count) __atomic_add_fetch(&var,(count),__ATOMIC_RELAXED)
@ -80,6 +119,11 @@
dstvar = __atomic_load_n(&var,__ATOMIC_RELAXED); \
} while(0)
#define atomicSet(var,value) __atomic_store_n(&var,value,__ATOMIC_RELAXED)
#define atomicGetWithSync(var,dstvar) do { \
dstvar = __atomic_load_n(&var,__ATOMIC_SEQ_CST); \
} while(0)
#define atomicSetWithSync(var,value) \
__atomic_store_n(&var,value,__ATOMIC_SEQ_CST)
#define REDIS_ATOMIC_API "atomic-builtin"
#elif defined(HAVE_ATOMIC)
@ -96,38 +140,19 @@
#define atomicSet(var,value) do { \
while(!__sync_bool_compare_and_swap(&var,var,value)); \
} while(0)
/* Actually the builtin issues a full memory barrier by default. */
#define atomicGetWithSync(var,dstvar) { \
dstvar = __sync_sub_and_fetch(&var,0,__sync_synchronize); \
ANNOTATE_HAPPENS_AFTER(&var); \
} while(0)
#define atomicSetWithSync(var,value) do { \
ANNOTATE_HAPPENS_BEFORE(&var); \
while(!__sync_bool_compare_and_swap(&var,var,value,__sync_synchronize)); \
} while(0)
#define REDIS_ATOMIC_API "sync-builtin"
#else
/* Implementation using pthread mutex. */
#define atomicIncr(var,count) do { \
pthread_mutex_lock(&var ## _mutex); \
var += (count); \
pthread_mutex_unlock(&var ## _mutex); \
} while(0)
#define atomicGetIncr(var,oldvalue_var,count) do { \
pthread_mutex_lock(&var ## _mutex); \
oldvalue_var = var; \
var += (count); \
pthread_mutex_unlock(&var ## _mutex); \
} while(0)
#define atomicDecr(var,count) do { \
pthread_mutex_lock(&var ## _mutex); \
var -= (count); \
pthread_mutex_unlock(&var ## _mutex); \
} while(0)
#define atomicGet(var,dstvar) do { \
pthread_mutex_lock(&var ## _mutex); \
dstvar = var; \
pthread_mutex_unlock(&var ## _mutex); \
} while(0)
#define atomicSet(var,value) do { \
pthread_mutex_lock(&var ## _mutex); \
var = value; \
pthread_mutex_unlock(&var ## _mutex); \
} while(0)
#define REDIS_ATOMIC_API "pthread-mutex"
#error "Unable to determine atomic operations for your platform"
#endif
#endif /* __ATOMIC_VAR_H */

View File

@ -79,7 +79,7 @@ unsigned int getLRUClock(void) {
unsigned int LRU_CLOCK(void) {
unsigned int lruclock;
if (1000/server.hz <= LRU_CLOCK_RESOLUTION) {
lruclock = server.lruclock;
atomicGet(server.lruclock,lruclock);
} else {
lruclock = getLRUClock();
}

View File

@ -3,8 +3,7 @@
#include "atomicvar.h"
#include "cluster.h"
static size_t lazyfree_objects = 0;
pthread_mutex_t lazyfree_objects_mutex = PTHREAD_MUTEX_INITIALIZER;
static redisAtomic size_t lazyfree_objects = 0;
/* Return the number of currently pending objects to free. */
size_t lazyfreeGetPendingObjectsCount(void) {

View File

@ -357,11 +357,6 @@ unsigned long long ModulesInHooks = 0; /* Total number of modules in hooks
/* Data structures related to the redis module users */
/* This callback type is called by moduleNotifyUserChanged() every time
* a user authenticated via the module API is associated with a different
* user or gets disconnected. */
typedef void (*RedisModuleUserChangedFunc) (uint64_t client_id, void *privdata);
/* This is the object returned by RM_CreateModuleUser(). The module API is
* able to create users, set ACLs to such users, and later authenticate
* clients using such newly created users. */

View File

@ -103,7 +103,8 @@ client *createClient(connection *conn) {
}
selectDb(c,0);
uint64_t client_id = ++server.next_client_id;
uint64_t client_id;
atomicGetIncr(server.next_client_id, client_id, 1);
c->id = client_id;
c->resp = 2;
c->conn = conn;
@ -1314,7 +1315,7 @@ client *lookupClientByID(uint64_t id) {
* thread safe. */
int writeToClient(client *c, int handler_installed) {
/* Update total number of writes on server */
server.stat_total_writes_processed++;
atomicIncr(server.stat_total_writes_processed, 1);
ssize_t nwritten = 0, totwritten = 0;
size_t objlen;
@ -1376,7 +1377,7 @@ int writeToClient(client *c, int handler_installed) {
zmalloc_used_memory() < server.maxmemory) &&
!(c->flags & CLIENT_SLAVE)) break;
}
server.stat_net_output_bytes += totwritten;
atomicIncr(server.stat_net_output_bytes, totwritten);
if (nwritten == -1) {
if (connGetState(c->conn) == CONN_STATE_CONNECTED) {
nwritten = 0;
@ -1933,7 +1934,7 @@ void readQueryFromClient(connection *conn) {
if (postponeClientRead(c)) return;
/* Update total number of reads on server */
server.stat_total_reads_processed++;
atomicIncr(server.stat_total_reads_processed, 1);
readlen = PROTO_IOBUF_LEN;
/* If this is a multi bulk request, and we are processing a bulk reply
@ -1979,7 +1980,7 @@ void readQueryFromClient(connection *conn) {
sdsIncrLen(c->querybuf,nread);
c->lastinteraction = server.unixtime;
if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
server.stat_net_input_bytes += nread;
atomicIncr(server.stat_net_input_bytes, nread);
if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
@ -2938,7 +2939,7 @@ int tio_debug = 0;
pthread_t io_threads[IO_THREADS_MAX_NUM];
pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];
_Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];
redisAtomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];
int io_threads_op; /* IO_THREADS_OP_WRITE or IO_THREADS_OP_READ. */
/* This is the list of clients each thread will serve when threaded I/O is
@ -2946,6 +2947,16 @@ int io_threads_op; /* IO_THREADS_OP_WRITE or IO_THREADS_OP_READ. */
* itself. */
list *io_threads_list[IO_THREADS_MAX_NUM];
static inline unsigned long getIOPendingCount(int i) {
unsigned long count = 0;
atomicGetWithSync(io_threads_pending[i], count);
return count;
}
static inline void setIOPendingCount(int i, unsigned long count) {
atomicSetWithSync(io_threads_pending[i], count);
}
void *IOThreadMain(void *myid) {
/* The ID is the thread number (from 0 to server.iothreads_num-1), and is
* used by the thread to just manipulate a single sub-array of clients. */
@ -2959,17 +2970,17 @@ void *IOThreadMain(void *myid) {
while(1) {
/* Wait for start */
for (int j = 0; j < 1000000; j++) {
if (io_threads_pending[id] != 0) break;
if (getIOPendingCount(id) != 0) break;
}
/* Give the main thread a chance to stop this thread. */
if (io_threads_pending[id] == 0) {
if (getIOPendingCount(id) == 0) {
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
serverAssert(io_threads_pending[id] != 0);
serverAssert(getIOPendingCount(id) != 0);
if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));
@ -2989,7 +3000,7 @@ void *IOThreadMain(void *myid) {
}
}
listEmpty(io_threads_list[id]);
io_threads_pending[id] = 0;
setIOPendingCount(id, 0);
if (tio_debug) printf("[%ld] Done\n", id);
}
@ -3018,7 +3029,7 @@ void initThreadedIO(void) {
/* Things we do only for the additional threads. */
pthread_t tid;
pthread_mutex_init(&io_threads_mutex[i],NULL);
io_threads_pending[i] = 0;
setIOPendingCount(i, 0);
pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
@ -3124,7 +3135,7 @@ int handleClientsWithPendingWritesUsingThreads(void) {
io_threads_op = IO_THREADS_OP_WRITE;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
setIOPendingCount(j, count);
}
/* Also use the main thread to process a slice of clients. */
@ -3139,7 +3150,7 @@ int handleClientsWithPendingWritesUsingThreads(void) {
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
pending += getIOPendingCount(j);
if (pending == 0) break;
}
if (tio_debug) printf("I/O WRITE All threads finshed\n");
@ -3214,7 +3225,7 @@ int handleClientsWithPendingReadsUsingThreads(void) {
io_threads_op = IO_THREADS_OP_READ;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
setIOPendingCount(j, count);
}
/* Also use the main thread to process a slice of clients. */
@ -3229,7 +3240,7 @@ int handleClientsWithPendingReadsUsingThreads(void) {
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
pending += getIOPendingCount(j);
if (pending == 0) break;
}
if (tio_debug) printf("I/O READ All threads finshed\n");

View File

@ -76,11 +76,11 @@ static struct config {
int hostport;
const char *hostsocket;
int numclients;
int liveclients;
redisAtomic int liveclients;
int requests;
int requests_issued;
int requests_finished;
int previous_requests_finished;
redisAtomic int requests_issued;
redisAtomic int requests_finished;
redisAtomic int previous_requests_finished;
int last_printed_bytes;
long long previous_tick;
int keysize;
@ -113,18 +113,12 @@ static struct config {
struct redisConfig *redis_config;
struct hdr_histogram* latency_histogram;
struct hdr_histogram* current_sec_latency_histogram;
int is_fetching_slots;
int is_updating_slots;
int slots_last_update;
redisAtomic int is_fetching_slots;
redisAtomic int is_updating_slots;
redisAtomic int slots_last_update;
int enable_tracking;
/* Thread mutexes to be used as fallbacks by atomicvar.h */
pthread_mutex_t requests_issued_mutex;
pthread_mutex_t requests_finished_mutex;
pthread_mutex_t liveclients_mutex;
pthread_mutex_t is_fetching_slots_mutex;
pthread_mutex_t is_updating_slots_mutex;
pthread_mutex_t updating_slots_mutex;
pthread_mutex_t slots_last_update_mutex;
} config;
typedef struct _client {
@ -1669,13 +1663,8 @@ int main(int argc, const char **argv) {
fprintf(stderr, "WARN: could not fetch server CONFIG\n");
}
if (config.num_threads > 0) {
pthread_mutex_init(&(config.requests_issued_mutex), NULL);
pthread_mutex_init(&(config.requests_finished_mutex), NULL);
pthread_mutex_init(&(config.liveclients_mutex), NULL);
pthread_mutex_init(&(config.is_fetching_slots_mutex), NULL);
pthread_mutex_init(&(config.is_updating_slots_mutex), NULL);
pthread_mutex_init(&(config.updating_slots_mutex), NULL);
pthread_mutex_init(&(config.slots_last_update_mutex), NULL);
}
if (config.keepalive == 0) {

View File

@ -1032,7 +1032,7 @@ void sendBulkToSlave(connection *conn) {
freeClient(slave);
return;
}
server.stat_net_output_bytes += nwritten;
atomicIncr(server.stat_net_output_bytes, nwritten);
sdsrange(slave->replpreamble,nwritten,-1);
if (sdslen(slave->replpreamble) == 0) {
sdsfree(slave->replpreamble);
@ -1061,7 +1061,7 @@ void sendBulkToSlave(connection *conn) {
return;
}
slave->repldboff += nwritten;
server.stat_net_output_bytes += nwritten;
atomicIncr(server.stat_net_output_bytes, nwritten);
if (slave->repldboff == slave->repldbsize) {
close(slave->repldbfd);
slave->repldbfd = -1;
@ -1102,7 +1102,7 @@ void rdbPipeWriteHandler(struct connection *conn) {
return;
} else {
slave->repldboff += nwritten;
server.stat_net_output_bytes += nwritten;
atomicIncr(server.stat_net_output_bytes, nwritten);
if (slave->repldboff < server.rdb_pipe_bufflen)
return; /* more data to write.. */
}
@ -1180,7 +1180,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData,
/* Note: when use diskless replication, 'repldboff' is the offset
* of 'rdb_pipe_buff' sent rather than the offset of entire RDB. */
slave->repldboff = nwritten;
server.stat_net_output_bytes += nwritten;
atomicIncr(server.stat_net_output_bytes, nwritten);
}
/* If we were unable to write all the data to one of the replicas,
* setup write handler (and disable pipe read handler, below) */
@ -1558,7 +1558,7 @@ void readSyncBulkPayload(connection *conn) {
cancelReplicationHandshake(1);
return;
}
server.stat_net_input_bytes += nread;
atomicIncr(server.stat_net_input_bytes, nread);
/* When a mark is used, we want to detect EOF asap in order to avoid
* writing the EOF mark into the file... */

View File

@ -1764,7 +1764,8 @@ void databasesCron(void) {
void updateCachedTime(int update_daylight_info) {
server.ustime = ustime();
server.mstime = server.ustime / 1000;
server.unixtime = server.mstime / 1000;
time_t unixtime = server.mstime / 1000;
atomicSet(server.unixtime, unixtime);
/* To get information about daylight saving time, we need to call
* localtime_r and cache the result. However calling localtime_r in this
@ -1913,11 +1914,15 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
}
run_with_period(100) {
long long stat_net_input_bytes, stat_net_output_bytes;
atomicGet(server.stat_net_input_bytes, stat_net_input_bytes);
atomicGet(server.stat_net_output_bytes, stat_net_output_bytes);
trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands);
trackInstantaneousMetric(STATS_METRIC_NET_INPUT,
server.stat_net_input_bytes);
stat_net_input_bytes);
trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT,
server.stat_net_output_bytes);
stat_net_output_bytes);
}
/* We have just LRU_BITS bits per object for LRU information.
@ -1931,7 +1936,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
*
* Note that you can change the resolution altering the
* LRU_CLOCK_RESOLUTION define. */
server.lruclock = getLRUClock();
unsigned int lruclock = getLRUClock();
atomicSet(server.lruclock,lruclock);
cronUpdateMemoryStats();
@ -2443,7 +2449,8 @@ void initServerConfig(void) {
server.next_client_id = 1; /* Client IDs, start from 1 .*/
server.loading_process_events_interval_bytes = (1024*1024*2);
server.lruclock = getLRUClock();
unsigned int lruclock = getLRUClock();
atomicSet(server.lruclock,lruclock);
resetServerSaveParams();
appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
@ -2846,9 +2853,9 @@ void resetServerStats(void) {
server.stat_sync_partial_ok = 0;
server.stat_sync_partial_err = 0;
server.stat_io_reads_processed = 0;
server.stat_total_reads_processed = 0;
atomicSet(server.stat_total_reads_processed, 0);
server.stat_io_writes_processed = 0;
server.stat_total_writes_processed = 0;
atomicSet(server.stat_total_writes_processed, 0);
for (j = 0; j < STATS_METRIC_COUNT; j++) {
server.inst_metric[j].idx = 0;
server.inst_metric[j].last_sample_time = mstime();
@ -2856,8 +2863,8 @@ void resetServerStats(void) {
memset(server.inst_metric[j].samples,0,
sizeof(server.inst_metric[j].samples));
}
server.stat_net_input_bytes = 0;
server.stat_net_output_bytes = 0;
atomicSet(server.stat_net_input_bytes, 0);
atomicSet(server.stat_net_output_bytes, 0);
server.stat_unexpected_error_replies = 0;
server.aof_delayed_fsync = 0;
server.blocked_last_cron = 0;
@ -4186,6 +4193,8 @@ sds genRedisInfoString(const char *section) {
call_uname = 0;
}
unsigned int lruclock;
atomicGet(server.lruclock,lruclock);
info = sdscatfmt(info,
"# Server\r\n"
"redis_version:%s\r\n"
@ -4230,7 +4239,7 @@ sds genRedisInfoString(const char *section) {
(int64_t)(uptime/(3600*24)),
server.hz,
server.config_hz,
server.lruclock,
lruclock,
server.executable ? server.executable : "",
server.configfile ? server.configfile : "",
server.io_threads_active);
@ -4472,6 +4481,13 @@ sds genRedisInfoString(const char *section) {
/* Stats */
if (allsections || defsections || !strcasecmp(section,"stats")) {
long long stat_total_reads_processed, stat_total_writes_processed;
long long stat_net_input_bytes, stat_net_output_bytes;
atomicGet(server.stat_total_reads_processed, stat_total_reads_processed);
atomicGet(server.stat_total_writes_processed, stat_total_writes_processed);
atomicGet(server.stat_net_input_bytes, stat_net_input_bytes);
atomicGet(server.stat_net_output_bytes, stat_net_output_bytes);
if (sections++) info = sdscat(info,"\r\n");
info = sdscatprintf(info,
"# Stats\r\n"
@ -4513,8 +4529,8 @@ sds genRedisInfoString(const char *section) {
server.stat_numconnections,
server.stat_numcommands,
getInstantaneousMetric(STATS_METRIC_COMMAND),
server.stat_net_input_bytes,
server.stat_net_output_bytes,
stat_net_input_bytes,
stat_net_output_bytes,
(float)getInstantaneousMetric(STATS_METRIC_NET_INPUT)/1024,
(float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT)/1024,
server.stat_rejected_conn,
@ -4541,8 +4557,8 @@ sds genRedisInfoString(const char *section) {
(unsigned long long) trackingGetTotalItems(),
(unsigned long long) trackingGetTotalPrefixes(),
server.stat_unexpected_error_replies,
server.stat_total_reads_processed,
server.stat_total_writes_processed,
stat_total_reads_processed,
stat_total_writes_processed,
server.stat_io_reads_processed,
server.stat_io_writes_processed);
}

View File

@ -34,6 +34,7 @@
#include "config.h"
#include "solarisfixes.h"
#include "rio.h"
#include "atomicvar.h"
#include <stdio.h>
#include <stdlib.h>
@ -511,8 +512,10 @@ typedef void (*moduleTypeDigestFunc)(struct RedisModuleDigest *digest, void *val
typedef size_t (*moduleTypeMemUsageFunc)(const void *value);
typedef void (*moduleTypeFreeFunc)(void *value);
/* A callback that is called when the client authentication changes. This
* needs to be exposed since you can't cast a function pointer to (void *) */
/* This callback type is called by moduleNotifyUserChanged() every time
* a user authenticated via the module API is associated with a different
* user or gets disconnected. This needs to be exposed since you can't cast
* a function pointer to (void *). */
typedef void (*RedisModuleUserChangedFunc) (uint64_t client_id, void *privdata);
@ -1063,7 +1066,7 @@ struct redisServer {
dict *commands; /* Command table */
dict *orig_commands; /* Command table before command renaming. */
aeEventLoop *el;
_Atomic unsigned int lruclock; /* Clock for LRU eviction */
redisAtomic unsigned int lruclock; /* Clock for LRU eviction */
int shutdown_asap; /* SHUTDOWN needed ASAP */
int activerehashing; /* Incremental rehash in serverCron() */
int active_defrag_running; /* Active defragmentation running (holds current scan aggressiveness) */
@ -1111,7 +1114,7 @@ struct redisServer {
mstime_t clients_pause_end_time; /* Time when we undo clients_paused */
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
dict *migrate_cached_sockets;/* MIGRATE cached sockets */
_Atomic uint64_t next_client_id; /* Next client unique ID. Incremental. */
redisAtomic uint64_t next_client_id; /* Next client unique ID. Incremental. */
int protected_mode; /* Don't accept external connections. */
int gopher_enabled; /* If true the server will reply to gopher
queries. Will still serve RESP2 queries. */
@ -1160,8 +1163,8 @@ struct redisServer {
long long slowlog_log_slower_than; /* SLOWLOG time limit (to get logged) */
unsigned long slowlog_max_len; /* SLOWLOG max number of items logged */
struct malloc_stats cron_malloc_stats; /* sampled in serverCron(). */
_Atomic long long stat_net_input_bytes; /* Bytes read from network. */
_Atomic long long stat_net_output_bytes; /* Bytes written to network. */
redisAtomic long long stat_net_input_bytes; /* Bytes read from network. */
redisAtomic long long stat_net_output_bytes; /* Bytes written to network. */
size_t stat_rdb_cow_bytes; /* Copy on write bytes during RDB saving. */
size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. */
size_t stat_module_cow_bytes; /* Copy on write bytes during module fork. */
@ -1169,8 +1172,8 @@ struct redisServer {
long long stat_unexpected_error_replies; /* Number of unexpected (aof-loading, replica to master, etc.) error replies */
long long stat_io_reads_processed; /* Number of read events processed by IO / Main threads */
long long stat_io_writes_processed; /* Number of write events processed by IO / Main threads */
_Atomic long long stat_total_reads_processed; /* Total number of read events processed */
_Atomic long long stat_total_writes_processed; /* Total number of write events processed */
redisAtomic long long stat_total_reads_processed; /* Total number of read events processed */
redisAtomic long long stat_total_writes_processed; /* Total number of write events processed */
/* The following two are used to track instantaneous metrics, like
* number of operations per second, network traffic. */
struct {
@ -1193,7 +1196,7 @@ struct redisServer {
int active_defrag_cycle_min; /* minimal effort for defrag in CPU percentage */
int active_defrag_cycle_max; /* maximal effort for defrag in CPU percentage */
unsigned long active_defrag_max_scan_fields; /* maximum number of fields of set/hash/zset/list to process from within the main dict scan */
_Atomic size_t client_max_querybuf_len; /* Limit for client query buffer length */
size_t client_max_querybuf_len; /* Limit for client query buffer length */
int dbnum; /* Total number of configured DBs */
int supervised; /* 1 if supervised, 0 otherwise. */
int supervised_mode; /* See SUPERVISED_* */
@ -1393,7 +1396,7 @@ struct redisServer {
int list_max_ziplist_size;
int list_compress_depth;
/* time cache */
_Atomic time_t unixtime; /* Unix time sampled every cron cycle. */
redisAtomic time_t unixtime; /* Unix time sampled every cron cycle. */
time_t timezone; /* Cached timezone. As set by tzset(). */
int daylight_active; /* Currently in daylight saving time. */
mstime_t mstime; /* 'unixtime' in milliseconds. */

View File

@ -74,8 +74,7 @@ void zlibc_free(void *ptr) {
#define update_zmalloc_stat_alloc(__n) atomicIncr(used_memory,(__n))
#define update_zmalloc_stat_free(__n) atomicDecr(used_memory,(__n))
static size_t used_memory = 0;
pthread_mutex_t used_memory_mutex = PTHREAD_MUTEX_INITIALIZER;
static redisAtomic size_t used_memory = 0;
static void zmalloc_default_oom(size_t size) {
fprintf(stderr, "zmalloc: Out of memory trying to allocate %zu bytes\n",