Skip to content

Commit a30d467

Browse files
Feature/185 218 basic auth schema registry (#219)
* 220: Upgrade to ABRiS 4.2.0 * Rename SchemaRegistryConfigKeys -> AbrisConfigKeys * Refactor AbrisConfigUtil to accept map as schema registry config * Rename SchemaConfigKeys -> AbrisConfigKeys * Add support for arbitrary schema registry options * For now, only print safe properties of the schema registry config * Fix test * Update readme * Fix readme, update attributes * Revert special printing of config, private members cannot be printed
1 parent 5567a7e commit a30d467

File tree

12 files changed

+248
-68
lines changed

12 files changed

+248
-68
lines changed

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,9 +141,12 @@ The `ConfluentAvroStreamDecodingTransformer` is built on [ABRiS](https://github.
141141
| `transformer.{transformer-id}.key.schema.record.namespace` | Yes for key naming strategies `record.name` and `topic.record.name` | Namespace of the record. |
142142
| `transformer.{transformer-id}.keep.columns` | No | Comma-separated list of columns to keep (e.g. offset, partition) |
143143
| `transformer.{transformer-id}.disable.nullability.preservation` | No | Set to true to ignore fix [#137](https://github.com/AbsaOSS/hyperdrive/issues/137) and to keep the same behaviour as for versions prior to and including v3.2.2. Default value: `false` |
144+
| `transformer.{transformer-id}.schema.registry.basic.auth.user.info.file` | No | Path to a text file that contains a single line in the form `<username>:<password>`. The content of the file is passed as `basic.auth.user.info` to the schema registry config. |
144145

145146
For detailed information on the subject name strategy, please take a look at the [Schema Registry Documentation](https://docs.confluent.io/current/schema-registry/).
146147

148+
Any additional properties for the schema registry config can be added by prefixing the property name with `transformer.{transformer-id}.schema.registry.options.`
149+
147150
##### ConfluentAvroStreamEncodingTransformer
148151
The `ConfluentAvroStreamEncodingTransformer` is built on [ABRiS](https://github.com/AbsaOSS/ABRiS). More details about the configuration properties can be found there.
149152
**Caution**: The `ConfluentAvroStreamEncodingTransformer` requires the property `writer.kafka.topic` to be set.
@@ -158,6 +161,9 @@ The `ConfluentAvroStreamEncodingTransformer` is built on [ABRiS](https://github.
158161
| `transformer.{transformer-id}.key.schema.naming.strategy` | Yes if `produce.keys` is true | Subject name strategy for key |
159162
| `transformer.{transformer-id}.key.schema.record.name` | Yes for key naming strategies `record.name` and `topic.record.name` | Name of the record. |
160163
| `transformer.{transformer-id}.key.schema.record.namespace` | Yes for key naming strategies `record.name` and `topic.record.name` | Namespace of the record. |
164+
| `transformer.{transformer-id}.schema.registry.basic.auth.user.info.file` | No | Path to a text file that contains a single line in the form `<username>:<password>`. The content of the file is passed as `basic.auth.user.info` to the schema registry config. |
165+
166+
Any additional properties for the schema registry config can be added by prefixing the property name with `transformer.{transformer-id}.schema.registry.options.`
161167

162168
##### ColumnSelectorStreamTransformer
163169
| Property Name | Required | Description |

ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/avro/confluent/ConfluentAvroDecodingTransformer.scala

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent
1717

1818
import java.util.UUID
19-
2019
import org.apache.commons.configuration2.Configuration
2120
import org.apache.commons.lang3.RandomStringUtils
2221
import org.apache.logging.log4j.LogManager
@@ -28,9 +27,10 @@ import za.co.absa.abris.config.FromAvroConfig
2827
import za.co.absa.hyperdrive.ingestor.api.context.HyperdriveContext
2928
import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformer, StreamTransformerFactory}
3029
import za.co.absa.hyperdrive.ingestor.api.utils.ConfigUtils
30+
import za.co.absa.hyperdrive.ingestor.api.utils.ConfigUtils.getOrThrow
3131
import za.co.absa.hyperdrive.ingestor.implementation.HyperdriveContextKeys
3232
import za.co.absa.hyperdrive.ingestor.implementation.reader.kafka.KafkaStreamReader.KEY_TOPIC
33-
import za.co.absa.hyperdrive.ingestor.implementation.utils.{AbrisConfigUtil, SchemaRegistryConsumerConfigKeys}
33+
import za.co.absa.hyperdrive.ingestor.implementation.utils.{AbrisConfigKeys, AbrisConfigUtil, AbrisConsumerConfigKeys, SchemaRegistryConfigUtil}
3434

3535
private[transformer] class ConfluentAvroDecodingTransformer(
3636
val valueAvroConfig: FromAvroConfig,
@@ -40,11 +40,7 @@ private[transformer] class ConfluentAvroDecodingTransformer(
4040
)
4141
extends StreamTransformer {
4242

43-
private val logger = LogManager.getLogger
44-
4543
override def transform(dataFrame: DataFrame): DataFrame = {
46-
logger.info(s"SchemaRegistry settings: $valueAvroConfig")
47-
4844
keyAvroConfigOpt match {
4945
case Some(keyAvroConfig) => getKeyValueDataFrame(dataFrame, keyAvroConfig)
5046
case None => getValueDataFrame(dataFrame)
@@ -111,21 +107,21 @@ private[transformer] class ConfluentAvroDecodingTransformer(
111107
object ConfluentAvroDecodingTransformer extends StreamTransformerFactory with ConfluentAvroDecodingTransformerAttributes {
112108
private val keyColumnPrefixLength = 4
113109

114-
object SchemaConfigKeys extends SchemaRegistryConsumerConfigKeys {
110+
object AbrisConfigKeys extends AbrisConsumerConfigKeys {
115111
override val topic: String = KEY_TOPIC
116-
override val schemaRegistryUrl: String = KEY_SCHEMA_REGISTRY_URL
117112
override val schemaId: String = KEY_SCHEMA_REGISTRY_VALUE_SCHEMA_ID
118113
override val namingStrategy: String = KEY_SCHEMA_REGISTRY_VALUE_NAMING_STRATEGY
119114
override val recordName: String = KEY_SCHEMA_REGISTRY_VALUE_RECORD_NAME
120115
override val recordNamespace: String = KEY_SCHEMA_REGISTRY_VALUE_RECORD_NAMESPACE
121116
}
122117

123118
override def apply(config: Configuration): StreamTransformer = {
124-
val valueAvroConfig = AbrisConfigUtil.getValueConsumerSettings(config, SchemaConfigKeys)
119+
val schemaRegistryConfig = SchemaRegistryConfigUtil.getSchemaRegistryConfig(config)
120+
val valueAvroConfig = AbrisConfigUtil.getValueConsumerSettings(config, AbrisConfigKeys, schemaRegistryConfig)
125121

126122
val consumeKeys = ConfigUtils.getOptionalBoolean(KEY_CONSUME_KEYS, config).getOrElse(false)
127123
val keyAvroConfigOpt = if (consumeKeys) {
128-
Some(AbrisConfigUtil.getKeyConsumerSettings(config, SchemaConfigKeys))
124+
Some(AbrisConfigUtil.getKeyConsumerSettings(config, AbrisConfigKeys, schemaRegistryConfig))
129125
} else {
130126
None
131127
}

ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/avro/confluent/ConfluentAvroDecodingTransformerAttributes.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@ package za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent
1717

1818
import za.co.absa.hyperdrive.ingestor.api.{HasComponentAttributes, PropertyMetadata}
1919

20-
trait ConfluentAvroDecodingTransformerAttributes extends HasComponentAttributes {
21-
val KEY_SCHEMA_REGISTRY_URL = "schema.registry.url"
20+
trait ConfluentAvroDecodingTransformerAttributes extends HasComponentAttributes with SchemaRegistryAttributes {
2221

2322
val KEY_SCHEMA_REGISTRY_VALUE_NAMING_STRATEGY = "value.schema.naming.strategy"
2423
val KEY_SCHEMA_REGISTRY_VALUE_SCHEMA_ID = "value.schema.id"
@@ -34,6 +33,7 @@ trait ConfluentAvroDecodingTransformerAttributes extends HasComponentAttributes
3433
val KEY_KEEP_COLUMNS = "keep.columns"
3534
val KEY_DISABLE_NULLABILITY_PRESERVATION = "disable.nullability.preservation"
3635

36+
3737
override def getName: String = "Confluent Avro Stream Decoder"
3838

3939
override def getDescription: String = "Decoder for records in Avro format. The decoder connects to a Schema Registry instance to retrieve the schema information."
@@ -54,6 +54,9 @@ trait ConfluentAvroDecodingTransformerAttributes extends HasComponentAttributes
5454
KEY_SCHEMA_REGISTRY_KEY_RECORD_NAME -> PropertyMetadata("Key-Record name", Some("Key-Record name for naming strategies record.name or topic.record.name"), required = false),
5555
KEY_SCHEMA_REGISTRY_KEY_RECORD_NAMESPACE -> PropertyMetadata("Key-Record namespace", Some("Key-Record namespace for naming strategies record.name or topic.record.name"), required = false),
5656
KEY_KEEP_COLUMNS -> PropertyMetadata("Columns to keep", Some("Comma-separated list of columns to keep (e.g. offset, partition)"), required = false),
57-
KEY_DISABLE_NULLABILITY_PRESERVATION -> PropertyMetadata("Disable nullability preservation", Some("Keep same behaviour as for versions prior to and including v3.2.2"), required = false)
57+
KEY_DISABLE_NULLABILITY_PRESERVATION -> PropertyMetadata("Disable nullability preservation", Some("Keep same behaviour as for versions prior to and including v3.2.2"), required = false),
58+
KEY_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO_FILE -> PropertyMetadata("Basic auth user info file", Some("Text file containing one line in the form <username>:<password> for basic auth in schema registry"), required = false)
5859
)
60+
61+
override def getExtraConfigurationPrefix: Option[String] = Some(KEY_SCHEMA_REGISTRY_EXTRA_CONFS_ROOT)
5962
}

ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/avro/confluent/ConfluentAvroEncodingTransformer.scala

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import za.co.absa.hyperdrive.ingestor.api.transformer.{StreamTransformer, Stream
2727
import za.co.absa.hyperdrive.ingestor.api.utils.ConfigUtils
2828
import za.co.absa.hyperdrive.ingestor.implementation.HyperdriveContextKeys
2929
import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroEncodingTransformer.{getKeyAvroConfig, getValueAvroConfig}
30-
import za.co.absa.hyperdrive.ingestor.implementation.utils.{AbrisConfigUtil, SchemaRegistryProducerConfigKeys}
30+
import za.co.absa.hyperdrive.ingestor.implementation.utils.{AbrisConfigUtil, AbrisProducerConfigKeys, SchemaRegistryConfigUtil}
3131
import za.co.absa.hyperdrive.ingestor.implementation.writer.kafka.KafkaStreamWriter.KEY_TOPIC
3232

3333
private[transformer] class ConfluentAvroEncodingTransformer(
@@ -75,9 +75,8 @@ private[transformer] class ConfluentAvroEncodingTransformer(
7575

7676
object ConfluentAvroEncodingTransformer extends StreamTransformerFactory with ConfluentAvroEncodingTransformerAttributes {
7777

78-
object SchemaConfigKeys extends SchemaRegistryProducerConfigKeys {
78+
object AbrisConfigKeys extends AbrisProducerConfigKeys {
7979
override val topic: String = KEY_TOPIC
80-
override val schemaRegistryUrl: String = KEY_SCHEMA_REGISTRY_URL
8180
override val namingStrategy: String = KEY_SCHEMA_REGISTRY_VALUE_NAMING_STRATEGY
8281
override val recordName: String = KEY_SCHEMA_REGISTRY_VALUE_RECORD_NAME
8382
override val recordNamespace: String = KEY_SCHEMA_REGISTRY_VALUE_RECORD_NAMESPACE
@@ -92,11 +91,15 @@ object ConfluentAvroEncodingTransformer extends StreamTransformerFactory with Co
9291
KEY_TOPIC -> KEY_TOPIC
9392
)
9493

95-
def getKeyAvroConfig(config: Configuration, expression: Expression): ToAvroConfig =
96-
AbrisConfigUtil.getKeyProducerSettings(config, SchemaConfigKeys, expression)
94+
def getKeyAvroConfig(config: Configuration, expression: Expression): ToAvroConfig = {
95+
val schemaRegistryConfig = SchemaRegistryConfigUtil.getSchemaRegistryConfig(config)
96+
AbrisConfigUtil.getKeyProducerSettings(config, AbrisConfigKeys, expression, schemaRegistryConfig)
97+
}
9798

98-
def getValueAvroConfig(config: Configuration, expression: Expression): ToAvroConfig =
99-
AbrisConfigUtil.getValueProducerSettings(config, SchemaConfigKeys, expression)
99+
def getValueAvroConfig(config: Configuration, expression: Expression): ToAvroConfig = {
100+
val schemaRegistryConfig = SchemaRegistryConfigUtil.getSchemaRegistryConfig(config)
101+
AbrisConfigUtil.getValueProducerSettings(config, AbrisConfigKeys, expression, schemaRegistryConfig)
102+
}
100103
}
101104

102105

ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/avro/confluent/ConfluentAvroEncodingTransformerAttributes.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@ package za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent
1717

1818
import za.co.absa.hyperdrive.ingestor.api.{HasComponentAttributes, PropertyMetadata}
1919

20-
trait ConfluentAvroEncodingTransformerAttributes extends HasComponentAttributes {
21-
val KEY_SCHEMA_REGISTRY_URL = "schema.registry.url"
20+
trait ConfluentAvroEncodingTransformerAttributes extends HasComponentAttributes with SchemaRegistryAttributes {
2221
val KEY_SCHEMA_REGISTRY_VALUE_NAMING_STRATEGY = "value.schema.naming.strategy"
2322
val KEY_SCHEMA_REGISTRY_VALUE_RECORD_NAME = "value.schema.record.name"
2423
val KEY_SCHEMA_REGISTRY_VALUE_RECORD_NAMESPACE = "value.schema.record.namespace"
@@ -45,6 +44,9 @@ trait ConfluentAvroEncodingTransformerAttributes extends HasComponentAttributes
4544
KEY_SCHEMA_REGISTRY_KEY_NAMING_STRATEGY -> PropertyMetadata("Key-Schema naming strategy",
4645
Some("Subject name strategy of Schema Registry. Must be one of \"topic.name\", \"record.name\" or \"topic.record.name\""), required = false),
4746
KEY_SCHEMA_REGISTRY_KEY_RECORD_NAME -> PropertyMetadata("Key-Record name", Some("Key-Record name for naming strategies record.name or topic.record.name"), required = false),
48-
KEY_SCHEMA_REGISTRY_KEY_RECORD_NAMESPACE -> PropertyMetadata("Key-Record namespace", Some("Key-Record namespace for naming strategies record.name or topic.record.name"), required = false)
47+
KEY_SCHEMA_REGISTRY_KEY_RECORD_NAMESPACE -> PropertyMetadata("Key-Record namespace", Some("Key-Record namespace for naming strategies record.name or topic.record.name"), required = false),
48+
KEY_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO_FILE -> PropertyMetadata("Basic auth user info file", Some("Text file containing one line in the form <username>:<password> for basic auth in schema registry"), required = false)
4949
)
50+
51+
override def getExtraConfigurationPrefix: Option[String] = Some(KEY_SCHEMA_REGISTRY_EXTRA_CONFS_ROOT)
5052
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
2+
/*
3+
* Copyright 2018 ABSA Group Limited
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent
18+
19+
trait SchemaRegistryAttributes {
20+
val KEY_SCHEMA_REGISTRY_URL = "schema.registry.url"
21+
val KEY_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO_FILE = "schema.registry.basic.auth.user.info.file"
22+
val KEY_SCHEMA_REGISTRY_EXTRA_CONFS_ROOT = "schema.registry.options"
23+
}
24+
25+
object SchemaRegistryAttributes extends SchemaRegistryAttributes

ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/utils/SchemaRegistryConfigKeys.scala renamed to ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/utils/AbrisConfigKeys.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,15 @@
1515

1616
package za.co.absa.hyperdrive.ingestor.implementation.utils
1717

18-
private[hyperdrive] trait SchemaRegistryConfigKeys {
18+
private[hyperdrive] trait AbrisConfigKeys {
1919
val topic: String
20-
val schemaRegistryUrl: String
2120
val namingStrategy: String
2221
val recordName: String
2322
val recordNamespace: String
2423
}
2524

26-
private[hyperdrive] trait SchemaRegistryConsumerConfigKeys extends SchemaRegistryConfigKeys {
25+
private[hyperdrive] trait AbrisConsumerConfigKeys extends AbrisConfigKeys {
2726
val schemaId: String
2827
}
2928

30-
private[hyperdrive] trait SchemaRegistryProducerConfigKeys extends SchemaRegistryConfigKeys
29+
private[hyperdrive] trait AbrisProducerConfigKeys extends AbrisConfigKeys

0 commit comments

Comments
 (0)