Skip to content

Commit e024622

Browse files
authored
[Kernel] [Pagination] Add PaginationScanImpl Class (delta-io#4880)
1 parent 85740df commit e024622

File tree

6 files changed

+170
-12
lines changed

6 files changed

+170
-12
lines changed
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright (2025) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.delta.kernel.internal;
18+
19+
import io.delta.kernel.PaginatedScan;
20+
import io.delta.kernel.PaginatedScanFilesIterator;
21+
import io.delta.kernel.data.FilteredColumnarBatch;
22+
import io.delta.kernel.data.Row;
23+
import io.delta.kernel.engine.Engine;
24+
import io.delta.kernel.expressions.Predicate;
25+
import io.delta.kernel.internal.replay.PageToken;
26+
import io.delta.kernel.internal.replay.PaginatedScanFilesIteratorImpl;
27+
import io.delta.kernel.internal.replay.PaginationContext;
28+
import io.delta.kernel.utils.CloseableIterator;
29+
import java.util.Optional;
30+
31+
/** Implementation of {@link PaginatedScan} */
32+
public class PaginatedScanImpl implements PaginatedScan {
33+
private final long pageSize;
34+
private final Optional<PageToken> pageTokenOpt;
35+
private final ScanImpl baseScan;
36+
37+
public PaginatedScanImpl(ScanImpl baseScan, Optional<Row> pageTokenRowOpt, long pageSize) {
38+
this.baseScan = baseScan;
39+
this.pageTokenOpt = pageTokenRowOpt.map(PageToken::fromRow);
40+
this.pageSize = pageSize;
41+
}
42+
43+
@Override
44+
public Optional<Predicate> getRemainingFilter() {
45+
return baseScan.getRemainingFilter();
46+
}
47+
48+
@Override
49+
public Row getScanState(Engine engine) {
50+
return baseScan.getScanState(engine);
51+
}
52+
53+
@Override
54+
public PaginatedScanFilesIterator getScanFiles(Engine engine) {
55+
return this.getScanFiles(engine, false /* include stats */);
56+
}
57+
58+
public PaginatedScanFilesIterator getScanFiles(Engine engine, boolean includeStates) {
59+
PaginationContext paginationContext =
60+
pageTokenOpt
61+
.map(token -> PaginationContext.forPageWithPageToken(pageSize, token))
62+
.orElseGet(() -> PaginationContext.forFirstPage(pageSize));
63+
CloseableIterator<FilteredColumnarBatch> filteredScanFilesIter =
64+
baseScan.getScanFiles(engine, includeStates, Optional.of(paginationContext));
65+
return new PaginatedScanFilesIteratorImpl(filteredScanFilesIter, paginationContext);
66+
}
67+
}

kernel/kernel-api/src/main/java/io/delta/kernel/internal/ScanBuilderImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package io.delta.kernel.internal;
1818

1919
import io.delta.kernel.PaginatedScan;
20-
import io.delta.kernel.Scan;
2120
import io.delta.kernel.ScanBuilder;
2221
import io.delta.kernel.data.Row;
2322
import io.delta.kernel.expressions.Predicate;
@@ -76,7 +75,7 @@ public ScanBuilder withReadSchema(StructType readSchema) {
7675
}
7776

7877
@Override
79-
public Scan build() {
78+
public ScanImpl build() {
8079
return new ScanImpl(
8180
snapshotSchema,
8281
readSchema,
@@ -89,7 +88,8 @@ public Scan build() {
8988
}
9089

9190
@Override
92-
public PaginatedScan buildPaginated(long pageSize, Optional<Row> pageToken) {
93-
throw new UnsupportedOperationException("not implemented");
91+
public PaginatedScan buildPaginated(long pageSize, Optional<Row> pageTokenRowOpt) {
92+
ScanImpl baseScan = this.build();
93+
return new PaginatedScanImpl(baseScan, pageTokenRowOpt, pageSize);
9494
}
9595
}

kernel/kernel-api/src/main/java/io/delta/kernel/internal/ScanImpl.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import io.delta.kernel.internal.metrics.ScanReportImpl;
3535
import io.delta.kernel.internal.metrics.Timer;
3636
import io.delta.kernel.internal.replay.LogReplay;
37+
import io.delta.kernel.internal.replay.PaginationContext;
3738
import io.delta.kernel.internal.skipping.DataSkippingPredicate;
3839
import io.delta.kernel.internal.skipping.DataSkippingUtils;
3940
import io.delta.kernel.internal.util.*;
@@ -102,7 +103,17 @@ public ScanImpl(
102103
*/
103104
@Override
104105
public CloseableIterator<FilteredColumnarBatch> getScanFiles(Engine engine) {
105-
return getScanFiles(engine, false);
106+
return getScanFiles(engine, false /* includeStats */);
107+
}
108+
109+
/**
110+
* Get an iterator of data files in this version of scan that survived the predicate pruning.
111+
*
112+
* @return data in {@link ColumnarBatch} batch format. Each row correspond to one survived file.
113+
*/
114+
public CloseableIterator<FilteredColumnarBatch> getScanFiles(
115+
Engine engine, boolean includeStats) {
116+
return getScanFiles(engine, includeStats, Optional.empty() /* paginationContextOpt */);
106117
}
107118

108119
/**
@@ -115,10 +126,11 @@ public CloseableIterator<FilteredColumnarBatch> getScanFiles(Engine engine) {
115126
*
116127
* @param engine the {@link Engine} instance to use
117128
* @param includeStats whether to read and include the JSON statistics
129+
* @param paginationContextOpt pagination context if present
118130
* @return the surviving scan files as {@link FilteredColumnarBatch}s
119131
*/
120-
public CloseableIterator<FilteredColumnarBatch> getScanFiles(
121-
Engine engine, boolean includeStats) {
132+
protected CloseableIterator<FilteredColumnarBatch> getScanFiles(
133+
Engine engine, boolean includeStats, Optional<PaginationContext> paginationContextOpt) {
122134
if (accessedScanFiles) {
123135
throw new IllegalStateException("Scan files are already fetched from this instance");
124136
}
@@ -165,7 +177,8 @@ public CloseableIterator<FilteredColumnarBatch> getScanFiles(
165177
predicate ->
166178
rewritePartitionPredicateOnCheckpointFileSchema(
167179
predicate, partitionColToStructFieldMap.get())),
168-
scanMetrics);
180+
scanMetrics,
181+
paginationContextOpt);
169182

170183
// Apply partition pruning
171184
scanFileIter = applyPartitionPruning(engine, scanFileIter);

kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,22 +84,31 @@ public class ActionsIterator implements CloseableIterator<ActionWrapper> {
8484
*/
8585
private Optional<CloseableIterator<ActionWrapper>> actionsIter;
8686

87+
private final Optional<PaginationContext> paginationContextOpt;
88+
8789
private boolean closed;
8890

8991
public ActionsIterator(
9092
Engine engine,
9193
List<FileStatus> files,
9294
StructType deltaReadSchema,
9395
Optional<Predicate> checkpointPredicate) {
94-
this(engine, files, deltaReadSchema, deltaReadSchema, checkpointPredicate);
96+
this(
97+
engine,
98+
files,
99+
deltaReadSchema,
100+
deltaReadSchema,
101+
checkpointPredicate,
102+
Optional.empty() /* paginationContextOpt */);
95103
}
96104

97105
public ActionsIterator(
98106
Engine engine,
99107
List<FileStatus> files,
100108
StructType deltaReadSchema,
101109
StructType checkpointReadSchema,
102-
Optional<Predicate> checkpointPredicate) {
110+
Optional<Predicate> checkpointPredicate,
111+
Optional<PaginationContext> paginationContextOpt) {
103112
this.engine = engine;
104113
this.checkpointPredicate = checkpointPredicate;
105114
this.filesList = new LinkedList<>();
@@ -109,6 +118,7 @@ public ActionsIterator(
109118
this.checkpointReadSchema = checkpointReadSchema;
110119
this.actionsIter = Optional.empty();
111120
this.schemaContainsAddOrRemoveFiles = LogReplay.containsAddOrRemoveFileActions(deltaReadSchema);
121+
this.paginationContextOpt = paginationContextOpt;
112122
}
113123

114124
@Override

kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,8 @@ public CloseableIterator<FilteredColumnarBatch> getAddFilesAsColumnarBatches(
215215
Engine engine,
216216
boolean shouldReadStats,
217217
Optional<Predicate> checkpointPredicate,
218-
ScanMetrics scanMetrics) {
218+
ScanMetrics scanMetrics,
219+
Optional<PaginationContext> paginationContextOpt) {
219220
// We do not need to look at any `remove` files from the checkpoints. Skip the column to save
220221
// I/O. Note that we are still going to process the row groups. Adds and removes are randomly
221222
// scattered through checkpoint part files, so row group push down is unlikely to be useful.
@@ -225,7 +226,8 @@ public CloseableIterator<FilteredColumnarBatch> getAddFilesAsColumnarBatches(
225226
getLogReplayFiles(getLogSegment()),
226227
getAddRemoveReadSchema(shouldReadStats),
227228
getAddReadSchema(shouldReadStats),
228-
checkpointPredicate);
229+
checkpointPredicate,
230+
paginationContextOpt);
229231
return new ActiveAddFilesIterator(engine, addRemoveIter, dataPath, scanMetrics);
230232
}
231233

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright (2025) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.kernel.internal.replay;
17+
18+
import io.delta.kernel.PaginatedScanFilesIterator;
19+
import io.delta.kernel.data.FilteredColumnarBatch;
20+
import io.delta.kernel.data.Row;
21+
import io.delta.kernel.utils.CloseableIterator;
22+
import java.io.IOException;
23+
24+
/** Implementation of {@link PaginatedScanFilesIterator} */
25+
public class PaginatedScanFilesIteratorImpl implements PaginatedScanFilesIterator {
26+
27+
private final CloseableIterator<FilteredColumnarBatch> filteredScanFilesIter;
28+
private final long pageSize;
29+
30+
/**
31+
* Constructs a paginated iterator over scan files on top of a given filtered scan files iterator
32+
* and pagination context.
33+
*
34+
* @param filteredScanFilesIter The underlying scan files iterator with data skipping and
35+
* partition pruning applied. This iterator serves as the source of filtered scan results for
36+
* pagination.
37+
* @param paginationContext The pagination context that carries pagination-related information,
38+
* such as the maximum number of files to return in a page.
39+
*/
40+
public PaginatedScanFilesIteratorImpl(
41+
CloseableIterator<FilteredColumnarBatch> filteredScanFilesIter,
42+
PaginationContext paginationContext) {
43+
this.filteredScanFilesIter = filteredScanFilesIter;
44+
this.pageSize = paginationContext.getPageSize();
45+
}
46+
47+
@Override
48+
public Row getCurrentPageToken() {
49+
throw new UnsupportedOperationException("Not implemented");
50+
}
51+
52+
@Override
53+
public boolean hasNext() {
54+
throw new UnsupportedOperationException("Not implemented");
55+
}
56+
57+
@Override
58+
public FilteredColumnarBatch next() {
59+
throw new UnsupportedOperationException("Not implemented");
60+
}
61+
62+
@Override
63+
public void close() throws IOException {
64+
throw new UnsupportedOperationException("Not implemented");
65+
}
66+
}

0 commit comments

Comments
 (0)