/* * Copyright (c) 2009-2012, Salvatore Sanfilippo * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * * Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * Neither the name of Redis nor the names of its contributors may be used * to endorse or promote products derived from this software without * specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ #include "redis.h" #include #include static void setProtocolError(redisClient *c, int pos); /* To evaluate the output buffer size of a client we need to get size of * allocated objects, however we can't used zmalloc_size() directly on sds * strings because of the trick they use to work (the header is before the * returned pointer), so we use this helper function. 
*/ size_t zmalloc_size_sds(sds s) { return zmalloc_size(s-sizeof(struct sdshdr)); } /* Return the amount of memory used by the sds string at object->ptr * for a string object. */ size_t getStringObjectSdsUsedMemory(robj *o) { redisAssertWithInfo(NULL,o,o->type == REDIS_STRING); switch(o->encoding) { case REDIS_ENCODING_RAW: return zmalloc_size_sds(o->ptr); case REDIS_ENCODING_EMBSTR: return sdslen(o->ptr); default: return 0; /* Just integer encoding for now. */ } } void *dupClientReplyValue(void *o) { incrRefCount((robj*)o); return o; } int listMatchObjects(void *a, void *b) { return equalStringObjects(a,b); } redisClient *createClient(int fd) { redisClient *c = zmalloc(sizeof(redisClient)); /* passing -1 as fd it is possible to create a non connected client. * This is useful since all the Redis commands needs to be executed * in the context of a client. When commands are executed in other * contexts (for instance a Lua script) we need a non connected client. */ if (fd != -1) { anetNonBlock(NULL,fd); anetEnableTcpNoDelay(NULL,fd); if (server.tcpkeepalive) anetKeepAlive(NULL,fd,server.tcpkeepalive); if (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR) { close(fd); zfree(c); return NULL; } } selectDb(c,0); c->id = server.next_client_id++; c->fd = fd; c->name = NULL; c->bufpos = 0; c->querybuf = sdsempty(); c->querybuf_peak = 0; c->reqtype = 0; c->argc = 0; c->argv = NULL; c->cmd = c->lastcmd = NULL; c->multibulklen = 0; c->bulklen = -1; c->sentlen = 0; c->flags = 0; c->ctime = c->lastinteraction = server.unixtime; c->authenticated = 0; c->replstate = REDIS_REPL_NONE; c->repl_put_online_on_ack = 0; c->reploff = 0; c->repl_ack_off = 0; c->repl_ack_time = 0; c->slave_listening_port = 0; c->reply = listCreate(); c->reply_bytes = 0; c->obuf_soft_limit_reached_time = 0; listSetFreeMethod(c->reply,decrRefCountVoid); listSetDupMethod(c->reply,dupClientReplyValue); c->btype = REDIS_BLOCKED_NONE; c->bpop.timeout = 0; c->bpop.keys = 
dictCreate(&setDictType,NULL); c->bpop.target = NULL; c->bpop.numreplicas = 0; c->bpop.reploffset = 0; c->woff = 0; c->watched_keys = listCreate(); c->pubsub_channels = dictCreate(&setDictType,NULL); c->pubsub_patterns = listCreate(); c->peerid = NULL; listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid); listSetMatchMethod(c->pubsub_patterns,listMatchObjects); if (fd != -1) listAddNodeTail(server.clients,c); initClientMultiState(c); return c; } /* This function is called every time we are going to transmit new data * to the client. The behavior is the following: * * If the client should receive new data (normal clients will) the function * returns REDIS_OK, and make sure to install the write handler in our event * loop so that when the socket is writable new data gets written. * * If the client should not receive new data, because it is a fake client, * a master, a slave not yet online, or because the setup of the write handler * failed, the function returns REDIS_ERR. * * Typically gets called every time a reply is built, before adding more * data to the clients output buffers. If the function returns REDIS_ERR no * data should be appended to the output buffers. */ int prepareClientToWrite(redisClient *c) { if (c->flags & REDIS_LUA_CLIENT) return REDIS_OK; if ((c->flags & REDIS_MASTER) && !(c->flags & REDIS_MASTER_FORCE_REPLY)) return REDIS_ERR; if (c->fd <= 0) return REDIS_ERR; /* Fake client */ if (c->bufpos == 0 && listLength(c->reply) == 0 && (c->replstate == REDIS_REPL_NONE || c->replstate == REDIS_REPL_ONLINE) && aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, sendReplyToClient, c) == AE_ERR) return REDIS_ERR; return REDIS_OK; } /* Create a duplicate of the last object in the reply list when * it is not exclusively owned by the reply list. 
*/ robj *dupLastObjectIfNeeded(list *reply) { robj *new, *cur; listNode *ln; redisAssert(listLength(reply) > 0); ln = listLast(reply); cur = listNodeValue(ln); if (cur->refcount > 1) { new = dupStringObject(cur); decrRefCount(cur); listNodeValue(ln) = new; } return listNodeValue(ln); } /* ----------------------------------------------------------------------------- * Low level functions to add more data to output buffers. * -------------------------------------------------------------------------- */ int _addReplyToBuffer(redisClient *c, char *s, size_t len) { size_t available = sizeof(c->buf)-c->bufpos; if (c->flags & REDIS_CLOSE_AFTER_REPLY) return REDIS_OK; /* If there already are entries in the reply list, we cannot * add anything more to the static buffer. */ if (listLength(c->reply) > 0) return REDIS_ERR; /* Check that the buffer has enough space available for this string. */ if (len > available) return REDIS_ERR; memcpy(c->buf+c->bufpos,s,len); c->bufpos+=len; return REDIS_OK; } void _addReplyObjectToList(redisClient *c, robj *o) { robj *tail; if (c->flags & REDIS_CLOSE_AFTER_REPLY) return; if (listLength(c->reply) == 0) { incrRefCount(o); listAddNodeTail(c->reply,o); c->reply_bytes += getStringObjectSdsUsedMemory(o); } else { tail = listNodeValue(listLast(c->reply)); /* Append to this object when possible. */ if (tail->ptr != NULL && tail->encoding == REDIS_ENCODING_RAW && sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES) { c->reply_bytes -= zmalloc_size_sds(tail->ptr); tail = dupLastObjectIfNeeded(c->reply); tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr)); c->reply_bytes += zmalloc_size_sds(tail->ptr); } else { incrRefCount(o); listAddNodeTail(c->reply,o); c->reply_bytes += getStringObjectSdsUsedMemory(o); } } asyncCloseClientOnOutputBufferLimitReached(c); } /* This method takes responsibility over the sds. When it is no longer * needed it will be free'd, otherwise it ends up in a robj. 
*/
void _addReplySdsToList(redisClient *c, sds s) {
    robj *tail = NULL;
    int glue = 0;

    if (c->flags & REDIS_CLOSE_AFTER_REPLY) {
        sdsfree(s);
        return;
    }

    /* Append to the tail object when possible: it must hold a RAW encoded
     * string, and the result must fit in a reply chunk. */
    if (listLength(c->reply) != 0) {
        tail = listNodeValue(listLast(c->reply));
        glue = tail->ptr != NULL &&
               tail->encoding == REDIS_ENCODING_RAW &&
               sdslen(tail->ptr)+sdslen(s) <= REDIS_REPLY_CHUNK_BYTES;
    }

    if (glue) {
        c->reply_bytes -= zmalloc_size_sds(tail->ptr);
        tail = dupLastObjectIfNeeded(c->reply);
        tail->ptr = sdscatlen(tail->ptr,s,sdslen(s));
        c->reply_bytes += zmalloc_size_sds(tail->ptr);
        sdsfree(s); /* Content copied, the original is not needed. */
    } else {
        /* The sds becomes the payload of a new string object. */
        listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
        c->reply_bytes += zmalloc_size_sds(s);
    }
    asyncCloseClientOnOutputBufferLimitReached(c);
}

