Skip to content

Commit 0ced3e4

Browse files
committed
Move streaming DV and stats recompute tests to V2StreamingReadTest; revert non-v2 changes
1 parent 1b65973 commit 0ced3e4

File tree

5 files changed

+153
-137
lines changed

5 files changed

+153
-137
lines changed

spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaSourceV2DeletionVectorsSuite.scala

Lines changed: 0 additions & 84 deletions
This file was deleted.

spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSuite.scala

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,10 +95,7 @@ class DeltaV2SourceSuite extends DeltaSourceSuite with V2ForceTest {
9595

9696
// ========== Misc tests ==========
9797
"a fast writer should not starve a Delta source",
98-
"should not attempt to read a non exist version",
99-
100-
// ========== Stats recompute tests ==========
101-
"streaming read after stats recompute should not duplicate rows"
98+
"should not attempt to read a non exist version"
10299
)
103100

104101
private lazy val shouldFailTests = Set(

spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceDeletionVectorsSuite.scala

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import java.io.File
2121
import scala.util.control.NonFatal
2222

2323
import org.apache.spark.sql.delta.Relocated.StreamExecution
24-
import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils}
24+
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
2525
import org.apache.hadoop.fs.Path
2626
import org.scalatest.concurrent.Eventually
2727
import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -30,31 +30,28 @@ import org.apache.spark.sql.streaming.{StreamTest, Trigger}
3030
import org.apache.spark.sql.streaming.util.StreamManualClock
3131

