Skip to content

Commit b6b2d6c

Browse files
committed
[Kernel-Spark] Phase 3: Enable file splitting for DV tables with _metadata.row_index support
1 parent c37d128 commit b6b2d6c

File tree

3 files changed

+139
-26
lines changed

3 files changed

+139
-26
lines changed

spark/v2/src/main/java/io/delta/spark/internal/v2/read/deletionvector/DeletionVectorSchemaContext.java

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,11 @@
1616
package io.delta.spark.internal.v2.read.deletionvector;
1717

1818
import java.io.Serializable;
19+
import java.util.ArrayList;
1920
import java.util.Arrays;
21+
import java.util.List;
2022
import org.apache.spark.sql.delta.DeltaParquetFileFormat;
23+
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat;
2124
import org.apache.spark.sql.types.StructType;
2225
import scala.collection.immutable.Seq;
2326

@@ -35,15 +38,18 @@ public class DeletionVectorSchemaContext implements Serializable {
3538
private final int inputColumnCount;
3639
private final StructType outputSchema;
3740
private final Seq<Object> outputColumnOrdinals;
41+
private final List<Integer> outputColumnOrdinalsList;
3842

3943
/**
4044
* Create a DV schema context for encapsulating schema info and indices needed for DV filtering.
4145
*
4246
* @param readDataSchema original data schema without DV column
4347
* @param partitionSchema partition columns schema
48+
* @param useMetadataRowIndex whether to include _metadata.row_index for file splitting support
4449
* @throws IllegalArgumentException if readDataSchema already contains the DV column
4550
*/
46-
public DeletionVectorSchemaContext(StructType readDataSchema, StructType partitionSchema) {
51+
public DeletionVectorSchemaContext(
52+
StructType readDataSchema, StructType partitionSchema, boolean useMetadataRowIndex) {
4753
// Validate that readDataSchema doesn't already contain the DV column to ensure the DV column
4854
// is added only once. While Delta uses the "__delta_internal_" prefix as a naming convention
4955
// for internal columns (listed in DeltaColumnMapping.DELTA_INTERNAL_COLUMNS), there's no
@@ -54,21 +60,47 @@ public DeletionVectorSchemaContext(StructType readDataSchema, StructType partiti
5460
throw new IllegalArgumentException(
5561
"readDataSchema already contains the deletion vector column: " + dvColumnName);
5662
}
63+
64+
// Build schema: data columns + (optional row_index) + DV column
65+
StructType schemaBuilder = readDataSchema;
66+
if (useMetadataRowIndex) {
67+
schemaBuilder = schemaBuilder.add(ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME(), "long");
68+
}
5769
this.schemaWithDvColumn =
58-
readDataSchema.add(DeltaParquetFileFormat.IS_ROW_DELETED_STRUCT_FIELD());
70+
schemaBuilder.add(DeltaParquetFileFormat.IS_ROW_DELETED_STRUCT_FIELD());
71+
5972
this.dvColumnIndex =
6073
schemaWithDvColumn.fieldIndex(DeltaParquetFileFormat.IS_ROW_DELETED_COLUMN_NAME());
6174
this.inputColumnCount = schemaWithDvColumn.fields().length + partitionSchema.fields().length;
6275
this.outputSchema = readDataSchema.merge(partitionSchema, /* handleDuplicateColumns= */ false);
63-
// Pre-compute output column ordinals: all indices except dvColumnIndex.
64-
int[] ordinals = new int[inputColumnCount - 1];
65-
int idx = 0;
66-
for (int i = 0; i < inputColumnCount; i++) {
67-
if (i != dvColumnIndex) {
68-
ordinals[idx++] = i;
69-
}
76+
77+
// Pre-compute output column ordinals: data columns + partition columns (skip row_index and DV)
78+
List<Integer> ordinals = new ArrayList<>();
79+
int partitionStartIdx = schemaWithDvColumn.fields().length;
80+
81+
// Add data column indices (0 to readDataSchema.length - 1)
82+
for (int i = 0; i < readDataSchema.fields().length; i++) {
83+
ordinals.add(i);
84+
}
85+
// Add partition column indices
86+
for (int i = 0; i < partitionSchema.fields().length; i++) {
87+
ordinals.add(partitionStartIdx + i);
7088
}
71-
this.outputColumnOrdinals = scala.Predef.wrapIntArray(ordinals).toSeq();
89+
90+
this.outputColumnOrdinalsList = ordinals;
91+
int[] ordinalsArray = ordinals.stream().mapToInt(Integer::intValue).toArray();
92+
this.outputColumnOrdinals = scala.Predef.wrapIntArray(ordinalsArray).toSeq();
93+
}
94+
95+
/**
96+
* Create a DV schema context without row_index support (for basic DV reads).
97+
*
98+
* @param readDataSchema original data schema without DV column
99+
* @param partitionSchema partition columns schema
100+
* @throws IllegalArgumentException if readDataSchema already contains the DV column
101+
*/
102+
public DeletionVectorSchemaContext(StructType readDataSchema, StructType partitionSchema) {
103+
this(readDataSchema, partitionSchema, /* useMetadataRowIndex= */ false);
72104
}
73105

74106
/** Returns schema with the __delta_internal_is_row_deleted column added. */
@@ -92,4 +124,9 @@ public StructType getOutputSchema() {
92124
public Seq<Object> getOutputColumnOrdinals() {
93125
return outputColumnOrdinals;
94126
}
127+
128+
/** Returns pre-computed output column ordinals as a Java List (for testing). */
129+
public List<Integer> getOutputColumnOrdinalsAsList() {
130+
return outputColumnOrdinalsList;
131+
}
95132
}

spark/v2/src/test/java/io/delta/spark/internal/v2/read/deletionvector/DeletionVectorSchemaContextTest.java

Lines changed: 61 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,16 @@
1717

1818
import static org.junit.jupiter.api.Assertions.*;
1919

20+
import java.util.Arrays;
21+
import java.util.List;
22+
import java.util.stream.Collectors;
2023
import org.apache.spark.sql.delta.DeltaParquetFileFormat;
24+
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat;
2125
import org.apache.spark.sql.types.DataTypes;
2226
import org.apache.spark.sql.types.StructType;
2327
import org.junit.jupiter.api.Test;
28+
import org.junit.jupiter.params.ParameterizedTest;
29+
import org.junit.jupiter.params.provider.CsvSource;
2430

2531
public class DeletionVectorSchemaContextTest {
2632

@@ -30,27 +36,64 @@ public class DeletionVectorSchemaContextTest {
3036
private static final StructType PARTITION_SCHEMA =
3137
new StructType().add("date", DataTypes.StringType);
3238

39+
@ParameterizedTest(name = "useMetadataRowIndex={0}")
40+
@CsvSource({"false, 3, 2", "true, 4, 3"})
41+
void testSchemaWithDvColumn(
42+
boolean useMetadataRowIndex, int expectedFieldCount, int expectedDvIndex) {
43+
DeletionVectorSchemaContext context =
44+
new DeletionVectorSchemaContext(DATA_SCHEMA, PARTITION_SCHEMA, useMetadataRowIndex);
45+
46+
StructType schemaWithDv = context.getSchemaWithDvColumn();
47+
assertEquals(expectedFieldCount, schemaWithDv.fields().length);
48+
assertEquals("id", schemaWithDv.fields()[0].name());
49+
assertEquals("name", schemaWithDv.fields()[1].name());
50+
51+
if (useMetadataRowIndex) {
52+
assertEquals(
53+
ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME(), schemaWithDv.fields()[2].name());
54+
}
55+
assertEquals(
56+
DeltaParquetFileFormat.IS_ROW_DELETED_COLUMN_NAME(),
57+
schemaWithDv.fields()[expectedDvIndex].name());
58+
}
59+
60+
@ParameterizedTest(name = "useMetadataRowIndex={0}")
61+
@CsvSource({"false, 4", "true, 5"})
62+
void testInputColumnCount(boolean useMetadataRowIndex, int expectedCount) {
63+
DeletionVectorSchemaContext context =
64+
new DeletionVectorSchemaContext(DATA_SCHEMA, PARTITION_SCHEMA, useMetadataRowIndex);
65+
assertEquals(expectedCount, context.getInputColumnCount());
66+
}
67+
68+
@ParameterizedTest(name = "useMetadataRowIndex={0}")
69+
@CsvSource({"false, '0,1,3'", "true, '0,1,4'"})
70+
void testOutputColumnOrdinals(boolean useMetadataRowIndex, String expectedOrdinalsStr) {
71+
DeletionVectorSchemaContext context =
72+
new DeletionVectorSchemaContext(DATA_SCHEMA, PARTITION_SCHEMA, useMetadataRowIndex);
73+
74+
List<Integer> expected =
75+
Arrays.stream(expectedOrdinalsStr.split(","))
76+
.map(String::trim)
77+
.map(Integer::parseInt)
78+
.collect(Collectors.toList());
79+
assertEquals(expected, context.getOutputColumnOrdinalsAsList());
80+
}
81+
3382
@Test
34-
void testWithFullSchemas() {
83+
void testOutputSchema() {
3584
DeletionVectorSchemaContext context =
36-
new DeletionVectorSchemaContext(DATA_SCHEMA, PARTITION_SCHEMA);
85+
new DeletionVectorSchemaContext(DATA_SCHEMA, PARTITION_SCHEMA, /* useMetadataRowIndex= */ false);
3786

38-
StructType expectedSchemaWithDv =
39-
DATA_SCHEMA.add(DeltaParquetFileFormat.IS_ROW_DELETED_STRUCT_FIELD());
40-
assertEquals(expectedSchemaWithDv, context.getSchemaWithDvColumn());
41-
assertEquals(2, context.getDvColumnIndex());
42-
// Input: 2 data + 1 DV + 1 partition = 4.
43-
assertEquals(4, context.getInputColumnCount());
44-
StructType expectedOutputSchema =
87+
StructType expectedSchema =
4588
DATA_SCHEMA.merge(PARTITION_SCHEMA, /* handleDuplicateColumns= */ false);
46-
assertEquals(expectedOutputSchema, context.getOutputSchema());
89+
assertEquals(expectedSchema, context.getOutputSchema());
4790
}
4891

4992
@Test
5093
void testEmptyPartitionSchema() {
51-
StructType emptyPartition = new StructType();
94+
StructType emptyPartitionSchema = new StructType();
5295
DeletionVectorSchemaContext context =
53-
new DeletionVectorSchemaContext(DATA_SCHEMA, emptyPartition);
96+
new DeletionVectorSchemaContext(DATA_SCHEMA, emptyPartitionSchema, /* useMetadataRowIndex= */ false);
5497

5598
StructType expectedSchemaWithDv =
5699
DATA_SCHEMA.add(DeltaParquetFileFormat.IS_ROW_DELETED_STRUCT_FIELD());
@@ -63,12 +106,12 @@ void testEmptyPartitionSchema() {
63106

64107
@Test
65108
void testEmptyDataSchema() {
66-
StructType emptyData = new StructType();
109+
StructType emptyDataSchema = new StructType();
67110
DeletionVectorSchemaContext context =
68-
new DeletionVectorSchemaContext(emptyData, PARTITION_SCHEMA);
111+
new DeletionVectorSchemaContext(emptyDataSchema, PARTITION_SCHEMA, /* useMetadataRowIndex= */ false);
69112

70113
StructType expectedSchemaWithDv =
71-
emptyData.add(DeltaParquetFileFormat.IS_ROW_DELETED_STRUCT_FIELD());
114+
emptyDataSchema.add(DeltaParquetFileFormat.IS_ROW_DELETED_STRUCT_FIELD());
72115
assertEquals(expectedSchemaWithDv, context.getSchemaWithDvColumn());
73116
assertEquals(0, context.getDvColumnIndex());
74117
// Input: 1 DV + 1 partition = 2.
@@ -87,7 +130,9 @@ void testDuplicateDvColumnThrowsException() {
87130
IllegalArgumentException exception =
88131
assertThrows(
89132
IllegalArgumentException.class,
90-
() -> new DeletionVectorSchemaContext(schemaWithDv, new StructType()));
133+
() ->
134+
new DeletionVectorSchemaContext(
135+
schemaWithDv, new StructType(), /* useMetadataRowIndex= */ false));
91136

92137
assertTrue(
93138
exception.getMessage().contains(DeltaParquetFileFormat.IS_ROW_DELETED_COLUMN_NAME()));

spark/v2/src/test/java/io/delta/spark/internal/v2/read/deletionvector/DeletionVectorVectorizedReaderTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,37 @@ private Row convertInternalRowToRow(InternalRow internalRow, StructType schema)
136136
return new org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema(values, schema);
137137
}
138138

139+
/**
140+
* Test that file splitting is enabled for DV tables when useMetadataRowIndex=true (default). This
141+
* verifies PR4's key change: optimizationsEnabled=true enables isSplitable=true.
142+
*/
143+
@Test
144+
public void testFileSplittingEnabledForDvTable() throws Exception {
145+
String tableName = "dv-partitioned-with-checkpoint";
146+
String tablePath = goldenTablePath(tableName);
147+
148+
SparkTable table =
149+
new SparkTable(
150+
Identifier.of(new String[] {"spark_catalog", "default"}, tableName), tablePath);
151+
SparkScanBuilder scanBuilder =
152+
(SparkScanBuilder) table.newScanBuilder(new CaseInsensitiveStringMap(java.util.Map.of()));
153+
SparkScan scan = (SparkScan) scanBuilder.build();
154+
Batch batch = scan.toBatch();
155+
156+
// Verify we can get partitions (file splitting is allowed)
157+
InputPartition[] partitions = batch.planInputPartitions();
158+
assertTrue(partitions.length > 0, "Should have at least one partition");
159+
160+
// The key assertion: with useMetadataRowIndex=true (default), DV tables allow file splitting.
161+
// This is verified indirectly by the fact that planInputPartitions() succeeds and
162+
// supportColumnarReads() returns true. If optimizationsEnabled was false,
163+
// isSplitable would return false, preventing efficient file partitioning.
164+
PartitionReaderFactory readerFactory = batch.createReaderFactory();
165+
assertTrue(
166+
readerFactory.supportColumnarReads(partitions[0]),
167+
"DV table with useMetadataRowIndex=true should support columnar reads");
168+
}
169+
139170
private String goldenTablePath(String name) {
140171
return GoldenTableUtils$.MODULE$.goldenTablePath(name);
141172
}

0 commit comments

Comments
 (0)