/* Queue 'len' bytes from the C buffer 's' into the reply list, either
 * appending them to the tail object or creating a new string object.
 * The buffer is copied: the caller retains its ownership. Nothing is done
 * for clients flagged REDIS_CLOSE_AFTER_REPLY. */
void _addReplyStringToList(redisClient *c, char *s, size_t len) {
    robj *tail = NULL;
    int glue = 0;

    if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;

    /* Append to the tail object when possible. */
    if (listLength(c->reply) != 0) {
        tail = listNodeValue(listLast(c->reply));
        glue = tail->ptr != NULL &&
               tail->encoding == REDIS_ENCODING_RAW &&
               sdslen(tail->ptr)+len <= REDIS_REPLY_CHUNK_BYTES;
    }

    if (glue) {
        c->reply_bytes -= zmalloc_size_sds(tail->ptr);
        tail = dupLastObjectIfNeeded(c->reply);
        tail->ptr = sdscatlen(tail->ptr,s,len);
        c->reply_bytes += zmalloc_size_sds(tail->ptr);
    } else {
        robj *fresh = createStringObject(s,len);

        listAddNodeTail(c->reply,fresh);
        c->reply_bytes += getStringObjectSdsUsedMemory(fresh);
    }
    asyncCloseClientOnOutputBufferLimitReached(c);
}

