Skip to content

Commit 7922f2f

Browse files
authored
fix cause of StackOverflowException when initializing RebalanceListener with thousands of topic-partitions (#477)
* fix cause of `StackOverflowException` when initializing `RebalanceListener` with thousands of topic-partitions * use `sequence_` instead of `traverse_(identity)`
1 parent 9d41b51 commit 7922f2f

File tree

1 file changed

+29
-25
lines changed

1 file changed

+29
-25
lines changed

skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerConverters.scala

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import org.apache.kafka.common.record.{TimestampType => TimestampTypeJ}
2626
import org.apache.kafka.common.{TopicPartition => TopicPartitionJ}
2727

2828
import scala.jdk.CollectionConverters._
29+
import scala.util.Try
2930

3031
object ConsumerConverters {
3132

@@ -54,22 +55,27 @@ object ConsumerConverters {
5455
def onPartitions(
5556
partitions: CollectionJ[TopicPartitionJ],
5657
call: Nes[TopicPartition] => F[Unit]
57-
) = {
58-
serialListeners
59-
.listener {
60-
partitions
61-
.asScala
62-
.toList
63-
.traverse { _.asScala[F] }
64-
.flatMap { partitions =>
65-
partitions
66-
.toSortedSet
67-
.toNes
68-
.foldMapM(call)
69-
}
58+
): Unit = {
59+
// The whole callback runs partly on the consumer thread, and partly asynchronously.
60+
// First, on the consumer thread, we pre-process the partition list, and register
61+
// our listener in `serialListeners`.
62+
// Then, asynchronously, we execute the registered listener.
63+
partitions.asScala
64+
.toList
65+
// Note: `traverse(_.asScala[F])` may cause StackOverflowError later, when executed
66+
// synchronously with `.toTry`. Traversing into `Try` directly avoids this.
67+
.traverse(_.asScala[Try])
68+
.flatMap { topicPartitions =>
69+
topicPartitions
70+
.toSortedSet
71+
.toNes
72+
.traverse { partitions => serialListeners.listener(call(partitions)) }
73+
.toTry
7074
}
71-
.toTry
75+
// If we fail to register the listener, fail right now on the consumer thread.
7276
.get
77+
// Schedule the actual callback for async execution with `.toFuture`.
78+
.sequence_
7379
.toFuture
7480
()
7581
}
@@ -101,20 +107,18 @@ object ConsumerConverters {
101107
): Unit = {
102108
// If you're thinking about deriving ToTry timeout based on ConsumerConfig.maxPollInterval
103109
// please have a look on https://github.com/evolution-gaming/skafka/issues/125
104-
val result = partitions
105-
.asScala
110+
partitions.asScala
106111
.toList
107-
.traverse { _.asScala[F] }
108-
.map { partitions =>
109-
partitions
110-
.toSortedSet
112+
// Note: `traverse(_.asScala[F])` may cause StackOverflowError later, when executed
113+
// synchronously with `.toTry`. Traversing into `Try` directly avoids this.
114+
.traverse { _.asScala[Try] }
115+
.flatMap { topicPartitions =>
116+
topicPartitions.toSortedSet
111117
.toNes
118+
.traverse_ { partitions => call(partitions).run(RebalanceConsumer(consumer)) }
112119
}
113-
.toTry
114-
.flatMap {
115-
_.foldMapM { partitions => call(partitions).run(RebalanceConsumer(consumer)) }
116-
}
117-
result.fold(throw _, _ => ())
120+
// If we fail to make a `call(..).run(..)`, fail right now on the consumer thread.
121+
.get
118122
}
119123

120124
new RebalanceListenerJ {

0 commit comments

Comments
 (0)