Commit bee1c0f

Kafka source recover from RebalanceInProgressException
When a consumer group rebalances, the Kafka consumer must commit offsets before partitions are revoked. Previously we were using "Eager" rebalance revoke mode from fs2-kafka, with the following consequences:

1. Partitions were revoked immediately without waiting for the application to commit offsets. This could lead to duplicates.
2. The commit occasionally failed with a RebalanceInProgressException, if the application tried to commit offsets after the rebalance had started.

This change switches to "Graceful" rebalance revoke mode. It works as follows:

1. Source waits up to session.timeout.ms for the fs2 stream to finalize. This includes committing outstanding offsets.
2. Rebalancing only proceeds after the fs2 stream has finalized, or after session.timeout.ms. This reduces the possibility of duplicates due to re-processing.
3. We catch and ignore the RebalanceInProgressException in case the downstream application cannot finalize the fs2 stream within the session timeout. This is needed e.g. for Lake Loader which is slow to finalize a window.
1 parent 6ab723e commit bee1c0f

1 file changed: +5 -1 lines changed

modules/kafka/src/main/scala/com/snowplowanalytics/snowplow/streams/kafka/source/KafkaSource.scala

Lines changed: 5 additions & 1 deletion
@@ -22,6 +22,7 @@ import scala.concurrent.duration.{DurationLong, FiniteDuration}
 import fs2.kafka._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.clients.consumer.OffsetAndMetadata
+import org.apache.kafka.common.errors.RebalanceInProgressException
 
 // snowplow
 import com.snowplowanalytics.snowplow.streams.SourceAndAck
@@ -68,7 +69,9 @@ private[kafka] object KafkaSource {
 
       val empty: KafkaCheckpoints = Map.empty
       def ack(c: KafkaCheckpoints): F[Unit] =
-        kafkaConsumer.commitSync(c)
+        kafkaConsumer.commitSync(c).recoverWith { case _: RebalanceInProgressException =>
+          Logger[F].warn("Failed to commit offsets during rebalance, offsets will be lost and events may be reprocessed")
+        }
       def nack(c: KafkaCheckpoints): F[Unit] = Applicative[F].unit
     }
 
@@ -145,4 +148,5 @@ private[kafka] object KafkaSource {
       .withProperties(config.consumerConf)
       .withEnableAutoCommit(false)
       .withCommitTimeout(config.commitTimeout)
+      .withRebalanceRevokeMode(RebalanceRevokeMode.Graceful)
 }
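
The selective recovery used in `ack` can be illustrated in isolation. The following is a minimal sketch, not the code from this commit: it uses `scala.util.Try` from the standard library instead of the effect type `F`, a hypothetical `RebalanceInProgress` class standing in for Kafka's `RebalanceInProgressException`, and a hypothetical in-memory `warnings` buffer in place of the logger. It is meant to show that only the rebalance error is turned into a warning, while any other commit failure still propagates.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.{Success, Try}

// Hypothetical stand-in for org.apache.kafka.common.errors.RebalanceInProgressException,
// defined here only so the sketch runs without Kafka on the classpath.
final class RebalanceInProgress(msg: String) extends RuntimeException(msg)

object AckSketch {
  // Hypothetical in-memory log, used in place of a real logger.
  val warnings: ListBuffer[String] = ListBuffer.empty

  // Runs the commit action. A rebalance error is replaced by a logged warning;
  // the partial function does not match other exceptions, so they remain failures.
  def ack(commit: () => Unit): Try[Unit] =
    Try(commit()).recoverWith { case _: RebalanceInProgress =>
      Try {
        warnings += "Failed to commit offsets during rebalance"
        ()
      }
    }
}

// Usage: a successful commit, a commit that hits a rebalance, and an unrelated failure.
val committed   = AckSketch.ack(() => ())
val rebalancing = AckSketch.ack(() => throw new RebalanceInProgress("rebalance in progress"))
val unrelated   = AckSketch.ack(() => throw new IllegalStateException("broker unreachable"))
```

Matching on a single exception type keeps the recovery narrow: offsets lost during a rebalance lead to reprocessing, which the commit message accepts, whereas other commit failures are still surfaced to the caller.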
