Lazyfree: client output buffers no longer use Redis Objects.

This commit is contained in:
antirez 2015-07-31 14:59:54 +02:00
parent 0c05436cef
commit 4ff3c17a20
2 changed files with 69 additions and 94 deletions

View File

@ -52,9 +52,13 @@ size_t getStringObjectSdsUsedMemory(robj *o) {
} }
} }
/* Client.reply list dup and free methods. */
void *dupClientReplyValue(void *o) { void *dupClientReplyValue(void *o) {
incrRefCount((robj*)o); return sdsdup(o);
return o; }
void freeClientReplyValue(void *o) {
sdsfree(o);
} }
int listMatchObjects(void *a, void *b) { int listMatchObjects(void *a, void *b) {
@ -109,7 +113,7 @@ client *createClient(int fd) {
c->reply = listCreate(); c->reply = listCreate();
c->reply_bytes = 0; c->reply_bytes = 0;
c->obuf_soft_limit_reached_time = 0; c->obuf_soft_limit_reached_time = 0;
listSetFreeMethod(c->reply,decrRefCountVoid); listSetFreeMethod(c->reply,freeClientReplyValue);
listSetDupMethod(c->reply,dupClientReplyValue); listSetDupMethod(c->reply,dupClientReplyValue);
c->btype = BLOCKED_NONE; c->btype = BLOCKED_NONE;
c->bpop.timeout = 0; c->bpop.timeout = 0;
@ -144,7 +148,7 @@ client *createClient(int fd) {
* event handler in the following cases: * event handler in the following cases:
* *
* 1) The event handler should already be installed since the output buffer * 1) The event handler should already be installed since the output buffer
* already contained something. * already contains something.
* 2) The client is a slave but not yet online, so we want to just accumulate * 2) The client is a slave but not yet online, so we want to just accumulate
* writes in the buffer but not actually sending them yet. * writes in the buffer but not actually sending them yet.
* *
@ -186,22 +190,6 @@ int prepareClientToWrite(client *c) {
return C_OK; return C_OK;
} }
/* Create a duplicate of the last object in the reply list when
* it is not exclusively owned by the reply list. */
robj *dupLastObjectIfNeeded(list *reply) {
robj *new, *cur;
listNode *ln;
serverAssert(listLength(reply) > 0);
ln = listLast(reply);
cur = listNodeValue(ln);
if (cur->refcount > 1) {
new = dupStringObject(cur);
decrRefCount(cur);
listNodeValue(ln) = new;
}
return listNodeValue(ln);
}
/* ----------------------------------------------------------------------------- /* -----------------------------------------------------------------------------
* Low level functions to add more data to output buffers. * Low level functions to add more data to output buffers.
* -------------------------------------------------------------------------- */ * -------------------------------------------------------------------------- */
@ -224,30 +212,26 @@ int _addReplyToBuffer(client *c, const char *s, size_t len) {
} }
void _addReplyObjectToList(client *c, robj *o) { void _addReplyObjectToList(client *c, robj *o) {
robj *tail;
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return; if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;
if (listLength(c->reply) == 0) { if (listLength(c->reply) == 0) {
incrRefCount(o); sds s = sdsdup(o->ptr);
listAddNodeTail(c->reply,o); listAddNodeTail(c->reply,s);
c->reply_bytes += getStringObjectSdsUsedMemory(o); c->reply_bytes += sdslen(s);
} else { } else {
tail = listNodeValue(listLast(c->reply)); listNode *ln = listLast(c->reply);
sds tail = listNodeValue(ln);
/* Append to this object when possible. */ /* Append to this object when possible. If tail == NULL it was
if (tail->ptr != NULL && * set via addDeferredMultiBulkLength(). */
tail->encoding == OBJ_ENCODING_RAW && if (tail && sdslen(tail)+sdslen(o->ptr) <= PROTO_REPLY_CHUNK_BYTES) {
sdslen(tail->ptr)+sdslen(o->ptr) <= PROTO_REPLY_CHUNK_BYTES) tail = sdscatsds(tail,o->ptr);
{ listNodeValue(ln) = tail;
c->reply_bytes -= sdsZmallocSize(tail->ptr); c->reply_bytes += sdslen(o->ptr);
tail = dupLastObjectIfNeeded(c->reply);
tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr));
c->reply_bytes += sdsZmallocSize(tail->ptr);
} else { } else {
incrRefCount(o); sds s = sdsdup(o->ptr);
listAddNodeTail(c->reply,o); listAddNodeTail(c->reply,s);
c->reply_bytes += getStringObjectSdsUsedMemory(o); c->reply_bytes += sdslen(s);
} }
} }
asyncCloseClientOnOutputBufferLimitReached(c); asyncCloseClientOnOutputBufferLimitReached(c);
@ -256,62 +240,54 @@ void _addReplyObjectToList(client *c, robj *o) {
/* This method takes responsibility over the sds. When it is no longer /* This method takes responsibility over the sds. When it is no longer
* needed it will be free'd, otherwise it ends up in a robj. */ * needed it will be free'd, otherwise it ends up in a robj. */
void _addReplySdsToList(client *c, sds s) { void _addReplySdsToList(client *c, sds s) {
robj *tail;
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) { if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
sdsfree(s); sdsfree(s);
return; return;
} }
if (listLength(c->reply) == 0) { if (listLength(c->reply) == 0) {
listAddNodeTail(c->reply,createObject(OBJ_STRING,s)); listAddNodeTail(c->reply,s);
c->reply_bytes += sdsZmallocSize(s); c->reply_bytes += sdslen(s);
} else { } else {
tail = listNodeValue(listLast(c->reply)); listNode *ln = listLast(c->reply);
sds tail = listNodeValue(ln);
/* Append to this object when possible. */ /* Append to this object when possible. If tail == NULL it was
if (tail->ptr != NULL && tail->encoding == OBJ_ENCODING_RAW && * set via addDeferredMultiBulkLength(). */
sdslen(tail->ptr)+sdslen(s) <= PROTO_REPLY_CHUNK_BYTES) if (tail && sdslen(tail)+sdslen(s) <= PROTO_REPLY_CHUNK_BYTES) {
{ tail = sdscatsds(tail,s);
c->reply_bytes -= sdsZmallocSize(tail->ptr); listNodeValue(ln) = tail;
tail = dupLastObjectIfNeeded(c->reply); c->reply_bytes += sdslen(s);
tail->ptr = sdscatlen(tail->ptr,s,sdslen(s));
c->reply_bytes += sdsZmallocSize(tail->ptr);
sdsfree(s); sdsfree(s);
} else { } else {
listAddNodeTail(c->reply,createObject(OBJ_STRING,s)); listAddNodeTail(c->reply,s);
c->reply_bytes += sdsZmallocSize(s); c->reply_bytes += sdslen(s);
} }
} }
asyncCloseClientOnOutputBufferLimitReached(c); asyncCloseClientOnOutputBufferLimitReached(c);
} }
void _addReplyStringToList(client *c, const char *s, size_t len) { void _addReplyStringToList(client *c, const char *s, size_t len) {
robj *tail;
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return; if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;
if (listLength(c->reply) == 0) { if (listLength(c->reply) == 0) {
robj *o = createStringObject(s,len); sds node = sdsnewlen(s,len);
listAddNodeTail(c->reply,node);
listAddNodeTail(c->reply,o); c->reply_bytes += len;
c->reply_bytes += getStringObjectSdsUsedMemory(o);
} else { } else {
tail = listNodeValue(listLast(c->reply)); listNode *ln = listLast(c->reply);
sds tail = listNodeValue(ln);
/* Append to this object when possible. */ /* Append to this object when possible. If tail == NULL it was
if (tail->ptr != NULL && tail->encoding == OBJ_ENCODING_RAW && * set via addDeferredMultiBulkLength(). */
sdslen(tail->ptr)+len <= PROTO_REPLY_CHUNK_BYTES) if (tail && sdslen(tail)+len <= PROTO_REPLY_CHUNK_BYTES) {
{ tail = sdscatlen(tail,s,len);
c->reply_bytes -= sdsZmallocSize(tail->ptr); listNodeValue(ln) = tail;
tail = dupLastObjectIfNeeded(c->reply); c->reply_bytes += len;
tail->ptr = sdscatlen(tail->ptr,s,len);
c->reply_bytes += sdsZmallocSize(tail->ptr);
} else { } else {
robj *o = createStringObject(s,len); sds node = sdsnewlen(s,len);
listAddNodeTail(c->reply,node);
listAddNodeTail(c->reply,o); c->reply_bytes += len;
c->reply_bytes += getStringObjectSdsUsedMemory(o);
} }
} }
asyncCloseClientOnOutputBufferLimitReached(c); asyncCloseClientOnOutputBufferLimitReached(c);
@ -430,32 +406,32 @@ void *addDeferredMultiBulkLength(client *c) {
* ready to be sent, since we are sure that before returning to the * ready to be sent, since we are sure that before returning to the
* event loop setDeferredMultiBulkLength() will be called. */ * event loop setDeferredMultiBulkLength() will be called. */
if (prepareClientToWrite(c) != C_OK) return NULL; if (prepareClientToWrite(c) != C_OK) return NULL;
listAddNodeTail(c->reply,createObject(OBJ_STRING,NULL)); listAddNodeTail(c->reply,NULL); /* NULL is our placeholder. */
return listLast(c->reply); return listLast(c->reply);
} }
/* Populate the length object and try gluing it to the next chunk. */ /* Populate the length object and try gluing it to the next chunk. */
void setDeferredMultiBulkLength(client *c, void *node, long length) { void setDeferredMultiBulkLength(client *c, void *node, long length) {
listNode *ln = (listNode*)node; listNode *ln = (listNode*)node;
robj *len, *next; sds len, next;
/* Abort when *node is NULL (see addDeferredMultiBulkLength). */ /* Abort when *node is NULL: when the client should not accept writes
* we return NULL in addDeferredMultiBulkLength() */
if (node == NULL) return; if (node == NULL) return;
len = listNodeValue(ln); len = sdscatprintf(sdsnewlen("*",1),"%ld\r\n",length);
len->ptr = sdscatprintf(sdsempty(),"*%ld\r\n",length); listNodeValue(ln) = len;
len->encoding = OBJ_ENCODING_RAW; /* in case it was an EMBSTR. */ c->reply_bytes += sdslen(len);
c->reply_bytes += sdsZmallocSize(len->ptr);
if (ln->next != NULL) { if (ln->next != NULL) {
next = listNodeValue(ln->next); next = listNodeValue(ln->next);
/* Only glue when the next node is non-NULL (an sds in this case) */ /* Only glue when the next node is non-NULL (an sds in this case) */
if (next->ptr != NULL) { if (next != NULL) {
c->reply_bytes -= sdsZmallocSize(len->ptr); len = sdscatsds(len,next);
c->reply_bytes -= getStringObjectSdsUsedMemory(next);
len->ptr = sdscatlen(len->ptr,next->ptr,sdslen(next->ptr));
c->reply_bytes += sdsZmallocSize(len->ptr);
listDelNode(c->reply,ln->next); listDelNode(c->reply,ln->next);
listNodeValue(ln) = len;
/* No need to update c->reply_bytes: we are just moving the same
* amount of bytes from one node to another. */
} }
} }
asyncCloseClientOnOutputBufferLimitReached(c); asyncCloseClientOnOutputBufferLimitReached(c);
@ -845,8 +821,7 @@ void freeClientsInAsyncFreeQueue(void) {
int writeToClient(int fd, client *c, int handler_installed) { int writeToClient(int fd, client *c, int handler_installed) {
ssize_t nwritten = 0, totwritten = 0; ssize_t nwritten = 0, totwritten = 0;
size_t objlen; size_t objlen;
size_t objmem; sds o;
robj *o;
while(clientHasPendingReplies(c)) { while(clientHasPendingReplies(c)) {
if (c->bufpos > 0) { if (c->bufpos > 0) {
@ -863,16 +838,14 @@ int writeToClient(int fd, client *c, int handler_installed) {
} }
} else { } else {
o = listNodeValue(listFirst(c->reply)); o = listNodeValue(listFirst(c->reply));
objlen = sdslen(o->ptr); objlen = sdslen(o);
objmem = getStringObjectSdsUsedMemory(o);
if (objlen == 0) { if (objlen == 0) {
listDelNode(c->reply,listFirst(c->reply)); listDelNode(c->reply,listFirst(c->reply));
c->reply_bytes -= objmem;
continue; continue;
} }
nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen); nwritten = write(fd, o + c->sentlen, objlen - c->sentlen);
if (nwritten <= 0) break; if (nwritten <= 0) break;
c->sentlen += nwritten; c->sentlen += nwritten;
totwritten += nwritten; totwritten += nwritten;
@ -881,7 +854,7 @@ int writeToClient(int fd, client *c, int handler_installed) {
if (c->sentlen == objlen) { if (c->sentlen == objlen) {
listDelNode(c->reply,listFirst(c->reply)); listDelNode(c->reply,listFirst(c->reply));
c->sentlen = 0; c->sentlen = 0;
c->reply_bytes -= objmem; c->reply_bytes -= objlen;
} }
} }
/* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
@ -1626,7 +1599,9 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) {
* the caller wishes. The main usage of this function currently is * the caller wishes. The main usage of this function currently is
* enforcing the client output length limits. */ * enforcing the client output length limits. */
unsigned long getClientOutputBufferMemoryUsage(client *c) { unsigned long getClientOutputBufferMemoryUsage(client *c) {
unsigned long list_item_size = sizeof(listNode)+sizeof(robj); unsigned long list_item_size = sizeof(listNode)+5;
/* The +5 above means we assume an sds16 hdr, may not be true
* but is not going to be a problem. */
return c->reply_bytes + (list_item_size*listLength(c->reply)); return c->reply_bytes + (list_item_size*listLength(c->reply));
} }

View File

@ -387,9 +387,9 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
reply = sdsnewlen(c->buf,c->bufpos); reply = sdsnewlen(c->buf,c->bufpos);
c->bufpos = 0; c->bufpos = 0;
while(listLength(c->reply)) { while(listLength(c->reply)) {
robj *o = listNodeValue(listFirst(c->reply)); sds o = listNodeValue(listFirst(c->reply));
reply = sdscatlen(reply,o->ptr,sdslen(o->ptr)); reply = sdscatsds(reply,o);
listDelNode(c->reply,listFirst(c->reply)); listDelNode(c->reply,listFirst(c->reply));
} }
} }