Streams: implement stream object release.

This commit is contained in:
antirez 2017-09-06 13:11:47 +02:00
parent ec9bbe96bf
commit 439120c620
5 changed files with 25 additions and 5 deletions

View File

@ -310,6 +310,10 @@ void freeModuleObject(robj *o) {
zfree(mv);
}
void freeStreamObject(robj *o) {
freeStream(o->ptr);
}
void incrRefCount(robj *o) {
if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount++;
}
@ -323,6 +327,7 @@ void decrRefCount(robj *o) {
case OBJ_ZSET: freeZsetObject(o); break;
case OBJ_HASH: freeHashObject(o); break;
case OBJ_MODULE: freeModuleObject(o); break;
case OBJ_STREAM: freeStreamObject(o); break;
default: serverPanic("Unknown object type"); break;
}
zfree(o);

View File

@ -1093,28 +1093,36 @@ int raxRemove(rax *rax, unsigned char *s, size_t len, void **old) {
/* This is the core of raxFree(): performs a depth-first scan of the
* tree and releases all the nodes found. */
void raxRecursiveFree(rax *rax, raxNode *n) {
void raxRecursiveFree(rax *rax, raxNode *n, void (*free_callback)(void*)) {
debugnode("free traversing",n);
int numchildren = n->iscompr ? 1 : n->size;
raxNode **cp = raxNodeLastChildPtr(n);
while(numchildren--) {
raxNode *child;
memcpy(&child,cp,sizeof(child));
raxRecursiveFree(rax,child);
raxRecursiveFree(rax,child,free_callback);
cp--;
}
debugnode("free depth-first",n);
if (free_callback && n->iskey && !n->isnull)
free_callback(raxGetData(n));
rax_free(n);
rax->numnodes--;
}
/* Free a whole radix tree. */
void raxFree(rax *rax) {
raxRecursiveFree(rax,rax->head);
/* Free a whole radix tree, calling the specified callback in order to
* free the auxiliary data. */
void raxFreeWithCallback(rax *rax, void (*free_callback)(void*)) {
raxRecursiveFree(rax,rax->head,free_callback);
assert(rax->numnodes == 0);
rax_free(rax);
}
/* Free a whole radix tree. */
void raxFree(rax *rax) {
raxFreeWithCallback(rax,NULL);
}
/* ------------------------------- Iterator --------------------------------- */
/* Initialize a Rax iterator. This call should be performed a single time

View File

@ -148,6 +148,7 @@ int raxInsert(rax *rax, unsigned char *s, size_t len, void *data, void **old);
int raxRemove(rax *rax, unsigned char *s, size_t len, void **old);
void *raxFind(rax *rax, unsigned char *s, size_t len);
void raxFree(rax *rax);
void raxFreeWithCallback(rax *rax, void (*free_callback)(void*));
void raxStart(raxIterator *it, rax *rt);
int raxSeek(raxIterator *it, const char *op, unsigned char *ele, size_t len);
int raxNext(raxIterator *it);

View File

@ -1419,6 +1419,7 @@ void signalListAsReady(redisDb *db, robj *key);
/* Stream data type. */
stream *streamNew(void);
void freeStream(stream *s);
/* MULTI/EXEC/WATCH... */
void unwatchAllKeys(client *c);

View File

@ -51,6 +51,11 @@ stream *streamNew(void) {
return s;
}
/* Free a stream, including the listpacks stored inside the radix tree. */
void freeStream(stream *s) {
raxFreeWithCallback(s->rax,(void(*)(void*))lpFree);
}
/* Generate the next stream item ID given the previous one. If the current
* milliseconds Unix time is greater than the previous one, just use this
* as time part and start with sequence part of zero. Otherwise we use the