Skip to content

Commit f5bd157

Browse files
authored
Merge pull request #63 from AbsaOSS/fix/schemas-evolution
Fixing schema evolution for Confluent-compliant records.
2 parents ed157ef + 9cb663c commit f5bd157

File tree

11 files changed

+82
-84
lines changed

11 files changed

+82
-84
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
<modelVersion>4.0.0</modelVersion>
1212
<groupId>za.co.absa</groupId>
1313
<artifactId>abris_2.11</artifactId>
14-
<version>2.2.3</version>
14+
<version>2.2.4</version>
1515
<name>abris</name>
1616
<description>Provides seamless integration between Avro and Spark Structured APIs.</description>
1717
<url>https://github.com/AbsaOSS/ABRiS</url>

src/main/scala/za/co/absa/abris/avro/AvroSerDe.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ object AvroSerDe {
198198

199199
if (schemaId.isEmpty) {
200200
throw new InvalidParameterException(s"Schema could not be registered for topic '$topic'. Make sure that the Schema Registry " +
201-
s"is available, the parameters are correct and the schemas ar compatible")
201+
s"is available, the parameters are correct and the schemas are compatible")
202202
}
203203
else {
204204
logger.info(s"Schema successfully registered for topic '$topic' with id '{${schemaId.get}}'.")

src/main/scala/za/co/absa/abris/avro/read/confluent/ScalaConfluentKafkaAvroDeserializer.scala

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,7 @@ import za.co.absa.abris.avro.read.ScalaDatumReader
3131
*
3232
* Please, invest some time in understanding how it works and above all, read the documentation for the method 'deserialize()'.
3333
*/
34-
class ScalaConfluentKafkaAvroDeserializer(val topic: Option[String], val readerSchema: Option[Schema]) {
35-
36-
if (topic.isEmpty && readerSchema.isEmpty) {
37-
throw new IllegalArgumentException("Neither topic nor reader Schema were informed. If you want a specific schema to" +
38-
" be used for reading pass it as the readerSchema value. Otherwise, if you'd like the schema to be retrieved from" +
39-
" SchemaRegistry, pass in the topic being consume and inform the SchemaRegistry URLs by calling " +
40-
" 'configure' in this object using SchemaManager.PARAM_SCHEMA_REGISTRY_URL as the key in the map.")
41-
}
34+
class ScalaConfluentKafkaAvroDeserializer(val readerSchema: Schema) {
4235

4336
private val decoderFactory = DecoderFactory.get()
4437
private val idSchemaReader = scala.collection.mutable.Map[Int,ScalaDatumReader[ScalaAvroRecord]]()
@@ -62,9 +55,8 @@ class ScalaConfluentKafkaAvroDeserializer(val topic: Option[String], val readerS
6255
* 1. This uses the [[ScalaDatumReader]] to parse the bytes.
6356
* 2. This takes into account Confluent's specific metadata included in the payload (e.g. schema id), thus, it will
6457
* not work on regular binary Avro records.
65-
* 3. If there is a topic defined in the constructor and access to Schema Registry is configured, the schema retrieved
66-
* from the later will be considered the writer schema, otherwise, the reader schema passed to the constructor will
67-
* be used as both, reader and writer (thus notice that either, topic or reader schema must be informed).
58+
* 3. The schema retrieved from Schema Registry will be considered the writer schema, otherwise, the reader schema
59+
* passed to the constructor will be used as both, reader and writer.
6860
* 4. The Avro DatumReader is cached based on the schema id, thus, if a new id is received as part of the payload, a new
6961
* DatumReader will be created for that id, with a new schema being retrieved, iff the topic is informed and Schema
7062
* Registry is configured.
@@ -89,7 +81,7 @@ class ScalaConfluentKafkaAvroDeserializer(val topic: Option[String], val readerS
8981
val buffer = getByteBuffer(payload)
9082

9183
schemaId = buffer.getInt()
92-
val writerSchema = getWriterSchema(topic, schemaId)
84+
val writerSchema = getWriterSchema(schemaId)
9385

9486
val length = buffer.limit() - 1 - ConfluentConstants.SCHEMA_ID_SIZE_BYTES
9587
val start = buffer.position() + buffer.arrayOffset()
@@ -107,12 +99,12 @@ class ScalaConfluentKafkaAvroDeserializer(val topic: Option[String], val readerS
10799
* If there is a topic defined and the Schema Registry has been configured, the writer schema will be retrieved from
108100
Schema Registry, otherwise, the reader schema passed on to the constructor will also be considered the writer's.
109101
*/
110-
private def getWriterSchema(topic: Option[String], id: Int): Schema = {
111-
if (topic.isDefined && SchemaManager.isSchemaRegistryConfigured) {
102+
private def getWriterSchema(id: Int): Schema = {
103+
if (SchemaManager.isSchemaRegistryConfigured) {
112104
SchemaManager.getById(id).get
113105
}
114106
else {
115-
readerSchema.get
107+
readerSchema
116108
}
117109
}
118110

@@ -136,7 +128,7 @@ class ScalaConfluentKafkaAvroDeserializer(val topic: Option[String], val readerS
136128
* the documentation of [[ScalaConfluentKafkaAvroDeserializer.deserialize()]] to understand the implications of schema
137129
* changes.
138130
*/
139-
private def getDatumReader(writerSchema: Schema, readerSchema: Option[Schema], id: Int): ScalaDatumReader[ScalaAvroRecord] = {
131+
private def getDatumReader(writerSchema: Schema, readerSchema: Schema, id: Int): ScalaDatumReader[ScalaAvroRecord] = {
140132
idSchemaReader.getOrElseUpdate(id, createDatumReader(writerSchema, readerSchema))
141133
}
142134

@@ -145,8 +137,7 @@ class ScalaConfluentKafkaAvroDeserializer(val topic: Option[String], val readerS
145137
*
146138
* If the reader schema passed on to the constructor is undefined, the writer schema is also considered the reader one.
147139
*/
148-
private def createDatumReader(writerSchema: Schema, readerSchema: Option[Schema]): ScalaDatumReader[ScalaAvroRecord] = {
149-
new ScalaDatumReader[ScalaAvroRecord](writerSchema,
150-
if (readerSchema.isDefined) readerSchema.get else writerSchema)
140+
private def createDatumReader(writerSchema: Schema, readerSchema: Schema): ScalaDatumReader[ScalaAvroRecord] = {
141+
new ScalaDatumReader[ScalaAvroRecord](writerSchema, readerSchema)
151142
}
152-
}
143+
}

src/main/scala/za/co/absa/abris/avro/serde/AvroReaderFactory.scala

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,22 +48,30 @@ private[avro] object AvroReaderFactory {
4848
* the parameters to do it are defined.
4949
*/
5050
def createConfiguredConfluentAvroReader(schemaPath: Option[String], schemaRegistryConf: Option[Map[String,String]]): ScalaConfluentKafkaAvroDeserializer = {
51-
val schema = if (schemaPath.isDefined) Some(AvroSchemaUtils.load(schemaPath.get)) else None
51+
val schema = resolveSchema(schemaPath, schemaRegistryConf)
5252
val configs = if (schemaRegistryConf.isDefined) schemaRegistryConf.get else Map[String,String]()
53-
val topic = if (configs.contains(SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC)) Some(configs(SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC)) else None
5453

55-
val reader = new ScalaConfluentKafkaAvroDeserializer(topic, schema)
56-
reader.configureSchemaRegistry(configs)
57-
reader
54+
createConfiguredConfluentAvroReader(schema, configs)
5855
}
5956

6057
/**
6158
* Creates an instance of [[ScalaConfluentKafkaAvroDeserializer]] and configures its Schema Registry access in case
6259
* the parameters to do it are defined.
6360
*/
6461
def createConfiguredConfluentAvroReader(schema: Schema, schemaRegistryConf: Map[String,String]): ScalaConfluentKafkaAvroDeserializer = {
65-
val reader = new ScalaConfluentKafkaAvroDeserializer(None, Some(schema))
62+
val reader = new ScalaConfluentKafkaAvroDeserializer(schema)
6663
reader.configureSchemaRegistry(schemaRegistryConf)
6764
reader
6865
}
66+
67+
private def resolveSchema(schemaPath: Option[String], schemaRegistryConf: Option[Map[String,String]]): Schema = {
68+
if (schemaPath.isEmpty && schemaRegistryConf.isEmpty) {
69+
throw new IllegalArgumentException("Schema could not be resolved: neither path nor Schema Registry configuration provided.")
70+
}
71+
72+
schemaPath match {
73+
case Some(path) => AvroSchemaUtils.load(path)
74+
case None => AvroSchemaUtils.load(schemaRegistryConf.get)
75+
}
76+
}
6977
}

src/main/scala/za/co/absa/abris/examples/ConfluentKafkaAvroReader.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ object ConfluentKafkaAvroReader {
4747
val stream = spark
4848
.readStream
4949
.format("kafka")
50+
.option("startingOffsets", "earliest")
5051
.addOptions(properties) // 1. this method will add the properties starting with "option."; 2. security options can be set in the properties file
5152

5253
val deserialized = configureExample(stream, properties)
@@ -65,7 +66,7 @@ object ConfluentKafkaAvroReader {
6566
private def configureExample(stream: DataStreamReader,props: Properties): Dataset[Row] = {
6667
import za.co.absa.abris.avro.AvroSerDe._
6768
if (props.getProperty(PARAM_EXAMPLE_SHOULD_USE_SCHEMA_REGISTRY).toBoolean) {
68-
stream.fromConfluentAvro("value", None, Some(props.getSchemaRegistryConfigurations(PARAM_OPTION_SUBSCRIBE)))(RETAIN_ORIGINAL_SCHEMA)
69+
stream.fromConfluentAvro("value", None, Some(props.getSchemaRegistryConfigurations(PARAM_OPTION_SUBSCRIBE)))(RETAIN_SELECTED_COLUMN_ONLY)
6970
}
7071
else {
7172
stream.fromConfluentAvro("value", Some(props.getProperty(PARAM_PAYLOAD_AVRO_SCHEMA)), None)(RETAIN_SELECTED_COLUMN_ONLY)

src/main/scala/za/co/absa/abris/examples/data/generation/ComplexRecordsGenerator.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ object ComplexRecordsGenerator {
4141
private val avroParser = new AvroToSparkParser()
4242
private val random = new Random()
4343

44-
def usedAvroSchema = plainSchema
44+
def usedAvroSchema: String = plainSchema
4545

4646
def generateRecords(howMany: Int): List[GenericRecord] = {
4747
val result = new Array[GenericRecord](howMany)
@@ -123,7 +123,7 @@ object ComplexRecordsGenerator {
123123
}
124124

125125
private def randomSeqOfLongs(listSize: Int) = {
126-
randomListOfLongs(listSize).asScala.toSeq
126+
randomListOfLongs(listSize).asScala
127127
}
128128

129129
private def randomListOfStrings(listSize: Int, stringLength: Int) = {
@@ -135,7 +135,7 @@ object ComplexRecordsGenerator {
135135
}
136136

137137
private def randomSeqOfStrings(listSize: Int, stringLength: Int) = {
138-
randomListOfStrings(listSize, stringLength).asScala.toSeq
138+
randomListOfStrings(listSize, stringLength).asScala
139139
}
140140

141141
private def randomString(length: Int): String = {
@@ -144,7 +144,7 @@ object ComplexRecordsGenerator {
144144
}
145145

146146
private def recordToBean(record: GenericRecord): Bean = {
147-
new Bean(
147+
Bean(
148148
record.get("bytes").toString().getBytes(),
149149
record.get("string").asInstanceOf[String],
150150
record.get("int").asInstanceOf[Int],
@@ -156,4 +156,4 @@ object ComplexRecordsGenerator {
156156
record.get("fixed").toString().getBytes,
157157
record.get("map").asInstanceOf[Map[String, java.util.ArrayList[Long]]])
158158
}
159-
}
159+
}

src/main/scala/za/co/absa/abris/examples/data/generation/TestSchemas.scala

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -57,21 +57,22 @@ object TestSchemas {
5757
}"""
5858

5959
val NATIVE_COMPLETE_SCHEMA = """{
60-
"namespace": "all-types.test",
61-
"type": "record",
62-
"name": "native_complete",
63-
"fields":[
64-
{"name": "bytes", "type": "bytes" },
65-
{ "name": "string", "type": ["string", "null"] },
66-
{ "name": "int", "type": ["int", "null"] },
67-
{ "name": "long", "type": ["long", "null"] },
68-
{ "name": "double", "type": ["double", "null"] },
69-
{ "name": "float", "type": ["float", "null"] },
70-
{ "name": "boolean", "type": ["boolean","null"] },
71-
{ "name": "array", "type": {"type": "array", "items": "string"} },
72-
{"name": "map", "type": { "type": "map", "values": {"type": "array", "items": "long"}}},
73-
{"name": "fixed", "type": {"type": "fixed", "size": 13, "name": "fixed"}}
74-
]
60+
"namespace": "all-types.test",
61+
"type": "record",
62+
"name": "native_complete",
63+
"fields":
64+
[
65+
{"name": "bytes", "type": "bytes" },
66+
{ "name": "string", "type": ["string", "null"] , "doc":"a simple doc", "default": "default"},
67+
{ "name": "int", "type": ["int", "null"] },
68+
{ "name": "long", "type": ["long", "null"] },
69+
{ "name": "double", "type": ["double", "null"] },
70+
{ "name": "float", "type": ["float", "null"] },
71+
{ "name": "boolean", "type": ["boolean","null"] },
72+
{ "name": "array", "type": {"type": "array", "items": "string"} },
73+
{"name": "map", "type": { "type": "map", "values": {"type": "array", "items": "long"}}},
74+
{"name": "fixed", "type": {"type": "fixed", "size": 40, "name": "fixed"}}
75+
]
7576
}"""
7677

7778
val NATIVE_SCHEMA_SPEC = """{

src/test/resources/AvroReadingExample.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ key.schema.id=latest
3030
example.should.use.schema.registry=true
3131

3232
key.schema.naming.strategy=record.name
33-
value.schema.naming.strategy=topic.record.name
33+
value.schema.naming.strategy=topic.name
3434

3535
schema.name=RecordName
3636
schema.namespace=RecordNamespace

src/test/resources/DataframeWritingExample.properties

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@ job.master=local[2]
55
key.avro.schema=src/test/resources/example_key_schema.avsc
66
payload.avro.schema=src/test/resources/example_payload_schema.avsc
77

8-
avro.record.name=RecordName
8+
avro.record.name=native_complete
99

10-
avro.record.namespace=RecordNamespace
10+
avro.record.namespace=all-types.test
1111

1212
log.level=INFO
1313

14-
test.data.entries=5
14+
test.data.entries=1
1515

1616
parquet.data.source=some_parquet_file
1717

@@ -21,7 +21,7 @@ parquet.data.source.column=errCol
2121
num.partitions=1
2222

2323
# if true, schema will be inferred from Dataframe, otherwise the value set to avro.schema will be used
24-
infer.schema=true
24+
infer.schema=false
2525

2626
# if true, the application will keep running in a loop, otherwise it will execute just once
2727
execution.repeat=false
@@ -37,7 +37,7 @@ option.topic=test_topic
3737

3838
key.schema.naming.strategy=record.name
3939

40-
value.schema.naming.strategy=topic.record.name
40+
value.schema.naming.strategy=topic.name
4141

4242
# security options (comment in case the Kafka cluster is not secured)
4343
#option.kafka.security.protocol=SSL

src/test/resources/example_payload_schema.avsc

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,15 @@
44
"name": "native_complete",
55
"fields":[
66
{"name": "bytes", "type": "bytes" },
7-
{ "name": "string", "type": ["string", "null"] , "doc":"a simple doc"},
7+
{ "name": "string", "type": ["string", "null"] , "doc":"a simple doc", "default": "default"},
88
{ "name": "int", "type": ["int", "null"] },
99
{ "name": "long", "type": ["long", "null"] },
10-
{ "name": "double", "type": ["double", "null"] },
11-
{ "name": "float", "type": ["float", "null"] },
12-
{ "name": "boolean", "type": ["boolean","null"] },
13-
{ "name": "array", "type": {"type": "array", "items": "string"} },
14-
{"name": "map", "type": { "type": "map", "values": {"type": "array", "items": "long"}}},
15-
{"name": "fixed", "type": {"type": "fixed", "size": 40, "name": "fixed"}}
10+
{ "name": "double", "type": ["double", "null"] },
11+
{ "name": "float", "type": ["float", "null"] },
12+
{ "name": "boolean", "type": ["boolean","null"] },
13+
{ "name": "array", "type": {"type": "array", "items": "string"} },
14+
{"name": "map", "type": { "type": "map", "values": {"type": "array", "items": "long"}}},
15+
{"name": "fixed", "type": {"type": "fixed", "size": 40, "name": "fixed"}}
1616
]
1717
}
1818

0 commit comments

Comments
 (0)