Skip to content

Commit b7232d1

Browse files
committed
Workaround confluent-kafka-go bug #463
1 parent 2bfa34b commit b7232d1

File tree

1 file changed

+15
-1
lines changed

1 file changed

+15
-1
lines changed

neli.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ type neli struct {
4646
barrier Barrier
4747
state concurrent.AtomicReference
4848
stateMutex sync.Mutex
49+
deliveryHandlerDone chan int
4950
}
5051

5152
// State of the Neli instance.
@@ -83,6 +84,7 @@ func New(config Config, barrier ...Barrier) (Neli, error) {
8384
barrier: barrierArg,
8485
pollDeadline: concurrent.NewDeadline(*config.MinPollInterval),
8586
state: concurrent.NewAtomicReference(Live),
87+
deliveryHandlerDone: make(chan int),
8688
}
8789

8890
consumerConfigs := copyKafkaConfig(n.config.KafkaConfig)
@@ -134,6 +136,8 @@ func New(config Config, barrier ...Barrier) (Neli, error) {
134136
}
135137
n.producer = p
136138
go func() {
139+
defer close(n.deliveryHandlerDone)
140+
137141
for event := range p.Events() {
138142
switch e := event.(type) {
139143
case *kafka.Message:
@@ -360,7 +364,17 @@ func (n *neli) Close() error {
360364
defer n.state.Set(Closed)
361365

362366
n.state.Set(Closing)
363-
n.producer.Close()
367+
defer func() {
368+
<-n.deliveryHandlerDone
369+
}()
370+
defer func() {
371+
go func() {
372+
// A bug in confluent-kafka-go (#463) occasionally causes an indefinite syscall hang in Close(), after it closes
373+
// the Events channel. So we delegate this to a separate goroutine — better an orphaned goroutine than a
374+
// frozen harvester. (The rest of the battery will still unwind normally.)
375+
n.producer.Close()
376+
}()
377+
}()
364378
return n.consumer.Close()
365379
}
366380

0 commit comments

Comments
 (0)