From 021321e0efc0fcc54645ee9016a229257e1d4ba7 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 16 Dec 2010 23:32:02 +0100 Subject: [PATCH 1/9] Update hiredis to 0.9.2 --- deps/hiredis/Makefile | 46 +++--- deps/hiredis/README.md | 63 +++++--- deps/hiredis/adapters/ae.h | 95 ++++++++++++ deps/hiredis/adapters/libev.h | 59 +++++--- deps/hiredis/adapters/libevent.h | 20 +-- deps/hiredis/async.c | 55 +++++-- deps/hiredis/async.h | 22 ++- deps/hiredis/example-ae.c | 53 +++++++ deps/hiredis/example-libev.c | 12 +- deps/hiredis/example-libevent.c | 7 + deps/hiredis/example.c | 2 +- deps/hiredis/hiredis.c | 241 ++++++++++++++++++++++++------- deps/hiredis/hiredis.h | 19 ++- deps/hiredis/net.c | 9 +- deps/hiredis/net.h | 6 + deps/hiredis/test.c | 142 ++++++++++++++---- 16 files changed, 678 insertions(+), 173 deletions(-) create mode 100644 deps/hiredis/adapters/ae.h create mode 100644 deps/hiredis/example-ae.c diff --git a/deps/hiredis/Makefile b/deps/hiredis/Makefile index 3c4b6ce28..ca3404a42 100644 --- a/deps/hiredis/Makefile +++ b/deps/hiredis/Makefile @@ -6,32 +6,35 @@ OBJ = net.o hiredis.o sds.o async.o BINS = hiredis-example hiredis-test uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not') -OPTIMIZATION?=-O2 +OPTIMIZATION?=-O3 ifeq ($(uname_S),SunOS) - CFLAGS?= -std=c99 -pedantic $(OPTIMIZATION) -fPIC -Wall -W -D__EXTENSIONS__ -D_XPG6 - CCLINK?= -ldl -lnsl -lsocket -lm -lpthread + CFLAGS?=-std=c99 -pedantic $(OPTIMIZATION) -fPIC -Wall -W -D__EXTENSIONS__ -D_XPG6 $(ARCH) $(PROF) + CCLINK?=-ldl -lnsl -lsocket -lm -lpthread + LDFLAGS?=-L. -Wl,-R,. DYLIBNAME?=libhiredis.so - DYLIB_MAKE_CMD?=gcc -shared -Wl,-soname,${DYLIBNAME} -o ${DYLIBNAME} ${OBJ} + DYLIB_MAKE_CMD?=$(CC) -G -o ${DYLIBNAME} ${OBJ} STLIBNAME?=libhiredis.a STLIB_MAKE_CMD?=ar rcs ${STLIBNAME} ${OBJ} else ifeq ($(uname_S),Darwin) - CFLAGS?= -std=c99 -pedantic $(OPTIMIZATION) -fPIC -Wall -W -Wwrite-strings $(ARCH) $(PROF) - CCLINK?= -lm -pthread - OBJARCH?= -arch i386 -arch x86_64 + CFLAGS?=-std=c99 -pedantic $(OPTIMIZATION) -fPIC -Wall -W -Wwrite-strings $(ARCH) $(PROF) + CCLINK?=-lm -pthread + LDFLAGS?=-L. -Wl,-rpath,. + OBJARCH?=-arch i386 -arch x86_64 DYLIBNAME?=libhiredis.dylib DYLIB_MAKE_CMD?=libtool -dynamic -o ${DYLIBNAME} -lm ${DEBUG} - ${OBJ} STLIBNAME?=libhiredis.a STLIB_MAKE_CMD?=libtool -static -o ${STLIBNAME} - ${OBJ} else - CFLAGS?= -std=c99 -pedantic $(OPTIMIZATION) -fPIC -Wall -W -Wwrite-strings $(ARCH) $(PROF) - CCLINK?= -lm -pthread + CFLAGS?=-std=c99 -pedantic $(OPTIMIZATION) -fPIC -Wall -W -Wwrite-strings $(ARCH) $(PROF) + CCLINK?=-lm -pthread + LDFLAGS?=-L. -Wl,-rpath,. DYLIBNAME?=libhiredis.so DYLIB_MAKE_CMD?=gcc -shared -Wl,-soname,${DYLIBNAME} -o ${DYLIBNAME} ${OBJ} STLIBNAME?=libhiredis.a STLIB_MAKE_CMD?=ar rcs ${STLIBNAME} ${OBJ} endif -CCOPT= $(CFLAGS) $(CCLINK) $(ARCH) $(PROF) -DEBUG?= -g -ggdb +CCOPT= $(CFLAGS) $(CCLINK) +DEBUG?= -g -ggdb PREFIX?= /usr/local INSTALL_INC= $(PREFIX)/include/hiredis @@ -43,8 +46,6 @@ all: ${DYLIBNAME} ${BINS} # Deps (use make dep to generate this) net.o: net.c fmacros.h net.h async.o: async.c async.h hiredis.h sds.h util.h -example-libev.o: example-libev.c hiredis.h async.h adapters/libev.h -example-libevent.o: example-libevent.c hiredis.h async.h adapters/libevent.h example.o: example.c hiredis.h hiredis.o: hiredis.c hiredis.h net.h sds.h util.h sds.o: sds.c sds.h @@ -60,14 +61,23 @@ dynamic: ${DYLIBNAME} static: ${STLIBNAME} # Binaries: -hiredis-example-libevent: example-libevent.o ${DYLIBNAME} - $(CC) -o $@ $(CCOPT) $(DEBUG) -L. -lhiredis -levent -Wl,-rpath,. example-libevent.c +hiredis-example-libevent: example-libevent.c adapters/libevent.h ${DYLIBNAME} + $(CC) -o $@ $(CCOPT) $(DEBUG) $(LDFLAGS) -lhiredis -levent example-libevent.c -hiredis-example-libev: example-libev.o ${DYLIBNAME} - $(CC) -o $@ $(CCOPT) $(DEBUG) -L. -lhiredis -lev -Wl,-rpath,. example-libev.c +hiredis-example-libev: example-libev.c adapters/libev.h ${DYLIBNAME} + $(CC) -o $@ $(CCOPT) $(DEBUG) $(LDFLAGS) -lhiredis -lev example-libev.c + +ifndef AE_DIR +hiredis-example-ae: + @echo "Please specify AE_DIR (e.g. /src)" + @false +else +hiredis-example-ae: example-ae.c adapters/ae.h ${DYLIBNAME} + $(CC) -o $@ $(CCOPT) $(DEBUG) -I$(AE_DIR) $(LDFLAGS) -lhiredis example-ae.c $(AE_DIR)/ae.o $(AE_DIR)/zmalloc.o +endif hiredis-%: %.o ${DYLIBNAME} - $(CC) -o $@ $(CCOPT) $(DEBUG) -L. -lhiredis -Wl,-rpath,. $< + $(CC) -o $@ $(CCOPT) $(DEBUG) $(LDFLAGS) -lhiredis $< test: hiredis-test ./hiredis-test diff --git a/deps/hiredis/README.md b/deps/hiredis/README.md index 51ca2a93f..e39ff0c1b 100644 --- a/deps/hiredis/README.md +++ b/deps/hiredis/README.md @@ -35,16 +35,18 @@ To consume the synchronous API, there are only a few function calls that need to ### Connecting -The function `redisConnect` is used to create a so-called `redisContext`. The context is where -Hiredis holds state for a connection. The `redisContext` struct has an `error` field that is -non-NULL when the connection is in an error state. It contains a string with a textual -representation of the error. After trying to connect to Redis using `redisConnect` you should -check the `error` field to see if establishing the connection was successful: +The function `redisConnect` is used to create a so-called `redisContext`. The +context is where Hiredis holds state for a connection. The `redisContext` +struct has an integer `err` field that is non-zero when an the connection is in +an error state. The field `errstr` will contain a string with a description of +the error. More information on errors can be found in the **Errors** section. +After trying to connect to Redis using `redisConnect` you should +check the `err` field to see if establishing the connection was successful: redisContext *c = redisConnect("127.0.0.1", 6379); - if (c->error != NULL) { - printf("Error: %s\n", c->error); - // handle error + if (c->err) { + printf("Error: %s\n", c->errstr); + // handle error } ### Sending commands @@ -76,8 +78,8 @@ anywhere in an argument: ### Using replies The return value of `redisCommand` holds a reply when the command was -successfully executed. When the return value is `NULL`, the `error` field -in the context can be used to find out what was the cause of failure. +successfully executed. When an error occurs, the return value is `NULL` and +the `err` field in the context will be set (see section on **Errors**). Once an error is returned the context cannot be reused and you should set up a new connection. @@ -166,7 +168,7 @@ to the `redisCommand` family, apart from not returning a reply: After calling either function one or more times, `redisGetReply` can be used to receive the subsequent replies. The return value for this function is either `REDIS_OK` or `REDIS_ERR`, where the latter means an error occurred while reading a reply. Just as with the other commands, -the `error` field in the context can be used to find out what the cause of this error is. +the `err` field in the context can be used to find out what the cause of this error is. The following examples shows a simple pipeline (resulting in only a single call to `write(2)` and a single call to `write(2)`): @@ -184,10 +186,35 @@ This API can also be used to implement a blocking subscriber: reply = redisCommand(context,"SUBSCRIBE foo"); freeReplyObject(reply); while(redisGetReply(context,&reply) == REDIS_OK) { - // consume message - freeReplyObject(reply); + // consume message + freeReplyObject(reply); } +### Errors + +When a function call is not successful, depending on the function either `NULL` or `REDIS_ERR` is +returned. The `err` field inside the context will be non-zero and set to one of the +following constants: + +* **`REDIS_ERR_IO`**: + There was an I/O error while creating the connection, trying to write + to the socket or read from the socket. If you included `errno.h` in your + application, you can use the global `errno` variable to find out what is + wrong. + +* **`REDIS_ERR_EOF`**: + The server closed the connection which resulted in an empty read. + +* **`REDIS_ERR_PROTOCOL`**: + There was an error while parsing the protocol. + +* **`REDIS_ERR_OTHER`**: + Any other error. Currently, it is only used when a specified hostname to connect + to cannot be resolved. + +In every case, the `errstr` field in the context will be set to hold a string representation +of the error. + ## Asynchronous API Hiredis comes with an asynchronous API that works easily with any event library. @@ -197,15 +224,15 @@ and [libevent](http://monkey.org/~provos/libevent/). ### Connecting The function `redisAsyncConnect` can be used to establish a non-blocking connection to -Redis. It returns a pointer to the newly created `redisAsyncContext` struct. The `error` field +Redis. It returns a pointer to the newly created `redisAsyncContext` struct. The `err` field should be checked after creation to see if there were errors creating the connection. Because the connection that will be created is non-blocking, the kernel is not able to instantly return if the specified host and port is able to accept a connection. redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379); - if (c->error != NULL) { - printf("Error: %s\n", c->error); - // handle error + if (c->err) { + printf("Error: %s\n", c->errstr); + // handle error } The asynchronous context can hold a disconnect callback function that is called when the @@ -215,7 +242,7 @@ have the following prototype: void(const redisAsyncContext *c, int status); On a disconnect, the `status` argument is set to `REDIS_OK` when disconnection was initiated by the -user, or `REDIS_ERR` when the disconnection was caused by an error. When it is `REDIS_ERR`, the `error` +user, or `REDIS_ERR` when the disconnection was caused by an error. When it is `REDIS_ERR`, the `err` field in the context can be accessed to find out the cause of the error. The context object is always free'd after the disconnect callback fired. When a reconnect is needed, diff --git a/deps/hiredis/adapters/ae.h b/deps/hiredis/adapters/ae.h new file mode 100644 index 000000000..b8b2228ed --- /dev/null +++ b/deps/hiredis/adapters/ae.h @@ -0,0 +1,95 @@ +#include +#include +#include "../hiredis.h" +#include "../async.h" + +typedef struct redisAeEvents { + redisAsyncContext *context; + aeEventLoop *loop; + int fd; + int reading, writing; +} redisAeEvents; + +void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) { + ((void)el); ((void)fd); ((void)mask); + + redisAeEvents *e = (redisAeEvents*)privdata; + redisAsyncHandleRead(e->context); +} + +void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) { + ((void)el); ((void)fd); ((void)mask); + + redisAeEvents *e = (redisAeEvents*)privdata; + redisAsyncHandleWrite(e->context); +} + +void redisAeAddRead(void *privdata) { + redisAeEvents *e = (redisAeEvents*)privdata; + aeEventLoop *loop = e->loop; + if (!e->reading) { + e->reading = 1; + aeCreateFileEvent(loop,e->fd,AE_READABLE,redisAeReadEvent,e); + } +} + +void redisAeDelRead(void *privdata) { + redisAeEvents *e = (redisAeEvents*)privdata; + aeEventLoop *loop = e->loop; + if (e->reading) { + e->reading = 0; + aeDeleteFileEvent(loop,e->fd,AE_READABLE); + } +} + +void redisAeAddWrite(void *privdata) { + redisAeEvents *e = (redisAeEvents*)privdata; + aeEventLoop *loop = e->loop; + if (!e->writing) { + e->writing = 1; + aeCreateFileEvent(loop,e->fd,AE_WRITABLE,redisAeWriteEvent,e); + } +} + +void redisAeDelWrite(void *privdata) { + redisAeEvents *e = (redisAeEvents*)privdata; + aeEventLoop *loop = e->loop; + if (e->writing) { + e->writing = 0; + aeDeleteFileEvent(loop,e->fd,AE_WRITABLE); + } +} + +void redisAeCleanup(void *privdata) { + redisAeEvents *e = (redisAeEvents*)privdata; + redisAeDelRead(privdata); + redisAeDelWrite(privdata); + free(e); +} + +int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) { + redisContext *c = &(ac->c); + redisAeEvents *e; + + /* Nothing should be attached when something is already attached */ + if (ac->_adapter_data != NULL) + return REDIS_ERR; + + /* Create container for context and r/w events */ + e = (redisAeEvents*)malloc(sizeof(*e)); + e->context = ac; + e->loop = loop; + e->fd = c->fd; + e->reading = e->writing = 0; + + /* Register functions to start/stop listening for events */ + ac->evAddRead = redisAeAddRead; + ac->evDelRead = redisAeDelRead; + ac->evAddWrite = redisAeAddWrite; + ac->evDelWrite = redisAeDelWrite; + ac->evCleanup = redisAeCleanup; + ac->_adapter_data = e; + + return REDIS_OK; +} + diff --git a/deps/hiredis/adapters/libev.h b/deps/hiredis/adapters/libev.h index 79c069d92..3b9ed6560 100644 --- a/deps/hiredis/adapters/libev.h +++ b/deps/hiredis/adapters/libev.h @@ -10,69 +10,89 @@ typedef struct redisLibevEvents { ev_io rev, wev; } redisLibevEvents; -void redisLibevReadEvent(struct ev_loop *loop, ev_io *watcher, int revents) { - ((void)loop); ((void)revents); - redisLibevEvents *e = watcher->data; +void redisLibevReadEvent(EV_P_ ev_io *watcher, int revents) { +#if EV_MULTIPLICITY + ((void)loop); +#endif + ((void)revents); + + redisLibevEvents *e = (redisLibevEvents*)watcher->data; redisAsyncHandleRead(e->context); } -void redisLibevWriteEvent(struct ev_loop *loop, ev_io *watcher, int revents) { - ((void)loop); ((void)revents); - redisLibevEvents *e = watcher->data; +void redisLibevWriteEvent(EV_P_ ev_io *watcher, int revents) { +#if EV_MULTIPLICITY + ((void)loop); +#endif + ((void)revents); + + redisLibevEvents *e = (redisLibevEvents*)watcher->data; redisAsyncHandleWrite(e->context); } void redisLibevAddRead(void *privdata) { - redisLibevEvents *e = privdata; + redisLibevEvents *e = (redisLibevEvents*)privdata; + struct ev_loop *loop = e->loop; + ((void)loop); if (!e->reading) { e->reading = 1; - ev_io_start(e->loop,&e->rev); + ev_io_start(EV_A_ &e->rev); } } void redisLibevDelRead(void *privdata) { - redisLibevEvents *e = privdata; + redisLibevEvents *e = (redisLibevEvents*)privdata; + struct ev_loop *loop = e->loop; + ((void)loop); if (e->reading) { e->reading = 0; - ev_io_stop(e->loop,&e->rev); + ev_io_stop(EV_A_ &e->rev); } } void redisLibevAddWrite(void *privdata) { - redisLibevEvents *e = privdata; + redisLibevEvents *e = (redisLibevEvents*)privdata; + struct ev_loop *loop = e->loop; + ((void)loop); if (!e->writing) { e->writing = 1; - ev_io_start(e->loop,&e->wev); + ev_io_start(EV_A_ &e->wev); } } void redisLibevDelWrite(void *privdata) { - redisLibevEvents *e = privdata; + redisLibevEvents *e = (redisLibevEvents*)privdata; + struct ev_loop *loop = e->loop; + ((void)loop); if (e->writing) { e->writing = 0; - ev_io_stop(e->loop,&e->wev); + ev_io_stop(EV_A_ &e->wev); } } void redisLibevCleanup(void *privdata) { - redisLibevEvents *e = privdata; + redisLibevEvents *e = (redisLibevEvents*)privdata; redisLibevDelRead(privdata); redisLibevDelWrite(privdata); free(e); } -int redisLibevAttach(redisAsyncContext *ac, struct ev_loop *loop) { +int redisLibevAttach(EV_P_ redisAsyncContext *ac) { redisContext *c = &(ac->c); redisLibevEvents *e; /* Nothing should be attached when something is already attached */ - if (ac->data != NULL) + if (ac->_adapter_data != NULL) return REDIS_ERR; /* Create container for context and r/w events */ - e = malloc(sizeof(*e)); + e = (redisLibevEvents*)malloc(sizeof(*e)); e->context = ac; +#if EV_MULTIPLICITY e->loop = loop; +#else + e->loop = NULL; +#endif e->reading = e->writing = 0; e->rev.data = e; e->wev.data = e; @@ -83,10 +103,11 @@ int redisLibevAttach(redisAsyncContext *ac, struct ev_loop *loop) { ac->evAddWrite = redisLibevAddWrite; ac->evDelWrite = redisLibevDelWrite; ac->evCleanup = redisLibevCleanup; - ac->data = e; + ac->_adapter_data = e; /* Initialize read/write events */ ev_io_init(&e->rev,redisLibevReadEvent,c->fd,EV_READ); ev_io_init(&e->wev,redisLibevWriteEvent,c->fd,EV_WRITE); return REDIS_OK; } + diff --git a/deps/hiredis/adapters/libevent.h b/deps/hiredis/adapters/libevent.h index 1b759c131..dc1f5c739 100644 --- a/deps/hiredis/adapters/libevent.h +++ b/deps/hiredis/adapters/libevent.h @@ -10,38 +10,38 @@ typedef struct redisLibeventEvents { void redisLibeventReadEvent(int fd, short event, void *arg) { ((void)fd); ((void)event); - redisLibeventEvents *e = arg; + redisLibeventEvents *e = (redisLibeventEvents*)arg; redisAsyncHandleRead(e->context); } void redisLibeventWriteEvent(int fd, short event, void *arg) { ((void)fd); ((void)event); - redisLibeventEvents *e = arg; + redisLibeventEvents *e = (redisLibeventEvents*)arg; redisAsyncHandleWrite(e->context); } void redisLibeventAddRead(void *privdata) { - redisLibeventEvents *e = privdata; + redisLibeventEvents *e = (redisLibeventEvents*)privdata; event_add(&e->rev,NULL); } void redisLibeventDelRead(void *privdata) { - redisLibeventEvents *e = privdata; + redisLibeventEvents *e = (redisLibeventEvents*)privdata; event_del(&e->rev); } void redisLibeventAddWrite(void *privdata) { - redisLibeventEvents *e = privdata; + redisLibeventEvents *e = (redisLibeventEvents*)privdata; event_add(&e->wev,NULL); } void redisLibeventDelWrite(void *privdata) { - redisLibeventEvents *e = privdata; + redisLibeventEvents *e = (redisLibeventEvents*)privdata; event_del(&e->wev); } void redisLibeventCleanup(void *privdata) { - redisLibeventEvents *e = privdata; + redisLibeventEvents *e = (redisLibeventEvents*)privdata; event_del(&e->rev); event_del(&e->wev); free(e); @@ -52,11 +52,11 @@ int redisLibeventAttach(redisAsyncContext *ac, struct event_base *base) { redisLibeventEvents *e; /* Nothing should be attached when something is already attached */ - if (ac->data != NULL) + if (ac->_adapter_data != NULL) return REDIS_ERR; /* Create container for context and r/w events */ - e = malloc(sizeof(*e)); + e = (redisLibeventEvents*)malloc(sizeof(*e)); e->context = ac; /* Register functions to start/stop listening for events */ @@ -65,7 +65,7 @@ int redisLibeventAttach(redisAsyncContext *ac, struct event_base *base) { ac->evAddWrite = redisLibeventAddWrite; ac->evDelWrite = redisLibeventDelWrite; ac->evCleanup = redisLibeventCleanup; - ac->data = e; + ac->_adapter_data = e; /* Initialize and install read/write events */ event_set(&e->rev,c->fd,EV_READ,redisLibeventReadEvent,e); diff --git a/deps/hiredis/async.c b/deps/hiredis/async.c index 04a424595..5c11243ea 100644 --- a/deps/hiredis/async.c +++ b/deps/hiredis/async.c @@ -1,5 +1,7 @@ /* * Copyright (c) 2009-2010, Salvatore Sanfilippo + * Copyright (c) 2010, Pieter Noordhuis + * * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -38,8 +40,29 @@ void __redisAppendCommand(redisContext *c, char *cmd, size_t len); static redisAsyncContext *redisAsyncInitialize(redisContext *c) { redisAsyncContext *ac = realloc(c,sizeof(redisAsyncContext)); - /* Set all bytes in the async part of the context to 0 */ - memset(ac+sizeof(redisContext),0,sizeof(redisAsyncContext)-sizeof(redisContext)); + c = &(ac->c); + + /* The regular connect functions will always set the flag REDIS_CONNECTED. + * For the async API, we want to wait until the first write event is + * received up before setting this flag, so reset it here. */ + c->flags &= ~REDIS_CONNECTED; + + ac->err = 0; + ac->errstr = NULL; + ac->data = NULL; + ac->_adapter_data = NULL; + + ac->evAddRead = NULL; + ac->evDelRead = NULL; + ac->evAddWrite = NULL; + ac->evDelWrite = NULL; + ac->evCleanup = NULL; + + ac->onConnect = NULL; + ac->onDisconnect = NULL; + + ac->replies.head = NULL; + ac->replies.tail = NULL; return ac; } @@ -70,6 +93,14 @@ int redisAsyncSetReplyObjectFunctions(redisAsyncContext *ac, redisReplyObjectFun return redisSetReplyObjectFunctions(c,fn); } +int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn) { + if (ac->onConnect == NULL) { + ac->onConnect = fn; + return REDIS_OK; + } + return REDIS_ERR; +} + int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn) { if (ac->onDisconnect == NULL) { ac->onDisconnect = fn; @@ -153,7 +184,7 @@ static void __redisAsyncDisconnect(redisAsyncContext *ac) { } /* Signal event lib to clean up */ - if (ac->evCleanup) ac->evCleanup(ac->data); + if (ac->evCleanup) ac->evCleanup(ac->_adapter_data); /* Execute callback with proper status */ if (ac->onDisconnect) ac->onDisconnect(ac,status); @@ -206,7 +237,7 @@ void redisAsyncHandleRead(redisAsyncContext *ac) { __redisAsyncDisconnect(ac); } else { /* Always re-schedule reads */ - if (ac->evAddRead) ac->evAddRead(ac->data); + if (ac->evAddRead) ac->evAddRead(ac->_adapter_data); redisProcessCallbacks(ac); } } @@ -220,13 +251,19 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) { } else { /* Continue writing when not done, stop writing otherwise */ if (!done) { - if (ac->evAddWrite) ac->evAddWrite(ac->data); + if (ac->evAddWrite) ac->evAddWrite(ac->_adapter_data); } else { - if (ac->evDelWrite) ac->evDelWrite(ac->data); + if (ac->evDelWrite) ac->evDelWrite(ac->_adapter_data); } - /* Always schedule reads when something was written */ - if (ac->evAddRead) ac->evAddRead(ac->data); + /* Always schedule reads after writes */ + if (ac->evAddRead) ac->evAddRead(ac->_adapter_data); + + /* Fire onConnect when this is the first write event. */ + if (!(c->flags & REDIS_CONNECTED)) { + c->flags |= REDIS_CONNECTED; + if (ac->onConnect) ac->onConnect(ac); + } } } @@ -249,7 +286,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void __redisPushCallback(&ac->replies,&cb); /* Always schedule a write when the write buffer is non-empty */ - if (ac->evAddWrite) ac->evAddWrite(ac->data); + if (ac->evAddWrite) ac->evAddWrite(ac->_adapter_data); return REDIS_OK; } diff --git a/deps/hiredis/async.h b/deps/hiredis/async.h index d0a99da70..2ef0e21eb 100644 --- a/deps/hiredis/async.h +++ b/deps/hiredis/async.h @@ -31,6 +31,10 @@ #define __HIREDIS_ASYNC_H #include "hiredis.h" +#ifdef __cplusplus +extern "C" { +#endif + struct redisAsyncContext; /* need forward declaration of redisAsyncContext */ /* Reply callback prototype and container */ @@ -46,8 +50,9 @@ typedef struct redisCallbackList { redisCallback *head, *tail; } redisCallbackList; -/* Disconnect callback prototype */ +/* Connection callback prototypes */ typedef void (redisDisconnectCallback)(const struct redisAsyncContext*, int status); +typedef void (redisConnectCallback)(const struct redisAsyncContext*); /* Context for an async connection to Redis */ typedef struct redisAsyncContext { @@ -58,6 +63,12 @@ typedef struct redisAsyncContext { int err; char *errstr; + /* Not used by hiredis */ + void *data; + + /* Used by the different event lib adapters to store their private data */ + void *_adapter_data; + /* Called when the library expects to start reading/writing. * The supplied functions should be idempotent. */ void (*evAddRead)(void *privdata); @@ -65,12 +76,14 @@ typedef struct redisAsyncContext { void (*evAddWrite)(void *privdata); void (*evDelWrite)(void *privdata); void (*evCleanup)(void *privdata); - void *data; /* Called when either the connection is terminated due to an error or per * user request. The status is set accordingly (REDIS_OK, REDIS_ERR). */ redisDisconnectCallback *onDisconnect; + /* Called when the first write event was received. */ + redisConnectCallback *onConnect; + /* Reply callbacks */ redisCallbackList replies; } redisAsyncContext; @@ -78,6 +91,7 @@ typedef struct redisAsyncContext { /* Functions that proxy to hiredis */ redisAsyncContext *redisAsyncConnect(const char *ip, int port); int redisAsyncSetReplyObjectFunctions(redisAsyncContext *ac, redisReplyObjectFunctions *fn); +int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn); int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn); void redisAsyncDisconnect(redisAsyncContext *ac); @@ -91,4 +105,8 @@ int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdat int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...); int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen); +#ifdef __cplusplus +} +#endif + #endif diff --git a/deps/hiredis/example-ae.c b/deps/hiredis/example-ae.c new file mode 100644 index 000000000..28c34dc9f --- /dev/null +++ b/deps/hiredis/example-ae.c @@ -0,0 +1,53 @@ +#include +#include +#include +#include +#include "hiredis.h" +#include "async.h" +#include "adapters/ae.h" + +/* Put event loop in the global scope, so it can be explicitly stopped */ +static aeEventLoop *loop; + +void getCallback(redisAsyncContext *c, void *r, void *privdata) { + redisReply *reply = r; + if (reply == NULL) return; + printf("argv[%s]: %s\n", (char*)privdata, reply->str); + + /* Disconnect after receiving the reply to GET */ + redisAsyncDisconnect(c); +} + +void connectCallback(const redisAsyncContext *c) { + ((void)c); + printf("connected...\n"); +} + +void disconnectCallback(const redisAsyncContext *c, int status) { + if (status != REDIS_OK) { + printf("Error: %s\n", c->errstr); + } + printf("disconnected...\n"); + aeStop(loop); +} + +int main (int argc, char **argv) { + signal(SIGPIPE, SIG_IGN); + + redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379); + if (c->err) { + /* Let *c leak for now... */ + printf("Error: %s\n", c->errstr); + return 1; + } + + loop = aeCreateEventLoop(); + redisAeAttach(loop, c); + redisAsyncSetConnectCallback(c,connectCallback); + redisAsyncSetDisconnectCallback(c,disconnectCallback); + redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1])); + redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key"); + aeMain(loop); + return 0; +} + diff --git a/deps/hiredis/example-libev.c b/deps/hiredis/example-libev.c index 199d706c9..8efa1e39b 100644 --- a/deps/hiredis/example-libev.c +++ b/deps/hiredis/example-libev.c @@ -15,15 +15,20 @@ void getCallback(redisAsyncContext *c, void *r, void *privdata) { redisAsyncDisconnect(c); } +void connectCallback(const redisAsyncContext *c) { + ((void)c); + printf("connected...\n"); +} + void disconnectCallback(const redisAsyncContext *c, int status) { if (status != REDIS_OK) { printf("Error: %s\n", c->errstr); } + printf("disconnected...\n"); } int main (int argc, char **argv) { signal(SIGPIPE, SIG_IGN); - struct ev_loop *loop = ev_default_loop(0); redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379); if (c->err) { @@ -32,10 +37,11 @@ int main (int argc, char **argv) { return 1; } - redisLibevAttach(c,loop); + redisLibevAttach(EV_DEFAULT_ c); + redisAsyncSetConnectCallback(c,connectCallback); redisAsyncSetDisconnectCallback(c,disconnectCallback); redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1])); redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key"); - ev_loop(loop, 0); + ev_loop(EV_DEFAULT_ 0); return 0; } diff --git a/deps/hiredis/example-libevent.c b/deps/hiredis/example-libevent.c index c257bb6cf..f6f8c8325 100644 --- a/deps/hiredis/example-libevent.c +++ b/deps/hiredis/example-libevent.c @@ -15,10 +15,16 @@ void getCallback(redisAsyncContext *c, void *r, void *privdata) { redisAsyncDisconnect(c); } +void connectCallback(const redisAsyncContext *c) { + ((void)c); + printf("connected...\n"); +} + void disconnectCallback(const redisAsyncContext *c, int status) { if (status != REDIS_OK) { printf("Error: %s\n", c->errstr); } + printf("disconnected...\n"); } int main (int argc, char **argv) { @@ -33,6 +39,7 @@ int main (int argc, char **argv) { } redisLibeventAttach(c,base); + redisAsyncSetConnectCallback(c,connectCallback); redisAsyncSetDisconnectCallback(c,disconnectCallback); redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1])); redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key"); diff --git a/deps/hiredis/example.c b/deps/hiredis/example.c index 676814a2e..2506f39cd 100644 --- a/deps/hiredis/example.c +++ b/deps/hiredis/example.c @@ -17,7 +17,7 @@ int main(void) { /* PING server */ reply = redisCommand(c,"PING"); - printf("PONG: %s\n", reply->str); + printf("PING: %s\n", reply->str); freeReplyObject(reply); /* Set a key */ diff --git a/deps/hiredis/hiredis.c b/deps/hiredis/hiredis.c index 898b4d6af..d4cad7c2f 100644 --- a/deps/hiredis/hiredis.c +++ b/deps/hiredis/hiredis.c @@ -1,5 +1,7 @@ /* * Copyright (c) 2009-2010, Salvatore Sanfilippo + * Copyright (c) 2010, Pieter Noordhuis + * * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -32,6 +34,7 @@ #include #include #include +#include #include "hiredis.h" #include "net.h" @@ -44,10 +47,12 @@ typedef struct redisReader { void *reply; /* holds temporary reply */ sds buf; /* read buffer */ - unsigned int pos; /* buffer cursor */ + size_t pos; /* buffer cursor */ + size_t len; /* buffer length */ redisReadTask rstack[3]; /* stack of read tasks */ int ridx; /* index of stack */ + void *privdata; /* user-settable arbitrary field */ } redisReader; static redisReply *createReplyObject(int type); @@ -68,7 +73,7 @@ static redisReplyObjectFunctions defaultFunctions = { /* Create a reply object */ static redisReply *createReplyObject(int type) { - redisReply *r = calloc(sizeof(*r),1); + redisReply *r = malloc(sizeof(*r)); if (!r) redisOOM(); r->type = type; @@ -88,9 +93,10 @@ void freeReplyObject(void *reply) { if (r->element[j]) freeReplyObject(r->element[j]); free(r->element); break; - default: - if (r->str != NULL) - free(r->str); + case REDIS_REPLY_ERROR: + case REDIS_REPLY_STATUS: + case REDIS_REPLY_STRING: + free(r->str); break; } free(r); @@ -111,7 +117,7 @@ static void *createStringObject(const redisReadTask *task, char *str, size_t len r->len = len; if (task->parent) { - redisReply *parent = task->parent; + redisReply *parent = task->parent->obj; assert(parent->type == REDIS_REPLY_ARRAY); parent->element[task->idx] = r; } @@ -124,7 +130,7 @@ static void *createArrayObject(const redisReadTask *task, int elements) { if ((r->element = calloc(sizeof(redisReply*),elements)) == NULL) redisOOM(); if (task->parent) { - redisReply *parent = task->parent; + redisReply *parent = task->parent->obj; assert(parent->type == REDIS_REPLY_ARRAY); parent->element[task->idx] = r; } @@ -135,7 +141,7 @@ static void *createIntegerObject(const redisReadTask *task, long long value) { redisReply *r = createReplyObject(REDIS_REPLY_INTEGER); r->integer = value; if (task->parent) { - redisReply *parent = task->parent; + redisReply *parent = task->parent->obj; assert(parent->type == REDIS_REPLY_ARRAY); parent->element[task->idx] = r; } @@ -145,7 +151,7 @@ static void *createIntegerObject(const redisReadTask *task, long long value) { static void *createNilObject(const redisReadTask *task) { redisReply *r = createReplyObject(REDIS_REPLY_NIL); if (task->parent) { - redisReply *parent = task->parent; + redisReply *parent = task->parent->obj; assert(parent->type == REDIS_REPLY_ARRAY); parent->element[task->idx] = r; } @@ -154,7 +160,7 @@ static void *createNilObject(const redisReadTask *task) { static char *readBytes(redisReader *r, unsigned int bytes) { char *p; - if (sdslen(r->buf)-r->pos >= bytes) { + if (r->len-r->pos >= bytes) { p = r->buf+r->pos; r->pos += bytes; return p; @@ -162,20 +168,60 @@ static char *readBytes(redisReader *r, unsigned int bytes) { return NULL; } -static char *seekNewline(char *s) { - /* Find pointer to \r\n without strstr */ - while (s != NULL) { - s = strchr(s,'\r'); - if (s != NULL) { - if (s[1] == '\n') - break; - else - s++; +/* Find pointer to \r\n. */ +static char *seekNewline(char *s, size_t len) { + int pos = 0; + int _len = len-1; + + /* Position should be < len-1 because the character at "pos" should be + * followed by a \n. Note that strchr cannot be used because it doesn't + * allow to search a limited length and the buffer that is being searched + * might not have a trailing NULL character. */ + while (pos < _len) { + while(pos < _len && s[pos] != '\r') pos++; + if (s[pos] != '\r') { + /* Not found. */ + return NULL; } else { - break; + if (s[pos+1] == '\n') { + /* Found. */ + return s+pos; + } else { + /* Continue searching. */ + pos++; + } } } - return s; + return NULL; +} + +/* Read a long long value starting at *s, under the assumption that it will be + * terminated by \r\n. Ambiguously returns -1 for unexpected input. */ +static long long readLongLong(char *s) { + long long v = 0; + int dec, mult = 1; + char c; + + if (*s == '-') { + mult = -1; + s++; + } else if (*s == '+') { + mult = 1; + s++; + } + + while ((c = *(s++)) != '\r') { + dec = c - '0'; + if (dec >= 0 && dec < 10) { + v *= 10; + v += dec; + } else { + /* Should not happen... */ + return -1; + } + } + + return mult*v; } static char *readLine(redisReader *r, int *_len) { @@ -183,7 +229,7 @@ static char *readLine(redisReader *r, int *_len) { int len; p = r->buf+r->pos; - s = seekNewline(p); + s = seekNewline(p,(r->len-r->pos)); if (s != NULL) { len = s-(r->buf+r->pos); r->pos += len+2; /* skip \r\n */ @@ -227,7 +273,7 @@ static int processLineItem(redisReader *r) { if ((p = readLine(r,&len)) != NULL) { if (r->fn) { if (cur->type == REDIS_REPLY_INTEGER) { - obj = r->fn->createInteger(cur,strtoll(p,NULL,10)); + obj = r->fn->createInteger(cur,readLongLong(p)); } else { obj = r->fn->createString(cur,p,len); } @@ -235,9 +281,8 @@ static int processLineItem(redisReader *r) { obj = (void*)(size_t)(cur->type); } - /* If there is no root yet, register this object as root. */ - if (r->reply == NULL) - r->reply = obj; + /* Set reply if this is the root object. */ + if (r->ridx == 0) r->reply = obj; moveToNextTask(r); return 0; } @@ -250,32 +295,36 @@ static int processBulkItem(redisReader *r) { char *p, *s; long len; unsigned long bytelen; + int success = 0; p = r->buf+r->pos; - s = seekNewline(p); + s = seekNewline(p,r->len-r->pos); if (s != NULL) { p = r->buf+r->pos; bytelen = s-(r->buf+r->pos)+2; /* include \r\n */ - len = strtol(p,NULL,10); + len = readLongLong(p); if (len < 0) { /* The nil object can always be created. */ obj = r->fn ? r->fn->createNil(cur) : (void*)REDIS_REPLY_NIL; + success = 1; } else { /* Only continue when the buffer contains the entire bulk item. */ bytelen += len+2; /* include \r\n */ - if (r->pos+bytelen <= sdslen(r->buf)) { + if (r->pos+bytelen <= r->len) { obj = r->fn ? r->fn->createString(cur,s+2,len) : (void*)REDIS_REPLY_STRING; + success = 1; } } /* Proceed when obj was created. */ - if (obj != NULL) { + if (success) { r->pos += bytelen; - if (r->reply == NULL) - r->reply = obj; + + /* Set reply if this is the root object. */ + if (r->ridx == 0) r->reply = obj; moveToNextTask(r); return 0; } @@ -288,9 +337,19 @@ static int processMultiBulkItem(redisReader *r) { void *obj; char *p; long elements; + int root = 0; + + /* Set error for nested multi bulks with depth > 1 */ + if (r->ridx == 2) { + redisSetReplyReaderError(r,sdscatprintf(sdsempty(), + "No support for nested multi bulk replies with depth > 1")); + return -1; + } if ((p = readLine(r,NULL)) != NULL) { - elements = strtol(p,NULL,10); + elements = readLongLong(p); + root = (r->ridx == 0); + if (elements == -1) { obj = r->fn ? r->fn->createNil(cur) : (void*)REDIS_REPLY_NIL; @@ -302,19 +361,21 @@ static int processMultiBulkItem(redisReader *r) { /* Modify task stack when there are more than 0 elements. */ if (elements > 0) { cur->elements = elements; + cur->obj = obj; r->ridx++; r->rstack[r->ridx].type = -1; r->rstack[r->ridx].elements = -1; - r->rstack[r->ridx].parent = obj; r->rstack[r->ridx].idx = 0; + r->rstack[r->ridx].obj = NULL; + r->rstack[r->ridx].parent = cur; + r->rstack[r->ridx].privdata = r->privdata; } else { moveToNextTask(r); } } - /* Object was created, so we can always continue. */ - if (r->reply == NULL) - r->reply = obj; + /* Set reply if this is the root object. */ + if (root) r->reply = obj; return 0; } return -1; @@ -347,7 +408,7 @@ static int processItem(redisReader *r) { default: byte = sdscatrepr(sdsempty(),p,1); redisSetReplyReaderError(r,sdscatprintf(sdsempty(), - "protocol error, got %s as reply type byte", byte)); + "Protocol error, got %s as reply type byte", byte)); sdsfree(byte); return -1; } @@ -368,8 +429,7 @@ static int processItem(redisReader *r) { case REDIS_REPLY_ARRAY: return processMultiBulkItem(r); default: - redisSetReplyReaderError(r,sdscatprintf(sdsempty(), - "unknown item type '%d'", cur->type)); + assert(NULL); return -1; } } @@ -394,6 +454,17 @@ int redisReplyReaderSetReplyObjectFunctions(void *reader, redisReplyObjectFuncti return REDIS_ERR; } +/* Set the private data field that is used in the read tasks. This argument can + * be used to curry arbitrary data to the custom reply object functions. */ +int redisReplyReaderSetPrivdata(void *reader, void *privdata) { + redisReader *r = reader; + if (r->reply == NULL) { + r->privdata = privdata; + return REDIS_OK; + } + return REDIS_ERR; +} + /* External libraries wrapping hiredis might need access to the temporary * variable while the reply is built up. When the reader contains an * object in between receiving some bytes to parse, this object might @@ -437,8 +508,10 @@ void redisReplyReaderFeed(void *reader, char *buf, size_t len) { redisReader *r = reader; /* Copy the provided buffer. */ - if (buf != NULL && len >= 1) + if (buf != NULL && len >= 1) { r->buf = sdscatlen(r->buf,buf,len); + r->len = sdslen(r->buf); + } } int redisReplyReaderGetReply(void *reader, void **reply) { @@ -446,15 +519,17 @@ int redisReplyReaderGetReply(void *reader, void **reply) { if (reply != NULL) *reply = NULL; /* When the buffer is empty, there will never be a reply. */ - if (sdslen(r->buf) == 0) + if (r->len == 0) return REDIS_OK; /* Set first item to process when the stack is empty. */ if (r->ridx == -1) { r->rstack[0].type = -1; r->rstack[0].elements = -1; - r->rstack[0].parent = NULL; r->rstack[0].idx = -1; + r->rstack[0].obj = NULL; + r->rstack[0].parent = NULL; + r->rstack[0].privdata = r->privdata; r->ridx = 0; } @@ -465,14 +540,15 @@ int redisReplyReaderGetReply(void *reader, void **reply) { /* Discard the consumed part of the buffer. */ if (r->pos > 0) { - if (r->pos == sdslen(r->buf)) { + if (r->pos == r->len) { /* sdsrange has a quirck on this edge case. */ sdsfree(r->buf); r->buf = sdsempty(); } else { - r->buf = sdsrange(r->buf,r->pos,sdslen(r->buf)); + r->buf = sdsrange(r->buf,r->pos,r->len); } r->pos = 0; + r->len = sdslen(r->buf); } /* Emit a reply when there is one. */ @@ -481,7 +557,7 @@ int redisReplyReaderGetReply(void *reader, void **reply) { r->reply = NULL; /* Destroy the buffer when it is empty and is quite large. */ - if (sdslen(r->buf) == 0 && sdsavail(r->buf) > 16*1024) { + if (r->len == 0 && sdsavail(r->buf) > 16*1024) { sdsfree(r->buf); r->buf = sdsempty(); r->pos = 0; @@ -525,6 +601,7 @@ int redisvFormatCommand(char **target, const char *format, va_list ap) { char *cmd = NULL; /* final command */ int pos; /* position in final command */ sds current; /* current argument */ + int interpolated = 0; /* did we do interpolation on an argument? */ char **argv = NULL; int argc = 0, j; int totlen = 0; @@ -541,6 +618,7 @@ int redisvFormatCommand(char **target, const char *format, va_list ap) { if (sdslen(current) != 0) { addArgument(current, &argv, &argc, &totlen); current = sdsempty(); + interpolated = 0; } } else { current = sdscatlen(current,c,1); @@ -549,16 +627,74 @@ int redisvFormatCommand(char **target, const char *format, va_list ap) { switch(c[1]) { case 's': arg = va_arg(ap,char*); - current = sdscat(current,arg); + size = strlen(arg); + if (size > 0) + current = sdscatlen(current,arg,size); + interpolated = 1; break; case 'b': arg = va_arg(ap,char*); size = va_arg(ap,size_t); - current = sdscatlen(current,arg,size); + if (size > 0) + current = sdscatlen(current,arg,size); + interpolated = 1; break; case '%': - cmd = sdscat(cmd,"%"); + current = sdscat(current,"%"); break; + default: + /* Try to detect printf format */ + { + char _format[16]; + const char *_p = c+1; + size_t _l = 0; + va_list _cpy; + + /* Flags */ + if (*_p != '\0' && *_p == '#') _p++; + if (*_p != '\0' && *_p == '0') _p++; + if (*_p != '\0' && *_p == '-') _p++; + if (*_p != '\0' && *_p == ' ') _p++; + if (*_p != '\0' && *_p == '+') _p++; + + /* Field width */ + while (*_p != '\0' && isdigit(*_p)) _p++; + + /* Precision */ + if (*_p == '.') { + _p++; + while (*_p != '\0' && isdigit(*_p)) _p++; + } + + /* Modifiers */ + if (*_p != '\0') { + if (*_p == 'h' || *_p == 'l') { + /* Allow a single repetition for these modifiers */ + if (_p[0] == _p[1]) _p++; + _p++; + } + } + + /* Conversion specifier */ + if (*_p != '\0' && strchr("diouxXeEfFgGaA",*_p) != NULL) { + _l = (_p+1)-c; + if (_l < sizeof(_format)-2) { + memcpy(_format,c,_l); + _format[_l] = '\0'; + va_copy(_cpy,ap); + current = sdscatvprintf(current,_format,_cpy); + interpolated = 1; + va_end(_cpy); + + /* Update current position (note: outer blocks + * increment c twice so compensate here) */ + c = _p-1; + } + } + + /* Consume and discard vararg */ + va_arg(ap,void); + } } c++; } @@ -566,7 +702,7 @@ int redisvFormatCommand(char **target, const char *format, va_list ap) { } /* Add the last argument if needed */ - if (sdslen(current) != 0) { + if (interpolated || sdslen(current) != 0) { addArgument(current, &argv, &argc, &totlen); } else { sdsfree(current); @@ -664,7 +800,6 @@ void __redisSetError(redisContext *c, int type, const sds errstr) { static redisContext *redisContextInit() { redisContext *c = calloc(sizeof(redisContext),1); - c->fd = -1; /* quick fix for a bug that should be addressed differently */ c->err = 0; c->errstr = NULL; c->obuf = sdsempty(); @@ -692,7 +827,6 @@ void redisFree(redisContext *c) { redisContext *redisConnect(const char *ip, int port) { redisContext *c = redisContextInit(); c->flags |= REDIS_BLOCK; - c->flags |= REDIS_CONNECTED; redisContextConnectTcp(c,ip,port); return c; } @@ -700,7 +834,6 @@ redisContext *redisConnect(const char *ip, int port) { redisContext *redisConnectNonBlock(const char *ip, int port) { redisContext *c = redisContextInit(); c->flags &= ~REDIS_BLOCK; - c->flags |= REDIS_CONNECTED; redisContextConnectTcp(c,ip,port); return c; } @@ -708,7 +841,6 @@ redisContext *redisConnectNonBlock(const char *ip, int port) { redisContext *redisConnectUnix(const char *path) { redisContext *c = redisContextInit(); c->flags |= REDIS_BLOCK; - c->flags |= REDIS_CONNECTED; redisContextConnectUnix(c,path); return c; } @@ -716,7 +848,6 @@ redisContext *redisConnectUnix(const char *path) { redisContext *redisConnectUnixNonBlock(const char *path) { redisContext *c = redisContextInit(); c->flags &= ~REDIS_BLOCK; - c->flags |= REDIS_CONNECTED; redisContextConnectUnix(c,path); return c; } diff --git a/deps/hiredis/hiredis.h b/deps/hiredis/hiredis.h index cb25b363f..1412a344c 100644 --- a/deps/hiredis/hiredis.h +++ b/deps/hiredis/hiredis.h @@ -1,5 +1,7 @@ /* * Copyright (c) 2009-2010, Salvatore Sanfilippo + * Copyright (c) 2010, Pieter Noordhuis + * * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -34,7 +36,7 @@ #define HIREDIS_MAJOR 0 #define HIREDIS_MINOR 9 -#define HIREDIS_PATCH 0 +#define HIREDIS_PATCH 2 #define REDIS_ERR -1 #define REDIS_OK 0 @@ -62,12 +64,16 @@ * should be terminated once all replies have been read. */ #define REDIS_DISCONNECTING 0x4 -#define REDIS_REPLY_ERROR 0 #define REDIS_REPLY_STRING 1 #define REDIS_REPLY_ARRAY 2 #define REDIS_REPLY_INTEGER 3 #define REDIS_REPLY_NIL 4 #define REDIS_REPLY_STATUS 5 +#define REDIS_REPLY_ERROR 6 + +#ifdef __cplusplus +extern "C" { +#endif /* This is the reply object returned by redisCommand() */ typedef struct redisReply { @@ -82,8 +88,10 @@ typedef struct redisReply { typedef struct redisReadTask { int type; int elements; /* number of elements in multibulk container */ - void *parent; /* optional pointer to parent object */ int idx; /* index in parent (array) object */ + void *obj; /* holds user-generated value for a read task */ + struct redisReadTask *parent; /* parent task */ + void *privdata; /* user-settable arbitrary field */ } redisReadTask; typedef struct redisReplyObjectFunctions { @@ -112,6 +120,7 @@ typedef struct redisContext { void freeReplyObject(void *reply); void *redisReplyReaderCreate(); int redisReplyReaderSetReplyObjectFunctions(void *reader, redisReplyObjectFunctions *fn); +int redisReplyReaderSetPrivdata(void *reader, void *privdata); void *redisReplyReaderGetObject(void *reader); char *redisReplyReaderGetError(void *reader); void redisReplyReaderFree(void *ptr); @@ -154,4 +163,8 @@ void *redisvCommand(redisContext *c, const char *format, va_list ap); void *redisCommand(redisContext *c, const char *format, ...); void *redisCommandArgv(redisContext *c, int argc, const char **argv, const size_t *argvlen); +#ifdef __cplusplus +} +#endif + #endif diff --git a/deps/hiredis/net.c b/deps/hiredis/net.c index 599ba9d6b..88171461e 100644 --- a/deps/hiredis/net.c +++ b/deps/hiredis/net.c @@ -1,8 +1,9 @@ /* Extracted from anet.c to work properly with Hiredis error reporting. * * Copyright (c) 2006-2010, Salvatore Sanfilippo - * All rights reserved. + * Copyright (c) 2010, Pieter Noordhuis * + * All rights reserved. * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * @@ -43,7 +44,7 @@ #include #include -#include "hiredis.h" +#include "net.h" #include "sds.h" /* Forward declaration */ @@ -114,7 +115,7 @@ int redisContextConnectTcp(redisContext *c, const char *addr, int port) { he = gethostbyname(addr); if (he == NULL) { __redisSetError(c,REDIS_ERR_OTHER, - sdscatprintf(sdsempty(),"can't resolve: %s",addr)); + sdscatprintf(sdsempty(),"Can't resolve: %s",addr)); close(s); return REDIS_ERR; } @@ -137,6 +138,7 @@ int redisContextConnectTcp(redisContext *c, const char *addr, int port) { } c->fd = s; + c->flags |= REDIS_CONNECTED; return REDIS_OK; } @@ -163,5 +165,6 @@ int redisContextConnectUnix(redisContext *c, const char *path) { } c->fd = s; + c->flags |= REDIS_CONNECTED; return REDIS_OK; } diff --git a/deps/hiredis/net.h b/deps/hiredis/net.h index 0e560008d..b052d97fe 100644 --- a/deps/hiredis/net.h +++ b/deps/hiredis/net.h @@ -31,6 +31,12 @@ #ifndef __NET_H #define __NET_H +#include "hiredis.h" + +#if defined(__sun) +#define AF_LOCAL AF_UNIX +#endif + int redisContextConnectTcp(redisContext *c, const char *addr, int port); int redisContextConnectUnix(redisContext *c, const char *path); diff --git a/deps/hiredis/test.c b/deps/hiredis/test.c index d23bc188a..ed355a73e 100644 --- a/deps/hiredis/test.c +++ b/deps/hiredis/test.c @@ -47,17 +47,59 @@ static void test_format_commands() { len == 4+4+(3+2)+4+(3+2)+4+(3+2)); free(cmd); + test("Format command with %%s and an empty string: "); + len = redisFormatCommand(&cmd,"SET %s %s","foo",""); + test_cond(strncmp(cmd,"*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$0\r\n\r\n",len) == 0 && + len == 4+4+(3+2)+4+(3+2)+4+(0+2)); + free(cmd); + test("Format command with %%b string interpolation: "); len = redisFormatCommand(&cmd,"SET %b %b","foo",3,"b\0r",3); test_cond(strncmp(cmd,"*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nb\0r\r\n",len) == 0 && len == 4+4+(3+2)+4+(3+2)+4+(3+2)); free(cmd); + test("Format command with %%b and an empty string: "); + len = redisFormatCommand(&cmd,"SET %b %b","foo",3,"",0); + test_cond(strncmp(cmd,"*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$0\r\n\r\n",len) == 0 && + len == 4+4+(3+2)+4+(3+2)+4+(0+2)); + free(cmd); + + test("Format command with literal %%: "); + len = redisFormatCommand(&cmd,"SET %% %%"); + test_cond(strncmp(cmd,"*3\r\n$3\r\nSET\r\n$1\r\n%\r\n$1\r\n%\r\n",len) == 0 && + len == 4+4+(3+2)+4+(1+2)+4+(1+2)); + free(cmd); + + test("Format command with printf-delegation (long long): "); + len = redisFormatCommand(&cmd,"key:%08lld",1234ll); + test_cond(strncmp(cmd,"*1\r\n$12\r\nkey:00001234\r\n",len) == 0 && + len == 4+5+(12+2)); + free(cmd); + + test("Format command with printf-delegation (float): "); + len = redisFormatCommand(&cmd,"v:%06.1f",12.34f); + test_cond(strncmp(cmd,"*1\r\n$8\r\nv:0012.3\r\n",len) == 0 && + len == 4+4+(8+2)); + free(cmd); + + test("Format command with printf-delegation and extra interpolation: "); + len = redisFormatCommand(&cmd,"key:%d %b",1234,"foo",3); + test_cond(strncmp(cmd,"*2\r\n$8\r\nkey:1234\r\n$3\r\nfoo\r\n",len) == 0 && + len == 4+4+(8+2)+4+(3+2)); + free(cmd); + + test("Format command with wrong printf format and extra interpolation: "); + len = redisFormatCommand(&cmd,"key:%08p %b",1234,"foo",3); + test_cond(strncmp(cmd,"*2\r\n$6\r\nkey:8p\r\n$3\r\nfoo\r\n",len) == 0 && + len == 4+4+(6+2)+4+(3+2)); + free(cmd); + const char *argv[3]; argv[0] = "SET"; - argv[1] = "foo"; + argv[1] = "foo\0xxx"; argv[2] = "bar"; - size_t lens[3] = { 3, 3, 3 }; + size_t lens[3] = { 3, 7, 3 }; int argc = 3; test("Format command by passing argc/argv without lengths: "); @@ -68,38 +110,29 @@ static void test_format_commands() { test("Format command by passing argc/argv with lengths: "); len = redisFormatCommandArgv(&cmd,argc,argv,lens); - test_cond(strncmp(cmd,"*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n",len) == 0 && - len == 4+4+(3+2)+4+(3+2)+4+(3+2)); + test_cond(strncmp(cmd,"*3\r\n$3\r\nSET\r\n$7\r\nfoo\0xxx\r\n$3\r\nbar\r\n",len) == 0 && + len == 4+4+(3+2)+4+(7+2)+4+(3+2)); free(cmd); } static void test_blocking_connection() { redisContext *c; redisReply *reply; + int major, minor; - __connect(&c); - test("Returns I/O error when the connection is lost: "); - reply = redisCommand(c,"QUIT"); - test_cond(strcasecmp(reply->str,"OK") == 0 && redisCommand(c,"PING") == NULL); - - /* Two conditions may happen, depending on the type of connection. - * When connected via TCP, the socket will not yet be aware of the closed - * connection and the write(2) call will succeed, but the read(2) will - * result in an EOF. When connected via Unix sockets, the socket will be - * immediately aware that it was closed and fail on the write(2) call. */ - if (use_unix) { - fprintf(stderr,"Error: %s\n", c->errstr); - assert(c->err == REDIS_ERR_IO && - strcmp(c->errstr,"Broken pipe") == 0); - } else { - fprintf(stderr,"Error: %s\n", c->errstr); - assert(c->err == REDIS_ERR_EOF && - strcmp(c->errstr,"Server closed the connection") == 0); - } - freeReplyObject(reply); + test("Returns error when host cannot be resolved: "); + c = redisConnect((char*)"idontexist.local", 6379); + test_cond(c->err == REDIS_ERR_OTHER && + strcmp(c->errstr,"Can't resolve: idontexist.local") == 0); redisFree(c); - __connect(&c); /* reconnect */ + test("Returns error when the port is not open: "); + c = redisConnect((char*)"localhost", 56380); + test_cond(c->err == REDIS_ERR_IO && + strcmp(c->errstr,"Connection refused") == 0); + redisFree(c); + + __connect(&c); test("Is able to deliver commands: "); reply = redisCommand(c,"PING"); test_cond(reply->type == REDIS_REPLY_STATUS && @@ -112,12 +145,9 @@ static void test_blocking_connection() { /* Make sure the DB is emtpy */ reply = redisCommand(c,"DBSIZE"); - if (reply->type != REDIS_REPLY_INTEGER || - reply->integer != 0) { - printf("Sorry DB 9 is not empty, test can not continue\n"); + if (reply->type != REDIS_REPLY_INTEGER || reply->integer != 0) { + printf("Database #9 is not empty, test can not continue\n"); exit(1); - } else { - printf("DB 9 is empty... test can continue\n"); } freeReplyObject(reply); @@ -183,6 +213,43 @@ static void test_blocking_connection() { reply->element[1]->type == REDIS_REPLY_STATUS && strcasecmp(reply->element[1]->str,"pong") == 0); freeReplyObject(reply); + + { + /* Find out Redis version to determine the path for the next test */ + const char *field = "redis_version:"; + char *p, *eptr; + + reply = redisCommand(c,"INFO"); + p = strstr(reply->str,field); + major = strtol(p+strlen(field),&eptr,10); + p = eptr+1; /* char next to the first "." */ + minor = strtol(p,&eptr,10); + freeReplyObject(reply); + } + + test("Returns I/O error when the connection is lost: "); + reply = redisCommand(c,"QUIT"); + if (major >= 2 && minor > 0) { + /* > 2.0 returns OK on QUIT and read() should be issued once more + * to know the descriptor is at EOF. */ + test_cond(strcasecmp(reply->str,"OK") == 0 && + redisGetReply(c,(void**)&reply) == REDIS_ERR); + freeReplyObject(reply); + } else { + test_cond(reply == NULL); + } + + /* On 2.0, QUIT will cause the connection to be closed immediately and + * the read(2) for the reply on QUIT will set the error to EOF. + * On >2.0, QUIT will return with OK and another read(2) needed to be + * issued to find out the socket was closed by the server. In both + * conditions, the error will be set to EOF. */ + assert(c->err == REDIS_ERR_EOF && + strcmp(c->errstr,"Server closed the connection") == 0); + + /* Clean up context and reconnect again */ + redisFree(c); + __connect(&c); } static void test_reply_reader() { @@ -197,7 +264,7 @@ static void test_reply_reader() { ret = redisReplyReaderGetReply(reader,NULL); err = redisReplyReaderGetError(reader); test_cond(ret == REDIS_ERR && - strcasecmp(err,"protocol error, got \"@\" as reply type byte") == 0); + strcasecmp(err,"Protocol error, got \"@\" as reply type byte") == 0); redisReplyReaderFree(reader); /* when the reply already contains multiple items, they must be free'd @@ -210,7 +277,18 @@ static void test_reply_reader() { ret = redisReplyReaderGetReply(reader,NULL); err = redisReplyReaderGetError(reader); test_cond(ret == REDIS_ERR && - strcasecmp(err,"protocol error, got \"@\" as reply type byte") == 0); + strcasecmp(err,"Protocol error, got \"@\" as reply type byte") == 0); + redisReplyReaderFree(reader); + + test("Set error on nested multi bulks with depth > 1: "); + reader = redisReplyReaderCreate(); + redisReplyReaderFeed(reader,(char*)"*1\r\n",4); + redisReplyReaderFeed(reader,(char*)"*1\r\n",4); + redisReplyReaderFeed(reader,(char*)"*1\r\n",4); + ret = redisReplyReaderGetReply(reader,NULL); + err = redisReplyReaderGetError(reader); + test_cond(ret == REDIS_ERR && + strncasecmp(err,"No support for",14) == 0); redisReplyReaderFree(reader); test("Works with NULL functions for reply: "); From 53f1d81712ff66d0343647af2f15308ea7e89d30 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 16 Dec 2010 23:35:02 +0100 Subject: [PATCH 2/9] Fix NULL-termination of variable data in redis-benchmark --- src/redis-benchmark.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c index c44b0ae45..5e40c97b5 100644 --- a/src/redis-benchmark.c +++ b/src/redis-benchmark.c @@ -195,6 +195,11 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) { exit(1); } if (reply != NULL) { + if (reply == (void*)REDIS_REPLY_ERROR) { + fprintf(stderr,"Unexpected error reply, exiting...\n"); + exit(1); + } + if (config.donerequests < config.requests) config.latency[config.donerequests++] = c->latency; clientDone(c); @@ -454,8 +459,9 @@ int main(int argc, char **argv) { c->obuf = sdscatprintf(c->obuf,"*%d\r\n$4\r\nMSET\r\n", 11); { int i; - char *data = zmalloc(config.datasize+2); + char *data = zmalloc(config.datasize+1); memset(data,'x',config.datasize); + data[config.datasize] = '\0'; for (i = 0; i < 10; i++) { c->obuf = sdscatprintf(c->obuf,"$%d\r\n%s\r\n",config.datasize,data); } From 174df6fe497898a9645fdafdf56807f6438b9978 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 16 Dec 2010 23:41:58 +0100 Subject: [PATCH 3/9] Re-use variable data in redis-benchmark --- src/redis-benchmark.c | 26 ++++++++------------------ 1 file changed, 8 insertions(+), 18 deletions(-) diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c index 5e40c97b5..a8647bda3 100644 --- a/src/redis-benchmark.c +++ b/src/redis-benchmark.c @@ -397,6 +397,7 @@ int showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData } int main(int argc, char **argv) { + int i; client c; signal(SIGHUP, SIG_IGN); @@ -440,6 +441,10 @@ int main(int argc, char **argv) { } do { + char *data = zmalloc(config.datasize+1); + memset(data,'x',config.datasize); + data[config.datasize] = '\0'; + prepareForBenchmark("PING"); c = createClient(REDIS_REPLY_STATUS); c->obuf = sdscat(c->obuf,"PING\r\n"); @@ -457,16 +462,8 @@ int main(int argc, char **argv) { prepareForBenchmark("MSET (10 keys, multi bulk)"); c = createClient(REDIS_REPLY_ARRAY); c->obuf = sdscatprintf(c->obuf,"*%d\r\n$4\r\nMSET\r\n", 11); - { - int i; - char *data = zmalloc(config.datasize+1); - memset(data,'x',config.datasize); - data[config.datasize] = '\0'; - for (i = 0; i < 10; i++) { - c->obuf = sdscatprintf(c->obuf,"$%d\r\n%s\r\n",config.datasize,data); - } - zfree(data); - } + for (i = 0; i < 10; i++) { + c->obuf = sdscatprintf(c->obuf,"$%d\r\n%s\r\n",config.datasize,data); createMissingClients(c); aeMain(config.el); endBenchmark(); @@ -474,14 +471,7 @@ int main(int argc, char **argv) { prepareForBenchmark("SET"); c = createClient(REDIS_REPLY_STATUS); c->obuf = sdscat(c->obuf,"*3\r\n$3\r\nSET\r\n$20\r\nfoo_rand000000000000\r\n"); - { - char *data = zmalloc(config.datasize+2); - memset(data,'x',config.datasize); - data[config.datasize] = '\r'; - data[config.datasize+1] = '\n'; - c->obuf = sdscatprintf(c->obuf,"$%d\r\n",config.datasize); - c->obuf = sdscatlen(c->obuf,data,config.datasize+2); - } + c->obuf = sdscatprintf(c->obuf,"$%d\r\n%s\r\n",config.datasize,data); createMissingClients(c); aeMain(config.el); endBenchmark(); From 1cd3c1e08c96dfe4bddd4ef73229dc7b89c4ce2a Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Fri, 17 Dec 2010 00:19:32 +0100 Subject: [PATCH 4/9] Use multi-bulk protocol by default in redis-benchmark --- src/redis-benchmark.c | 116 +++++++++++++++++++++++++++++------------- 1 file changed, 82 insertions(+), 34 deletions(-) diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c index a8647bda3..ab0d9230c 100644 --- a/src/redis-benchmark.c +++ b/src/redis-benchmark.c @@ -80,6 +80,7 @@ typedef struct _client { redisContext *context; int state; sds obuf; + char *randptr; unsigned int written; /* bytes of 'obuf' already written */ int replytype; long long start; /* start time of a request */ @@ -145,16 +146,29 @@ static void resetClient(client c) { } static void randomizeClientKey(client c) { - char *p; + char *p, *newline; char buf[32]; long r; - p = strstr(c->obuf, "_rand"); - if (!p) return; - p += 5; + if (c->randptr == NULL) return; + + /* Check if we have to randomize (only once per connection) */ + if (c->randptr == (void*)-1) { + p = strstr(c->obuf,":rand:"); + if (!p) { + c->randptr = NULL; + return; + } else { + newline = strstr(p,"\r\n"); + assert(newline-(p+6) == 12); /* 12 chars for randomness */ + c->randptr = p+6; + } + } + + /* Set random number in output buffer */ r = random() % config.randomkeys_keyspacelen; - sprintf(buf,"%ld",r); - memcpy(p,buf,strlen(buf)); + snprintf(buf,sizeof(buf),"%012ld",r); + memcpy(c->randptr,buf,12); } static void clientDone(client c) { @@ -253,7 +267,8 @@ static client createClient(int replytype) { } c->replytype = replytype; c->state = CLIENT_CONNECTING; - c->obuf = sdsempty(); + c->obuf = NULL; + c->randptr = (void*)-1; c->written = 0; redisSetReplyObjectFunctions(c->context,NULL); aeCreateFileEvent(config.el,c->context->fd,AE_WRITABLE,writeHandler,c); @@ -265,7 +280,6 @@ static client createClient(int replytype) { static void createMissingClients(client c) { while(config.liveclients < config.numclients) { client new = createClient(c->replytype); - sdsfree(new->obuf); new->obuf = sdsdup(c->obuf); if (config.randomkeys) randomizeClientKey(c); } @@ -441,114 +455,148 @@ int main(int argc, char **argv) { } do { - char *data = zmalloc(config.datasize+1); + char *data, *cmd; + int len; + + data = zmalloc(config.datasize+1); memset(data,'x',config.datasize); data[config.datasize] = '\0'; + prepareForBenchmark("PING (inline)"); + c = createClient(REDIS_REPLY_STATUS); + c->obuf = sdscat(sdsempty(),"PING\r\n"); + createMissingClients(c); + aeMain(config.el); + endBenchmark(); + prepareForBenchmark("PING"); c = createClient(REDIS_REPLY_STATUS); - c->obuf = sdscat(c->obuf,"PING\r\n"); + len = redisFormatCommand(&cmd,"PING"); + c->obuf = sdsnewlen(cmd,len); + free(cmd); createMissingClients(c); aeMain(config.el); endBenchmark(); - prepareForBenchmark("PING (multi bulk)"); - c = createClient(REDIS_REPLY_STATUS); - c->obuf = sdscat(c->obuf,"*1\r\n$4\r\nPING\r\n"); - createMissingClients(c); - aeMain(config.el); - endBenchmark(); - - prepareForBenchmark("MSET (10 keys, multi bulk)"); + prepareForBenchmark("MSET (10 keys)"); c = createClient(REDIS_REPLY_ARRAY); - c->obuf = sdscatprintf(c->obuf,"*%d\r\n$4\r\nMSET\r\n", 11); - for (i = 0; i < 10; i++) { - c->obuf = sdscatprintf(c->obuf,"$%d\r\n%s\r\n",config.datasize,data); + { + const char *argv[11]; + argv[0] = "MSET"; + for (i = 1; i < 11; i++) + argv[i] = data; + len = redisFormatCommandArgv(&cmd,11,argv,NULL); + c->obuf = sdsnewlen(cmd,len); + free(cmd); + } createMissingClients(c); aeMain(config.el); endBenchmark(); prepareForBenchmark("SET"); c = createClient(REDIS_REPLY_STATUS); - c->obuf = sdscat(c->obuf,"*3\r\n$3\r\nSET\r\n$20\r\nfoo_rand000000000000\r\n"); - c->obuf = sdscatprintf(c->obuf,"$%d\r\n%s\r\n",config.datasize,data); + len = redisFormatCommand(&cmd,"SET foo:rand:000000000000 %s",data); + c->obuf = sdsnewlen(cmd,len); + free(cmd); createMissingClients(c); aeMain(config.el); endBenchmark(); prepareForBenchmark("GET"); c = createClient(REDIS_REPLY_STRING); - c->obuf = sdscat(c->obuf,"GET foo_rand000000000000\r\n"); + len = redisFormatCommand(&cmd,"GET foo:rand:000000000000"); + c->obuf = sdsnewlen(cmd,len); + free(cmd); createMissingClients(c); aeMain(config.el); endBenchmark(); prepareForBenchmark("INCR"); c = createClient(REDIS_REPLY_INTEGER); - c->obuf = sdscat(c->obuf,"INCR counter_rand000000000000\r\n"); + len = redisFormatCommand(&cmd,"INCR counter:rand:000000000000"); + c->obuf = sdsnewlen(cmd,len); + free(cmd); createMissingClients(c); aeMain(config.el); endBenchmark(); prepareForBenchmark("LPUSH"); c = createClient(REDIS_REPLY_INTEGER); - c->obuf = sdscat(c->obuf,"LPUSH mylist bar\r\n"); + len = redisFormatCommand(&cmd,"LPUSH mylist %s",data); + c->obuf = sdsnewlen(cmd,len); + free(cmd); createMissingClients(c); aeMain(config.el); endBenchmark(); prepareForBenchmark("LPOP"); c = createClient(REDIS_REPLY_STRING); - c->obuf = sdscat(c->obuf,"LPOP mylist\r\n"); + len = redisFormatCommand(&cmd,"LPOP mylist"); + c->obuf = sdsnewlen(cmd,len); + free(cmd); createMissingClients(c); aeMain(config.el); endBenchmark(); prepareForBenchmark("SADD"); c = createClient(REDIS_REPLY_STATUS); - c->obuf = sdscat(c->obuf,"SADD myset counter_rand000000000000\r\n"); + len = redisFormatCommand(&cmd,"SADD myset counter:rand:000000000000"); + c->obuf = sdsnewlen(cmd,len); + free(cmd); createMissingClients(c); aeMain(config.el); endBenchmark(); prepareForBenchmark("SPOP"); c = createClient(REDIS_REPLY_STRING); - c->obuf = sdscat(c->obuf,"SPOP myset\r\n"); + len = redisFormatCommand(&cmd,"SPOP myset"); + c->obuf = sdsnewlen(cmd,len); + free(cmd); createMissingClients(c); aeMain(config.el); endBenchmark(); prepareForBenchmark("LPUSH (again, in order to bench LRANGE)"); c = createClient(REDIS_REPLY_STATUS); - c->obuf = sdscat(c->obuf,"LPUSH mylist bar\r\n"); + len = redisFormatCommand(&cmd,"LPUSH mylist %s",data); + c->obuf = sdsnewlen(cmd,len); + free(cmd); createMissingClients(c); aeMain(config.el); endBenchmark(); prepareForBenchmark("LRANGE (first 100 elements)"); c = createClient(REDIS_REPLY_ARRAY); - c->obuf = sdscat(c->obuf,"LRANGE mylist 0 99\r\n"); + len = redisFormatCommand(&cmd,"LRANGE mylist 0 99"); + c->obuf = sdsnewlen(cmd,len); + free(cmd); createMissingClients(c); aeMain(config.el); endBenchmark(); prepareForBenchmark("LRANGE (first 300 elements)"); c = createClient(REDIS_REPLY_ARRAY); - c->obuf = sdscat(c->obuf,"LRANGE mylist 0 299\r\n"); + len = redisFormatCommand(&cmd,"LRANGE mylist 0 299"); + c->obuf = sdsnewlen(cmd,len); + free(cmd); createMissingClients(c); aeMain(config.el); endBenchmark(); prepareForBenchmark("LRANGE (first 450 elements)"); c = createClient(REDIS_REPLY_ARRAY); - c->obuf = sdscat(c->obuf,"LRANGE mylist 0 449\r\n"); + len = redisFormatCommand(&cmd,"LRANGE mylist 0 449"); + c->obuf = sdsnewlen(cmd,len); + free(cmd); createMissingClients(c); aeMain(config.el); endBenchmark(); prepareForBenchmark("LRANGE (first 600 elements)"); c = createClient(REDIS_REPLY_ARRAY); - c->obuf = sdscat(c->obuf,"LRANGE mylist 0 599\r\n"); + len = redisFormatCommand(&cmd,"LRANGE mylist 0 599"); + c->obuf = sdsnewlen(cmd,len); + free(cmd); createMissingClients(c); aeMain(config.el); endBenchmark(); From f474a5bd4e80157235dac13326edaa99181fb120 Mon Sep 17 00:00:00 2001 From: Didier Spezia Date: Sat, 18 Dec 2010 10:58:50 +0100 Subject: [PATCH 5/9] Add wait states to deal with many connections. --- src/redis-benchmark.c | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c index ab0d9230c..df3b85fd8 100644 --- a/src/redis-benchmark.c +++ b/src/redis-benchmark.c @@ -278,11 +278,22 @@ static client createClient(int replytype) { } static void createMissingClients(client c) { + int n = 0; + while(config.liveclients < config.numclients) { client new = createClient(c->replytype); new->obuf = sdsdup(c->obuf); if (config.randomkeys) randomizeClientKey(c); + + /* Listen backlog is quite limited on most systems */ + if (++n > 64) { + usleep(50000); + n = 0; + } } + + /* Start the timer once the connection are established */ + config.start = mstime(); } static int compareLatency(const void *a, const void *b) { From f2f2424e006856aacef00e8ce8dd7be76e038dce Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Wed, 22 Dec 2010 18:31:33 +0100 Subject: [PATCH 6/9] Remove code duplication --- src/redis-benchmark.c | 147 ++++++++++-------------------------------- 1 file changed, 35 insertions(+), 112 deletions(-) diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c index df3b85fd8..71c12d934 100644 --- a/src/redis-benchmark.c +++ b/src/redis-benchmark.c @@ -82,7 +82,6 @@ typedef struct _client { sds obuf; char *randptr; unsigned int written; /* bytes of 'obuf' already written */ - int replytype; long long start; /* start time of a request */ long long latency; /* request latency */ } *client; @@ -250,7 +249,7 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) { } } -static client createClient(int replytype) { +static client createClient() { client c = zmalloc(sizeof(struct _client)); if (config.hostsocket == NULL) { c->context = redisConnectNonBlock(config.hostip,config.hostport); @@ -265,7 +264,6 @@ static client createClient(int replytype) { fprintf(stderr,"%s: %s\n",config.hostsocket,c->context->errstr); exit(1); } - c->replytype = replytype; c->state = CLIENT_CONNECTING; c->obuf = NULL; c->randptr = (void*)-1; @@ -281,7 +279,7 @@ static void createMissingClients(client c) { int n = 0; while(config.liveclients < config.numclients) { - client new = createClient(c->replytype); + client new = createClient(); new->obuf = sdsdup(c->obuf); if (config.randomkeys) randomizeClientKey(c); @@ -291,9 +289,6 @@ static void createMissingClients(client c) { n = 0; } } - - /* Start the timer once the connection are established */ - config.start = mstime(); } static int compareLatency(const void *a, const void *b) { @@ -328,14 +323,20 @@ static void showLatencyReport(void) { } } -static void prepareForBenchmark(char *title) { - config.title = title; - config.start = mstime(); - config.donerequests = 0; -} +static void benchmark(char *title, char *cmd, int len) { + client c; -static void endBenchmark(void) { + config.title = title; + config.donerequests = 0; + + c = createClient(); + c->obuf = sdsnewlen(cmd,len); + createMissingClients(c); + + config.start = mstime(); + aeMain(config.el); config.totlatency = mstime()-config.start; + showLatencyReport(); freeAllClients(); } @@ -457,7 +458,6 @@ int main(int argc, char **argv) { if (config.idlemode) { printf("Creating %d idle connections and waiting forever (Ctrl+C when done)\n", config.numclients); - prepareForBenchmark("IDLE"); c = createClient(0); /* will never receive a reply */ c->obuf = sdsempty(); createMissingClients(c); @@ -473,144 +473,67 @@ int main(int argc, char **argv) { memset(data,'x',config.datasize); data[config.datasize] = '\0'; - prepareForBenchmark("PING (inline)"); - c = createClient(REDIS_REPLY_STATUS); - c->obuf = sdscat(sdsempty(),"PING\r\n"); - createMissingClients(c); - aeMain(config.el); - endBenchmark(); + benchmark("PING (inline)","PING\r\n",6); - prepareForBenchmark("PING"); - c = createClient(REDIS_REPLY_STATUS); len = redisFormatCommand(&cmd,"PING"); - c->obuf = sdsnewlen(cmd,len); + benchmark("PING",cmd,len); free(cmd); - createMissingClients(c); - aeMain(config.el); - endBenchmark(); - prepareForBenchmark("MSET (10 keys)"); - c = createClient(REDIS_REPLY_ARRAY); - { - const char *argv[11]; - argv[0] = "MSET"; - for (i = 1; i < 11; i++) - argv[i] = data; - len = redisFormatCommandArgv(&cmd,11,argv,NULL); - c->obuf = sdsnewlen(cmd,len); - free(cmd); - } - createMissingClients(c); - aeMain(config.el); - endBenchmark(); + const char *argv[11]; + argv[0] = "MSET"; + for (i = 1; i < 11; i++) + argv[i] = data; + len = redisFormatCommandArgv(&cmd,11,argv,NULL); + benchmark("MSET (10 keys)",cmd,len); + free(cmd); - prepareForBenchmark("SET"); - c = createClient(REDIS_REPLY_STATUS); len = redisFormatCommand(&cmd,"SET foo:rand:000000000000 %s",data); - c->obuf = sdsnewlen(cmd,len); + benchmark("SET",cmd,len); free(cmd); - createMissingClients(c); - aeMain(config.el); - endBenchmark(); - prepareForBenchmark("GET"); - c = createClient(REDIS_REPLY_STRING); len = redisFormatCommand(&cmd,"GET foo:rand:000000000000"); - c->obuf = sdsnewlen(cmd,len); + benchmark("GET",cmd,len); free(cmd); - createMissingClients(c); - aeMain(config.el); - endBenchmark(); - prepareForBenchmark("INCR"); - c = createClient(REDIS_REPLY_INTEGER); len = redisFormatCommand(&cmd,"INCR counter:rand:000000000000"); - c->obuf = sdsnewlen(cmd,len); + benchmark("INCR",cmd,len); free(cmd); - createMissingClients(c); - aeMain(config.el); - endBenchmark(); - prepareForBenchmark("LPUSH"); - c = createClient(REDIS_REPLY_INTEGER); len = redisFormatCommand(&cmd,"LPUSH mylist %s",data); - c->obuf = sdsnewlen(cmd,len); + benchmark("LPUSH",cmd,len); free(cmd); - createMissingClients(c); - aeMain(config.el); - endBenchmark(); - prepareForBenchmark("LPOP"); - c = createClient(REDIS_REPLY_STRING); len = redisFormatCommand(&cmd,"LPOP mylist"); - c->obuf = sdsnewlen(cmd,len); + benchmark("LPOP",cmd,len); free(cmd); - createMissingClients(c); - aeMain(config.el); - endBenchmark(); - prepareForBenchmark("SADD"); - c = createClient(REDIS_REPLY_STATUS); len = redisFormatCommand(&cmd,"SADD myset counter:rand:000000000000"); - c->obuf = sdsnewlen(cmd,len); + benchmark("SADD",cmd,len); free(cmd); - createMissingClients(c); - aeMain(config.el); - endBenchmark(); - prepareForBenchmark("SPOP"); - c = createClient(REDIS_REPLY_STRING); len = redisFormatCommand(&cmd,"SPOP myset"); - c->obuf = sdsnewlen(cmd,len); + benchmark("SPOP",cmd,len); free(cmd); - createMissingClients(c); - aeMain(config.el); - endBenchmark(); - prepareForBenchmark("LPUSH (again, in order to bench LRANGE)"); - c = createClient(REDIS_REPLY_STATUS); len = redisFormatCommand(&cmd,"LPUSH mylist %s",data); - c->obuf = sdsnewlen(cmd,len); + benchmark("LPUSH (again, in order to bench LRANGE)",cmd,len); free(cmd); - createMissingClients(c); - aeMain(config.el); - endBenchmark(); - prepareForBenchmark("LRANGE (first 100 elements)"); - c = createClient(REDIS_REPLY_ARRAY); len = redisFormatCommand(&cmd,"LRANGE mylist 0 99"); - c->obuf = sdsnewlen(cmd,len); + benchmark("LRANGE (first 100 elements)",cmd,len); free(cmd); - createMissingClients(c); - aeMain(config.el); - endBenchmark(); - prepareForBenchmark("LRANGE (first 300 elements)"); - c = createClient(REDIS_REPLY_ARRAY); len = redisFormatCommand(&cmd,"LRANGE mylist 0 299"); - c->obuf = sdsnewlen(cmd,len); + benchmark("LRANGE (first 300 elements)",cmd,len); free(cmd); - createMissingClients(c); - aeMain(config.el); - endBenchmark(); - prepareForBenchmark("LRANGE (first 450 elements)"); - c = createClient(REDIS_REPLY_ARRAY); len = redisFormatCommand(&cmd,"LRANGE mylist 0 449"); - c->obuf = sdsnewlen(cmd,len); + benchmark("LRANGE (first 450 elements)",cmd,len); free(cmd); - createMissingClients(c); - aeMain(config.el); - endBenchmark(); - prepareForBenchmark("LRANGE (first 600 elements)"); - c = createClient(REDIS_REPLY_ARRAY); len = redisFormatCommand(&cmd,"LRANGE mylist 0 599"); - c->obuf = sdsnewlen(cmd,len); + benchmark("LRANGE (first 600 elements)",cmd,len); free(cmd); - createMissingClients(c); - aeMain(config.el); - endBenchmark(); printf("\n"); } while(config.loop); From d69a483556e7bcd8642723482bd1de1cc5a10ff4 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Wed, 22 Dec 2010 18:39:52 +0100 Subject: [PATCH 7/9] Make the MSET benchmark *really* work with 10 keys --- src/redis-benchmark.c | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c index 71c12d934..382874b33 100644 --- a/src/redis-benchmark.c +++ b/src/redis-benchmark.c @@ -479,11 +479,13 @@ int main(int argc, char **argv) { benchmark("PING",cmd,len); free(cmd); - const char *argv[11]; + const char *argv[21]; argv[0] = "MSET"; - for (i = 1; i < 11; i++) - argv[i] = data; - len = redisFormatCommandArgv(&cmd,11,argv,NULL); + for (i = 1; i < 21; i += 2) { + argv[i] = "foo:rand:000000000000"; + argv[i+1] = data; + } + len = redisFormatCommandArgv(&cmd,21,argv,NULL); benchmark("MSET (10 keys)",cmd,len); free(cmd); From 3c49070b35e0bfaa69359c73b4a7942bc0e3214d Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 23 Dec 2010 11:04:44 +0100 Subject: [PATCH 8/9] Find substrings to randomize when the client is created --- src/redis-benchmark.c | 58 ++++++++++++++++++++----------------------- 1 file changed, 27 insertions(+), 31 deletions(-) diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c index 382874b33..fdfa0548c 100644 --- a/src/redis-benchmark.c +++ b/src/redis-benchmark.c @@ -80,7 +80,8 @@ typedef struct _client { redisContext *context; int state; sds obuf; - char *randptr; + char *randptr[10]; /* needed for MSET against 10 keys */ + size_t randlen; unsigned int written; /* bytes of 'obuf' already written */ long long start; /* start time of a request */ long long latency; /* request latency */ @@ -145,29 +146,14 @@ static void resetClient(client c) { } static void randomizeClientKey(client c) { - char *p, *newline; char buf[32]; - long r; + size_t i, r; - if (c->randptr == NULL) return; - - /* Check if we have to randomize (only once per connection) */ - if (c->randptr == (void*)-1) { - p = strstr(c->obuf,":rand:"); - if (!p) { - c->randptr = NULL; - return; - } else { - newline = strstr(p,"\r\n"); - assert(newline-(p+6) == 12); /* 12 chars for randomness */ - c->randptr = p+6; - } + for (i = 0; i < c->randlen; i++) { + r = random() % config.randomkeys_keyspacelen; + snprintf(buf,sizeof(buf),"%012lu",r); + memcpy(c->randptr[i],buf,12); } - - /* Set random number in output buffer */ - r = random() % config.randomkeys_keyspacelen; - snprintf(buf,sizeof(buf),"%012ld",r); - memcpy(c->randptr,buf,12); } static void clientDone(client c) { @@ -249,7 +235,7 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) { } } -static client createClient() { +static client createClient(char *cmd, int len) { client c = zmalloc(sizeof(struct _client)); if (config.hostsocket == NULL) { c->context = redisConnectNonBlock(config.hostip,config.hostport); @@ -265,9 +251,22 @@ static client createClient() { exit(1); } c->state = CLIENT_CONNECTING; - c->obuf = NULL; - c->randptr = (void*)-1; + c->obuf = sdsnewlen(cmd,len); + c->randlen = 0; c->written = 0; + + /* Find substrings in the output buffer that need to be randomized. */ + if (config.randomkeys) { + char *p = c->obuf, *newline; + while ((p = strstr(p,":rand:")) != NULL) { + newline = strstr(p,"\r\n"); + assert(newline-(p+6) == 12); /* 12 chars for randomness */ + assert(c->randlen < (signed)(sizeof(c->randptr)/sizeof(char*))); + c->randptr[c->randlen++] = p+6; + p = newline+2; + } + } + redisSetReplyObjectFunctions(c->context,NULL); aeCreateFileEvent(config.el,c->context->fd,AE_WRITABLE,writeHandler,c); listAddNodeTail(config.clients,c); @@ -279,9 +278,8 @@ static void createMissingClients(client c) { int n = 0; while(config.liveclients < config.numclients) { - client new = createClient(); - new->obuf = sdsdup(c->obuf); - if (config.randomkeys) randomizeClientKey(c); + client new = createClient(c->obuf,sdslen(c->obuf)); + if (config.randomkeys) randomizeClientKey(new); /* Listen backlog is quite limited on most systems */ if (++n > 64) { @@ -329,8 +327,7 @@ static void benchmark(char *title, char *cmd, int len) { config.title = title; config.donerequests = 0; - c = createClient(); - c->obuf = sdsnewlen(cmd,len); + c = createClient(cmd,len); createMissingClients(c); config.start = mstime(); @@ -458,8 +455,7 @@ int main(int argc, char **argv) { if (config.idlemode) { printf("Creating %d idle connections and waiting forever (Ctrl+C when done)\n", config.numclients); - c = createClient(0); /* will never receive a reply */ - c->obuf = sdsempty(); + c = createClient("",0); /* will never receive a reply */ createMissingClients(c); aeMain(config.el); /* and will wait for every */ From 2380388974bd0256e1b123d608940355144e9fd4 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 23 Dec 2010 11:22:40 +0100 Subject: [PATCH 9/9] Randomize keys and set start time when first write event fires --- src/redis-benchmark.c | 20 +++++--------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c index fdfa0548c..407c7fb48 100644 --- a/src/redis-benchmark.c +++ b/src/redis-benchmark.c @@ -45,10 +45,6 @@ #include "adlist.h" #include "zmalloc.h" -#define CLIENT_CONNECTING 0 -#define CLIENT_SENDQUERY 1 -#define CLIENT_READREPLY 2 - #define REDIS_NOTUSED(V) ((void) V) static struct config { @@ -78,7 +74,6 @@ static struct config { typedef struct _client { redisContext *context; - int state; sds obuf; char *randptr[10]; /* needed for MSET against 10 keys */ size_t randlen; @@ -140,9 +135,6 @@ static void resetClient(client c) { aeDeleteFileEvent(config.el,c->context->fd,AE_READABLE); aeCreateFileEvent(config.el,c->context->fd,AE_WRITABLE,writeHandler,c); c->written = 0; - c->state = CLIENT_SENDQUERY; - c->start = ustime(); - c->latency = -1; } static void randomizeClientKey(client c) { @@ -164,7 +156,6 @@ static void clientDone(client c) { } if (config.keepalive) { resetClient(c); - if (config.randomkeys) randomizeClientKey(c); } else { config.liveclients--; createMissingClients(c); @@ -212,11 +203,13 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) { REDIS_NOTUSED(fd); REDIS_NOTUSED(mask); - if (c->state == CLIENT_CONNECTING) { - c->state = CLIENT_SENDQUERY; + /* When nothing was written yet, randomize keys and set start time. */ + if (c->written == 0) { + if (config.randomkeys) randomizeClientKey(c); c->start = ustime(); c->latency = -1; } + if (sdslen(c->obuf) > c->written) { void *ptr = c->obuf+c->written; int nwritten = write(c->context->fd,ptr,sdslen(c->obuf)-c->written); @@ -230,7 +223,6 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) { if (sdslen(c->obuf) == c->written) { aeDeleteFileEvent(config.el,c->context->fd,AE_WRITABLE); aeCreateFileEvent(config.el,c->context->fd,AE_READABLE,readHandler,c); - c->state = CLIENT_READREPLY; } } } @@ -250,7 +242,6 @@ static client createClient(char *cmd, int len) { fprintf(stderr,"%s: %s\n",config.hostsocket,c->context->errstr); exit(1); } - c->state = CLIENT_CONNECTING; c->obuf = sdsnewlen(cmd,len); c->randlen = 0; c->written = 0; @@ -278,8 +269,7 @@ static void createMissingClients(client c) { int n = 0; while(config.liveclients < config.numclients) { - client new = createClient(c->obuf,sdslen(c->obuf)); - if (config.randomkeys) randomizeClientKey(new); + createClient(c->obuf,sdslen(c->obuf)); /* Listen backlog is quite limited on most systems */ if (++n > 64) {