3232
trait DeltaSourceDeletionVectorTests extends StreamTest
33-
with DeletionVectorsTestUtils
34-
with DeltaSourceConnectorTrait {
35-
self: DeltaSQLTestUtils =>
33+
with DeletionVectorsTestUtils {
3634

3735
import testImplicits._
3836

39-
/** Execute SQL statement. Override in V2 tests to use V1 connector for write operations. */
40-
protected def executeSql(sqlText: String): Unit = sql(sqlText)
41-
4237
test("allow to delete files before starting a streaming query") {
4338
withTempDir { inputDir =>
4439
val deltaLog = DeltaLog.forTable(spark, new Path(inputDir.toURI))
4540
(0 until 5).foreach { i =>
4641
val v = Seq(i.toString).toDF
4742
v.write.mode("append").format("delta").save(deltaLog.dataPath.toString)
4843
}
49-
executeSql(s"DELETE FROM delta.`$inputDir`")
44+
sql(s"DELETE FROM delta.`$inputDir`")
5045
(5 until 10).foreach { i =>
5146
val v = Seq(i.toString).toDF
5247
v.write.mode("append").format("delta").save(deltaLog.dataPath.toString)
5348
}
5449
deltaLog.checkpoint()
5550
assert(deltaLog.readLastCheckpointFile().nonEmpty, "this test requires a checkpoint")
5651

57-
val df = loadStreamWithOptions(inputDir.getCanonicalPath, Map.empty)
52+
val df = spark.readStream
53+
.format("delta")
54+
.load(inputDir.getCanonicalPath)
5855

5956
testStream(df)(
6057
AssertOnQuery { q =>
@@ -72,14 +69,16 @@ trait DeltaSourceDeletionVectorTests extends StreamTest
7269
val v = Seq(i.toString).toDF
7370
v.write.mode("append").format("delta").save(deltaLog.dataPath.toString)
7471
}
75-
executeSql(s"DELETE FROM delta.`$inputDir`")
72+
sql(s"DELETE FROM delta.`$inputDir`")
7673
(5 until 7).foreach { i =>
7774
val v = Seq(i.toString).toDF
7875
v.write.mode("append").format("delta").save(deltaLog.dataPath.toString)
7976
}
8077
assert(deltaLog.readLastCheckpointFile().isEmpty, "this test requires no checkpoint")
8178

82-
val df = loadStreamWithOptions(inputDir.getCanonicalPath, Map.empty)
79+
val df = spark.readStream
80+
.format("delta")
81+
.load(inputDir.getCanonicalPath)
8382

8483
testStream(df)(
8584
AssertOnQuery { q =>
@@ -116,7 +115,7 @@ trait DeltaSourceDeletionVectorTests extends StreamTest
116115
Seq(i, i + 1).toDF().coalesce(1).write.format("delta").mode("append").save(inputDir)
117116
}
118117

119-
val df = loadStreamWithOptions(inputDir, sourceOptions.toMap)
118+
val df = spark.readStream.format("delta").options(sourceOptions.toMap).load(inputDir)
120119
val expectDVs = commandShouldProduceDVs.getOrElse(
121120
sqlCommand.toUpperCase().startsWith("DELETE"))
122121

@@ -127,7 +126,7 @@ trait DeltaSourceDeletionVectorTests extends StreamTest
127126
},
128127
CheckAnswer((0 until 10): _*),
129128
AssertOnQuery { q =>
130-
executeSql(sqlCommand)
129+
sql(sqlCommand)
131130
deletionVectorsPresentIfExpected(inputDir, expectDVs)
132131
})
133132

@@ -149,7 +148,7 @@ trait DeltaSourceDeletionVectorTests extends StreamTest
149148
}
150149
val log = DeltaLog.forTable(spark, inputDir)
151150
val commitVersionBeforeDML = log.update().version
152-
val df = loadStreamWithOptions(inputDir, sourceOptions.toMap)
151+
val df = spark.readStream.format("delta").options(sourceOptions.toMap).load(inputDir)
153152
def expectDVsInCommand(shouldProduceDVs: Option[Boolean], command: String): Boolean = {
154153
shouldProduceDVs.getOrElse(command.toUpperCase().startsWith("DELETE"))
155154
}
@@ -178,11 +177,11 @@ trait DeltaSourceDeletionVectorTests extends StreamTest
178177
true
179178
},
180179
AssertOnQuery { q =>
181-
executeSql(sqlCommand1)
180+
sql(sqlCommand1)
182181
deletionVectorsPresentIfExpected(inputDir, expectDVsInCommand1)
183182
},
184183
AssertOnQuery { q =>
185-
executeSql(sqlCommand2)
184+
sql(sqlCommand2)
186185
deletionVectorsPresentIfExpected(inputDir, expectDVsInCommand2)
187186
},
188187
AssertOnQuery { q =>
@@ -417,19 +416,21 @@ trait DeltaSourceDeletionVectorTests extends StreamTest
417416
(0 until 10).toDF("value").coalesce(1).write.format("delta").save(path)
418417

419418
// V1: Delete row 0
420-
executeSql(s"DELETE FROM delta.`$path` WHERE value = 0")
419+
sql(s"DELETE FROM delta.`$path` WHERE value = 0")
421420

422421
// V2: Delete row 1
423-
executeSql(s"DELETE FROM delta.`$path` WHERE value = 1")
422+
sql(s"DELETE FROM delta.`$path` WHERE value = 1")
424423

425424
// V3: Delete row 2
426-
executeSql(s"DELETE FROM delta.`$path` WHERE value = 2")
425+
sql(s"DELETE FROM delta.`$path` WHERE value = 2")
427426

428427
// Verify DVs are present
429428
assert(getFilesWithDeletionVectors(deltaLog).nonEmpty,
430429
"This test requires deletion vectors to be present")
431430

432-
val df = loadStreamWithOptions(path, Map.empty)
431+
val df = spark.readStream
432+
.format("delta")
433+
.load(path)
433434

434435
testStream(df)(
435436
// Process the initial snapshot
@@ -456,7 +457,10 @@ trait DeltaSourceDeletionVectorTests extends StreamTest
456457
// V0: 10 rows in a single file
457458
(0 until 10).toDF("value").coalesce(1).write.format("delta").save(path)
458459

459-
val df = loadStreamWithOptions(path, sourceOptions.toMap)
460+
val df = spark.readStream
461+
.format("delta")
462+
.options(sourceOptions.toMap)
463+
.load(path)
460464

461465
testStream(df)(
462466
AssertOnQuery { q =>
@@ -466,12 +470,12 @@ trait DeltaSourceDeletionVectorTests extends StreamTest
466470
CheckAnswer((0 until 10): _*),
467471
AssertOnQuery { q =>
468472
// V1: Delete row 0 - creates first DV (version 1)
469-
executeSql(s"DELETE FROM delta.`$path` WHERE value = 0")
473+
sql(s"DELETE FROM delta.`$path` WHERE value = 0")
470474
true
471475
},
472476
AssertOnQuery { q =>
473477
// V2: Delete row 1 - updates DV (version 2). DV is cumulative: {0, 1}
474-
executeSql(s"DELETE FROM delta.`$path` WHERE value = 1")
478+
sql(s"DELETE FROM delta.`$path` WHERE value = 1")
475479
true
476480
},
477481
AssertOnQuery { q =>

spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import org.apache.spark.sql.delta.DeltaTestUtils.modifyCommitTimestamp
3030
import org.apache.spark.sql.delta.Relocated
3131
import org.apache.spark.sql.delta.actions.{AddFile, Protocol}
3232
import org.apache.spark.sql.delta.sources.{DeltaDataSource, DeltaSQLConf, DeltaSource, DeltaSourceOffset}
33-
import org.apache.spark.sql.delta.stats.StatisticsCollection
3433
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
3534
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
3635
import org.apache.spark.sql.delta.test.shims.StreamingTestShims.{MemoryStream, OffsetSeqLog}
@@ -2669,31 +2668,6 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase
26692668
)
26702669
}
26712670
}
2672-
2673-
test("streaming read after stats recompute should not duplicate rows") {
2674-
// StatisticsCollection.recompute re-adds files with updated stats.
2675-
// The initial snapshot scan should use selection vector to filter out duplicate files.
2676-
withTempDir { inputDir =>
2677-
val tablePath = inputDir.getCanonicalPath
2678-
2679-
// Create table with data (stats disabled initially)
2680-
withSQLConf(DeltaSQLConf.DELTA_COLLECT_STATS.key -> "false") {
2681-
spark.range(10).selectExpr("id", "cast(id as string) as value")
2682-
.write.format("delta").save(tablePath)
2683-
}
2684-
2685-
// Recompute statistics - this adds duplicate AddFile entries in the log
2686-
val deltaLog = DeltaLog.forTable(spark, new Path(tablePath))
2687-
StatisticsCollection.recompute(spark, deltaLog)
2688-
2689-
// Streaming read should only see each row once, not duplicated
2690-
val df = loadStreamWithOptions(tablePath, Map.empty)
2691-
testStream(df)(
2692-
AssertOnQuery { q => q.processAllAvailable(); true },
2693-
CheckAnswer((0L until 10L).map(i => Row(i, i.toString)): _*)
2694-
)
2695-
}
2696-
}
26972671
}
26982672

26992673
/**

0 commit comments

Comments
 (0)