Commit 3e53bea
[Kernel] [Refactor] Update ParquetHandler::readParquetFiles interface to return a different data type (delta-io#4802)
#### Which Delta project/connector is this regarding?

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [x] Kernel
- [ ] Other (fill in here)

## Description

Update the `ParquetHandler::readParquetFiles` interface: change the element type of the returned `CloseableIterator` from `ColumnarBatch` to `FileReadResult`, which pairs each batch of data with the path of the file it was read from.

## How was this patch tested?

## Does this PR introduce _any_ user-facing changes?

Yes. `ParquetHandler::readParquetFiles` now returns `CloseableIterator<FileReadResult>` instead of `CloseableIterator<ColumnarBatch>`, so connectors that implement or call this interface must be updated.
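For callers that only need the batch data, migration amounts to mapping each result back to its batch, as the diffs below do with `.map(res -> res.getData())`. A minimal self-contained sketch of that pattern, using toy stand-in types (`Batch` and this local `FileReadResult` are illustrative, not the actual Kernel classes):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MigrationSketch {
  // Toy stand-in for io.delta.kernel.data.ColumnarBatch.
  static class Batch {
    final int numRows;
    Batch(int numRows) { this.numRows = numRows; }
  }

  // Mirrors the shape of the new FileReadResult: a batch plus its source path.
  static class FileReadResult {
    private final Batch data;
    private final String filePath;
    FileReadResult(Batch data, String filePath) {
      this.data = data;
      this.filePath = filePath;
    }
    Batch getData() { return data; }
    String getFilePath() { return filePath; }
  }

  public static void main(String[] args) {
    // What readParquetFiles now yields: each batch knows which file it came from.
    List<FileReadResult> results = Arrays.asList(
        new FileReadResult(new Batch(10), "/table/part-0.parquet"),
        new FileReadResult(new Batch(5), "/table/part-1.parquet"));

    // Callers that only need the data restore the old element type with a map:
    List<Batch> batches =
        results.stream().map(FileReadResult::getData).collect(Collectors.toList());
    System.out.println(batches.size()); // prints 2
  }
}
```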
1 parent fd80d6c commit 3e53bea

File tree

13 files changed: +127 −50 lines changed

kernel/examples/kernel-examples/src/main/java/io/delta/kernel/examples/MultiThreadedTableReader.java

Lines changed: 1 addition & 1 deletion
@@ -244,7 +244,7 @@ private Runnable workConsumer(int workerId) {
         engine.getParquetHandler().readParquetFiles(
             singletonCloseableIterator(fileStatus),
             physicalReadSchema,
-            Optional.empty());
+            Optional.empty()).map(res -> res.getData());

         try (
             CloseableIterator<FilteredColumnarBatch> dataIter =

kernel/examples/kernel-examples/src/main/java/io/delta/kernel/examples/SingleThreadedTableReader.java

Lines changed: 1 addition & 1 deletion
@@ -115,7 +115,7 @@ private int readData(StructType readSchema, Scan scan, int maxRowCount) throws I
         engine.getParquetHandler().readParquetFiles(
             singletonCloseableIterator(fileStatus),
             physicalReadSchema,
-            Optional.empty());
+            Optional.empty()).map(res -> res.getData());
     try (
         CloseableIterator<FilteredColumnarBatch> transformedData =
             Scan.transformPhysicalData(
kernel/kernel-api/src/main/java/io/delta/kernel/engine/FileReadResult.java

Lines changed: 52 additions & 0 deletions

@@ -0,0 +1,52 @@
+/*
+ * Copyright (2025) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.engine;
+
+import io.delta.kernel.data.ColumnarBatch;
+import java.util.Objects;
+
+/**
+ * The result of reading a batch of data in a file.
+ *
+ * <p>Encapsulates both the data read (as a {@link ColumnarBatch}) and the full path of the file
+ * from which the data was read.
+ */
+public class FileReadResult {
+
+  private final ColumnarBatch data;
+  private final String filePath;
+
+  /**
+   * Constructs a {@code FileReadResult} object with the given data and file path.
+   *
+   * @param data the columnar batch of data read from the file
+   * @param filePath the path of the file from which the data was read
+   */
+  public FileReadResult(ColumnarBatch data, String filePath) {
+    this.data = Objects.requireNonNull(data, "data must not be null");
+    this.filePath = Objects.requireNonNull(filePath, "filePath must not be null");
+  }
+
+  /** @return {@link ColumnarBatch} of data that was read from the file. */
+  public ColumnarBatch getData() {
+    return data;
+  }
+
+  /** @return the path of the file that this data was read from. */
+  public String getFilePath() {
+    return filePath;
+  }
+}
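A quick usage sketch of the new class's contract. The `Batch` stand-in below replaces the real `ColumnarBatch` interface so the snippet is self-contained; the null-checks mirror the `Objects.requireNonNull` calls in the class above:

```java
import java.util.Objects;

public class FileReadResultDemo {
  // Toy stand-in; the real ColumnarBatch is a Kernel interface.
  static class Batch {}

  // Same shape and eager null-validation as the class added in this PR.
  static class FileReadResult {
    private final Batch data;
    private final String filePath;
    FileReadResult(Batch data, String filePath) {
      this.data = Objects.requireNonNull(data, "data must not be null");
      this.filePath = Objects.requireNonNull(filePath, "filePath must not be null");
    }
    Batch getData() { return data; }
    String getFilePath() { return filePath; }
  }

  public static void main(String[] args) {
    FileReadResult res =
        new FileReadResult(new Batch(), "_delta_log/00000000000000000010.checkpoint.parquet");
    System.out.println(res.getFilePath());

    // Both fields are validated in the constructor, so a null never escapes:
    try {
      new FileReadResult(null, "some/path.parquet");
    } catch (NullPointerException e) {
      System.out.println("rejected: " + e.getMessage()); // rejected: data must not be null
    }
  }
}
```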

kernel/kernel-api/src/main/java/io/delta/kernel/engine/ParquetHandler.java

Lines changed: 4 additions & 4 deletions
@@ -56,12 +56,12 @@ public interface ParquetHandler {
    * @param predicate Optional predicate which the Parquet reader can optionally use to prune rows
    *     that don't satisfy the predicate. Because pruning is optional and may be incomplete, caller
    *     is still responsible apply the predicate on the data returned by this method.
-   * @return an iterator of {@link ColumnarBatch}s containing the data in columnar format. It is the
-   *     responsibility of the caller to close the iterator. The data returned is in the same as the
-   *     order of files given in {@code scanFileIter}.
+   * @return an iterator of {@link FileReadResult}s containing the data in columnar format along
+   *     with metadata. It is the responsibility of the caller to close the iterator. The data
+   *     returned is in the same as the order of files given in {@code scanFileIter}.
    * @throws IOException if an I/O error occurs during the read.
    */
-  CloseableIterator<ColumnarBatch> readParquetFiles(
+  CloseableIterator<FileReadResult> readParquetFiles(
       CloseableIterator<FileStatus> fileIter,
       StructType physicalSchema,
       Optional<Predicate> predicate)

kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java

Lines changed: 5 additions & 3 deletions
@@ -27,6 +27,7 @@
 import io.delta.kernel.data.ColumnVector;
 import io.delta.kernel.data.ColumnarBatch;
 import io.delta.kernel.engine.Engine;
+import io.delta.kernel.engine.FileReadResult;
 import io.delta.kernel.expressions.*;
 import io.delta.kernel.internal.checkpoints.SidecarFile;
 import io.delta.kernel.internal.fs.Path;
@@ -228,7 +229,8 @@ private CloseableIterator<ColumnarBatch> getActionsIterFromSinglePartOrV2Checkpo
             .readParquetFiles(
                 singletonCloseableIterator(file),
                 finalReadSchema,
-                checkpointPredicateIncludingSidecars),
+                checkpointPredicateIncludingSidecars)
+            .map(FileReadResult::getData),
         "Reading parquet log file `%s` with readSchema=%s and predicate=%s",
         file,
         finalReadSchema,
@@ -353,8 +355,8 @@ private CloseableIterator<ActionWrapper> getNextActionsIter() {
                 () ->
                     engine
                         .getParquetHandler()
-                        .readParquetFiles(
-                            checkpointFiles, deltaReadSchema, checkpointPredicate),
+                        .readParquetFiles(checkpointFiles, deltaReadSchema, checkpointPredicate)
+                        .map(FileReadResult::getData),
                 "Reading checkpoint sidecars [%s] with readSchema=%s and predicate=%s",
                 checkpointFiles,
                 deltaReadSchema,

kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala

Lines changed: 35 additions & 21 deletions
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
 import scala.reflect.ClassTag

 import io.delta.kernel.data.{ColumnarBatch, ColumnVector}
+import io.delta.kernel.engine.FileReadResult
 import io.delta.kernel.exceptions.{InvalidTableException, TableNotFoundException}
 import io.delta.kernel.expressions.Predicate
 import io.delta.kernel.internal.checkpoints.{CheckpointInstance, CheckpointMetaData, SidecarFile}
@@ -135,6 +136,7 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils {
     topLevelFileTypes.foreach { topLevelFileType =>
       val v2Checkpoints =
         v2CheckpointFileStatuses(v2CheckpointSpec, topLevelFileType)
+
       val checkpointFiles = v2Checkpoints.flatMap {
         case (topLevelCheckpointFile, sidecars) =>
           Seq(topLevelCheckpointFile) ++ sidecars
@@ -160,11 +162,15 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils {
       }.getOrElse((Seq.empty, Seq.empty))

       val compactions = compactedFileStatuses(compactionVersions)
-
+      val mockSidecarParquetHandler = if (expectedSidecars.nonEmpty) {
+        new MockSidecarParquetHandler(expectedSidecars, expectedV2Checkpoint.head.getPath)
+      } else {
+        new BaseMockParquetHandler {}
+      }
       val logSegment = snapshotManager.getLogSegmentForVersion(
         createMockFSListFromEngine(
           listFromProvider(deltas ++ compactions ++ checkpointFiles)("/"),
-          new MockSidecarParquetHandler(expectedSidecars),
+          mockSidecarParquetHandler,
           new MockSidecarJsonHandler(expectedSidecars)),
         versionToLoad)

@@ -775,31 +781,39 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils {
 }

 trait SidecarIteratorProvider extends VectorTestUtils {
-  def singletonSidecarIterator(sidecars: Seq[FileStatus])
-      : CloseableIterator[ColumnarBatch] = Utils.singletonCloseableIterator(
-    new ColumnarBatch {
-      override def getSchema: StructType = SidecarFile.READ_SCHEMA
-
-      override def getColumnVector(ordinal: Int): ColumnVector = {
-        ordinal match {
-          case 0 => stringVector(sidecars.map(_.getPath)) // path
-          case 1 => longVector(sidecars.map(_.getSize): _*) // size
-          case 2 =>
-            longVector(sidecars.map(_.getModificationTime): _*); // modification time
-        }
-      }

-      override def getSize: Int = sidecars.length
-    })
+  private def buildSidecarBatch(sidecars: Seq[FileStatus]): ColumnarBatch = new ColumnarBatch {
+    override def getSchema: StructType = SidecarFile.READ_SCHEMA
+
+    override def getColumnVector(ordinal: Int): ColumnVector = ordinal match {
+      case 0 => stringVector(sidecars.map(_.getPath)) // path
+      case 1 => longVector(sidecars.map(_.getSize): _*) // size
+      case 2 => longVector(sidecars.map(_.getModificationTime): _*) // modification time
+    }
+
+    override def getSize: Int = sidecars.length
+  }
+
+  def singletonSidecarParquetIterator(sidecars: Seq[FileStatus], v2CheckpointFileName: String)
+      : CloseableIterator[FileReadResult] = {
+    val batch = buildSidecarBatch(sidecars)
+    Utils.singletonCloseableIterator(new FileReadResult(batch, v2CheckpointFileName))
+  }
+
+  // TODO: [delta-io/delta#4849] extend FileReadResult for JSON read result
+  def singletonSidecarJsonIterator(sidecars: Seq[FileStatus]): CloseableIterator[ColumnarBatch] = {
+    val batch = buildSidecarBatch(sidecars)
+    Utils.singletonCloseableIterator(batch)
+  }
 }

-class MockSidecarParquetHandler(sidecars: Seq[FileStatus])
+class MockSidecarParquetHandler(sidecars: Seq[FileStatus], v2CheckpointFileName: String)
     extends BaseMockParquetHandler with SidecarIteratorProvider {
   override def readParquetFiles(
       fileIter: CloseableIterator[FileStatus],
       physicalSchema: StructType,
-      predicate: Optional[Predicate]): CloseableIterator[ColumnarBatch] =
-    singletonSidecarIterator(sidecars)
+      predicate: Optional[Predicate]): CloseableIterator[FileReadResult] =
+    singletonSidecarParquetIterator(sidecars, v2CheckpointFileName)
 }

 class MockSidecarJsonHandler(sidecars: Seq[FileStatus])
@@ -809,7 +823,7 @@ class MockSidecarJsonHandler(sidecars: Seq[FileStatus])
       fileIter: CloseableIterator[FileStatus],
       physicalSchema: StructType,
       predicate: Optional[Predicate]): CloseableIterator[ColumnarBatch] =
-    singletonSidecarIterator(sidecars)
+    singletonSidecarJsonIterator(sidecars)
 }

 class MockReadLastCheckpointFileJsonHandler(

kernel/kernel-api/src/test/scala/io/delta/kernel/test/MockEngineUtils.scala

Lines changed: 1 addition & 1 deletion
@@ -107,7 +107,7 @@ trait BaseMockParquetHandler extends ParquetHandler with MockEngineUtils {
   override def readParquetFiles(
       fileIter: CloseableIterator[FileStatus],
       physicalSchema: StructType,
-      predicate: Optional[Predicate]): CloseableIterator[ColumnarBatch] =
+      predicate: Optional[Predicate]): CloseableIterator[FileReadResult] =
     throw new UnsupportedOperationException("not supported in this test suite")

   override def writeParquetFiles(

kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultParquetHandler.java

Lines changed: 10 additions & 5 deletions
@@ -20,6 +20,7 @@
 import io.delta.kernel.defaults.engine.fileio.FileIO;
 import io.delta.kernel.defaults.internal.parquet.ParquetFileReader;
 import io.delta.kernel.defaults.internal.parquet.ParquetFileWriter;
+import io.delta.kernel.engine.FileReadResult;
 import io.delta.kernel.engine.ParquetHandler;
 import io.delta.kernel.expressions.Column;
 import io.delta.kernel.expressions.Predicate;
@@ -46,14 +47,15 @@ public DefaultParquetHandler(FileIO fileIO) {
   }

   @Override
-  public CloseableIterator<ColumnarBatch> readParquetFiles(
+  public CloseableIterator<FileReadResult> readParquetFiles(
       CloseableIterator<FileStatus> fileIter,
       StructType physicalSchema,
       Optional<Predicate> predicate)
       throws IOException {
-    return new CloseableIterator<ColumnarBatch>() {
+    return new CloseableIterator<FileReadResult>() {
       private final ParquetFileReader batchReader = new ParquetFileReader(fileIO);
       private CloseableIterator<ColumnarBatch> currentFileReader;
+      private String currentFilePath;

       @Override
       public void close() throws IOException {
@@ -70,8 +72,11 @@ public boolean hasNext() {
           // read.
           Utils.closeCloseables(currentFileReader);
           currentFileReader = null;
+          currentFilePath = null;
           if (fileIter.hasNext()) {
-            currentFileReader = batchReader.read(fileIter.next(), physicalSchema, predicate);
+            FileStatus fileStatus = fileIter.next();
+            currentFileReader = batchReader.read(fileStatus, physicalSchema, predicate);
+            currentFilePath = fileStatus.getPath();
             return hasNext(); // recurse since it's possible the loaded file is empty
           } else {
             return false;
@@ -80,8 +85,8 @@ public boolean hasNext() {
       }

       @Override
-      public ColumnarBatch next() {
-        return currentFileReader.next();
+      public FileReadResult next() {
+        return new FileReadResult(currentFileReader.next(), currentFilePath);
       }
     };
   }
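The bookkeeping in the default handler — remembering which file the inner reader came from so every emitted batch can be paired with its path, and recursing past empty files — can be illustrated with a self-contained sketch. All types here are toy stand-ins (batches are plain strings, a `Map` plays the file iterator), not the Kernel API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PathTrackingIterator implements Iterator<String[]> {
  private final Iterator<Map.Entry<String, List<String>>> fileIter;
  private Iterator<String> currentFileReader; // batches of the current file
  private String currentFilePath;

  PathTrackingIterator(Map<String, List<String>> batchesByFile) {
    this.fileIter = batchesByFile.entrySet().iterator();
  }

  @Override
  public boolean hasNext() {
    if (currentFileReader != null && currentFileReader.hasNext()) {
      return true;
    }
    // Current file exhausted: clear both reader and path together.
    currentFileReader = null;
    currentFilePath = null;
    if (fileIter.hasNext()) {
      Map.Entry<String, List<String>> next = fileIter.next();
      currentFilePath = next.getKey();
      currentFileReader = next.getValue().iterator();
      return hasNext(); // recurse: the next file may be empty
    }
    return false;
  }

  @Override
  public String[] next() {
    // Pair the batch with the path of the file it was read from.
    return new String[] {currentFileReader.next(), currentFilePath};
  }

  public static void main(String[] args) {
    Map<String, List<String>> files = new LinkedHashMap<>();
    files.put("a.parquet", Arrays.asList("batch1", "batch2"));
    files.put("empty.parquet", Arrays.asList());
    files.put("b.parquet", Arrays.asList("batch3"));

    List<String> paths = new ArrayList<>();
    PathTrackingIterator it = new PathTrackingIterator(files);
    while (it.hasNext()) {
      paths.add(it.next()[1]);
    }
    System.out.println(paths); // [a.parquet, a.parquet, b.parquet]
  }
}
```

Note how the empty file is skipped transparently by the recursive `hasNext()`, the same trick the real handler uses.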

kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/BenchmarkParallelCheckpointReading.java

Lines changed: 10 additions & 9 deletions
@@ -26,6 +26,7 @@
 import io.delta.kernel.defaults.engine.hadoopio.HadoopFileIO;
 import io.delta.kernel.defaults.internal.parquet.ParquetFileReader;
 import io.delta.kernel.engine.Engine;
+import io.delta.kernel.engine.FileReadResult;
 import io.delta.kernel.engine.ParquetHandler;
 import io.delta.kernel.expressions.Predicate;
 import io.delta.kernel.internal.util.Utils;
@@ -177,17 +178,17 @@ static class ParallelParquetHandler extends DefaultParquetHandler {
   }

   @Override
-  public CloseableIterator<ColumnarBatch> readParquetFiles(
+  public CloseableIterator<FileReadResult> readParquetFiles(
       CloseableIterator<FileStatus> fileIter,
       StructType physicalSchema,
       Optional<Predicate> predicate)
       throws IOException {
-    return new CloseableIterator<ColumnarBatch>() {
+    return new CloseableIterator<FileReadResult>() {
       // Executor service will be closed as part of the returned `CloseableIterator`'s
       // close method.
       private final ExecutorService executorService = newFixedThreadPool(numberOfParallelThreads);
-      private Iterator<Future<List<ColumnarBatch>>> futuresIter;
-      private Iterator<ColumnarBatch> currentBatchIter;
+      private Iterator<Future<List<FileReadResult>>> futuresIter;
+      private Iterator<FileReadResult> currentBatchIter;

       @Override
       public void close() throws IOException {
@@ -213,15 +214,15 @@ public boolean hasNext() {
       }

       @Override
-      public ColumnarBatch next() {
+      public FileReadResult next() {
         return currentBatchIter.next();
       }

       private void submitReadRequestsIfNotDone() {
         if (futuresIter != null) {
           return;
         }
-        List<Future<List<ColumnarBatch>>> futures = new ArrayList<>();
+        List<Future<List<FileReadResult>>> futures = new ArrayList<>();
         while (fileIter.hasNext()) {
           futures.add(
               executorService.submit(() -> parquetFileReader(fileIter.next(), physicalSchema)));
@@ -231,13 +232,13 @@ private void submitReadRequestsIfNotDone() {
     };
   }

-  List<ColumnarBatch> parquetFileReader(FileStatus fileStatus, StructType readSchema) {
+  List<FileReadResult> parquetFileReader(FileStatus fileStatus, StructType readSchema) {
     ParquetFileReader reader = new ParquetFileReader(fileIO);
     try (CloseableIterator<ColumnarBatch> batchIter =
         reader.read(fileStatus, readSchema, Optional.empty())) {
-      List<ColumnarBatch> batches = new ArrayList<>();
+      List<FileReadResult> batches = new ArrayList<>();
       while (batchIter.hasNext()) {
-        batches.add(batchIter.next());
+        batches.add(new FileReadResult(batchIter.next(), fileStatus.getPath()));
       }
       return batches;
     } catch (IOException e) {

kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayBaseSuite.scala

Lines changed: 2 additions & 1 deletion
@@ -27,6 +27,7 @@ import io.delta.kernel.defaults.engine.fileio.FileIO
 import io.delta.kernel.defaults.engine.hadoopio.HadoopFileIO
 import io.delta.kernel.defaults.utils.TestUtils
 import io.delta.kernel.engine.{Engine, ExpressionHandler, FileSystemClient}
+import io.delta.kernel.engine.FileReadResult
 import io.delta.kernel.expressions.Predicate
 import io.delta.kernel.internal.checkpoints.Checkpointer
 import io.delta.kernel.internal.fs.Path
@@ -258,7 +259,7 @@ class MetricsParquetHandler(fileIO: FileIO)
   override def readParquetFiles(
       fileIter: CloseableIterator[FileStatus],
       physicalSchema: StructType,
-      predicate: Optional[Predicate]): CloseableIterator[ColumnarBatch] = {
+      predicate: Optional[Predicate]): CloseableIterator[FileReadResult] = {
     val fileReadSet = fileIter.toSeq
     checkpointReadRequestSizes += fileReadSet.size
     super.readParquetFiles(

0 commit comments