Skip to content

Commit 712fe9b

Browse files
Feature/138 preserve schema (#257)
* Initial impl * Initial tests * Refactorings * Add provider config * Add some tests * Add tests * Add more tests / small refactoring * Add documentation. Rename metadata keys * Fix scalastyle issues * Exclude .json from apache-rat check * Upgrade abris version * Fix test, don't use RowEncoder because it doesn't transfer metadata * PR fixes
1 parent 404faa5 commit 712fe9b

27 files changed

+1246
-24
lines changed

README.md

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -123,9 +123,9 @@ If no file exists, the reader will fail.
123123

124124
Any additional properties can be added with the prefix `reader.parquet.options.`. See [Spark Structured Streaming Documentation](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources)
125125

126-
##### ConfluentAvroStreamDecodingTransformer
127-
The `ConfluentAvroStreamDecodingTransformer` is built on [ABRiS](https://github.com/AbsaOSS/ABRiS). More details about the configuration properties can be found there.
128-
**Caution**: The `ConfluentAvroStreamDecodingTransformer` requires the property `reader.kafka.topic` to be set.
126+
##### ConfluentAvroDecodingTransformer
127+
The `ConfluentAvroDecodingTransformer` is built on [ABRiS](https://github.com/AbsaOSS/ABRiS). More details about the configuration properties can be found there.
128+
**Caution**: The `ConfluentAvroDecodingTransformer` requires the property `reader.kafka.topic` to be set.
129129

130130
| Property Name | Required | Description |
131131
| :--- | :---: | :--- |
@@ -142,14 +142,18 @@ The `ConfluentAvroStreamDecodingTransformer` is built on [ABRiS](https://github.
142142
| `transformer.{transformer-id}.keep.columns` | No | Comma-separated list of columns to keep (e.g. offset, partition) |
143143
| `transformer.{transformer-id}.disable.nullability.preservation` | No | Set to true to ignore fix [#137](https://github.com/AbsaOSS/hyperdrive/issues/137) and to keep the same behaviour as for versions prior to and including v3.2.2. Default value: `false` |
144144
| `transformer.{transformer-id}.schema.registry.basic.auth.user.info.file` | No | A path to a text file, that contains one line in the form `<username>:<password>`. It will be passed as `basic.auth.user.info` to the schema registry config |
145+
| `transformer.{transformer-id}.use.advanced.schema.conversion` | No | Set to true to convert the avro schema using [AdvancedAvroToSparkConverter](https://github.com/AbsaOSS/hyperdrive/blob/develop/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/avro/confluent/AdvancedAvroToSparkConverter.scala), which puts default value and underlying avro type to struct field metadata. Default false |
145146

146147
For detailed information on the subject name strategy, please take a look at the [Schema Registry Documentation](https://docs.confluent.io/current/schema-registry/).
147148

148149
Any additional properties for the schema registry config can be added with the prefix `transformer.{transformer-id}.schema.registry.options.`
149150

150-
##### ConfluentAvroStreamEncodingTransformer
151-
The `ConfluentAvroStreamEncodingTransformer` is built on [ABRiS](https://github.com/AbsaOSS/ABRiS). More details about the configuration properties can be found there.
152-
**Caution**: The `ConfluentAvroStreamEncodingTransformer` requires the property `writer.kafka.topic` to be set.
151+
Note: `use.advanced.schema.conversion` only works with a patched version of Spark, due to bug [SPARK-34805](https://issues.apache.org/jira/browse/SPARK-34805).
152+
For the latest version of Spark, the patch is available in https://github.com/apache/spark/pull/35270. For other versions of Spark, the changes need to be cherry-picked and built locally.
153+
154+
##### ConfluentAvroEncodingTransformer
155+
The `ConfluentAvroEncodingTransformer` is built on [ABRiS](https://github.com/AbsaOSS/ABRiS). More details about the configuration properties can be found there.
156+
**Caution**: The `ConfluentAvroEncodingTransformer` requires the property `writer.kafka.topic` to be set.
153157

154158
| Property Name | Required | Description |
155159
| :--- | :---: | :--- |
@@ -164,6 +168,7 @@ The `ConfluentAvroStreamEncodingTransformer` is built on [ABRiS](https://github.
164168
| `transformer.{transformer-id}.key.schema.record.namespace` | Yes for key naming strategies `record.name` and `topic.record.name` | Namespace of the record. |
165169
| `transformer.{transformer-id}.key.optional.fields` | No | Comma-separated list of nullable key columns that should get default value null in the avro schema. Nested columns' names should be concatenated with the dot (`.`) |
166170
| `transformer.{transformer-id}.schema.registry.basic.auth.user.info.file` | No | A path to a text file, that contains one line in the form `<username>:<password>`. It will be passed as `basic.auth.user.info` to the schema registry config |
171+
| `transformer.{transformer-id}.use.advanced.schema.conversion` | No | Set to true to convert the avro schema using [AdvancedSparkToAvroConverter](https://github.com/AbsaOSS/hyperdrive/blob/develop/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/avro/confluent/AdvancedSparkToAvroConverter.scala), which reads default value and underlying avro type from struct field metadata. Default false |
167172

168173
Any additional properties for the schema registry config can be added with the prefix `transformer.{transformer-id}.schema.registry.options.`
169174

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
#
2+
# Copyright 2018 ABSA Group Limited
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
#
15+
za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.AdvancedAvroToSparkConverter
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
package za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent
17+
18+
import org.apache.avro.Schema
19+
import org.apache.avro.Schema.Type._
20+
import org.apache.avro.util.internal.JacksonUtils
21+
import org.apache.spark.sql.avro.SchemaConverters
22+
import org.apache.spark.sql.types._
23+
import org.codehaus.jackson.map.ObjectMapper
24+
import za.co.absa.abris.avro.sql.SchemaConverter
25+
import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.SparkMetadataKeys._
26+
27+
import java.io.ByteArrayOutputStream
28+
import scala.collection.JavaConverters._
29+
30+
// scalastyle:off
31+
class AdvancedAvroToSparkConverter extends SchemaConverter {
32+
override val shortName: String = AdvancedAvroToSparkConverter.name
33+
private lazy val objectMapper = new ObjectMapper()
34+
35+
case class SchemaType(dataType: DataType, nullable: Boolean, avroType: Option[Schema])
36+
37+
/**
38+
* This function takes an avro schema and returns a sql schema.
39+
*/
40+
override def toSqlType(avroSchema: Schema): DataType = {
41+
toSqlTypeHelper(avroSchema, Set.empty).dataType
42+
}
43+
44+
def toSqlTypeHelper(avroSchema: Schema, existingRecordNames: Set[String]): SchemaType = {
45+
avroSchema.getType match {
46+
case RECORD =>
47+
if (existingRecordNames.contains(avroSchema.getFullName)) {
48+
throw new IncompatibleSchemaException(s"""
49+
|Found recursive reference in Avro schema, which can not be processed by Spark:
50+
|${avroSchema.toString(true)}
51+
""".stripMargin)
52+
}
53+
val newRecordNames = existingRecordNames + avroSchema.getFullName
54+
val fields = avroSchema.getFields.asScala.map { f =>
55+
val metadataBuilder = new MetadataBuilder()
56+
val defaultJsonOpt = Option(JacksonUtils.toJsonNode(f.defaultVal()))
57+
val metadataBuilderWithDefault = defaultJsonOpt match {
58+
case Some(defaultJson) =>
59+
val baos = new ByteArrayOutputStream()
60+
objectMapper.writeValue(baos, defaultJson)
61+
val r = metadataBuilder.putString(DefaultValueKey, baos.toString)
62+
baos.close()
63+
r
64+
case None => metadataBuilder
65+
}
66+
67+
val schemaType = toSqlTypeHelper(f.schema(), newRecordNames)
68+
schemaType.avroType
69+
.map(_.toString)
70+
.map(schema => metadataBuilderWithDefault.putString(AvroTypeKey, schema).build())
71+
.map(metadata => StructField(f.name, schemaType.dataType, schemaType.nullable, metadata))
72+
.getOrElse(StructField(f.name, schemaType.dataType, schemaType.nullable, metadataBuilderWithDefault.build()))
73+
}
74+
75+
SchemaType(StructType(fields), nullable = false, None)
76+
77+
case ARRAY =>
78+
val schemaType = toSqlTypeHelper(avroSchema.getElementType, existingRecordNames)
79+
SchemaType(
80+
ArrayType(schemaType.dataType, containsNull = schemaType.nullable),
81+
nullable = false,
82+
schemaType.avroType)
83+
84+
case MAP =>
85+
val schemaType = toSqlTypeHelper(avroSchema.getValueType, existingRecordNames)
86+
SchemaType(
87+
MapType(StringType, schemaType.dataType, valueContainsNull = schemaType.nullable),
88+
nullable = false,
89+
schemaType.avroType)
90+
91+
case UNION =>
92+
if (avroSchema.getTypes.asScala.exists(_.getType == NULL)) {
93+
// In case of a union with null, eliminate it and make a recursive call
94+
val remainingUnionTypes = avroSchema.getTypes.asScala.filterNot(_.getType == NULL)
95+
if (remainingUnionTypes.size == 1) {
96+
toSqlTypeHelper(remainingUnionTypes.head, existingRecordNames).copy(nullable = true)
97+
} else {
98+
toSqlTypeHelper(Schema.createUnion(remainingUnionTypes.asJava), existingRecordNames)
99+
.copy(nullable = true)
100+
}
101+
} else avroSchema.getTypes.asScala.map(_.getType) match {
102+
case Seq(t1) =>
103+
toSqlTypeHelper(avroSchema.getTypes.get(0), existingRecordNames)
104+
case Seq(t1, t2) if Set(t1, t2) == Set(INT, LONG) =>
105+
SchemaType(LongType, nullable = false, Option(avroSchema))
106+
case Seq(t1, t2) if Set(t1, t2) == Set(FLOAT, DOUBLE) =>
107+
SchemaType(DoubleType, nullable = false, Option(avroSchema))
108+
case _ =>
109+
// Convert complex unions to struct types where field names are member0, member1, etc.
110+
// This is consistent with the behavior when converting between Avro and Parquet.
111+
val fields = avroSchema.getTypes.asScala.zipWithIndex.map {
112+
case (s, i) =>
113+
val schemaType = toSqlTypeHelper(s, existingRecordNames)
114+
schemaType.avroType
115+
.map(_.toString)
116+
.map(schema => new MetadataBuilder().putString(AvroTypeKey, schema).build())
117+
.map(metadata => StructField(s"member$i", schemaType.dataType, schemaType.nullable, metadata))
118+
// All fields are nullable because only one of them is set at a time
119+
.getOrElse(StructField(s"member$i", schemaType.dataType, nullable = true))
120+
}
121+
122+
SchemaType(StructType(fields), nullable = false, None)
123+
}
124+
125+
case _ =>
126+
val originalSchemaType = SchemaConverters.toSqlType(avroSchema)
127+
SchemaType(originalSchemaType.dataType, originalSchemaType.nullable, Option(avroSchema))
128+
}
129+
}
130+
}
131+
132+
// scalastyle:on
133+
object AdvancedAvroToSparkConverter {
134+
val name = "advanced"
135+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
package za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent
17+
18+
import org.apache.avro.LogicalTypes.TimestampMillis
19+
import org.apache.avro.Schema.Type._
20+
import org.apache.avro.util.internal.JacksonUtils
21+
import org.apache.avro.{JsonProperties, LogicalTypes, Schema, SchemaBuilder}
22+
import org.apache.spark.sql.avro.SchemaConverters
23+
import org.apache.spark.sql.types.Decimal.minBytesForPrecision
24+
import org.apache.spark.sql.types._
25+
import org.codehaus.jackson.map.ObjectMapper
26+
27+
import java.util.Objects
28+
import scala.util.Try
29+
import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.SparkMetadataKeys._
30+
31+
object AdvancedSparkToAvroConverter extends SparkToAvroConverter {
32+
private lazy val nullSchema = Schema.create(Schema.Type.NULL)
33+
private lazy val objectMapper = new ObjectMapper()
34+
35+
override def apply(catalystType: DataType, nullable: Boolean, recordName: String, nameSpace: String): Schema =
36+
toAvroType(catalystType, None, nullable, None, recordName, nameSpace)
37+
38+
// scalastyle:off
39+
private def toAvroType(
40+
catalystType: DataType,
41+
avroSchema: Option[Schema],
42+
nullable: Boolean = false,
43+
defaultValue: Option[Object] = None,
44+
recordName: String = "topLevelRecord",
45+
nameSpace: String = "")
46+
: Schema = {
47+
val builder = SchemaBuilder.builder()
48+
49+
val schema = catalystType match {
50+
case TimestampType => avroSchema match {
51+
case Some(schema) if schema.getLogicalType.isInstanceOf[TimestampMillis] =>
52+
LogicalTypes.timestampMillis().addToSchema(builder.longType())
53+
case _ => LogicalTypes.timestampMicros().addToSchema(builder.longType())
54+
}
55+
case d: DecimalType => avroSchema match {
56+
case Some(schema) if schema.getType == BYTES =>
57+
val avroType = LogicalTypes.decimal(d.precision, d.scale)
58+
avroType.addToSchema(SchemaBuilder.builder().bytesType())
59+
case _ => getDecimalFixedType(d, avroSchema, nameSpace, recordName)
60+
}
61+
case BinaryType => avroSchema match {
62+
case Some(schema) if schema.getType == FIXED =>
63+
val name = getFixedName(recordName, nameSpace)
64+
builder
65+
.fixed(name)
66+
.size(schema.getFixedSize)
67+
case _ => builder.bytesType()
68+
}
69+
case ArrayType(et, containsNull) =>
70+
builder.array()
71+
.items(toAvroType(et, avroSchema, containsNull, defaultValue, recordName, nameSpace))
72+
case MapType(StringType, vt, valueContainsNull) =>
73+
builder.map()
74+
.values(toAvroType(vt, avroSchema, valueContainsNull, defaultValue, recordName, nameSpace))
75+
case st: StructType =>
76+
val childNameSpace = if (nameSpace != "") s"$nameSpace.$recordName" else recordName
77+
val fieldsAssembler = builder.record(recordName).namespace(nameSpace).fields()
78+
st.foreach { f =>
79+
val schema = Try(f.metadata.getString(AvroTypeKey)).toOption
80+
.map(schema => new Schema.Parser().parse(schema))
81+
val defaultValueOpt = Try(f.metadata.getString(DefaultValueKey))
82+
.flatMap(defaultJsonString => Try {
83+
val jsonNode = objectMapper.readTree(defaultJsonString)
84+
JacksonUtils.toObject(jsonNode)
85+
}).toOption
86+
val fieldAvroType =
87+
toAvroType(f.dataType, schema, f.nullable, defaultValueOpt, f.name, childNameSpace)
88+
defaultValueOpt match {
89+
case Some(defaultObject) if !Objects.equals(defaultObject, JsonProperties.NULL_VALUE) =>
90+
fieldsAssembler.name(f.name).`type`(fieldAvroType).withDefault(defaultObject)
91+
case Some(_) =>
92+
fieldsAssembler.name(f.name).`type`(fieldAvroType).withDefault(null)
93+
case _ => fieldsAssembler.name(f.name).`type`(fieldAvroType).noDefault()
94+
}
95+
}
96+
fieldsAssembler.endRecord()
97+
98+
// nullability is handled later in this method, thus pass nullable = false
99+
case _ => SchemaConverters.toAvroType(catalystType, nullable = false, recordName, nameSpace)
100+
}
101+
if (nullable) {
102+
defaultValue match {
103+
case Some(value) if !value.isInstanceOf[JsonProperties.Null] => Schema.createUnion(schema, nullSchema)
104+
case _ => Schema.createUnion(nullSchema, schema)
105+
}
106+
} else {
107+
schema
108+
}
109+
}
110+
111+
// scalastyle:on
112+
private def getDecimalFixedType(d: DecimalType, avroSchema: Option[Schema], nameSpace: String, recordName: String) = {
113+
val avroType = LogicalTypes.decimal(d.precision, d.scale)
114+
val name = getFixedName(recordName, nameSpace)
115+
val minBytes = minBytesForPrecision(d.precision)
116+
val size = avroSchema.map { schema =>
117+
if (schema.getFixedSize > minBytes) schema.getFixedSize else minBytes
118+
}.getOrElse {
119+
minBytes
120+
}
121+
avroType.addToSchema(SchemaBuilder.fixed(name).size(size))
122+
}
123+
124+
private def getFixedName(recordName: String, nameSpace: String) = {
125+
// Need to avoid naming conflict for the fixed fields
126+
nameSpace match {
127+
case "" => s"$recordName.fixed"
128+
case _ => s"$nameSpace.$recordName.fixed"
129+
}
130+
}
131+
}

ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/avro/confluent/ConfluentAvroDecodingTransformer.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ object ConfluentAvroDecodingTransformer extends StreamTransformerFactory with Co
113113
override val namingStrategy: String = KEY_SCHEMA_REGISTRY_VALUE_NAMING_STRATEGY
114114
override val recordName: String = KEY_SCHEMA_REGISTRY_VALUE_RECORD_NAME
115115
override val recordNamespace: String = KEY_SCHEMA_REGISTRY_VALUE_RECORD_NAMESPACE
116+
override val useAdvancedSchemaConversion: String = KEY_USE_ADVANCED_SCHEMA_CONVERSION
116117
}
117118

118119
override def apply(config: Configuration): StreamTransformer = {

ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/avro/confluent/ConfluentAvroDecodingTransformerAttributes.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ trait ConfluentAvroDecodingTransformerAttributes extends HasComponentAttributes
3232

3333
val KEY_KEEP_COLUMNS = "keep.columns"
3434
val KEY_DISABLE_NULLABILITY_PRESERVATION = "disable.nullability.preservation"
35-
35+
val KEY_USE_ADVANCED_SCHEMA_CONVERSION = "use.advanced.schema.conversion"
3636

3737
override def getName: String = "Confluent Avro Stream Decoder"
3838

@@ -55,7 +55,8 @@ trait ConfluentAvroDecodingTransformerAttributes extends HasComponentAttributes
5555
KEY_SCHEMA_REGISTRY_KEY_RECORD_NAMESPACE -> PropertyMetadata("Key-Record namespace", Some("Key-Record namespace for naming strategies record.name or topic.record.name"), required = false),
5656
KEY_KEEP_COLUMNS -> PropertyMetadata("Columns to keep", Some("Comma-separated list of columns to keep (e.g. offset, partition)"), required = false),
5757
KEY_DISABLE_NULLABILITY_PRESERVATION -> PropertyMetadata("Disable nullability preservation", Some("Keep same behaviour as for versions prior to and including v3.2.2"), required = false),
58-
KEY_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO_FILE -> PropertyMetadata("Basic auth user info file", Some("Text file containing one line in the form <username>:<password> for basic auth in schema registry"), required = false)
58+
KEY_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO_FILE -> PropertyMetadata("Basic auth user info file", Some("Text file containing one line in the form <username>:<password> for basic auth in schema registry"), required = false),
59+
KEY_USE_ADVANCED_SCHEMA_CONVERSION -> PropertyMetadata("Use advanced Avro - Spark schema conversion", Some("Stores logical type and default value in Spark column metadata. Default false"), required = false)
5960
)
6061

6162
override def getExtraConfigurationPrefix: Option[String] = Some(KEY_SCHEMA_REGISTRY_EXTRA_CONFS_ROOT)

0 commit comments

Comments
 (0)