Skip to content

Commit 8e2f7b8

Browse files
committed
[AURON #1875] Minor refactor: Move legacy Spark version compatibility methods to Shims.scala
1 parent a603cc0 commit 8e2f7b8

File tree

3 files changed

+30
-24
lines changed

3 files changed

+30
-24
lines changed

spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1074,6 +1074,25 @@ class ShimsImpl extends Shims with Logging {
10741074
case _ => false
10751075
})
10761076
}
1077+
1078+
@sparkver("3.2 / 3.3 / 3.4 / 3.5")
1079+
override def getIsSkewJoinFromSHJ(exec: ShuffledHashJoinExec): Boolean = exec.isSkewJoin
1080+
1081+
@sparkver("3.0 / 3.1")
1082+
override def getIsSkewJoinFromSHJ(exec: ShuffledHashJoinExec): Boolean = false
1083+
1084+
@sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5")
1085+
override def getShuffleOrigin(exec: ShuffleExchangeExec): Option[Any] = Some(exec.shuffleOrigin)
1086+
1087+
@sparkver("3.0")
1088+
override def getShuffleOrigin(exec: ShuffleExchangeExec): Option[Any] = None
1089+
1090+
@sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5")
1091+
override def isNullAwareAntiJoin(exec: BroadcastHashJoinExec): Boolean =
1092+
exec.isNullAwareAntiJoin
1093+
1094+
@sparkver("3.0")
1095+
override def isNullAwareAntiJoin(exec: BroadcastHashJoinExec): Boolean = false
10771096
}
10781097

10791098
case class ForceNativeExecutionWrapper(override val child: SparkPlan)

spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ import org.apache.auron.metric.SparkMetricNode
7878
import org.apache.auron.protobuf.EmptyPartitionsExecNode
7979
import org.apache.auron.protobuf.PhysicalPlanNode
8080
import org.apache.auron.spark.configuration.SparkAuronConfiguration
81-
import org.apache.auron.sparkver
8281

8382
object AuronConverters extends Logging {
8483
def enableScan: Boolean =
@@ -412,21 +411,9 @@ object AuronConverters extends Logging {
412411
Shims.get.createNativeShuffleExchangeExec(
413412
outputPartitioning,
414413
addRenameColumnsExec(convertedChild),
415-
getShuffleOrigin(exec))
414+
Shims.get.getShuffleOrigin(exec))
416415
}
417416

418-
@sparkver(" 3.2 / 3.3 / 3.4 / 3.5")
419-
def getIsSkewJoinFromSHJ(exec: ShuffledHashJoinExec): Boolean = exec.isSkewJoin
420-
421-
@sparkver("3.0 / 3.1")
422-
def getIsSkewJoinFromSHJ(exec: ShuffledHashJoinExec): Boolean = false
423-
424-
@sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5")
425-
def getShuffleOrigin(exec: ShuffleExchangeExec): Option[Any] = Some(exec.shuffleOrigin)
426-
427-
@sparkver("3.0")
428-
def getShuffleOrigin(exec: ShuffleExchangeExec): Option[Any] = None
429-
430417
def convertFileSourceScanExec(exec: FileSourceScanExec): SparkPlan = {
431418
val (
432419
relation,
@@ -606,8 +593,7 @@ object AuronConverters extends Logging {
606593
rightKeys,
607594
joinType,
608595
buildSide,
609-
getIsSkewJoinFromSHJ(exec))
610-
596+
Shims.get.getIsSkewJoinFromSHJ(exec))
611597
} catch {
612598
case _ if sparkAuronConfig.getBoolean(SparkAuronConfiguration.FORCE_SHUFFLED_HASH_JOIN) =>
613599
logWarning(
@@ -646,12 +632,6 @@ object AuronConverters extends Logging {
646632
}
647633
}
648634

649-
@sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5")
650-
def isNullAwareAntiJoin(exec: BroadcastHashJoinExec): Boolean = exec.isNullAwareAntiJoin
651-
652-
@sparkver("3.0")
653-
def isNullAwareAntiJoin(exec: BroadcastHashJoinExec): Boolean = false
654-
655635
def convertBroadcastHashJoinExec(exec: BroadcastHashJoinExec): SparkPlan = {
656636
val buildSide = Shims.get.getJoinBuildSide(exec)
657637
try {
@@ -663,7 +643,7 @@ object AuronConverters extends Logging {
663643
exec.condition,
664644
exec.left,
665645
exec.right,
666-
isNullAwareAntiJoin(exec))
646+
Shims.get.isNullAwareAntiJoin(exec))
667647
logDebugPlanConversion(
668648
exec,
669649
Seq(

spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ import org.apache.spark.sql.execution.auron.plan.NativeBroadcastJoinBase
4848
import org.apache.spark.sql.execution.auron.plan.NativeSortMergeJoinBase
4949
import org.apache.spark.sql.execution.auron.shuffle.RssPartitionWriterBase
5050
import org.apache.spark.sql.execution.datasources.PartitionedFile
51-
import org.apache.spark.sql.execution.exchange.BroadcastExchangeLike
51+
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ShuffleExchangeExec}
52+
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec}
5253
import org.apache.spark.sql.execution.metric.SQLMetric
5354
import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
5455
import org.apache.spark.sql.types.DataType
@@ -264,6 +265,12 @@ abstract class Shims {
264265
def getAdaptiveInputPlan(exec: AdaptiveSparkPlanExec): SparkPlan
265266

266267
def getJoinBuildSide(exec: SparkPlan): JoinBuildSide
268+
269+
def getIsSkewJoinFromSHJ(exec: ShuffledHashJoinExec): Boolean
270+
271+
def getShuffleOrigin(exec: ShuffleExchangeExec): Option[Any]
272+
273+
def isNullAwareAntiJoin(exec: BroadcastHashJoinExec): Boolean
267274
}
268275

269276
object Shims {

0 commit comments

Comments
 (0)