
Commit 37fff90

[Kernel-Spark] Phase 1: Basic Deletion Vector read support (delta-io#5774)
## 🥞 Stacked PR

Use this [link](https://github.com/delta-io/delta/pull/5774/files) to review incremental changes.

- [**stack/dv_pr2_phase1_basic_read**](delta-io#5774) [[Files changed](https://github.com/delta-io/delta/pull/5774/files)]
- [stack/dv_pr3_phase2_vectorized](delta-io#5775) [[Files changed](https://github.com/delta-io/delta/pull/5775/files/6b90bdd14bd86a5bcfb109fb12d9447b769d4d24..9446011c512f4fa4d7f06417eb4629671813dbe4)]
- [stack/dv_pr4_phase3_file_splitting](delta-io#5776) [[Files changed](https://github.com/delta-io/delta/pull/5776/files/9446011c512f4fa4d7f06417eb4629671813dbe4..5e798ef3a37c07b86f12de6f5465a5db3fbff153)]
- [stack/dv_pr5_streaming_support](delta-io#5877) [[Files changed](https://github.com/delta-io/delta/pull/5877/files/5e798ef3a37c07b86f12de6f5465a5db3fbff153..1c4ddacd08654e2d778bfdfb992320cb6dcead9f)]
- [stack/dv_pr5_test](delta-io#5975) [[Files changed](https://github.com/delta-io/delta/pull/5975/files/6b90bdd14bd86a5bcfb109fb12d9447b769d4d24..24ae4ed8d474449407582b05c0c0ab45f97a72ed)]

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [x] Kernel
- [ ] Other (fill in here)

## Description

Add basic deletion vector (DV) read support for the Spark V2 connector using row-based filtering.

### Changes:

- `DvSchemaContext`: POJO to manage DV schema context (column indices, output schema)
- `DeletionVectorReadFunction`: Wraps the base reader to filter deleted rows and project out the DV column
- `PartitionUtils`: Creates a DV-aware `PartitionReaderFactory` with `DeltaParquetFileFormatV2`
- Add `serializeToBase64()` to Kernel's `DeletionVectorDescriptor`

### How it works:

1. Add the `__delta_internal_is_row_deleted` column to the read schema
2. Filter out rows where the DV column != 0 (deleted)
3. Project out the DV column from the output

## How was this patch tested?

- `DvSchemaContextTest`: Unit tests for schema manipulation
- `DeletionVectorReadFunctionTest`: Unit tests for row filtering and projection
- Golden table tests with DV tables pass

## Does this PR introduce _any_ user-facing changes?

No
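The three steps above can be sketched independently of Spark's `InternalRow` machinery. The following is a simplified illustration only (plain `Object[]` rows and a hypothetical `filterAndProject` helper, not the connector's actual code):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of the DV read path: each row carries an extra byte column
// (the __delta_internal_is_row_deleted flag); keep rows where it is 0, then
// project that column out of the output.
public class DvFilterSketch {

  /** Keep rows whose DV byte is 0, then drop the DV column at dvIndex. */
  public static List<Object[]> filterAndProject(List<Object[]> rows, int dvIndex) {
    List<Object[]> out = new ArrayList<>();
    for (Object[] row : rows) {
      if ((byte) row[dvIndex] != (byte) 0) {
        continue; // row is marked deleted by the deletion vector
      }
      Object[] projected = new Object[row.length - 1];
      int j = 0;
      for (int i = 0; i < row.length; i++) {
        if (i != dvIndex) {
          projected[j++] = row[i];
        }
      }
      out.add(projected);
    }
    return out;
  }

  public static void main(String[] args) {
    List<Object[]> rows = List.of(
        new Object[] {"a", 1, (byte) 0},  // kept
        new Object[] {"b", 2, (byte) 1},  // deleted
        new Object[] {"c", 3, (byte) 0}); // kept
    List<Object[]> result = filterAndProject(rows, 2);
    System.out.println(result.size()); // 2
  }
}
```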
1 parent c0eebcd commit 37fff90

File tree

11 files changed: +801 −24 lines


spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkPartitionReader.java

Lines changed: 13 additions & 12 deletions
```diff
@@ -15,26 +15,25 @@
  */
 package io.delta.spark.internal.v2.read;
 
+import java.io.Closeable;
 import java.io.IOException;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.read.PartitionReader;
 import org.apache.spark.sql.execution.datasources.FilePartition;
 import org.apache.spark.sql.execution.datasources.PartitionedFile;
-import org.apache.spark.sql.execution.datasources.RecordReaderIterator;
 import scala.Function1;
 import scala.collection.Iterator;
 
 public class SparkPartitionReader<T> implements PartitionReader<T> {
-  // Function that produces a Spark RecordReaderIterator for a given file.
+  // Function that produces an Iterator for a given file.
   private final Function1<PartitionedFile, Iterator<InternalRow>> readFunc;
   private final FilePartition partition;
 
   // Index of the next file to read within the partition.
   private int currentFileIndex = 0;
 
-  // Spark's readers return RecordReaderIterator for both row and columnar modes.
-  // Keep a reference so it can be closed when advancing to the next file.
-  private RecordReaderIterator<T> currentIterator = null;
+  // Current iterator for the file being read.
+  private Iterator<T> currentIterator = null;
 
   public SparkPartitionReader(
       Function1<PartitionedFile, Iterator<InternalRow>> readFunc, FilePartition partition) {
@@ -50,18 +49,15 @@ public boolean next() throws IOException {
         return true;
       }
 
-      if (currentIterator != null) {
-        currentIterator.close();
-        currentIterator = null;
-      }
+      closeCurrentIterator();
 
       if (currentFileIndex >= partition.files().length) {
         return false;
       }
 
       final PartitionedFile file = partition.files()[currentFileIndex++];
       @SuppressWarnings("unchecked")
-      RecordReaderIterator<T> it = (RecordReaderIterator<T>) readFunc.apply(file);
+      Iterator<T> it = (Iterator<T>) readFunc.apply(file);
       currentIterator = it;
     }
   }
@@ -71,14 +67,19 @@ public T get() {
     if (currentIterator == null) {
       throw new IllegalStateException("No current record. Call next() before get().");
     }
-    // RecordReaderIterator.next() returns the current record and advances the iterator.
     return currentIterator.next();
   }
 
   @Override
   public void close() throws IOException {
+    closeCurrentIterator();
+  }
+
+  private void closeCurrentIterator() throws IOException {
     if (currentIterator != null) {
-      currentIterator.close();
+      if (currentIterator instanceof Closeable) {
+        ((Closeable) currentIterator).close();
+      }
       currentIterator = null;
     }
   }
```
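The change above generalizes the reader from `RecordReaderIterator` to any `Iterator`, closing it only when it actually implements `Closeable`. A minimal standalone sketch of that pattern (illustrative names, no Spark types):

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Sketch: close the current per-file iterator only if it supports Closeable,
// mirroring the closeCurrentIterator() helper added to SparkPartitionReader.
public class CloseIfCloseableSketch {

  public static void closeIfCloseable(Iterator<?> it) throws IOException {
    if (it instanceof Closeable) {
      ((Closeable) it).close();
    }
  }

  /** A test iterator that records whether close() was called. */
  public static class TrackingIterator implements Iterator<Integer>, Closeable {
    public boolean closed = false;

    @Override public boolean hasNext() { return false; }
    @Override public Integer next() { throw new NoSuchElementException(); }
    @Override public void close() { closed = true; }
  }

  public static void main(String[] args) throws IOException {
    TrackingIterator tracking = new TrackingIterator();
    closeIfCloseable(tracking);                        // Closeable: gets closed
    closeIfCloseable(java.util.List.of(1).iterator()); // plain iterator: no-op
    System.out.println(tracking.closed);               // true
  }
}
```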
Lines changed: 86 additions & 0 deletions
New file (86 lines):

```java
/*
 * Copyright (2026) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.delta.spark.internal.v2.read.deletionvector;

import io.delta.spark.internal.v2.utils.CloseableIterator;
import java.io.Serializable;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.ProjectingInternalRow;
import org.apache.spark.sql.execution.datasources.PartitionedFile;
import scala.Function1;
import scala.collection.Iterator;
import scala.runtime.AbstractFunction1;

/**
 * Wraps a Parquet reader function to apply deletion vector filtering.
 *
 * <p>This function:
 *
 * <ol>
 *   <li>Reads rows from the base Parquet reader (which includes the is_row_deleted column)
 *   <li>Filters out deleted rows (where is_row_deleted != 0)
 *   <li>Projects out the is_row_deleted column from the output
 * </ol>
 *
 * <p>The returned iterator implements {@link java.io.Closeable} to ensure proper resource cleanup
 * of the underlying Parquet reader, even when the iterator is not fully consumed.
 */
public class DeletionVectorReadFunction
    extends AbstractFunction1<PartitionedFile, Iterator<InternalRow>> implements Serializable {

  private static final long serialVersionUID = 1L;

  /** Byte value in the DV column indicating the row is NOT deleted (row should be kept). */
  private static final byte ROW_NOT_DELETED = 0;

  private final Function1<PartitionedFile, Iterator<InternalRow>> baseReadFunc;
  private final DeletionVectorSchemaContext dvSchemaContext;

  private DeletionVectorReadFunction(
      Function1<PartitionedFile, Iterator<InternalRow>> baseReadFunc,
      DeletionVectorSchemaContext dvSchemaContext) {
    this.baseReadFunc = baseReadFunc;
    this.dvSchemaContext = dvSchemaContext;
  }

  @Override
  public Iterator<InternalRow> apply(PartitionedFile file) {
    int dvColumnIndex = dvSchemaContext.getDvColumnIndex();
    // Use pre-computed ordinals from DeletionVectorSchemaContext.
    ProjectingInternalRow projection =
        ProjectingInternalRow.apply(
            dvSchemaContext.getOutputSchema(), dvSchemaContext.getOutputColumnOrdinals());

    // Wrap the base iterator as CloseableIterator to preserve close() through filter/map.
    // This ensures proper resource cleanup even when the iterator is not fully consumed.
    Iterator<InternalRow> baseIterator = baseReadFunc.apply(file);

    return CloseableIterator.wrap(baseIterator)
        .filterCloseable(row -> row.getByte(dvColumnIndex) == ROW_NOT_DELETED)
        .mapCloseable(
            row -> {
              projection.project(row);
              return (InternalRow) projection;
            });
  }

  /** Factory method to wrap a reader function with DV filtering. */
  public static DeletionVectorReadFunction wrap(
      Function1<PartitionedFile, Iterator<InternalRow>> baseReadFunc,
      DeletionVectorSchemaContext dvSchemaContext) {
    return new DeletionVectorReadFunction(baseReadFunc, dvSchemaContext);
  }
}
```
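The key design point above is that `filterCloseable`/`mapCloseable` return wrappers that still forward `close()` to the source iterator, so the Parquet reader is released even on early termination. A minimal sketch of that idea using `java.util.Iterator` (the class and names here are illustrative, not the connector's `CloseableIterator` API):

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

// Sketch: a filtering iterator that stays Closeable, forwarding close()
// to the underlying source even if iteration stops early.
public class CloseableFilterSketch<T> implements Iterator<T>, Closeable {
  private final Iterator<T> source;
  private final Predicate<T> keep;
  private T nextValue;
  private boolean hasBuffered = false;

  public CloseableFilterSketch(Iterator<T> source, Predicate<T> keep) {
    this.source = source;
    this.keep = keep;
  }

  @Override
  public boolean hasNext() {
    // Pull from the source until a matching element is buffered.
    while (!hasBuffered && source.hasNext()) {
      T candidate = source.next();
      if (keep.test(candidate)) {
        nextValue = candidate;
        hasBuffered = true;
      }
    }
    return hasBuffered;
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    hasBuffered = false;
    return nextValue;
  }

  @Override
  public void close() throws IOException {
    // Forward close() to the source, preserving cleanup through the wrapper.
    if (source instanceof Closeable) {
      ((Closeable) source).close();
    }
  }
}
```

Chaining several such wrappers keeps a single `close()` call at the outermost layer sufficient to release the innermost reader, which is the property the comment in `apply` relies on.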
Lines changed: 95 additions & 0 deletions
New file (95 lines):

```java
/*
 * Copyright (2026) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.delta.spark.internal.v2.read.deletionvector;

import java.io.Serializable;
import java.util.Arrays;
import org.apache.spark.sql.delta.DeltaParquetFileFormat;
import org.apache.spark.sql.types.StructType;
import scala.collection.immutable.Seq;

/**
 * Schema context for deletion vector processing in the V2 connector.
 *
 * <p>Encapsulates schema with DV column and pre-computed indices needed for DV filtering.
 */
public class DeletionVectorSchemaContext implements Serializable {

  private static final long serialVersionUID = 1L;

  private final StructType schemaWithDvColumn;
  private final int dvColumnIndex;
  private final int inputColumnCount;
  private final StructType outputSchema;
  private final Seq<Object> outputColumnOrdinals;

  /**
   * Create a DV schema context for encapsulating schema info and indices needed for DV filtering.
   *
   * @param readDataSchema original data schema without DV column
   * @param partitionSchema partition columns schema
   * @throws IllegalArgumentException if readDataSchema already contains the DV column
   */
  public DeletionVectorSchemaContext(StructType readDataSchema, StructType partitionSchema) {
    // Validate that readDataSchema doesn't already contain the DV column to ensure the DV column
    // is added only once. While Delta uses the "__delta_internal_" prefix as a naming convention
    // for internal columns (listed in DeltaColumnMapping.DELTA_INTERNAL_COLUMNS), there's no
    // enforced schema validation that prevents users from creating such columns. This check
    // provides a safety guard in the V2 connector.
    String dvColumnName = DeltaParquetFileFormat.IS_ROW_DELETED_COLUMN_NAME();
    if (Arrays.asList(readDataSchema.fieldNames()).contains(dvColumnName)) {
      throw new IllegalArgumentException(
          "readDataSchema already contains the deletion vector column: " + dvColumnName);
    }
    this.schemaWithDvColumn =
        readDataSchema.add(DeltaParquetFileFormat.IS_ROW_DELETED_STRUCT_FIELD());
    this.dvColumnIndex =
        schemaWithDvColumn.fieldIndex(DeltaParquetFileFormat.IS_ROW_DELETED_COLUMN_NAME());
    this.inputColumnCount = schemaWithDvColumn.fields().length + partitionSchema.fields().length;
    this.outputSchema = readDataSchema.merge(partitionSchema, /* handleDuplicateColumns= */ false);
    // Pre-compute output column ordinals: all indices except dvColumnIndex.
    int[] ordinals = new int[inputColumnCount - 1];
    int idx = 0;
    for (int i = 0; i < inputColumnCount; i++) {
      if (i != dvColumnIndex) {
        ordinals[idx++] = i;
      }
    }
    this.outputColumnOrdinals = scala.Predef.wrapIntArray(ordinals).toSeq();
  }

  /** Returns schema with the __delta_internal_is_row_deleted column added. */
  public StructType getSchemaWithDvColumn() {
    return schemaWithDvColumn;
  }

  public int getDvColumnIndex() {
    return dvColumnIndex;
  }

  public int getInputColumnCount() {
    return inputColumnCount;
  }

  public StructType getOutputSchema() {
    return outputSchema;
  }

  /** Returns pre-computed output column ordinals for ProjectingInternalRow. */
  public Seq<Object> getOutputColumnOrdinals() {
    return outputColumnOrdinals;
  }
}
```
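The ordinal pre-computation in the constructor is simple enough to check in isolation. A standalone sketch (with a hypothetical `outputOrdinals` helper name) of the same loop:

```java
import java.util.Arrays;

// Sketch of the ordinal computation above: every column index in
// [0, inputColumnCount) except the DV column's index.
public class OutputOrdinalsSketch {

  public static int[] outputOrdinals(int inputColumnCount, int dvColumnIndex) {
    int[] ordinals = new int[inputColumnCount - 1];
    int idx = 0;
    for (int i = 0; i < inputColumnCount; i++) {
      if (i != dvColumnIndex) {
        ordinals[idx++] = i;
      }
    }
    return ordinals;
  }

  public static void main(String[] args) {
    // e.g. 5 input columns with the DV column at index 2 → project columns 0, 1, 3, 4
    System.out.println(Arrays.toString(outputOrdinals(5, 2))); // [0, 1, 3, 4]
  }
}
```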
