Streams: XADD MAXLEN implementation.

The core of this change is the implementation of stream trimming, and
the resulting MAXLEN option of XADD as a trivial result of having
trimming functionalities. MAXLEN already works but in order to be more
efficient listpack GC should be implemented, currently marked as a TODO
item inside the comments.
This commit is contained in:
antirez 2017-09-29 12:40:29 +02:00
parent 0c00fd7834
commit 0540803288

View File

@ -305,6 +305,107 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id,
return C_OK;
}
/* Trim the stream 's' to have no more than maxlen elements, and return the
* number of elements removed from the stream. The 'approx' option, if non-zero,
* specifies that the trimming must be performed in a approximated way in
* order to maximize performances. This means that the stream may contain
* more elements than 'maxlen', and elements are only removed if we can remove
* a *whole* node of the radix tree. The elements are removed from the head
* of the stream (older elements).
*
* The function may return zero if:
*
* 1) The stream is already shorter or equal to the specified max length.
* 2) The 'approx' option is true and the head node had not enough elements
* to be deleted, leaving the stream with a number of elements >= maxlen.
*/
int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
if (s->length <= maxlen) return 0;
raxIterator ri;
raxStart(&ri,s->rax);
raxSeek(&ri,"^",NULL,0);
int64_t deleted = 0;
while(s->length > maxlen && raxNext(&ri)) {
unsigned char *lp = ri.data, *p = lpFirst(lp);
int64_t entries = lpGetInteger(p);
/* Check if we can remove the whole node, and still have at
* least maxlen elements. */
if (s->length - entries >= maxlen) {
raxRemove(s->rax,ri.key,ri.key_len,NULL);
raxSeek(&ri,">=",ri.key,ri.key_len);
s->length -= entries;
deleted += entries;
continue;
}
/* If we cannot remove a whole element, and approx is true,
* stop here. */
if (approx) break;
/* Otherwise, we have to mark single entries inside the listpack
* as deleted. We start by updating the entries/deleted counters. */
int64_t to_delete = s->length - maxlen;
serverAssert(to_delete < entries);
lp = lpReplaceInteger(lp,&p,entries-to_delete);
p = lpNext(lp,p); /* Seek deleted field. */
int64_t deleted = lpGetInteger(p);
lp = lpReplaceInteger(lp,&p,deleted+to_delete);
p = lpNext(lp,p); /* Seek num-of-fields in the master entry. */
/* Skip all the master fields. */
int64_t master_fields_count = lpGetInteger(p);
p = lpNext(lp,p); /* Seek the first field. */
for (int64_t j = 0; j < master_fields_count; j++)
p = lpNext(lp,p); /* Skip all master fields. */
/* 'p' is now pointing to the first entry inside the listpack.
* We have to run entry after entry, marking entries as deleted
* if they are already not deleted. */
while(p) {
int flags = lpGetInteger(p);
int to_skip;
/* Mark the entry as deleted. */
if (!(flags & STREAM_ITEM_FLAG_DELETED)) {
flags |= STREAM_ITEM_FLAG_DELETED;
lp = lpReplaceInteger(lp,&p,flags);
deleted++;
s->length--;
if (s->length <= maxlen) break; /* Enough entries deleted. */
}
p = lpNext(lp,p); /* Skip ID ms delta. */
p = lpNext(lp,p); /* Skip ID seq delta. */
p = lpNext(lp,p); /* Seek num-fields or values (if compressed). */
if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
to_skip = master_fields_count;
} else {
to_skip = lpGetInteger(p); p = lpNext(lp,p);
to_skip = 1+(to_skip*2);
}
while(to_skip--) p = lpNext(lp,p); /* Skip the whole entry. */
}
/* Here we should perform garbage collection in case at this point
* there are too many entries deleted inside the listpack. */
entries -= to_delete;
deleted += to_delete;
if (entries + deleted > 10 && deleted > entries/2) {
/* TODO: perform a garbage collection. */
}
break; /* If we are here, there was enough to delete in the current
node, so no need to go to the next node. */
}
raxStop(&ri);
return deleted;
}
/* Initialize the stream iterator, so that we can call iterating functions
* to get the next items. This requires a corresponding streamIteratorStop()
* at the end.
@ -578,21 +679,32 @@ invalid:
void xaddCommand(client *c) {
streamID id;
int id_given = 0; /* Was an ID different than "*" specified? */
long long maxlen = 0; /* 0 means no maximum length. */
int approx_maxlen = 0; /* If 1 only delete whole radix tree nodes, so
the maxium length is not applied verbatim. */
int maxlen_arg_idx = 0; /* Index of the count in MAXLEN, for rewriting. */
/* Parse options. */
int i = 2; /* This is the first argument position where we could
find an option, or the ID. */
for (; i < c->argc; i++) {
int moreargs = i != c->argc-1;
int moreargs = (c->argc-1) - i; /* Number of additional arguments. */
char *opt = c->argv[i]->ptr;
if (opt[0] == '*' && opt[1] == '\0') {
/* This is just a fast path for the common case of auto-ID
* creation. */
break;
} else if (!strcasecmp(opt,"maxlen") && moreargs) {
addReplyError(c,"Sorry, MAXLEN is still not implemented");
char *next = c->argv[i+1]->ptr;
/* Check for the form MAXLEN ~ <count>. */
if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
approx_maxlen = 1;
i++;
}
if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL)
!= C_OK) return;
i++;
return;
maxlen_arg_idx = i;
} else {
/* If we are here is a syntax error or a valid ID. */
if (streamParseIDOrReply(NULL,c->argv[i],&id,0) == C_OK) {
@ -634,6 +746,20 @@ void xaddCommand(client *c) {
notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id);
server.dirty++;
/* Remove older elements if MAXLEN was specified. */
if (maxlen) {
if (!streamTrimByLength(s,maxlen,approx_maxlen)) {
/* If no trimming was performed, for instance because approximated
* trimming length was specified, rewrite the MAXLEN argument
* as zero, so that the command is propagated without trimming. */
robj *zeroobj = createStringObjectFromLongLong(0);
rewriteClientCommandArgument(c,maxlen_arg_idx,zeroobj);
decrRefCount(zeroobj);
} else {
notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
}
}
/* Let's rewrite the ID argument with the one actually generated for
* AOF/replication propagation. */
robj *idarg = createObject(OBJ_STRING,