Skip to content

Commit b57cdc9

Browse files
Merge pull request #178 from AbsaOSS/feature/176-upgrade-abris-401
#176: Upgrade to Abris 4.0.1
2 parents b8ecdfc + 36d5688 commit b57cdc9

File tree

3 files changed

+18
-22
lines changed

3 files changed

+18
-22
lines changed

ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/avro/confluent/TestConfluentAvroDecodingTransformer.scala

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,14 @@
1515

1616
package za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent
1717

18-
import io.confluent.kafka.schemaregistry.ParsedSchema
19-
import io.confluent.kafka.schemaregistry.avro.AvroSchema
2018
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient
2119
import org.apache.commons.configuration2.BaseConfiguration
2220
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
2321
import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils
2422
import za.co.absa.abris.avro.read.confluent.SchemaManagerFactory
2523
import za.co.absa.abris.config.AbrisConfig
26-
import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroDecodingTransformer._
2724
import za.co.absa.hyperdrive.ingestor.implementation.reader.kafka.KafkaStreamReader.KEY_TOPIC
25+
import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroDecodingTransformer._
2826
import za.co.absa.hyperdrive.ingestor.implementation.utils.AbrisConfigUtil
2927

3028
class TestConfluentAvroDecodingTransformer extends FlatSpec with Matchers with BeforeAndAfter {
@@ -34,14 +32,14 @@ class TestConfluentAvroDecodingTransformer extends FlatSpec with Matchers with B
3432
private val SchemaRegistryValueSchemaId = "latest"
3533

3634
private var MockSchemaRegistryClient: MockSchemaRegistryClient = _
37-
private val DummySchema = new AvroSchema(AvroSchemaUtils.parse("""{
35+
private val DummySchema = AvroSchemaUtils.parse("""{
3836
"type": "record",
3937
"name": "default_name",
4038
"namespace": "default_namespace",
4139
"fields":[
4240
{"name": "int", "type": ["int", "null"] }
4341
]
44-
}""")).asInstanceOf[ParsedSchema]
42+
}""")
4543

4644
behavior of ConfluentAvroDecodingTransformer.getClass.getSimpleName
4745

ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/utils/TestAbrisConfigUtil.scala

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515

1616
package za.co.absa.hyperdrive.ingestor.implementation.utils
1717

18-
import io.confluent.kafka.schemaregistry.ParsedSchema
19-
import io.confluent.kafka.schemaregistry.avro.AvroSchema
2018
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient
2119
import org.apache.commons.configuration2.BaseConfiguration
2220
import org.apache.spark.sql.functions._
@@ -49,8 +47,8 @@ class TestAbrisConfigUtil extends FlatSpec with Matchers with BeforeAndAfter {
4947
]
5048
}"""
5149
}
52-
private val dummyRecordNameSchema = new AvroSchema(AvroSchemaUtils.parse(getSchemaString(recordName, recordNamespace))).asInstanceOf[ParsedSchema]
53-
private val dummyTopicNameSchema = new AvroSchema(AvroSchemaUtils.parse(getSchemaString("topLevelRecord", ""))).asInstanceOf[ParsedSchema]
50+
private val dummyRecordNameSchema = AvroSchemaUtils.parse(getSchemaString(recordName, recordNamespace))
51+
private val dummyTopicNameSchema = AvroSchemaUtils.parse(getSchemaString("topLevelRecord", ""))
5452
private val dummyExpr = struct(lit(null).cast(IntegerType).as(columnName)).expr
5553

5654
private val keyTopic = "kafka.topic"
@@ -94,7 +92,7 @@ class TestAbrisConfigUtil extends FlatSpec with Matchers with BeforeAndAfter {
9492
val settings = AbrisConfigUtil.getKeyProducerSettings(config, ProducerConfigKeys, dummyExpr)
9593

9694
// then
97-
settings.schemaString shouldBe dummyTopicNameSchema.canonicalString()
95+
settings.schemaString shouldBe dummyTopicNameSchema.toString
9896
settings.schemaId shouldBe Some(1)
9997
mockSchemaRegistryClient.getAllSubjects.asScala should contain theSameElementsAs Seq(s"${topic}-key")
10098
}
@@ -111,7 +109,7 @@ class TestAbrisConfigUtil extends FlatSpec with Matchers with BeforeAndAfter {
111109
val settings = AbrisConfigUtil.getKeyProducerSettings(config, ProducerConfigKeys, dummyExpr)
112110

113111
// then
114-
settings.schemaString shouldBe dummyRecordNameSchema.canonicalString()
112+
settings.schemaString shouldBe dummyRecordNameSchema.toString
115113
settings.schemaId shouldBe Some(1)
116114
mockSchemaRegistryClient.getAllSubjects.asScala should contain theSameElementsAs Seq(s"$recordNamespace.$recordName")
117115
}
@@ -128,7 +126,7 @@ class TestAbrisConfigUtil extends FlatSpec with Matchers with BeforeAndAfter {
128126
val settings = AbrisConfigUtil.getKeyProducerSettings(config, ProducerConfigKeys, dummyExpr)
129127

130128
// then
131-
settings.schemaString shouldBe dummyRecordNameSchema.canonicalString()
129+
settings.schemaString shouldBe dummyRecordNameSchema.toString
132130
settings.schemaId shouldBe Some(1)
133131
mockSchemaRegistryClient.getAllSubjects.asScala should contain theSameElementsAs Seq(s"$topic-$recordNamespace.$recordName")
134132
}
@@ -143,7 +141,7 @@ class TestAbrisConfigUtil extends FlatSpec with Matchers with BeforeAndAfter {
143141
val settings = AbrisConfigUtil.getValueProducerSettings(config, ProducerConfigKeys, dummyExpr)
144142

145143
// then
146-
settings.schemaString shouldBe dummyTopicNameSchema.canonicalString()
144+
settings.schemaString shouldBe dummyTopicNameSchema.toString
147145
settings.schemaId shouldBe Some(1)
148146
mockSchemaRegistryClient.getAllSubjects.asScala should contain theSameElementsAs Seq(s"$topic-value")
149147
}
@@ -161,7 +159,7 @@ class TestAbrisConfigUtil extends FlatSpec with Matchers with BeforeAndAfter {
161159
val settings = AbrisConfigUtil.getKeyConsumerSettings(config, ConsumerConfigKeys)
162160

163161
// then
164-
settings.schemaString shouldBe dummyTopicNameSchema.canonicalString()
162+
settings.schemaString shouldBe dummyTopicNameSchema.toString
165163
settings.schemaRegistryConf.get shouldBe Map("schema.registry.url" -> dummySchemaRegistryUrl)
166164
}
167165

@@ -176,7 +174,7 @@ class TestAbrisConfigUtil extends FlatSpec with Matchers with BeforeAndAfter {
176174
{"name": "${columnName}2", "type": ["int", "null"] }
177175
]
178176
}"""
179-
val schema2 = new AvroSchema(AvroSchemaUtils.parse(schema2String)).asInstanceOf[ParsedSchema]
177+
val schema2 = AvroSchemaUtils.parse(schema2String)
180178
mockSchemaRegistryClient.register(s"$topic-key", dummyTopicNameSchema)
181179
mockSchemaRegistryClient.register(s"$topic-key", schema2)
182180
val config = createBaseConfiguration
@@ -188,7 +186,7 @@ class TestAbrisConfigUtil extends FlatSpec with Matchers with BeforeAndAfter {
188186
val settings = AbrisConfigUtil.getKeyConsumerSettings(config, ConsumerConfigKeys)
189187

190188
// then
191-
settings.schemaString shouldBe schema2.canonicalString()
189+
settings.schemaString shouldBe schema2.toString
192190
settings.schemaRegistryConf.get shouldBe Map("schema.registry.url" -> dummySchemaRegistryUrl)
193191
}
194192

@@ -207,7 +205,7 @@ class TestAbrisConfigUtil extends FlatSpec with Matchers with BeforeAndAfter {
207205
val settings = AbrisConfigUtil.getKeyConsumerSettings(config, ConsumerConfigKeys)
208206

209207
// then
210-
settings.schemaString shouldBe dummyRecordNameSchema.canonicalString()
208+
settings.schemaString shouldBe dummyRecordNameSchema.toString
211209
settings.schemaRegistryConf.get shouldBe Map("schema.registry.url" -> dummySchemaRegistryUrl)
212210
}
213211

@@ -226,7 +224,7 @@ class TestAbrisConfigUtil extends FlatSpec with Matchers with BeforeAndAfter {
226224
val settings = AbrisConfigUtil.getKeyConsumerSettings(config, ConsumerConfigKeys)
227225

228226
// then
229-
settings.schemaString shouldBe dummyRecordNameSchema.canonicalString()
227+
settings.schemaString shouldBe dummyRecordNameSchema.toString
230228
settings.schemaRegistryConf.get shouldBe Map("schema.registry.url" -> dummySchemaRegistryUrl)
231229
}
232230

@@ -243,7 +241,7 @@ class TestAbrisConfigUtil extends FlatSpec with Matchers with BeforeAndAfter {
243241
val settings = AbrisConfigUtil.getValueConsumerSettings(config, ConsumerConfigKeys)
244242

245243
// then
246-
settings.schemaString shouldBe dummyTopicNameSchema.canonicalString()
244+
settings.schemaString shouldBe dummyTopicNameSchema.toString
247245
settings.schemaRegistryConf.get shouldBe Map("schema.registry.url" -> dummySchemaRegistryUrl)
248246
}
249247

parent-conf/pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,14 @@
4040
<properties>
4141
<!--Enforced versions-->
4242
<fasterxml.jackson.databind.version>2.6.7.1</fasterxml.jackson.databind.version> <!--Same as Spark uses-->
43-
<avro.version>1.9.2</avro.version> <!--Same as Abris uses-->
43+
<avro.version>1.8.2</avro.version> <!--Same as Abris uses-->
4444

4545
<!--Maven-->
4646
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
4747
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
4848

4949
<!--ABRiS-->
50-
<abris.version>4.0.0</abris.version>
50+
<abris.version>4.0.1</abris.version>
5151

5252
<!--Scala-->
5353
<scalatest.version>3.0.5</scalatest.version>
@@ -65,7 +65,7 @@
6565
<kafka.spark.version>0-10</kafka.spark.version>
6666
<spark.sql.kafka.version>2.4.3</spark.sql.kafka.version>
6767
<testcontainers.kafka.version>1.12.4</testcontainers.kafka.version>
68-
<kafka.avro.serializer.version>5.5.1</kafka.avro.serializer.version>
68+
<kafka.avro.serializer.version>5.3.4</kafka.avro.serializer.version> <!--Same as Abris uses-->
6969

7070
<!--Spark-->
7171
<spark.version>2.4.3</spark.version>

0 commit comments

Comments
 (0)