
Commit 4c90831

Spark: ORC vectorized reader to use the delete filter (apache#14746)
1 parent: 7bac865

4 files changed: +125 additions, -82 deletions

spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java

Lines changed: 3 additions & 1 deletion

```diff
@@ -462,7 +462,9 @@ public ColumnVector convert(
       } else if (field.equals(MetadataColumns.ROW_POSITION)) {
         fieldVectors.add(new RowPositionColumnVector(batchOffsetInFile));
       } else if (field.equals(MetadataColumns.IS_DELETED)) {
-        fieldVectors.add(new ConstantColumnVector(field.type(), batchSize, false));
+        DeletedColumnVector deletedVector = new DeletedColumnVector(field.type());
+        deletedVector.setValue(new boolean[batchSize]);
+        fieldVectors.add(deletedVector);
       } else if (field.type().equals(Types.UnknownType.get())) {
         fieldVectors.add(new ConstantColumnVector(field.type(), batchSize, null));
       } else {
```
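Before this change, the `_deleted` metadata column was materialized as a `ConstantColumnVector`, so every row in a batch reported `false` regardless of any position deletes. The `DeletedColumnVector` is instead backed by a mutable `boolean[]` that a delete filter can flip per batch. A minimal standalone sketch of that difference; the class names below are illustrative stand-ins, not Iceberg's real `org.apache.iceberg.spark.data.vectorized` types:

```java
// Illustrative sketch only: shows why a constant vector cannot represent
// per-row deletes, while a flag-array-backed vector can. These classes are
// hypothetical stand-ins for Iceberg's column vector implementations.
interface IsDeletedVector {
  boolean isDeleted(int rowId);
}

// Old approach: every row in the batch reports the same constant value.
final class ConstantIsDeletedVector implements IsDeletedVector {
  private final boolean value;

  ConstantIsDeletedVector(boolean value) {
    this.value = value;
  }

  @Override
  public boolean isDeleted(int rowId) {
    return value; // always false when constructed with false
  }
}

// New approach: a mutable flag array that a delete filter can update per batch.
final class MutableIsDeletedVector implements IsDeletedVector {
  private boolean[] flags;

  void setValue(boolean[] newFlags) {
    this.flags = newFlags;
  }

  @Override
  public boolean isDeleted(int rowId) {
    return flags[rowId];
  }
}

class IsDeletedSketch {
  public static void main(String[] args) {
    MutableIsDeletedVector vector = new MutableIsDeletedVector();
    boolean[] flags = new boolean[5]; // all false, like new boolean[batchSize] in the diff
    vector.setValue(flags);

    flags[3] = true; // a delete filter marking row 3 of this batch as deleted
    System.out.println(vector.isDeleted(3)); // prints: true
  }
}
```

The `new boolean[batchSize]` passed to `setValue` in the diff means each batch starts with no rows marked deleted; the filter applied later in the read path sets individual flags.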

spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java

Lines changed: 71 additions & 0 deletions

```diff
@@ -52,10 +52,16 @@
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableScan;
+import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.deletes.DeleteCounter;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.types.Type;
@@ -883,4 +889,69 @@ public static Dataset<Row> selectNonDerived(Dataset<Row> metadataTable) {
   public static Types.StructType nonDerivedSchema(Dataset<Row> metadataTable) {
     return SparkSchemaUtil.convert(TestHelpers.selectNonDerived(metadataTable).schema()).asStruct();
   }
+
+  public static class CustomizedDeleteFilter extends DeleteFilter<InternalRow> {
+    private final boolean hasDeletes;
+
+    protected CustomizedDeleteFilter(
+        boolean hasDeletes, Schema tableSchema, Schema projectedSchema) {
+      super("", List.of(), tableSchema, projectedSchema, new DeleteCounter(), true);
+      this.hasDeletes = hasDeletes;
+    }
+
+    @Override
+    protected StructLike asStructLike(InternalRow record) {
+      return null;
+    }
+
+    @Override
+    protected InputFile getInputFile(String location) {
+      return null;
+    }
+
+    @Override
+    public boolean hasPosDeletes() {
+      return hasDeletes;
+    }
+
+    @Override
+    public PositionDeleteIndex deletedRowPositions() {
+      PositionDeleteIndex deletedRowPos = new CustomizedPositionDeleteIndex();
+      if (hasDeletes) {
+        deletedRowPos.delete(98, 103);
+      }
+
+      return deletedRowPos;
+    }
+  }
+
+  public static class CustomizedPositionDeleteIndex implements PositionDeleteIndex {
+    private final Set<Long> deleteIndex;
+
+    private CustomizedPositionDeleteIndex() {
+      deleteIndex = Sets.newHashSet();
+    }
+
+    @Override
+    public void delete(long position) {
+      deleteIndex.add(position);
+    }
+
+    @Override
+    public void delete(long posStart, long posEnd) {
+      for (long l = posStart; l < posEnd; l++) {
+        delete(l);
+      }
+    }
+
+    @Override
+    public boolean isDeleted(long position) {
+      return deleteIndex.contains(position);
+    }
+
+    @Override
+    public boolean isEmpty() {
+      return deleteIndex.isEmpty();
+    }
+  }
 }
```
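One subtlety worth calling out: `delete(posStart, posEnd)` above is end-exclusive (`l < posEnd`), so the `delete(98, 103)` call marks exactly positions 98 through 102 as deleted. A short usage fragment, assuming the Iceberg Spark test classpath and the `DATA_SCHEMA`/`PROJECTION_SCHEMA` constants defined in the metadata-column reader tests below:

```java
// Fragment, not a full test class; DATA_SCHEMA and PROJECTION_SCHEMA are the
// schema constants from the reader tests, and assertThat is AssertJ's.
DeleteFilter<InternalRow> filter =
    new TestHelpers.CustomizedDeleteFilter(true, DATA_SCHEMA, PROJECTION_SCHEMA);

PositionDeleteIndex deleted = filter.deletedRowPositions();
assertThat(filter.hasPosDeletes()).isTrue();
assertThat(deleted.isDeleted(98)).isTrue();   // first deleted position
assertThat(deleted.isDeleted(102)).isTrue();  // last deleted position
assertThat(deleted.isDeleted(103)).isFalse(); // delete(98, 103) is end-exclusive
```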

spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java

Lines changed: 45 additions & 9 deletions

```diff
@@ -20,6 +20,7 @@
 
 import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.io.File;
 import java.io.IOException;
@@ -36,6 +37,7 @@
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Parameters;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
@@ -46,6 +48,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
+import org.apache.iceberg.spark.source.BatchReaderUtil;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.orc.OrcConf;
@@ -74,7 +77,11 @@ public class TestSparkOrcReadMetadataColumns {
           MetadataColumns.ROW_POSITION,
           MetadataColumns.IS_DELETED);
 
+  private static final DeleteFilter<InternalRow> NO_DELETES_FILTER =
+      new TestHelpers.CustomizedDeleteFilter(false, DATA_SCHEMA, PROJECTION_SCHEMA);
+
   private static final int NUM_ROWS = 1000;
+  private static final int RECORDS_PER_BATCH = 10;
   private static final List<InternalRow> DATA_ROWS;
   private static final List<InternalRow> EXPECTED_ROWS;
 
@@ -128,13 +135,35 @@ public void writeFile() throws IOException {
 
   @TestTemplate
   public void testReadRowNumbers() throws IOException {
-    readAndValidate(null, null, null, EXPECTED_ROWS);
+    readAndValidate(null, null, null, EXPECTED_ROWS, NO_DELETES_FILTER);
+  }
+
+  @TestTemplate
+  public void testReadRowNumbersWithDelete() throws IOException {
+    assumeThat(vectorized).isTrue();
+
+    List<InternalRow> expectedRowsAfterDelete = Lists.newArrayList();
+    EXPECTED_ROWS.forEach(row -> expectedRowsAfterDelete.add(row.copy()));
+    // remove rows at positions 98, 99, 100, 101, 102; this crosses two row groups
+    // [0, 100) and [100, 200)
+    for (int i = 98; i <= 102; i++) {
+      expectedRowsAfterDelete.get(i).update(3, true);
+    }
+
+    DeleteFilter<InternalRow> deleteFilter =
+        new TestHelpers.CustomizedDeleteFilter(true, DATA_SCHEMA, PROJECTION_SCHEMA);
+
+    readAndValidate(null, null, null, expectedRowsAfterDelete, deleteFilter);
   }
 
   @TestTemplate
   public void testReadRowNumbersWithFilter() throws IOException {
     readAndValidate(
-        Expressions.greaterThanOrEqual("id", 500), null, null, EXPECTED_ROWS.subList(500, 1000));
+        Expressions.greaterThanOrEqual("id", 500),
+        null,
+        null,
+        EXPECTED_ROWS.subList(500, 1000),
+        NO_DELETES_FILTER);
   }
 
   @TestTemplate
@@ -157,12 +186,17 @@ public void testReadRowNumbersWithSplits() throws IOException {
           null,
           splitOffsets.get(i),
           splitLengths.get(i),
-          EXPECTED_ROWS.subList(i * 100, (i + 1) * 100));
+          EXPECTED_ROWS.subList(i * 100, (i + 1) * 100),
+          NO_DELETES_FILTER);
     }
   }
 
   private void readAndValidate(
-      Expression filter, Long splitStart, Long splitLength, List<InternalRow> expected)
+      Expression filter,
+      Long splitStart,
+      Long splitLength,
+      List<InternalRow> expected,
+      DeleteFilter<InternalRow> deleteFilter)
       throws IOException {
     Schema projectionWithoutMetadataFields =
         TypeUtil.selectNot(PROJECTION_SCHEMA, MetadataColumns.metadataFieldIds());
@@ -173,10 +207,12 @@ private void readAndValidate(
 
     if (vectorized) {
       builder =
-          builder.createBatchedReaderFunc(
-              readOrcSchema ->
-                  VectorizedSparkOrcReaders.buildReader(
-                      PROJECTION_SCHEMA, readOrcSchema, ImmutableMap.of()));
+          builder
+              .recordsPerBatch(RECORDS_PER_BATCH)
+              .createBatchedReaderFunc(
+                  readOrcSchema ->
+                      VectorizedSparkOrcReaders.buildReader(
+                          PROJECTION_SCHEMA, readOrcSchema, ImmutableMap.of()));
     } else {
       builder =
           builder.createReaderFunc(
@@ -192,7 +228,7 @@ private void readAndValidate(
     }
 
     if (vectorized) {
-      reader = batchesToRows(builder.build());
+      reader = batchesToRows(BatchReaderUtil.applyDeleteFilter(builder.build(), deleteFilter));
     } else {
       reader = builder.build();
     }
```

spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java

Lines changed: 6 additions & 72 deletions

```diff
@@ -26,7 +26,6 @@
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.Files;
@@ -35,21 +34,16 @@
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Parameters;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.StructLike;
 import org.apache.iceberg.data.DeleteFilter;
-import org.apache.iceberg.deletes.DeleteCounter;
-import org.apache.iceberg.deletes.PositionDeleteIndex;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
 import org.apache.iceberg.spark.source.BatchReaderUtil;
@@ -183,7 +177,8 @@ public void testReadRowNumbersWithDelete() throws IOException {
     Parquet.ReadBuilder builder =
         Parquet.read(Files.localInput(testFile)).project(PROJECTION_SCHEMA);
 
-    DeleteFilter<InternalRow> deleteFilter = new TestDeleteFilter(true);
+    DeleteFilter<InternalRow> deleteFilter =
+        new TestHelpers.CustomizedDeleteFilter(true, DATA_SCHEMA, PROJECTION_SCHEMA);
 
     builder.createBatchedReaderFunc(
         fileSchema ->
@@ -194,70 +189,6 @@ public void testReadRowNumbersWithDelete() throws IOException {
     validate(expectedRowsAfterDelete, builder, deleteFilter);
   }
 
-  private static class TestDeleteFilter extends DeleteFilter<InternalRow> {
-    private final boolean hasDeletes;
-
-    protected TestDeleteFilter(boolean hasDeletes) {
-      super("", List.of(), DATA_SCHEMA, PROJECTION_SCHEMA, new DeleteCounter(), true);
-      this.hasDeletes = hasDeletes;
-    }
-
-    @Override
-    protected StructLike asStructLike(InternalRow record) {
-      return null;
-    }
-
-    @Override
-    protected InputFile getInputFile(String location) {
-      return null;
-    }
-
-    @Override
-    public boolean hasPosDeletes() {
-      return hasDeletes;
-    }
-
-    @Override
-    public PositionDeleteIndex deletedRowPositions() {
-      PositionDeleteIndex deletedRowPos = new CustomizedPositionDeleteIndex();
-      if (hasDeletes) {
-        deletedRowPos.delete(98, 103);
-      }
-
-      return deletedRowPos;
-    }
-  }
-
-  private static class CustomizedPositionDeleteIndex implements PositionDeleteIndex {
-    private final Set<Long> deleteIndex;
-
-    private CustomizedPositionDeleteIndex() {
-      deleteIndex = Sets.newHashSet();
-    }
-
-    @Override
-    public void delete(long position) {
-      deleteIndex.add(position);
-    }
-
-    @Override
-    public void delete(long posStart, long posEnd) {
-      for (long l = posStart; l < posEnd; l++) {
-        delete(l);
-      }
-    }
-
-    @Override
-    public boolean isDeleted(long position) {
-      return deleteIndex.contains(position);
-    }
-
-    @Override
-    public boolean isEmpty() {
-      return deleteIndex.isEmpty();
-    }
-  }
-
   @TestTemplate
   public void testReadRowNumbersWithFilter() throws IOException {
     // current iceberg supports row group filter.
@@ -314,7 +245,10 @@ private void readAndValidate(
       builder = builder.split(splitStart, splitLength);
     }
 
-    validate(expected, builder, new TestDeleteFilter(false));
+    validate(
+        expected,
+        builder,
+        new TestHelpers.CustomizedDeleteFilter(false, DATA_SCHEMA, PROJECTION_SCHEMA));
   }
 
   private void validate(
```
