@@ -10,7 +10,6 @@ package com.snowplowanalytics.snowplow.streams.kafka.source
1010import cats .Applicative
1111import cats .effect .{Async , Resource , Sync }
1212import cats .implicits ._
13- import cats .effect .implicits ._
1413import cats .kernel .Semigroup
1514import fs2 .Stream
1615import org .typelevel .log4cats .{Logger , SelfAwareStructuredLogger }
@@ -22,6 +21,7 @@ import scala.concurrent.duration.{DurationLong, FiniteDuration}
2221// kafka
2322import fs2 .kafka ._
2423import org .apache .kafka .common .TopicPartition
24+ import org .apache .kafka .clients .consumer .OffsetAndMetadata
2525
2626// snowplow
2727import com .snowplowanalytics .snowplow .streams .SourceAndAck
@@ -35,57 +35,57 @@ private[kafka] object KafkaSource {
3535 def build [F [_]: Async ](
3636 config : KafkaSourceConfig ,
3737 authHandlerClass : String
38- ): F [SourceAndAck [F ]] =
39- LowLevelSource .toSourceAndAck(lowLevel(config, authHandlerClass))
38+ ): Resource [F , SourceAndAck [F ]] =
39+ for {
40+ kafkaConsumer <- KafkaConsumer .resource(consumerSettings[F ](config, authHandlerClass))
41+ result <- Resource .eval(LowLevelSource .toSourceAndAck(lowLevel(config, kafkaConsumer)))
42+ } yield result
4043
4144 private def lowLevel [F [_]: Async ](
4245 config : KafkaSourceConfig ,
43- authHandlerClass : String
44- ): LowLevelSource [F , KafkaCheckpoints [ F ] ] =
45- new LowLevelSource [F , KafkaCheckpoints [ F ] ] {
46- def checkpointer : Checkpointer [F , KafkaCheckpoints [ F ]] = kafkaCheckpointer
46+ kafkaConsumer : KafkaConsumer [ F , Array [ Byte ], ByteBuffer ]
47+ ): LowLevelSource [F , KafkaCheckpoints ] =
48+ new LowLevelSource [F , KafkaCheckpoints ] {
49+ def checkpointer : Checkpointer [F , KafkaCheckpoints ] = kafkaCheckpointer(kafkaConsumer)
4750
48- def stream : Stream [F , Stream [F , Option [LowLevelEvents [KafkaCheckpoints [ F ] ]]]] =
49- kafkaStream(config, authHandlerClass )
51+ def stream : Stream [F , Stream [F , Option [LowLevelEvents [KafkaCheckpoints ]]]] =
52+ kafkaStream(config, kafkaConsumer )
5053
5154 def debounceCheckpoints : FiniteDuration = config.debounceCommitOffsets
5255 }
5356
54- case class OffsetAndCommit [F [_]](offset : Long , commit : F [Unit ])
55- case class KafkaCheckpoints [F [_]](byPartition : Map [Int , OffsetAndCommit [F ]])
57+ type KafkaCheckpoints = Map [TopicPartition , OffsetAndMetadata ]
5658
57- private implicit def offsetAndCommitSemigroup [ F [_]] : Semigroup [OffsetAndCommit [ F ]] = new Semigroup [OffsetAndCommit [ F ] ] {
58- def combine (x : OffsetAndCommit [ F ] , y : OffsetAndCommit [ F ] ): OffsetAndCommit [ F ] =
59+ private implicit def offsetAndMetadataSemigroup : Semigroup [OffsetAndMetadata ] = new Semigroup [OffsetAndMetadata ] {
60+ def combine (x : OffsetAndMetadata , y : OffsetAndMetadata ): OffsetAndMetadata =
5961 if (x.offset > y.offset) x else y
6062 }
6163
62- private def kafkaCheckpointer [F [_]: Async ]: Checkpointer [F , KafkaCheckpoints [F ]] = new Checkpointer [F , KafkaCheckpoints [F ]] {
63- def combine (x : KafkaCheckpoints [F ], y : KafkaCheckpoints [F ]): KafkaCheckpoints [F ] =
64- KafkaCheckpoints (x.byPartition |+| y.byPartition)
64+ private def kafkaCheckpointer [F [_]: Async ](kafkaConsumer : KafkaConsumer [F , Array [Byte ], ByteBuffer ]): Checkpointer [F , KafkaCheckpoints ] =
65+ new Checkpointer [F , KafkaCheckpoints ] {
66+ def combine (x : KafkaCheckpoints , y : KafkaCheckpoints ): KafkaCheckpoints =
67+ x |+| y
6568
66- val empty : KafkaCheckpoints [F ] = KafkaCheckpoints (Map .empty)
67- def ack (c : KafkaCheckpoints [F ]): F [Unit ] = c.byPartition.values.toList.parTraverse(_.commit).void
68- def nack (c : KafkaCheckpoints [F ]): F [Unit ] = Applicative [F ].unit
69- }
69+ val empty : KafkaCheckpoints = Map .empty
70+ def ack (c : KafkaCheckpoints ): F [Unit ] =
71+ kafkaConsumer.commitSync(c)
72+ def nack (c : KafkaCheckpoints ): F [Unit ] = Applicative [F ].unit
73+ }
7074
7175 private def kafkaStream [F [_]: Async ](
7276 config : KafkaSourceConfig ,
73- authHandlerClass : String
74- ): Stream [F , Stream [F , Option [LowLevelEvents [KafkaCheckpoints [F ]]]]] =
75- KafkaConsumer
76- .stream(consumerSettings[F ](config, authHandlerClass))
77- .evalTap(_.subscribeTo(config.topicName))
78- .flatMap { consumer =>
79- consumer.partitionsMapStream
80- .evalMapFilter(logWhenNoPartitions[F ])
81- .map(joinPartitions[F ](_))
82- }
77+ kafkaConsumer : KafkaConsumer [F , Array [Byte ], ByteBuffer ]
78+ ): Stream [F , Stream [F , Option [LowLevelEvents [KafkaCheckpoints ]]]] =
79+ Stream .eval(kafkaConsumer.subscribeTo(config.topicName)) >>
80+ kafkaConsumer.partitionsMapStream
81+ .evalMapFilter(logWhenNoPartitions[F ])
82+ .map(joinPartitions[F ](_))
8383
8484 private type PartitionedStreams [F [_]] = Map [TopicPartition , Stream [F , CommittableConsumerRecord [F , Array [Byte ], ByteBuffer ]]]
8585
8686 private def joinPartitions [F [_]: Async ](
8787 partitioned : PartitionedStreams [F ]
88- ): Stream [F , Option [LowLevelEvents [KafkaCheckpoints [ F ] ]]] = {
88+ ): Stream [F , Option [LowLevelEvents [KafkaCheckpoints ]]] = {
8989 val streams = partitioned.toList.map { case (topicPartition, stream) =>
9090 stream.chunks
9191 .flatMap { chunk =>
@@ -94,7 +94,7 @@ private[kafka] object KafkaSource {
9494 val events = chunk.map {
9595 _.record.value
9696 }
97- val ack = KafkaCheckpoints ( Map (topicPartition.partition -> OffsetAndCommit ( last.record. offset, last.offset.commit)) )
97+ val ack = Map (topicPartition -> last.offset.offsetAndMetadata )
9898 val timestamps = chunk.iterator.flatMap { ccr =>
9999 val ts = ccr.record.timestamp
100100 ts.logAppendTime.orElse(ts.createTime).orElse(ts.unknownTime)
0 commit comments