
Commit 9e183e6

[Spark] Make MergeIntoSuiteBase agnostic to name/path-based access (delta-io#4809)
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

This PR is a continuation of delta-io#4808. Here, we continue pushing the `DeltaDMLByPathTestUtils` trait down closer to the MergeInto suites. In the process, we change all explicit path-based accesses to more generic constructs such as `tableSQLIdentifier` and `readDeltaTableByIdentifier`, which can be overridden by either the `DeltaDMLByPathTestUtils` trait or the `DeltaDMLByNameTestUtils` trait (to be added later).

## How was this patch tested?

Existing UTs.

## Does this PR introduce _any_ user-facing changes?

No.
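As an editorial illustration of the refactor (not part of the diff): a minimal sketch of how the two flavors might override the shared accessor. `DeltaDMLByNameTestUtils` does not exist yet in this commit, so the by-name shape and every name other than `tableSQLIdentifier` and `readDeltaTableByIdentifier` are assumptions.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Sketch only: the real traits live in DeltaTestUtils.scala; the helper
// names here (DmlAccessSketch, ByPathSketch, ByNameSketch, tempPath) are
// invented for illustration.
trait DmlAccessSketch {
  protected def spark: SparkSession

  // Either a catalog table name or a path-style identifier like delta.`/p`.
  protected def tableSQLIdentifier: String

  // Defaults to the test table, mirroring the new signature in this PR.
  protected def readDeltaTableByIdentifier(
      tableIdentifier: String = tableSQLIdentifier): DataFrame =
    spark.read.format("delta").table(tableIdentifier)
}

// Path-based flavor, in the spirit of DeltaDMLByPathTestUtils.
trait ByPathSketch extends DmlAccessSketch {
  protected def tempPath: String
  override protected def tableSQLIdentifier: String = s"delta.`$tempPath`"
}

// Name-based flavor, in the spirit of the DeltaDMLByNameTestUtils trait
// that the description says will be added later.
trait ByNameSketch extends DmlAccessSketch {
  override protected def tableSQLIdentifier: String = "target_table"
}
```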
1 parent 662f4ac commit 9e183e6

File tree: 7 files changed, +193 −204 lines changed


spark/src/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala

Lines changed: 17 additions & 14 deletions

```diff
@@ -42,7 +42,7 @@ import org.scalatest.BeforeAndAfterEach
 import org.apache.spark.{SparkContext, SparkFunSuite, SparkThrowable}
 import org.apache.spark.scheduler.{JobFailed, SparkListener, SparkListenerJobEnd, SparkListenerJobStart}
-import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
+import org.apache.spark.sql.{AnalysisException, DataFrame, DataFrameWriter, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.{quietly, FailFastMode}
@@ -578,20 +578,11 @@ trait DeltaDMLTestUtils
   protected def tableSQLIdentifier: String
 
   protected def append(df: DataFrame, partitionBy: Seq[String] = Nil): Unit = {
-    import DeltaTestUtils.TableIdentifierOrPath
-
     val dfw = df.write.format("delta").mode("append")
     if (partitionBy.nonEmpty) {
       dfw.partitionBy(partitionBy: _*)
     }
-    getTableIdentifierOrPath(tableSQLIdentifier) match {
-      case TableIdentifierOrPath.Identifier(id, _) => dfw.saveAsTable(id.toString)
-      // A cleaner way to write this is to just use `saveAsTable` where the
-      // table name is delta.`path`. However, it will throw an error when
-      // we use "append" mode and the table does not exist, so we use `save`
-      // here instead.
-      case TableIdentifierOrPath.Path(path, _) => dfw.save(path)
-    }
+    writeTable(dfw, tableSQLIdentifier)
   }
 
   protected def withKeyValueData(
@@ -633,10 +624,24 @@ trait DeltaDMLTestUtils
    * Reads a delta table by its identifier. The identifier can either be the table name or table
    * path that is in the form of delta.`tablePath`.
    */
-  protected def readDeltaTableByIdentifier(tableIdentifier: String): DataFrame = {
+  protected def readDeltaTableByIdentifier(
+      tableIdentifier: String = tableSQLIdentifier): DataFrame = {
     spark.read.format("delta").table(tableIdentifier)
   }
 
+  protected def writeTable[T](dfw: DataFrameWriter[T], tableName: String): Unit = {
+    import DeltaTestUtils.TableIdentifierOrPath
+
+    getTableIdentifierOrPath(tableName) match {
+      case TableIdentifierOrPath.Identifier(id, _) => dfw.saveAsTable(id.toString)
+      // A cleaner way to write this is to just use `saveAsTable` where the
+      // table name is delta.`path`. However, it will throw an error when
+      // we use "append" mode and the table does not exist, so we use `save`
+      // here instead.
+      case TableIdentifierOrPath.Path(path, _) => dfw.save(path)
+    }
+  }
+
   /**
    * Finds the latest operation of the given type that ran on the test table and returns the
    * dataframe with the changes of the corresponding table version.
@@ -692,6 +697,4 @@ trait DeltaDMLByPathTestUtils extends DeltaDMLTestUtils {
   protected def readDeltaTable(path: String): DataFrame = {
     spark.read.format("delta").load(path)
   }
-
-  protected def getDeltaFileStmt(path: String): String = s"SELECT * FROM delta.`$path`"
 }
```
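The net effect is that every write in the suite funnels through `writeTable`, which picks `saveAsTable` for catalog names and `save` for paths, while reads can omit the identifier entirely. A hedged sketch of a caller inside a suite mixing in `DeltaDMLTestUtils`; the data values and column names are illustrative, only the helpers come from the diff above:

```scala
// Illustrative test body; testImplicits is Spark's usual test-kit import.
import testImplicits._

val df = Seq((1, "a"), (2, "b")).toDF("key", "value")

// Dispatches via writeTable: saveAsTable for a catalog name,
// save(path) for a delta.`path` identifier.
append(df, partitionBy = Seq("key"))

// The new default argument resolves to tableSQLIdentifier.
checkAnswer(readDeltaTableByIdentifier(), df)
```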

spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoDVsSuite.scala

Lines changed: 6 additions & 6 deletions

```diff
@@ -38,16 +38,16 @@ trait MergeIntoDVsMixin extends MergeIntoSQLMixin with DeletionVectorsTestUtils
 
   override def excluded: Seq[String] = {
     val miscFailures = Seq(
-      "basic case - merge to view on a Delta table by path, " +
+      "basic case - merge to view on a Delta table, " +
        "partitioned: true skippingEnabled: false useSqlView: true",
-      "basic case - merge to view on a Delta table by path, " +
+      "basic case - merge to view on a Delta table, " +
        "partitioned: true skippingEnabled: false useSqlView: false",
-      "basic case - merge to view on a Delta table by path, " +
+      "basic case - merge to view on a Delta table, " +
        "partitioned: false skippingEnabled: false useSqlView: true",
-      "basic case - merge to view on a Delta table by path, " +
+      "basic case - merge to view on a Delta table, " +
        "partitioned: false skippingEnabled: false useSqlView: false",
-      "basic case - merge to Delta table by name, isPartitioned: false skippingEnabled: false",
-      "basic case - merge to Delta table by name, isPartitioned: true skippingEnabled: false",
+      "basic case - merge to Delta table, isPartitioned: false skippingEnabled: false",
+      "basic case - merge to Delta table, isPartitioned: true skippingEnabled: false",
       "not matched by source - all 3 clauses - no changes - " +
        "isPartitioned: true - cdcEnabled: true",
       "not matched by source - all 3 clauses - no changes - " +
```

spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoNotMatchedBySourceSuite.scala

Lines changed: 28 additions & 24 deletions

```diff
@@ -49,10 +49,8 @@ trait MergeIntoNotMatchedBySourceSuite extends MergeIntoSuiteBaseMixin {
       withSQLConf(DeltaSQLConf.MERGE_INSERT_ONLY_ENABLED.key -> "true") {
         executeMerge(s"$targetName t", s"$sourceName s", mergeOn, mergeClauses: _*)
       }
-      val deltaPath = if (targetName.startsWith("delta.`")) {
-        targetName.stripPrefix("delta.`").stripSuffix("`")
-      } else targetName
-      checkAnswer(readDeltaTable(deltaPath), result.map { case (k, v) => Row(k, v) })
+      checkAnswer(readDeltaTableByIdentifier(targetName),
+        result.map { case (k, v) => Row(k, v) })
     }
     if (cdcEnabled) {
       checkAnswer(getCDCForLatestOperation(deltaLog, DeltaOperations.OP_MERGE), cdc.toDF())
@@ -504,28 +502,34 @@ trait MergeIntoNotMatchedBySourceSuite extends MergeIntoSuiteBaseMixin {
     cdc = Seq.empty)
 
   test(s"special character in path - not matched by source delete") {
-    val source = s"$tempDir/sou rce^"
-    val target = s"$tempDir/tar get="
-    spark.range(0, 10, 2).write.format("delta").save(source)
-    spark.range(10).write.format("delta").save(target)
-    executeMerge(
-      tgt = s"delta.`$target` t",
-      src = s"delta.`$source` s",
-      cond = "t.id = s.id",
-      clauses = deleteNotMatched())
-    checkAnswer(readDeltaTable(target), Seq(0, 2, 4, 6, 8).toDF("id"))
+    withTempDir { tempDir =>
+      val source = s"$tempDir/sou rce^"
+      val target = s"$tempDir/tar get="
+      spark.range(0, 10, 2).write.format("delta").save(source)
+      spark.range(10).write.format("delta").save(target)
+      executeMerge(
+        tgt = s"delta.`$target` t",
+        src = s"delta.`$source` s",
+        cond = "t.id = s.id",
+        clauses = deleteNotMatched())
+      checkAnswer(readDeltaTableByIdentifier(s"delta.`$target`"), Seq(0, 2, 4, 6, 8).toDF("id"))
+    }
   }
 
   test(s"special character in path - not matched by source update") {
-    val source = s"$tempDir/sou rce@"
-    val target = s"$tempDir/tar get#"
-    spark.range(0, 10, 2).write.format("delta").save(source)
-    spark.range(10).write.format("delta").save(target)
-    executeMerge(
-      tgt = s"delta.`$target` t",
-      src = s"delta.`$source` s",
-      cond = "t.id = s.id",
-      clauses = updateNotMatched(set = "id = t.id * 10"))
-    checkAnswer(readDeltaTable(target), Seq(0, 10, 2, 30, 4, 50, 6, 70, 8, 90).toDF("id"))
+    withTempDir { tempDir =>
+      val source = s"$tempDir/sou rce@"
+      val target = s"$tempDir/tar get#"
+      spark.range(0, 10, 2).write.format("delta").save(source)
+      spark.range(10).write.format("delta").save(target)
+      executeMerge(
+        tgt = s"delta.`$target` t",
+        src = s"delta.`$source` s",
+        cond = "t.id = s.id",
+        clauses = updateNotMatched(set = "id = t.id * 10"))
+      checkAnswer(
+        readDeltaTableByIdentifier(s"delta.`$target`"),
+        Seq(0, 10, 2, 30, 4, 50, 6, 70, 8, 90).toDF("id"))
+    }
   }
 }
```
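Two things happen in this file. First, each test body is wrapped in `withTempDir`, giving the test its own directory instead of relying on a shared `tempDir` field (presumably so the suite no longer depends on the path-based fixture). Second, the manual stripping of the `delta.` wrapper in the first hunk is gone because `spark.read.table` accepts the path form directly. A small sketch of that equivalence, with an illustrative path:

```scala
// For a path-backed Delta table, these two reads are equivalent
// (the path is illustrative).
val path = "/tmp/tar get="
val byLoad  = spark.read.format("delta").load(path)
val byTable = spark.read.format("delta").table(s"delta.`$path`")
assert(byLoad.schema == byTable.schema)
```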

spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSQLSuite.scala

Lines changed: 1 addition & 0 deletions

```diff
@@ -35,6 +35,7 @@ import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 trait MergeIntoSQLMixin extends MergeIntoSuiteBaseMixin
   with MergeIntoSQLTestUtils
   with DeltaSQLCommandTest
+  with DeltaDMLByPathTestUtils
   with DeltaTestUtilsForTempViews {
 
   override def excluded: Seq[String] = super.excluded ++ Seq(
```

spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoScalaSuite.scala

Lines changed: 1 addition & 0 deletions

```diff
@@ -31,6 +31,7 @@ import org.apache.spark.sql.types.StructType
 trait MergeIntoScalaMixin extends MergeIntoSuiteBaseMixin
   with MergeIntoScalaTestUtils
   with DeltaSQLCommandTest
+  with DeltaDMLByPathTestUtils
   with DeltaTestUtilsForTempViews
   with DeltaExcludedTestMixin {
 
```
spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSchemaEvolutionSuite.scala

Lines changed: 3 additions & 3 deletions

```diff
@@ -77,14 +77,14 @@ trait MergeIntoSchemaEvolutionMixin {
         errorContains(Utils.exceptionString(ex), error)
       } else {
         executeMerge(s"$tableSQLIdentifier t", "source s", cond, clauses: _*)
-        checkAnswer(readDeltaTableByIdentifier(tableSQLIdentifier), df.collect())
+        checkAnswer(readDeltaTableByIdentifier(), df.collect())
         if (schema != null) {
-          assert(readDeltaTableByIdentifier(tableSQLIdentifier).schema === schema)
+          assert(readDeltaTableByIdentifier().schema === schema)
         } else {
           // Check against the schema of the expected result df if no explicit schema was
           // provided. Nullability of fields will vary depending on the actual data in the df so
           // we ignore it.
-          assert(readDeltaTableByIdentifier(tableSQLIdentifier).schema.asNullable ===
+          assert(readDeltaTableByIdentifier().schema.asNullable ===
             df.schema.asNullable)
         }
       }
```
