Skip to content

Commit a388653

Browse files
Feature/227 nullable fields (#228)
* Refactor schema generation * Implement schema updating * Add properties to ConfluentAvroEncodingTransformer to specify optional columns
1 parent d154403 commit a388653

File tree

6 files changed

+266
-27
lines changed

6 files changed

+266
-27
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,10 +157,12 @@ The `ConfluentAvroStreamEncodingTransformer` is built on [ABRiS](https://github.
157157
| `transformer.{transformer-id}.value.schema.naming.strategy` | Yes | Subject name strategy of Schema Registry. Possible values are `topic.name`, `record.name` or `topic.record.name`. Equivalent to ABRiS property `SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY` |
158158
| `transformer.{transformer-id}.value.schema.record.name` | Yes for naming strategies `record.name` and `topic.record.name` | Name of the record. Equivalent to ABRiS property `SchemaManager.PARAM_SCHEMA_NAME_FOR_RECORD_STRATEGY` |
159159
| `transformer.{transformer-id}.value.schema.record.namespace` | Yes for naming strategies `record.name` and `topic.record.name` | Namespace of the record. Equivalent to ABRiS property `SchemaManager.PARAM_SCHEMA_NAMESPACE_FOR_RECORD_STRATEGY` |
160+
| `transformer.{transformer-id}.value.optional.fields` | No | Comma-separated list of nullable value columns that should get the default value `null` in the Avro schema. Nested columns must be given as dot-separated paths, e.g. `parent.child` |
160161
| `transformer.{transformer-id}.produce.keys` | No | If set to `true`, keys will be produced according to the properties `key.column.prefix` and `key.column.names` of the [Hyperdrive Context](#hyperdrive-context) |
161162
| `transformer.{transformer-id}.key.schema.naming.strategy` | Yes if `produce.keys` is true | Subject name strategy for key |
162163
| `transformer.{transformer-id}.key.schema.record.name` | Yes for key naming strategies `record.name` and `topic.record.name` | Name of the record. |
163164
| `transformer.{transformer-id}.key.schema.record.namespace` | Yes for key naming strategies `record.name` and `topic.record.name` | Namespace of the record. |
165+
| `transformer.{transformer-id}.key.optional.fields` | No | Comma-separated list of nullable key columns that should get the default value `null` in the Avro schema. Nested columns must be given as dot-separated paths, e.g. `parent.child` |
164166
| `transformer.{transformer-id}.schema.registry.basic.auth.user.info.file` | No | A path to a text file, that contains one line in the form `<username>:<password>`. It will be passed as `basic.auth.user.info` to the schema registry config |
165167

166168
Any additional properties for the schema registry config can be added with the prefix `transformer.{transformer-id}.schema.registry.options.`

ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/avro/confluent/ConfluentAvroEncodingTransformer.scala

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
package za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent
1717

18+
import org.apache.avro.JsonProperties
1819
import org.apache.commons.configuration2.Configuration
1920
import org.apache.logging.log4j.LogManager
2021
import org.apache.spark.sql.DataFrame
@@ -74,6 +75,7 @@ private[transformer] class ConfluentAvroEncodingTransformer(
7475
}
7576

7677
object ConfluentAvroEncodingTransformer extends StreamTransformerFactory with ConfluentAvroEncodingTransformerAttributes {
78+
private val logger = LogManager.getLogger
7779

7880
object AbrisConfigKeys extends AbrisProducerConfigKeys {
7981
override val topic: String = KEY_TOPIC
@@ -93,12 +95,23 @@ object ConfluentAvroEncodingTransformer extends StreamTransformerFactory with Co
9395

9496
/**
 * Builds the ABRiS producer configuration for the key column.
 *
 * Every field listed under `KEY_KEY_OPTIONAL_FIELDS` is passed to the schema generation with the
 * avro default value null. The generated schema is logged before it is handed to
 * [[AbrisConfigUtil.getKeyProducerSettings]].
 *
 * @param config     transformer configuration
 * @param expression Spark expression whose data type and nullability determine the key schema
 * @return the producer settings for the key
 */
def getKeyAvroConfig(config: Configuration, expression: Expression): ToAvroConfig = {
  val registryConfig = SchemaRegistryConfigUtil.getSchemaRegistryConfig(config)

  // No configured optional fields means no default values are overridden.
  val nullDefaults: Map[String, Object] = ConfigUtils.getSeqOrNone(KEY_KEY_OPTIONAL_FIELDS, config)
    .getOrElse(Seq.empty[String])
    .map(fieldName => fieldName -> JsonProperties.NULL_VALUE)
    .toMap

  val keySchema = AbrisConfigUtil.generateSchema(config, AbrisConfigKeys, expression, nullDefaults)
  logger.info(s"Generated key schema\n${keySchema.toString(true)}")
  AbrisConfigUtil.getKeyProducerSettings(config, AbrisConfigKeys, keySchema, registryConfig)
}
98106

99107
/**
 * Builds the ABRiS producer configuration for the value column.
 *
 * Every field listed under `KEY_VALUE_OPTIONAL_FIELDS` is passed to the schema generation with the
 * avro default value null. The generated schema is logged before it is handed to
 * [[AbrisConfigUtil.getValueProducerSettings]].
 *
 * @param config     transformer configuration
 * @param expression Spark expression whose data type and nullability determine the value schema
 * @return the producer settings for the value
 */
def getValueAvroConfig(config: Configuration, expression: Expression): ToAvroConfig = {
  val registryConfig = SchemaRegistryConfigUtil.getSchemaRegistryConfig(config)

  // No configured optional fields means no default values are overridden.
  val nullDefaults: Map[String, Object] = ConfigUtils.getSeqOrNone(KEY_VALUE_OPTIONAL_FIELDS, config)
    .getOrElse(Seq.empty[String])
    .map(fieldName => fieldName -> JsonProperties.NULL_VALUE)
    .toMap

  val valueSchema = AbrisConfigUtil.generateSchema(config, AbrisConfigKeys, expression, nullDefaults)
  logger.info(s"Generated value schema\n${valueSchema.toString(true)}")
  AbrisConfigUtil.getValueProducerSettings(config, AbrisConfigKeys, valueSchema, registryConfig)
}
103116
}
104117

ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/avro/confluent/ConfluentAvroEncodingTransformerAttributes.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,13 @@ trait ConfluentAvroEncodingTransformerAttributes extends HasComponentAttributes
2121
val KEY_SCHEMA_REGISTRY_VALUE_NAMING_STRATEGY = "value.schema.naming.strategy"
2222
val KEY_SCHEMA_REGISTRY_VALUE_RECORD_NAME = "value.schema.record.name"
2323
val KEY_SCHEMA_REGISTRY_VALUE_RECORD_NAMESPACE = "value.schema.record.namespace"
24+
val KEY_VALUE_OPTIONAL_FIELDS = "value.optional.fields"
2425

2526
val KEY_PRODUCE_KEYS = "produce.keys"
2627
val KEY_SCHEMA_REGISTRY_KEY_NAMING_STRATEGY = "key.schema.naming.strategy"
2728
val KEY_SCHEMA_REGISTRY_KEY_RECORD_NAME = "key.schema.record.name"
2829
val KEY_SCHEMA_REGISTRY_KEY_RECORD_NAMESPACE = "key.schema.record.namespace"
30+
val KEY_KEY_OPTIONAL_FIELDS = "key.optional.fields"
2931

3032
override def getName: String = "Confluent Avro Stream Encoder"
3133

@@ -40,12 +42,14 @@ trait ConfluentAvroEncodingTransformerAttributes extends HasComponentAttributes
4042
Some("Record name for naming strategies record.name or topic.record.name"), required = false),
4143
KEY_SCHEMA_REGISTRY_VALUE_RECORD_NAMESPACE -> PropertyMetadata("Value-Record namespace",
4244
Some("Record namespace for naming strategies record.name or topic.record.name"), required = false),
45+
KEY_VALUE_OPTIONAL_FIELDS -> PropertyMetadata("Value-Record optional fields", Some("Comma-separated list of nullable value columns that should get default value null in the avro schema"), required = false),
4346

4447
KEY_SCHEMA_REGISTRY_KEY_NAMING_STRATEGY -> PropertyMetadata("Key-Schema naming strategy",
4548
Some("Subject name strategy of Schema Registry. Must be one of \"topic.name\", \"record.name\" or \"topic.record.name\""), required = false),
4649
KEY_SCHEMA_REGISTRY_KEY_RECORD_NAME -> PropertyMetadata("Key-Record name", Some("Key-Record name for naming strategies record.name or topic.record.name"), required = false),
4750
KEY_SCHEMA_REGISTRY_KEY_RECORD_NAMESPACE -> PropertyMetadata("Key-Record namespace", Some("Key-Record namespace for naming strategies record.name or topic.record.name"), required = false),
48-
KEY_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO_FILE -> PropertyMetadata("Basic auth user info file", Some("Text file containing one line in the form <username>:<password> for basic auth in schema registry"), required = false)
51+
KEY_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO_FILE -> PropertyMetadata("Basic auth user info file", Some("Text file containing one line in the form <username>:<password> for basic auth in schema registry"), required = false),
52+
KEY_KEY_OPTIONAL_FIELDS -> PropertyMetadata("Key-Record optional fields", Some("Comma-separated list of nullable key columns that should get default value null in the avro schema"), required = false)
4953
)
5054

5155
override def getExtraConfigurationPrefix: Option[String] = Some(KEY_SCHEMA_REGISTRY_EXTRA_CONFS_ROOT)

ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/utils/AbrisConfigUtil.scala

Lines changed: 75 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
package za.co.absa.hyperdrive.ingestor.implementation.utils
1717

18+
import org.apache.avro.{JsonProperties, Schema}
1819
import org.apache.commons.configuration2.Configuration
1920
import org.apache.spark.sql.avro.SchemaConverters.toAvroType
2021
import org.apache.spark.sql.catalyst.expressions.Expression
@@ -63,32 +64,27 @@ private[hyperdrive] object AbrisConfigUtil {
6364
fromSchemaRegisteringConfigFragment.usingSchemaRegistry(schemaRegistryConfig)
6465
}
6566

66-
/**
 * Producer settings for the key: delegates to `getProducerSettings` with `isKey = true`.
 *
 * @param configuration        configuration holding topic and naming strategy
 * @param configKeys           names of the configuration keys to read
 * @param schema               avro schema of the key
 * @param schemaRegistryConfig schema registry client configuration
 */
def getKeyProducerSettings(configuration: Configuration, configKeys: AbrisProducerConfigKeys, schema: Schema,
                           schemaRegistryConfig: Map[String, String]): ToAvroConfig = {
  getProducerSettings(
    configuration,
    configKeys,
    isKey = true,
    schema = schema,
    schemaRegistryConfig = schemaRegistryConfig
  )
}
6970

70-
/**
 * Producer settings for the value: delegates to `getProducerSettings` with `isKey = false`.
 *
 * @param configuration        configuration holding topic and naming strategy
 * @param configKeys           names of the configuration keys to read
 * @param schema               avro schema of the value
 * @param schemaRegistryConfig schema registry client configuration
 */
def getValueProducerSettings(configuration: Configuration, configKeys: AbrisProducerConfigKeys, schema: Schema,
                             schemaRegistryConfig: Map[String, String]): ToAvroConfig = {
  getProducerSettings(
    configuration,
    configKeys,
    isKey = false,
    schema = schema,
    schemaRegistryConfig = schemaRegistryConfig
  )
}
7374

7475
private def getProducerSettings(configuration: Configuration, configKeys: AbrisProducerConfigKeys, isKey: Boolean,
75-
expression: Expression, schemaRegistryConfig: Map[String, String]): ToAvroConfig = {
76+
schema: Schema, schemaRegistryConfig: Map[String, String]): ToAvroConfig = {
7677
val schemaManager = SchemaManagerFactory.create(schemaRegistryConfig)
7778
val topic = getTopic(configuration, configKeys)
7879
val namingStrategy = getNamingStrategy(configuration, configKeys)
7980
val schemaId = namingStrategy match {
8081
case TopicNameStrategy =>
81-
val schema = toAvroType(expression.dataType, expression.nullable)
8282
val subject = SchemaSubject.usingTopicNameStrategy(topic, isKey)
8383
schemaManager.register(subject, schema)
8484
case RecordNameStrategy =>
85-
val schema = toAvroType(expression.dataType, expression.nullable, getRecordName(configuration, configKeys),
86-
getRecordNamespace(configuration, configKeys))
8785
val subject = SchemaSubject.usingRecordNameStrategy(schema)
8886
schemaManager.register(subject, schema)
8987
case TopicRecordNameStrategy =>
90-
val schema = toAvroType(expression.dataType, expression.nullable, getRecordName(configuration, configKeys),
91-
getRecordNamespace(configuration, configKeys))
9288
val subject = SchemaSubject.usingTopicRecordNameStrategy(topic, schema)
9389
schemaManager.register(subject, schema)
9490
case _ => throw new IllegalArgumentException("Naming strategy must be one of topic.name, record.name or topic.record.name")
@@ -100,6 +96,75 @@ private[hyperdrive] object AbrisConfigUtil {
10096
.usingSchemaRegistry(schemaRegistryConfig)
10197
}
10298

99+
/**
 * Converts a Spark expression into an avro schema and applies the given default values to it.
 *
 * For the topic.name strategy the schema is created without an explicit record name. For record.name and
 * topic.record.name the configured record name and namespace are used.
 *
 * The keys of `newDefaultValues` are field names. A field nested inside structs is addressed by joining the
 * names on its path with a dot (.), e.g. "parent.childField.subChildField". Note that dots in avro field names
 * are not allowed.
 *
 * @param configuration    configuration holding naming strategy, record name and record namespace
 * @param configKeys       names of the configuration keys to read
 * @param expression       expression whose data type and nullability define the schema
 * @param newDefaultValues default values by (dot-separated) field name
 * @throws IllegalArgumentException if the configured naming strategy is not supported
 */
def generateSchema(configuration: Configuration, configKeys: AbrisProducerConfigKeys, expression: Expression,
                   newDefaultValues: Map[String, Object]): Schema = {
  val schemaWithoutDefaults = getNamingStrategy(configuration, configKeys) match {
    case TopicNameStrategy =>
      toAvroType(expression.dataType, expression.nullable)
    case RecordNameStrategy | TopicRecordNameStrategy =>
      toAvroType(
        expression.dataType,
        expression.nullable,
        getRecordName(configuration, configKeys),
        getRecordNamespace(configuration, configKeys)
      )
    case _ =>
      throw new IllegalArgumentException("Naming strategy must be one of topic.name, record.name or topic.record.name")
  }

  updateSchema(schemaWithoutDefaults, newDefaultValues)
}
117+
118+
/**
 * Rebuilds a schema created by [[org.apache.spark.sql.avro.SchemaConverters.toAvroType]], replacing field
 * default values with the ones given in `newDefaultValues`.
 *
 * Unions, records, arrays and maps are traversed recursively; any other type is returned unchanged, so new
 * default values for Enum or Fixed cannot be assigned. Only record fields extend the field path: union
 * branches, array elements and map values are visited under the path of the enclosing field.
 * Updating default values for the union type is only supported for a union with null. The correct order of
 * arbitrary unions with respect to the given default value is not guaranteed.
 *
 * @param schema           schema to rebuild
 * @param newDefaultValues default values by dot-separated field path
 * @param fieldPrefix      path of the field enclosing `schema`; empty at the top level
 */
private def updateSchema(schema: Schema, newDefaultValues: Map[String, Object], fieldPrefix: String = ""): Schema = {
  import scala.collection.JavaConverters._

  // Top-level fields are addressed by their bare name, nested ones by "<prefix>.<name>".
  def pathOf(fieldName: String): String =
    if (fieldPrefix.isEmpty) fieldName else s"$fieldPrefix.$fieldName"

  schema.getType match {
    case Schema.Type.UNION =>
      val updatedBranches = schema.getTypes.asScala.map(updateSchema(_, newDefaultValues, fieldPrefix))
      Schema.createUnion(updatedBranches.asJava)

    case Schema.Type.RECORD =>
      val updatedFields = schema.getFields.asScala.map { field =>
        val fieldPath = pathOf(field.name())
        // Keep the existing default unless a new one is given for this path.
        val fieldDefault = newDefaultValues.getOrElse(fieldPath, field.defaultVal())
        val updatedFieldSchema = updateSchema(field.schema(), newDefaultValues, fieldPath)
        val fieldSchema = reorderUnionTypesForDefaultValueNull(updatedFieldSchema, fieldDefault)
        new Schema.Field(field.name(), fieldSchema, field.doc(), fieldDefault, field.order())
      }
      Schema.createRecord(schema.getName, schema.getDoc, schema.getNamespace, schema.isError, updatedFields.asJava)

    case Schema.Type.ARRAY =>
      Schema.createArray(updateSchema(schema.getElementType, newDefaultValues, fieldPrefix))

    case Schema.Type.MAP =>
      Schema.createMap(updateSchema(schema.getValueType, newDefaultValues, fieldPrefix))

    case _ =>
      schema
  }
}
152+
153+
/**
 * Turns a two-branch union of the form [T, null] into [null, T] when the default value is
 * [[JsonProperties.Null]]. Every other schema, and every other default value, is returned unchanged.
 */
private def reorderUnionTypesForDefaultValueNull(schema: Schema, defaultValue: Object) = {
  import scala.collection.JavaConverters._

  val defaultIsNull = defaultValue.isInstanceOf[JsonProperties.Null]
  if (schema.getType != Schema.Type.UNION || !defaultIsNull) {
    schema
  } else {
    // The union branches are only inspected once the schema is known to be a union.
    schema.getTypes.asScala.toList match {
      case nonNullBranch :: nullBranch :: Nil
        if nonNullBranch.getType != Schema.Type.NULL && nullBranch.getType == Schema.Type.NULL =>
        Schema.createUnion(Schema.create(Schema.Type.NULL), nonNullBranch)
      case _ =>
        schema
    }
  }
}
167+
103168
private def getTopic(configuration: Configuration, configKeys: AbrisConfigKeys): String =
104169
getOrThrow(configKeys.topic, configuration, errorMessage = s"Topic not found. Is '${configKeys.topic}' properly set?")
105170

ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/avro/confluent/TestConfluentAvroEncodingTransformer.scala

Lines changed: 86 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,22 @@
1515

1616
package za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent
1717

18+
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient
1819
import org.apache.commons.configuration2.BaseConfiguration
20+
import org.apache.commons.configuration2.convert.DefaultListDelimiterHandler
21+
import org.apache.spark.sql.Row
22+
import org.apache.spark.sql.catalyst.encoders.RowEncoder
1923
import org.apache.spark.sql.execution.streaming.MemoryStream
24+
import org.apache.spark.sql.functions.{array, lit, map, struct}
2025
import org.apache.spark.sql.streaming.Trigger
26+
import org.apache.spark.sql.types.{BooleanType, IntegerType, StringType, StructField, StructType}
2127
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
28+
import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils
2229
import za.co.absa.abris.avro.read.confluent.SchemaManagerFactory
2330
import za.co.absa.abris.config.AbrisConfig
2431
import za.co.absa.commons.spark.SparkTestBase
32+
import za.co.absa.hyperdrive.ingestor.api.context.HyperdriveContext
33+
import za.co.absa.hyperdrive.ingestor.implementation.HyperdriveContextKeys
2534
import za.co.absa.hyperdrive.ingestor.implementation.testutils.HyperdriveMockSchemaRegistryClient
2635
import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroEncodingTransformer._
2736
import za.co.absa.hyperdrive.ingestor.implementation.utils.AbrisConfigUtil
@@ -31,11 +40,11 @@ class TestConfluentAvroEncodingTransformer extends FlatSpec with Matchers with B
3140

3241
private val topic = "topic"
3342
private val SchemaRegistryURL = "http://localhost:8081"
34-
43+
private var mockSchemaRegistryClient: MockSchemaRegistryClient = _
3544
behavior of ConfluentAvroEncodingTransformer.getClass.getSimpleName
3645

3746
before {
  // Each test gets a fresh mock client, registered for the test schema registry URL.
  mockSchemaRegistryClient = new HyperdriveMockSchemaRegistryClient()
  SchemaManagerFactory.resetSRClientInstance()
  val registryConfig = Map(AbrisConfig.SCHEMA_REGISTRY_URL -> SchemaRegistryURL)
  SchemaManagerFactory.addSRClientInstance(registryConfig, mockSchemaRegistryClient)
}
@@ -52,7 +61,7 @@ class TestConfluentAvroEncodingTransformer extends FlatSpec with Matchers with B
5261
encoder.withKey shouldBe false
5362
}
5463

55-
it should "encode the values" in {
64+
"transform" should "encode the values" in {
5665
// given
5766
import spark.implicits._
5867
val queryName = "dummyQuery"
@@ -83,4 +92,78 @@ class TestConfluentAvroEncodingTransformer extends FlatSpec with Matchers with B
8392
val byteArrays = outputDf.select("value").map(_ (0).asInstanceOf[Array[Byte]]).collect()
8493
byteArrays.distinct.length shouldBe byteArrays.length
8594
}
95+
96+
it should "register a schema with optional fields" in {
  // given: one key column (prefixed with key__), one flat value column and one nested value column
  val nestedStruct = StructType(Seq(StructField("subCol1", StringType, nullable = true)))
  val inputSchema = StructType(Seq(
    StructField("key__col1", IntegerType, nullable = true),
    StructField("col2", StringType, nullable = true),
    StructField("col3", nestedStruct, nullable = true)
  ))
  HyperdriveContext.put(HyperdriveContextKeys.keyColumnPrefix, "key__")
  HyperdriveContext.put(HyperdriveContextKeys.keyColumnNames, Seq("col1"))
  val input = new MemoryStream[Row](1, spark.sqlContext)(RowEncoder(inputSchema))

  val config = new BaseConfiguration()
  // The optional fields are given as a comma-separated list, so list splitting has to be enabled.
  config.setListDelimiterHandler(new DefaultListDelimiterHandler(','))
  config.addProperty(KafkaStreamWriter.KEY_TOPIC, topic)
  config.addProperty(KEY_SCHEMA_REGISTRY_URL, SchemaRegistryURL)
  config.addProperty(KEY_SCHEMA_REGISTRY_VALUE_NAMING_STRATEGY, AbrisConfigUtil.TopicNameStrategy)
  config.addProperty(KEY_PRODUCE_KEYS, "true")
  config.addProperty(KEY_KEY_OPTIONAL_FIELDS, "col1")
  config.addProperty(KEY_VALUE_OPTIONAL_FIELDS, "col2, col3, col3.subCol1")
  val encoder = ConfluentAvroEncodingTransformer(config)

  val expectedKeySchema = AvroSchemaUtils.parse(
    raw"""{
         | "type" : "record",
         | "name" : "topLevelRecord",
         | "fields" : [ {
         | "name" : "col1",
         | "type" : [ "null", "int" ],
         | "default" : null
         | } ]
         |}
         |""".stripMargin)

  val expectedValueSchema = AvroSchemaUtils.parse(
    raw"""{
         | "type" : "record",
         | "name" : "topLevelRecord",
         | "fields" : [ {
         | "name" : "col2",
         | "type" : [ "null", "string" ],
         | "default" : null
         | }, {
         | "name" : "col3",
         | "type" : [ "null", {
         | "type" : "record",
         | "name" : "col3",
         | "namespace" : "topLevelRecord",
         | "fields" : [ {
         | "name" : "subCol1",
         | "type" : [ "null", "string" ],
         | "default" : null
         | } ]
         | } ],
         | "default" : null
         | } ]
         |}
         |""".stripMargin)

  // when
  encoder.transform(input.toDF())

  // then: the latest schemas registered under the key and value subjects match the expected ones
  val registeredKeySchema = mockSchemaRegistryClient.getLatestSchemaMetadata(s"$topic-key").getSchema
  registeredKeySchema shouldBe expectedKeySchema.toString
  val registeredValueSchema = mockSchemaRegistryClient.getLatestSchemaMetadata(s"$topic-value").getSchema
  registeredValueSchema shouldBe expectedValueSchema.toString
}
86169
}

0 commit comments

Comments
 (0)