Skip to content

Commit 2a5d4f9

Browse files
Make member variable transient lazy (#276)
1 parent bd90f0f commit 2a5d4f9

File tree

3 files changed

+67
-2
lines changed

3 files changed

+67
-2
lines changed

src/main/scala/za/co/absa/abris/avro/sql/AvroDataToCatalyst.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ private[abris] case class AvroDataToCatalyst(
4040
schemaRegistryConf: Option[Map[String,String]]
4141
) extends UnaryExpression with ExpectsInputTypes {
4242

43-
private val schemaConverter = loadSchemaConverter(config.schemaConverter)
43+
@transient private lazy val schemaConverter = loadSchemaConverter(config.schemaConverter)
4444

4545
override def inputTypes: Seq[BinaryType.type] = Seq(BinaryType)
4646

src/test/scala/za/co/absa/abris/avro/sql/AvroDataToCatalystSpec.scala

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package za.co.absa.abris.avro.sql
1818

19+
import org.apache.spark.SparkConf
20+
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
1921
import org.apache.spark.sql.functions.col
2022
import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}
2123
import org.scalatest.BeforeAndAfterEach
@@ -101,7 +103,21 @@ class AvroDataToCatalystSpec extends AnyFlatSpec with Matchers with BeforeAndAft
101103
))
102104
.withSchemaConverter("nonexistent")
103105

104-
val ex = intercept[ClassNotFoundException](from_avro(col("avroBytes"), fromAvroConfig))
106+
val ex = intercept[ClassNotFoundException](from_avro(col("avroBytes"), fromAvroConfig).expr.dataType)
105107
ex.getMessage should include ("nonexistent")
106108
}
109+
110+
it should "be serializable" in {
111+
val schemaString = TestSchemas.NATIVE_SIMPLE_NESTED_SCHEMA
112+
val config = FromAvroConfig().withReaderSchema(schemaString)
113+
val avroDataToCatalyst = from_avro(col("col"), config).expr
114+
115+
val javaSerializer = new JavaSerializer(new SparkConf())
116+
javaSerializer.newInstance().serialize(avroDataToCatalyst)
117+
118+
val kryoSerializer = new KryoSerializer(new SparkConf())
119+
kryoSerializer.newInstance().serialize(avroDataToCatalyst)
120+
121+
// test successful if no exception is thrown
122+
}
107123
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2022 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package za.co.absa.abris.avro.sql
18+
19+
import org.apache.avro.SchemaBuilder
20+
import org.apache.spark.SparkConf
21+
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
22+
import org.apache.spark.sql.functions.col
23+
import org.scalatest.BeforeAndAfterEach
24+
import org.scalatest.flatspec.AnyFlatSpec
25+
import org.scalatest.matchers.should.Matchers
26+
import za.co.absa.abris.avro.functions._
27+
import za.co.absa.abris.config.ToAvroConfig
28+
29+
class CatalystDataToAvroSpec extends AnyFlatSpec with Matchers with BeforeAndAfterEach {
30+
it should "be serializable" in {
31+
val schema = SchemaBuilder
32+
.record("foo")
33+
.namespace("test_namespace")
34+
.fields()
35+
.name("int").`type`().intType().noDefault()
36+
.endRecord()
37+
.toString
38+
val config = ToAvroConfig().withSchema(schema)
39+
val catalystDataToAvro = to_avro(col("col"), config).expr
40+
41+
val javaSerializer = new JavaSerializer(new SparkConf())
42+
javaSerializer.newInstance().serialize(catalystDataToAvro)
43+
44+
val kryoSerializer = new KryoSerializer(new SparkConf())
45+
kryoSerializer.newInstance().serialize(catalystDataToAvro)
46+
47+
// test successful if no exception is thrown
48+
}
49+
}

0 commit comments

Comments
 (0)