From a7c180e559d687022e85bee88f28d11db571e90c Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 12 Jul 2018 13:15:54 +0200 Subject: [PATCH] Simplify duplicated NACK #5112 fix. We don't really need to distinguish between the case the consumer is the same or is a different one. --- src/t_stream.c | 39 ++++++++++++++++++--------------------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/src/t_stream.c b/src/t_stream.c index 572e09669..5c1b7d4c4 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -905,29 +905,26 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end * will not require extra lookups. We'll fix the problem later * if we find that there is already a entry for this ID. */ streamNACK *nack = streamCreateNACK(consumer); - int retval_group_tryins = 0, retval_consumer_tryins = 0; - retval_group_tryins = raxTryInsert(group->pel,buf,sizeof(buf),nack,NULL); - retval_consumer_tryins = raxTryInsert(consumer->pel,buf,sizeof(buf),nack,NULL); + int group_inserted = + raxTryInsert(group->pel,buf,sizeof(buf),nack,NULL); + int consumer_inserted = + raxTryInsert(consumer->pel,buf,sizeof(buf),nack,NULL); /* Now we can check if the entry was already busy, and - * in that case reassign the entry to the new consumer. */ - if (retval_group_tryins == 0) { - streamNACK *oldnack = raxFind(group->pel,buf,sizeof(buf)); - serverAssert(oldnack != raxNotFound); - if (retval_consumer_tryins == 0) { - /* In this case, only need to update old nack - * delivery_time and delivery_count. */ - oldnack->delivery_time = mstime(); - oldnack->delivery_count++; - streamFreeNACK(nack); - } else { - /* remove the old nack from original consumer */ - raxRemove(oldnack->consumer->pel,buf,sizeof(buf),NULL); - /* Overwrite nack in group pel. */ - raxInsert(group->pel,buf,sizeof(buf),nack,NULL); - streamFreeNACK(oldnack); - } - } else if (retval_group_tryins == 1 && retval_consumer_tryins == 0) { + * in that case reassign the entry to the new consumer, + * or update it if the consumer is the same as before. */ + if (group_inserted == 0) { + streamFreeNACK(nack); + nack = raxFind(group->pel,buf,sizeof(buf)); + serverAssert(nack != raxNotFound); + raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); + /* Update the consumer and idle time. */ + nack->consumer = consumer; + nack->delivery_time = mstime(); + nack->delivery_count++; + /* Add the entry in the new consumer local PEL. */ + raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); + } else if (group_inserted == 1 && consumer_inserted == 0) { serverPanic("NACK half-created. Should not be possible."); }