Skip to content

Commit ecf0b8f

Browse files
committed
Refactor selection vector logic into StreamingHelper.getAddFile
- Add an overloaded getAddFile(FilteredColumnarBatch, rowId) that respects the selection vector
- Selection vectors filter out duplicate files from stats re-collection
- Simplify SparkMicroBatchStream.loadAndValidateSnapshot by delegating to the helper
- Add V2 DeletionVectors streaming tests
1 parent e56e5e0 commit ecf0b8f

File tree

4 files changed

+138
-45
lines changed

4 files changed

+138
-45
lines changed
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright (2021) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.apache.spark.sql.delta.test
18+
19+
import org.apache.spark.sql.delta.{DeltaSourceDeletionVectorTests, DeltaSourceSuiteBase, PersistentDVEnabled}
20+
import org.apache.spark.sql.delta.sources.DeltaSQLConf
21+
22+
/**
 * Test suite that runs DeltaSourceDeletionVectorTests using the V2 connector.
 *
 * Shared streaming DV tests come from [[DeltaSourceDeletionVectorTests]]; this
 * suite forces the V2 read path and classifies each inherited test as
 * expected-pass or expected-fail until V2 reaches feature parity.
 */
class DeltaSourceV2DeletionVectorsSuite extends DeltaSourceSuiteBase
    with DeltaSQLCommandTest
    with DeltaSourceDeletionVectorTests
    with PersistentDVEnabled
    with V2ForceTest {

  override protected def useDsv2: Boolean = true

  /**
   * Runs `sqlText` through the V1 connector. Write operations (DELETE/INSERT)
   * are not yet supported by the V2 connector, so DML issued by the shared
   * tests is temporarily redirected to V1.
   */
  override protected def executeSql(sqlText: String): Unit =
    withSQLConf(DeltaSQLConf.V2_ENABLE_MODE.key -> "NONE") {
      sql(sqlText)
    }

  // Inherited tests expected to pass on the V2 read path.
  // NOTE: names must match the upstream suite byte-for-byte (including the
  // pre-existing "staring" typo).
  private lazy val shouldPassTests = Set(
    "allow to delete files before starting a streaming query",
    "allow to delete files before staring a streaming query without checkpoint",
    "multiple deletion vectors per file with initial snapshot"
  )

  // Inherited tests expected to fail on V2.
  private lazy val shouldFailTests = Set(
    // These tests use ignoreDeletes/ignoreChanges options not yet supported in V2
    "deleting files fails query if ignoreDeletes = false",
    "allow to delete files after staring a streaming query when ignoreFileDeletion is true",
    "allow to delete files after staring a streaming query when ignoreDeletes is true",
    "updating the source table causes failure when ignoreChanges = false - using DELETE",
    "allow to update the source table when ignoreChanges = true - using DELETE",
    "deleting files when ignoreChanges = true doesn't fail the query",
    "updating source table when ignoreDeletes = true fails the query - using DELETE",
    "subsequent DML commands are processed correctly in a batch - DELETE->DELETE - List()",
    "subsequent DML commands are processed correctly in a batch - DELETE->DELETE" +
      " - List((ignoreDeletes,true))",
    "subsequent DML commands are processed correctly in a batch - DELETE->DELETE" +
      " - List((ignoreChanges,true))",
    "subsequent DML commands are processed correctly in a batch - DELETE->DELETE" +
      " - List((skipChangeCommits,true))",
    "subsequent DML commands are processed correctly in a batch - INSERT->DELETE - List()",
    "subsequent DML commands are processed correctly in a batch - INSERT->DELETE" +
      " - List((ignoreDeletes,true))",
    "subsequent DML commands are processed correctly in a batch - INSERT->DELETE" +
      " - List((ignoreChanges,true))",
    "subsequent DML commands are processed correctly in a batch - INSERT->DELETE" +
      " - List((skipChangeCommits,true))",
    "multiple deletion vectors per file - List((ignoreFileDeletion,true))",
    "multiple deletion vectors per file - List((ignoreChanges,true))"
  )

  /**
   * Classifies `testName` as expected-fail. Every inherited test must be
   * listed in exactly one of the two sets; membership in neither (or both)
   * indicates a stale classification and trips an assertion.
   */
  override protected def shouldFail(testName: String): Boolean = {
    val passes = shouldPassTests(testName)
    val fails = shouldFailTests(testName)

    assert(passes || fails, s"Test '$testName' not in shouldPassTests or shouldFailTests")
    assert(!passes || !fails,
      s"Test '$testName' in both shouldPassTests and shouldFailTests")

    fails
  }
}

spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceDeletionVectorsSuite.scala

Lines changed: 23 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import java.io.File
2121
import scala.util.control.NonFatal
2222

2323
import org.apache.spark.sql.delta.Relocated.StreamExecution
24-
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
24+
import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils}
2525
import org.apache.hadoop.fs.Path
2626
import org.scalatest.concurrent.Eventually
2727
import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -30,28 +30,31 @@ import org.apache.spark.sql.streaming.{StreamTest, Trigger}
3030
import org.apache.spark.sql.streaming.util.StreamManualClock
3131

3232
trait DeltaSourceDeletionVectorTests extends StreamTest
33-
with DeletionVectorsTestUtils {
33+
with DeletionVectorsTestUtils
34+
with DeltaSourceConnectorTrait {
35+
self: DeltaSQLTestUtils =>
3436

3537
import testImplicits._
3638

39+
/** Execute SQL statement. Override in V2 tests to use V1 connector for write operations. */
40+
protected def executeSql(sqlText: String): Unit = sql(sqlText)
41+
3742
test("allow to delete files before starting a streaming query") {
3843
withTempDir { inputDir =>
3944
val deltaLog = DeltaLog.forTable(spark, new Path(inputDir.toURI))
4045
(0 until 5).foreach { i =>
4146
val v = Seq(i.toString).toDF
4247
v.write.mode("append").format("delta").save(deltaLog.dataPath.toString)
4348
}
44-
sql(s"DELETE FROM delta.`$inputDir`")
49+
executeSql(s"DELETE FROM delta.`$inputDir`")
4550
(5 until 10).foreach { i =>
4651
val v = Seq(i.toString).toDF
4752
v.write.mode("append").format("delta").save(deltaLog.dataPath.toString)
4853
}
4954
deltaLog.checkpoint()
5055
assert(deltaLog.readLastCheckpointFile().nonEmpty, "this test requires a checkpoint")
5156

52-
val df = spark.readStream
53-
.format("delta")
54-
.load(inputDir.getCanonicalPath)
57+
val df = loadStreamWithOptions(inputDir.getCanonicalPath, Map.empty)
5558

5659
testStream(df)(
5760
AssertOnQuery { q =>
@@ -69,16 +72,14 @@ trait DeltaSourceDeletionVectorTests extends StreamTest
6972
val v = Seq(i.toString).toDF
7073
v.write.mode("append").format("delta").save(deltaLog.dataPath.toString)
7174
}
72-
sql(s"DELETE FROM delta.`$inputDir`")
75+
executeSql(s"DELETE FROM delta.`$inputDir`")
7376
(5 until 7).foreach { i =>
7477
val v = Seq(i.toString).toDF
7578
v.write.mode("append").format("delta").save(deltaLog.dataPath.toString)
7679
}
7780
assert(deltaLog.readLastCheckpointFile().isEmpty, "this test requires no checkpoint")
7881

79-
val df = spark.readStream
80-
.format("delta")
81-
.load(inputDir.getCanonicalPath)
82+
val df = loadStreamWithOptions(inputDir.getCanonicalPath, Map.empty)
8283

8384
testStream(df)(
8485
AssertOnQuery { q =>
@@ -115,7 +116,7 @@ trait DeltaSourceDeletionVectorTests extends StreamTest
115116
Seq(i, i + 1).toDF().coalesce(1).write.format("delta").mode("append").save(inputDir)
116117
}
117118

118-
val df = spark.readStream.format("delta").options(sourceOptions.toMap).load(inputDir)
119+
val df = loadStreamWithOptions(inputDir, sourceOptions.toMap)
119120
val expectDVs = commandShouldProduceDVs.getOrElse(
120121
sqlCommand.toUpperCase().startsWith("DELETE"))
121122

@@ -126,7 +127,7 @@ trait DeltaSourceDeletionVectorTests extends StreamTest
126127
},
127128
CheckAnswer((0 until 10): _*),
128129
AssertOnQuery { q =>
129-
sql(sqlCommand)
130+
executeSql(sqlCommand)
130131
deletionVectorsPresentIfExpected(inputDir, expectDVs)
131132
})
132133

@@ -148,7 +149,7 @@ trait DeltaSourceDeletionVectorTests extends StreamTest
148149
}
149150
val log = DeltaLog.forTable(spark, inputDir)
150151
val commitVersionBeforeDML = log.update().version
151-
val df = spark.readStream.format("delta").options(sourceOptions.toMap).load(inputDir)
152+
val df = loadStreamWithOptions(inputDir, sourceOptions.toMap)
152153
def expectDVsInCommand(shouldProduceDVs: Option[Boolean], command: String): Boolean = {
153154
shouldProduceDVs.getOrElse(command.toUpperCase().startsWith("DELETE"))
154155
}
@@ -177,11 +178,11 @@ trait DeltaSourceDeletionVectorTests extends StreamTest
177178
true
178179
},
179180
AssertOnQuery { q =>
180-
sql(sqlCommand1)
181+
executeSql(sqlCommand1)
181182
deletionVectorsPresentIfExpected(inputDir, expectDVsInCommand1)
182183
},
183184
AssertOnQuery { q =>
184-
sql(sqlCommand2)
185+
executeSql(sqlCommand2)
185186
deletionVectorsPresentIfExpected(inputDir, expectDVsInCommand2)
186187
},
187188
AssertOnQuery { q =>
@@ -416,21 +417,19 @@ trait DeltaSourceDeletionVectorTests extends StreamTest
416417
(0 until 10).toDF("value").coalesce(1).write.format("delta").save(path)
417418

418419
// V1: Delete row 0
419-
sql(s"DELETE FROM delta.`$path` WHERE value = 0")
420+
executeSql(s"DELETE FROM delta.`$path` WHERE value = 0")
420421

421422
// V2: Delete row 1
422-
sql(s"DELETE FROM delta.`$path` WHERE value = 1")
423+
executeSql(s"DELETE FROM delta.`$path` WHERE value = 1")
423424

424425
// V3: Delete row 2
425-
sql(s"DELETE FROM delta.`$path` WHERE value = 2")
426+
executeSql(s"DELETE FROM delta.`$path` WHERE value = 2")
426427

427428
// Verify DVs are present
428429
assert(getFilesWithDeletionVectors(deltaLog).nonEmpty,
429430
"This test requires deletion vectors to be present")
430431

431-
val df = spark.readStream
432-
.format("delta")
433-
.load(path)
432+
val df = loadStreamWithOptions(path, Map.empty)
434433

435434
testStream(df)(
436435
// Process the initial snapshot
@@ -457,10 +456,7 @@ trait DeltaSourceDeletionVectorTests extends StreamTest
457456
// V0: 10 rows in a single file
458457
(0 until 10).toDF("value").coalesce(1).write.format("delta").save(path)
459458

460-
val df = spark.readStream
461-
.format("delta")
462-
.options(sourceOptions.toMap)
463-
.load(path)
459+
val df = loadStreamWithOptions(path, sourceOptions.toMap)
464460

465461
testStream(df)(
466462
AssertOnQuery { q =>
@@ -470,12 +466,12 @@ trait DeltaSourceDeletionVectorTests extends StreamTest
470466
CheckAnswer((0 until 10): _*),
471467
AssertOnQuery { q =>
472468
// V1: Delete row 0 - creates first DV (version 1)
473-
sql(s"DELETE FROM delta.`$path` WHERE value = 0")
469+
executeSql(s"DELETE FROM delta.`$path` WHERE value = 0")
474470
true
475471
},
476472
AssertOnQuery { q =>
477473
// V2: Delete row 1 - updates DV (version 2). DV is cumulative: {0, 1}
478-
sql(s"DELETE FROM delta.`$path` WHERE value = 1")
474+
executeSql(s"DELETE FROM delta.`$path` WHERE value = 1")
479475
true
480476
},
481477
AssertOnQuery { q =>

spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import io.delta.kernel.CommitRange;
2323
import io.delta.kernel.Scan;
2424
import io.delta.kernel.Snapshot;
25-
import io.delta.kernel.data.ColumnVector;
2625
import io.delta.kernel.data.ColumnarBatch;
2726
import io.delta.kernel.data.FilteredColumnarBatch;
2827
import io.delta.kernel.defaults.engine.DefaultEngine;
@@ -1067,24 +1066,13 @@ private List<IndexedFile> loadAndValidateSnapshot(long version) {
10671066
try (CloseableIterator<FilteredColumnarBatch> filesIter = scan.getScanFiles(engine)) {
10681067
while (filesIter.hasNext()) {
10691068
FilteredColumnarBatch filteredBatch = filesIter.next();
1070-
ColumnarBatch batch = filteredBatch.getData();
1071-
Optional<ColumnVector> selectionVector = filteredBatch.getSelectionVector();
10721069

10731070
// Get all AddFiles from the batch. Include both dataChange=true and dataChange=false
1074-
// (checkpoint files) files. Respect the selection vector to filter out duplicate files
1075-
// (e.g., stats re-collection that re-adds files with updated stats).
1076-
for (int rowId = 0; rowId < batch.getSize(); rowId++) {
1077-
// Skip rows that are filtered out by the selection vector
1078-
final int currentRowId = rowId;
1079-
boolean shouldSkip =
1080-
selectionVector
1081-
.map(sv -> sv.isNullAt(currentRowId) || !sv.getBoolean(currentRowId))
1082-
.orElse(false);
1083-
if (shouldSkip) {
1084-
continue;
1085-
}
1086-
1087-
Optional<AddFile> addOpt = StreamingHelper.getAddFile(batch, currentRowId);
1071+
// (checkpoint files) files. StreamingHelper.getAddFile respects the selection vector
1072+
// to filter out duplicate files (e.g., stats re-collection re-adds files with updated
1073+
// stats).
1074+
for (int rowId = 0; rowId < filteredBatch.getData().getSize(); rowId++) {
1075+
Optional<AddFile> addOpt = StreamingHelper.getAddFile(filteredBatch, rowId);
10881076
if (addOpt.isPresent()) {
10891077
addFiles.add(addOpt.get());
10901078

spark/v2/src/main/java/io/delta/spark/internal/v2/utils/StreamingHelper.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.delta.kernel.CommitActions;
2222
import io.delta.kernel.data.ColumnVector;
2323
import io.delta.kernel.data.ColumnarBatch;
24+
import io.delta.kernel.data.FilteredColumnarBatch;
2425
import io.delta.kernel.data.Row;
2526
import io.delta.kernel.engine.Engine;
2627
import io.delta.kernel.internal.DeltaLogActionUtils;
@@ -64,7 +65,31 @@ public static long getVersion(ColumnarBatch batch) {
6465
return batch.getColumnVector(versionColIdx).getLong(0);
6566
}
6667

67-
/** Get AddFile action from a batch at the specified row, if present. */
68+
/**
69+
* Get AddFile action from a FilteredColumnarBatch at the specified row, if present.
70+
*
71+
* <p>This method respects the selection vector to filter out duplicate files that may appear when
72+
* stats re-collection (e.g., ANALYZE TABLE COMPUTE STATISTICS) re-adds files with updated stats.
73+
* The Kernel uses selection vectors to mark which rows (AddFiles) are logically valid.
74+
*
75+
* @param batch the FilteredColumnarBatch containing AddFile actions
76+
* @param rowId the row index to check
77+
* @return Optional containing the AddFile if present and selected, empty otherwise
78+
*/
79+
public static Optional<AddFile> getAddFile(FilteredColumnarBatch batch, int rowId) {
80+
// Check selection vector first - rows may be filtered out when stats re-collection
81+
// re-adds files with updated stats
82+
Optional<ColumnVector> selectionVector = batch.getSelectionVector();
83+
boolean isFiltered =
84+
selectionVector.map(sv -> sv.isNullAt(rowId) || !sv.getBoolean(rowId)).orElse(false);
85+
if (isFiltered) {
86+
return Optional.empty();
87+
}
88+
89+
return getAddFile(batch.getData(), rowId);
90+
}
91+
92+
/** Get AddFile action from a ColumnarBatch at the specified row, if present. */
6893
public static Optional<AddFile> getAddFile(ColumnarBatch batch, int rowId) {
6994
int addIdx = getFieldIndex(batch, DeltaLogActionUtils.DeltaAction.ADD.colName);
7095
ColumnVector addVector = batch.getColumnVector(addIdx);

0 commit comments

Comments
 (0)