/* -----------------------------------------------------------------------------
 * Higher level functions to queue data on the client output buffer.
 * The following functions are the ones that commands implementations will call.
* -------------------------------------------------------------------------- */ void addReply(redisClient *c, robj *obj) { if (prepareClientToWrite(c) != REDIS_OK) return; /* This is an important place where we can avoid copy-on-write * when there is a saving child running, avoiding touching the * refcount field of the object if it's not needed. * * If the encoding is RAW and there is room in the static buffer * we'll be able to send the object to the client without * messing with its page. */ if (sdsEncodedObject(obj)) { if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK) _addReplyObjectToList(c,obj); } else if (obj->encoding == REDIS_ENCODING_INT) { /* Optimization: if there is room in the static buffer for 32 bytes * (more than the max chars a 64 bit integer can take as string) we * avoid decoding the object and go for the lower level approach. */ if (listLength(c->reply) == 0 && (sizeof(c->buf) - c->bufpos) >= 32) { char buf[32]; int len; len = ll2string(buf,sizeof(buf),(long)obj->ptr); if (_addReplyToBuffer(c,buf,len) == REDIS_OK) return; /* else... continue with the normal code path, but should never * happen actually since we verified there is room. */ } obj = getDecodedObject(obj); if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK) _addReplyObjectToList(c,obj); decrRefCount(obj); } else { redisPanic("Wrong obj->encoding in addReply()"); } } void addReplySds(redisClient *c, sds s) { if (prepareClientToWrite(c) != REDIS_OK) { /* The caller expects the sds to be free'd. */ sdsfree(s); return; } if (_addReplyToBuffer(c,s,sdslen(s)) == REDIS_OK) { sdsfree(s); } else { /* This method free's the sds when it is no longer needed. 
*/ _addReplySdsToList(c,s); } } void addReplyString(redisClient *c, char *s, size_t len) { if (prepareClientToWrite(c) != REDIS_OK) return; if (_addReplyToBuffer(c,s,len) != REDIS_OK) _addReplyStringToList(c,s,len); } void addReplyErrorLength(redisClient *c, char *s, size_t len) { addReplyString(c,"-ERR ",5); addReplyString(c,s,len); addReplyString(c,"\r\n",2); } void addReplyError(redisClient *c, char *err) { addReplyErrorLength(c,err,strlen(err)); } void addReplyErrorFormat(redisClient *c, const char *fmt, ...) { size_t l, j; va_list ap; va_start(ap,fmt); sds s = sdscatvprintf(sdsempty(),fmt,ap); va_end(ap); /* Make sure there are no newlines in the string, otherwise invalid protocol * is emitted. */ l = sdslen(s); for (j = 0; j < l; j++) { if (s[j] == '\r' || s[j] == '\n') s[j] = ' '; } addReplyErrorLength(c,s,sdslen(s)); sdsfree(s); } void addReplyStatusLength(redisClient *c, char *s, size_t len) { addReplyString(c,"+",1); addReplyString(c,s,len); addReplyString(c,"\r\n",2); } void addReplyStatus(redisClient *c, char *status) { addReplyStatusLength(c,status,strlen(status)); } void addReplyStatusFormat(redisClient *c, const char *fmt, ...) { va_list ap; va_start(ap,fmt); sds s = sdscatvprintf(sdsempty(),fmt,ap); va_end(ap); addReplyStatusLength(c,s,sdslen(s)); sdsfree(s); } /* Adds an empty object to the reply list that will contain the multi bulk * length, which is not known when this function is called. */ void *addDeferredMultiBulkLength(redisClient *c) { /* Note that we install the write event here even if the object is not * ready to be sent, since we are sure that before returning to the * event loop setDeferredMultiBulkLength() will be called. */ if (prepareClientToWrite(c) != REDIS_OK) return NULL; listAddNodeTail(c->reply,createObject(REDIS_STRING,NULL)); return listLast(c->reply); } /* Populate the length object and try gluing it to the next chunk. 
*/ void setDeferredMultiBulkLength(redisClient *c, void *node, long length) { listNode *ln = (listNode*)node; robj *len, *next; /* Abort when *node is NULL (see addDeferredMultiBulkLength). */ if (node == NULL) return; len = listNodeValue(ln); len->ptr = sdscatprintf(sdsempty(),"*%ld\r\n",length); len->encoding = REDIS_ENCODING_RAW; /* in case it was an EMBSTR. */ c->reply_bytes += zmalloc_size_sds(len->ptr); if (ln->next != NULL) { next = listNodeValue(ln->next); /* Only glue when the next node is non-NULL (an sds in this case) */ if (next->ptr != NULL) { c->reply_bytes -= zmalloc_size_sds(len->ptr); c->reply_bytes -= getStringObjectSdsUsedMemory(next); len->ptr = sdscatlen(len->ptr,next->ptr,sdslen(next->ptr)); c->reply_bytes += zmalloc_size_sds(len->ptr); listDelNode(c->reply,ln->next); } } asyncCloseClientOnOutputBufferLimitReached(c); } /* Add a double as a bulk reply */ void addReplyDouble(redisClient *c, double d) { char dbuf[128], sbuf[128]; int dlen, slen; if (isinf(d)) { /* Libc in odd systems (Hi Solaris!) will format infinite in a * different way, so better to handle it in an explicit way. */ addReplyBulkCString(c, d > 0 ? "inf" : "-inf"); } else { dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d); slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf); addReplyString(c,sbuf,slen); } } /* Add a long long as integer reply or bulk len / multi bulk count. * Basically this is used to output . */ void addReplyLongLongWithPrefix(redisClient *c, long long ll, char prefix) { char buf[128]; int len; /* Things like $3\r\n or *2\r\n are emitted very often by the protocol * so we have a few shared objects to use if the integer is small * like it is most of the times. 
*/ if (prefix == '*' && ll < REDIS_SHARED_BULKHDR_LEN) { addReply(c,shared.mbulkhdr[ll]); return; } else if (prefix == '$' && ll < REDIS_SHARED_BULKHDR_LEN) { addReply(c,shared.bulkhdr[ll]); return; } buf[0] = prefix; len = ll2string(buf+1,sizeof(buf)-1,ll); buf[len+1] = '\r'; buf[len+2] = '\n'; addReplyString(c,buf,len+3); } void addReplyLongLong(redisClient *c, long long ll) { if (ll == 0) addReply(c,shared.czero); else if (ll == 1) addReply(c,shared.cone); else addReplyLongLongWithPrefix(c,ll,':'); } void addReplyMultiBulkLen(redisClient *c, long length) { if (length < REDIS_SHARED_BULKHDR_LEN) addReply(c,shared.mbulkhdr[length]); else addReplyLongLongWithPrefix(c,length,'*'); } /* Create the length prefix of a bulk reply, example: $2234 */ void addReplyBulkLen(redisClient *c, robj *obj) { size_t len; if (sdsEncodedObject(obj)) { len = sdslen(obj->ptr); } else { long n = (long)obj->ptr; /* Compute how many bytes will take this integer as a radix 10 string */ len = 1; if (n < 0) { len++; n = -n; } while((n = n/10) != 0) { len++; } } if (len < REDIS_SHARED_BULKHDR_LEN) addReply(c,shared.bulkhdr[len]); else addReplyLongLongWithPrefix(c,len,'$'); } /* Add a Redis Object as a bulk reply */ void addReplyBulk(redisClient *c, robj *obj) { addReplyBulkLen(c,obj); addReply(c,obj); addReply(c,shared.crlf); } /* Add a C buffer as bulk reply */ void addReplyBulkCBuffer(redisClient *c, void *p, size_t len) { addReplyLongLongWithPrefix(c,len,'$'); addReplyString(c,p,len); addReply(c,shared.crlf); } /* Add a C nul term string as bulk reply */ void addReplyBulkCString(redisClient *c, char *s) { if (s == NULL) { addReply(c,shared.nullbulk); } else { addReplyBulkCBuffer(c,s,strlen(s)); } } /* Add a long long as a bulk reply */ void addReplyBulkLongLong(redisClient *c, long long ll) { char buf[64]; int len; len = ll2string(buf,64,ll); addReplyBulkCBuffer(c,buf,len); } /* Copy 'src' client output buffers into 'dst' client output buffers. 
* The function takes care of freeing the old output buffers of the
 * destination client. The reply list is duplicated with listDup(), and only
 * the first src->bufpos bytes of the static buffer are copied. */
