@@ -19,7 +19,7 @@ import java.time.Duration
 import java.util.{Properties, UUID}
 import za.co.absa.hyperdrive.ingestor.api.utils.ConfigUtils
 import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.{ConfluentAvroDecodingTransformer, ConfluentAvroEncodingTransformer}
-import za.co.absa.hyperdrive.ingestor.implementation.utils.KafkaUtil
+import za.co.absa.hyperdrive.ingestor.implementation.utils.{AvroUtil, KafkaUtil, SchemaRegistryConfigUtil}
 import org.apache.avro.generic.GenericRecord
 import org.apache.commons.configuration2.Configuration
 import org.apache.hadoop.fs.Path
@@ -30,23 +30,22 @@ import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.execution.streaming.{CommitLog, OffsetSeqLog}
 import org.apache.spark.sql.functions.{col, lit, not, struct}
 import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformer, StreamTransformerFactory}
-import za.co.absa.hyperdrive.ingestor.api.utils.ConfigUtils.{getOrThrow, getPropertySubset, getSeqOrThrow}
+import za.co.absa.hyperdrive.ingestor.api.utils.ConfigUtils.{filterKeysContaining, getOrThrow, getPropertySubset, getSeqOrThrow}
 import za.co.absa.hyperdrive.ingestor.api.utils.StreamWriterUtil
 import za.co.absa.hyperdrive.ingestor.api.writer.StreamWriterCommonAttributes
 import za.co.absa.hyperdrive.ingestor.implementation.reader.kafka.KafkaStreamReader
-import za.co.absa.hyperdrive.ingestor.implementation.utils.AvroUtil
 import za.co.absa.hyperdrive.ingestor.implementation.writer.kafka.KafkaStreamWriter


 private[transformer] class DeduplicateKafkaSinkTransformer(
   val readerTopic: String,
   val readerBrokers: String,
   val readerExtraOptions: Map[String, String],
-  val readerSchemaRegistryUrl: String,
+  val decoderSchemaRegistryConfig: Map[String, String],
   val writerTopic: String,
   val writerBrokers: String,
   val writerExtraOptions: Map[String, String],
-  val writerSchemaRegistryUrl: String,
+  val encoderSchemaRegistryConfig: Map[String, String],
   val checkpointLocation: String,
   val sourceIdColumnNames: Seq[String],
   val destinationIdColumnNames: Seq[String],
@@ -71,7 +70,7 @@ private[transformer] class DeduplicateKafkaSinkTransformer(
   private def deduplicateDataFrame(dataFrame: DataFrame, offsetLog: OffsetSeqLog, commitLog: CommitLog) = {
     logger.info("Deduplicate rows after retry")
     implicit val kafkaConsumerTimeoutImpl: Duration = kafkaConsumerTimeout
-    val sourceConsumer = createConsumer(readerBrokers, readerExtraOptions, readerSchemaRegistryUrl)
+    val sourceConsumer = createConsumer(readerBrokers, readerExtraOptions, decoderSchemaRegistryConfig)
     val latestCommittedOffsets = KafkaUtil.getLatestCommittedOffset(offsetLog, commitLog)
     logCommittedOffsets(latestCommittedOffsets)

@@ -85,7 +84,7 @@ private[transformer] class DeduplicateKafkaSinkTransformer(
       consumer => KafkaUtil.getMessagesAtLeastToOffset(consumer, latestOffset))).getOrElse(Seq())
     val sourceIds = sourceRecords.map(extractIdFieldsFromRecord(_, sourceIdColumnNames))

-    val sinkConsumer = createConsumer(writerBrokers, writerExtraOptions, writerSchemaRegistryUrl)
+    val sinkConsumer = createConsumer(writerBrokers, writerExtraOptions, encoderSchemaRegistryConfig)
     val sinkTopicPartitions = KafkaUtil.getTopicPartitions(sinkConsumer, writerTopic)
     val recordsPerPartition = sinkTopicPartitions.map(p => p -> sourceRecords.size.toLong).toMap
     val latestSinkRecords = consumeAndClose(sinkConsumer, consumer =>
@@ -107,16 +106,20 @@ private[transformer] class DeduplicateKafkaSinkTransformer(
     logger.info(s"Latest source offsets by partition for ${readerTopic}: {${offsetsToString(offsets)}}")

   private def offsetsToString(offsets: Option[Map[TopicPartition, Long]]) = {
-    offsets.map(_.toSeq
+    offsets.flatMap(_.toSeq
       .sortBy { case (tp, _) => tp.partition() }
-      .map { case (tp, offset) => s"${tp.partition()}: $offset" }.reduce(_ + ", " + _)).getOrElse("-")
+      .map { case (tp, offset) => s"${tp.partition()}: $offset" }
+      .reduceOption(_ + ", " + _))
+      .getOrElse("-")
   }

   private def logCurrentPositions(consumer: KafkaConsumer[GenericRecord, GenericRecord]): Unit = {
     val sourcePartitions = KafkaUtil.getTopicPartitions(consumer, readerTopic)
     val currentPositions = sourcePartitions
       .sortBy(_.partition())
-      .map { tp => s"${tp.partition()}: ${consumer.position(tp)}" }.reduce(_ + ", " + _)
+      .map { tp => s"${tp.partition()}: ${consumer.position(tp)}" }
+      .reduceOption(_ + ", " + _)
+      .getOrElse("No positions available.")
     logger.info(s"Reset source offsets by partition to {${currentPositions}}")
   }

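Note on the two hunks above: `reduce` was replaced with `reduceOption` plus a fallback because `reduce` throws on an empty collection (for example, when no offsets or partitions are available yet). A minimal illustrative snippet, not part of the change itself:

    val parts = Seq.empty[String]
    // parts.reduce(_ + ", " + _)                                  // throws UnsupportedOperationException on empty input
    val joined = parts.reduceOption(_ + ", " + _).getOrElse("-")   // "-" instead of an exception
    Seq("0: 10", "1: 42").reduceOption(_ + ", " + _).getOrElse("-") // "0: 10, 1: 42"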
@@ -126,8 +129,8 @@ private[transformer] class DeduplicateKafkaSinkTransformer(
       .mapValues(_.map(_._2))
       .toSeq
       .sortBy(_._1)
-    val firstOffsets = offsetsByPartition.map { case (partition, offsets) => s"$partition: ${offsets.take(3)}" }.reduce(_ + ", " + _)
-    val lastOffsets = offsetsByPartition.map { case (partition, offsets) => s"$partition: ${offsets.takeRight(3)}" }.reduce(_ + ", " + _)
+    val firstOffsets = offsetsByPartition.map { case (partition, offsets) => s"$partition: ${offsets.take(3)}" }.reduceOption(_ + ", " + _).getOrElse("No offsets available")
+    val lastOffsets = offsetsByPartition.map { case (partition, offsets) => s"$partition: ${offsets.takeRight(3)}" }.reduceOption(_ + ", " + _).getOrElse("No offsets available")
     logger.info(s"Consumed ${latestSinkRecords.size} sink records. First three offsets by partition: {${firstOffsets}}. Last three offsets: {${lastOffsets}}")
   }

@@ -153,38 +156,45 @@ private[transformer] class DeduplicateKafkaSinkTransformer(
     }
   }

-  private def createConsumer(brokers: String, extraOptions: Map[String, String], schemaRegistryUrl: String) = {
+  private def createConsumer(brokers: String, extraOptions: Map[String, String], decoderSchemaRegistryConfig: Map[String, String]) = {
     val props = new Properties()
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
     props.put(ConsumerConfig.CLIENT_ID_CONFIG, s"hyperdrive_consumer_${UUID.randomUUID().toString}")
     props.put(ConsumerConfig.GROUP_ID_CONFIG, s"hyperdrive_group_${UUID.randomUUID().toString}")
     extraOptions.foreach {
       case (key, value) => props.put(key, value)
     }
-    props.put("schema.registry.url", schemaRegistryUrl)
+    decoderSchemaRegistryConfig.foreach {
+      case (key, value) => props.put(key, value)
+    }
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer")
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer")
     new KafkaConsumer[GenericRecord, GenericRecord](props)
   }
 }

 object DeduplicateKafkaSinkTransformer extends StreamTransformerFactory with DeduplicateKafkaSinkTransformerAttributes {
+  private val logger = LogManager.getLogger()

   private val DefaultKafkaConsumerTimeoutSeconds = 120L

-  private val readerSchemaRegistryUrlKey = "deduplicateKafkaSinkTransformer.readerSchemaRegistryUrl"
-  private val writerSchemaRegistryUrlKey = "deduplicateKafkaSinkTransformer.writerSchemaRegistryUrl"
+  private val localDecoderPrefix = "deduplicateKafkaSinkTransformer.decoder"
+  private val localEncoderPrefix = "deduplicateKafkaSinkTransformer.encoder"

   override def apply(config: Configuration): StreamTransformer = {
     val readerTopic = getOrThrow(KafkaStreamReader.KEY_TOPIC, config)
     val readerBrokers = getOrThrow(KafkaStreamReader.KEY_BROKERS, config)
-    val readerExtraOptions = KafkaStreamReader.getExtraConfigurationPrefix.map(getPropertySubset(config, _)).getOrElse(Map())
-    val readerSchemaRegistryUrl = getOrThrow(readerSchemaRegistryUrlKey, config)
+    val readerExtraOptions = KafkaStreamReader.getExtraConfigurationPrefix
+      .map(prefix => getPropertySubset(config, s"${prefix}.kafka"))
+      .getOrElse(Map())
+    val decoderSchemaRegistryConfig = SchemaRegistryConfigUtil.getSchemaRegistryConfig(config.subset(localDecoderPrefix))

     val writerTopic = getOrThrow(KafkaStreamWriter.KEY_TOPIC, config)
     val writerBrokers = getOrThrow(KafkaStreamWriter.KEY_BROKERS, config)
-    val writerExtraOptions = KafkaStreamWriter.getExtraConfigurationPrefix.map(getPropertySubset(config, _)).getOrElse(Map())
-    val writerSchemaRegistryUrl = getOrThrow(writerSchemaRegistryUrlKey, config)
+    val writerExtraOptions = KafkaStreamWriter.getExtraConfigurationPrefix
+      .map(prefix => getPropertySubset(config, s"${prefix}.kafka"))
+      .getOrElse(Map())
+    val encoderSchemaRegistryConfig = SchemaRegistryConfigUtil.getSchemaRegistryConfig(config.subset(localEncoderPrefix))

     val checkpointLocation = StreamWriterUtil.getCheckpointLocation(config)

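Note on the hunk above: `apply` now resolves schema registry settings from the transformer-local prefixes `deduplicateKafkaSinkTransformer.decoder` and `deduplicateKafkaSinkTransformer.encoder`, and reads the Kafka extra options from the `.kafka` subset of the reader/writer prefixes. A rough sketch of how such properties could be wired up with commons-configuration2; the key name under the prefixes (`schema.registry.url`) is an assumption based on the removed `createConsumer` line, since the exact keys accepted by `SchemaRegistryConfigUtil.getSchemaRegistryConfig` are not visible in this diff:

    import org.apache.commons.configuration2.BaseConfiguration

    // Placeholder values; only meant to show which prefixes the new code reads from.
    val config = new BaseConfiguration()
    config.addProperty("deduplicateKafkaSinkTransformer.decoder.schema.registry.url", "http://localhost:8081")
    config.addProperty("deduplicateKafkaSinkTransformer.encoder.schema.registry.url", "http://localhost:8081")

    // Configuration.subset strips the prefix, so the decoder subset exposes "schema.registry.url",
    // which is what getSchemaRegistryConfig receives via config.subset(localDecoderPrefix).
    val decoderSubset = config.subset("deduplicateKafkaSinkTransformer.decoder")
    println(decoderSubset.getString("schema.registry.url")) // http://localhost:8081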
@@ -196,9 +206,22 @@ object DeduplicateKafkaSinkTransformer extends StreamTransformerFactory with Ded
     }

     val kafkaConsumerTimeout = Duration.ofSeconds(config.getLong(KafkaConsumerTimeout, DefaultKafkaConsumerTimeoutSeconds))
+    logger.info(s"Going to create DeduplicateKafkaSinkTransformer with: readerTopic=$readerTopic, " +
+      s"readerBrokers=$readerBrokers, " +
+      s"readerExtraOptions=${filterKeysContaining(readerExtraOptions, exclusionToken = "password")}, " +
+      s"decoderSchemaRegistryConfig=${filterKeysContaining(decoderSchemaRegistryConfig, "basic.auth")}, " +
+      s"writerTopic=$writerTopic, " +
+      s"writerBrokers=$writerBrokers, " +
+      s"writerExtraOptions=${filterKeysContaining(writerExtraOptions, exclusionToken = "password")}, " +
+      s"encoderSchemaRegistryConfig=${filterKeysContaining(encoderSchemaRegistryConfig, "basic.auth")}, " +
+      s"checkpointLocation=$checkpointLocation, " +
+      s"sourceIdColumns=$sourceIdColumns, " +
+      s"destinationIdColumns=$destinationIdColumns, " +
+      s"kafkaConsumerTimeout=$kafkaConsumerTimeout"
+    )

-    new DeduplicateKafkaSinkTransformer(readerTopic, readerBrokers, readerExtraOptions, readerSchemaRegistryUrl,
-      writerTopic, writerBrokers, writerExtraOptions, writerSchemaRegistryUrl,
+    new DeduplicateKafkaSinkTransformer(readerTopic, readerBrokers, readerExtraOptions, decoderSchemaRegistryConfig,
+      writerTopic, writerBrokers, writerExtraOptions, encoderSchemaRegistryConfig,
       checkpointLocation, sourceIdColumns, destinationIdColumns, kafkaConsumerTimeout)
   }

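Note on the hunk above: the transformer is now constructed with full schema registry config maps instead of bare URLs, so settings such as basic-auth credentials can be forwarded to the ad-hoc consumers. A hedged sketch of the constructor call with placeholder values (the class is `private[transformer]`, so direct construction like this only compiles inside that package; normally `apply` builds it from configuration):

    import java.time.Duration

    val transformer = new DeduplicateKafkaSinkTransformer(
      "source-topic", "broker1:9092", Map.empty,
      Map("schema.registry.url" -> "http://localhost:8081"), // decoderSchemaRegistryConfig
      "sink-topic", "broker1:9092", Map.empty,
      Map("schema.registry.url" -> "http://localhost:8081"), // encoderSchemaRegistryConfig
      "/tmp/checkpoints/dedup", Seq("offset", "partition"), Seq("source_offset", "source_partition"),
      Duration.ofSeconds(120L))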
@@ -208,6 +231,7 @@ object DeduplicateKafkaSinkTransformer extends StreamTransformerFactory with Ded
       KafkaStreamReader.getExtraConfigurationPrefix.map(globalConfig.getKeys(_).asScala.toSeq).getOrElse(Seq())
     val writerExtraOptionsKeys =
       KafkaStreamWriter.getExtraConfigurationPrefix.map(globalConfig.getKeys(_).asScala.toSeq).getOrElse(Seq())
+
     val keys = readerExtraOptionsKeys ++ writerExtraOptionsKeys ++
       Seq(
         KafkaStreamReader.KEY_TOPIC,
@@ -218,24 +242,22 @@ object DeduplicateKafkaSinkTransformer extends StreamTransformerFactory with Ded
       )
     val oneToOneMappings = keys.map(e => e -> e).toMap

-    val readerSchemaRegistryUrlGlobalKey = getSchemaRegistryUrlKey(globalConfig, classOf[ConfluentAvroDecodingTransformer],
-      ConfluentAvroDecodingTransformer.KEY_SCHEMA_REGISTRY_URL)
-    val writerSchemaRegistryUrlGlobalKey = getSchemaRegistryUrlKey(globalConfig, classOf[ConfluentAvroEncodingTransformer],
-      ConfluentAvroEncodingTransformer.KEY_SCHEMA_REGISTRY_URL)
+    val globalDecoderPrefix = getTransformerPrefix(globalConfig, classOf[ConfluentAvroDecodingTransformer])
+    val globalEncoderPrefix = getTransformerPrefix(globalConfig, classOf[ConfluentAvroEncodingTransformer])
+    val decoderKeys = globalConfig.getKeys(globalDecoderPrefix).asScala.toSeq
+    val encoderKeys = globalConfig.getKeys(globalEncoderPrefix).asScala.toSeq
+    val decoderMapping = decoderKeys.map(key => key -> key.replace(globalDecoderPrefix, localDecoderPrefix)).toMap
+    val encoderMapping = encoderKeys.map(key => key -> key.replace(globalEncoderPrefix, localEncoderPrefix)).toMap

-    oneToOneMappings ++ Map(
-      readerSchemaRegistryUrlGlobalKey -> readerSchemaRegistryUrlKey,
-      writerSchemaRegistryUrlGlobalKey -> writerSchemaRegistryUrlKey
-    )
+    oneToOneMappings ++ decoderMapping ++ encoderMapping
   }

-  private def getSchemaRegistryUrlKey[T <: StreamTransformer](config: Configuration, transformerClass: Class[T], transformerKey: String) = {
+  private def getTransformerPrefix[T <: StreamTransformer](config: Configuration, transformerClass: Class[T]) = {
     val prefix = ConfigUtils.getTransformerPrefix(config, transformerClass).getOrElse(throw new IllegalArgumentException(
       s"Could not find transformer configuration for ${transformerClass.getCanonicalName}, but it is required"))

-    s"${StreamTransformerFactory.TransformerKeyPrefix}.${prefix}.${transformerKey}"
+    s"${StreamTransformerFactory.TransformerKeyPrefix}.${prefix}"
   }
-
 }

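Note on the hunk above: the config-mapping override now forwards every key found under the decoder/encoder transformer prefixes to the local `deduplicateKafkaSinkTransformer.decoder`/`.encoder` prefixes, instead of mapping only the schema registry URL key. A small sketch of the resulting remapping, using made-up prefix and key values (the real global prefix is produced by `getTransformerPrefix` and depends on the pipeline configuration):

    // Assumed example values for illustration only.
    val globalDecoderPrefix = "transformer.avro.decoder"
    val localDecoderPrefix = "deduplicateKafkaSinkTransformer.decoder"

    val decoderKeys = Seq(
      "transformer.avro.decoder.schema.registry.url",
      "transformer.avro.decoder.basic.auth.user.info")
    val decoderMapping = decoderKeys.map(key => key -> key.replace(globalDecoderPrefix, localDecoderPrefix)).toMap
    // Map("transformer.avro.decoder.schema.registry.url" -> "deduplicateKafkaSinkTransformer.decoder.schema.registry.url", ...)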