Skip to content

Commit 3934523

Browse files
authored
[GLUTEN-11379][CORE] Clean up Spark shims APIs following Spark 3.2 deprecation (#11687)
Since Spark 3.2 support has been dropped, we need to clean up those shim APIs that were introduced to bridge Spark code differences between Spark 3.2 and later versions. The implementation of those APIs can then be moved to the caller side. ``` getDistribution convertPartitionTransforms getTextScan bloomFilterExpressionMappings newBloomFilterAggregate newMightContain replaceBloomFilterAggregate replaceMightContain getShuffleReaderParam getPartitionId supportDuplicateReadingTracking getFileSizeAndModificationTime dateTimestampFormatInReadIsDefaultValue genDecimalRoundExpressionOutput ```
1 parent a96acea commit 3934523

File tree

20 files changed

+160
-865
lines changed

20 files changed

+160
-865
lines changed

backends-clickhouse/src-delta20/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/ClickHouseSparkCatalog.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
*/
1717
package org.apache.spark.sql.execution.datasources.v2.clickhouse
1818

19-
import org.apache.gluten.sql.shims.SparkShimLoader
20-
2119
import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
2220
import org.apache.spark.sql.catalyst.TableIdentifier
2321
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException}
@@ -35,6 +33,7 @@ import org.apache.spark.sql.delta.metering.DeltaLogging
3533
import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf}
3634
import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils}
3735
import org.apache.spark.sql.execution.datasources.v2.clickhouse.utils.CHDataSourceUtils
36+
import org.apache.spark.sql.execution.datasources.v2.utils.CatalogUtil
3837
import org.apache.spark.sql.sources.InsertableRelation
3938
import org.apache.spark.sql.types.StructType
4039

@@ -119,7 +118,7 @@ class ClickHouseSparkCatalog
119118
sourceQuery: Option[DataFrame],
120119
operation: TableCreationModes.CreationMode): Table = {
121120
val (partitionColumns, maybeBucketSpec) =
122-
SparkShimLoader.getSparkShims.convertPartitionTransforms(partitions)
121+
CatalogUtil.convertPartitionTransforms(partitions)
123122
var newSchema = schema
124123
var newPartitionColumns = partitionColumns
125124
var newBucketSpec = maybeBucketSpec
@@ -232,7 +231,7 @@ class ClickHouseSparkCatalog
232231
case _ => true
233232
}.toMap
234233
val (partitionColumns, maybeBucketSpec) =
235-
SparkShimLoader.getSparkShims.convertPartitionTransforms(partitions)
234+
CatalogUtil.convertPartitionTransforms(partitions)
236235
var newSchema = schema
237236
var newPartitionColumns = partitionColumns
238237
var newBucketSpec = maybeBucketSpec

backends-clickhouse/src-delta23/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/ClickHouseSparkCatalog.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
*/
1717
package org.apache.spark.sql.execution.datasources.v2.clickhouse
1818

19-
import org.apache.gluten.sql.shims.SparkShimLoader
20-
2119
import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
2220
import org.apache.spark.sql.catalyst.TableIdentifier
2321
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException}
@@ -35,6 +33,7 @@ import org.apache.spark.sql.delta.metering.DeltaLogging
3533
import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf}
3634
import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils}
3735
import org.apache.spark.sql.execution.datasources.v2.clickhouse.utils.CHDataSourceUtils
36+
import org.apache.spark.sql.execution.datasources.v2.utils.CatalogUtil
3837
import org.apache.spark.sql.sources.InsertableRelation
3938
import org.apache.spark.sql.types.StructType
4039

@@ -119,7 +118,7 @@ class ClickHouseSparkCatalog
119118
sourceQuery: Option[DataFrame],
120119
operation: TableCreationModes.CreationMode): Table = {
121120
val (partitionColumns, maybeBucketSpec) =
122-
SparkShimLoader.getSparkShims.convertPartitionTransforms(partitions)
121+
CatalogUtil.convertPartitionTransforms(partitions)
123122
var newSchema = schema
124123
var newPartitionColumns = partitionColumns
125124
var newBucketSpec = maybeBucketSpec
@@ -232,7 +231,7 @@ class ClickHouseSparkCatalog
232231
case _ => true
233232
}.toMap
234233
val (partitionColumns, maybeBucketSpec) =
235-
SparkShimLoader.getSparkShims.convertPartitionTransforms(partitions)
234+
CatalogUtil.convertPartitionTransforms(partitions)
236235
var newSchema = schema
237236
var newPartitionColumns = partitionColumns
238237
var newBucketSpec = maybeBucketSpec

