@@ -18,38 +18,34 @@ package za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent
1818import org .apache .commons .configuration2 .Configuration
1919import org .apache .logging .log4j .LogManager
2020import org .apache .spark .sql .DataFrame
21- import org .apache .spark .sql .functions .{col , struct }
22- import za .co .absa .abris .avro .functions .to_confluent_avro
23- import za .co .absa .abris .avro .read .confluent .SchemaManager .{PARAM_KEY_SCHEMA_ID , PARAM_KEY_SCHEMA_NAMESPACE_FOR_RECORD_STRATEGY , PARAM_KEY_SCHEMA_NAME_FOR_RECORD_STRATEGY , PARAM_KEY_SCHEMA_NAMING_STRATEGY , PARAM_VALUE_SCHEMA_ID , PARAM_VALUE_SCHEMA_NAME_FOR_RECORD_STRATEGY , PARAM_VALUE_SCHEMA_NAMESPACE_FOR_RECORD_STRATEGY , PARAM_VALUE_SCHEMA_NAMING_STRATEGY }
21+ import org .apache .spark .sql .catalyst .expressions .Expression
22+ import org .apache .spark .sql .functions .struct
23+ import za .co .absa .abris .avro .functions .to_avro
24+ import za .co .absa .abris .config .ToAvroConfig
2425import za .co .absa .hyperdrive .ingestor .api .context .HyperdriveContext
2526import za .co .absa .hyperdrive .ingestor .api .transformer .{StreamTransformer , StreamTransformerFactory }
2627import za .co .absa .hyperdrive .ingestor .api .utils .ConfigUtils
2728import za .co .absa .hyperdrive .ingestor .implementation .HyperdriveContextKeys
28- import za .co .absa .hyperdrive .ingestor .implementation .utils .{SchemaRegistryProducerConfigKeys , SchemaRegistrySettingsUtil }
29+ import za .co .absa .hyperdrive .ingestor .implementation .transformer .avro .confluent .ConfluentAvroEncodingTransformer .{getKeyAvroConfig , getValueAvroConfig }
30+ import za .co .absa .hyperdrive .ingestor .implementation .utils .{AbrisConfigUtil , SchemaRegistryProducerConfigKeys }
2931import za .co .absa .hyperdrive .ingestor .implementation .writer .kafka .KafkaStreamWriter .KEY_TOPIC
3032
3133private [transformer] class ConfluentAvroEncodingTransformer (
32- val valueSchemaRegistrySettings : Map [ String , String ] ,
33- val keySchemaRegistrySettings : Option [ Map [ String , String ]] )
34+ val config : Configuration ,
35+ val withKey : Boolean )
3436 extends StreamTransformer {
3537
36- if (valueSchemaRegistrySettings.isEmpty) {
37- throw new IllegalArgumentException (
38- " Empty Schema Registry settings received." )
39- }
40-
4138 private val logger = LogManager .getLogger
4239
4340 override def transform (dataFrame : DataFrame ): DataFrame = {
44- logger.info(s " SchemaRegistry settings: $valueSchemaRegistrySettings" )
45-
46- keySchemaRegistrySettings match {
47- case Some (keySettings) => getKeyValueDataFrame(dataFrame, keySettings)
48- case None => getValueDataFrame(dataFrame)
41+ if (withKey) {
42+ getKeyValueDataFrame(dataFrame)
43+ } else {
44+ getValueDataFrame(dataFrame)
4945 }
5046 }
5147
52- private def getKeyValueDataFrame (dataFrame : DataFrame , keySchemaRegistrySettings : Map [ String , String ] ): DataFrame = {
48+ private def getKeyValueDataFrame (dataFrame : DataFrame ): DataFrame = {
5349 val keyColumnPrefix = HyperdriveContext .get[String ](HyperdriveContextKeys .keyColumnPrefix).get
5450 val keyColumnNames = HyperdriveContext .get[Seq [String ]](HyperdriveContextKeys .keyColumnNames).get
5551 val prefixedKeyColumnNames = keyColumnNames.map(c => s " $keyColumnPrefix$c" )
@@ -58,60 +54,49 @@ private[transformer] class ConfluentAvroEncodingTransformer(
5854 .filterNot(columnName => prefixedKeyColumnNames.contains(columnName))
5955 .map(c => dataFrame(c))
6056 val unprefixedKeyColumns = keyColumnNames.map(c => dataFrame(s " $keyColumnPrefix$c" ).as(c))
61- val unprefixedDataFrame = dataFrame.select(struct(unprefixedKeyColumns : _* ) as ' key , struct(valueColumns : _* ) as ' value )
62- unprefixedDataFrame.select(
63- to_confluent_avro(col(" key" ), keySchemaRegistrySettings) as ' key ,
64- to_confluent_avro(col(" value" ), valueSchemaRegistrySettings) as ' value )
57+ val keyStruct = struct(unprefixedKeyColumns : _* ) as ' key
58+ val valueStruct = struct(valueColumns : _* ) as ' value
59+ val keyToAvroConfig = getKeyAvroConfig(config, keyStruct.expr)
60+ val valueToAvroConfig = getValueAvroConfig(config, valueStruct.expr)
61+ logger.info(s " Key ToAvro settings: $keyToAvroConfig" )
62+ logger.info(s " Value ToAvro settings: $valueToAvroConfig" )
63+ dataFrame.select(
64+ to_avro(keyStruct, keyToAvroConfig) as ' key ,
65+ to_avro(valueStruct, valueToAvroConfig) as ' value )
6566 }
6667
6768 private def getValueDataFrame (dataFrame : DataFrame ): DataFrame = {
6869 val allColumns = struct(dataFrame.columns.head, dataFrame.columns.tail: _* )
69- dataFrame.select(to_confluent_avro(allColumns, valueSchemaRegistrySettings) as ' value )
70+ val toAvroConfig = getValueAvroConfig(config, allColumns.expr)
71+ logger.info(s " ToAvro settings: $toAvroConfig" )
72+ dataFrame.select(to_avro(allColumns, toAvroConfig) as ' value )
7073 }
7174}
7275
7376object ConfluentAvroEncodingTransformer extends StreamTransformerFactory with ConfluentAvroEncodingTransformerAttributes {
7477
75- object ValueSchemaConfigKeys extends SchemaRegistryProducerConfigKeys {
78+ object SchemaConfigKeys extends SchemaRegistryProducerConfigKeys {
79+ override val topic : String = KEY_TOPIC
7680 override val schemaRegistryUrl : String = KEY_SCHEMA_REGISTRY_URL
7781 override val namingStrategy : String = KEY_SCHEMA_REGISTRY_VALUE_NAMING_STRATEGY
7882 override val recordName : String = KEY_SCHEMA_REGISTRY_VALUE_RECORD_NAME
7983 override val recordNamespace : String = KEY_SCHEMA_REGISTRY_VALUE_RECORD_NAMESPACE
80- override val paramSchemaId : String = PARAM_VALUE_SCHEMA_ID
81- override val paramSchemaNamingStrategy : String = PARAM_VALUE_SCHEMA_NAMING_STRATEGY
82- override val paramSchemaNameForRecordStrategy : String = PARAM_VALUE_SCHEMA_NAME_FOR_RECORD_STRATEGY
83- override val paramSchemaNamespaceForRecordStrategy : String = PARAM_VALUE_SCHEMA_NAMESPACE_FOR_RECORD_STRATEGY
84- }
85-
86- object KeySchemaConfigKeys extends SchemaRegistryProducerConfigKeys {
87- override val schemaRegistryUrl : String = KEY_SCHEMA_REGISTRY_URL
88- override val namingStrategy : String = KEY_SCHEMA_REGISTRY_KEY_NAMING_STRATEGY
89- override val recordName : String = KEY_SCHEMA_REGISTRY_KEY_RECORD_NAME
90- override val recordNamespace : String = KEY_SCHEMA_REGISTRY_KEY_RECORD_NAMESPACE
91- override val paramSchemaId : String = PARAM_KEY_SCHEMA_ID
92- override val paramSchemaNamingStrategy : String = PARAM_KEY_SCHEMA_NAMING_STRATEGY
93- override val paramSchemaNameForRecordStrategy : String = PARAM_KEY_SCHEMA_NAME_FOR_RECORD_STRATEGY
94- override val paramSchemaNamespaceForRecordStrategy : String = PARAM_KEY_SCHEMA_NAMESPACE_FOR_RECORD_STRATEGY
9584 }
9685
9786 override def apply (config : Configuration ): StreamTransformer = {
98- val topic = config.getString(KEY_TOPIC )
99-
100- val valueSchemaRegistrySettings = SchemaRegistrySettingsUtil .getProducerSettings(config, topic, ValueSchemaConfigKeys )
101- val produceKeys = ConfigUtils .getOptionalBoolean(KEY_PRODUCE_KEYS , config).getOrElse(false )
102- val keySchemaRegistrySettingsOpt = if (produceKeys) {
103- Some (SchemaRegistrySettingsUtil .getProducerSettings(config, topic, KeySchemaConfigKeys ))
104- } else {
105- None
106- }
107-
108- new ConfluentAvroEncodingTransformer (valueSchemaRegistrySettings, keySchemaRegistrySettingsOpt)
87+ val withKey = ConfigUtils .getOptionalBoolean(KEY_PRODUCE_KEYS , config).getOrElse(false )
88+ new ConfluentAvroEncodingTransformer (config, withKey)
10989 }
11090
11191 override def getMappingFromRetainedGlobalConfigToLocalConfig (globalConfig : Configuration ): Map [String , String ] = Map (
11292 KEY_TOPIC -> KEY_TOPIC
11393 )
11494
95+ def getKeyAvroConfig (config : Configuration , expression : Expression ): ToAvroConfig =
96+ AbrisConfigUtil .getKeyProducerSettings(config, SchemaConfigKeys , expression)
97+
98+ def getValueAvroConfig (config : Configuration , expression : Expression ): ToAvroConfig =
99+ AbrisConfigUtil .getValueProducerSettings(config, SchemaConfigKeys , expression)
115100}
116101
117102
0 commit comments