Skip to content

Commit 6a73c17

Browse files
authored
[Spark] Run MergeIntoSQLSuite with both path-based and name-based access (delta-io#4896)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description This PR extends the `MergeIntoSQLSuite` to also run with name-based access using the new test generation framework. ## How was this patch tested? New unit tests ## Does this PR introduce _any_ user-facing changes? No
1 parent 0fb85d8 commit 6a73c17

File tree

14 files changed

+252
-101
lines changed

14 files changed

+252
-101
lines changed

spark/delta-suite-generator/src/main/scala/io/delta/suitegenerator/SuiteGeneratorConfig.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ case class TestConfig(
6969

7070
object SuiteGeneratorConfig {
7171
private object Dims {
72+
val PATH_BASED = DimensionMixin("DeltaDMLTestUtils", suffix = "PathBased")
73+
val NAME_BASED = DimensionMixin("DeltaDMLTestUtils", suffix = "NameBased")
7274
val MERGE_SQL = DimensionMixin("MergeIntoSQL")
7375
val MERGE_SCALA = DimensionMixin("MergeIntoScala")
7476
val MERGE_CDC = DimensionMixin("MergeCDC")
@@ -116,15 +118,16 @@ object SuiteGeneratorConfig {
116118
TestConfig(
117119
Tests.MERGE_SQL ++: Tests.MERGE_BASE,
118120
Seq(
119-
Seq(Dims.MERGE_SQL),
120-
Seq(Dims.MERGE_SQL, Dims.COLUMN_MAPPING, Dims.MERGE_SQL_COLMAP)
121+
Seq(Dims.MERGE_SQL, Dims.PATH_BASED),
122+
Seq(Dims.MERGE_SQL, Dims.NAME_BASED),
123+
Seq(Dims.MERGE_SQL, Dims.PATH_BASED, Dims.COLUMN_MAPPING, Dims.MERGE_SQL_COLMAP)
121124
)
122125
),
123126
TestConfig(
124127
"MergeIntoDVsTests" +: Tests.MERGE_SQL ++: Tests.MERGE_BASE,
125128
Seq(
126-
Seq(Dims.MERGE_SQL, Dims.MERGE_DVS, Dims.MERGE_DVS_OVERRIDES),
127-
Seq(Dims.MERGE_SQL, Dims.MERGE_DVS_PREDPUSH)
129+
Seq(Dims.MERGE_SQL, Dims.PATH_BASED, Dims.MERGE_DVS, Dims.MERGE_DVS_OVERRIDES),
130+
Seq(Dims.MERGE_SQL, Dims.PATH_BASED, Dims.MERGE_DVS_PREDPUSH)
128131
)
129132
),
130133
TestConfig(

spark/src/test/scala/org/apache/spark/sql/delta/DeleteSuiteBase.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import org.apache.spark.sql.types.StructType
2727

2828
abstract class DeleteSuiteBase extends QueryTest
2929
with SharedSparkSession
30-
with DeltaDMLByPathTestUtils
30+
with DeltaDMLTestUtilsPathBased
3131
with DeltaTestUtilsForTempViews
3232
with DeltaExcludedBySparkVersionTestMixinShims {
3333

spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ import org.apache.spark.sql.types.StructType
3939
*/
4040
trait DeltaInsertIntoTest
4141
extends QueryTest
42-
with DeltaDMLByPathTestUtils
42+
with DeltaDMLTestUtilsPathBased
4343
with DeltaSQLCommandTest {
4444

4545
val catalogName = "spark_catalog"

spark/src/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule
3838
import io.delta.tables.{DeltaTable => IODeltaTable}
3939
import org.apache.hadoop.fs.FileStatus
4040
import org.apache.hadoop.fs.Path
41-
import org.scalatest.BeforeAndAfterEach
41+
import org.scalactic.source.Position
42+
import org.scalatest.{BeforeAndAfterEach, Tag}
4243

4344
import org.apache.spark.{SparkContext, SparkFunSuite, SparkThrowable}
4445
import org.apache.spark.scheduler.{JobFailed, SparkListener, SparkListenerJobEnd, SparkListenerJobStart}
@@ -669,7 +670,7 @@ trait DeltaDMLTestUtils
669670
}
670671
}
671672

672-
trait DeltaDMLByPathTestUtils extends DeltaDMLTestUtils {
673+
trait DeltaDMLTestUtilsPathBased extends DeltaDMLTestUtils {
673674
self: SharedSparkSession =>
674675

675676
protected var tempDir: File = _
@@ -698,3 +699,42 @@ trait DeltaDMLByPathTestUtils extends DeltaDMLTestUtils {
698699
spark.read.format("delta").load(path)
699700
}
700701
}
702+
703+
/**
704+
* Represents a test that is incompatible with name-based table access
705+
*/
706+
case object NameBasedAccessIncompatible extends Tag("NameBasedAccessIncompatible")
707+
708+
trait DeltaDMLTestUtilsNameBased extends DeltaDMLTestUtils {
709+
self: SharedSparkSession =>
710+
711+
override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(
712+
implicit pos: Position): Unit = {
713+
if (testTags.contains(NameBasedAccessIncompatible)) {
714+
super.ignore(testName, testTags: _*)(testFun)
715+
} else {
716+
super.test(testName, testTags: _*)(testFun)
717+
}
718+
}
719+
720+
override protected def afterEach(): Unit = {
721+
try {
722+
spark.sql(s"DROP TABLE IF EXISTS $tableSQLIdentifier")
723+
deltaLog = null
724+
} finally {
725+
super.afterEach()
726+
}
727+
}
728+
729+
override protected def append(df: DataFrame, partitionBy: Seq[String] = Nil): Unit = {
730+
super.append(df, partitionBy)
731+
if (deltaLog == null) {
732+
deltaLog = DeltaLog.forTable(spark, TableIdentifier(tableSQLIdentifier))
733+
}
734+
}
735+
736+
// Keep this all lowercase. Otherwise, for tests with spark.sql.caseSensitive set to
737+
// true, the table name used for dropping the table will not match the created table
738+
// name, causing the table not being dropped.
739+
override protected def tableSQLIdentifier: String = "test_delta_table"
740+
}

spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoDVsSuite.scala

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -101,21 +101,20 @@ trait MergeIntoDVsTests extends MergeIntoDVsMixin with MergeIntoDVsOverrides {
101101
test(s"Merge with DVs metrics - Incremental Updates") {
102102
withTempDir { dir =>
103103
val sourcePath = s"$dir/source"
104-
val targetPath = s"$dir/target"
105104

106105
spark.range(0, 10, 2).write.format("delta").save(sourcePath)
107-
spark.range(10).write.format("delta").save(targetPath)
106+
append(spark.range(10).toDF())
108107

109108
executeMerge(
110-
tgt = s"delta.`$targetPath` t",
109+
tgt = s"$tableSQLIdentifier t",
111110
src = s"delta.`$sourcePath` s",
112111
cond = "t.id = s.id",
113112
clauses = updateNotMatched(set = "id = t.id * 10"))
114113

115-
checkAnswer(readDeltaTable(targetPath), Seq(0, 10, 2, 30, 4, 50, 6, 70, 8, 90).toDF("id"))
114+
checkAnswer(readDeltaTableByIdentifier(), Seq(0, 10, 2, 30, 4, 50, 6, 70, 8, 90).toDF("id"))
116115

117116
assertOperationalDVMetrics(
118-
targetPath,
117+
deltaLog.dataPath.toString,
119118
numDeletedRows = 0,
120119
numUpdatedRows = 5,
121120
numCopiedRows = 0,
@@ -125,15 +124,15 @@ trait MergeIntoDVsTests extends MergeIntoDVsMixin with MergeIntoDVsOverrides {
125124
numDeletionVectorsUpdated = 0)
126125

127126
executeMerge(
128-
tgt = s"delta.`$targetPath` t",
127+
tgt = s"$tableSQLIdentifier t",
129128
src = s"delta.`$sourcePath` s",
130129
cond = "t.id = s.id",
131130
clauses = delete(condition = "t.id = 2"))
132131

133-
checkAnswer(readDeltaTable(targetPath), Seq(0, 10, 30, 4, 50, 6, 70, 8, 90).toDF("id"))
132+
checkAnswer(readDeltaTableByIdentifier(), Seq(0, 10, 30, 4, 50, 6, 70, 8, 90).toDF("id"))
134133

135134
assertOperationalDVMetrics(
136-
targetPath,
135+
deltaLog.dataPath.toString,
137136
numDeletedRows = 1,
138137
numUpdatedRows = 0,
139138
numCopiedRows = 0,
@@ -144,15 +143,15 @@ trait MergeIntoDVsTests extends MergeIntoDVsMixin with MergeIntoDVsOverrides {
144143

145144
// Delete all rows from a file.
146145
executeMerge(
147-
tgt = s"delta.`$targetPath` t",
146+
tgt = s"$tableSQLIdentifier t",
148147
src = s"delta.`$sourcePath` s",
149148
cond = "t.id = s.id",
150149
clauses = delete(condition = "t.id < 5"))
151150

152-
checkAnswer(readDeltaTable(targetPath), Seq(10, 30, 50, 6, 70, 8, 90).toDF("id"))
151+
checkAnswer(readDeltaTableByIdentifier(), Seq(10, 30, 50, 6, 70, 8, 90).toDF("id"))
153152

154153
assertOperationalDVMetrics(
155-
targetPath,
154+
deltaLog.dataPath.toString,
156155
numDeletedRows = 2,
157156
numUpdatedRows = 0,
158157
numCopiedRows = 0,
@@ -166,21 +165,20 @@ trait MergeIntoDVsTests extends MergeIntoDVsMixin with MergeIntoDVsOverrides {
166165
test(s"Merge with DVs metrics - delete entire file") {
167166
withTempDir { dir =>
168167
val sourcePath = s"$dir/source"
169-
val targetPath = s"$dir/target"
170168

171169
spark.range(0, 7).write.format("delta").save(sourcePath)
172-
spark.range(10).write.format("delta").save(targetPath)
170+
append(spark.range(10).toDF())
173171

174172
executeMerge(
175-
tgt = s"delta.`$targetPath` t",
173+
tgt = s"$tableSQLIdentifier t",
176174
src = s"delta.`$sourcePath` s",
177175
cond = "t.id = s.id",
178176
clauses = update(set = "id = t.id * 10"))
179177

180-
checkAnswer(readDeltaTable(targetPath), Seq(0, 10, 20, 30, 40, 50, 60, 7, 8, 9).toDF("id"))
178+
checkAnswer(readDeltaTableByIdentifier(), Seq(0, 10, 20, 30, 40, 50, 60, 7, 8, 9).toDF("id"))
181179

182180
assertOperationalDVMetrics(
183-
targetPath,
181+
deltaLog.dataPath.toString,
184182
numDeletedRows = 0,
185183
numUpdatedRows = 7,
186184
numCopiedRows = 0, // No rows were copied.

spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoNotMatchedBySourceSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,7 @@ trait MergeIntoNotMatchedBySourceSuite extends MergeIntoSuiteBaseMixin {
501501
),
502502
cdc = Seq.empty)
503503

504-
test(s"special character in path - not matched by source delete") {
504+
test("special character in path - not matched by source delete", NameBasedAccessIncompatible) {
505505
withTempDir { tempDir =>
506506
val source = s"$tempDir/sou rce^"
507507
val target = s"$tempDir/tar get="
@@ -516,7 +516,7 @@ trait MergeIntoNotMatchedBySourceSuite extends MergeIntoSuiteBaseMixin {
516516
}
517517
}
518518

519-
test(s"special character in path - not matched by source update") {
519+
test("special character in path - not matched by source update", NameBasedAccessIncompatible) {
520520
withTempDir { tempDir =>
521521
val source = s"$tempDir/sou rce@"
522522
val target = s"$tempDir/tar get#"

spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSQLSuite.scala

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
3535
trait MergeIntoSQLMixin extends MergeIntoSuiteBaseMixin
3636
with MergeIntoSQLTestUtils
3737
with DeltaSQLCommandTest
38-
with DeltaDMLByPathTestUtils
3938
with DeltaTestUtilsForTempViews {
4039

4140
override def excluded: Seq[String] = super.excluded ++ Seq(
@@ -139,14 +138,14 @@ trait MergeIntoSQLTests extends MergeIntoSQLMixin {
139138

140139
val cte = "WITH cte1 AS (SELECT key1 + 2 AS key3, value FROM source) "
141140
val merge = basicMergeStmt(
142-
target = s"delta.`$tempPath` as target",
141+
target = s"$tableSQLIdentifier as target",
143142
source = "cte1 src",
144143
condition = "src.key3 = target.key2",
145144
update = "key2 = 20 + src.key3, value = 20 + src.value",
146145
insert = "(key2, value) VALUES (src.key3 - 10, src.value + 10)")
147146

148147
QueryTest.checkAnswer(sql(cte + merge), Seq(Row(2, 1, 0, 1)))
149-
checkAnswer(readDeltaTable(tempPath),
148+
checkAnswer(readDeltaTableByIdentifier(),
150149
Row(1, 4) :: // No change
151150
Row(22, 23) :: // Update
152151
Row(-7, 11) :: // Insert
@@ -159,7 +158,7 @@ trait MergeIntoSQLTests extends MergeIntoSQLMixin {
159158
append(Seq((2, 2), (1, 4)).toDF("key2", "value"))
160159

161160
executeMerge(
162-
target = s"delta.`$tempPath` as trg",
161+
target = s"$tableSQLIdentifier as trg",
163162
source =
164163
"""
165164
|( SELECT * FROM VALUES (1, 6, "a") as t1(key1, value, others)
@@ -171,7 +170,7 @@ trait MergeIntoSQLTests extends MergeIntoSQLMixin {
171170
update = "trg.key2 = 20 + key1, value = 20 + src.value",
172171
insert = "(trg.key2, value) VALUES (key1 - 10, src.value + 10)")
173172

174-
checkAnswer(readDeltaTable(tempPath),
173+
checkAnswer(readDeltaTableByIdentifier(),
175174
Row(2, 2) :: // No change
176175
Row(21, 26) :: // Update
177176
Row(-10, 13) :: // Insert
@@ -193,7 +192,7 @@ trait MergeIntoSQLTests extends MergeIntoSQLMixin {
193192
// duplicate column names in update clause
194193
var e = intercept[AnalysisException] {
195194
executeMerge(
196-
target = s"delta.`$tempPath` as target",
195+
target = s"$tableSQLIdentifier as target",
197196
source = "source src",
198197
condition = "src.key1 = target.key2",
199198
update = "key2 = 1, key2 = 2",
@@ -205,7 +204,7 @@ trait MergeIntoSQLTests extends MergeIntoSQLMixin {
205204
// duplicate column names in insert clause
206205
e = intercept[AnalysisException] {
207206
executeMerge(
208-
target = s"delta.`$tempPath` as target",
207+
target = s"$tableSQLIdentifier as target",
209208
source = "source src",
210209
condition = "src.key1 = target.key2",
211210
update = "key2 = 1, value = 2",
@@ -227,7 +226,7 @@ trait MergeIntoSQLTests extends MergeIntoSQLMixin {
227226
// filter pushdown can cause empty join conditions and cross-join being used
228227
withCrossJoinEnabled {
229228
val merge = basicMergeStmt(
230-
target = s"delta.`$tempPath`",
229+
target = tableSQLIdentifier,
231230
source = "source src",
232231
condition = "key2 < 0", // no row match
233232
update = "key2 = 20, value = 20",
@@ -241,7 +240,7 @@ trait MergeIntoSQLTests extends MergeIntoSQLMixin {
241240
assert(readSchema.flatten.isEmpty, "column pruning does not work")
242241
}
243242

244-
checkAnswer(readDeltaTable(tempPath),
243+
checkAnswer(readDeltaTableByIdentifier(),
245244
Row(2, 2) :: // No change
246245
Row(1, 4) :: // No change
247246
Row(10, 10) :: // Insert
@@ -259,7 +258,7 @@ trait MergeIntoSQLTests extends MergeIntoSQLMixin {
259258
// only the last NOT MATCHED clause can omit the condition
260259
val e = intercept[ParseException](
261260
sql(s"""
262-
|MERGE INTO delta.`$tempPath`
261+
|MERGE INTO $tableSQLIdentifier
263262
|USING source
264263
|ON srcKey = trgKey
265264
|WHEN NOT MATCHED THEN
@@ -460,14 +459,14 @@ trait MergeIntoSQLTests extends MergeIntoSQLMixin {
460459

461460
// View on path-based table
462461
append(Seq((0, 0), (1, 1)).toDF("key", "value"))
463-
readDeltaTable(tempPath).createOrReplaceTempView("v")
462+
readDeltaTableByIdentifier().createOrReplaceTempView("v")
464463
testMergeWithView("with path-based table")
465464

466465
// View on catalog table
467466
withTable("tab") {
468467
Seq((0, 0), (1, 1)).toDF("key", "value").write.format("delta").saveAsTable("tab")
469468
spark.table("tab").as("name").createOrReplaceTempView("v")
470-
testMergeWithView(s"delta.`$tempPath`")
469+
testMergeWithView(tableSQLIdentifier)
471470
}
472471
}
473472
}

spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoScalaSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import org.apache.spark.sql.types.StructType
3131
trait MergeIntoScalaMixin extends MergeIntoSuiteBaseMixin
3232
with MergeIntoScalaTestUtils
3333
with DeltaSQLCommandTest
34-
with DeltaDMLByPathTestUtils
34+
with DeltaDMLTestUtilsPathBased
3535
with DeltaTestUtilsForTempViews
3636
with DeltaExcludedTestMixin {
3737

spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSuiteBase.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2173,7 +2173,7 @@ trait MergeIntoSuiteBaseMiscTests extends MergeIntoSuiteBaseMixin {
21732173
}
21742174
}
21752175

2176-
test("Merge should use the same SparkSession consistently") {
2176+
test("Merge should use the same SparkSession consistently", NameBasedAccessIncompatible) {
21772177
withTempDir { dir =>
21782178
withSQLConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> "false") {
21792179
val r = dir.getCanonicalPath
@@ -2270,7 +2270,7 @@ trait MergeIntoSuiteBaseMiscTests extends MergeIntoSuiteBaseMixin {
22702270
}
22712271
}
22722272

2273-
test("Negative case - MERGE to the child directory") {
2273+
test("Negative case - MERGE to the child directory", NameBasedAccessIncompatible) {
22742274
withTempDir { tempDir =>
22752275
val tempPath = tempDir.getCanonicalPath
22762276
val df = Seq((1, 1), (0, 3), (1, 5)).toDF("key2", "value")
@@ -2289,7 +2289,7 @@ trait MergeIntoSuiteBaseMiscTests extends MergeIntoSuiteBaseMixin {
22892289
}
22902290
}
22912291

2292-
test(s"special character in path - matched delete") {
2292+
test(s"special character in path - matched delete", NameBasedAccessIncompatible) {
22932293
withTempDir { tempDir =>
22942294
val source = s"$tempDir/sou rce~"
22952295
val target = s"$tempDir/tar get>"
@@ -2304,7 +2304,7 @@ trait MergeIntoSuiteBaseMiscTests extends MergeIntoSuiteBaseMixin {
23042304
}
23052305
}
23062306

2307-
test(s"special character in path - matched update") {
2307+
test(s"special character in path - matched update", NameBasedAccessIncompatible) {
23082308
withTempDir { tempDir =>
23092309
val source = s"$tempDir/sou rce("
23102310
val target = s"$tempDir/tar get*"

0 commit comments

Comments
 (0)