void copyClientOutputBuffer(redisClient *dst, redisClient *src) {
    listRelease(dst->reply);
    dst->reply = listDup(src->reply);
    memcpy(dst->buf,src->buf,src->bufpos);
    dst->bufpos = src->bufpos;
    dst->reply_bytes = src->reply_bytes;
}

/* Max number of connections accepted by a single call of the accept
 * handlers below. */
#define MAX_ACCEPTS_PER_CALL 1000

/* Common part of the TCP and Unix socket accept handlers: create the client
 * for the accepted descriptor 'fd', enforce the maxclients limit, and OR
 * 'flags' into the flags of the new client. */
static void acceptCommonHandler(int fd, int flags) {
    redisClient *c;
    if ((c = createClient(fd)) == NULL) {
        redisLog(REDIS_WARNING,
            "Error registering fd event for the new client: %s (fd=%d)",
            strerror(errno),fd);
        close(fd); /* May be already closed, just ignore errors */
        return;
    }
    /* If maxclient directive is set and this is one client more... close the
     * connection. Note that we create the client instead to check before
     * for this condition, since now the socket is already set in non-blocking
     * mode and we can send an error for free using the Kernel I/O */
    if (listLength(server.clients) > server.maxclients) {
        char *err = "-ERR max number of clients reached\r\n";

        /* That's a best effort error message, don't check write errors */
        if (write(c->fd,err,strlen(err)) == -1) {
            /* Nothing to do, Just to avoid the warning... */
        }
        server.stat_rejected_conn++;
        freeClient(c);
        return;
    }
    server.stat_numconnections++;
    c->flags |= flags;
}

/* Event loop handler for the TCP listening socket 'fd': accept up to
 * MAX_ACCEPTS_PER_CALL connections, stopping at the first accept error.
 * The error is logged unless errno is EWOULDBLOCK. */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[REDIS_IP_STR_LEN];
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);

    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                redisLog(REDIS_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(cfd,0);
    }
}

/* Same as acceptTcpHandler() but for the Unix domain listening socket:
 * the accepted clients are flagged REDIS_UNIX_SOCKET. */
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cfd, max = MAX_ACCEPTS_PER_CALL;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);

    while(max--) {
        cfd = anetUnixAccept(server.neterr, fd);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                redisLog(REDIS_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        redisLog(REDIS_VERBOSE,"Accepted connection to %s", server.unixsocket);
        acceptCommonHandler(cfd,REDIS_UNIX_SOCKET);
    }
}

/* Drop the reference held on every argument object and reset argc and cmd.
 * The argv array itself is not released here. */
static void freeClientArgv(redisClient *c) {
    int j;
    for (j = 0; j < c->argc; j++)
        decrRefCount(c->argv[j]);
    c->argc = 0;
    c->cmd = NULL;
}

/* Close all the slaves connections. This is useful in chained replication
 * when we resync with our own master and want to force all our slaves to
 * resync with us as well. */
void disconnectSlaves(void) {
    /* freeClient() removes the slave from server.slaves, so the loop
     * always picks the new head until the list is empty. */
    while (listLength(server.slaves)) {
        listNode *ln = listFirst(server.slaves);
        freeClient((redisClient*)ln->value);
    }
}

/* This function is called when the slave loses the connection with the
 * master in an unexpected way. */
void replicationHandleMasterDisconnection(void) {
    server.master = NULL;
    server.repl_state = REDIS_REPL_CONNECT;
    server.repl_down_since = server.unixtime;
    /* We lost connection with our master, force our slaves to resync
     * with us as well to load the new data set.
     *
     * If server.masterhost is NULL the user called SLAVEOF NO ONE so
     * slave resync is not needed. */
    if (server.masterhost != NULL) disconnectSlaves();
}

/* Release the client 'c' and every resource attached to it. The only
 * exception is a master client in a clean state, which is handed to
 * replicationCacheMaster() instead of being destroyed (see below). */
