Skip to content

Commit b9cc7c4

Browse files
committed
[AURON #1875] Minor refactor: Move legacy Spark version compatibility methods to Shims.scala
1 parent 3264372 commit b9cc7c4

File tree

3 files changed: 23 additions and 21 deletions.

spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala

Lines changed: 15 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -18,9 +18,7 @@ package org.apache.spark.sql.auron
1818

1919
import java.io.File
2020
import java.util.UUID
21-
2221
import scala.collection.mutable
23-
2422
import org.apache.commons.lang3.reflect.FieldUtils
2523
import org.apache.spark.{OneToOneDependency, ShuffleDependency, SparkContext, SparkEnv, SparkException, TaskContext}
2624
import org.apache.spark.internal.Logging
@@ -95,7 +93,7 @@ import org.apache.spark.sql.execution.auron.plan.NativeWindowBase
9593
import org.apache.spark.sql.execution.auron.plan.NativeWindowExec
9694
import org.apache.spark.sql.execution.auron.shuffle.{AuronBlockStoreShuffleReaderBase, AuronRssShuffleManagerBase, RssPartitionWriterBase}
9795
import org.apache.spark.sql.execution.datasources.PartitionedFile
98-
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ReusedExchangeExec}
96+
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ReusedExchangeExec, ShuffleExchangeExec}
9997
import org.apache.spark.sql.execution.joins.auron.plan.NativeBroadcastJoinExec
10098
import org.apache.spark.sql.execution.joins.auron.plan.NativeShuffledHashJoinExecProvider
10199
import org.apache.spark.sql.execution.joins.auron.plan.NativeSortMergeJoinExecProvider
@@ -108,11 +106,11 @@ import org.apache.spark.sql.types.StringType
108106
import org.apache.spark.status.ElementTrackingStore
109107
import org.apache.spark.storage.BlockManagerId
110108
import org.apache.spark.storage.FileSegment
111-
112-
import org.apache.auron.{protobuf => pb, sparkver}
109+
import org.apache.auron.{sparkver, protobuf => pb}
113110
import org.apache.auron.common.AuronBuildInfo
114111
import org.apache.auron.metric.SparkMetricNode
115112
import org.apache.auron.spark.ui.AuronBuildInfoEvent
113+
import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
116114

117115
class ShimsImpl extends Shims with Logging {
118116

@@ -1035,6 +1033,18 @@ class ShimsImpl extends Shims with Logging {
10351033
override def getAdaptiveInputPlan(exec: AdaptiveSparkPlanExec): SparkPlan = {
10361034
exec.inputPlan
10371035
}
1036+
1037+
@sparkver(" 3.2 / 3.3 / 3.4 / 3.5")
1038+
override def getIsSkewJoinFromSHJ(exec: ShuffledHashJoinExec): Boolean = exec.isSkewJoin
1039+
1040+
@sparkver("3.0 / 3.1")
1041+
override def getIsSkewJoinFromSHJ(exec: ShuffledHashJoinExec): Boolean = false
1042+
1043+
@sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5")
1044+
override def getShuffleOrigin(exec: ShuffleExchangeExec): Option[Any] = Some(exec.shuffleOrigin)
1045+
1046+
@sparkver("3.0")
1047+
override def getShuffleOrigin(exec: ShuffleExchangeExec): Option[Any] = None
10381048
}
10391049

10401050
case class ForceNativeExecutionWrapper(override val child: SparkPlan)

spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala

Lines changed: 2 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -79,7 +79,6 @@ import org.apache.auron.metric.SparkMetricNode
7979
import org.apache.auron.protobuf.EmptyPartitionsExecNode
8080
import org.apache.auron.protobuf.PhysicalPlanNode
8181
import org.apache.auron.spark.configuration.SparkAuronConfiguration
82-
import org.apache.auron.sparkver
8382

8483
object AuronConverters extends Logging {
8584
def enableScan: Boolean =
@@ -421,21 +420,9 @@ object AuronConverters extends Logging {
421420
Shims.get.createNativeShuffleExchangeExec(
422421
outputPartitioning,
423422
addRenameColumnsExec(convertedChild),
424-
getShuffleOrigin(exec))
423+
Shims.get.getShuffleOrigin(exec))
425424
}
426425

427-
@sparkver(" 3.2 / 3.3 / 3.4 / 3.5")
428-
def getIsSkewJoinFromSHJ(exec: ShuffledHashJoinExec): Boolean = exec.isSkewJoin
429-
430-
@sparkver("3.0 / 3.1")
431-
def getIsSkewJoinFromSHJ(exec: ShuffledHashJoinExec): Boolean = false
432-
433-
@sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5")
434-
def getShuffleOrigin(exec: ShuffleExchangeExec): Option[Any] = Some(exec.shuffleOrigin)
435-
436-
@sparkver("3.0")
437-
def getShuffleOrigin(exec: ShuffleExchangeExec): Option[Any] = None
438-
439426
def convertFileSourceScanExec(exec: FileSourceScanExec): SparkPlan = {
440427
val (
441428
relation,
@@ -624,7 +611,7 @@ object AuronConverters extends Logging {
624611
case BuildLeft => org.apache.spark.sql.execution.auron.plan.BuildLeft
625612
case BuildRight => org.apache.spark.sql.execution.auron.plan.BuildRight
626613
},
627-
getIsSkewJoinFromSHJ(exec))
614+
Shims.get.getIsSkewJoinFromSHJ(exec))
628615

629616
} catch {
630617
case _ if sparkAuronConfig.getBoolean(SparkAuronConfiguration.FORCE_SHUFFLED_HASH_JOIN) =>

spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -47,7 +47,8 @@ import org.apache.spark.sql.execution.auron.plan.NativeBroadcastJoinBase
4747
import org.apache.spark.sql.execution.auron.plan.NativeSortMergeJoinBase
4848
import org.apache.spark.sql.execution.auron.shuffle.RssPartitionWriterBase
4949
import org.apache.spark.sql.execution.datasources.PartitionedFile
50-
import org.apache.spark.sql.execution.exchange.BroadcastExchangeLike
50+
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ShuffleExchangeExec}
51+
import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
5152
import org.apache.spark.sql.execution.metric.SQLMetric
5253
import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
5354
import org.apache.spark.sql.types.DataType
@@ -260,6 +261,10 @@ abstract class Shims {
260261
def postTransform(plan: SparkPlan, sc: SparkContext): Unit = {}
261262

262263
def getAdaptiveInputPlan(exec: AdaptiveSparkPlanExec): SparkPlan
264+
265+
def getIsSkewJoinFromSHJ(exec: ShuffledHashJoinExec): Boolean
266+
267+
def getShuffleOrigin(exec: ShuffleExchangeExec): Option[Any]
263268
}
264269

265270
object Shims {

0 commit comments

Comments
 (0)