mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 16:18:28 -05:00
Merge pull request #5901 from artix75/bm_threads_cluster_dev
Redis Benchmark: add multithread idle mode
This commit is contained in:
commit
84d6ce4f56
@ -247,11 +247,11 @@ static redisConfig *getRedisConfig(const char *ip, int port,
|
||||
c = redisConnect(ip, port);
|
||||
else
|
||||
c = redisConnectUnix(hostsocket);
|
||||
if (c->err) {
|
||||
if (c == NULL || c->err) {
|
||||
fprintf(stderr,"Could not connect to Redis at ");
|
||||
if (hostsocket == NULL)
|
||||
fprintf(stderr,"%s:%d: %s\n",ip,port,c->errstr);
|
||||
else fprintf(stderr,"%s: %s\n",hostsocket,c->errstr);
|
||||
char *err = (c != NULL ? c->errstr : "");
|
||||
if (hostsocket == NULL) fprintf(stderr,"%s:%d: %s\n",ip,port,err);
|
||||
else fprintf(stderr,"%s: %s\n",hostsocket,err);
|
||||
goto fail;
|
||||
}
|
||||
redisAppendCommand(c, "CONFIG GET %s", "save");
|
||||
@ -276,18 +276,16 @@ static redisConfig *getRedisConfig(const char *ip, int port,
|
||||
case 1: cfg->appendonly = sdsnew(value); break;
|
||||
}
|
||||
}
|
||||
if (reply) freeReplyObject(reply);
|
||||
if (c) redisFree(c);
|
||||
freeReplyObject(reply);
|
||||
redisFree(c);
|
||||
return cfg;
|
||||
fail:
|
||||
if (reply) freeReplyObject(reply);
|
||||
if (c) redisFree(c);
|
||||
zfree(cfg);
|
||||
fprintf(stderr, "ERROR: failed to fetch CONFIG from ");
|
||||
if (c->connection_type == REDIS_CONN_TCP)
|
||||
fprintf(stderr, "%s:%d\n", c->tcp.host, c->tcp.port);
|
||||
else if (c->connection_type == REDIS_CONN_UNIX)
|
||||
fprintf(stderr, "%s\n", c->unix_sock.path);
|
||||
if (hostsocket == NULL) fprintf(stderr, "%s:%d\n", ip, port);
|
||||
else fprintf(stderr, "%s\n", hostsocket);
|
||||
freeReplyObject(reply);
|
||||
redisFree(c);
|
||||
zfree(cfg);
|
||||
return NULL;
|
||||
}
|
||||
static void freeRedisConfig(redisConfig *cfg) {
|
||||
@ -345,7 +343,9 @@ static void randomizeClientKey(client c) {
|
||||
|
||||
for (i = 0; i < c->randlen; i++) {
|
||||
char *p = c->randptr[i]+11;
|
||||
size_t r = random() % config.randomkeys_keyspacelen;
|
||||
size_t r = 0;
|
||||
if (config.randomkeys_keyspacelen != 0)
|
||||
r = random() % config.randomkeys_keyspacelen;
|
||||
size_t j;
|
||||
|
||||
for (j = 0; j < 12; j++) {
|
||||
@ -447,27 +447,27 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
}
|
||||
}
|
||||
|
||||
if (config.cluster_mode && is_err && c->cluster_node &&
|
||||
(!strncmp(r->str,"MOVED",5) ||
|
||||
!strcmp(r->str,"ASK")))
|
||||
{
|
||||
/* Try to update slots configuration if the key of the
|
||||
* command is using the slot hash tag. */
|
||||
if (c->staglen && !fetchClusterSlotsConfiguration(c))
|
||||
exit(1);
|
||||
/*
|
||||
clusterNode *node = c->cluster_node;
|
||||
assert(node);
|
||||
if (++node->current_slot_index >= node->slots_count) {
|
||||
if (config.showerrors) {
|
||||
fprintf(stderr, "WARN: No more available slots in "
|
||||
"node %s:%d\n", node->ip, node->port);
|
||||
}
|
||||
freeReplyObject(reply);
|
||||
freeClient(c);
|
||||
return;
|
||||
/* Try to update slots configuration if reply error is
|
||||
* MOVED/ASK/CLUSTERDOWN and the key(s) used by the command
|
||||
* contain(s) the slot hash tag. */
|
||||
if (is_err && c->cluster_node && c->staglen) {
|
||||
int fetch_slots = 0, do_wait = 0;
|
||||
if (!strncmp(r->str,"MOVED",5) || !strncmp(r->str,"ASK",3))
|
||||
fetch_slots = 1;
|
||||
else if (!strncmp(r->str,"CLUSTERDOWN",11)) {
|
||||
/* Usually the cluster is able to recover itself after
|
||||
* a CLUSTERDOWN error, so try to sleep one second
|
||||
* before requesting the new configuration. */
|
||||
fetch_slots = 1;
|
||||
do_wait = 1;
|
||||
printf("Error from server %s:%d: %s\n",
|
||||
c->cluster_node->ip,
|
||||
c->cluster_node->port,
|
||||
r->str);
|
||||
}
|
||||
*/
|
||||
if (do_wait) sleep(1);
|
||||
if (fetch_slots && !fetchClusterSlotsConfiguration(c))
|
||||
exit(1);
|
||||
}
|
||||
|
||||
freeReplyObject(reply);
|
||||
@ -817,22 +817,37 @@ static void showLatencyReport(void) {
|
||||
}
|
||||
}
|
||||
|
||||
static void benchmark(char *title, char *cmd, int len) {
|
||||
static void initBenchmarkThreads() {
|
||||
int i;
|
||||
if (config.threads) freeBenchmarkThreads();
|
||||
config.threads = zmalloc(config.num_threads * sizeof(benchmarkThread*));
|
||||
for (i = 0; i < config.num_threads; i++) {
|
||||
benchmarkThread *thread = createBenchmarkThread(i);
|
||||
config.threads[i] = thread;
|
||||
}
|
||||
}
|
||||
|
||||
static void startBenchmarkThreads() {
|
||||
int i;
|
||||
for (i = 0; i < config.num_threads; i++) {
|
||||
benchmarkThread *t = config.threads[i];
|
||||
if (pthread_create(&(t->thread), NULL, execBenchmarkThread, t)){
|
||||
fprintf(stderr, "FATAL: Failed to start thread %d.\n", i);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
for (i = 0; i < config.num_threads; i++)
|
||||
pthread_join(config.threads[i]->thread, NULL);
|
||||
}
|
||||
|
||||
static void benchmark(char *title, char *cmd, int len) {
|
||||
client c;
|
||||
|
||||
config.title = title;
|
||||
config.requests_issued = 0;
|
||||
config.requests_finished = 0;
|
||||
|
||||
if (config.num_threads) {
|
||||
if (config.threads) freeBenchmarkThreads();
|
||||
config.threads = zmalloc(config.num_threads * sizeof(benchmarkThread*));
|
||||
for (i = 0; i < config.num_threads; i++) {
|
||||
benchmarkThread *thread = createBenchmarkThread(i);
|
||||
config.threads[i] = thread;
|
||||
}
|
||||
}
|
||||
if (config.num_threads) initBenchmarkThreads();
|
||||
|
||||
int thread_id = config.num_threads > 0 ? 0 : -1;
|
||||
c = createClient(cmd,len,NULL,thread_id);
|
||||
@ -840,17 +855,7 @@ static void benchmark(char *title, char *cmd, int len) {
|
||||
|
||||
config.start = mstime();
|
||||
if (!config.num_threads) aeMain(config.el);
|
||||
else {
|
||||
for (i = 0; i < config.num_threads; i++) {
|
||||
benchmarkThread *t = config.threads[i];
|
||||
if (pthread_create(&(t->thread), NULL, execBenchmarkThread, t)){
|
||||
fprintf(stderr, "FATAL: Failed to start thread %d.\n", i);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
for (i = 0; i < config.num_threads; i++)
|
||||
pthread_join(config.threads[i]->thread, NULL);
|
||||
}
|
||||
else startBenchmarkThreads();
|
||||
config.totlatency = mstime()-config.start;
|
||||
|
||||
showLatencyReport();
|
||||
@ -1283,6 +1288,11 @@ int parseOptions(int argc, const char **argv) {
|
||||
if (config.pipeline <= 0) config.pipeline=1;
|
||||
} else if (!strcmp(argv[i],"-r")) {
|
||||
if (lastarg) goto invalid;
|
||||
const char *next = argv[++i], *p = next;
|
||||
if (*p == '-') {
|
||||
p++;
|
||||
if (*p < '0' || *p > '9') goto invalid;
|
||||
}
|
||||
config.randomkeys = 1;
|
||||
config.randomkeys_keyspacelen = atoi(argv[++i]);
|
||||
if (config.randomkeys_keyspacelen < 0)
|
||||
@ -1546,9 +1556,15 @@ int main(int argc, const char **argv) {
|
||||
|
||||
if (config.idlemode) {
|
||||
printf("Creating %d idle connections and waiting forever (Ctrl+C when done)\n", config.numclients);
|
||||
c = createClient("",0,NULL,-1); /* will never receive a reply */
|
||||
int thread_id = -1, use_threads = (config.num_threads > 0);
|
||||
if (use_threads) {
|
||||
thread_id = 0;
|
||||
initBenchmarkThreads();
|
||||
}
|
||||
c = createClient("",0,NULL,thread_id); /* will never receive a reply */
|
||||
createMissingClients(c);
|
||||
aeMain(config.el);
|
||||
if (use_threads) startBenchmarkThreads();
|
||||
else aeMain(config.el);
|
||||
/* and will wait for every */
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user