Skip to content

Commit daf2a40

Browse files
authored
[Kernel] [Refactor] Add File Path to Action Wrapper (delta-io#4852)
## 🥞 Stacked PR Use this [link](https://github.com/delta-io/delta/pull/4852/files) to review incremental changes. - [**stack/refactor-batch**](delta-io#4852) [[Files changed](https://github.com/delta-io/delta/pull/4852/files)] - [stack/refactor-FB](delta-io#4853) [[Files changed](https://github.com/delta-io/delta/pull/4853/files/e335ce14517cecdec073fb2e535fa9a762801e2f..47d4e10a4e6bb9bf2f26e38f57740fd4b43278ed)] --------- <!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [ ] Spark - [ ] Standalone - [ ] Flink - [x] Kernel - [ ] Other (fill in here) ## Description <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> Refactor `ActionWrapper` so that it also carries the path of the file its `ColumnarBatch` was read from. `ActionsIterator` now passes `FileReadResult` objects (batch plus file path) into `combine`, which hands the path to the `ActionWrapper` constructor; the path is exposed through the new `getFilePath()` accessor. ## How was this patch tested? <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. 
If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. -->
1 parent 8a1a88e commit daf2a40

File tree

2 files changed

+31
-16
lines changed

2 files changed

+31
-16
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionWrapper.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,21 @@ public class ActionWrapper {
2323
private final ColumnarBatch columnarBatch;
2424
private final boolean isFromCheckpoint;
2525
private final long version;
26+
private final String filePath;
2627
/* Timestamp of the commit file if isFromCheckpoint=false */
2728
private final Optional<Long> timestamp;
2829

2930
ActionWrapper(
30-
ColumnarBatch data, boolean isFromCheckpoint, long version, Optional<Long> timestamp) {
31+
ColumnarBatch data,
32+
boolean isFromCheckpoint,
33+
long version,
34+
Optional<Long> timestamp,
35+
String filePath) {
3136
this.columnarBatch = data;
3237
this.isFromCheckpoint = isFromCheckpoint;
3338
this.version = version;
3439
this.timestamp = timestamp;
40+
this.filePath = filePath;
3541
}
3642

3743
public ColumnarBatch getColumnarBatch() {
@@ -49,4 +55,8 @@ public long getVersion() {
4955
public Optional<Long> getTimestamp() {
5056
return timestamp;
5157
}
58+
59+
public String getFilePath() {
60+
return filePath;
61+
}
5262
}

kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -337,8 +337,9 @@ private CloseableIterator<ActionWrapper> getNextActionsIter() {
337337
// If the checkpoint file is a UUID or classic checkpoint, read the top-level
338338
// checkpoint file and any potential sidecars. Otherwise, look for any other
339339
// parts of the current multipart checkpoint.
340-
CloseableIterator<ColumnarBatch> dataIter =
341-
getActionsIterFromSinglePartOrV2Checkpoint(nextFile, fileName);
340+
CloseableIterator<FileReadResult> dataIter =
341+
getActionsIterFromSinglePartOrV2Checkpoint(nextFile, fileName)
342+
.map(batch -> new FileReadResult(batch, nextFile.getPath()));
342343
long version = checkpointVersion(nextFilePath);
343344
return combine(dataIter, true /* isFromCheckpoint */, version, Optional.empty());
344345
}
@@ -350,13 +351,13 @@ private CloseableIterator<ActionWrapper> getNextActionsIter() {
350351
// optimizations like reading multiple files in parallel.
351352
CloseableIterator<FileStatus> checkpointFiles =
352353
retrieveRemainingCheckpointFiles(nextLogFile);
353-
CloseableIterator<ColumnarBatch> dataIter =
354+
CloseableIterator<FileReadResult> dataIter =
354355
wrapEngineExceptionThrowsIO(
355356
() ->
356357
engine
357358
.getParquetHandler()
358-
.readParquetFiles(checkpointFiles, deltaReadSchema, checkpointPredicate)
359-
.map(FileReadResult::getData),
359+
.readParquetFiles(
360+
checkpointFiles, deltaReadSchema, checkpointPredicate),
360361
"Reading checkpoint sidecars [%s] with readSchema=%s and predicate=%s",
361362
checkpointFiles,
362363
deltaReadSchema,
@@ -380,17 +381,17 @@ private CloseableIterator<ActionWrapper> readCommitOrCompactionFile(
380381
// version with actions read from the JSON file for further optimizations later
381382
// on (faster metadata & protocol loading in subsequent runs by remembering
382383
// the version of the last version where the metadata and protocol are found).
383-
final CloseableIterator<ColumnarBatch> dataIter =
384+
final CloseableIterator<FileReadResult> dataIter =
384385
wrapEngineExceptionThrowsIO(
385386
() ->
386387
engine
387388
.getJsonHandler()
388389
.readJsonFiles(
389-
singletonCloseableIterator(nextFile), deltaReadSchema, Optional.empty()),
390+
singletonCloseableIterator(nextFile), deltaReadSchema, Optional.empty())
391+
.map(batch -> new FileReadResult(batch, nextFile.getPath())),
390392
"Reading JSON log file `%s` with readSchema=%s",
391393
nextFile,
392394
deltaReadSchema);
393-
394395
return combine(
395396
dataIter,
396397
false /* isFromCheckpoint */,
@@ -406,7 +407,7 @@ private CloseableIterator<ActionWrapper> readCommitOrCompactionFile(
406407
* of the file.
407408
*/
408409
private CloseableIterator<ActionWrapper> combine(
409-
CloseableIterator<ColumnarBatch> fileReadDataIter,
410+
CloseableIterator<FileReadResult> fileReadDataIter,
410411
boolean isFromCheckpoint,
411412
long version,
412413
Optional<Long> timestamp) {
@@ -416,12 +417,14 @@ private CloseableIterator<ActionWrapper> combine(
416417
// enabled, we will read the first batch and try to extract the timestamp from it.
417418
// We also ensure that rewoundFileReadDataIter is identical to the original
418419
// fileReadDataIter before any data was consumed.
419-
final CloseableIterator<ColumnarBatch> rewoundFileReadDataIter;
420+
final CloseableIterator<FileReadResult> rewoundFileReadDataIter;
420421
Optional<Long> inCommitTimestampOpt = Optional.empty();
421422
if (!isFromCheckpoint && fileReadDataIter.hasNext()) {
422-
ColumnarBatch firstBatch = fileReadDataIter.next();
423-
rewoundFileReadDataIter = singletonCloseableIterator(firstBatch).combine(fileReadDataIter);
424-
inCommitTimestampOpt = InCommitTimestampUtils.tryExtractInCommitTimestamp(firstBatch);
423+
FileReadResult fileReadResult = fileReadDataIter.next();
424+
rewoundFileReadDataIter =
425+
singletonCloseableIterator(fileReadResult).combine(fileReadDataIter);
426+
inCommitTimestampOpt =
427+
InCommitTimestampUtils.tryExtractInCommitTimestamp(fileReadResult.getData());
425428
} else {
426429
rewoundFileReadDataIter = fileReadDataIter;
427430
}
@@ -436,11 +439,13 @@ public boolean hasNext() {
436439

437440
@Override
438441
public ActionWrapper next() {
442+
FileReadResult fileReadResult = rewoundFileReadDataIter.next();
439443
return new ActionWrapper(
440-
rewoundFileReadDataIter.next(),
444+
fileReadResult.getData(),
441445
isFromCheckpoint,
442446
version,
443-
finalResolvedCommitTimestamp);
447+
finalResolvedCommitTimestamp,
448+
fileReadResult.getFilePath());
444449
}
445450

446451
@Override

0 commit comments

Comments
 (0)