@@ -17,15 +17,14 @@ package za.co.absa.hyperdrive.ingestor.implementation.transformer.deduplicate.ka
 
 import java.time.Duration
 import java.util.{Properties, UUID}
-
 import za.co.absa.hyperdrive.ingestor.api.utils.ConfigUtils
 import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.{ConfluentAvroDecodingTransformer, ConfluentAvroEncodingTransformer}
 import za.co.absa.hyperdrive.ingestor.implementation.utils.KafkaUtil
-
 import org.apache.avro.generic.GenericRecord
 import org.apache.commons.configuration2.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.TopicPartition
 import org.apache.logging.log4j.LogManager
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.execution.streaming.{CommitLog, OffsetSeqLog}
@@ -74,9 +73,14 @@ private[transformer] class DeduplicateKafkaSinkTransformer(
     implicit val kafkaConsumerTimeoutImpl: Duration = kafkaConsumerTimeout
     val sourceConsumer = createConsumer(readerBrokers, readerExtraOptions, readerSchemaRegistryUrl)
     val latestCommittedOffsets = KafkaUtil.getLatestCommittedOffset(offsetLog, commitLog)
+    logCommittedOffsets(latestCommittedOffsets)
+
     KafkaUtil.seekToOffsetsOrBeginning(sourceConsumer, readerTopic, latestCommittedOffsets)
+    logCurrentPositions(sourceConsumer)
 
     val latestOffsetsOpt = KafkaUtil.getLatestOffset(offsetLog)
+    logOffsets(latestOffsetsOpt)
+
     val sourceRecords = latestOffsetsOpt.map(latestOffset => consumeAndClose(sourceConsumer,
       consumer => KafkaUtil.getMessagesAtLeastToOffset(consumer, latestOffset))).getOrElse(Seq())
     val sourceIds = sourceRecords.map(extractIdFieldsFromRecord(_, sourceIdColumnNames))
@@ -86,14 +90,51 @@ private[transformer] class DeduplicateKafkaSinkTransformer(
     val recordsPerPartition = sinkTopicPartitions.map(p => p -> sourceRecords.size.toLong).toMap
     val latestSinkRecords = consumeAndClose(sinkConsumer, consumer =>
       KafkaUtil.getAtLeastNLatestRecordsFromPartition(consumer, recordsPerPartition))
-    val publishedIds = latestSinkRecords.map(extractIdFieldsFromRecord(_, destinationIdColumnNames))
+    logConsumedSinkRecords(latestSinkRecords)
 
+    val publishedIds = latestSinkRecords.map(extractIdFieldsFromRecord(_, destinationIdColumnNames))
     val duplicatedIds = sourceIds.intersect(publishedIds)
+    logDuplicatedIds(duplicatedIds)
     val duplicatedIdsLit = duplicatedIds.map(duplicatedId => struct(duplicatedId.map(lit): _*))
     val idColumns = sourceIdColumnNames.map(col)
     dataFrame.filter(not(struct(idColumns: _*).isInCollection(duplicatedIdsLit)))
   }
 
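+  // Diagnostic helpers added below: they only log offset positions and duplicate counts so a
+  // deduplication run can be traced from the logs; they do not change the deduplication logic.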
+  private def logCommittedOffsets(offsets: Option[Map[TopicPartition, Long]]): Unit =
+    logger.info(s"Latest committed source offsets by partition for ${readerTopic}: {${offsetsToString(offsets)}}")
+
+  private def logOffsets(offsets: Option[Map[TopicPartition, Long]]): Unit =
+    logger.info(s"Latest source offsets by partition for ${readerTopic}: {${offsetsToString(offsets)}}")
+
+  private def offsetsToString(offsets: Option[Map[TopicPartition, Long]]) = {
+    offsets.map(_.toSeq
+      .sortBy { case (tp, _) => tp.partition() }
+      .map { case (tp, offset) => s"${tp.partition()}: $offset" }.reduce(_ + ", " + _)).getOrElse("-")
+  }
+
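+  // Logs the consumer's current position for every partition of the source topic after seeking.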
+  private def logCurrentPositions(consumer: KafkaConsumer[GenericRecord, GenericRecord]): Unit = {
+    val sourcePartitions = KafkaUtil.getTopicPartitions(consumer, readerTopic)
+    val currentPositions = sourcePartitions
+      .sortBy(_.partition())
+      .map { tp => s"${tp.partition()}: ${consumer.position(tp)}" }.reduce(_ + ", " + _)
+    logger.info(s"Reset source offsets by partition to {${currentPositions}}")
+  }
+
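+  // Groups the consumed sink record offsets by partition and logs the first and last three
+  // offsets per partition, as a compact summary of what was read back from the sink topic.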
+  private def logConsumedSinkRecords(latestSinkRecords: Seq[ConsumerRecord[GenericRecord, GenericRecord]]): Unit = {
+    val offsetsByPartition = latestSinkRecords.map(r => r.partition() -> r.offset())
+      .groupBy(_._1)
+      .mapValues(_.map(_._2))
+      .toSeq
+      .sortBy(_._1)
+    val firstOffsets = offsetsByPartition.map { case (partition, offsets) => s"$partition: ${offsets.take(3)}" }.reduce(_ + ", " + _)
+    val lastOffsets = offsetsByPartition.map { case (partition, offsets) => s"$partition: ${offsets.takeRight(3)}" }.reduce(_ + ", " + _)
+    logger.info(s"Consumed ${latestSinkRecords.size} sink records. First three offsets by partition: {${firstOffsets}}. Last three offsets: {${lastOffsets}}")
+  }
+
+  private def logDuplicatedIds(duplicatedIds: Seq[Seq[Any]]): Unit = {
+    logger.info(s"Found ${duplicatedIds.size} duplicated ids. First three: ${duplicatedIds.take(3)}.")
+  }
+
   private def extractIdFieldsFromRecord(record: ConsumerRecord[GenericRecord, GenericRecord], idColumnNames: Seq[String]): Seq[Any] = {
     idColumnNames.map(idColumnName =>
       AvroUtil.getFromConsumerRecord(record, idColumnName)