Thread support for redis-benchmark.

This commit is contained in:
artix 2018-07-31 10:45:00 +02:00
parent b19e8b9a2c
commit 563885d6d9

View File

@ -40,16 +40,24 @@
#include <signal.h>
#include <assert.h>
#include <math.h>
#include <pthread.h>
#include <sds.h> /* Use hiredis sds. */
#include "ae.h"
#include "hiredis.h"
#include "adlist.h"
#include "zmalloc.h"
#include "atomicvar.h"
#define UNUSED(V) ((void) V)
#define RANDPTR_INITIAL_SIZE 8
#define MAX_LATENCY_PRECISION 3
#define MAX_THREADS 16
#define CLIENT_GET_EVENTLOOP(c) \
(c->thread_id >= 0 ? config.threads[c->thread_id]->el : config.el)
struct benchmarkThread;
static struct config {
aeEventLoop *el;
@ -82,6 +90,12 @@ static struct config {
char *tests;
char *auth;
int precision;
int num_threads;
struct benchmarkThread **threads;
/* Thread mutexes to be used as fallbacks by atomicvar.h */
pthread_mutex_t requests_issued_mutex;
pthread_mutex_t requests_finished_mutex;
pthread_mutex_t liveclients_mutex;
} config;
typedef struct _client {
@ -98,11 +112,27 @@ typedef struct _client {
such as auth and select are prefixed to the pipeline of
benchmark commands and discarded after the first send. */
int prefixlen; /* Size in bytes of the pending prefix commands */
int thread_id;
} *client;
/* Threads. */
typedef struct benchmarkThread {
int index;
pthread_t thread;
aeEventLoop *el;
} benchmarkThread;
/* Prototypes */
static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask);
static void createMissingClients(client c);
static benchmarkThread *createBenchmarkThread(int index);
static void freeBenchmarkThread(benchmarkThread *thread);
static void freeBenchmarkThreads();
static void *execBenchmarkThread(void *ptr);
int showThroughput(struct aeEventLoop *eventLoop, long long id,
void *clientData);
/* Implementation */
static long long ustime(void) {
@ -126,17 +156,20 @@ static long long mstime(void) {
}
static void freeClient(client c) {
aeEventLoop *el = CLIENT_GET_EVENTLOOP(c);
listNode *ln;
aeDeleteFileEvent(config.el,c->context->fd,AE_WRITABLE);
aeDeleteFileEvent(config.el,c->context->fd,AE_READABLE);
aeDeleteFileEvent(el,c->context->fd,AE_WRITABLE);
aeDeleteFileEvent(el,c->context->fd,AE_READABLE);
redisFree(c->context);
sdsfree(c->obuf);
zfree(c->randptr);
zfree(c);
if (config.num_threads) pthread_mutex_lock(&(config.liveclients_mutex));
config.liveclients--;
ln = listSearchKey(config.clients,c);
assert(ln != NULL);
listDelNode(config.clients,ln);
if (config.num_threads) pthread_mutex_unlock(&(config.liveclients_mutex));
}
static void freeAllClients(void) {
@ -150,9 +183,10 @@ static void freeAllClients(void) {
}
static void resetClient(client c) {
aeDeleteFileEvent(config.el,c->context->fd,AE_WRITABLE);
aeDeleteFileEvent(config.el,c->context->fd,AE_READABLE);
aeCreateFileEvent(config.el,c->context->fd,AE_WRITABLE,writeHandler,c);
aeEventLoop *el = CLIENT_GET_EVENTLOOP(c);
aeDeleteFileEvent(el,c->context->fd,AE_WRITABLE);
aeDeleteFileEvent(el,c->context->fd,AE_READABLE);
aeCreateFileEvent(el,c->context->fd,AE_WRITABLE,writeHandler,c);
c->written = 0;
c->pending = config.pipeline;
}
@ -174,17 +208,30 @@ static void randomizeClientKey(client c) {
}
static void clientDone(client c) {
if (config.requests_finished == config.requests) {
int requests_finished = 0;
if (!config.num_threads) requests_finished = config.requests_finished;
else atomicGet(config.requests_finished, requests_finished);
if (requests_finished == config.requests) {
aeStop(CLIENT_GET_EVENTLOOP(c));
freeClient(c);
aeStop(config.el);
if (config.num_threads) {
int i = 0;
for (;i < config.num_threads; i++) {
benchmarkThread *t = config.threads[i];
if (t && t->el) aeStop(t->el);
}
}
return;
}
if (config.keepalive) {
resetClient(c);
} else {
if (config.num_threads) pthread_mutex_lock(&(config.liveclients_mutex));
config.liveclients--;
createMissingClients(c);
config.liveclients++;
if (config.num_threads)
pthread_mutex_unlock(&(config.liveclients_mutex));
freeClient(c);
}
}
@ -243,9 +290,14 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
}
continue;
}
if (config.requests_finished < config.requests)
config.latency[config.requests_finished++] = c->latency;
if (config.num_threads)
pthread_mutex_lock(&(config.requests_finished_mutex));
if (config.requests_finished < config.requests) {
config.requests_finished++;
config.latency[config.requests_finished] = c->latency;
}
if (config.num_threads)
pthread_mutex_unlock(&(config.requests_finished_mutex));
c->pending--;
if (c->pending == 0) {
clientDone(c);
@ -267,7 +319,10 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
/* Initialize request when nothing was written. */
if (c->written == 0) {
/* Enforce upper bound to number of requests. */
if (config.requests_issued++ >= config.requests) {
int requests_issued = 0;
if (!config.num_threads) requests_issued = config.requests_issued++;
else atomicGetIncr(config.requests_issued, requests_issued, 1);
if (requests_issued >= config.requests) {
freeClient(c);
return;
}
@ -289,8 +344,8 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
}
c->written += nwritten;
if (sdslen(c->obuf) == c->written) {
aeDeleteFileEvent(config.el,c->context->fd,AE_WRITABLE);
aeCreateFileEvent(config.el,c->context->fd,AE_READABLE,readHandler,c);
aeDeleteFileEvent(el,c->context->fd,AE_WRITABLE);
aeCreateFileEvent(el,c->context->fd,AE_READABLE,readHandler,c);
}
}
}
@ -316,7 +371,7 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
* for arguments randomization.
*
* Even when cloning another client, prefix commands are applied if needed.*/
static client createClient(char *cmd, size_t len, client from) {
static client createClient(char *cmd, size_t len, client from, int thread_id) {
int j;
client c = zmalloc(sizeof(struct _client));
@ -333,6 +388,7 @@ static client createClient(char *cmd, size_t len, client from) {
fprintf(stderr,"%s: %s\n",config.hostsocket,c->context->errstr);
exit(1);
}
c->thread_id = thread_id;
/* Suppress hiredis cleanup of unused buffers for max speed. */
c->context->reader->maxbuf = 0;
@ -406,8 +462,14 @@ static client createClient(char *cmd, size_t len, client from) {
}
}
}
aeEventLoop *el = NULL;
if (thread_id < 0) el = config.el;
else {
benchmarkThread *thread = config.threads[thread_id];
el = thread->el;
}
if (config.idlemode == 0)
aeCreateFileEvent(config.el,c->context->fd,AE_WRITABLE,writeHandler,c);
aeCreateFileEvent(el,c->context->fd,AE_WRITABLE,writeHandler,c);
listAddNodeTail(config.clients,c);
config.liveclients++;
return c;
@ -415,9 +477,11 @@ static client createClient(char *cmd, size_t len, client from) {
static void createMissingClients(client c) {
int n = 0;
while(config.liveclients < config.numclients) {
createClient(NULL,0,c);
int thread_id = -1;
if (config.num_threads)
thread_id = config.liveclients % config.num_threads;
createClient(NULL,0,c,thread_id);
/* Listen backlog is quite limited on most systems */
if (++n > 64) {
@ -454,6 +518,9 @@ static void showLatencyReport(void) {
printf(" %d parallel clients\n", config.numclients);
printf(" %d bytes payload\n", config.datasize);
printf(" keep alive: %d\n", config.keepalive);
printf(" multi-thread: %s\n", (config.num_threads ? "yes" : "no"));
if (config.num_threads)
printf(" threads: %d\n", config.num_threads);
printf("\n");
qsort(config.latency,config.requests,sizeof(long long),compareLatency);
@ -484,21 +551,74 @@ static void showLatencyReport(void) {
}
static void benchmark(char *title, char *cmd, int len) {
int i;
client c;
config.title = title;
config.requests_issued = 0;
config.requests_finished = 0;
c = createClient(cmd,len,NULL);
if (config.num_threads) {
if (config.threads) freeBenchmarkThreads();
config.threads = zmalloc(config.num_threads * sizeof(benchmarkThread*));
for (i = 0; i < config.num_threads; i++) {
benchmarkThread *thread = createBenchmarkThread(i);
config.threads[i] = thread;
}
}
int thread_id = config.num_threads > 0 ? 0 : -1;
c = createClient(cmd,len,NULL,thread_id);
createMissingClients(c);
config.start = mstime();
aeMain(config.el);
if (!config.num_threads) aeMain(config.el);
else {
for (i = 0; i < config.num_threads; i++) {
benchmarkThread *t = config.threads[i];
if (pthread_create(&(t->thread), NULL, execBenchmarkThread, t)){
fprintf(stderr, "FATAL: Failed to start thread %d.\n", i);
exit(1);
}
}
for (i = 0; i < config.num_threads; i++)
pthread_join(config.threads[i]->thread, NULL);
}
config.totlatency = mstime()-config.start;
showLatencyReport();
freeAllClients();
if (config.threads) freeBenchmarkThreads();
}
static benchmarkThread *createBenchmarkThread(int index) {
benchmarkThread *thread = zmalloc(sizeof(*thread));
if (thread == NULL) return NULL;
thread->index = index;
thread->el = aeCreateEventLoop(1024*10);
aeCreateTimeEvent(thread->el,1,showThroughput,NULL,NULL);
return thread;
}
static void freeBenchmarkThread(benchmarkThread *thread) {
if (thread->el) aeDeleteEventLoop(thread->el);
zfree(thread);
}
static void freeBenchmarkThreads() {
int i = 0;
for (; i < config.num_threads; i++) {
benchmarkThread *thread = config.threads[i];
if (thread) freeBenchmarkThread(thread);
}
zfree(config.threads);
config.threads = NULL;
}
static void *execBenchmarkThread(void *ptr) {
benchmarkThread *thread = (benchmarkThread *) ptr;
aeMain(thread->el);
return NULL;
}
/* Returns number of consumed options. */
@ -576,6 +696,14 @@ int parseOptions(int argc, const char **argv) {
config.precision = atoi(argv[++i]);
if (config.precision < 0) config.precision = 0;
if (config.precision > MAX_LATENCY_PRECISION) config.precision = MAX_LATENCY_PRECISION;
} else if (!strcmp(argv[i],"--threads")) {
if (lastarg) goto invalid;
config.num_threads = atoi(argv[++i]);
if (config.num_threads > MAX_THREADS) {
printf("WARNING: too many threads, limiting threads to %d.\n",
MAX_THREADS);
config.num_threads = MAX_THREADS;
} else if (config.num_threads < 0) config.num_threads = 0;
} else if (!strcmp(argv[i],"--help")) {
exit_status = 0;
goto usage;
@ -644,8 +772,17 @@ int showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData
UNUSED(eventLoop);
UNUSED(id);
UNUSED(clientData);
int liveclients = 0;
int requests_finished = 0;
if (!config.num_threads) {
liveclients = config.liveclients;
requests_finished = config.requests_finished;
} else {
atomicGet(config.liveclients, liveclients);
atomicGet(config.requests_finished, requests_finished);
}
if (config.liveclients == 0 && config.requests_finished != config.requests) {
if (liveclients == 0 && requests_finished != config.requests) {
fprintf(stderr,"All clients disconnected... aborting.\n");
exit(1);
}
@ -656,7 +793,7 @@ int showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData
return 250;
}
float dt = (float)(mstime()-config.start)/1000.0;
float rps = (float)config.requests_finished/dt;
float rps = (float)requests_finished/dt;
printf("%s: %.2f\r", config.title, rps);
fflush(stdout);
return 250; /* every 250ms */
@ -711,12 +848,19 @@ int main(int argc, const char **argv) {
config.dbnum = 0;
config.auth = NULL;
config.precision = 1;
config.num_threads = 0;
config.threads = NULL;
i = parseOptions(argc,argv);
argc -= i;
argv += i;
config.latency = zmalloc(sizeof(long long)*config.requests);
if (config.num_threads > 0) {
pthread_mutex_init(&(config.requests_issued_mutex), NULL);
pthread_mutex_init(&(config.requests_finished_mutex), NULL);
pthread_mutex_init(&(config.liveclients_mutex), NULL);
}
if (config.keepalive == 0) {
printf("WARNING: keepalive disabled, you probably need 'echo 1 > /proc/sys/net/ipv4/tcp_tw_reuse' for Linux and 'sudo sysctl -w net.inet.tcp.msl=1000' for Mac OS X in order to use a lot of clients/requests\n");
@ -724,7 +868,7 @@ int main(int argc, const char **argv) {
if (config.idlemode) {
printf("Creating %d idle connections and waiting forever (Ctrl+C when done)\n", config.numclients);
c = createClient("",0,NULL); /* will never receive a reply */
c = createClient("",0,NULL,-1); /* will never receive a reply */
createMissingClients(c);
aeMain(config.el);
/* and will wait for every */