CG: streamCompareID() + group last_id updating.

This commit is contained in:
antirez 2018-01-22 11:58:18 +01:00
parent 86fe8fde20
commit 6c0af37b6e

View File

@ -149,6 +149,17 @@ void streamDecodeID(void *buf, streamID *id) {
id->seq = ntohu64(e[1]); id->seq = ntohu64(e[1]);
} }
/* Compare two stream IDs. Return -1 if a < b, 0 if a == b, 1 if a > b. */
int streamCompareID(streamID *a, streamID *b) {
if (a->ms > b->ms) return 1;
else if (a->ms < b->ms) return -1;
/* The ms part is the same. Check the sequence part. */
else if (a->seq > b->seq) return 1;
else if (a->seq < b->seq) return -1;
/* Everything is the same: IDs are equal. */
return 0;
}
/* Adds a new item into the stream 's' having the specified number of /* Adds a new item into the stream 's' having the specified number of
* field-value pairs as specified in 'numfields' and stored into 'argv'. * field-value pairs as specified in 'numfields' and stored into 'argv'.
* Returns the new entry ID populating the 'added_id' structure. * Returns the new entry ID populating the 'added_id' structure.
@ -164,9 +175,7 @@ void streamDecodeID(void *buf, streamID *id) {
int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id, streamID *use_id) { int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id, streamID *use_id) {
/* If an ID was given, check that it's greater than the last entry ID /* If an ID was given, check that it's greater than the last entry ID
* or return an error. */ * or return an error. */
if (use_id && (use_id->ms < s->last_id.ms || if (use_id && streamCompareID(use_id,&s->last_id) <= 0) return C_ERR;
(use_id->ms == s->last_id.ms &&
use_id->seq <= s->last_id.seq))) return C_ERR;
/* Add the new entry. */ /* Add the new entry. */
raxIterator ri; raxIterator ri;
@ -679,8 +688,21 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
int64_t numfields; int64_t numfields;
streamID id; streamID id;
/* If a group was passed, as an optimization we check if the range
* specified is about messages that were never delivered. This is true if
* the 'start' range is greater than the current last_id in the stream.
* In that case, there is no need to check if the messages may already
* have another owner before delivering the message. This speeds up
* the processing significantly. */
int newmessages = group != NULL &&
streamCompareID(start,&group->last_id) > 0;
streamIteratorStart(&si,s,start,end,rev); streamIteratorStart(&si,s,start,end,rev);
while(streamIteratorGetID(&si,&id,&numfields)) { while(streamIteratorGetID(&si,&id,&numfields)) {
/* Update the group last_id if needed. */
if (group && streamCompareID(&id,&group->last_id) > 0)
group->last_id = id;
/* Emit a two elements array for each item. The first is /* Emit a two elements array for each item. The first is
* the ID, the second is an array of field-value pairs. */ * the ID, the second is an array of field-value pairs. */
sds replyid = sdscatfmt(sdsempty(),"+%U-%U\r\n",id.ms,id.seq); sds replyid = sdscatfmt(sdsempty(),"+%U-%U\r\n",id.ms,id.seq);
@ -1134,8 +1156,15 @@ void streamNotAckedFree(streamNotAcked *na) {
zfree(na); zfree(na);
} }
/* Free a consumer and associated data structures. Note that this function
* will not reassign the pending messages associated with this consumer
* nor will delete them from the stream, so when this function is called
* to delete a consumer, and not when the whole stream is destroyed, the caller
* should do some work before. */
void streamConsumerFree(streamConsumer *sc) { void streamConsumerFree(streamConsumer *sc) {
zfree(sc->name); raxFree(sc->pel); /* No value free callback: the PEL entries are shared
between the consumer and the main stream PEL. */
sdsfree(sc->name);
zfree(sc); zfree(sc);
} }