Skip to content

Commit 634ba15

Browse files
authored
[SPARK] Rewrite GENERATE SYMLINK MANIFEST command to use Spark table resolution (delta-io#3914)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> This PR rewrites the GENERATE SYMLINK MANIFEST command (`DeltaGenerateCommand`) to use Spark's table resolution mechanism. The command is now a unary runnable command (`RunnableCommand` with `UnaryNode`) that takes a child logical plan, which is initially an unresolved table. The table is then resolved by Spark's resolution mechanism, resulting in a `DeltaTableV2` object. With this object, we also have a catalog table, which is passed down into `generateFullManifest` as part of the effort to pass the catalog table to the call sites of the DeltaLog APIs. ## How was this patch tested? <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. 
If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> unit tests ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. --> No
1 parent 2937bc8 commit 634ba15

File tree

10 files changed

+114
-59
lines changed

10 files changed

+114
-59
lines changed

project/SparkMimaExcludes.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ object SparkMimaExcludes {
2727
ProblemFilters.exclude[Problem]("io.delta.sql.parser.*"),
2828
ProblemFilters.exclude[Problem]("io.delta.tables.execution.*"),
2929
ProblemFilters.exclude[DirectMissingMethodProblem]("io.delta.tables.DeltaTable.apply"),
30+
ProblemFilters.exclude[DirectMissingMethodProblem]("io.delta.tables.DeltaTable.executeGenerate"),
3031
ProblemFilters.exclude[DirectMissingMethodProblem]("io.delta.tables.DeltaTable.executeHistory"),
3132
ProblemFilters.exclude[DirectMissingMethodProblem]("io.delta.tables.DeltaTable.executeVacuum"),
3233
ProblemFilters.exclude[DirectMissingMethodProblem]("io.delta.tables.DeltaTable.this"),

spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -453,9 +453,8 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
453453

454454
override def visitGenerate(ctx: GenerateContext): LogicalPlan = withOrigin(ctx) {
455455
DeltaGenerateCommand(
456-
modeName = ctx.modeName.getText,
457-
tableId = visitTableIdentifier(ctx.table),
458-
Map.empty)
456+
UnresolvedTable(visitTableIdentifier(ctx.table).nameParts, DeltaGenerateCommand.COMMAND_NAME),
457+
modeName = ctx.modeName.getText)
459458
}
460459

461460
override def visitConvert(ctx: ConvertContext): LogicalPlan = withOrigin(ctx) {

spark/src/main/scala/io/delta/tables/DeltaTable.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,7 @@ class DeltaTable private[tables](
159159
* @since 0.5.0
160160
*/
161161
def generate(mode: String): Unit = {
162-
val tableId = table.tableIdentifier.getOrElse(s"delta.`${deltaLog.dataPath.toString}`")
163-
executeGenerate(tableId, mode)
162+
executeGenerate(deltaLog.dataPath.toString, table.getTableIdentifierIfExists, mode)
164163
}
165164

166165
/**

spark/src/main/scala/io/delta/tables/execution/DeltaTableOperations.scala

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ import org.apache.hadoop.fs.Path
2828

2929
import org.apache.spark.sql.{functions, Column, DataFrame}
3030
import org.apache.spark.sql.catalyst.TableIdentifier
31-
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
31+
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedTable}
32+
import org.apache.spark.sql.catalyst.analysis.UnresolvedTableImplicits._
3233
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
3334
import org.apache.spark.sql.catalyst.plans.logical._
3435
import org.apache.spark.sql.connector.catalog.Identifier
@@ -63,15 +64,13 @@ trait DeltaTableOperations extends AnalysisHelper { self: io.delta.tables.DeltaT
6364
toDataset(sparkSession, details)
6465
}
6566

66-
protected def executeGenerate(tblIdentifier: String, mode: String): Unit =
67-
withActiveSession(sparkSession) {
68-
val tableId: TableIdentifier = sparkSession
69-
.sessionState
70-
.sqlParser
71-
.parseTableIdentifier(tblIdentifier)
72-
val generate = DeltaGenerateCommand(mode, tableId, self.deltaLog.options)
73-
toDataset(sparkSession, generate)
74-
}
67+
protected def executeGenerate(
68+
path: String,
69+
tableIdentifier: Option[TableIdentifier],
70+
mode: String): Unit = withActiveSession(sparkSession) {
71+
val generate = DeltaGenerateCommand(Option(path), tableIdentifier, mode, self.deltaLog.options)
72+
toDataset(sparkSession, generate)
73+
}
7574

7675
protected def executeUpdate(
7776
set: Map[String, Column],

spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -700,14 +700,21 @@ object UnresolvedDeltaPathOrIdentifier {
700700
def apply(
701701
path: Option[String],
702702
tableIdentifier: Option[TableIdentifier],
703+
options: Map[String, String],
703704
cmd: String): LogicalPlan = {
704705
(path, tableIdentifier) match {
705-
case (Some(p), None) => UnresolvedPathBasedDeltaTable(p, Map.empty, cmd)
706+
case (Some(p), None) => UnresolvedPathBasedDeltaTable(p, options, cmd)
706707
case (None, Some(t)) => UnresolvedTable(t.nameParts, cmd)
707708
case _ => throw new IllegalArgumentException(
708709
s"Exactly one of path or tableIdentifier must be provided to $cmd")
709710
}
710711
}
712+
713+
def apply(
714+
path: Option[String],
715+
tableIdentifier: Option[TableIdentifier],
716+
cmd: String): LogicalPlan =
717+
this(path, tableIdentifier, Map.empty, cmd)
711718
}
712719

713720
/**

spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaGenerateCommand.scala

Lines changed: 36 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,50 +17,59 @@
1717
package org.apache.spark.sql.delta.commands
1818

1919
// scalastyle:off import.ordering.noEmptyLine
20-
import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog, DeltaTableIdentifier}
20+
import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog, UnresolvedDeltaPathOrIdentifier}
2121
import org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest
22-
import org.apache.hadoop.fs.Path
2322

2423
import org.apache.spark.sql.{Row, SparkSession}
2524
import org.apache.spark.sql.catalyst.TableIdentifier
25+
import org.apache.spark.sql.catalyst.catalog.CatalogTable
26+
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
2627
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
27-
import org.apache.spark.sql.execution.command.LeafRunnableCommand
28+
import org.apache.spark.sql.execution.command.RunnableCommand
2829

29-
case class DeltaGenerateCommand(
30-
modeName: String,
31-
tableId: TableIdentifier,
32-
options: Map[String, String])
33-
extends LeafRunnableCommand {
30+
case class DeltaGenerateCommand(override val child: LogicalPlan, modeName: String)
31+
extends RunnableCommand
32+
with UnaryNode
33+
with DeltaCommand {
3434

3535
import DeltaGenerateCommand._
3636

37+
override def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = copy(child = newChild)
38+
3739
override def run(sparkSession: SparkSession): Seq[Row] = {
3840
if (!modeNameToGenerationFunc.contains(modeName)) {
3941
throw DeltaErrors.unsupportedGenerateModeException(modeName)
4042
}
41-
42-
val deltaLog = DeltaTableIdentifier(sparkSession, tableId) match {
43-
case Some(id) if id.path.isDefined =>
44-
DeltaLog.forTable(sparkSession, new Path(id.path.get), options)
45-
case _ =>
46-
DeltaLog.forTable(
47-
sparkSession,
48-
sparkSession.sessionState.catalog.getTableMetadata(tableId),
49-
options)
50-
}
51-
52-
if (!deltaLog.tableExists) {
53-
throw DeltaErrors.notADeltaTableException("GENERATE")
54-
}
5543
val generationFunc = modeNameToGenerationFunc(modeName)
56-
generationFunc(sparkSession, deltaLog)
44+
val table = getDeltaTable(child, COMMAND_NAME)
45+
generationFunc(sparkSession, table.deltaLog, table.catalogTable)
5746
Seq.empty
5847
}
5948
}
6049

6150
object DeltaGenerateCommand {
62-
val modeNameToGenerationFunc = CaseInsensitiveMap(
63-
Map[String, (SparkSession, DeltaLog) => Unit](
64-
"symlink_format_manifest" -> GenerateSymlinkManifest.generateFullManifest
65-
))
51+
val modeNameToGenerationFunc
52+
: CaseInsensitiveMap[(SparkSession, DeltaLog, Option[CatalogTable]) => Unit] =
53+
CaseInsensitiveMap(
54+
Map[String, (SparkSession, DeltaLog, Option[CatalogTable]) => Unit](
55+
"symlink_format_manifest" -> GenerateSymlinkManifest.generateFullManifest
56+
)
57+
)
58+
59+
val COMMAND_NAME = "GENERATE"
60+
61+
def apply(
62+
path: Option[String],
63+
tableIdentifier: Option[TableIdentifier],
64+
modeName: String,
65+
options: Map[String, String]
66+
): DeltaGenerateCommand = {
67+
// Exactly one of path or tableIdentifier should be specified
68+
val plan = UnresolvedDeltaPathOrIdentifier(
69+
path.filter(_ => tableIdentifier.isEmpty),
70+
tableIdentifier,
71+
options,
72+
COMMAND_NAME)
73+
DeltaGenerateCommand(plan, modeName)
74+
}
6675
}

spark/src/main/scala/org/apache/spark/sql/delta/hooks/GenerateSymlinkManifest.scala

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import org.apache.spark.SparkEnv
3232
import org.apache.spark.internal.MDC
3333
import org.apache.spark.sql._
3434
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
35-
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
35+
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, ExternalCatalogUtils}
3636
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
3737
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Concat, Expression, Literal, ScalaUDF}
3838
import org.apache.spark.sql.execution.datasources.InMemoryFileIndex
@@ -74,8 +74,7 @@ trait GenerateSymlinkManifestImpl extends PostCommitHook with DeltaLogging with
7474
committedVersion: Long,
7575
postCommitSnapshot: Snapshot,
7676
committedActions: Seq[Action]): Unit = {
77-
generateIncrementalManifest(
78-
spark, txn.deltaLog, txn.snapshot, postCommitSnapshot, committedActions)
77+
generateIncrementalManifest(spark, txn, postCommitSnapshot, committedActions)
7978
}
8079

8180
override def handleError(spark: SparkSession, error: Throwable, version: Long): Unit = {
@@ -93,21 +92,21 @@ trait GenerateSymlinkManifestImpl extends PostCommitHook with DeltaLogging with
9392
*/
9493
protected def generateIncrementalManifest(
9594
spark: SparkSession,
96-
deltaLog: DeltaLog,
97-
txnReadSnapshot: Snapshot,
95+
txn: OptimisticTransactionImpl,
9896
currentSnapshot: Snapshot,
99-
actions: Seq[Action]): Unit = recordManifestGeneration(deltaLog, full = false) {
97+
actions: Seq[Action]): Unit = recordManifestGeneration(txn.deltaLog, full = false) {
10098

10199
import org.apache.spark.sql.delta.implicits._
102100

103101
checkColumnMappingMode(currentSnapshot.metadata)
104102

103+
val deltaLog = txn.deltaLog
105104
val partitionCols = currentSnapshot.metadata.partitionColumns
106105
val manifestRootDirPath = new Path(deltaLog.dataPath, MANIFEST_LOCATION)
107106
val hadoopConf = new SerializableConfiguration(deltaLog.newDeltaHadoopConf())
108107
val fs = deltaLog.dataPath.getFileSystem(hadoopConf.value)
109108
if (!fs.exists(manifestRootDirPath)) {
110-
generateFullManifest(spark, deltaLog)
109+
generateFullManifest(spark, deltaLog, txn.catalogTable)
111110
return
112111
}
113112

@@ -120,7 +119,7 @@ trait GenerateSymlinkManifestImpl extends PostCommitHook with DeltaLogging with
120119
val removedFileNames =
121120
spark.createDataset(actions.collect { case r: RemoveFile => r.path }).toDF("path")
122121
val partitionValuesOfRemovedFiles =
123-
txnReadSnapshot.allFiles.join(removedFileNames, "path").select("partitionValues").persist()
122+
txn.snapshot.allFiles.join(removedFileNames, "path").select("partitionValues").persist()
124123
try {
125124
val partitionsOfRemovedFiles = partitionValuesOfRemovedFiles
126125
.as[Map[String, String]](GenerateSymlinkManifestUtils.mapEncoder).collect().toSet
@@ -180,8 +179,9 @@ trait GenerateSymlinkManifestImpl extends PostCommitHook with DeltaLogging with
180179
*/
181180
def generateFullManifest(
182181
spark: SparkSession,
183-
deltaLog: DeltaLog): Unit = {
184-
val snapshot = deltaLog.update(stalenessAcceptable = false)
182+
deltaLog: DeltaLog,
183+
catalogTableOpt: Option[CatalogTable]): Unit = {
184+
val snapshot = deltaLog.update(catalogTableOpt = catalogTableOpt)
185185
assertTableIsDVFree(spark, snapshot)
186186
generateFullManifestWithSnapshot(spark, deltaLog, snapshot)
187187
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright (2024) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.apache.spark.sql.delta
18+
19+
object DeltaGenerateSymlinkManifestSuiteShims {
20+
val FAILS_ON_TEMP_VIEWS_ERROR_MSG = "v is a temp view. 'GENERATE' expects a table"
21+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright (2024) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.apache.spark.sql.delta
18+
19+
object DeltaGenerateSymlinkManifestSuiteShims {
20+
val FAILS_ON_TEMP_VIEWS_ERROR_MSG = "'GENERATE' expects a table but `v` is a view."
21+
}

spark/src/test/scala/org/apache/spark/sql/delta/DeltaGenerateSymlinkManifestSuite.scala

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import java.io.File
2020
import java.net.URI
2121

2222
// scalastyle:off import.ordering.noEmptyLine
23+
import org.apache.spark.sql.delta.DeltaGenerateSymlinkManifestSuiteShims._
2324
import org.apache.spark.sql.delta.DeltaOperations.Delete
2425
import org.apache.spark.sql.delta.commands.DeltaGenerateCommand
2526
import org.apache.spark.sql.delta.hooks.GenerateSymlinkManifest
@@ -100,16 +101,14 @@ trait DeltaGenerateSymlinkManifestSuiteBase extends QueryTest
100101
spark.sql(s"GENERATE symlink_format_manifest FOR TABLE delta.`$dir`")
101102
}
102103

103-
assert(e.getMessage.contains("not found") ||
104-
e.getMessage.contains("only supported for Delta"))
104+
assert(e.getMessage.contains("is not a Delta table"))
105105

106106
spark.range(2).write.format("parquet").mode("overwrite").save(dir.toString)
107107

108108
e = intercept[AnalysisException] {
109109
spark.sql(s"GENERATE symlink_format_manifest FOR TABLE delta.`$dir`")
110110
}
111-
assert(e.getMessage.contains("table not found") ||
112-
e.getMessage.contains("only supported for Delta"))
111+
assert(e.getMessage.contains("is not a Delta table"))
113112

114113
e = intercept[AnalysisException] {
115114
spark.sql(s"GENERATE symlink_format_manifest FOR TABLE parquet.`$dir`")
@@ -125,7 +124,7 @@ trait DeltaGenerateSymlinkManifestSuiteBase extends QueryTest
125124
val e = intercept[AnalysisException] {
126125
spark.sql(s"GENERATE symlink_format_manifest FOR TABLE v")
127126
}
128-
assert(e.getMessage.contains("not found") || e.getMessage.contains("cannot be found"))
127+
assert(e.getMessage.contains(FAILS_ON_TEMP_VIEWS_ERROR_MSG))
129128
}
130129
}
131130

@@ -771,7 +770,7 @@ trait DeltaGenerateSymlinkManifestSuiteBase extends QueryTest
771770

772771
protected def generateSymlinkManifest(tablePath: String): Unit = {
773772
val deltaLog = DeltaLog.forTable(spark, tablePath)
774-
GenerateSymlinkManifest.generateFullManifest(spark, deltaLog)
773+
GenerateSymlinkManifest.generateFullManifest(spark, deltaLog, catalogTableOpt = None)
775774
}
776775
}
777776

0 commit comments

Comments
 (0)