diff --git a/src/t_stream.c b/src/t_stream.c index db0b4fb96..75adba6ab 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -905,24 +905,29 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end * will not require extra lookups. We'll fix the problem later * if we find that there is already a entry for this ID. */ streamNACK *nack = streamCreateNACK(consumer); - int retval = 0; - retval += raxTryInsert(group->pel,buf,sizeof(buf),nack,NULL); - retval += raxTryInsert(consumer->pel,buf,sizeof(buf),nack,NULL); + int retval_group_tryins = 0, retval_consumer_tryins = 0; + retval_group_tryins = raxTryInsert(group->pel,buf,sizeof(buf),nack,NULL); + retval_consumer_tryins = raxTryInsert(consumer->pel,buf,sizeof(buf),nack,NULL); /* Now we can check if the entry was already busy, and * in that case reassign the entry to the new consumer. */ - if (retval == 0) { - streamFreeNACK(nack); - nack = raxFind(group->pel,buf,sizeof(buf)); - serverAssert(nack != raxNotFound); - raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); - /* Update the consumer and idle time. */ - nack->consumer = consumer; - nack->delivery_time = mstime(); - nack->delivery_count++; - /* Add the entry in the new consumer local PEL. */ - raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); - } else if (retval == 1) { + if (retval_group_tryins == 0) { + streamNACK *oldnack = raxFind(group->pel,buf,sizeof(buf)); + serverAssert(oldnack != raxNotFound); + if (retval_consumer_tryins == 0) { + /* In this case, only need to update old nack + * delivery_time and delivery_count. */ + oldnack->delivery_time = mstime(); + oldnack->delivery_count++; + streamFreeNACK(nack); + } else { + /* remove the old nack from original consumer */ + raxRemove(oldnack->consumer->pel,buf,sizeof(buf),NULL); + /* Overwrite nack in group pel. */ + raxInsert(group->pel,buf,sizeof(buf),nack,NULL); + streamFreeNACK(oldnack); + } + } else if (retval_group_tryins == 1 && retval_consumer_tryins == 0) { serverPanic("NACK half-created. Should not be possible."); }