Skip to content

Commit 11c2849

Browse files
Add support for Apache Spark 4.0.0-SNAPSHOT with fewer exceptions (#358)
Co-authored-by: nimrodo <[email protected]>
1 parent 416c4a3 commit 11c2849

File tree

1 file changed

+31
-11
lines changed

1 file changed

+31
-11
lines changed

src/main/scala/org/apache/spark/sql/avro/AbrisAvroDeserializer.scala

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import org.apache.avro.Schema
2020
import org.apache.spark.sql.types.DataType
2121
import za.co.absa.commons.annotation.DeveloperApi
2222

23+
import scala.collection.mutable
2324
import scala.util.Try
2425

2526
/**
@@ -31,18 +32,37 @@ class AbrisAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
3132

3233
private val deserializer = {
3334
val clazz = classOf[AvroDeserializer]
34-
Try {
35-
clazz.getConstructor(classOf[Schema], classOf[DataType])
36-
.newInstance(rootAvroType, rootCatalystType) // Spark 2.4 -
37-
}.recover { case _: NoSuchMethodException =>
38-
clazz.getConstructor(classOf[Schema], classOf[DataType], classOf[String])
39-
.newInstance(rootAvroType, rootCatalystType, "LEGACY") // Spark 3.0 - Spark 3.5.0 (including)
40-
}.recover { case _: NoSuchMethodException =>
41-
clazz.getConstructor(classOf[Schema], classOf[DataType], classOf[String], classOf[Boolean])
42-
.newInstance(rootAvroType, rootCatalystType, "LEGACY", false: java.lang.Boolean) // Spark 3.5.x +
35+
val schemaClz = classOf[Schema]
36+
val dataTypeClz = classOf[DataType]
37+
val stringClz = classOf[String]
38+
val booleanClz = classOf[Boolean]
39+
40+
clazz.getConstructors.collectFirst {
41+
case currCtor if currCtor.getParameterTypes sameElements
42+
Array(schemaClz, dataTypeClz) =>
43+
// Spark 2.4
44+
currCtor.newInstance(rootAvroType, rootCatalystType)
45+
case currCtor if currCtor.getParameterTypes sameElements
46+
Array(schemaClz, dataTypeClz, stringClz) =>
47+
// Spark 3.0 - Spark 3.5.0 (including)
48+
currCtor.newInstance(rootAvroType, rootCatalystType, "LEGACY")
49+
case currCtor if currCtor.getParameterTypes sameElements
50+
Array(schemaClz, dataTypeClz, stringClz, booleanClz) =>
51+
// Spark 3.5.1 - 3.5.2
52+
currCtor.newInstance(rootAvroType, rootCatalystType, "LEGACY", false: java.lang.Boolean)
53+
case currCtor if currCtor.getParameterTypes.toSeq sameElements
54+
Array(schemaClz, dataTypeClz, stringClz, booleanClz, stringClz) =>
55+
// Spark 4.0.0-SNAPSHOT+
56+
currCtor.newInstance(rootAvroType, rootCatalystType, "LEGACY", false: java.lang.Boolean, "")
57+
} match {
58+
case Some(value: AvroDeserializer) =>
59+
value
60+
case _ =>
61+
throw new NoSuchMethodException(
62+
s"""Supported constructors for AvroDeserializer are:
63+
|${clazz.getConstructors.toSeq.mkString(System.lineSeparator())}""".stripMargin)
4364
}
44-
.get
45-
.asInstanceOf[AvroDeserializer]
65+
4666
}
4767

4868
private val ru = scala.reflect.runtime.universe

0 commit comments

Comments
 (0)