Skip to content
This repository was archived by the owner on Oct 7, 2025. It is now read-only.

Commit c994455

Browse files
committed
Bugfix: KafkaConsumer: reconnect on timeout
1 parent fd5caa0 commit c994455

File tree

1 file changed

+8
-1
lines changed

1 file changed

+8
-1
lines changed

segments/input/kafkaconsumer/kafkaconsumer.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ func (segment *KafkaConsumer) Run(wg *sync.WaitGroup) {
261261
defer func() {
262262
handlerWg.Wait()
263263
if err = client.Close(); err != nil {
264-
log.Panic().Err(err).Msg("KafkaConsumer: Error closing Kafka client:")
264+
log.Error().Err(err).Msg("KafkaConsumer: Error closing Kafka client:")
265265
}
266266
}()
267267
go func() {
@@ -299,6 +299,13 @@ func (segment *KafkaConsumer) Run(wg *sync.WaitGroup) {
299299
handlerCancel()
300300
log.Info().Msg("KafkaConsumer: Connection Closed")
301301
return
302+
case handlerReady = <-handler.ready:
303+
if !handlerReady {
304+
log.Error().Msg("KafkaConsumer: Failed to establish connection.")
305+
handlerCancel()
306+
return
307+
}
308+
log.Info().Msg("KafkaConsumer: Reconnected and operational.")
302309
}
303310
}
304311
}

0 commit comments

Comments
 (0)