Skip to content

Commit 6843478

Browse files
authored
fix: panic on close with full internal queue (#412)
1 parent 27bdca5 commit 6843478

File tree

3 files changed

+38
-3
lines changed

3 files changed

+38
-3
lines changed

pkg/stream/consumer.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -342,8 +342,14 @@ func (consumer *Consumer) close(reason Event) {
342342
}
343343

344344
if consumer.response.data != nil {
345+
// drain the queue to avoid race condition
346+
for len(consumer.chunkForConsumer) > 0 {
347+
select {
348+
case <-consumer.chunkForConsumer:
349+
default:
350+
}
351+
}
345352
close(consumer.chunkForConsumer)
346-
347353
close(consumer.response.data)
348354
consumer.response.data = nil
349355
}

pkg/stream/consumer_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,35 @@ var _ = Describe("Streaming Consumers", func() {
472472

473473
})
474474

475+
It("Not panics on close when the internal queue is full", func() {
476+
producer, err := env.NewProducer(streamName, nil)
477+
Expect(err).NotTo(HaveOccurred())
478+
err = producer.BatchSend(CreateArrayMessagesForTesting(10_000))
479+
Expect(err).NotTo(HaveOccurred())
480+
481+
defer func(producer *Producer) {
482+
err := producer.Close()
483+
Expect(err).NotTo(HaveOccurred())
484+
}(producer)
485+
486+
const credits = 2
487+
consumer, err := env.NewConsumer(streamName,
488+
func(_ ConsumerContext, _ *amqp.Message) {
489+
// it usually happens with slow consumer that fulfill the internal queue
490+
time.Sleep(time.Hour)
491+
}, NewConsumerOptions().
492+
SetInitialCredits(credits).
493+
SetOffset(OffsetSpecification{}.First()).
494+
SetConsumerName("consumer_test"))
495+
Expect(err).NotTo(HaveOccurred())
496+
497+
// waiting the internal queue to fulfill
498+
time.Sleep(time.Second)
499+
Expect(len(consumer.chunkForConsumer)).To(Equal(credits))
500+
501+
Expect(consumer.Close()).NotTo(HaveOccurred())
502+
})
503+
475504
It("message Properties", func() {
476505
producer, err := env.NewProducer(streamName, nil)
477506

pkg/stream/server_frame.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -404,13 +404,13 @@ func (c *Client) handleDeliver(r *bufio.Reader) {
404404
}
405405
}
406406
}
407-
// request a credit for the next chunk
408-
c.credit(subscriptionId, 1)
409407

410408
// dispatch the messages with offset to the consumer
411409
chunk.offsetMessages = batchConsumingMessages
412410
if consumer.getStatus() == open {
413411
consumer.chunkForConsumer <- chunk
412+
// request a credit for the next chunk
413+
c.credit(subscriptionId, 1)
414414
} else {
415415
logs.LogDebug("The consumer %s for the stream %s is closed during the chunk dispatching. "+
416416
"Messages won't dispatched", consumer.GetName(), consumer.GetStreamName())

0 commit comments

Comments
 (0)