void freeClient(redisClient *c) {
    listNode *ln;

    /* If this is marked as current client unset it */
    if (server.current_client == c) server.current_client = NULL;

    /* If it is our master that's being disconnected we should make sure
     * to cache the state to try a partial resynchronization later.
     *
     * Note that before doing this we make sure that the client is not in
     * some unexpected state, by checking its flags. */
    if (server.master && c->flags & REDIS_MASTER) {
        redisLog(REDIS_WARNING,"Connection with master lost.");
        if (!(c->flags & (REDIS_CLOSE_AFTER_REPLY|
                          REDIS_CLOSE_ASAP|
                          REDIS_BLOCKED|
                          REDIS_UNBLOCKED)))
        {
            replicationCacheMaster(c);
            return;
        }
    }

    /* Log link disconnection with slave */
    if ((c->flags & REDIS_SLAVE) && !(c->flags & REDIS_MONITOR)) {
        redisLog(REDIS_WARNING,"Connection with slave %s lost.",
            replicationGetSlaveName(c));
    }

    /* Free the query buffer */
    sdsfree(c->querybuf);
    c->querybuf = NULL;

    /* Deallocate structures used to block on blocking ops. */
    if (c->flags & REDIS_BLOCKED) unblockClient(c);
    dictRelease(c->bpop.keys);

    /* UNWATCH all the keys */
    unwatchAllKeys(c);
    listRelease(c->watched_keys);

    /* Unsubscribe from all the pubsub channels */
    pubsubUnsubscribeAllChannels(c,0);
    pubsubUnsubscribeAllPatterns(c,0);
    dictRelease(c->pubsub_channels);
    listRelease(c->pubsub_patterns);

    /* Close socket, unregister events, and remove list of replies and
     * accumulated arguments. */
    if (c->fd != -1) {
        aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
        close(c->fd);
    }
    listRelease(c->reply);
    freeClientArgv(c);

    /* Remove from the list of clients */
    if (c->fd != -1) {
        ln = listSearchKey(server.clients,c);
        redisAssert(ln != NULL);
        listDelNode(server.clients,ln);
    }

    /* When client was just unblocked because of a blocking operation,
     * remove it from the list of unblocked clients. */
    if (c->flags & REDIS_UNBLOCKED) {
        ln = listSearchKey(server.unblocked_clients,c);
        redisAssert(ln != NULL);
        listDelNode(server.unblocked_clients,ln);
    }

    /* Master/slave cleanup Case 1:
     * we lost the connection with a slave. */
    if (c->flags & REDIS_SLAVE) {
        if (c->replstate == REDIS_REPL_SEND_BULK) {
            if (c->repldbfd != -1) close(c->repldbfd);
            if (c->replpreamble) sdsfree(c->replpreamble);
        }
        /* Clients flagged as monitors live in server.monitors, every other
         * slave in server.slaves. */
        list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
        ln = listSearchKey(l,c);
        redisAssert(ln != NULL);
        listDelNode(l,ln);
        /* We need to remember the time when we started to have zero
         * attached slaves, as after some time we'll free the replication
         * backlog. */
        if (c->flags & REDIS_SLAVE && listLength(server.slaves) == 0)
            server.repl_no_slaves_since = server.unixtime;
        refreshGoodSlavesCount();
    }

    /* Master/slave cleanup Case 2:
     * we lost the connection with the master. */
    if (c->flags & REDIS_MASTER) replicationHandleMasterDisconnection();

    /* If this client was scheduled for async freeing we need to remove it
     * from the queue. */
    if (c->flags & REDIS_CLOSE_ASAP) {
        ln = listSearchKey(server.clients_to_close,c);
        redisAssert(ln != NULL);
        listDelNode(server.clients_to_close,ln);
    }

    /* Release other dynamically allocated client structure fields,
     * and finally release the client structure itself. */
    if (c->name) decrRefCount(c->name);
    zfree(c->argv);
    freeClientMultiState(c);
    sdsfree(c->peerid);
    zfree(c);
}

