Streams: fix xreadgroup crash after xgroup SETID is sent.

For issue #5111.
This commit is contained in:
dejun.xdj 2018-07-10 16:26:13 +08:00
parent 7239e9ca5f
commit 3f8a3efe5f

View File

@ -905,24 +905,29 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
* will not require extra lookups. We'll fix the problem later * will not require extra lookups. We'll fix the problem later
* if we find that there is already a entry for this ID. */ * if we find that there is already a entry for this ID. */
streamNACK *nack = streamCreateNACK(consumer); streamNACK *nack = streamCreateNACK(consumer);
int retval = 0; int retval_group_tryins = 0, retval_consumer_tryins = 0;
retval += raxTryInsert(group->pel,buf,sizeof(buf),nack,NULL); retval_group_tryins = raxTryInsert(group->pel,buf,sizeof(buf),nack,NULL);
retval += raxTryInsert(consumer->pel,buf,sizeof(buf),nack,NULL); retval_consumer_tryins = raxTryInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
/* Now we can check if the entry was already busy, and /* Now we can check if the entry was already busy, and
* in that case reassign the entry to the new consumer. */ * in that case reassign the entry to the new consumer. */
if (retval == 0) { if (retval_group_tryins == 0) {
streamFreeNACK(nack); streamNACK *oldnack = raxFind(group->pel,buf,sizeof(buf));
nack = raxFind(group->pel,buf,sizeof(buf)); serverAssert(oldnack != raxNotFound);
serverAssert(nack != raxNotFound); if (retval_consumer_tryins == 0) {
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); /* In this case, only need to update old nack
/* Update the consumer and idle time. */ * delivery_time and delivery_count. */
nack->consumer = consumer; oldnack->delivery_time = mstime();
nack->delivery_time = mstime(); oldnack->delivery_count++;
nack->delivery_count++; streamFreeNACK(nack);
/* Add the entry in the new consumer local PEL. */ } else {
raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); /* remove the old nack from original consumer */
} else if (retval == 1) { raxRemove(oldnack->consumer->pel,buf,sizeof(buf),NULL);
/* Overwrite nack in group pel. */
raxInsert(group->pel,buf,sizeof(buf),nack,NULL);
streamFreeNACK(oldnack);
}
} else if (retval_group_tryins == 1 && retval_consumer_tryins == 0) {
serverPanic("NACK half-created. Should not be possible."); serverPanic("NACK half-created. Should not be possible.");
} }