mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-23 16:48:27 -05:00
6413e5f81a
If we only has one node in cluster or before 8fdc857, we don't know myself ip, so we should use config.hostip for myself. However, we should use the IP from the command response to update node->ip if it exists and is different from config.hostip otherwise, when there's more than one node in cluster, if we use -h with virtual IP or DNS, benchmark doesn't show node real ip and port of myself even though it could get right IP and port by CLUSTER NODES command.
1970 lines
74 KiB
C
1970 lines
74 KiB
C
/* Redis benchmark utility.
|
|
*
|
|
* Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
|
|
* All rights reserved.
|
|
*
|
|
* Redistribution and use in source and binary forms, with or without
|
|
* modification, are permitted provided that the following conditions are met:
|
|
*
|
|
* * Redistributions of source code must retain the above copyright notice,
|
|
* this list of conditions and the following disclaimer.
|
|
* * Redistributions in binary form must reproduce the above copyright
|
|
* notice, this list of conditions and the following disclaimer in the
|
|
* documentation and/or other materials provided with the distribution.
|
|
* * Neither the name of Redis nor the names of its contributors may be used
|
|
* to endorse or promote products derived from this software without
|
|
* specific prior written permission.
|
|
*
|
|
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
|
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
|
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
|
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
|
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
|
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
|
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
|
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
* POSSIBILITY OF SUCH DAMAGE.
|
|
*/
|
|
|
|
#include "fmacros.h"
|
|
#include "version.h"
|
|
|
|
#include <stdio.h>
|
|
#include <string.h>
|
|
#include <stdlib.h>
|
|
#include <unistd.h>
|
|
#include <errno.h>
|
|
#include <time.h>
|
|
#include <sys/time.h>
|
|
#include <signal.h>
|
|
#include <assert.h>
|
|
#include <math.h>
|
|
#include <pthread.h>
|
|
|
|
#include <sdscompat.h> /* Use hiredis' sds compat header that maps sds calls to their hi_ variants */
|
|
#include <sds.h> /* Use hiredis sds. */
|
|
#include "ae.h"
|
|
#include <hiredis.h>
|
|
#ifdef USE_OPENSSL
|
|
#include <openssl/ssl.h>
|
|
#include <openssl/err.h>
|
|
#include <hiredis_ssl.h>
|
|
#endif
|
|
#include "adlist.h"
|
|
#include "dict.h"
|
|
#include "zmalloc.h"
|
|
#include "atomicvar.h"
|
|
#include "crc16_slottable.h"
|
|
#include "hdr_histogram.h"
|
|
#include "cli_common.h"
|
|
|
|
#define UNUSED(V) ((void) V)
|
|
#define RANDPTR_INITIAL_SIZE 8
|
|
#define DEFAULT_LATENCY_PRECISION 3
|
|
#define MAX_LATENCY_PRECISION 4
|
|
#define MAX_THREADS 500
|
|
#define CLUSTER_SLOTS 16384
|
|
#define CONFIG_LATENCY_HISTOGRAM_MIN_VALUE 10L /* >= 10 usecs */
|
|
#define CONFIG_LATENCY_HISTOGRAM_MAX_VALUE 3000000L /* <= 30 secs(us precision) */
|
|
#define CONFIG_LATENCY_HISTOGRAM_INSTANT_MAX_VALUE 3000000L /* <= 3 secs(us precision) */
|
|
|
|
#define CLIENT_GET_EVENTLOOP(c) \
|
|
(c->thread_id >= 0 ? config.threads[c->thread_id]->el : config.el)
|
|
|
|
struct benchmarkThread;
|
|
struct clusterNode;
|
|
struct redisConfig;
|
|
|
|
static struct config {
|
|
aeEventLoop *el;
|
|
const char *hostip;
|
|
int hostport;
|
|
const char *hostsocket;
|
|
int tls;
|
|
struct cliSSLconfig sslconfig;
|
|
int numclients;
|
|
redisAtomic int liveclients;
|
|
int requests;
|
|
redisAtomic int requests_issued;
|
|
redisAtomic int requests_finished;
|
|
redisAtomic int previous_requests_finished;
|
|
int last_printed_bytes;
|
|
long long previous_tick;
|
|
int keysize;
|
|
int datasize;
|
|
int randomkeys;
|
|
int randomkeys_keyspacelen;
|
|
int keepalive;
|
|
int pipeline;
|
|
int showerrors;
|
|
long long start;
|
|
long long totlatency;
|
|
long long *latency;
|
|
const char *title;
|
|
list *clients;
|
|
int quiet;
|
|
int csv;
|
|
int loop;
|
|
int idlemode;
|
|
int dbnum;
|
|
sds dbnumstr;
|
|
char *tests;
|
|
char *auth;
|
|
const char *user;
|
|
int precision;
|
|
int num_threads;
|
|
struct benchmarkThread **threads;
|
|
int cluster_mode;
|
|
int cluster_node_count;
|
|
struct clusterNode **cluster_nodes;
|
|
struct redisConfig *redis_config;
|
|
struct hdr_histogram* latency_histogram;
|
|
struct hdr_histogram* current_sec_latency_histogram;
|
|
redisAtomic int is_fetching_slots;
|
|
redisAtomic int is_updating_slots;
|
|
redisAtomic int slots_last_update;
|
|
int enable_tracking;
|
|
pthread_mutex_t liveclients_mutex;
|
|
pthread_mutex_t is_updating_slots_mutex;
|
|
} config;
|
|
|
|
typedef struct _client {
|
|
redisContext *context;
|
|
sds obuf;
|
|
char **randptr; /* Pointers to :rand: strings inside the command buf */
|
|
size_t randlen; /* Number of pointers in client->randptr */
|
|
size_t randfree; /* Number of unused pointers in client->randptr */
|
|
char **stagptr; /* Pointers to slot hashtags (cluster mode only) */
|
|
size_t staglen; /* Number of pointers in client->stagptr */
|
|
size_t stagfree; /* Number of unused pointers in client->stagptr */
|
|
size_t written; /* Bytes of 'obuf' already written */
|
|
long long start; /* Start time of a request */
|
|
long long latency; /* Request latency */
|
|
int pending; /* Number of pending requests (replies to consume) */
|
|
int prefix_pending; /* If non-zero, number of pending prefix commands. Commands
|
|
such as auth and select are prefixed to the pipeline of
|
|
benchmark commands and discarded after the first send. */
|
|
int prefixlen; /* Size in bytes of the pending prefix commands */
|
|
int thread_id;
|
|
struct clusterNode *cluster_node;
|
|
int slots_last_update;
|
|
} *client;
|
|
|
|
/* Threads. */
|
|
|
|
typedef struct benchmarkThread {
|
|
int index;
|
|
pthread_t thread;
|
|
aeEventLoop *el;
|
|
} benchmarkThread;
|
|
|
|
/* Cluster. */
|
|
typedef struct clusterNode {
|
|
char *ip;
|
|
int port;
|
|
sds name;
|
|
int flags;
|
|
sds replicate; /* Master ID if node is a slave */
|
|
int *slots;
|
|
int slots_count;
|
|
int current_slot_index;
|
|
int *updated_slots; /* Used by updateClusterSlotsConfiguration */
|
|
int updated_slots_count; /* Used by updateClusterSlotsConfiguration */
|
|
int replicas_count;
|
|
sds *migrating; /* An array of sds where even strings are slots and odd
|
|
* strings are the destination node IDs. */
|
|
sds *importing; /* An array of sds where even strings are slots and odd
|
|
* strings are the source node IDs. */
|
|
int migrating_count; /* Length of the migrating array (migrating slots*2) */
|
|
int importing_count; /* Length of the importing array (importing slots*2) */
|
|
struct redisConfig *redis_config;
|
|
} clusterNode;
|
|
|
|
typedef struct redisConfig {
|
|
sds save;
|
|
sds appendonly;
|
|
} redisConfig;
|
|
|
|
/* Prototypes */
|
|
char *redisGitSHA1(void);
|
|
char *redisGitDirty(void);
|
|
static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask);
|
|
static void createMissingClients(client c);
|
|
static benchmarkThread *createBenchmarkThread(int index);
|
|
static void freeBenchmarkThread(benchmarkThread *thread);
|
|
static void freeBenchmarkThreads();
|
|
static void *execBenchmarkThread(void *ptr);
|
|
static clusterNode *createClusterNode(char *ip, int port);
|
|
static redisConfig *getRedisConfig(const char *ip, int port,
|
|
const char *hostsocket);
|
|
static redisContext *getRedisContext(const char *ip, int port,
|
|
const char *hostsocket);
|
|
static void freeRedisConfig(redisConfig *cfg);
|
|
static int fetchClusterSlotsConfiguration(client c);
|
|
static void updateClusterSlotsConfiguration();
|
|
int showThroughput(struct aeEventLoop *eventLoop, long long id,
|
|
void *clientData);
|
|
|
|
static sds benchmarkVersion(void) {
|
|
sds version;
|
|
version = sdscatprintf(sdsempty(), "%s", REDIS_VERSION);
|
|
|
|
/* Add git commit and working tree status when available */
|
|
if (strtoll(redisGitSHA1(),NULL,16)) {
|
|
version = sdscatprintf(version, " (git:%s", redisGitSHA1());
|
|
if (strtoll(redisGitDirty(),NULL,10))
|
|
version = sdscatprintf(version, "-dirty");
|
|
version = sdscat(version, ")");
|
|
}
|
|
return version;
|
|
}
|
|
|
|
/* Dict callbacks */
|
|
static uint64_t dictSdsHash(const void *key);
|
|
static int dictSdsKeyCompare(void *privdata, const void *key1,
|
|
const void *key2);
|
|
|
|
/* Implementation */
|
|
static long long ustime(void) {
|
|
struct timeval tv;
|
|
long long ust;
|
|
|
|
gettimeofday(&tv, NULL);
|
|
ust = ((long)tv.tv_sec)*1000000;
|
|
ust += tv.tv_usec;
|
|
return ust;
|
|
}
|
|
|
|
static long long mstime(void) {
|
|
struct timeval tv;
|
|
long long mst;
|
|
|
|
gettimeofday(&tv, NULL);
|
|
mst = ((long long)tv.tv_sec)*1000;
|
|
mst += tv.tv_usec/1000;
|
|
return mst;
|
|
}
|
|
|
|
static uint64_t dictSdsHash(const void *key) {
|
|
return dictGenHashFunction((unsigned char*)key, sdslen((char*)key));
|
|
}
|
|
|
|
static int dictSdsKeyCompare(void *privdata, const void *key1,
|
|
const void *key2)
|
|
{
|
|
int l1,l2;
|
|
DICT_NOTUSED(privdata);
|
|
|
|
l1 = sdslen((sds)key1);
|
|
l2 = sdslen((sds)key2);
|
|
if (l1 != l2) return 0;
|
|
return memcmp(key1, key2, l1) == 0;
|
|
}
|
|
|
|
/* _serverAssert is needed by dict */
|
|
void _serverAssert(const char *estr, const char *file, int line) {
|
|
fprintf(stderr, "=== ASSERTION FAILED ===");
|
|
fprintf(stderr, "==> %s:%d '%s' is not true",file,line,estr);
|
|
*((char*)-1) = 'x';
|
|
}
|
|
|
|
static redisContext *getRedisContext(const char *ip, int port,
|
|
const char *hostsocket)
|
|
{
|
|
redisContext *ctx = NULL;
|
|
redisReply *reply = NULL;
|
|
if (hostsocket == NULL)
|
|
ctx = redisConnect(ip, port);
|
|
else
|
|
ctx = redisConnectUnix(hostsocket);
|
|
if (ctx == NULL || ctx->err) {
|
|
fprintf(stderr,"Could not connect to Redis at ");
|
|
char *err = (ctx != NULL ? ctx->errstr : "");
|
|
if (hostsocket == NULL)
|
|
fprintf(stderr,"%s:%d: %s\n",ip,port,err);
|
|
else
|
|
fprintf(stderr,"%s: %s\n",hostsocket,err);
|
|
goto cleanup;
|
|
}
|
|
if (config.tls==1) {
|
|
const char *err = NULL;
|
|
if (cliSecureConnection(ctx, config.sslconfig, &err) == REDIS_ERR && err) {
|
|
fprintf(stderr, "Could not negotiate a TLS connection: %s\n", err);
|
|
goto cleanup;
|
|
}
|
|
}
|
|
if (config.auth == NULL)
|
|
return ctx;
|
|
if (config.user == NULL)
|
|
reply = redisCommand(ctx,"AUTH %s", config.auth);
|
|
else
|
|
reply = redisCommand(ctx,"AUTH %s %s", config.user, config.auth);
|
|
if (reply != NULL) {
|
|
if (reply->type == REDIS_REPLY_ERROR) {
|
|
if (hostsocket == NULL)
|
|
fprintf(stderr, "Node %s:%d replied with error:\n%s\n", ip, port, reply->str);
|
|
else
|
|
fprintf(stderr, "Node %s replied with error:\n%s\n", hostsocket, reply->str);
|
|
goto cleanup;
|
|
}
|
|
freeReplyObject(reply);
|
|
return ctx;
|
|
}
|
|
fprintf(stderr, "ERROR: failed to fetch reply from ");
|
|
if (hostsocket == NULL)
|
|
fprintf(stderr, "%s:%d\n", ip, port);
|
|
else
|
|
fprintf(stderr, "%s\n", hostsocket);
|
|
cleanup:
|
|
freeReplyObject(reply);
|
|
redisFree(ctx);
|
|
return NULL;
|
|
}
|
|
|
|
|
|
|
|
static redisConfig *getRedisConfig(const char *ip, int port,
|
|
const char *hostsocket)
|
|
{
|
|
redisConfig *cfg = zcalloc(sizeof(*cfg));
|
|
if (!cfg) return NULL;
|
|
redisContext *c = NULL;
|
|
redisReply *reply = NULL, *sub_reply = NULL;
|
|
c = getRedisContext(ip, port, hostsocket);
|
|
if (c == NULL) {
|
|
freeRedisConfig(cfg);
|
|
return NULL;
|
|
}
|
|
redisAppendCommand(c, "CONFIG GET %s", "save");
|
|
redisAppendCommand(c, "CONFIG GET %s", "appendonly");
|
|
int i = 0;
|
|
void *r = NULL;
|
|
for (; i < 2; i++) {
|
|
int res = redisGetReply(c, &r);
|
|
if (reply) freeReplyObject(reply);
|
|
reply = res == REDIS_OK ? ((redisReply *) r) : NULL;
|
|
if (res != REDIS_OK || !r) goto fail;
|
|
if (reply->type == REDIS_REPLY_ERROR) {
|
|
fprintf(stderr, "ERROR: %s\n", reply->str);
|
|
goto fail;
|
|
}
|
|
if (reply->type != REDIS_REPLY_ARRAY || reply->elements < 2) goto fail;
|
|
sub_reply = reply->element[1];
|
|
char *value = sub_reply->str;
|
|
if (!value) value = "";
|
|
switch (i) {
|
|
case 0: cfg->save = sdsnew(value); break;
|
|
case 1: cfg->appendonly = sdsnew(value); break;
|
|
}
|
|
}
|
|
freeReplyObject(reply);
|
|
redisFree(c);
|
|
return cfg;
|
|
fail:
|
|
fprintf(stderr, "ERROR: failed to fetch CONFIG from ");
|
|
if (hostsocket == NULL) fprintf(stderr, "%s:%d\n", ip, port);
|
|
else fprintf(stderr, "%s\n", hostsocket);
|
|
freeReplyObject(reply);
|
|
redisFree(c);
|
|
freeRedisConfig(cfg);
|
|
return NULL;
|
|
}
|
|
static void freeRedisConfig(redisConfig *cfg) {
|
|
if (cfg->save) sdsfree(cfg->save);
|
|
if (cfg->appendonly) sdsfree(cfg->appendonly);
|
|
zfree(cfg);
|
|
}
|
|
|
|
static void freeClient(client c) {
|
|
aeEventLoop *el = CLIENT_GET_EVENTLOOP(c);
|
|
listNode *ln;
|
|
aeDeleteFileEvent(el,c->context->fd,AE_WRITABLE);
|
|
aeDeleteFileEvent(el,c->context->fd,AE_READABLE);
|
|
if (c->thread_id >= 0) {
|
|
int requests_finished = 0;
|
|
atomicGet(config.requests_finished, requests_finished);
|
|
if (requests_finished >= config.requests) {
|
|
aeStop(el);
|
|
}
|
|
}
|
|
redisFree(c->context);
|
|
sdsfree(c->obuf);
|
|
zfree(c->randptr);
|
|
zfree(c->stagptr);
|
|
zfree(c);
|
|
if (config.num_threads) pthread_mutex_lock(&(config.liveclients_mutex));
|
|
config.liveclients--;
|
|
ln = listSearchKey(config.clients,c);
|
|
assert(ln != NULL);
|
|
listDelNode(config.clients,ln);
|
|
if (config.num_threads) pthread_mutex_unlock(&(config.liveclients_mutex));
|
|
}
|
|
|
|
static void freeAllClients(void) {
|
|
listNode *ln = config.clients->head, *next;
|
|
|
|
while(ln) {
|
|
next = ln->next;
|
|
freeClient(ln->value);
|
|
ln = next;
|
|
}
|
|
}
|
|
|
|
static void resetClient(client c) {
|
|
aeEventLoop *el = CLIENT_GET_EVENTLOOP(c);
|
|
aeDeleteFileEvent(el,c->context->fd,AE_WRITABLE);
|
|
aeDeleteFileEvent(el,c->context->fd,AE_READABLE);
|
|
aeCreateFileEvent(el,c->context->fd,AE_WRITABLE,writeHandler,c);
|
|
c->written = 0;
|
|
c->pending = config.pipeline;
|
|
}
|
|
|
|
static void randomizeClientKey(client c) {
|
|
size_t i;
|
|
|
|
for (i = 0; i < c->randlen; i++) {
|
|
char *p = c->randptr[i]+11;
|
|
size_t r = 0;
|
|
if (config.randomkeys_keyspacelen != 0)
|
|
r = random() % config.randomkeys_keyspacelen;
|
|
size_t j;
|
|
|
|
for (j = 0; j < 12; j++) {
|
|
*p = '0'+r%10;
|
|
r/=10;
|
|
p--;
|
|
}
|
|
}
|
|
}
|
|
|
|
static void setClusterKeyHashTag(client c) {
|
|
assert(c->thread_id >= 0);
|
|
clusterNode *node = c->cluster_node;
|
|
assert(node);
|
|
assert(node->current_slot_index < node->slots_count);
|
|
int is_updating_slots = 0;
|
|
atomicGet(config.is_updating_slots, is_updating_slots);
|
|
/* If updateClusterSlotsConfiguration is updating the slots array,
|
|
* call updateClusterSlotsConfiguration is order to block the thread
|
|
* since the mutex is locked. When the slots will be updated by the
|
|
* thread that's actually performing the update, the execution of
|
|
* updateClusterSlotsConfiguration won't actually do anything, since
|
|
* the updated_slots_count array will be already NULL. */
|
|
if (is_updating_slots) updateClusterSlotsConfiguration();
|
|
int slot = node->slots[node->current_slot_index];
|
|
const char *tag = crc16_slot_table[slot];
|
|
int taglen = strlen(tag);
|
|
size_t i;
|
|
for (i = 0; i < c->staglen; i++) {
|
|
char *p = c->stagptr[i] + 1;
|
|
p[0] = tag[0];
|
|
p[1] = (taglen >= 2 ? tag[1] : '}');
|
|
p[2] = (taglen == 3 ? tag[2] : '}');
|
|
}
|
|
}
|
|
|
|
static void clientDone(client c) {
|
|
int requests_finished = 0;
|
|
atomicGet(config.requests_finished, requests_finished);
|
|
if (requests_finished >= config.requests) {
|
|
freeClient(c);
|
|
if (!config.num_threads && config.el) aeStop(config.el);
|
|
return;
|
|
}
|
|
if (config.keepalive) {
|
|
resetClient(c);
|
|
} else {
|
|
if (config.num_threads) pthread_mutex_lock(&(config.liveclients_mutex));
|
|
config.liveclients--;
|
|
createMissingClients(c);
|
|
config.liveclients++;
|
|
if (config.num_threads)
|
|
pthread_mutex_unlock(&(config.liveclients_mutex));
|
|
freeClient(c);
|
|
}
|
|
}
|
|
|
|
static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|
client c = privdata;
|
|
void *reply = NULL;
|
|
UNUSED(el);
|
|
UNUSED(fd);
|
|
UNUSED(mask);
|
|
|
|
/* Calculate latency only for the first read event. This means that the
|
|
* server already sent the reply and we need to parse it. Parsing overhead
|
|
* is not part of the latency, so calculate it only once, here. */
|
|
if (c->latency < 0) c->latency = ustime()-(c->start);
|
|
|
|
if (redisBufferRead(c->context) != REDIS_OK) {
|
|
fprintf(stderr,"Error: %s\n",c->context->errstr);
|
|
exit(1);
|
|
} else {
|
|
while(c->pending) {
|
|
if (redisGetReply(c->context,&reply) != REDIS_OK) {
|
|
fprintf(stderr,"Error: %s\n",c->context->errstr);
|
|
exit(1);
|
|
}
|
|
if (reply != NULL) {
|
|
if (reply == (void*)REDIS_REPLY_ERROR) {
|
|
fprintf(stderr,"Unexpected error reply, exiting...\n");
|
|
exit(1);
|
|
}
|
|
redisReply *r = reply;
|
|
int is_err = (r->type == REDIS_REPLY_ERROR);
|
|
|
|
if (is_err && config.showerrors) {
|
|
/* TODO: static lasterr_time not thread-safe */
|
|
static time_t lasterr_time = 0;
|
|
time_t now = time(NULL);
|
|
if (lasterr_time != now) {
|
|
lasterr_time = now;
|
|
if (c->cluster_node) {
|
|
printf("Error from server %s:%d: %s\n",
|
|
c->cluster_node->ip,
|
|
c->cluster_node->port,
|
|
r->str);
|
|
} else printf("Error from server: %s\n", r->str);
|
|
}
|
|
}
|
|
|
|
/* Try to update slots configuration if reply error is
|
|
* MOVED/ASK/CLUSTERDOWN and the key(s) used by the command
|
|
* contain(s) the slot hash tag. */
|
|
if (is_err && c->cluster_node && c->staglen) {
|
|
int fetch_slots = 0, do_wait = 0;
|
|
if (!strncmp(r->str,"MOVED",5) || !strncmp(r->str,"ASK",3))
|
|
fetch_slots = 1;
|
|
else if (!strncmp(r->str,"CLUSTERDOWN",11)) {
|
|
/* Usually the cluster is able to recover itself after
|
|
* a CLUSTERDOWN error, so try to sleep one second
|
|
* before requesting the new configuration. */
|
|
fetch_slots = 1;
|
|
do_wait = 1;
|
|
printf("Error from server %s:%d: %s\n",
|
|
c->cluster_node->ip,
|
|
c->cluster_node->port,
|
|
r->str);
|
|
}
|
|
if (do_wait) sleep(1);
|
|
if (fetch_slots && !fetchClusterSlotsConfiguration(c))
|
|
exit(1);
|
|
}
|
|
|
|
freeReplyObject(reply);
|
|
/* This is an OK for prefix commands such as auth and select.*/
|
|
if (c->prefix_pending > 0) {
|
|
c->prefix_pending--;
|
|
c->pending--;
|
|
/* Discard prefix commands on first response.*/
|
|
if (c->prefixlen > 0) {
|
|
size_t j;
|
|
sdsrange(c->obuf, c->prefixlen, -1);
|
|
/* We also need to fix the pointers to the strings
|
|
* we need to randomize. */
|
|
for (j = 0; j < c->randlen; j++)
|
|
c->randptr[j] -= c->prefixlen;
|
|
c->prefixlen = 0;
|
|
}
|
|
continue;
|
|
}
|
|
int requests_finished = 0;
|
|
atomicGetIncr(config.requests_finished, requests_finished, 1);
|
|
if (requests_finished < config.requests){
|
|
if (config.num_threads == 0) {
|
|
hdr_record_value(
|
|
config.latency_histogram, // Histogram to record to
|
|
(long)c->latency<=CONFIG_LATENCY_HISTOGRAM_MAX_VALUE ? (long)c->latency : CONFIG_LATENCY_HISTOGRAM_MAX_VALUE); // Value to record
|
|
hdr_record_value(
|
|
config.current_sec_latency_histogram, // Histogram to record to
|
|
(long)c->latency<=CONFIG_LATENCY_HISTOGRAM_INSTANT_MAX_VALUE ? (long)c->latency : CONFIG_LATENCY_HISTOGRAM_INSTANT_MAX_VALUE); // Value to record
|
|
} else {
|
|
hdr_record_value_atomic(
|
|
config.latency_histogram, // Histogram to record to
|
|
(long)c->latency<=CONFIG_LATENCY_HISTOGRAM_MAX_VALUE ? (long)c->latency : CONFIG_LATENCY_HISTOGRAM_MAX_VALUE); // Value to record
|
|
hdr_record_value_atomic(
|
|
config.current_sec_latency_histogram, // Histogram to record to
|
|
(long)c->latency<=CONFIG_LATENCY_HISTOGRAM_INSTANT_MAX_VALUE ? (long)c->latency : CONFIG_LATENCY_HISTOGRAM_INSTANT_MAX_VALUE); // Value to record
|
|
}
|
|
}
|
|
c->pending--;
|
|
if (c->pending == 0) {
|
|
clientDone(c);
|
|
break;
|
|
}
|
|
} else {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|
client c = privdata;
|
|
UNUSED(el);
|
|
UNUSED(fd);
|
|
UNUSED(mask);
|
|
|
|
/* Initialize request when nothing was written. */
|
|
if (c->written == 0) {
|
|
/* Enforce upper bound to number of requests. */
|
|
int requests_issued = 0;
|
|
atomicGetIncr(config.requests_issued, requests_issued, config.pipeline);
|
|
if (requests_issued >= config.requests) {
|
|
return;
|
|
}
|
|
|
|
/* Really initialize: randomize keys and set start time. */
|
|
if (config.randomkeys) randomizeClientKey(c);
|
|
if (config.cluster_mode && c->staglen > 0) setClusterKeyHashTag(c);
|
|
atomicGet(config.slots_last_update, c->slots_last_update);
|
|
c->start = ustime();
|
|
c->latency = -1;
|
|
}
|
|
const ssize_t buflen = sdslen(c->obuf);
|
|
const ssize_t writeLen = buflen-c->written;
|
|
if (writeLen > 0) {
|
|
void *ptr = c->obuf+c->written;
|
|
while(1) {
|
|
/* Optimistically try to write before checking if the file descriptor
|
|
* is actually writable. At worst we get EAGAIN. */
|
|
const ssize_t nwritten = cliWriteConn(c->context,ptr,writeLen);
|
|
if (nwritten != writeLen) {
|
|
if (nwritten == -1 && errno != EAGAIN) {
|
|
if (errno != EPIPE)
|
|
fprintf(stderr, "Error writing to the server: %s\n", strerror(errno));
|
|
freeClient(c);
|
|
return;
|
|
}
|
|
} else {
|
|
aeDeleteFileEvent(el,c->context->fd,AE_WRITABLE);
|
|
aeCreateFileEvent(el,c->context->fd,AE_READABLE,readHandler,c);
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/* Create a benchmark client, configured to send the command passed as 'cmd' of
|
|
* 'len' bytes.
|
|
*
|
|
* The command is copied N times in the client output buffer (that is reused
|
|
* again and again to send the request to the server) accordingly to the configured
|
|
* pipeline size.
|
|
*
|
|
* Also an initial SELECT command is prepended in order to make sure the right
|
|
* database is selected, if needed. The initial SELECT will be discarded as soon
|
|
* as the first reply is received.
|
|
*
|
|
* To create a client from scratch, the 'from' pointer is set to NULL. If instead
|
|
* we want to create a client using another client as reference, the 'from' pointer
|
|
* points to the client to use as reference. In such a case the following
|
|
* information is take from the 'from' client:
|
|
*
|
|
* 1) The command line to use.
|
|
* 2) The offsets of the __rand_int__ elements inside the command line, used
|
|
* for arguments randomization.
|
|
*
|
|
* Even when cloning another client, prefix commands are applied if needed.*/
|
|
static client createClient(char *cmd, size_t len, client from, int thread_id) {
|
|
int j;
|
|
int is_cluster_client = (config.cluster_mode && thread_id >= 0);
|
|
client c = zmalloc(sizeof(struct _client));
|
|
|
|
const char *ip = NULL;
|
|
int port = 0;
|
|
c->cluster_node = NULL;
|
|
if (config.hostsocket == NULL || is_cluster_client) {
|
|
if (!is_cluster_client) {
|
|
ip = config.hostip;
|
|
port = config.hostport;
|
|
} else {
|
|
int node_idx = 0;
|
|
if (config.num_threads < config.cluster_node_count)
|
|
node_idx = config.liveclients % config.cluster_node_count;
|
|
else
|
|
node_idx = thread_id % config.cluster_node_count;
|
|
clusterNode *node = config.cluster_nodes[node_idx];
|
|
assert(node != NULL);
|
|
ip = (const char *) node->ip;
|
|
port = node->port;
|
|
c->cluster_node = node;
|
|
}
|
|
c->context = redisConnectNonBlock(ip,port);
|
|
} else {
|
|
c->context = redisConnectUnixNonBlock(config.hostsocket);
|
|
}
|
|
if (c->context->err) {
|
|
fprintf(stderr,"Could not connect to Redis at ");
|
|
if (config.hostsocket == NULL || is_cluster_client)
|
|
fprintf(stderr,"%s:%d: %s\n",ip,port,c->context->errstr);
|
|
else
|
|
fprintf(stderr,"%s: %s\n",config.hostsocket,c->context->errstr);
|
|
exit(1);
|
|
}
|
|
if (config.tls==1) {
|
|
const char *err = NULL;
|
|
if (cliSecureConnection(c->context, config.sslconfig, &err) == REDIS_ERR && err) {
|
|
fprintf(stderr, "Could not negotiate a TLS connection: %s\n", err);
|
|
exit(1);
|
|
}
|
|
}
|
|
c->thread_id = thread_id;
|
|
/* Suppress hiredis cleanup of unused buffers for max speed. */
|
|
c->context->reader->maxbuf = 0;
|
|
|
|
/* Build the request buffer:
|
|
* Queue N requests accordingly to the pipeline size, or simply clone
|
|
* the example client buffer. */
|
|
c->obuf = sdsempty();
|
|
/* Prefix the request buffer with AUTH and/or SELECT commands, if applicable.
|
|
* These commands are discarded after the first response, so if the client is
|
|
* reused the commands will not be used again. */
|
|
c->prefix_pending = 0;
|
|
if (config.auth) {
|
|
char *buf = NULL;
|
|
int len;
|
|
if (config.user == NULL)
|
|
len = redisFormatCommand(&buf, "AUTH %s", config.auth);
|
|
else
|
|
len = redisFormatCommand(&buf, "AUTH %s %s",
|
|
config.user, config.auth);
|
|
c->obuf = sdscatlen(c->obuf, buf, len);
|
|
free(buf);
|
|
c->prefix_pending++;
|
|
}
|
|
|
|
if (config.enable_tracking) {
|
|
char *buf = NULL;
|
|
int len = redisFormatCommand(&buf, "CLIENT TRACKING on");
|
|
c->obuf = sdscatlen(c->obuf, buf, len);
|
|
free(buf);
|
|
c->prefix_pending++;
|
|
}
|
|
|
|
/* If a DB number different than zero is selected, prefix our request
|
|
* buffer with the SELECT command, that will be discarded the first
|
|
* time the replies are received, so if the client is reused the
|
|
* SELECT command will not be used again. */
|
|
if (config.dbnum != 0 && !is_cluster_client) {
|
|
c->obuf = sdscatprintf(c->obuf,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
|
|
(int)sdslen(config.dbnumstr),config.dbnumstr);
|
|
c->prefix_pending++;
|
|
}
|
|
c->prefixlen = sdslen(c->obuf);
|
|
/* Append the request itself. */
|
|
if (from) {
|
|
c->obuf = sdscatlen(c->obuf,
|
|
from->obuf+from->prefixlen,
|
|
sdslen(from->obuf)-from->prefixlen);
|
|
} else {
|
|
for (j = 0; j < config.pipeline; j++)
|
|
c->obuf = sdscatlen(c->obuf,cmd,len);
|
|
}
|
|
|
|
c->written = 0;
|
|
c->pending = config.pipeline+c->prefix_pending;
|
|
c->randptr = NULL;
|
|
c->randlen = 0;
|
|
c->stagptr = NULL;
|
|
c->staglen = 0;
|
|
|
|
/* Find substrings in the output buffer that need to be randomized. */
|
|
if (config.randomkeys) {
|
|
if (from) {
|
|
c->randlen = from->randlen;
|
|
c->randfree = 0;
|
|
c->randptr = zmalloc(sizeof(char*)*c->randlen);
|
|
/* copy the offsets. */
|
|
for (j = 0; j < (int)c->randlen; j++) {
|
|
c->randptr[j] = c->obuf + (from->randptr[j]-from->obuf);
|
|
/* Adjust for the different select prefix length. */
|
|
c->randptr[j] += c->prefixlen - from->prefixlen;
|
|
}
|
|
} else {
|
|
char *p = c->obuf;
|
|
|
|
c->randlen = 0;
|
|
c->randfree = RANDPTR_INITIAL_SIZE;
|
|
c->randptr = zmalloc(sizeof(char*)*c->randfree);
|
|
while ((p = strstr(p,"__rand_int__")) != NULL) {
|
|
if (c->randfree == 0) {
|
|
c->randptr = zrealloc(c->randptr,sizeof(char*)*c->randlen*2);
|
|
c->randfree += c->randlen;
|
|
}
|
|
c->randptr[c->randlen++] = p;
|
|
c->randfree--;
|
|
p += 12; /* 12 is strlen("__rand_int__). */
|
|
}
|
|
}
|
|
}
|
|
/* If cluster mode is enabled, set slot hashtags pointers. */
|
|
if (config.cluster_mode) {
|
|
if (from) {
|
|
c->staglen = from->staglen;
|
|
c->stagfree = 0;
|
|
c->stagptr = zmalloc(sizeof(char*)*c->staglen);
|
|
/* copy the offsets. */
|
|
for (j = 0; j < (int)c->staglen; j++) {
|
|
c->stagptr[j] = c->obuf + (from->stagptr[j]-from->obuf);
|
|
/* Adjust for the different select prefix length. */
|
|
c->stagptr[j] += c->prefixlen - from->prefixlen;
|
|
}
|
|
} else {
|
|
char *p = c->obuf;
|
|
|
|
c->staglen = 0;
|
|
c->stagfree = RANDPTR_INITIAL_SIZE;
|
|
c->stagptr = zmalloc(sizeof(char*)*c->stagfree);
|
|
while ((p = strstr(p,"{tag}")) != NULL) {
|
|
if (c->stagfree == 0) {
|
|
c->stagptr = zrealloc(c->stagptr,
|
|
sizeof(char*) * c->staglen*2);
|
|
c->stagfree += c->staglen;
|
|
}
|
|
c->stagptr[c->staglen++] = p;
|
|
c->stagfree--;
|
|
p += 5; /* 5 is strlen("{tag}"). */
|
|
}
|
|
}
|
|
}
|
|
aeEventLoop *el = NULL;
|
|
if (thread_id < 0) el = config.el;
|
|
else {
|
|
benchmarkThread *thread = config.threads[thread_id];
|
|
el = thread->el;
|
|
}
|
|
if (config.idlemode == 0)
|
|
aeCreateFileEvent(el,c->context->fd,AE_WRITABLE,writeHandler,c);
|
|
listAddNodeTail(config.clients,c);
|
|
atomicIncr(config.liveclients, 1);
|
|
atomicGet(config.slots_last_update, c->slots_last_update);
|
|
return c;
|
|
}
|
|
|
|
static void createMissingClients(client c) {
|
|
int n = 0;
|
|
while(config.liveclients < config.numclients) {
|
|
int thread_id = -1;
|
|
if (config.num_threads)
|
|
thread_id = config.liveclients % config.num_threads;
|
|
createClient(NULL,0,c,thread_id);
|
|
|
|
/* Listen backlog is quite limited on most systems */
|
|
if (++n > 64) {
|
|
usleep(50000);
|
|
n = 0;
|
|
}
|
|
}
|
|
}
|
|
|
|
static void showLatencyReport(void) {
|
|
|
|
const float reqpersec = (float)config.requests_finished/((float)config.totlatency/1000.0f);
|
|
const float p0 = ((float) hdr_min(config.latency_histogram))/1000.0f;
|
|
const float p50 = hdr_value_at_percentile(config.latency_histogram, 50.0 )/1000.0f;
|
|
const float p95 = hdr_value_at_percentile(config.latency_histogram, 95.0 )/1000.0f;
|
|
const float p99 = hdr_value_at_percentile(config.latency_histogram, 99.0 )/1000.0f;
|
|
const float p100 = ((float) hdr_max(config.latency_histogram))/1000.0f;
|
|
const float avg = hdr_mean(config.latency_histogram)/1000.0f;
|
|
|
|
if (!config.quiet && !config.csv) {
|
|
printf("%*s\r", config.last_printed_bytes, " "); // ensure there is a clean line
|
|
printf("====== %s ======\n", config.title);
|
|
printf(" %d requests completed in %.2f seconds\n", config.requests_finished,
|
|
(float)config.totlatency/1000);
|
|
printf(" %d parallel clients\n", config.numclients);
|
|
printf(" %d bytes payload\n", config.datasize);
|
|
printf(" keep alive: %d\n", config.keepalive);
|
|
if (config.cluster_mode) {
|
|
printf(" cluster mode: yes (%d masters)\n",
|
|
config.cluster_node_count);
|
|
int m ;
|
|
for (m = 0; m < config.cluster_node_count; m++) {
|
|
clusterNode *node = config.cluster_nodes[m];
|
|
redisConfig *cfg = node->redis_config;
|
|
if (cfg == NULL) continue;
|
|
printf(" node [%d] configuration:\n",m );
|
|
printf(" save: %s\n",
|
|
sdslen(cfg->save) ? cfg->save : "NONE");
|
|
printf(" appendonly: %s\n", cfg->appendonly);
|
|
}
|
|
} else {
|
|
if (config.redis_config) {
|
|
printf(" host configuration \"save\": %s\n",
|
|
config.redis_config->save);
|
|
printf(" host configuration \"appendonly\": %s\n",
|
|
config.redis_config->appendonly);
|
|
}
|
|
}
|
|
printf(" multi-thread: %s\n", (config.num_threads ? "yes" : "no"));
|
|
if (config.num_threads)
|
|
printf(" threads: %d\n", config.num_threads);
|
|
|
|
printf("\n");
|
|
printf("Latency by percentile distribution:\n");
|
|
struct hdr_iter iter;
|
|
long long previous_cumulative_count = -1;
|
|
const long long total_count = config.latency_histogram->total_count;
|
|
hdr_iter_percentile_init(&iter, config.latency_histogram, 1);
|
|
struct hdr_iter_percentiles *percentiles = &iter.specifics.percentiles;
|
|
while (hdr_iter_next(&iter))
|
|
{
|
|
const double value = iter.highest_equivalent_value / 1000.0f;
|
|
const double percentile = percentiles->percentile;
|
|
const long long cumulative_count = iter.cumulative_count;
|
|
if( previous_cumulative_count != cumulative_count || cumulative_count == total_count ){
|
|
printf("%3.3f%% <= %.3f milliseconds (cumulative count %lld)\n", percentile, value, cumulative_count);
|
|
}
|
|
previous_cumulative_count = cumulative_count;
|
|
}
|
|
printf("\n");
|
|
printf("Cumulative distribution of latencies:\n");
|
|
previous_cumulative_count = -1;
|
|
hdr_iter_linear_init(&iter, config.latency_histogram, 100);
|
|
while (hdr_iter_next(&iter))
|
|
{
|
|
const double value = iter.highest_equivalent_value / 1000.0f;
|
|
const long long cumulative_count = iter.cumulative_count;
|
|
const double percentile = ((double)cumulative_count/(double)total_count)*100.0;
|
|
if( previous_cumulative_count != cumulative_count || cumulative_count == total_count ){
|
|
printf("%3.3f%% <= %.3f milliseconds (cumulative count %lld)\n", percentile, value, cumulative_count);
|
|
}
|
|
/* After the 2 milliseconds latency to have percentages split
|
|
* by decimals will just add a lot of noise to the output. */
|
|
if(iter.highest_equivalent_value > 2000){
|
|
hdr_iter_linear_set_value_units_per_bucket(&iter,1000);
|
|
}
|
|
previous_cumulative_count = cumulative_count;
|
|
}
|
|
printf("\n");
|
|
printf("Summary:\n");
|
|
printf(" throughput summary: %.2f requests per second\n", reqpersec);
|
|
printf(" latency summary (msec):\n");
|
|
printf(" %9s %9s %9s %9s %9s %9s\n", "avg", "min", "p50", "p95", "p99", "max");
|
|
printf(" %9.3f %9.3f %9.3f %9.3f %9.3f %9.3f\n", avg, p0, p50, p95, p99, p100);
|
|
} else if (config.csv) {
|
|
printf("\"%s\",\"%.2f\",\"%.3f\",\"%.3f\",\"%.3f\",\"%.3f\",\"%.3f\",\"%.3f\"\n", config.title, reqpersec, avg, p0, p50, p95, p99, p100);
|
|
} else {
|
|
printf("%*s\r", config.last_printed_bytes, " "); // ensure there is a clean line
|
|
printf("%s: %.2f requests per second, p50=%.3f msec\n", config.title, reqpersec, p50);
|
|
}
|
|
}
|
|
|
|
static void initBenchmarkThreads() {
|
|
int i;
|
|
if (config.threads) freeBenchmarkThreads();
|
|
config.threads = zmalloc(config.num_threads * sizeof(benchmarkThread*));
|
|
for (i = 0; i < config.num_threads; i++) {
|
|
benchmarkThread *thread = createBenchmarkThread(i);
|
|
config.threads[i] = thread;
|
|
}
|
|
}
|
|
|
|
static void startBenchmarkThreads() {
|
|
int i;
|
|
for (i = 0; i < config.num_threads; i++) {
|
|
benchmarkThread *t = config.threads[i];
|
|
if (pthread_create(&(t->thread), NULL, execBenchmarkThread, t)){
|
|
fprintf(stderr, "FATAL: Failed to start thread %d.\n", i);
|
|
exit(1);
|
|
}
|
|
}
|
|
for (i = 0; i < config.num_threads; i++)
|
|
pthread_join(config.threads[i]->thread, NULL);
|
|
}
|
|
|
|
static void benchmark(char *title, char *cmd, int len) {
|
|
client c;
|
|
|
|
config.title = title;
|
|
config.requests_issued = 0;
|
|
config.requests_finished = 0;
|
|
config.previous_requests_finished = 0;
|
|
config.last_printed_bytes = 0;
|
|
hdr_init(
|
|
CONFIG_LATENCY_HISTOGRAM_MIN_VALUE, // Minimum value
|
|
CONFIG_LATENCY_HISTOGRAM_MAX_VALUE, // Maximum value
|
|
config.precision, // Number of significant figures
|
|
&config.latency_histogram); // Pointer to initialise
|
|
hdr_init(
|
|
CONFIG_LATENCY_HISTOGRAM_MIN_VALUE, // Minimum value
|
|
CONFIG_LATENCY_HISTOGRAM_INSTANT_MAX_VALUE, // Maximum value
|
|
config.precision, // Number of significant figures
|
|
&config.current_sec_latency_histogram); // Pointer to initialise
|
|
|
|
if (config.num_threads) initBenchmarkThreads();
|
|
|
|
int thread_id = config.num_threads > 0 ? 0 : -1;
|
|
c = createClient(cmd,len,NULL,thread_id);
|
|
createMissingClients(c);
|
|
|
|
config.start = mstime();
|
|
if (!config.num_threads) aeMain(config.el);
|
|
else startBenchmarkThreads();
|
|
config.totlatency = mstime()-config.start;
|
|
|
|
showLatencyReport();
|
|
freeAllClients();
|
|
if (config.threads) freeBenchmarkThreads();
|
|
if (config.current_sec_latency_histogram) hdr_close(config.current_sec_latency_histogram);
|
|
if (config.latency_histogram) hdr_close(config.latency_histogram);
|
|
|
|
}
|
|
|
|
/* Thread functions. */
|
|
|
|
static benchmarkThread *createBenchmarkThread(int index) {
|
|
benchmarkThread *thread = zmalloc(sizeof(*thread));
|
|
if (thread == NULL) return NULL;
|
|
thread->index = index;
|
|
thread->el = aeCreateEventLoop(1024*10);
|
|
aeCreateTimeEvent(thread->el,1,showThroughput,NULL,NULL);
|
|
return thread;
|
|
}
|
|
|
|
static void freeBenchmarkThread(benchmarkThread *thread) {
|
|
if (thread->el) aeDeleteEventLoop(thread->el);
|
|
zfree(thread);
|
|
}
|
|
|
|
static void freeBenchmarkThreads() {
|
|
int i = 0;
|
|
for (; i < config.num_threads; i++) {
|
|
benchmarkThread *thread = config.threads[i];
|
|
if (thread) freeBenchmarkThread(thread);
|
|
}
|
|
zfree(config.threads);
|
|
config.threads = NULL;
|
|
}
|
|
|
|
static void *execBenchmarkThread(void *ptr) {
|
|
benchmarkThread *thread = (benchmarkThread *) ptr;
|
|
aeMain(thread->el);
|
|
return NULL;
|
|
}
|
|
|
|
/* Cluster helper functions. */
|
|
|
|
static clusterNode *createClusterNode(char *ip, int port) {
|
|
clusterNode *node = zmalloc(sizeof(*node));
|
|
if (!node) return NULL;
|
|
node->ip = ip;
|
|
node->port = port;
|
|
node->name = NULL;
|
|
node->flags = 0;
|
|
node->replicate = NULL;
|
|
node->replicas_count = 0;
|
|
node->slots = zmalloc(CLUSTER_SLOTS * sizeof(int));
|
|
node->slots_count = 0;
|
|
node->current_slot_index = 0;
|
|
node->updated_slots = NULL;
|
|
node->updated_slots_count = 0;
|
|
node->migrating = NULL;
|
|
node->importing = NULL;
|
|
node->migrating_count = 0;
|
|
node->importing_count = 0;
|
|
node->redis_config = NULL;
|
|
return node;
|
|
}
|
|
|
|
static void freeClusterNode(clusterNode *node) {
|
|
int i;
|
|
if (node->name) sdsfree(node->name);
|
|
if (node->replicate) sdsfree(node->replicate);
|
|
if (node->migrating != NULL) {
|
|
for (i = 0; i < node->migrating_count; i++) sdsfree(node->migrating[i]);
|
|
zfree(node->migrating);
|
|
}
|
|
if (node->importing != NULL) {
|
|
for (i = 0; i < node->importing_count; i++) sdsfree(node->importing[i]);
|
|
zfree(node->importing);
|
|
}
|
|
/* If the node is not the reference node, that uses the address from
|
|
* config.hostip and config.hostport, then the node ip has been
|
|
* allocated by fetchClusterConfiguration, so it must be freed. */
|
|
if (node->ip && strcmp(node->ip, config.hostip) != 0) sdsfree(node->ip);
|
|
if (node->redis_config != NULL) freeRedisConfig(node->redis_config);
|
|
zfree(node->slots);
|
|
zfree(node);
|
|
}
|
|
|
|
static void freeClusterNodes() {
|
|
int i = 0;
|
|
for (; i < config.cluster_node_count; i++) {
|
|
clusterNode *n = config.cluster_nodes[i];
|
|
if (n) freeClusterNode(n);
|
|
}
|
|
zfree(config.cluster_nodes);
|
|
config.cluster_nodes = NULL;
|
|
}
|
|
|
|
static clusterNode **addClusterNode(clusterNode *node) {
|
|
int count = config.cluster_node_count + 1;
|
|
config.cluster_nodes = zrealloc(config.cluster_nodes,
|
|
count * sizeof(*node));
|
|
if (!config.cluster_nodes) return NULL;
|
|
config.cluster_nodes[config.cluster_node_count++] = node;
|
|
return config.cluster_nodes;
|
|
}
|
|
|
|
static int fetchClusterConfiguration() {
|
|
int success = 1;
|
|
redisContext *ctx = NULL;
|
|
redisReply *reply = NULL;
|
|
ctx = getRedisContext(config.hostip, config.hostport, config.hostsocket);
|
|
if (ctx == NULL) {
|
|
exit(1);
|
|
}
|
|
clusterNode *firstNode = createClusterNode((char *) config.hostip,
|
|
config.hostport);
|
|
if (!firstNode) {success = 0; goto cleanup;}
|
|
reply = redisCommand(ctx, "CLUSTER NODES");
|
|
success = (reply != NULL);
|
|
if (!success) goto cleanup;
|
|
success = (reply->type != REDIS_REPLY_ERROR);
|
|
if (!success) {
|
|
if (config.hostsocket == NULL) {
|
|
fprintf(stderr, "Cluster node %s:%d replied with error:\n%s\n",
|
|
config.hostip, config.hostport, reply->str);
|
|
} else {
|
|
fprintf(stderr, "Cluster node %s replied with error:\n%s\n",
|
|
config.hostsocket, reply->str);
|
|
}
|
|
goto cleanup;
|
|
}
|
|
char *lines = reply->str, *p, *line;
|
|
while ((p = strstr(lines, "\n")) != NULL) {
|
|
*p = '\0';
|
|
line = lines;
|
|
lines = p + 1;
|
|
char *name = NULL, *addr = NULL, *flags = NULL, *master_id = NULL;
|
|
int i = 0;
|
|
while ((p = strchr(line, ' ')) != NULL) {
|
|
*p = '\0';
|
|
char *token = line;
|
|
line = p + 1;
|
|
switch(i++){
|
|
case 0: name = token; break;
|
|
case 1: addr = token; break;
|
|
case 2: flags = token; break;
|
|
case 3: master_id = token; break;
|
|
}
|
|
if (i == 8) break; // Slots
|
|
}
|
|
if (!flags) {
|
|
fprintf(stderr, "Invalid CLUSTER NODES reply: missing flags.\n");
|
|
success = 0;
|
|
goto cleanup;
|
|
}
|
|
int myself = (strstr(flags, "myself") != NULL);
|
|
int is_replica = (strstr(flags, "slave") != NULL ||
|
|
(master_id != NULL && master_id[0] != '-'));
|
|
if (is_replica) continue;
|
|
if (addr == NULL) {
|
|
fprintf(stderr, "Invalid CLUSTER NODES reply: missing addr.\n");
|
|
success = 0;
|
|
goto cleanup;
|
|
}
|
|
clusterNode *node = NULL;
|
|
char *ip = NULL;
|
|
int port = 0;
|
|
char *paddr = strchr(addr, ':');
|
|
if (paddr != NULL) {
|
|
*paddr = '\0';
|
|
ip = addr;
|
|
addr = paddr + 1;
|
|
/* If internal bus is specified, then just drop it. */
|
|
if ((paddr = strchr(addr, '@')) != NULL) *paddr = '\0';
|
|
port = atoi(addr);
|
|
}
|
|
if (myself) {
|
|
node = firstNode;
|
|
if (ip != NULL && strcmp(node->ip, ip) != 0) {
|
|
node->ip = sdsnew(ip);
|
|
node->port = port;
|
|
}
|
|
} else {
|
|
node = createClusterNode(sdsnew(ip), port);
|
|
}
|
|
if (node == NULL) {
|
|
success = 0;
|
|
goto cleanup;
|
|
}
|
|
if (name != NULL) node->name = sdsnew(name);
|
|
if (i == 8) {
|
|
int remaining = strlen(line);
|
|
while (remaining > 0) {
|
|
p = strchr(line, ' ');
|
|
if (p == NULL) p = line + remaining;
|
|
remaining -= (p - line);
|
|
|
|
char *slotsdef = line;
|
|
*p = '\0';
|
|
if (remaining) {
|
|
line = p + 1;
|
|
remaining--;
|
|
} else line = p;
|
|
char *dash = NULL;
|
|
if (slotsdef[0] == '[') {
|
|
slotsdef++;
|
|
if ((p = strstr(slotsdef, "->-"))) { // Migrating
|
|
*p = '\0';
|
|
p += 3;
|
|
char *closing_bracket = strchr(p, ']');
|
|
if (closing_bracket) *closing_bracket = '\0';
|
|
sds slot = sdsnew(slotsdef);
|
|
sds dst = sdsnew(p);
|
|
node->migrating_count += 2;
|
|
node->migrating =
|
|
zrealloc(node->migrating,
|
|
(node->migrating_count * sizeof(sds)));
|
|
node->migrating[node->migrating_count - 2] =
|
|
slot;
|
|
node->migrating[node->migrating_count - 1] =
|
|
dst;
|
|
} else if ((p = strstr(slotsdef, "-<-"))) {//Importing
|
|
*p = '\0';
|
|
p += 3;
|
|
char *closing_bracket = strchr(p, ']');
|
|
if (closing_bracket) *closing_bracket = '\0';
|
|
sds slot = sdsnew(slotsdef);
|
|
sds src = sdsnew(p);
|
|
node->importing_count += 2;
|
|
node->importing = zrealloc(node->importing,
|
|
(node->importing_count * sizeof(sds)));
|
|
node->importing[node->importing_count - 2] =
|
|
slot;
|
|
node->importing[node->importing_count - 1] =
|
|
src;
|
|
}
|
|
} else if ((dash = strchr(slotsdef, '-')) != NULL) {
|
|
p = dash;
|
|
int start, stop;
|
|
*p = '\0';
|
|
start = atoi(slotsdef);
|
|
stop = atoi(p + 1);
|
|
while (start <= stop) {
|
|
int slot = start++;
|
|
node->slots[node->slots_count++] = slot;
|
|
}
|
|
} else if (p > slotsdef) {
|
|
int slot = atoi(slotsdef);
|
|
node->slots[node->slots_count++] = slot;
|
|
}
|
|
}
|
|
}
|
|
if (node->slots_count == 0) {
|
|
printf("WARNING: master node %s:%d has no slots, skipping...\n",
|
|
node->ip, node->port);
|
|
continue;
|
|
}
|
|
if (!addClusterNode(node)) {
|
|
success = 0;
|
|
goto cleanup;
|
|
}
|
|
}
|
|
cleanup:
|
|
if (ctx) redisFree(ctx);
|
|
if (!success) {
|
|
if (config.cluster_nodes) freeClusterNodes();
|
|
}
|
|
if (reply) freeReplyObject(reply);
|
|
return success;
|
|
}
|
|
|
|
/* Request the current cluster slots configuration by calling CLUSTER SLOTS
|
|
* and atomically update the slots after a successful reply. */
|
|
static int fetchClusterSlotsConfiguration(client c) {
|
|
UNUSED(c);
|
|
int success = 1, is_fetching_slots = 0, last_update = 0;
|
|
size_t i;
|
|
atomicGet(config.slots_last_update, last_update);
|
|
if (c->slots_last_update < last_update) {
|
|
c->slots_last_update = last_update;
|
|
return -1;
|
|
}
|
|
redisReply *reply = NULL;
|
|
atomicGetIncr(config.is_fetching_slots, is_fetching_slots, 1);
|
|
if (is_fetching_slots) return -1; //TODO: use other codes || errno ?
|
|
atomicSet(config.is_fetching_slots, 1);
|
|
if (config.showerrors)
|
|
printf("Cluster slots configuration changed, fetching new one...\n");
|
|
const char *errmsg = "Failed to update cluster slots configuration";
|
|
static dictType dtype = {
|
|
dictSdsHash, /* hash function */
|
|
NULL, /* key dup */
|
|
NULL, /* val dup */
|
|
dictSdsKeyCompare, /* key compare */
|
|
NULL, /* key destructor */
|
|
NULL, /* val destructor */
|
|
NULL /* allow to expand */
|
|
};
|
|
/* printf("[%d] fetchClusterSlotsConfiguration\n", c->thread_id); */
|
|
dict *masters = dictCreate(&dtype, NULL);
|
|
redisContext *ctx = NULL;
|
|
for (i = 0; i < (size_t) config.cluster_node_count; i++) {
|
|
clusterNode *node = config.cluster_nodes[i];
|
|
assert(node->ip != NULL);
|
|
assert(node->name != NULL);
|
|
assert(node->port);
|
|
/* Use first node as entry point to connect to. */
|
|
if (ctx == NULL) {
|
|
ctx = getRedisContext(node->ip, node->port, NULL);
|
|
if (!ctx) {
|
|
success = 0;
|
|
goto cleanup;
|
|
}
|
|
}
|
|
if (node->updated_slots != NULL)
|
|
zfree(node->updated_slots);
|
|
node->updated_slots = NULL;
|
|
node->updated_slots_count = 0;
|
|
dictReplace(masters, node->name, node) ;
|
|
}
|
|
reply = redisCommand(ctx, "CLUSTER SLOTS");
|
|
if (reply == NULL || reply->type == REDIS_REPLY_ERROR) {
|
|
success = 0;
|
|
if (reply)
|
|
fprintf(stderr,"%s\nCLUSTER SLOTS ERROR: %s\n",errmsg,reply->str);
|
|
goto cleanup;
|
|
}
|
|
assert(reply->type == REDIS_REPLY_ARRAY);
|
|
for (i = 0; i < reply->elements; i++) {
|
|
redisReply *r = reply->element[i];
|
|
assert(r->type == REDIS_REPLY_ARRAY);
|
|
assert(r->elements >= 3);
|
|
int from, to, slot;
|
|
from = r->element[0]->integer;
|
|
to = r->element[1]->integer;
|
|
redisReply *nr = r->element[2];
|
|
assert(nr->type == REDIS_REPLY_ARRAY && nr->elements >= 3);
|
|
assert(nr->element[2]->str != NULL);
|
|
sds name = sdsnew(nr->element[2]->str);
|
|
dictEntry *entry = dictFind(masters, name);
|
|
if (entry == NULL) {
|
|
success = 0;
|
|
fprintf(stderr, "%s: could not find node with ID %s in current "
|
|
"configuration.\n", errmsg, name);
|
|
if (name) sdsfree(name);
|
|
goto cleanup;
|
|
}
|
|
sdsfree(name);
|
|
clusterNode *node = dictGetVal(entry);
|
|
if (node->updated_slots == NULL)
|
|
node->updated_slots = zcalloc(CLUSTER_SLOTS * sizeof(int));
|
|
for (slot = from; slot <= to; slot++)
|
|
node->updated_slots[node->updated_slots_count++] = slot;
|
|
}
|
|
updateClusterSlotsConfiguration();
|
|
cleanup:
|
|
freeReplyObject(reply);
|
|
redisFree(ctx);
|
|
dictRelease(masters);
|
|
atomicSet(config.is_fetching_slots, 0);
|
|
return success;
|
|
}
|
|
|
|
/* Atomically update the new slots configuration. */
|
|
static void updateClusterSlotsConfiguration() {
|
|
pthread_mutex_lock(&config.is_updating_slots_mutex);
|
|
atomicSet(config.is_updating_slots, 1);
|
|
int i;
|
|
for (i = 0; i < config.cluster_node_count; i++) {
|
|
clusterNode *node = config.cluster_nodes[i];
|
|
if (node->updated_slots != NULL) {
|
|
int *oldslots = node->slots;
|
|
node->slots = node->updated_slots;
|
|
node->slots_count = node->updated_slots_count;
|
|
node->current_slot_index = 0;
|
|
node->updated_slots = NULL;
|
|
node->updated_slots_count = 0;
|
|
zfree(oldslots);
|
|
}
|
|
}
|
|
atomicSet(config.is_updating_slots, 0);
|
|
atomicIncr(config.slots_last_update, 1);
|
|
pthread_mutex_unlock(&config.is_updating_slots_mutex);
|
|
}
|
|
|
|
/* Generate random data for redis benchmark. See #7196. */
|
|
static void genBenchmarkRandomData(char *data, int count) {
|
|
static uint32_t state = 1234;
|
|
int i = 0;
|
|
|
|
while (count--) {
|
|
state = (state*1103515245+12345);
|
|
data[i++] = '0'+((state>>16)&63);
|
|
}
|
|
}
|
|
|
|
/* Returns number of consumed options. */
|
|
int parseOptions(int argc, const char **argv) {
|
|
int i;
|
|
int lastarg;
|
|
int exit_status = 1;
|
|
|
|
for (i = 1; i < argc; i++) {
|
|
lastarg = (i == (argc-1));
|
|
|
|
if (!strcmp(argv[i],"-c")) {
|
|
if (lastarg) goto invalid;
|
|
config.numclients = atoi(argv[++i]);
|
|
} else if (!strcmp(argv[i],"-v") || !strcmp(argv[i], "--version")) {
|
|
sds version = benchmarkVersion();
|
|
printf("redis-benchmark %s\n", version);
|
|
sdsfree(version);
|
|
exit(0);
|
|
} else if (!strcmp(argv[i],"-n")) {
|
|
if (lastarg) goto invalid;
|
|
config.requests = atoi(argv[++i]);
|
|
} else if (!strcmp(argv[i],"-k")) {
|
|
if (lastarg) goto invalid;
|
|
config.keepalive = atoi(argv[++i]);
|
|
} else if (!strcmp(argv[i],"-h")) {
|
|
if (lastarg) goto invalid;
|
|
config.hostip = strdup(argv[++i]);
|
|
} else if (!strcmp(argv[i],"-p")) {
|
|
if (lastarg) goto invalid;
|
|
config.hostport = atoi(argv[++i]);
|
|
} else if (!strcmp(argv[i],"-s")) {
|
|
if (lastarg) goto invalid;
|
|
config.hostsocket = strdup(argv[++i]);
|
|
} else if (!strcmp(argv[i],"-a") ) {
|
|
if (lastarg) goto invalid;
|
|
config.auth = strdup(argv[++i]);
|
|
} else if (!strcmp(argv[i],"--user")) {
|
|
if (lastarg) goto invalid;
|
|
config.user = argv[++i];
|
|
} else if (!strcmp(argv[i],"-d")) {
|
|
if (lastarg) goto invalid;
|
|
config.datasize = atoi(argv[++i]);
|
|
if (config.datasize < 1) config.datasize=1;
|
|
if (config.datasize > 1024*1024*1024) config.datasize = 1024*1024*1024;
|
|
} else if (!strcmp(argv[i],"-P")) {
|
|
if (lastarg) goto invalid;
|
|
config.pipeline = atoi(argv[++i]);
|
|
if (config.pipeline <= 0) config.pipeline=1;
|
|
} else if (!strcmp(argv[i],"-r")) {
|
|
if (lastarg) goto invalid;
|
|
const char *next = argv[++i], *p = next;
|
|
if (*p == '-') {
|
|
p++;
|
|
if (*p < '0' || *p > '9') goto invalid;
|
|
}
|
|
config.randomkeys = 1;
|
|
config.randomkeys_keyspacelen = atoi(next);
|
|
if (config.randomkeys_keyspacelen < 0)
|
|
config.randomkeys_keyspacelen = 0;
|
|
} else if (!strcmp(argv[i],"-q")) {
|
|
config.quiet = 1;
|
|
} else if (!strcmp(argv[i],"--csv")) {
|
|
config.csv = 1;
|
|
} else if (!strcmp(argv[i],"-l")) {
|
|
config.loop = 1;
|
|
} else if (!strcmp(argv[i],"-I")) {
|
|
config.idlemode = 1;
|
|
} else if (!strcmp(argv[i],"-e")) {
|
|
config.showerrors = 1;
|
|
} else if (!strcmp(argv[i],"-t")) {
|
|
if (lastarg) goto invalid;
|
|
/* We get the list of tests to run as a string in the form
|
|
* get,set,lrange,...,test_N. Then we add a comma before and
|
|
* after the string in order to make sure that searching
|
|
* for ",testname," will always get a match if the test is
|
|
* enabled. */
|
|
config.tests = sdsnew(",");
|
|
config.tests = sdscat(config.tests,(char*)argv[++i]);
|
|
config.tests = sdscat(config.tests,",");
|
|
sdstolower(config.tests);
|
|
} else if (!strcmp(argv[i],"--dbnum")) {
|
|
if (lastarg) goto invalid;
|
|
config.dbnum = atoi(argv[++i]);
|
|
config.dbnumstr = sdsfromlonglong(config.dbnum);
|
|
} else if (!strcmp(argv[i],"--precision")) {
|
|
if (lastarg) goto invalid;
|
|
config.precision = atoi(argv[++i]);
|
|
if (config.precision < 0) config.precision = DEFAULT_LATENCY_PRECISION;
|
|
if (config.precision > MAX_LATENCY_PRECISION) config.precision = MAX_LATENCY_PRECISION;
|
|
} else if (!strcmp(argv[i],"--threads")) {
|
|
if (lastarg) goto invalid;
|
|
config.num_threads = atoi(argv[++i]);
|
|
if (config.num_threads > MAX_THREADS) {
|
|
printf("WARNING: too many threads, limiting threads to %d.\n",
|
|
MAX_THREADS);
|
|
config.num_threads = MAX_THREADS;
|
|
} else if (config.num_threads < 0) config.num_threads = 0;
|
|
} else if (!strcmp(argv[i],"--cluster")) {
|
|
config.cluster_mode = 1;
|
|
} else if (!strcmp(argv[i],"--enable-tracking")) {
|
|
config.enable_tracking = 1;
|
|
} else if (!strcmp(argv[i],"--help")) {
|
|
exit_status = 0;
|
|
goto usage;
|
|
#ifdef USE_OPENSSL
|
|
} else if (!strcmp(argv[i],"--tls")) {
|
|
config.tls = 1;
|
|
} else if (!strcmp(argv[i],"--sni")) {
|
|
if (lastarg) goto invalid;
|
|
config.sslconfig.sni = strdup(argv[++i]);
|
|
} else if (!strcmp(argv[i],"--cacertdir")) {
|
|
if (lastarg) goto invalid;
|
|
config.sslconfig.cacertdir = strdup(argv[++i]);
|
|
} else if (!strcmp(argv[i],"--cacert")) {
|
|
if (lastarg) goto invalid;
|
|
config.sslconfig.cacert = strdup(argv[++i]);
|
|
} else if (!strcmp(argv[i],"--cert")) {
|
|
if (lastarg) goto invalid;
|
|
config.sslconfig.cert = strdup(argv[++i]);
|
|
} else if (!strcmp(argv[i],"--key")) {
|
|
if (lastarg) goto invalid;
|
|
config.sslconfig.key = strdup(argv[++i]);
|
|
} else if (!strcmp(argv[i],"--tls-ciphers")) {
|
|
if (lastarg) goto invalid;
|
|
config.sslconfig.ciphers = strdup(argv[++i]);
|
|
#ifdef TLS1_3_VERSION
|
|
} else if (!strcmp(argv[i],"--tls-ciphersuites")) {
|
|
if (lastarg) goto invalid;
|
|
config.sslconfig.ciphersuites = strdup(argv[++i]);
|
|
#endif
|
|
#endif
|
|
} else {
|
|
/* Assume the user meant to provide an option when the arg starts
|
|
* with a dash. We're done otherwise and should use the remainder
|
|
* as the command and arguments for running the benchmark. */
|
|
if (argv[i][0] == '-') goto invalid;
|
|
return i;
|
|
}
|
|
}
|
|
|
|
return i;
|
|
|
|
invalid:
|
|
printf("Invalid option \"%s\" or option argument missing\n\n",argv[i]);
|
|
|
|
usage:
|
|
printf(
|
|
"Usage: redis-benchmark [-h <host>] [-p <port>] [-c <clients>] [-n <requests>] [-k <boolean>]\n\n"
|
|
" -h <hostname> Server hostname (default 127.0.0.1)\n"
|
|
" -p <port> Server port (default 6379)\n"
|
|
" -s <socket> Server socket (overrides host and port)\n"
|
|
" -a <password> Password for Redis Auth\n"
|
|
" --user <username> Used to send ACL style 'AUTH username pass'. Needs -a.\n"
|
|
" -c <clients> Number of parallel connections (default 50)\n"
|
|
" -n <requests> Total number of requests (default 100000)\n"
|
|
" -d <size> Data size of SET/GET value in bytes (default 3)\n"
|
|
" --dbnum <db> SELECT the specified db number (default 0)\n"
|
|
" --threads <num> Enable multi-thread mode.\n"
|
|
" --cluster Enable cluster mode.\n"
|
|
" --enable-tracking Send CLIENT TRACKING on before starting benchmark.\n"
|
|
" -k <boolean> 1=keep alive 0=reconnect (default 1)\n"
|
|
" -r <keyspacelen> Use random keys for SET/GET/INCR, random values for SADD,\n"
|
|
" random members and scores for ZADD.\n"
|
|
" Using this option the benchmark will expand the string __rand_int__\n"
|
|
" inside an argument with a 12 digits number in the specified range\n"
|
|
" from 0 to keyspacelen-1. The substitution changes every time a command\n"
|
|
" is executed. Default tests use this to hit random keys in the\n"
|
|
" specified range.\n"
|
|
" -P <numreq> Pipeline <numreq> requests. Default 1 (no pipeline).\n"
|
|
" -e If server replies with errors, show them on stdout.\n"
|
|
" (no more than 1 error per second is displayed)\n"
|
|
" -q Quiet. Just show query/sec values\n"
|
|
" --precision Number of decimal places to display in latency output (default 0)\n"
|
|
" --csv Output in CSV format\n"
|
|
" -l Loop. Run the tests forever\n"
|
|
" -t <tests> Only run the comma separated list of tests. The test\n"
|
|
" names are the same as the ones produced as output.\n"
|
|
" -I Idle mode. Just open N idle connections and wait.\n"
|
|
#ifdef USE_OPENSSL
|
|
" --tls Establish a secure TLS connection.\n"
|
|
" --sni <host> Server name indication for TLS.\n"
|
|
" --cacert <file> CA Certificate file to verify with.\n"
|
|
" --cacertdir <dir> Directory where trusted CA certificates are stored.\n"
|
|
" If neither cacert nor cacertdir are specified, the default\n"
|
|
" system-wide trusted root certs configuration will apply.\n"
|
|
" --cert <file> Client certificate to authenticate with.\n"
|
|
" --key <file> Private key file to authenticate with.\n"
|
|
" --tls-ciphers <list> Sets the list of prefered ciphers (TLSv1.2 and below)\n"
|
|
" in order of preference from highest to lowest separated by colon (\":\").\n"
|
|
" See the ciphers(1ssl) manpage for more information about the syntax of this string.\n"
|
|
#ifdef TLS1_3_VERSION
|
|
" --tls-ciphersuites <list> Sets the list of prefered ciphersuites (TLSv1.3)\n"
|
|
" in order of preference from highest to lowest separated by colon (\":\").\n"
|
|
" See the ciphers(1ssl) manpage for more information about the syntax of this string,\n"
|
|
" and specifically for TLSv1.3 ciphersuites.\n"
|
|
#endif
|
|
#endif
|
|
" --help Output this help and exit.\n"
|
|
" --version Output version and exit.\n\n"
|
|
"Examples:\n\n"
|
|
" Run the benchmark with the default configuration against 127.0.0.1:6379:\n"
|
|
" $ redis-benchmark\n\n"
|
|
" Use 20 parallel clients, for a total of 100k requests, against 192.168.1.1:\n"
|
|
" $ redis-benchmark -h 192.168.1.1 -p 6379 -n 100000 -c 20\n\n"
|
|
" Fill 127.0.0.1:6379 with about 1 million keys only using the SET test:\n"
|
|
" $ redis-benchmark -t set -n 1000000 -r 100000000\n\n"
|
|
" Benchmark 127.0.0.1:6379 for a few commands producing CSV output:\n"
|
|
" $ redis-benchmark -t ping,set,get -n 100000 --csv\n\n"
|
|
" Benchmark a specific command line:\n"
|
|
" $ redis-benchmark -r 10000 -n 10000 eval 'return redis.call(\"ping\")' 0\n\n"
|
|
" Fill a list with 10000 random elements:\n"
|
|
" $ redis-benchmark -r 10000 -n 10000 lpush mylist __rand_int__\n\n"
|
|
" On user specified command lines __rand_int__ is replaced with a random integer\n"
|
|
" with a range of values selected by the -r option.\n"
|
|
);
|
|
exit(exit_status);
|
|
}
|
|
|
|
int showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
|
UNUSED(eventLoop);
|
|
UNUSED(id);
|
|
UNUSED(clientData);
|
|
int liveclients = 0;
|
|
int requests_finished = 0;
|
|
int previous_requests_finished = 0;
|
|
long long current_tick = mstime();
|
|
atomicGet(config.liveclients, liveclients);
|
|
atomicGet(config.requests_finished, requests_finished);
|
|
atomicGet(config.previous_requests_finished, previous_requests_finished);
|
|
|
|
if (liveclients == 0 && requests_finished != config.requests) {
|
|
fprintf(stderr,"All clients disconnected... aborting.\n");
|
|
exit(1);
|
|
}
|
|
if (config.num_threads && requests_finished >= config.requests) {
|
|
aeStop(eventLoop);
|
|
return AE_NOMORE;
|
|
}
|
|
if (config.csv) return 250;
|
|
if (config.idlemode == 1) {
|
|
printf("clients: %d\r", config.liveclients);
|
|
fflush(stdout);
|
|
return 250;
|
|
}
|
|
const float dt = (float)(current_tick-config.start)/1000.0;
|
|
const float rps = (float)requests_finished/dt;
|
|
const float instantaneous_dt = (float)(current_tick-config.previous_tick)/1000.0;
|
|
const float instantaneous_rps = (float)(requests_finished-previous_requests_finished)/instantaneous_dt;
|
|
config.previous_tick = current_tick;
|
|
atomicSet(config.previous_requests_finished,requests_finished);
|
|
config.last_printed_bytes = printf("%s: rps=%.1f (overall: %.1f) avg_msec=%.3f (overall: %.3f)\r", config.title, instantaneous_rps, rps, hdr_mean(config.current_sec_latency_histogram)/1000.0f, hdr_mean(config.latency_histogram)/1000.0f);
|
|
hdr_reset(config.current_sec_latency_histogram);
|
|
fflush(stdout);
|
|
return 250; /* every 250ms */
|
|
}
|
|
|
|
/* Return true if the named test was selected using the -t command line
|
|
* switch, or if all the tests are selected (no -t passed by user). */
|
|
int test_is_selected(char *name) {
|
|
char buf[256];
|
|
int l = strlen(name);
|
|
|
|
if (config.tests == NULL) return 1;
|
|
buf[0] = ',';
|
|
memcpy(buf+1,name,l);
|
|
buf[l+1] = ',';
|
|
buf[l+2] = '\0';
|
|
return strstr(config.tests,buf) != NULL;
|
|
}
|
|
|
|
int main(int argc, const char **argv) {
|
|
int i;
|
|
char *data, *cmd, *tag;
|
|
int len;
|
|
|
|
client c;
|
|
|
|
srandom(time(NULL) ^ getpid());
|
|
signal(SIGHUP, SIG_IGN);
|
|
signal(SIGPIPE, SIG_IGN);
|
|
|
|
config.numclients = 50;
|
|
config.requests = 100000;
|
|
config.liveclients = 0;
|
|
config.el = aeCreateEventLoop(1024*10);
|
|
aeCreateTimeEvent(config.el,1,showThroughput,NULL,NULL);
|
|
config.keepalive = 1;
|
|
config.datasize = 3;
|
|
config.pipeline = 1;
|
|
config.showerrors = 0;
|
|
config.randomkeys = 0;
|
|
config.randomkeys_keyspacelen = 0;
|
|
config.quiet = 0;
|
|
config.csv = 0;
|
|
config.loop = 0;
|
|
config.idlemode = 0;
|
|
config.clients = listCreate();
|
|
config.hostip = "127.0.0.1";
|
|
config.hostport = 6379;
|
|
config.hostsocket = NULL;
|
|
config.tests = NULL;
|
|
config.dbnum = 0;
|
|
config.auth = NULL;
|
|
config.precision = DEFAULT_LATENCY_PRECISION;
|
|
config.num_threads = 0;
|
|
config.threads = NULL;
|
|
config.cluster_mode = 0;
|
|
config.cluster_node_count = 0;
|
|
config.cluster_nodes = NULL;
|
|
config.redis_config = NULL;
|
|
config.is_fetching_slots = 0;
|
|
config.is_updating_slots = 0;
|
|
config.slots_last_update = 0;
|
|
config.enable_tracking = 0;
|
|
|
|
i = parseOptions(argc,argv);
|
|
argc -= i;
|
|
argv += i;
|
|
|
|
tag = "";
|
|
|
|
#ifdef USE_OPENSSL
|
|
if (config.tls) {
|
|
cliSecureInit();
|
|
}
|
|
#endif
|
|
|
|
if (config.cluster_mode) {
|
|
// We only include the slot placeholder {tag} if cluster mode is enabled
|
|
tag = ":{tag}";
|
|
|
|
/* Fetch cluster configuration. */
|
|
if (!fetchClusterConfiguration() || !config.cluster_nodes) {
|
|
if (!config.hostsocket) {
|
|
fprintf(stderr, "Failed to fetch cluster configuration from "
|
|
"%s:%d\n", config.hostip, config.hostport);
|
|
} else {
|
|
fprintf(stderr, "Failed to fetch cluster configuration from "
|
|
"%s\n", config.hostsocket);
|
|
}
|
|
exit(1);
|
|
}
|
|
if (config.cluster_node_count <= 1) {
|
|
fprintf(stderr, "Invalid cluster: %d node(s).\n",
|
|
config.cluster_node_count);
|
|
exit(1);
|
|
}
|
|
printf("Cluster has %d master nodes:\n\n", config.cluster_node_count);
|
|
int i = 0;
|
|
for (; i < config.cluster_node_count; i++) {
|
|
clusterNode *node = config.cluster_nodes[i];
|
|
if (!node) {
|
|
fprintf(stderr, "Invalid cluster node #%d\n", i);
|
|
exit(1);
|
|
}
|
|
printf("Master %d: ", i);
|
|
if (node->name) printf("%s ", node->name);
|
|
printf("%s:%d\n", node->ip, node->port);
|
|
node->redis_config = getRedisConfig(node->ip, node->port, NULL);
|
|
if (node->redis_config == NULL) {
|
|
fprintf(stderr, "WARN: could not fetch node CONFIG %s:%d\n",
|
|
node->ip, node->port);
|
|
}
|
|
}
|
|
printf("\n");
|
|
/* Automatically set thread number to node count if not specified
|
|
* by the user. */
|
|
if (config.num_threads == 0)
|
|
config.num_threads = config.cluster_node_count;
|
|
} else {
|
|
config.redis_config =
|
|
getRedisConfig(config.hostip, config.hostport, config.hostsocket);
|
|
if (config.redis_config == NULL)
|
|
fprintf(stderr, "WARN: could not fetch server CONFIG\n");
|
|
}
|
|
if (config.num_threads > 0) {
|
|
pthread_mutex_init(&(config.liveclients_mutex), NULL);
|
|
pthread_mutex_init(&(config.is_updating_slots_mutex), NULL);
|
|
}
|
|
|
|
if (config.keepalive == 0) {
|
|
printf("WARNING: keepalive disabled, you probably need 'echo 1 > /proc/sys/net/ipv4/tcp_tw_reuse' for Linux and 'sudo sysctl -w net.inet.tcp.msl=1000' for Mac OS X in order to use a lot of clients/requests\n");
|
|
}
|
|
|
|
if (config.idlemode) {
|
|
printf("Creating %d idle connections and waiting forever (Ctrl+C when done)\n", config.numclients);
|
|
int thread_id = -1, use_threads = (config.num_threads > 0);
|
|
if (use_threads) {
|
|
thread_id = 0;
|
|
initBenchmarkThreads();
|
|
}
|
|
c = createClient("",0,NULL,thread_id); /* will never receive a reply */
|
|
createMissingClients(c);
|
|
if (use_threads) startBenchmarkThreads();
|
|
else aeMain(config.el);
|
|
/* and will wait for every */
|
|
}
|
|
if(config.csv){
|
|
printf("\"test\",\"rps\",\"avg_latency_ms\",\"min_latency_ms\",\"p50_latency_ms\",\"p95_latency_ms\",\"p99_latency_ms\",\"max_latency_ms\"\n");
|
|
}
|
|
/* Run benchmark with command in the remainder of the arguments. */
|
|
if (argc) {
|
|
sds title = sdsnew(argv[0]);
|
|
for (i = 1; i < argc; i++) {
|
|
title = sdscatlen(title, " ", 1);
|
|
title = sdscatlen(title, (char*)argv[i], strlen(argv[i]));
|
|
}
|
|
|
|
do {
|
|
len = redisFormatCommandArgv(&cmd,argc,argv,NULL);
|
|
// adjust the datasize to the parsed command
|
|
config.datasize = len;
|
|
benchmark(title,cmd,len);
|
|
free(cmd);
|
|
} while(config.loop);
|
|
|
|
if (config.redis_config != NULL) freeRedisConfig(config.redis_config);
|
|
return 0;
|
|
}
|
|
|
|
/* Run default benchmark suite. */
|
|
data = zmalloc(config.datasize+1);
|
|
do {
|
|
genBenchmarkRandomData(data, config.datasize);
|
|
data[config.datasize] = '\0';
|
|
|
|
if (test_is_selected("ping_inline") || test_is_selected("ping"))
|
|
benchmark("PING_INLINE","PING\r\n",6);
|
|
|
|
if (test_is_selected("ping_mbulk") || test_is_selected("ping")) {
|
|
len = redisFormatCommand(&cmd,"PING");
|
|
benchmark("PING_MBULK",cmd,len);
|
|
free(cmd);
|
|
}
|
|
|
|
if (test_is_selected("set")) {
|
|
len = redisFormatCommand(&cmd,"SET key%s:__rand_int__ %s",tag,data);
|
|
benchmark("SET",cmd,len);
|
|
free(cmd);
|
|
}
|
|
|
|
if (test_is_selected("get")) {
|
|
len = redisFormatCommand(&cmd,"GET key%s:__rand_int__",tag);
|
|
benchmark("GET",cmd,len);
|
|
free(cmd);
|
|
}
|
|
|
|
if (test_is_selected("incr")) {
|
|
len = redisFormatCommand(&cmd,"INCR counter%s:__rand_int__",tag);
|
|
benchmark("INCR",cmd,len);
|
|
free(cmd);
|
|
}
|
|
|
|
if (test_is_selected("lpush")) {
|
|
len = redisFormatCommand(&cmd,"LPUSH mylist%s %s",tag,data);
|
|
benchmark("LPUSH",cmd,len);
|
|
free(cmd);
|
|
}
|
|
|
|
if (test_is_selected("rpush")) {
|
|
len = redisFormatCommand(&cmd,"RPUSH mylist%s %s",tag,data);
|
|
benchmark("RPUSH",cmd,len);
|
|
free(cmd);
|
|
}
|
|
|
|
if (test_is_selected("lpop")) {
|
|
len = redisFormatCommand(&cmd,"LPOP mylist%s",tag);
|
|
benchmark("LPOP",cmd,len);
|
|
free(cmd);
|
|
}
|
|
|
|
if (test_is_selected("rpop")) {
|
|
len = redisFormatCommand(&cmd,"RPOP mylist%s",tag);
|
|
benchmark("RPOP",cmd,len);
|
|
free(cmd);
|
|
}
|
|
|
|
if (test_is_selected("sadd")) {
|
|
len = redisFormatCommand(&cmd,
|
|
"SADD myset%s element:__rand_int__",tag);
|
|
benchmark("SADD",cmd,len);
|
|
free(cmd);
|
|
}
|
|
|
|
if (test_is_selected("hset")) {
|
|
len = redisFormatCommand(&cmd,
|
|
"HSET myhash%s element:__rand_int__ %s",tag,data);
|
|
benchmark("HSET",cmd,len);
|
|
free(cmd);
|
|
}
|
|
|
|
if (test_is_selected("spop")) {
|
|
len = redisFormatCommand(&cmd,"SPOP myset%s",tag);
|
|
benchmark("SPOP",cmd,len);
|
|
free(cmd);
|
|
}
|
|
|
|
if (test_is_selected("zadd")) {
|
|
char *score = "0";
|
|
if (config.randomkeys) score = "__rand_int__";
|
|
len = redisFormatCommand(&cmd,
|
|
"ZADD myzset%s %s element:__rand_int__",tag,score);
|
|
benchmark("ZADD",cmd,len);
|
|
free(cmd);
|
|
}
|
|
|
|
if (test_is_selected("zpopmin")) {
|
|
len = redisFormatCommand(&cmd,"ZPOPMIN myzset%s",tag);
|
|
benchmark("ZPOPMIN",cmd,len);
|
|
free(cmd);
|
|
}
|
|
|
|
if (test_is_selected("lrange") ||
|
|
test_is_selected("lrange_100") ||
|
|
test_is_selected("lrange_300") ||
|
|
test_is_selected("lrange_500") ||
|
|
test_is_selected("lrange_600"))
|
|
{
|
|
len = redisFormatCommand(&cmd,"LPUSH mylist%s %s",tag,data);
|
|
benchmark("LPUSH (needed to benchmark LRANGE)",cmd,len);
|
|
free(cmd);
|
|
}
|
|
|
|
if (test_is_selected("lrange") || test_is_selected("lrange_100")) {
|
|
len = redisFormatCommand(&cmd,"LRANGE mylist%s 0 99",tag);
|
|
benchmark("LRANGE_100 (first 100 elements)",cmd,len);
|
|
free(cmd);
|
|
}
|
|
|
|
if (test_is_selected("lrange") || test_is_selected("lrange_300")) {
|
|
len = redisFormatCommand(&cmd,"LRANGE mylist%s 0 299",tag);
|
|
benchmark("LRANGE_300 (first 300 elements)",cmd,len);
|
|
free(cmd);
|
|
}
|
|
|
|
if (test_is_selected("lrange") || test_is_selected("lrange_500")) {
|
|
len = redisFormatCommand(&cmd,"LRANGE mylist%s 0 449",tag);
|
|
benchmark("LRANGE_500 (first 450 elements)",cmd,len);
|
|
free(cmd);
|
|
}
|
|
|
|
if (test_is_selected("lrange") || test_is_selected("lrange_600")) {
|
|
len = redisFormatCommand(&cmd,"LRANGE mylist%s 0 599",tag);
|
|
benchmark("LRANGE_600 (first 600 elements)",cmd,len);
|
|
free(cmd);
|
|
}
|
|
|
|
if (test_is_selected("mset")) {
|
|
const char *cmd_argv[21];
|
|
cmd_argv[0] = "MSET";
|
|
sds key_placeholder = sdscatprintf(sdsnew(""),"key%s:__rand_int__",tag);
|
|
for (i = 1; i < 21; i += 2) {
|
|
cmd_argv[i] = key_placeholder;
|
|
cmd_argv[i+1] = data;
|
|
}
|
|
len = redisFormatCommandArgv(&cmd,21,cmd_argv,NULL);
|
|
benchmark("MSET (10 keys)",cmd,len);
|
|
free(cmd);
|
|
sdsfree(key_placeholder);
|
|
}
|
|
|
|
if (!config.csv) printf("\n");
|
|
} while(config.loop);
|
|
|
|
if (config.redis_config != NULL) freeRedisConfig(config.redis_config);
|
|
|
|
return 0;
|
|
}
|