Simplify duplicated NACK #5112 fix.

We don't really need to distinguish between the case the consumer is the
same or is a different one.
This commit is contained in:
antirez 2018-07-12 13:15:54 +02:00
parent bf4def0fbc
commit a7c180e559

View File

@ -905,29 +905,26 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
* will not require extra lookups. We'll fix the problem later * will not require extra lookups. We'll fix the problem later
* if we find that there is already a entry for this ID. */ * if we find that there is already a entry for this ID. */
streamNACK *nack = streamCreateNACK(consumer); streamNACK *nack = streamCreateNACK(consumer);
int retval_group_tryins = 0, retval_consumer_tryins = 0; int group_inserted =
retval_group_tryins = raxTryInsert(group->pel,buf,sizeof(buf),nack,NULL); raxTryInsert(group->pel,buf,sizeof(buf),nack,NULL);
retval_consumer_tryins = raxTryInsert(consumer->pel,buf,sizeof(buf),nack,NULL); int consumer_inserted =
raxTryInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
/* Now we can check if the entry was already busy, and /* Now we can check if the entry was already busy, and
* in that case reassign the entry to the new consumer. */ * in that case reassign the entry to the new consumer,
if (retval_group_tryins == 0) { * or update it if the consumer is the same as before. */
streamNACK *oldnack = raxFind(group->pel,buf,sizeof(buf)); if (group_inserted == 0) {
serverAssert(oldnack != raxNotFound); streamFreeNACK(nack);
if (retval_consumer_tryins == 0) { nack = raxFind(group->pel,buf,sizeof(buf));
/* In this case, only need to update old nack serverAssert(nack != raxNotFound);
* delivery_time and delivery_count. */ raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
oldnack->delivery_time = mstime(); /* Update the consumer and idle time. */
oldnack->delivery_count++; nack->consumer = consumer;
streamFreeNACK(nack); nack->delivery_time = mstime();
} else { nack->delivery_count++;
/* remove the old nack from original consumer */ /* Add the entry in the new consumer local PEL. */
raxRemove(oldnack->consumer->pel,buf,sizeof(buf),NULL); raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
/* Overwrite nack in group pel. */ } else if (group_inserted == 1 && consumer_inserted == 0) {
raxInsert(group->pel,buf,sizeof(buf),nack,NULL);
streamFreeNACK(oldnack);
}
} else if (retval_group_tryins == 1 && retval_consumer_tryins == 0) {
serverPanic("NACK half-created. Should not be possible."); serverPanic("NACK half-created. Should not be possible.");
} }