Commit 1eff5df

[Spark] Replace the default pattern matching for LogicalRelation to LogicalRelationWithTable (delta-io#3805)
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

This PR proposes replacing the default pattern matching on `LogicalRelation` with the newly introduced pattern object `LogicalRelationWithTable`, which will be available in the upcoming Spark 4.0. This change lets the project modify fewer pieces of code when Spark changes `LogicalRelation`: most pattern matches on `LogicalRelation` only extract the relation and the catalog table, hence they fit `LogicalRelationWithTable`.

## How was this patch tested?

Existing tests would suffice.

## Does this PR introduce _any_ user-facing changes?

No.

---------

Signed-off-by: Jungtaek Lim <[email protected]>
1 parent 010a44c commit 1eff5df

File tree: 11 files changed, +63 −33 lines
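To make the mechanical change in the per-file hunks below concrete, here is a minimal before/after sketch. It assumes Spark 3.5's four-field `LogicalRelation` and the `LogicalRelationWithTable` extractor this PR introduces; the object and method names are hypothetical and not part of the commit.

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, LogicalRelationWithTable}

// Hypothetical helpers showing the same check written both ways.
object PatternMatchSketch {
  // Before: destructure all four constructor fields of LogicalRelation, ignoring two of them.
  def isCatalogBackedFileScanOld(plan: LogicalPlan): Boolean = plan match {
    case LogicalRelation(_: HadoopFsRelation, _ /* output */, catalogTable, _ /* isStreaming */) =>
      catalogTable.isDefined
    case _ => false
  }

  // After: bind only the relation and the optional catalog table, so the match keeps
  // compiling if LogicalRelation gains or reorders constructor fields (as in Spark 4.0).
  def isCatalogBackedFileScanNew(plan: LogicalPlan): Boolean = plan match {
    case LogicalRelationWithTable(_: HadoopFsRelation, catalogTable) =>
      catalogTable.isDefined
    case _ => false
  }
}
```

The second form binds only the two values almost every rule needs, which is why the hunks below are a near-uniform find-and-replace.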

sharing/src/main/scala/io/delta/sharing/spark/DeltaFormatSharingLimitPushDown.scala

Lines changed: 2 additions & 4 deletions

```diff
@@ -22,7 +22,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.IntegerLiteral
 import org.apache.spark.sql.catalyst.plans.logical.{LocalLimit, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelationWithTable}
 
 // A spark rule that applies limit pushdown to DeltaSharingFileIndex, when the config is enabled.
 // To allow only fetching needed files from delta sharing server.
@@ -38,10 +38,8 @@ object DeltaFormatSharingLimitPushDown extends Rule[LogicalPlan] {
     p transform {
       case localLimit @ LocalLimit(
         literalExpr @ IntegerLiteral(limit),
-        l @ LogicalRelation(
+        l @ LogicalRelationWithTable(
           r @ HadoopFsRelation(remoteIndex: DeltaSharingFileIndex, _, _, _, _, _),
-          _,
-          _,
           _
         )
       ) if (ConfUtils.limitPushdownEnabled(p.conf) && remoteIndex.limitHint.isEmpty) =>
```
Lines changed: 35 additions & 0 deletions

```diff
@@ -0,0 +1,35 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.sources.BaseRelation
+
+// Handles a breaking change between Spark 3.5 and Spark Master (4.0).
+// `LogicalRelationWithTable` is a new object in Spark 4.0.
+
+/**
+ * Extract the [[BaseRelation]] and [[CatalogTable]] from [[LogicalRelation]]. You can also
+ * retrieve the instance of LogicalRelation like following:
+ *
+ * case l @ LogicalRelationWithTable(relation, catalogTable) => ...
+ */
+object LogicalRelationWithTable {
+  def unapply(plan: LogicalRelation): Option[(BaseRelation, Option[CatalogTable])] = {
+    Some(plan.relation, plan.catalogTable)
+  }
+}
```
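For reference, a minimal sketch of how this extractor can be used outside a pattern-heavy analyzer rule; the `CollectScannedTables` helper is hypothetical and not part of the commit.

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelationWithTable}

// Hypothetical helper: collect a human-readable name for every file-based scan in a plan,
// binding only the relation and the optional catalog table via the shim's extractor.
object CollectScannedTables {
  def apply(plan: LogicalPlan): Seq[String] = plan.collect {
    case LogicalRelationWithTable(fsRelation: HadoopFsRelation, catalogTable) =>
      catalogTable
        .map(_.identifier.unquotedString)                        // metastore table: use its identifier
        .getOrElse(fsRelation.location.rootPaths.mkString(","))  // path-based scan: use its root paths
  }
}
```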

spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala

Lines changed: 4 additions & 5 deletions

```diff
@@ -61,8 +61,7 @@ import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTrans
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.command.CreateTableLikeCommand
 import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.sql.execution.datasources.HadoopFsRelation
-import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, LogicalRelationWithTable}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.streaming.StreamingRelation
@@ -327,8 +326,8 @@ class DeltaAnalysis(session: SparkSession)
     case TimeTravel(u: UnresolvedRelation, _, _, _) =>
       u.tableNotFound(u.multipartIdentifier)
 
-    case LogicalRelation(
-        HadoopFsRelation(location, _, _, _, _: ParquetFileFormat, _), _, catalogTable, _) =>
+    case LogicalRelationWithTable(
+        HadoopFsRelation(location, _, _, _, _: ParquetFileFormat, _), catalogTable) =>
       val tableIdent = catalogTable.map(_.identifier)
         .getOrElse(TableIdentifier(location.rootPaths.head.toString, Some("parquet")))
       val provider = if (catalogTable.isDefined) {
@@ -836,7 +835,7 @@ class DeltaAnalysis(session: SparkSession)
         output = CloneTableCommand.output)
 
       // Non-delta metastore table already exists at target
-      case LogicalRelation(_, _, existingCatalogTable @ Some(catalogTable), _) =>
+      case LogicalRelationWithTable(_, existingCatalogTable @ Some(catalogTable)) =>
         val tblIdent = catalogTable.identifier
         val path = new Path(catalogTable.location)
         val newCatalogTable = createCatalogTableForCloneCommand(path, byPath = false, tblIdent,
```

spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala

Lines changed: 5 additions & 5 deletions

```diff
@@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPla
 import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform}
-import org.apache.spark.sql.execution.datasources.{FileFormat, FileIndex, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{FileFormat, FileIndex, HadoopFsRelation, LogicalRelation, LogicalRelationWithTable}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -50,7 +50,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
  */
 object RelationFileIndex {
   def unapply(a: LogicalRelation): Option[FileIndex] = a match {
-    case LogicalRelation(hrel: HadoopFsRelation, _, _, _) => Some(hrel.location)
+    case LogicalRelationWithTable(hrel: HadoopFsRelation, _) => Some(hrel.location)
     case _ => None
   }
 }
@@ -416,7 +416,7 @@ object DeltaTableUtils extends PredicateHelper
       target: LogicalPlan,
       fileIndex: FileIndex): LogicalPlan = {
     target transform {
-      case l @ LogicalRelation(hfsr: HadoopFsRelation, _, _, _) =>
+      case l @ LogicalRelationWithTable(hfsr: HadoopFsRelation, _) =>
         l.copy(relation = hfsr.copy(location = fileIndex)(hfsr.sparkSession))
     }
   }
@@ -454,7 +454,7 @@ object DeltaTableUtils extends PredicateHelper
     }
 
     target transformUp {
-      case l@LogicalRelation(hfsr: HadoopFsRelation, _, _, _) =>
+      case l@LogicalRelationWithTable(hfsr: HadoopFsRelation, _) =>
         // Prune columns from the scan.
         val prunedOutput = l.output.filterNot { col =>
           columnsToDrop.exists(resolver(_, col.name))
@@ -488,7 +488,7 @@ object DeltaTableUtils extends PredicateHelper
       target: LogicalPlan,
       updatedFileFormat: FileFormat): LogicalPlan = {
     target transform {
-      case l @ LogicalRelation(hfsr: HadoopFsRelation, _, _, _) =>
+      case l @ LogicalRelationWithTable(hfsr: HadoopFsRelation, _) =>
         l.copy(
           relation = hfsr.copy(fileFormat = updatedFileFormat)(hfsr.sparkSession))
     }
```

spark/src/main/scala/org/apache/spark/sql/delta/GenerateRowIDs.scala

Lines changed: 3 additions & 3 deletions

```diff
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
-import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, LogicalRelation, LogicalRelationWithTable}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.types.StructType
 
@@ -38,7 +38,7 @@ object GenerateRowIDs extends Rule[LogicalPlan] {
    */
   private object DeltaScanWithRowTrackingEnabled {
     def unapply(plan: LogicalPlan): Option[LogicalRelation] = plan match {
-      case scan @ LogicalRelation(relation: HadoopFsRelation, _, _, _) =>
+      case scan @ LogicalRelationWithTable(relation: HadoopFsRelation, _) =>
         relation.fileFormat match {
           case format: DeltaParquetFileFormat
             if RowTracking.isEnabled(format.protocol, format.metadata) => Some(scan)
@@ -50,7 +50,7 @@ object GenerateRowIDs extends Rule[LogicalPlan] {
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithNewOutput {
     case DeltaScanWithRowTrackingEnabled(
-        scan @ LogicalRelation(baseRelation: HadoopFsRelation, _, _, _)) =>
+        scan @ LogicalRelationWithTable(baseRelation: HadoopFsRelation, _)) =>
       // While Row IDs and commit versions are non-nullable, we'll use the Row ID & commit
       // version attributes to read the materialized values from now on, which can be null. We make
       // the materialized Row ID & commit version attributes nullable in the scan here.
```

spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala

Lines changed: 3 additions & 3 deletions

```diff
@@ -25,7 +25,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, LogicalRelationWithTable}
 import org.apache.spark.sql.execution.datasources.FileFormat.METADATA_NAME
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.types.StructType
@@ -59,9 +59,9 @@ trait PreprocessTableWithDVs extends SubqueryTransformerHelper {
 
 object ScanWithDeletionVectors {
   def unapply(a: LogicalRelation): Option[LogicalPlan] = a match {
-    case scan @ LogicalRelation(
+    case scan @ LogicalRelationWithTable(
       relation @ HadoopFsRelation(
-        index: TahoeFileIndex, _, _, _, format: DeltaParquetFileFormat, _), _, _, _) =>
+        index: TahoeFileIndex, _, _, _, format: DeltaParquetFileFormat, _), _) =>
       dvEnabledScanFor(scan, relation, format, index)
     case _ => None
   }
```

spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVsStrategy.scala

Lines changed: 2 additions & 2 deletions

```diff
@@ -20,7 +20,7 @@ import org.apache.spark.sql.{SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.planning.ScanOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.datasources.{FileSourceStrategy, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{FileSourceStrategy, HadoopFsRelation, LogicalRelationWithTable}
 
 /**
  * Strategy to process tables with DVs and add the skip row column and filters.
@@ -35,7 +35,7 @@ case class PreprocessTableWithDVsStrategy(session: SparkSession)
   with PreprocessTableWithDVs {
 
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case ScanOperation(_, _, _, _ @ LogicalRelation(_: HadoopFsRelation, _, _, _)) =>
+    case ScanOperation(_, _, _, _ @ LogicalRelationWithTable(_: HadoopFsRelation, _)) =>
       val updatedPlan = preprocessTablesWithDVs(plan)
       FileSourceStrategy(updatedPlan)
     case _ => Nil
```

spark/src/main/scala/org/apache/spark/sql/delta/commands/DMLWithDeletionVectorsHelper.scala

Lines changed: 3 additions & 3 deletions

```diff
@@ -39,7 +39,7 @@ import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelationWithTable}
 import org.apache.spark.sql.execution.datasources.FileFormat.{FILE_PATH, METADATA_NAME}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.functions.{col, lit}
@@ -92,8 +92,8 @@ object DMLWithDeletionVectorsHelper extends DeltaCommand {
     var fileMetadataCol: AttributeReference = null
 
     val newTarget = target.transformUp {
-      case l @ LogicalRelation(
-          hfsr @ HadoopFsRelation(_, _, _, _, format: DeltaParquetFileFormat, _), _, _, _) =>
+      case l @ LogicalRelationWithTable(
+          hfsr @ HadoopFsRelation(_, _, _, _, format: DeltaParquetFileFormat, _), _) =>
         fileMetadataCol = format.createFileMetadataCol()
         // Take the existing schema and add additional metadata columns
         if (useMetadataRowIndex) {
```

spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaCommand.scala

Lines changed: 3 additions & 3 deletions

```diff
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connector.catalog.V1Table
 import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelationWithTable}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 
@@ -194,9 +194,9 @@ trait DeltaCommand extends DeltaLogging {
     try {
       resolveIdentifier(analyzer, tableIdent) match {
         // is path
-        case LogicalRelation(HadoopFsRelation(_, _, _, _, _, _), _, None, _) => false
+        case LogicalRelationWithTable(HadoopFsRelation(_, _, _, _, _, _), None) => false
         // is table
-        case LogicalRelation(HadoopFsRelation(_, _, _, _, _, _), _, Some(_), _) => true
+        case LogicalRelationWithTable(HadoopFsRelation(_, _, _, _, _, _), Some(_)) => true
         // is iceberg table
         case DataSourceV2Relation(_: IcebergTablePlaceHolder, _, _, _, _) => false
         // could not resolve table/db
```
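The second hunk above hinges on the extractor's second element: `None` marks a path-based relation and `Some(_)` a metastore table. A rough standalone equivalent of that check, with a hypothetical object and method name:

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelationWithTable}

// Hypothetical helper mirroring the logic above: Some(true) for a metastore-backed file
// relation, Some(false) for a path-based one, None for anything else.
object TableKindSketch {
  def isMetastoreTable(resolved: LogicalPlan): Option[Boolean] = resolved match {
    case LogicalRelationWithTable(_: HadoopFsRelation, catalogTable) =>
      Some(catalogTable.isDefined)
    case _ => None
  }
}
```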

spark/src/test/scala/org/apache/spark/sql/delta/DeltaSinkSuite.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -241,7 +241,7 @@ class DeltaSinkSuite
 
     // Verify the correct partitioning schema has been inferred
     val hadoopFsRelations = outputDf.queryExecution.analyzed.collect {
-      case LogicalRelation(baseRelation, _, _, _) if
+      case LogicalRelationWithTable(baseRelation, _) if
         baseRelation.isInstanceOf[HadoopFsRelation] =>
         baseRelation.asInstanceOf[HadoopFsRelation]
     }
```