backends-clickhouse/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/ClickHouseSparkCatalog.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
*/
1717
package org.apache.spark.sql.execution.datasources.v2.clickhouse
1818

19-
import org.apache.gluten.sql.shims.SparkShimLoader
20-
2119
import org.apache.spark.SparkException
2220
import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
2321
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -39,6 +37,7 @@ import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf}
3937
import org.apache.spark.sql.delta.stats.StatisticsCollection
4038
import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils}
4139
import org.apache.spark.sql.execution.datasources.v2.clickhouse.utils.CHDataSourceUtils
40+
import org.apache.spark.sql.execution.datasources.v2.utils.CatalogUtil
4241
import org.apache.spark.sql.sources.InsertableRelation
4342
import org.apache.spark.sql.types.StructType
4443

@@ -136,7 +135,7 @@ class ClickHouseSparkCatalog
136135
sourceQuery: Option[DataFrame],
137136
operation: TableCreationModes.CreationMode): Table = {
138137
val (partitionColumns, maybeBucketSpec) =
139-
SparkShimLoader.getSparkShims.convertPartitionTransforms(partitions)
138+
CatalogUtil.convertPartitionTransforms(partitions)
140139
var newSchema = schema
141140
var newPartitionColumns = partitionColumns
142141
var newBucketSpec = maybeBucketSpec

backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -199,14 +199,8 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
199199
}
200200
partitionColumns.add(partitionColumn)
201201

202-
val (fileSize, modificationTime) =
203-
SparkShimLoader.getSparkShims.getFileSizeAndModificationTime(file)
204-
(fileSize, modificationTime) match {
205-
case (Some(size), Some(time)) =>
206-
fileSizes.add(JLong.valueOf(size))
207-
modificationTimes.add(JLong.valueOf(time))
208-
case _ =>
209-
}
202+
fileSizes.add(file.fileSize)
203+
modificationTimes.add(file.modificationTime)
210204

211205
val otherConstantMetadataColumnValues =
212206
DeltaShimLoader.getDeltaShims.convertRowIndexFilterIdEncoded(

backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ import org.apache.spark.shuffle.utils.CHShuffleUtil
4141
import org.apache.spark.sql.catalyst.catalog.BucketSpec
4242
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
4343
import org.apache.spark.sql.catalyst.expressions._
44-
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, CollectList, CollectSet}
44+
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, BloomFilterAggregate, CollectList, CollectSet}
4545
import org.apache.spark.sql.catalyst.optimizer.BuildSide
4646
import org.apache.spark.sql.catalyst.plans.JoinType
4747
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, HashPartitioning, Partitioning, RangePartitioning}
@@ -56,7 +56,7 @@ import org.apache.spark.sql.execution.joins.{BuildSideRelation, ClickHouseBuildS
5656
import org.apache.spark.sql.execution.metric.SQLMetric
5757
import org.apache.spark.sql.execution.utils.{CHExecUtil, PushDownUtil}
5858
import org.apache.spark.sql.execution.window._
59-
import org.apache.spark.sql.types.{DecimalType, StructType}
59+
import org.apache.spark.sql.types.StructType
6060
import org.apache.spark.sql.vectorized.ColumnarBatch
6161
import org.apache.spark.util.SparkVersionUtil
6262

@@ -602,7 +602,10 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
602602
CHFlattenedExpression.sigOr
603603
) ++
604604
ExpressionExtensionTrait.expressionExtensionSigList ++
605-
SparkShimLoader.getSparkShims.bloomFilterExpressionMappings()
605+
Seq(
606+
Sig[BloomFilterMightContain](ExpressionNames.MIGHT_CONTAIN),
607+
Sig[BloomFilterAggregate](ExpressionNames.BLOOM_FILTER_AGG)
608+
)
606609
}
607610

608611
/** Define backend-specific expression converter. */
@@ -940,12 +943,6 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
940943

941944
override def genPostProjectForGenerate(generate: GenerateExec): SparkPlan = generate
942945

