mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-23 08:38:27 -05:00
Merge pull request #5126 from oranagra/slave_buf_memory_2
slave buffers were wasteful and incorrectly counted causing eviction
This commit is contained in:
commit
e22a1218bc
@ -645,7 +645,7 @@ struct client *createFakeClient(void) {
|
||||
c->obuf_soft_limit_reached_time = 0;
|
||||
c->watched_keys = listCreate();
|
||||
c->peerid = NULL;
|
||||
listSetFreeMethod(c->reply,decrRefCountVoid);
|
||||
listSetFreeMethod(c->reply,freeClientReplyValue);
|
||||
listSetDupMethod(c->reply,dupClientReplyValue);
|
||||
initClientMultiState(c);
|
||||
return c;
|
||||
|
@ -2712,9 +2712,9 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
|
||||
sds proto = sdsnewlen(c->buf,c->bufpos);
|
||||
c->bufpos = 0;
|
||||
while(listLength(c->reply)) {
|
||||
sds o = listNodeValue(listFirst(c->reply));
|
||||
clientReplyBlock *o = listNodeValue(listFirst(c->reply));
|
||||
|
||||
proto = sdscatsds(proto,o);
|
||||
proto = sdscatlen(proto,o->buf,o->used);
|
||||
listDelNode(c->reply,listFirst(c->reply));
|
||||
}
|
||||
reply = moduleCreateCallReplyFromProto(ctx,proto);
|
||||
|
@ -56,11 +56,14 @@ size_t getStringObjectSdsUsedMemory(robj *o) {
|
||||
|
||||
/* Client.reply list dup and free methods. */
|
||||
void *dupClientReplyValue(void *o) {
|
||||
return sdsdup(o);
|
||||
clientReplyBlock *old = o;
|
||||
clientReplyBlock *buf = zmalloc(sizeof(clientReplyBlock) + old->size);
|
||||
memcpy(buf, o, sizeof(clientReplyBlock) + old->size);
|
||||
return buf;
|
||||
}
|
||||
|
||||
void freeClientReplyValue(void *o) {
|
||||
sdsfree(o);
|
||||
zfree(o);
|
||||
}
|
||||
|
||||
int listMatchObjects(void *a, void *b) {
|
||||
@ -240,25 +243,31 @@ int _addReplyToBuffer(client *c, const char *s, size_t len) {
|
||||
void _addReplyStringToList(client *c, const char *s, size_t len) {
|
||||
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;
|
||||
|
||||
if (listLength(c->reply) == 0) {
|
||||
sds node = sdsnewlen(s,len);
|
||||
listAddNodeTail(c->reply,node);
|
||||
c->reply_bytes += len;
|
||||
} else {
|
||||
listNode *ln = listLast(c->reply);
|
||||
sds tail = listNodeValue(ln);
|
||||
clientReplyBlock *tail = ln? listNodeValue(ln): NULL;
|
||||
/* It is possible that we have a tail list node, but no tail buffer.
|
||||
* if addDeferredMultiBulkLength() was used. */
|
||||
|
||||
/* Append to this object when possible. If tail == NULL it was
|
||||
* set via addDeferredMultiBulkLength(). */
|
||||
if (tail && (sdsavail(tail) >= len || sdslen(tail)+len <= PROTO_REPLY_CHUNK_BYTES)) {
|
||||
tail = sdscatlen(tail,s,len);
|
||||
listNodeValue(ln) = tail;
|
||||
c->reply_bytes += len;
|
||||
} else {
|
||||
sds node = sdsnewlen(s,len);
|
||||
listAddNodeTail(c->reply,node);
|
||||
c->reply_bytes += len;
|
||||
/* Append to tail string when possible. */
|
||||
if (tail) {
|
||||
/* Copy the part we can fit into the tail, and leave the rest for a new node */
|
||||
size_t avail = tail->size - tail->used;
|
||||
size_t copy = avail >= len? len: avail;
|
||||
memcpy(tail->buf + tail->used, s, copy);
|
||||
tail->used += copy;
|
||||
s += copy;
|
||||
len -= copy;
|
||||
}
|
||||
if (len) {
|
||||
/* Create a new node, make sure it is allocated to at least PROTO_REPLY_CHUNK_BYTES */
|
||||
size_t size = len < PROTO_REPLY_CHUNK_BYTES? PROTO_REPLY_CHUNK_BYTES: len;
|
||||
tail = zmalloc(size + sizeof(clientReplyBlock));
|
||||
/* take over the allocation's internal fragmentation */
|
||||
tail->size = zmalloc_usable(tail) - sizeof(clientReplyBlock);
|
||||
tail->used = len;
|
||||
memcpy(tail->buf, s, len);
|
||||
listAddNodeTail(c->reply, tail);
|
||||
c->reply_bytes += tail->size;
|
||||
}
|
||||
asyncCloseClientOnOutputBufferLimitReached(c);
|
||||
}
|
||||
@ -390,26 +399,35 @@ void *addDeferredMultiBulkLength(client *c) {
|
||||
/* Populate the length object and try gluing it to the next chunk. */
|
||||
void setDeferredMultiBulkLength(client *c, void *node, long length) {
|
||||
listNode *ln = (listNode*)node;
|
||||
sds len, next;
|
||||
clientReplyBlock *next;
|
||||
char lenstr[128];
|
||||
size_t lenstr_len = sprintf(lenstr, "*%ld\r\n", length);
|
||||
|
||||
/* Abort when *node is NULL: when the client should not accept writes
|
||||
* we return NULL in addDeferredMultiBulkLength() */
|
||||
if (node == NULL) return;
|
||||
serverAssert(!listNodeValue(ln));
|
||||
|
||||
len = sdscatprintf(sdsnewlen("*",1),"%ld\r\n",length);
|
||||
listNodeValue(ln) = len;
|
||||
c->reply_bytes += sdslen(len);
|
||||
if (ln->next != NULL) {
|
||||
next = listNodeValue(ln->next);
|
||||
|
||||
/* Only glue when the next node is non-NULL (an sds in this case) */
|
||||
if (next != NULL) {
|
||||
len = sdscatsds(len,next);
|
||||
listDelNode(c->reply,ln->next);
|
||||
listNodeValue(ln) = len;
|
||||
/* No need to update c->reply_bytes: we are just moving the same
|
||||
* amount of bytes from one node to another. */
|
||||
}
|
||||
/* Glue into next node when:
|
||||
* - the next node is non-NULL,
|
||||
* - it has enough room already allocated
|
||||
* - and not too large (avoid large memmove) */
|
||||
if (ln->next != NULL && (next = listNodeValue(ln->next)) &&
|
||||
next->size - next->used >= lenstr_len &&
|
||||
next->used < PROTO_REPLY_CHUNK_BYTES * 4) {
|
||||
memmove(next->buf + lenstr_len, next->buf, next->used);
|
||||
memcpy(next->buf, lenstr, lenstr_len);
|
||||
next->used += lenstr_len;
|
||||
listDelNode(c->reply,ln);
|
||||
} else {
|
||||
/* Create a new node */
|
||||
clientReplyBlock *buf = zmalloc(lenstr_len + sizeof(clientReplyBlock));
|
||||
/* take over the allocation's internal fragmentation */
|
||||
buf->size = zmalloc_usable(buf) - sizeof(clientReplyBlock);
|
||||
buf->used = lenstr_len;
|
||||
memcpy(buf->buf, lenstr, lenstr_len);
|
||||
listNodeValue(ln) = buf;
|
||||
c->reply_bytes += buf->size;
|
||||
}
|
||||
asyncCloseClientOnOutputBufferLimitReached(c);
|
||||
}
|
||||
@ -895,7 +913,7 @@ client *lookupClientByID(uint64_t id) {
|
||||
int writeToClient(int fd, client *c, int handler_installed) {
|
||||
ssize_t nwritten = 0, totwritten = 0;
|
||||
size_t objlen;
|
||||
sds o;
|
||||
clientReplyBlock *o;
|
||||
|
||||
while(clientHasPendingReplies(c)) {
|
||||
if (c->bufpos > 0) {
|
||||
@ -912,23 +930,24 @@ int writeToClient(int fd, client *c, int handler_installed) {
|
||||
}
|
||||
} else {
|
||||
o = listNodeValue(listFirst(c->reply));
|
||||
objlen = sdslen(o);
|
||||
objlen = o->used;
|
||||
|
||||
if (objlen == 0) {
|
||||
c->reply_bytes -= o->size;
|
||||
listDelNode(c->reply,listFirst(c->reply));
|
||||
continue;
|
||||
}
|
||||
|
||||
nwritten = write(fd, o + c->sentlen, objlen - c->sentlen);
|
||||
nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen);
|
||||
if (nwritten <= 0) break;
|
||||
c->sentlen += nwritten;
|
||||
totwritten += nwritten;
|
||||
|
||||
/* If we fully sent the object on head go to the next one */
|
||||
if (c->sentlen == objlen) {
|
||||
c->reply_bytes -= o->size;
|
||||
listDelNode(c->reply,listFirst(c->reply));
|
||||
c->sentlen = 0;
|
||||
c->reply_bytes -= objlen;
|
||||
/* If there are no longer objects in the list, we expect
|
||||
* the count of reply bytes to be exactly zero. */
|
||||
if (listLength(c->reply) == 0)
|
||||
@ -1899,10 +1918,7 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) {
|
||||
* the caller wishes. The main usage of this function currently is
|
||||
* enforcing the client output length limits. */
|
||||
unsigned long getClientOutputBufferMemoryUsage(client *c) {
|
||||
unsigned long list_item_size = sizeof(listNode)+5;
|
||||
/* The +5 above means we assume an sds16 hdr, may not be true
|
||||
* but is not going to be a problem. */
|
||||
|
||||
unsigned long list_item_size = sizeof(listNode) + sizeof(clientReplyBlock);
|
||||
return c->reply_bytes + (list_item_size*listLength(c->reply));
|
||||
}
|
||||
|
||||
|
@ -2148,6 +2148,7 @@ void replicationCacheMaster(client *c) {
|
||||
server.master->read_reploff = server.master->reploff;
|
||||
if (c->flags & CLIENT_MULTI) discardTransaction(c);
|
||||
listEmpty(c->reply);
|
||||
c->reply_bytes = 0;
|
||||
c->bufpos = 0;
|
||||
resetClient(c);
|
||||
|
||||
|
@ -575,9 +575,9 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
|
||||
reply = sdsnewlen(c->buf,c->bufpos);
|
||||
c->bufpos = 0;
|
||||
while(listLength(c->reply)) {
|
||||
sds o = listNodeValue(listFirst(c->reply));
|
||||
clientReplyBlock *o = listNodeValue(listFirst(c->reply));
|
||||
|
||||
reply = sdscatsds(reply,o);
|
||||
reply = sdscatlen(reply,o->buf,o->used);
|
||||
listDelNode(c->reply,listFirst(c->reply));
|
||||
}
|
||||
}
|
||||
|
10
src/server.c
10
src/server.c
@ -3101,6 +3101,11 @@ sds genRedisInfoString(char *section) {
|
||||
"rss_overhead_bytes:%zu\r\n"
|
||||
"mem_fragmentation_ratio:%.2f\r\n"
|
||||
"mem_fragmentation_bytes:%zu\r\n"
|
||||
"mem_not_counted_for_evict:%zu\r\n"
|
||||
"mem_replication_backlog:%zu\r\n"
|
||||
"mem_clients_slaves:%zu\r\n"
|
||||
"mem_clients_normal:%zu\r\n"
|
||||
"mem_aof_buffer:%zu\r\n"
|
||||
"mem_allocator:%s\r\n"
|
||||
"active_defrag_running:%d\r\n"
|
||||
"lazyfree_pending_objects:%zu\r\n",
|
||||
@ -3133,6 +3138,11 @@ sds genRedisInfoString(char *section) {
|
||||
mh->rss_extra_bytes,
|
||||
mh->total_frag, /* this is the total RSS overhead, including fragmentation, */
|
||||
mh->total_frag_bytes, /* named so for backwards compatibility */
|
||||
freeMemoryGetNotCountedMemory(),
|
||||
mh->repl_backlog,
|
||||
mh->clients_slaves,
|
||||
mh->clients_normal,
|
||||
mh->aof_buffer,
|
||||
ZMALLOC_LIB,
|
||||
server.active_defrag_running,
|
||||
lazyfreeGetPendingObjectsCount()
|
||||
|
@ -619,6 +619,11 @@ typedef struct redisObject {
|
||||
|
||||
struct evictionPoolEntry; /* Defined in evict.c */
|
||||
|
||||
typedef struct clientReplyBlock {
|
||||
size_t size, used;
|
||||
char buf[];
|
||||
} clientReplyBlock;
|
||||
|
||||
/* Redis database representation. There are multiple databases identified
|
||||
* by integers from 0 (the default database) up to the max configured
|
||||
* database. The database number is the 'id' field in the structure. */
|
||||
@ -1423,6 +1428,7 @@ void addReplySubcommandSyntaxError(client *c);
|
||||
void copyClientOutputBuffer(client *dst, client *src);
|
||||
size_t sdsZmallocSize(sds s);
|
||||
size_t getStringObjectSdsUsedMemory(robj *o);
|
||||
void freeClientReplyValue(void *o);
|
||||
void *dupClientReplyValue(void *o);
|
||||
void getClientsMaxBuffers(unsigned long *longest_output_list,
|
||||
unsigned long *biggest_input_buffer);
|
||||
@ -1664,6 +1670,7 @@ int zslLexValueLteMax(sds value, zlexrangespec *spec);
|
||||
|
||||
/* Core functions */
|
||||
int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level);
|
||||
size_t freeMemoryGetNotCountedMemory();
|
||||
int freeMemoryIfNeeded(void);
|
||||
int processCommand(client *c);
|
||||
void setupSignalHandlers(void);
|
||||
|
@ -182,6 +182,9 @@ size_t zmalloc_size(void *ptr) {
|
||||
if (size&(sizeof(long)-1)) size += sizeof(long)-(size&(sizeof(long)-1));
|
||||
return size+PREFIX_SIZE;
|
||||
}
|
||||
size_t zmalloc_usable(void *ptr) {
|
||||
return zmalloc_usable(ptr)-PREFIX_SIZE;
|
||||
}
|
||||
#endif
|
||||
|
||||
void zfree(void *ptr) {
|
||||
|
@ -98,6 +98,9 @@ void *zmalloc_no_tcache(size_t size);
|
||||
|
||||
#ifndef HAVE_MALLOC_SIZE
|
||||
size_t zmalloc_size(void *ptr);
|
||||
size_t zmalloc_usable(void *ptr);
|
||||
#else
|
||||
#define zmalloc_usable(p) zmalloc_size(p)
|
||||
#endif
|
||||
|
||||
#endif /* __ZMALLOC_H */
|
||||
|
@ -142,3 +142,95 @@ start_server {tags {"maxmemory"}} {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
proc test_slave_buffers {cmd_count payload_len limit_memory pipeline} {
|
||||
start_server {tags {"maxmemory"}} {
|
||||
start_server {} {
|
||||
set slave [srv 0 client]
|
||||
set slave_host [srv 0 host]
|
||||
set slave_port [srv 0 port]
|
||||
set master [srv -1 client]
|
||||
set master_host [srv -1 host]
|
||||
set master_port [srv -1 port]
|
||||
|
||||
# add 100 keys of 100k (10MB total)
|
||||
for {set j 0} {$j < 100} {incr j} {
|
||||
$master setrange "key:$j" 100000 asdf
|
||||
}
|
||||
|
||||
$master config set maxmemory-policy allkeys-random
|
||||
$master config set client-output-buffer-limit "slave 100000000 100000000 60"
|
||||
$master config set repl-backlog-size [expr {10*1024}]
|
||||
|
||||
$slave slaveof $master_host $master_port
|
||||
wait_for_condition 50 100 {
|
||||
[s 0 master_link_status] eq {up}
|
||||
} else {
|
||||
fail "Replication not started."
|
||||
}
|
||||
|
||||
# measure used memory after the slave connected and set maxmemory
|
||||
set orig_used [s -1 used_memory]
|
||||
set orig_client_buf [s -1 mem_clients_normal]
|
||||
set orig_mem_not_counted_for_evict [s -1 mem_not_counted_for_evict]
|
||||
set orig_used_no_repl [expr {$orig_used - $orig_mem_not_counted_for_evict}]
|
||||
set limit [expr {$orig_used - $orig_mem_not_counted_for_evict + 20*1024}]
|
||||
|
||||
if {$limit_memory==1} {
|
||||
$master config set maxmemory $limit
|
||||
}
|
||||
|
||||
# put the slave to sleep
|
||||
set rd_slave [redis_deferring_client]
|
||||
$rd_slave debug sleep 60
|
||||
|
||||
# send some 10mb woth of commands that don't increase the memory usage
|
||||
if {$pipeline == 1} {
|
||||
set rd_master [redis_deferring_client -1]
|
||||
for {set k 0} {$k < $cmd_count} {incr k} {
|
||||
$rd_master setrange key:0 0 [string repeat A $payload_len]
|
||||
}
|
||||
for {set k 0} {$k < $cmd_count} {incr k} {
|
||||
#$rd_master read
|
||||
}
|
||||
} else {
|
||||
for {set k 0} {$k < $cmd_count} {incr k} {
|
||||
$master setrange key:0 0 [string repeat A $payload_len]
|
||||
}
|
||||
}
|
||||
|
||||
set new_used [s -1 used_memory]
|
||||
set slave_buf [s -1 mem_clients_slaves]
|
||||
set client_buf [s -1 mem_clients_normal]
|
||||
set mem_not_counted_for_evict [s -1 mem_not_counted_for_evict]
|
||||
set used_no_repl [expr {$new_used - $mem_not_counted_for_evict}]
|
||||
set delta [expr {($used_no_repl - $client_buf) - ($orig_used_no_repl - $orig_client_buf)}]
|
||||
|
||||
assert {[$master dbsize] == 100}
|
||||
assert {$slave_buf > 2*1024*1024} ;# some of the data may have been pushed to the OS buffers
|
||||
assert {$delta < 50*1024 && $delta > -50*1024} ;# 1 byte unaccounted for, with 1M commands will consume some 1MB
|
||||
|
||||
$master client kill type slave
|
||||
set killed_used [s -1 used_memory]
|
||||
set killed_slave_buf [s -1 mem_clients_slaves]
|
||||
set killed_mem_not_counted_for_evict [s -1 mem_not_counted_for_evict]
|
||||
set killed_used_no_repl [expr {$killed_used - $killed_mem_not_counted_for_evict}]
|
||||
set delta_no_repl [expr {$killed_used_no_repl - $used_no_repl}]
|
||||
assert {$killed_slave_buf == 0}
|
||||
assert {$delta_no_repl > -50*1024 && $delta_no_repl < 50*1024} ;# 1 byte unaccounted for, with 1M commands will consume some 1MB
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test {slave buffer are counted correctly} {
|
||||
# we wanna use many small commands, and we don't wanna wait long
|
||||
# so we need to use a pipeline (redis_deferring_client)
|
||||
# that may cause query buffer to fill and induce eviction, so we disable it
|
||||
test_slave_buffers 1000000 10 0 1
|
||||
}
|
||||
|
||||
test {slave buffer don't induce eviction} {
|
||||
# test again with fewer (and bigger) commands without pipeline, but with eviction
|
||||
test_slave_buffers 100000 100 1 0
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user