@@ -26,6 +26,7 @@ import org.apache.kafka.common.record.{TimestampType => TimestampTypeJ}
2626import org .apache .kafka .common .{TopicPartition => TopicPartitionJ }
2727
2828import scala .jdk .CollectionConverters ._
29+ import scala .util .Try
2930
3031object ConsumerConverters {
3132
@@ -54,22 +55,27 @@ object ConsumerConverters {
5455 def onPartitions (
5556 partitions : CollectionJ [TopicPartitionJ ],
5657 call : Nes [TopicPartition ] => F [Unit ]
57- ) = {
58- serialListeners
59- .listener {
60- partitions
61- .asScala
62- .toList
63- .traverse { _.asScala[F ] }
64- .flatMap { partitions =>
65- partitions
66- .toSortedSet
67- .toNes
68- .foldMapM(call)
69- }
58+ ): Unit = {
59+ // The whole callback runs partly on the consumer thread, and partly asynchronously.
60+ // First, on the consumer thread, we pre-process the partition list, and register
61+ // our listener in `serialListeners`.
62+ // Then, asynchronously, we execute the registered listener.
63+ partitions.asScala
64+ .toList
65+ // Note: `traverse(_.asScala[F])` may cause StackOverflowError later, when executed
66+ // synchronously with `.toTry`. Traversing into `Try` directly avoids this.
67+ .traverse(_.asScala[Try ])
68+ .flatMap { topicPartitions =>
69+ topicPartitions
70+ .toSortedSet
71+ .toNes
72+ .traverse { partitions => serialListeners.listener(call(partitions)) }
73+ .toTry
7074 }
71- .toTry
75+ // If we fail to register the listener, fail right now on the consumer thread.
7276 .get
77+ // Schedule the actual callback for async execution with `.toFuture`.
78+ .traverse_(asyncListener => asyncListener)
7379 .toFuture
7480 ()
7581 }
@@ -101,20 +107,18 @@ object ConsumerConverters {
101107 ): Unit = {
102108 // If you're thinking about deriving ToTry timeout based on ConsumerConfig.maxPollInterval
103109 // please have a look on https://github.com/evolution-gaming/skafka/issues/125
104- val result = partitions
105- .asScala
110+ partitions.asScala
106111 .toList
107- .traverse { _.asScala[F ] }
108- .map { partitions =>
109- partitions
110- .toSortedSet
112+ // Note: `traverse(_.asScala[F])` may cause StackOverflowError later, when executed
113+ // synchronously with `.toTry`. Traversing into `Try` directly avoids this.
114+ .traverse { _.asScala[Try ] }
115+ .flatMap { topicPartitions =>
116+ topicPartitions.toSortedSet
111117 .toNes
118+ .traverse_ { partitions => call(partitions).run(RebalanceConsumer (consumer)) }
112119 }
113- .toTry
114- .flatMap {
115- _.foldMapM { partitions => call(partitions).run(RebalanceConsumer (consumer)) }
116- }
117- result.fold(throw _, _ => ())
120+ // If we fail to make a `call(..).run(..)`, fail right now on the consumer thread.
121+ .get
118122 }
119123
120124 new RebalanceListenerJ {
0 commit comments