943-
override def genDecimalRoundExpressionOutput(
944-
decimalType: DecimalType,
945-
toScale: Int): DecimalType = {
946-
SparkShimLoader.getSparkShims.genDecimalRoundExpressionOutput(decimalType, toScale)
947-
}
948-
949946
override def genWindowGroupLimitTransformer(
950947
partitionSpec: Seq[Expression],
951948
orderSpec: Seq[SortOrder],

backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,9 @@ class VeloxIteratorApi extends IteratorApi with Logging {
8383
val locations = filePartitions.flatMap(p => SoftAffinity.getFilePartitionLocations(p))
8484
val (paths, starts, lengths) = getPartitionedFileInfo(partitionFiles).unzip3
8585
val (fileSizes, modificationTimes) = partitionFiles
86-
.map(f => SparkShimLoader.getSparkShims.getFileSizeAndModificationTime(f))
86+
.map(f => (f.fileSize, f.modificationTime))
8787
.collect {
88-
case (Some(size), Some(time)) =>
88+
case (size, time) =>
8989
(JLong.valueOf(size), JLong.valueOf(time))
9090
}
9191
.unzip

backends-velox/src/main/scala/org/apache/gluten/expression/VeloxBloomFilterMightContain.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,11 @@
1616
*/
1717
package org.apache.gluten.expression
1818

19-
import org.apache.gluten.sql.shims.SparkShimLoader
2019
import org.apache.gluten.utils.VeloxBloomFilter
2120

2221
import org.apache.spark.sql.catalyst.InternalRow
2322
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
24-
import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, Expression}
23+
import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, BloomFilterMightContain, Expression}
2524
import org.apache.spark.sql.catalyst.expressions.codegen._
2625
import org.apache.spark.sql.catalyst.expressions.codegen.Block.BlockHelper
2726
import org.apache.spark.sql.types.DataType
@@ -43,8 +42,7 @@ case class VeloxBloomFilterMightContain(
4342
extends BinaryExpression {
4443
import VeloxBloomFilterMightContain._
4544

46-
private val delegate =
47-
SparkShimLoader.getSparkShims.newMightContain(bloomFilterExpression, valueExpression)
45+
private val delegate = BloomFilterMightContain(bloomFilterExpression, valueExpression)
4846

4947
override def prettyName: String = "velox_might_contain"
5048

backends-velox/src/main/scala/org/apache/gluten/expression/aggregate/VeloxBloomFilterAggregate.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,12 @@
1616
*/
1717
package org.apache.gluten.expression.aggregate
1818

19-
import org.apache.gluten.sql.shims.SparkShimLoader
2019
import org.apache.gluten.utils.VeloxBloomFilter
2120

2221
import org.apache.spark.sql.catalyst.InternalRow
2322
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
2423
import org.apache.spark.sql.catalyst.expressions.Expression
25-
import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate
24+
import org.apache.spark.sql.catalyst.expressions.aggregate.{BloomFilterAggregate, TypedImperativeAggregate}
2625
import org.apache.spark.sql.catalyst.trees.TernaryLike
2726
import org.apache.spark.sql.internal.SQLConf
2827
import org.apache.spark.sql.types._
@@ -47,12 +46,12 @@ case class VeloxBloomFilterAggregate(
4746
extends TypedImperativeAggregate[BloomFilter]
4847
with TernaryLike[Expression] {
4948

50-
private val delegate = SparkShimLoader.getSparkShims.newBloomFilterAggregate[BloomFilter](
49+
private val delegate = BloomFilterAggregate(
5150
child,
5251
estimatedNumItemsExpression,
5352
numBitsExpression,
5453
mutableAggBufferOffset,
55-
inputAggBufferOffset)
54+
inputAggBufferOffset).asInstanceOf[TypedImperativeAggregate[BloomFilter]]
5655

5756
override def prettyName: String = "velox_bloom_filter_agg"
5857

backends-velox/src/main/scala/org/apache/gluten/extension/ArrowConvertorRule.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,13 @@ package org.apache.gluten.extension
1919
import org.apache.gluten.backendsapi.BackendsApiManager
2020
import org.apache.gluten.datasource.ArrowCSVFileFormat
2121
import org.apache.gluten.datasource.v2.ArrowCSVTable
22-
import org.apache.gluten.sql.shims.SparkShimLoader
2322

2423
import org.apache.spark.annotation.Experimental
2524
import org.apache.spark.sql.SparkSession
2625
import org.apache.spark.sql.catalyst.csv.CSVOptions
2726
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
2827
import org.apache.spark.sql.catalyst.rules.Rule
29-
import org.apache.spark.sql.catalyst.util.PermissiveMode
28+
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, PermissiveMode}
3029
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
3130
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
3231
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
@@ -102,6 +101,7 @@ case class ArrowConvertorRule(session: SparkSession) extends Rule[LogicalPlan] {
102101
}
103102

104103
private def checkCsvOptions(csvOptions: CSVOptions, timeZone: String): Boolean = {
104+
val default = new CSVOptions(CaseInsensitiveMap(Map()), csvOptions.columnPruning, timeZone)
105105
csvOptions.headerFlag && !csvOptions.multiLine &&
106106
csvOptions.delimiter.length == 1 &&
107107
csvOptions.quote == '\"' &&
@@ -112,7 +112,9 @@ case class ArrowConvertorRule(session: SparkSession) extends Rule[LogicalPlan] {
112112
csvOptions.nullValue == "" &&
113113
csvOptions.emptyValueInRead == "" && csvOptions.comment == '\u0000' &&
114114
csvOptions.columnPruning &&
115-
SparkShimLoader.getSparkShims.dateTimestampFormatInReadIsDefaultValue(csvOptions, timeZone)
115+
csvOptions.dateFormatInRead == default.dateFormatInRead &&
116+
csvOptions.timestampFormatInRead == default.timestampFormatInRead &&
117+
csvOptions.timestampNTZFormatInRead == default.timestampNTZFormatInRead
116118
}
117119

118120
}

backends-velox/src/main/scala/org/apache/gluten/extension/BloomFilterMightContainJointRewriteRule.scala

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@ package org.apache.gluten.extension
1919
import org.apache.gluten.config.GlutenConfig
2020
import org.apache.gluten.expression.VeloxBloomFilterMightContain
2121
import org.apache.gluten.expression.aggregate.VeloxBloomFilterAggregate
22-
import org.apache.gluten.sql.shims.SparkShimLoader
2322

2423
import org.apache.spark.sql.SparkSession
24+
import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, BloomFilterMightContain, Expression}
25+
import org.apache.spark.sql.catalyst.expressions.aggregate.{BloomFilterAggregate, TypedImperativeAggregate}
2526
import org.apache.spark.sql.catalyst.rules.Rule
2627
import org.apache.spark.sql.execution.SparkPlan
2728

@@ -40,12 +41,42 @@ case class BloomFilterMightContainJointRewriteRule(
4041
out
4142
}
4243

44+
private def replaceBloomFilterAggregate[T](
45+
expr: Expression,
46+
bloomFilterAggReplacer: (
47+
Expression,
48+
Expression,
49+
Expression,
50+
Int,
51+
Int) => TypedImperativeAggregate[T]): Expression = expr match {
52+
case BloomFilterAggregate(
53+
child,
54+
estimatedNumItemsExpression,
55+
numBitsExpression,
56+
mutableAggBufferOffset,
57+
inputAggBufferOffset) =>
58+
bloomFilterAggReplacer(
59+
child,
60+
estimatedNumItemsExpression,
61+
numBitsExpression,
62+
mutableAggBufferOffset,
63+
inputAggBufferOffset)
64+
case other => other
65+
}
66+
67+
private def replaceMightContain[T](
68+
expr: Expression,
69+
mightContainReplacer: (Expression, Expression) => BinaryExpression): Expression = expr match {
70+
case BloomFilterMightContain(bloomFilterExpression, valueExpression) =>
71+
mightContainReplacer(bloomFilterExpression, valueExpression)
72+
case other => other
73+
}
74+
4375
private def applyForNode(p: SparkPlan) = {
4476
p.transformExpressions {
4577
case e =>
46-
SparkShimLoader.getSparkShims.replaceMightContain(
47-
SparkShimLoader.getSparkShims
48-
.replaceBloomFilterAggregate(e, VeloxBloomFilterAggregate.apply),
78+
replaceMightContain(
79+
replaceBloomFilterAggregate(e, VeloxBloomFilterAggregate.apply),
4980
VeloxBloomFilterMightContain.apply)
5081
}
5182
}

0 commit comments

Comments (0)