Skip to content

Commit 398435f

Browse files
LuciferYangdongjoon-hyun
authored andcommitted
[SPARK-43898][CORE] Automatically register immutable.ArraySeq$ofRef to KryoSerializer for Scala 2.13
SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt clean "sql/Test/runMain org.apache.spark.sql.execution.benchmark.TPCDSQueryBenchmark --data-location /Users/yangjie01/Tools/tpcds-sf-1 --query-filter q17" -Pscala-2.13 ### What changes were proposed in this pull request? This pr aims to automatically register `immutable.ArraySeq$ofRef` to `KryoSerializer` for Scala 2.13 to make `TPCDSQueryBenchmark` can run successfully using Scala 2.13. ### Why are the changes needed? Scala 2.13 introduced `scala.collection.immutable.ArraySeq$ofRef`, but it has not been registered with `KryoSerializer`, so when run `TPCDSQueryBenchmark` using `KryoSerializer` and enabled `spark.kryo.registrationRequired`(This is the default behavior after SPARK-42074), there will be the following error: ``` Error: Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Failed to serialize task 741, not attempting to retry it. Exception during serialization: java.io.IOException: java.lang.IllegalArgumentException: Class is not registered: scala.collection.immutable.ArraySeq$ofRef Note: To register this class use: kryo.register(scala.collection.immutable.ArraySeq$ofRef.class); at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2815) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2751) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2750) at scala.collection.immutable.List.foreach(List.scala:333) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2750) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1218) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1218) at scala.Option.foreach(Option.scala:437) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1218) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3014) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2953) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2942) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:983) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2285) at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:385) at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:359) at org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec.writeWithV2(WriteToDataSourceV2Exec.scala:243) at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:337) at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:336) at org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec.run(WriteToDataSourceV2Exec.scala:243) at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43) at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43) at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:825) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488) at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94) at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81) at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79) at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:858) at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:318) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247) at org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark$DatasetToBenchmark.noop(SqlBasedBenchmark.scala:70) at org.apache.spark.sql.execution.benchmark.TPCDSQueryBenchmark$.$anonfun$runTpcdsQueries$5(TPCDSQueryBenchmark.scala:111) at org.apache.spark.benchmark.Benchmark.$anonfun$addCase$1(Benchmark.scala:77) at org.apache.spark.benchmark.Benchmark.$anonfun$addCase$1$adapted(Benchmark.scala:75) at org.apache.spark.benchmark.Benchmark.measure(Benchmark.scala:140) at org.apache.spark.benchmark.Benchmark.$anonfun$run$1(Benchmark.scala:106) at scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:100) at scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:87) at scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:43) at org.apache.spark.benchmark.Benchmark.run(Benchmark.scala:104) at org.apache.spark.sql.execution.benchmark.TPCDSQueryBenchmark$.$anonfun$runTpcdsQueries$1(TPCDSQueryBenchmark.scala:113) at org.apache.spark.sql.execution.benchmark.TPCDSQueryBenchmark$.$anonfun$runTpcdsQueries$1$adapted(TPCDSQueryBenchmark.scala:91) at scala.collection.immutable.List.foreach(List.scala:333) at org.apache.spark.sql.execution.benchmark.TPCDSQueryBenchmark$.runTpcdsQueries(TPCDSQueryBenchmark.scala:91) at org.apache.spark.sql.execution.benchmark.TPCDSQueryBenchmark$.runBenchmarkSuite(TPCDSQueryBenchmark.scala:185) at org.apache.spark.benchmark.BenchmarkBase.main(BenchmarkBase.scala:72) at org.apache.spark.sql.execution.benchmark.TPCDSQueryBenchmark.main(TPCDSQueryBenchmark.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.benchmark.Benchmarks$.$anonfun$main$7(Benchmarks.scala:128) at scala.collection.ArrayOps$.foreach$extension(ArrayOps.scala:1328) at org.apache.spark.benchmark.Benchmarks$.main(Benchmarks.scala:91) at org.apache.spark.benchmark.Benchmarks.main(Benchmarks.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1025) at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192) at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215) at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91) at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1116) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1125) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) ``` `KryoSerializer` is unable to optimize Scala 2.13 Spark because of missing register `immutable.ArraySeq$ofRef`, so this pr has added its registration for Scala 2.13. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? - Add new case to check serde of `scala.collection.immutable.ArraySeq$ofRef` for Scala 2.13. - Check `TPCDSQueryBenchmark` on Github Action: **Before** https://github.com/LuciferYang/spark/actions/runs/5129288422/jobs/9226895410 <img width="1290" alt="image" src="https://github.com/apache/spark/assets/1475305/4effdc9e-153c-4126-bbaf-3206ab98579e"> **After** https://github.com/LuciferYang/spark/actions/runs/5132285127 <img width="1225" alt="image" src="https://github.com/apache/spark/assets/1475305/89c4e2ac-848b-4acb-9d01-1bb4b86d09a1"> Closes #41402 from LuciferYang/SPARK-43898. Authored-by: yangjie01 <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 33a7681 commit 398435f

File tree

2 files changed

+15
-0
lines changed

2 files changed

+15
-0
lines changed

core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import javax.annotation.Nullable
2626
import scala.collection.JavaConverters._
2727
import scala.collection.mutable.ArrayBuffer
2828
import scala.reflect.ClassTag
29+
import scala.util.Properties
2930
import scala.util.control.NonFatal
3031

3132
import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSerializer}
@@ -219,6 +220,9 @@ class KryoSerializer(conf: SparkConf)
219220

220221
kryo.register(None.getClass)
221222
kryo.register(Nil.getClass)
223+
if (Properties.versionNumberString.startsWith("2.13")) {
224+
kryo.register(Utils.classForName("scala.collection.immutable.ArraySeq$ofRef"))
225+
}
222226
kryo.register(Utils.classForName("scala.collection.immutable.$colon$colon"))
223227
kryo.register(Utils.classForName("scala.collection.immutable.Map$EmptyMap$"))
224228
kryo.register(Utils.classForName("scala.math.Ordering$Reverse"))

core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -551,6 +551,17 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
551551
val set = new OpenHashMap[Double, Double](10)
552552
ser.serialize(set)
553553
}
554+
555+
test("SPARK-43898: Register scala.collection.immutable.ArraySeq$ofRef for Scala 2.13") {
556+
assume(scala.util.Properties.versionNumberString.startsWith("2.13"))
557+
val conf = new SparkConf(false)
558+
conf.set(KRYO_REGISTRATION_REQUIRED, true)
559+
val ser = new KryoSerializer(conf).newInstance()
560+
def check[T: ClassTag](t: T): Unit = {
561+
assert(ser.deserialize[T](ser.serialize(t)) === t)
562+
}
563+
check(Utils.classForName("scala.collection.immutable.ArraySeq$ofRef"))
564+
}
554565
}
555566

556567
class KryoSerializerAutoResetDisabledSuite extends SparkFunSuite with SharedSparkContext {

0 commit comments

Comments
 (0)