Skip to content

Commit c37d128

Browse files
committed
[Kernel-Spark] Phase 2: Vectorized reader support for Deletion Vectors
1 parent 7e0e050 commit c37d128

File tree

5 files changed

+480
-12
lines changed

5 files changed

+480
-12
lines changed
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* Copyright (2025) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.spark.internal.v2.read.deletionvector;
17+
18+
import org.apache.spark.sql.types.Decimal;
19+
import org.apache.spark.sql.types.StructType;
20+
import org.apache.spark.sql.vectorized.ColumnVector;
21+
import org.apache.spark.sql.vectorized.ColumnarArray;
22+
import org.apache.spark.sql.vectorized.ColumnarMap;
23+
import org.apache.spark.unsafe.types.UTF8String;
24+
25+
/**
26+
* A column vector that applies row-level filtering using a row ID mapping.
27+
*
28+
* <p>Wraps an existing column vector and remaps row indices during data access, effectively
29+
* filtering the original data to only expose the live subset of rows without copying data.
30+
*
31+
* <p>Follows Apache Iceberg's ColumnVectorWithFilter pattern.
32+
*/
33+
public class ColumnVectorWithFilter extends ColumnVector {
34+
private final ColumnVector delegate;
35+
private final int[] rowIdMapping;
36+
private volatile ColumnVectorWithFilter[] children = null;
37+
38+
public ColumnVectorWithFilter(ColumnVector delegate, int[] rowIdMapping) {
39+
super(delegate.dataType());
40+
this.delegate = delegate;
41+
this.rowIdMapping = rowIdMapping;
42+
}
43+
44+
@Override
45+
public void close() {
46+
delegate.close();
47+
}
48+
49+
@Override
50+
public boolean hasNull() {
51+
return delegate.hasNull();
52+
}
53+
54+
@Override
55+
public int numNulls() {
56+
// Computing the actual number of nulls with rowIdMapping is expensive.
57+
// It is OK to overestimate and return the number of nulls in the original vector.
58+
return delegate.numNulls();
59+
}
60+
61+
@Override
62+
public boolean isNullAt(int rowId) {
63+
return delegate.isNullAt(rowIdMapping[rowId]);
64+
}
65+
66+
@Override
67+
public boolean getBoolean(int rowId) {
68+
return delegate.getBoolean(rowIdMapping[rowId]);
69+
}
70+
71+
@Override
72+
public byte getByte(int rowId) {
73+
return delegate.getByte(rowIdMapping[rowId]);
74+
}
75+
76+
@Override
77+
public short getShort(int rowId) {
78+
return delegate.getShort(rowIdMapping[rowId]);
79+
}
80+
81+
@Override
82+
public int getInt(int rowId) {
83+
return delegate.getInt(rowIdMapping[rowId]);
84+
}
85+
86+
@Override
87+
public long getLong(int rowId) {
88+
return delegate.getLong(rowIdMapping[rowId]);
89+
}
90+
91+
@Override
92+
public float getFloat(int rowId) {
93+
return delegate.getFloat(rowIdMapping[rowId]);
94+
}
95+
96+
@Override
97+
public double getDouble(int rowId) {
98+
return delegate.getDouble(rowIdMapping[rowId]);
99+
}
100+
101+
@Override
102+
public ColumnarArray getArray(int rowId) {
103+
return delegate.getArray(rowIdMapping[rowId]);
104+
}
105+
106+
@Override
107+
public ColumnarMap getMap(int rowId) {
108+
return delegate.getMap(rowIdMapping[rowId]);
109+
}
110+
111+
@Override
112+
public Decimal getDecimal(int rowId, int precision, int scale) {
113+
return delegate.getDecimal(rowIdMapping[rowId], precision, scale);
114+
}
115+
116+
@Override
117+
public UTF8String getUTF8String(int rowId) {
118+
return delegate.getUTF8String(rowIdMapping[rowId]);
119+
}
120+
121+
@Override
122+
public byte[] getBinary(int rowId) {
123+
return delegate.getBinary(rowIdMapping[rowId]);
124+
}
125+
126+
@Override
127+
public ColumnVector getChild(int ordinal) {
128+
if (children == null) {
129+
synchronized (this) {
130+
if (children == null) {
131+
// Eagerly create all children to avoid race condition on children[ordinal] access
132+
StructType structType = (StructType) dataType();
133+
ColumnVectorWithFilter[] newChildren =
134+
new ColumnVectorWithFilter[structType.fields().length];
135+
for (int i = 0; i < newChildren.length; i++) {
136+
newChildren[i] = new ColumnVectorWithFilter(delegate.getChild(i), rowIdMapping);
137+
}
138+
children = newChildren;
139+
}
140+
}
141+
}
142+
return children[ordinal];
143+
}
144+
}