/* Schedule a client to free it at a safe time in the serverCron() function.
 * This function is useful when we need to terminate a client but we are in
 * a context where calling freeClient() is not possible, because the client
 * should be valid for the continuation of the flow of the program.
*/ void freeClientAsync(redisClient *c) { if (c->flags & REDIS_CLOSE_ASAP) return; c->flags |= REDIS_CLOSE_ASAP; listAddNodeTail(server.clients_to_close,c); } void freeClientsInAsyncFreeQueue(void) { while (listLength(server.clients_to_close)) { listNode *ln = listFirst(server.clients_to_close); redisClient *c = listNodeValue(ln); c->flags &= ~REDIS_CLOSE_ASAP; freeClient(c); listDelNode(server.clients_to_close,ln); } } void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { redisClient *c = privdata; int nwritten = 0, totwritten = 0, objlen; size_t objmem; robj *o; REDIS_NOTUSED(el); REDIS_NOTUSED(mask); while(c->bufpos > 0 || listLength(c->reply)) { if (c->bufpos > 0) { nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen); if (nwritten <= 0) break; c->sentlen += nwritten; totwritten += nwritten; /* If the buffer was sent, set bufpos to zero to continue with * the remainder of the reply. */ if (c->sentlen == c->bufpos) { c->bufpos = 0; c->sentlen = 0; } } else { o = listNodeValue(listFirst(c->reply)); objlen = sdslen(o->ptr); objmem = getStringObjectSdsUsedMemory(o); if (objlen == 0) { listDelNode(c->reply,listFirst(c->reply)); c->reply_bytes -= objmem; continue; } nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen); if (nwritten <= 0) break; c->sentlen += nwritten; totwritten += nwritten; /* If we fully sent the object on head go to the next one */ if (c->sentlen == objlen) { listDelNode(c->reply,listFirst(c->reply)); c->sentlen = 0; c->reply_bytes -= objmem; } } /* Note that we avoid to send more than REDIS_MAX_WRITE_PER_EVENT * bytes, in a single threaded server it's a good idea to serve * other clients as well, even if a very large request comes from * super fast link that is always able to accept data (in real world * scenario think about 'KEYS *' against the loopback interface). * * However if we are over the maxmemory limit we ignore that and * just deliver as much data as it is possible to deliver. 
*/ if (totwritten > REDIS_MAX_WRITE_PER_EVENT && (server.maxmemory == 0 || zmalloc_used_memory() < server.maxmemory)) break; } if (nwritten == -1) { if (errno == EAGAIN) { nwritten = 0; } else { redisLog(REDIS_VERBOSE, "Error writing to client: %s", strerror(errno)); freeClient(c); return; } } if (totwritten > 0) { /* For clients representing masters we don't count sending data * as an interaction, since we always send REPLCONF ACK commands * that take some time to just fill the socket output buffer. * We just rely on data / pings received for timeout detection. */ if (!(c->flags & REDIS_MASTER)) c->lastinteraction = server.unixtime; } if (c->bufpos == 0 && listLength(c->reply) == 0) { c->sentlen = 0; aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); /* Close connection after entire reply has been sent. */ if (c->flags & REDIS_CLOSE_AFTER_REPLY) freeClient(c); } } /* resetClient prepare the client to process the next command */ void resetClient(redisClient *c) { redisCommandProc *prevcmd = c->cmd ? c->cmd->proc : NULL; freeClientArgv(c); c->reqtype = 0; c->multibulklen = 0; c->bulklen = -1; /* We clear the ASKING flag as well if we are not inside a MULTI, and * if what we just executed is not the ASKING command itself. */ if (!(c->flags & REDIS_MULTI) && prevcmd != askingCommand) c->flags &= (~REDIS_ASKING); } int processInlineBuffer(redisClient *c) { char *newline; int argc, j; sds *argv, aux; size_t querylen; /* Search for end of line */ newline = strchr(c->querybuf,'\n'); /* Nothing to do without a \r\n */ if (newline == NULL) { if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) { addReplyError(c,"Protocol error: too big inline request"); setProtocolError(c,0); } return REDIS_ERR; } /* Handle the \r\n case. 
*/ if (newline && newline != c->querybuf && *(newline-1) == '\r') newline--; /* Split the input buffer up to the \r\n */ querylen = newline-(c->querybuf); aux = sdsnewlen(c->querybuf,querylen); argv = sdssplitargs(aux,&argc); sdsfree(aux); if (argv == NULL) { addReplyError(c,"Protocol error: unbalanced quotes in request"); setProtocolError(c,0); return REDIS_ERR; } /* Newline from slaves can be used to refresh the last ACK time. * This is useful for a slave to ping back while loading a big * RDB file. */ if (querylen == 0 && c->flags & REDIS_SLAVE) c->repl_ack_time = server.unixtime; /* Leave data after the first line of the query in the buffer */ sdsrange(c->querybuf,querylen+2,-1); /* Setup argv array on client structure */ if (c->argv) zfree(c->argv); c->argv = zmalloc(sizeof(robj*)*argc); /* Create redis objects for all arguments. */ for (c->argc = 0, j = 0; j < argc; j++) { if (sdslen(argv[j])) { c->argv[c->argc] = createObject(REDIS_STRING,argv[j]); c->argc++; } else { sdsfree(argv[j]); } } zfree(argv); return REDIS_OK; } /* Helper function. Trims query buffer to make the function that processes * multi bulk requests idempotent. 
*/ static void setProtocolError(redisClient *c, int pos) { if (server.verbosity >= REDIS_VERBOSE) { sds client = catClientInfoString(sdsempty(),c); redisLog(REDIS_VERBOSE, "Protocol error from client: %s", client); sdsfree(client); } c->flags |= REDIS_CLOSE_AFTER_REPLY; sdsrange(c->querybuf,pos,-1); } int processMultibulkBuffer(redisClient *c) { char *newline = NULL; int pos = 0, ok; long long ll; if (c->multibulklen == 0) { /* The client should have been reset */ redisAssertWithInfo(c,NULL,c->argc == 0); /* Multi bulk length cannot be read without a \r\n */ newline = strchr(c->querybuf,'\r'); if (newline == NULL) { if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) { addReplyError(c,"Protocol error: too big mbulk count string"); setProtocolError(c,0); } return REDIS_ERR; } /* Buffer should also contain \n */ if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2)) return REDIS_ERR; /* We know for sure there is a whole line since newline != NULL, * so go ahead and find out the multi bulk length. 
*/ redisAssertWithInfo(c,NULL,c->querybuf[0] == '*'); ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll); if (!ok || ll > 1024*1024) { addReplyError(c,"Protocol error: invalid multibulk length"); setProtocolError(c,pos); return REDIS_ERR; } pos = (newline-c->querybuf)+2; if (ll <= 0) { sdsrange(c->querybuf,pos,-1); return REDIS_OK; } c->multibulklen = ll; /* Setup argv array on client structure */ if (c->argv) zfree(c->argv); c->argv = zmalloc(sizeof(robj*)*c->multibulklen); } redisAssertWithInfo(c,NULL,c->multibulklen > 0); while(c->multibulklen) { /* Read bulk length if unknown */ if (c->bulklen == -1) { newline = strchr(c->querybuf+pos,'\r'); if (newline == NULL) { if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) { addReplyError(c, "Protocol error: too big bulk count string"); setProtocolError(c,0); return REDIS_ERR; } break; } /* Buffer should also contain \n */ if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2)) break; if (c->querybuf[pos] != '$') { addReplyErrorFormat(c, "Protocol error: expected '$', got '%c'", c->querybuf[pos]); setProtocolError(c,pos); return REDIS_ERR; } ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll); if (!ok || ll < 0 || ll > 512*1024*1024) { addReplyError(c,"Protocol error: invalid bulk length"); setProtocolError(c,pos); return REDIS_ERR; } pos += newline-(c->querybuf+pos)+2; if (ll >= REDIS_MBULK_BIG_ARG) { size_t qblen; /* If we are going to read a large object from network * try to make it likely that it will start at c->querybuf * boundary so that we can optimize object creation * avoiding a large copy of data. */ sdsrange(c->querybuf,pos,-1); pos = 0; qblen = sdslen(c->querybuf); /* Hint the sds library about the amount of bytes this string is * going to contain. 
*/ if (qblen < (size_t)ll+2) c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2-qblen); } c->bulklen = ll; } /* Read bulk argument */ if (sdslen(c->querybuf)-pos < (unsigned)(c->bulklen+2)) { /* Not enough data (+2 == trailing \r\n) */ break; } else { /* Optimization: if the buffer contains JUST our bulk element * instead of creating a new object by *copying* the sds we * just use the current sds string. */ if (pos == 0 && c->bulklen >= REDIS_MBULK_BIG_ARG && (signed) sdslen(c->querybuf) == c->bulklen+2) { c->argv[c->argc++] = createObject(REDIS_STRING,c->querybuf); sdsIncrLen(c->querybuf,-2); /* remove CRLF */ c->querybuf = sdsempty(); /* Assume that if we saw a fat argument we'll see another one * likely... */ c->querybuf = sdsMakeRoomFor(c->querybuf,c->bulklen+2); pos = 0; } else { c->argv[c->argc++] = createStringObject(c->querybuf+pos,c->bulklen); pos += c->bulklen+2; } c->bulklen = -1; c->multibulklen--; } } /* Trim to pos */ if (pos) sdsrange(c->querybuf,pos,-1); /* We're done when c->multibulk == 0 */ if (c->multibulklen == 0) return REDIS_OK; /* Still not read to process the command */ return REDIS_ERR; } void processInputBuffer(redisClient *c) { /* Keep processing while there is something in the input buffer */ while(sdslen(c->querybuf)) { /* Return if clients are paused. */ if (!(c->flags & REDIS_SLAVE) && clientsArePaused()) return; /* Immediately abort if the client is in the middle of something. */ if (c->flags & REDIS_BLOCKED) return; /* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is * written to the client. Make sure to not let the reply grow after * this flag has been set (i.e. don't process more commands). */ if (c->flags & REDIS_CLOSE_AFTER_REPLY) return; /* Determine request type when unknown. 
*/ if (!c->reqtype) { if (c->querybuf[0] == '*') { c->reqtype = REDIS_REQ_MULTIBULK; } else { c->reqtype = REDIS_REQ_INLINE; } } if (c->reqtype == REDIS_REQ_INLINE) { if (processInlineBuffer(c) != REDIS_OK) break; } else if (c->reqtype == REDIS_REQ_MULTIBULK) { if (processMultibulkBuffer(c) != REDIS_OK) break; } else { redisPanic("Unknown request type"); } /* Multibulk processing could see a <= 0 length. */ if (c->argc == 0) { resetClient(c); } else { /* Only reset the client when the command was executed. */ if (processCommand(c) == REDIS_OK) resetClient(c); } } } void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { redisClient *c = (redisClient*) privdata; int nread, readlen; size_t qblen; REDIS_NOTUSED(el); REDIS_NOTUSED(mask); server.current_client = c; readlen = REDIS_IOBUF_LEN; /* If this is a multi bulk request, and we are processing a bulk reply * that is large enough, try to maximize the probability that the query * buffer contains exactly the SDS string representing the object, even * at the risk of requiring more read(2) calls. This way the function * processMultiBulkBuffer() can avoid copying buffers to create the * Redis Object representing the argument. 
*/ if (c->reqtype == REDIS_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 && c->bulklen >= REDIS_MBULK_BIG_ARG) { int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf); if (remaining < readlen) readlen = remaining; } qblen = sdslen(c->querybuf); if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); nread = read(fd, c->querybuf+qblen, readlen); if (nread == -1) { if (errno == EAGAIN) { nread = 0; } else { redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno)); freeClient(c); return; } } else if (nread == 0) { redisLog(REDIS_VERBOSE, "Client closed connection"); freeClient(c); return; } if (nread) { sdsIncrLen(c->querybuf,nread); c->lastinteraction = server.unixtime; if (c->flags & REDIS_MASTER) c->reploff += nread; } else { server.current_client = NULL; return; } if (sdslen(c->querybuf) > server.client_max_querybuf_len) { sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty(); bytes = sdscatrepr(bytes,c->querybuf,64); redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); sdsfree(ci); sdsfree(bytes); freeClient(c); return; } processInputBuffer(c); server.current_client = NULL; } void getClientsMaxBuffers(unsigned long *longest_output_list, unsigned long *biggest_input_buffer) { redisClient *c; listNode *ln; listIter li; unsigned long lol = 0, bib = 0; listRewind(server.clients,&li); while ((ln = listNext(&li)) != NULL) { c = listNodeValue(ln); if (listLength(c->reply) > lol) lol = listLength(c->reply); if (sdslen(c->querybuf) > bib) bib = sdslen(c->querybuf); } *longest_output_list = lol; *biggest_input_buffer = bib; } /* This is a helper function for genClientPeerId(). * It writes the specified ip/port to "peerid" as a null termiated string * in the form ip:port if ip does not contain ":" itself, otherwise * [ip]:port format is used (for IPv6 addresses basically). 
*/ void formatPeerId(char *peerid, size_t peerid_len, char *ip, int port) { if (strchr(ip,':')) snprintf(peerid,peerid_len,"[%s]:%d",ip,port); else snprintf(peerid,peerid_len,"%s:%d",ip,port); } /* A Redis "Peer ID" is a colon separated ip:port pair. * For IPv4 it's in the form x.y.z.k:port, example: "127.0.0.1:1234". * For IPv6 addresses we use [] around the IP part, like in "[::1]:1234". * For Unix sockets we use path:0, like in "/tmp/redis:0". * * A Peer ID always fits inside a buffer of REDIS_PEER_ID_LEN bytes, including * the null term. * * The function returns REDIS_OK on succcess, and REDIS_ERR on failure. * * On failure the function still populates 'peerid' with the "?:0" string * in case you want to relax error checking or need to display something * anyway (see anetPeerToString implementation for more info). */ int genClientPeerId(redisClient *client, char *peerid, size_t peerid_len) { char ip[REDIS_IP_STR_LEN]; int port; if (client->flags & REDIS_UNIX_SOCKET) { /* Unix socket client. */ snprintf(peerid,peerid_len,"%s:0",server.unixsocket); return REDIS_OK; } else { /* TCP client. */ int retval = anetPeerToString(client->fd,ip,sizeof(ip),&port); formatPeerId(peerid,peerid_len,ip,port); return (retval == -1) ? REDIS_ERR : REDIS_OK; } } /* This function returns the client peer id, by creating and caching it * if client->peerid is NULL, otherwise returning the cached value. * The Peer ID never changes during the life of the client, however it * is expensive to compute. */ char *getClientPeerId(redisClient *c) { char peerid[REDIS_PEER_ID_LEN]; if (c->peerid == NULL) { genClientPeerId(c,peerid,sizeof(peerid)); c->peerid = sdsnew(peerid); } return c->peerid; } /* Concatenate a string representing the state of a client in an human * readable format, into the sds string 's'. 
*/ sds catClientInfoString(sds s, redisClient *client) { char flags[16], events[3], *p; int emask; p = flags; if (client->flags & REDIS_SLAVE) { if (client->flags & REDIS_MONITOR) *p++ = 'O'; else *p++ = 'S'; } if (client->flags & REDIS_MASTER) *p++ = 'M'; if (client->flags & REDIS_MULTI) *p++ = 'x'; if (client->flags & REDIS_BLOCKED) *p++ = 'b'; if (client->flags & REDIS_DIRTY_CAS) *p++ = 'd'; if (client->flags & REDIS_CLOSE_AFTER_REPLY) *p++ = 'c'; if (client->flags & REDIS_UNBLOCKED) *p++ = 'u'; if (client->flags & REDIS_CLOSE_ASAP) *p++ = 'A'; if (client->flags & REDIS_UNIX_SOCKET) *p++ = 'U'; if (client->flags & REDIS_READONLY) *p++ = 'r'; if (p == flags) *p++ = 'N'; *p++ = '\0'; emask = client->fd == -1 ? 0 : aeGetFileEvents(server.el,client->fd); p = events; if (emask & AE_READABLE) *p++ = 'r'; if (emask & AE_WRITABLE) *p++ = 'w'; *p = '\0'; return sdscatfmt(s, "id=%U addr=%s fd=%i name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U obl=%U oll=%U omem=%U events=%s cmd=%s", (unsigned long long) client->id, getClientPeerId(client), client->fd, client->name ? (char*)client->name->ptr : "", (long long)(server.unixtime - client->ctime), (long long)(server.unixtime - client->lastinteraction), flags, client->db->id, (int) dictSize(client->pubsub_channels), (int) listLength(client->pubsub_patterns), (client->flags & REDIS_MULTI) ? client->mstate.count : -1, (unsigned long long) sdslen(client->querybuf), (unsigned long long) sdsavail(client->querybuf), (unsigned long long) client->bufpos, (unsigned long long) listLength(client->reply), (unsigned long long) getClientOutputBufferMemoryUsage(client), events, client->lastcmd ? 
client->lastcmd->name : "NULL"); } sds getAllClientsInfoString(void) { listNode *ln; listIter li; redisClient *client; sds o = sdsempty(); o = sdsMakeRoomFor(o,200*listLength(server.clients)); listRewind(server.clients,&li); while ((ln = listNext(&li)) != NULL) { client = listNodeValue(ln); o = catClientInfoString(o,client); o = sdscatlen(o,"\n",1); } return o; } void clientCommand(redisClient *c) { listNode *ln; listIter li; redisClient *client; if (!strcasecmp(c->argv[1]->ptr,"list") && c->argc == 2) { /* CLIENT LIST */ sds o = getAllClientsInfoString(); addReplyBulkCBuffer(c,o,sdslen(o)); sdsfree(o); } else if (!strcasecmp(c->argv[1]->ptr,"kill")) { /* CLIENT KILL * CLIENT KILL