mirror of
https://codeberg.org/redict/redict.git
synced 2025-01-22 16:18:28 -05:00
Add missing calls to raxStop (#7532)
Since the dynamic allocations in raxIterator are only used for deep walks, memory leak due to missing call to raxStop can only happen for rax with key names longer than 32 bytes. Out of all the missing calls, the only ones that may lead to a leak are the rax for consumer groups and consumers, and these were only in AOFRW and rdbSave, which normally only happen in fork or at shutdown.
This commit is contained in:
parent
2fbd0271f6
commit
4e8f2d6881
19
src/aof.c
19
src/aof.c
@ -1244,12 +1244,16 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
|
||||
while(raxNext(&ri)) {
|
||||
streamCG *group = ri.data;
|
||||
/* Emit the XGROUP CREATE in order to create the group. */
|
||||
if (rioWriteBulkCount(r,'*',5) == 0) return 0;
|
||||
if (rioWriteBulkString(r,"XGROUP",6) == 0) return 0;
|
||||
if (rioWriteBulkString(r,"CREATE",6) == 0) return 0;
|
||||
if (rioWriteBulkObject(r,key) == 0) return 0;
|
||||
if (rioWriteBulkString(r,(char*)ri.key,ri.key_len) == 0) return 0;
|
||||
if (rioWriteBulkStreamID(r,&group->last_id) == 0) return 0;
|
||||
if (!rioWriteBulkCount(r,'*',5) ||
|
||||
!rioWriteBulkString(r,"XGROUP",6) ||
|
||||
!rioWriteBulkString(r,"CREATE",6) ||
|
||||
!rioWriteBulkObject(r,key) ||
|
||||
!rioWriteBulkString(r,(char*)ri.key,ri.key_len) ||
|
||||
!rioWriteBulkStreamID(r,&group->last_id))
|
||||
{
|
||||
raxStop(&ri);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* Generate XCLAIMs for each consumer that happens to
|
||||
* have pending entries. Empty consumers have no semantical
|
||||
@ -1270,6 +1274,9 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
|
||||
ri.key_len,consumer,
|
||||
ri_pel.key,nack) == 0)
|
||||
{
|
||||
raxStop(&ri_pel);
|
||||
raxStop(&ri_cons);
|
||||
raxStop(&ri);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
@ -662,6 +662,7 @@ int scanLaterStraemListpacks(robj *ob, unsigned long *cursor, long long endtime,
|
||||
/* if cursor is non-zero, we seek to the static 'last' */
|
||||
if (!raxSeek(&ri,">", last, sizeof(last))) {
|
||||
*cursor = 0;
|
||||
raxStop(&ri);
|
||||
return 0;
|
||||
}
|
||||
/* assign the iterator node callback after the seek, so that the
|
||||
|
61
src/rdb.c
61
src/rdb.c
@ -697,15 +697,23 @@ ssize_t rdbSaveStreamPEL(rio *rdb, rax *pel, int nacks) {
|
||||
while(raxNext(&ri)) {
|
||||
/* We store IDs in raw form as 128 big big endian numbers, like
|
||||
* they are inside the radix tree key. */
|
||||
if ((n = rdbWriteRaw(rdb,ri.key,sizeof(streamID))) == -1) return -1;
|
||||
if ((n = rdbWriteRaw(rdb,ri.key,sizeof(streamID))) == -1) {
|
||||
raxStop(&ri);
|
||||
return -1;
|
||||
}
|
||||
nwritten += n;
|
||||
|
||||
if (nacks) {
|
||||
streamNACK *nack = ri.data;
|
||||
if ((n = rdbSaveMillisecondTime(rdb,nack->delivery_time)) == -1)
|
||||
if ((n = rdbSaveMillisecondTime(rdb,nack->delivery_time)) == -1) {
|
||||
raxStop(&ri);
|
||||
return -1;
|
||||
}
|
||||
nwritten += n;
|
||||
if ((n = rdbSaveLen(rdb,nack->delivery_count)) == -1) return -1;
|
||||
if ((n = rdbSaveLen(rdb,nack->delivery_count)) == -1) {
|
||||
raxStop(&ri);
|
||||
return -1;
|
||||
}
|
||||
nwritten += n;
|
||||
/* We don't save the consumer name: we'll save the pending IDs
|
||||
* for each consumer in the consumer PEL, and resolve the consumer
|
||||
@ -734,20 +742,27 @@ size_t rdbSaveStreamConsumers(rio *rdb, streamCG *cg) {
|
||||
streamConsumer *consumer = ri.data;
|
||||
|
||||
/* Consumer name. */
|
||||
if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) return -1;
|
||||
if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) {
|
||||
raxStop(&ri);
|
||||
return -1;
|
||||
}
|
||||
nwritten += n;
|
||||
|
||||
/* Last seen time. */
|
||||
if ((n = rdbSaveMillisecondTime(rdb,consumer->seen_time)) == -1)
|
||||
if ((n = rdbSaveMillisecondTime(rdb,consumer->seen_time)) == -1) {
|
||||
raxStop(&ri);
|
||||
return -1;
|
||||
}
|
||||
nwritten += n;
|
||||
|
||||
/* Consumer PEL, without the ACKs (see last parameter of the function
|
||||
* passed with value of 0), at loading time we'll lookup the ID
|
||||
* in the consumer group global PEL and will put a reference in the
|
||||
* consumer local PEL. */
|
||||
if ((n = rdbSaveStreamPEL(rdb,consumer->pel,0)) == -1)
|
||||
if ((n = rdbSaveStreamPEL(rdb,consumer->pel,0)) == -1) {
|
||||
raxStop(&ri);
|
||||
return -1;
|
||||
}
|
||||
nwritten += n;
|
||||
}
|
||||
raxStop(&ri);
|
||||
@ -912,9 +927,15 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) {
|
||||
while (raxNext(&ri)) {
|
||||
unsigned char *lp = ri.data;
|
||||
size_t lp_bytes = lpBytes(lp);
|
||||
if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) return -1;
|
||||
if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) {
|
||||
raxStop(&ri);
|
||||
return -1;
|
||||
}
|
||||
nwritten += n;
|
||||
if ((n = rdbSaveRawString(rdb,lp,lp_bytes)) == -1) return -1;
|
||||
if ((n = rdbSaveRawString(rdb,lp,lp_bytes)) == -1) {
|
||||
raxStop(&ri);
|
||||
return -1;
|
||||
}
|
||||
nwritten += n;
|
||||
}
|
||||
raxStop(&ri);
|
||||
@ -946,22 +967,36 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) {
|
||||
streamCG *cg = ri.data;
|
||||
|
||||
/* Save the group name. */
|
||||
if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1)
|
||||
if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) {
|
||||
raxStop(&ri);
|
||||
return -1;
|
||||
}
|
||||
nwritten += n;
|
||||
|
||||
/* Last ID. */
|
||||
if ((n = rdbSaveLen(rdb,cg->last_id.ms)) == -1) return -1;
|
||||
if ((n = rdbSaveLen(rdb,cg->last_id.ms)) == -1) {
|
||||
raxStop(&ri);
|
||||
return -1;
|
||||
}
|
||||
nwritten += n;
|
||||
if ((n = rdbSaveLen(rdb,cg->last_id.seq)) == -1) return -1;
|
||||
if ((n = rdbSaveLen(rdb,cg->last_id.seq)) == -1) {
|
||||
raxStop(&ri);
|
||||
return -1;
|
||||
}
|
||||
nwritten += n;
|
||||
|
||||
/* Save the global PEL. */
|
||||
if ((n = rdbSaveStreamPEL(rdb,cg->pel,1)) == -1) return -1;
|
||||
if ((n = rdbSaveStreamPEL(rdb,cg->pel,1)) == -1) {
|
||||
raxStop(&ri);
|
||||
return -1;
|
||||
}
|
||||
nwritten += n;
|
||||
|
||||
/* Save the consumers of this group. */
|
||||
if ((n = rdbSaveStreamConsumers(rdb,cg)) == -1) return -1;
|
||||
if ((n = rdbSaveStreamConsumers(rdb,cg)) == -1) {
|
||||
raxStop(&ri);
|
||||
return -1;
|
||||
}
|
||||
nwritten += n;
|
||||
}
|
||||
raxStop(&ri);
|
||||
|
@ -150,6 +150,7 @@ void handleBlockedClientsTimeout(void) {
|
||||
raxRemove(server.clients_timeout_table,ri.key,ri.key_len,NULL);
|
||||
raxSeek(&ri,"^",NULL,0);
|
||||
}
|
||||
raxStop(&ri);
|
||||
}
|
||||
|
||||
/* Get a timeout value from an object and store it into 'timeout'.
|
||||
|
Loading…
Reference in New Issue
Block a user