spark/v2/src/main/java/io/delta/spark/internal/v2/read/deletionvector/DeletionVectorReadFunction.java

Lines changed: 65 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@
1717

1818
import io.delta.spark.internal.v2.utils.CloseableIterator;
1919
import java.io.Serializable;
20+
import java.util.Arrays;
2021
import org.apache.spark.sql.catalyst.InternalRow;
2122
import org.apache.spark.sql.catalyst.ProjectingInternalRow;
2223
import org.apache.spark.sql.execution.datasources.PartitionedFile;
24+
import org.apache.spark.sql.vectorized.ColumnVector;
25+
import org.apache.spark.sql.vectorized.ColumnarBatch;
2326
import scala.Function1;
2427
import scala.collection.Iterator;
25-
import scala.runtime.AbstractFunction1;
2628

2729
/**
2830
* Wraps a Parquet reader function to apply deletion vector filtering.
@@ -39,7 +41,8 @@
3941
* of the underlying Parquet reader, even when the iterator is not fully consumed.
4042
*/
4143
public class DeletionVectorReadFunction
42-
extends AbstractFunction1<PartitionedFile, Iterator<InternalRow>> implements Serializable {
44+
extends scala.runtime.AbstractFunction1<PartitionedFile, Iterator<InternalRow>>
45+
implements Serializable {
4346

4447
private static final long serialVersionUID = 1L;
4548

@@ -59,22 +62,74 @@ private DeletionVectorReadFunction(
5962
@Override
  public Iterator<InternalRow> apply(PartitionedFile file) {
    // Column index of the synthetic deletion-vector (DV) byte column appended to the read schema.
    int dvColumnIndex = dvSchemaContext.getDvColumnIndex();
    int outputColumnCount = dvSchemaContext.getOutputSchema().fields().length;
    // Use pre-computed ordinals from DeletionVectorSchemaContext to project the DV column out
    // of row-based output.
    ProjectingInternalRow projection =
        ProjectingInternalRow.apply(
            dvSchemaContext.getOutputSchema(), dvSchemaContext.getOutputColumnOrdinals());

    // Wrap the base iterator as CloseableIterator to preserve close() through filter/map.
    // This ensures proper resource cleanup even when the iterator is not fully consumed.
    // Use Object as the element type: in vectorized mode Spark's reader yields ColumnarBatch
    // instances cast to InternalRow, so elements may be either type at runtime.
    @SuppressWarnings("unchecked")
    Iterator<Object> baseIterator = (Iterator<Object>) (Iterator<?>) baseReadFunc.apply(file);

    // Filter step: drops deleted rows in row-based mode; a no-op for vectorized batches
    // (batch-level filtering happens in the map step instead).
    // Map step: projects out the DV column (row-based) or wraps the batch's columns with a
    // live-row mapping (vectorized).
    @SuppressWarnings("unchecked")
    Iterator<InternalRow> result =
        (Iterator<InternalRow>)
            (Iterator<?>)
                CloseableIterator.wrap(baseIterator)
                    .filterCloseable(
                        item -> {
                          if (item instanceof InternalRow) {
                            // Row-based: keep only rows whose DV byte says "not deleted".
                            return ((InternalRow) item).getByte(dvColumnIndex) == ROW_NOT_DELETED;
                          }
                          // Vectorized: pass batches through untouched (filtered in map below).
                          return true;
                        })
                    .mapCloseable(
                        item -> {
                          if (item instanceof ColumnarBatch) {
                            return filterBatch(
                                (ColumnarBatch) item, dvColumnIndex, outputColumnCount);
                          } else {
                            // Row-based: project out the DV column. NOTE(review): the same
                            // mutable `projection` instance is returned for every row —
                            // callers must consume each row before advancing (standard Spark
                            // iterator contract).
                            projection.project((InternalRow) item);
                            return projection;
                          }
                        });
    return result;
  }
106+
107+
/** Filter a ColumnarBatch by building row ID mapping for live rows. */
108+
private static ColumnarBatch filterBatch(
109+
ColumnarBatch batch, int dvColumnIndex, int outputColumnCount) {
110+
int[] liveRows = findLiveRows(batch, dvColumnIndex);
111+
// Build filtered column vectors (excluding DV column)
112+
ColumnVector[] filteredVectors = new ColumnVector[outputColumnCount];
113+
int outIdx = 0;
114+
for (int i = 0; i < batch.numCols(); i++) {
115+
if (i != dvColumnIndex) {
116+
filteredVectors[outIdx++] = new ColumnVectorWithFilter(batch.column(i), liveRows);
117+
}
118+
}
119+
return new ColumnarBatch(filteredVectors, liveRows.length);
120+
}
121+
122+
/** Find indices of rows where DV column is 0 (not deleted). */
123+
private static int[] findLiveRows(ColumnarBatch batch, int dvColumnIndex) {
124+
ColumnVector dvColumn = batch.column(dvColumnIndex);
125+
int[] temp = new int[batch.numRows()];
126+
int count = 0;
127+
for (int i = 0; i < batch.numRows(); i++) {
128+
if (dvColumn.getByte(i) == ROW_NOT_DELETED) {
129+
temp[count++] = i;
130+
}
131+
}
132+
return Arrays.copyOf(temp, count);
78133
}
79134

80135
/** Factory method to wrap a reader function with DV filtering. */

spark/v2/src/main/java/io/delta/spark/internal/v2/utils/PartitionUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -211,9 +211,9 @@ public static PartitionReaderFactory createDeltaParquetReaderFactory(
211211
.map(DeletionVectorSchemaContext::getSchemaWithDvColumn)
212212
.orElse(readDataSchema);
213213

214-
// TODO(https://github.com/delta-io/delta/issues/5859): Enable vectorized reader for DV tables
214+
// Vectorized reader is supported for DV tables using ColumnVectorWithFilter
215215
boolean enableVectorizedReader =
216-
!isTableSupportDv && ParquetUtils.isBatchReadSupportedForSchema(sqlConf, readDataSchema);
216+
ParquetUtils.isBatchReadSupportedForSchema(sqlConf, readDataSchema);
217217
scala.collection.immutable.Map<String, String> optionsWithVectorizedReading =
218218
scalaOptions.$plus(
219219
new Tuple2<>(
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Copyright (2025) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.spark.internal.v2.read.deletionvector;
17+
18+
import static org.junit.jupiter.api.Assertions.*;

import java.nio.charset.StandardCharsets;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.junit.jupiter.api.Test;
25+
public class ColumnVectorWithFilterTest {
26+
27+
@Test
28+
void testIntegerFiltering() {
29+
try (WritableColumnVector delegate = new OnHeapColumnVector(5, DataTypes.IntegerType)) {
30+
for (int i = 0; i < 5; i++) {
31+
delegate.putInt(i, (i + 1) * 10); // [10, 20, 30, 40, 50]
32+
}
33+
ColumnVectorWithFilter filtered = new ColumnVectorWithFilter(delegate, new int[] {1, 3});
34+
35+
assertEquals(20, filtered.getInt(0));
36+
assertEquals(40, filtered.getInt(1));
37+
}
38+
}
39+
40+
@Test
41+
void testLongFiltering() {
42+
try (WritableColumnVector delegate = new OnHeapColumnVector(4, DataTypes.LongType)) {
43+
for (int i = 0; i < 4; i++) {
44+
delegate.putLong(i, (i + 1) * 100L);
45+
}
46+
ColumnVectorWithFilter filtered = new ColumnVectorWithFilter(delegate, new int[] {0, 2});
47+
48+
assertEquals(100L, filtered.getLong(0));
49+
assertEquals(300L, filtered.getLong(1));
50+
}
51+
}
52+
53+
@Test
54+
void testDoubleFiltering() {
55+
try (WritableColumnVector delegate = new OnHeapColumnVector(3, DataTypes.DoubleType)) {
56+
delegate.putDouble(0, 1.1);
57+
delegate.putDouble(1, 2.2);
58+
delegate.putDouble(2, 3.3);
59+
ColumnVectorWithFilter filtered = new ColumnVectorWithFilter(delegate, new int[] {0, 2});
60+
61+
assertEquals(1.1, filtered.getDouble(0), 0.001);
62+
assertEquals(3.3, filtered.getDouble(1), 0.001);
63+
}
64+
}
65+
66+
@Test
67+
void testBooleanFiltering() {
68+
try (WritableColumnVector delegate = new OnHeapColumnVector(4, DataTypes.BooleanType)) {
69+
delegate.putBoolean(0, true);
70+
delegate.putBoolean(1, false);
71+
delegate.putBoolean(2, true);
72+
delegate.putBoolean(3, false);
73+
ColumnVectorWithFilter filtered = new ColumnVectorWithFilter(delegate, new int[] {1, 2});
74+
75+
assertFalse(filtered.getBoolean(0));
76+
assertTrue(filtered.getBoolean(1));
77+
}
78+
}
79+
80+
@Test
81+
void testStringFiltering() {
82+
try (WritableColumnVector delegate = new OnHeapColumnVector(3, DataTypes.StringType)) {
83+
delegate.putByteArray(0, "alice".getBytes());
84+
delegate.putByteArray(1, "bob".getBytes());
85+
delegate.putByteArray(2, "charlie".getBytes());
86+
ColumnVectorWithFilter filtered = new ColumnVectorWithFilter(delegate, new int[] {2});
87+
88+
assertEquals("charlie", filtered.getUTF8String(0).toString());
89+
}
90+
}
91+
92+
@Test
93+
void testNullHandling() {
94+
try (WritableColumnVector delegate = new OnHeapColumnVector(4, DataTypes.IntegerType)) {
95+
delegate.putInt(0, 10);
96+
delegate.putNull(1);
97+
delegate.putInt(2, 30);
98+
delegate.putNull(3);
99+
ColumnVectorWithFilter filtered =
100+
new ColumnVectorWithFilter(delegate, new int[] {0, 1, 2, 3});
101+
102+
assertFalse(filtered.isNullAt(0));
103+
assertTrue(filtered.isNullAt(1));
104+
assertFalse(filtered.isNullAt(2));
105+
assertTrue(filtered.isNullAt(3));
106+
}
107+
}
108+
109+
@Test
110+
void testEmptyAndIdentityMapping() {
111+
try (WritableColumnVector delegate = new OnHeapColumnVector(3, DataTypes.IntegerType)) {
112+
delegate.putInt(0, 10);
113+
delegate.putInt(1, 20);
114+
delegate.putInt(2, 30);
115+
116+
// Empty mapping
117+
ColumnVectorWithFilter empty = new ColumnVectorWithFilter(delegate, new int[] {});
118+
assertEquals(DataTypes.IntegerType, empty.dataType());
119+
120+
// Identity mapping
121+
ColumnVectorWithFilter identity = new ColumnVectorWithFilter(delegate, new int[] {0, 1, 2});
122+
assertEquals(10, identity.getInt(0));
123+
assertEquals(20, identity.getInt(1));
124+
assertEquals(30, identity.getInt(2));
125+
}
126+
}
127+
}

0 commit comments

Comments
 (0)