
Commit 40b2a5f

Omar-Saleh and Omar Elhadidy authored
[SPARK] Refactor deltaTable forName fix (delta-io#4710)
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Follow-up PR for delta-io#4605, adding an extra test and refactoring some parts of the previous PR. This PR doesn't add any new functional changes.

## How was this patch tested?

UTs

## Does this PR introduce _any_ user-facing changes?

---------

Co-authored-by: Omar Elhadidy <[email protected]>
1 parent b9b7ee9 commit 40b2a5f

File tree

3 files changed (+33, -27 lines)

spark/src/main/scala/org/apache/spark/sql/delta/catalog/CatalogResolver.scala

Lines changed: 19 additions & 24 deletions
```diff
@@ -16,8 +16,7 @@
 
 package org.apache.spark.sql.delta.catalog
 
-import org.apache.spark.sql.delta.{DeltaErrors, DeltaTableIdentifier}
-import org.apache.spark.sql.delta.sources.DeltaSourceUtils
+import org.apache.spark.sql.delta.{DeltaErrors, DeltaTableIdentifier, DeltaTableUtils}
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.SparkSession
@@ -27,15 +26,25 @@ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
 
 /**
- * Helper object for resolving tables using a *non-session* catalog.
+ * Helper object for resolving Delta tables using a *non-session* catalog.
  */
 object CatalogResolver {
-  private def asDeltaTable(spark: SparkSession, table: Table): DeltaTableV2 = table match {
-    case v2: DeltaTableV2 => v2
-    case v1: V1Table if DeltaSourceUtils.isDeltaTable(v1.v1Table.provider) =>
-      DeltaTableV2(spark, new Path(v1.v1Table.location), Some(v1.v1Table))
-    case _ => throw DeltaErrors.notADeltaTableException(
-      DeltaTableIdentifier(table = Some(TableIdentifier(table.name()))))
+  def getDeltaTableFromCatalog(
+      spark: SparkSession,
+      catalog: CatalogPlugin,
+      ident: Identifier): DeltaTableV2 = {
+    catalog.asTableCatalog.loadTable(ident) match {
+      case v2: DeltaTableV2 => v2
+      case v1: V1Table if DeltaTableUtils.isDeltaTable(v1.v1Table) =>
+        DeltaTableV2(
+          spark,
+          path = new Path(v1.v1Table.location),
+          catalogTable = Some(v1.v1Table),
+          tableIdentifier = Some(ident.toString)
+        )
+      case table => throw DeltaErrors.notADeltaTableException(
+        DeltaTableIdentifier(table = Some(TableIdentifier(table.name()))))
+    }
   }
 
   /**
@@ -46,20 +55,6 @@ object CatalogResolver {
      spark: SparkSession,
      catalog: String,
      ident: Seq[String]): (CatalogPlugin, Identifier) = {
-    (spark.sessionState.catalogManager.catalog(catalog),
-      MultipartIdentifierHelper(ident).asIdentifier)
-  }
-
-  def getDeltaTableFromCatalog(
-      spark: SparkSession,
-      catalog: CatalogPlugin,
-      ident: Identifier): DeltaTableV2 = {
-    val tblCatalog = catalog.asTableCatalog
-    if (tblCatalog.tableExists(ident)) {
-      asDeltaTable(spark, tblCatalog.loadTable(ident))
-    } else {
-      throw DeltaErrors.nonExistentDeltaTable(
-        DeltaTableIdentifier(table = Some(TableIdentifier(ident.name()))))
-    }
+    (spark.sessionState.catalogManager.catalog(catalog), ident.asIdentifier)
   }
 }
```
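The refactor folds the old `tableExists`/`loadTable` pair into a single `loadTable` call, so a missing table now surfaces the catalog's own not-found error instead of Delta's `nonExistentDeltaTable`. A minimal sketch of how the helper is driven, with placeholder names (`my_catalog`, `my_schema`, `my_table`); note that `spark.sessionState` is package-private to `org.apache.spark.sql`, so this only compiles from within Spark/Delta internals, as in `CatalogResolver` itself:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier}
import org.apache.spark.sql.delta.catalog.{CatalogResolver, DeltaTableV2}

val spark = SparkSession.active
// "my_catalog" is assumed to be registered under spark.sql.catalog.my_catalog.
val catalog: CatalogPlugin = spark.sessionState.catalogManager.catalog("my_catalog")
val ident: Identifier = Identifier.of(Array("my_schema"), "my_table")

// One loadTable call: a Delta table returns a DeltaTableV2, a non-Delta table
// throws notADeltaTableException, and a missing table propagates the catalog's
// own TABLE_OR_VIEW_NOT_FOUND error directly.
val table: DeltaTableV2 = CatalogResolver.getDeltaTableFromCatalog(spark, catalog, ident)
```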

spark/src/test/scala/io/delta/tables/DeltaTableForNameSuite.scala

Lines changed: 9 additions & 1 deletion
```diff
@@ -82,7 +82,7 @@ class DeltaTableForNameSuite extends QueryTest
   }
 
   private def getTablePath(tableName: String): Path = {
-    new Path(FileUtils.getTempDirectory.toString + "/DeltaTableForNameSuite/" + tableName)
+    new Path(DummyCatalogWithNamespace.catalogDir + s"/$tableName")
   }
 
   private def setUpTable(catalog: String, schema: String, table: String): Unit = {
@@ -167,4 +167,12 @@ class DeltaTableForNameSuite extends QueryTest
     checkError(exception = e, "DELTA_MISSING_DELTA_TABLE",
       parameters = Map("tableName" -> s"`$nonSessionCatalogNonDefaultSchema`.`$commonTblName`"))
   }
+
+  test("forName fails with fully qualified non existent table") {
+    val e = intercept[AnalysisException] {
+      DeltaTable.forName(spark, s"$catalogName.$defaultSchema.invalid_table")
+    }
+    checkError(exception = e, "TABLE_OR_VIEW_NOT_FOUND",
+      parameters = Map("relationName" -> s"`$catalogName`.`$defaultSchema`.`invalid_table`"))
+  }
 }
```
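The added test pins down the failure mode for a fully qualified name that does not exist: with the single `loadTable` path, the error is Spark's generic `TABLE_OR_VIEW_NOT_FOUND` raised by the catalog, not a Delta-specific one. For reference, a hedged sketch of the public entry point being exercised, assuming an active session and placeholder catalog/table names:

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

val spark = SparkSession.active

// Happy path: a three-part name resolved through a registered non-session catalog.
val table = DeltaTable.forName(spark, "my_catalog.my_schema.my_table")

// Missing table: fails with AnalysisException, errorClass TABLE_OR_VIEW_NOT_FOUND.
DeltaTable.forName(spark, "my_catalog.my_schema.invalid_table")
```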

spark/src/test/scala/org/apache/spark/sql/delta/test/CustomCatalogs.scala

Lines changed: 5 additions & 2 deletions
```diff
@@ -194,8 +194,7 @@ class DummyCatalogWithNamespace extends DummyCatalog with SupportsNamespaces {
   // To load a catalog into spark CatalogPlugin calls the Catalog's no-arg constructor and
   // then Catalog.initialize. To have a consistent state across different invocations
   // in the same test, this catalog impl uses a hard coded path.
-  override lazy val tempDir: Path = new Path(
-    FileUtils.getTempDirectory.toString + "/DeltaTableForNameSuite")
+  override lazy val tempDir: Path = DummyCatalogWithNamespace.catalogDir
   // scalastyle:off deltahadoopconfiguration
   override lazy val fs: FileSystem =
     tempDir.getFileSystem(spark.sessionState.newHadoopConf())
@@ -308,3 +307,7 @@ class DummyCatalogWithNamespace extends DummyCatalog with SupportsNamespaces {
     new util.HashMap[String, String]()
   }
 }
+
+object DummyCatalogWithNamespace {
+  val catalogDir: Path = new Path(Utils.createTempDir().getAbsolutePath)
+}
```
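Moving the path into a companion object matters because Spark materializes catalog plugins reflectively: every load calls the no-arg constructor and then `initialize`, so instance fields reset, while a companion-object `val` is created once per JVM and is shared by the suite and every catalog instance. A hedged sketch of the registration flow (the catalog name `test_catalog` is an assumption):

```scala
// Register the dummy catalog under the assumed name "test_catalog".
spark.conf.set(
  "spark.sql.catalog.test_catalog",
  classOf[DummyCatalogWithNamespace].getName)

// On first use, Spark does roughly:
//   val plugin = classOf[DummyCatalogWithNamespace].getDeclaredConstructor().newInstance()
//   plugin.initialize("test_catalog", caseInsensitiveOptions)
// Only DummyCatalogWithNamespace.catalogDir survives this re-instantiation.
```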
