Skip to content

Commit 911f504

Browse files
authored
[Kernel] [Pagination] New Page Token Class (delta-io#4848)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [ ] Spark - [ ] Standalone - [ ] Flink - [x] Kernel - [ ] Other (fill in here) ## Description Introduce a new page token class (for pagination). <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> ## How was this patch tested? PageTokenSuite.scala <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> ## Does this PR introduce _any_ user-facing changes? No. <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. -->
1 parent 0fe4b23 commit 911f504

File tree

2 files changed

+347
-0
lines changed

2 files changed

+347
-0
lines changed
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/*
2+
* Copyright (2025) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.kernel.internal.replay;
17+
18+
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
19+
import static java.util.Objects.requireNonNull;
20+
21+
import io.delta.kernel.data.Row;
22+
import io.delta.kernel.internal.data.GenericRow;
23+
import io.delta.kernel.types.*;
24+
import java.util.HashMap;
25+
import java.util.Map;
26+
import java.util.Objects;
27+
import java.util.Optional;
28+
29+
/** Page Token Class for Pagination Support */
30+
public class PageToken {
31+
32+
public static PageToken fromRow(Row row) {
33+
requireNonNull(row);
34+
35+
// Check #1: Correct schema
36+
checkArgument(
37+
PAGE_TOKEN_SCHEMA.equals(row.getSchema()),
38+
String.format(
39+
"Invalid Page Token: input row schema does not match expected PageToken schema."
40+
+ "\nExpected: %s\nGot: %s",
41+
PAGE_TOKEN_SCHEMA, row.getSchema()));
42+
43+
// Check #2: All required fields are present
44+
for (int i = 0; i < PAGE_TOKEN_SCHEMA.length(); i++) {
45+
if (PAGE_TOKEN_SCHEMA.at(i).getName().equals("lastReadSidecarFileIdx")) continue;
46+
checkArgument(
47+
!row.isNullAt(i),
48+
String.format(
49+
"Invalid Page Token: required field '%s' is null at index %d",
50+
PAGE_TOKEN_SCHEMA.at(i).getName(), i));
51+
}
52+
53+
return new PageToken(
54+
row.getString(0), // lastReadLogFileName
55+
row.getLong(1), // lastReturnedRowIndex
56+
Optional.ofNullable(row.isNullAt(2) ? null : row.getLong(2)), // lastReadSidecarFileIdx
57+
row.getString(3), // kernelVersion
58+
row.getString(4), // tablePath
59+
row.getLong(5), // tableVersion
60+
row.getLong(6), // predicateHash
61+
row.getLong(7)); // logSegmentHash
62+
}
63+
64+
public static final StructType PAGE_TOKEN_SCHEMA =
65+
new StructType()
66+
.add("lastReadLogFileName", StringType.STRING, false /* nullable */)
67+
.add("lastReturnedRowIndex", LongType.LONG, false /* nullable */)
68+
.add("lastReadSidecarFileIdx", LongType.LONG, true /* nullable */)
69+
.add("kernelVersion", StringType.STRING, false /* nullable */)
70+
.add("tablePath", StringType.STRING, false /* nullable */)
71+
.add("tableVersion", LongType.LONG, false /* nullable */)
72+
.add("predicateHash", LongType.LONG, false /* nullable */)
73+
.add("logSegmentHash", LongType.LONG, false /* nullable */);
74+
75+
// ===== Variables to mark where the last page ended (and the current page starts) =====
76+
77+
/** The last log file read in the previous page. */
78+
private final String lastReadLogFileName;
79+
80+
/**
81+
* The index of the last row that was returned from the last read log file during the previous
82+
* page. This row index is relative to the file. The current page should begin from the row
83+
* immediately after this row index.
84+
*/
85+
private final long lastReturnedRowIndex;
86+
87+
/**
88+
* Optional index of the last sidecar checkpoint file read in the previous page. This index is
89+
* based on the ordering of sidecar files in the V2 manifest checkpoint file. If present, it must
90+
* represent the final sidecar file that was read and must correspond to the same file as
91+
* `lastReadLogFileName`.
92+
*/
93+
private final Optional<Long> lastReadSidecarFileIdx;
94+
95+
// ===== Variables for validating query params and detecting changes in log segment =====
96+
private final String kernelVersion;
97+
private final String tablePath;
98+
private final long tableVersion;
99+
private final long predicateHash;
100+
private final long logSegmentHash;
101+
102+
public PageToken(
103+
String lastReadLogFileName,
104+
long lastReturnedRowIndex,
105+
Optional<Long> lastReadSidecarFileIdx,
106+
String kernelVersion,
107+
String tablePath,
108+
long tableVersion,
109+
long predicateHash,
110+
long logSegmentHash) {
111+
this.lastReadLogFileName = requireNonNull(lastReadLogFileName, "lastReadLogFileName is null");
112+
this.lastReturnedRowIndex = lastReturnedRowIndex;
113+
this.lastReadSidecarFileIdx = lastReadSidecarFileIdx;
114+
this.kernelVersion = requireNonNull(kernelVersion, "kernelVersion is null");
115+
this.tablePath = requireNonNull(tablePath, "tablePath is null");
116+
this.tableVersion = tableVersion;
117+
this.predicateHash = predicateHash;
118+
this.logSegmentHash = logSegmentHash;
119+
}
120+
121+
public Row toRow() {
122+
Map<Integer, Object> pageTokenMap = new HashMap<>();
123+
pageTokenMap.put(0, lastReadLogFileName);
124+
pageTokenMap.put(1, lastReturnedRowIndex);
125+
pageTokenMap.put(2, lastReadSidecarFileIdx.orElse(null));
126+
pageTokenMap.put(3, kernelVersion);
127+
pageTokenMap.put(4, tablePath);
128+
pageTokenMap.put(5, tableVersion);
129+
pageTokenMap.put(6, predicateHash);
130+
pageTokenMap.put(7, logSegmentHash);
131+
132+
return new GenericRow(PAGE_TOKEN_SCHEMA, pageTokenMap);
133+
}
134+
135+
public String getLastReadLogFileName() {
136+
return lastReadLogFileName;
137+
}
138+
139+
public long getLastReturnedRowIndex() {
140+
return lastReturnedRowIndex;
141+
}
142+
143+
public Optional<Long> getLastReadSidecarFileIdx() {
144+
return lastReadSidecarFileIdx;
145+
}
146+
147+
@Override
148+
public boolean equals(Object obj) {
149+
if (this == obj) {
150+
return true;
151+
}
152+
if (obj == null || getClass() != obj.getClass()) {
153+
return false;
154+
}
155+
156+
PageToken other = (PageToken) obj;
157+
158+
return lastReturnedRowIndex == other.lastReturnedRowIndex
159+
&& tableVersion == other.tableVersion
160+
&& predicateHash == other.predicateHash
161+
&& logSegmentHash == other.logSegmentHash
162+
&& Objects.equals(lastReadSidecarFileIdx, other.lastReadSidecarFileIdx)
163+
&& Objects.equals(lastReadLogFileName, other.lastReadLogFileName)
164+
&& Objects.equals(kernelVersion, other.kernelVersion)
165+
&& Objects.equals(tablePath, other.tablePath);
166+
}
167+
168+
@Override
169+
public int hashCode() {
170+
return Objects.hash(
171+
lastReadLogFileName,
172+
lastReturnedRowIndex,
173+
lastReadSidecarFileIdx,
174+
kernelVersion,
175+
tablePath,
176+
tableVersion,
177+
predicateHash,
178+
logSegmentHash);
179+
}
180+
}
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
/*
2+
* Copyright (2025) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.kernel.internal
17+
18+
import java.util
19+
import java.util.{HashMap, Map}
20+
import java.util.Optional
21+
22+
import scala.collection.JavaConverters._
23+
24+
import io.delta.kernel.data.Row
25+
import io.delta.kernel.internal.annotation.VisibleForTesting
26+
import io.delta.kernel.internal.data.GenericRow
27+
import io.delta.kernel.internal.replay.PageToken
28+
import io.delta.kernel.test.MockFileSystemClientUtils
29+
import io.delta.kernel.types._
30+
31+
import org.scalatest.funsuite.AnyFunSuite
32+
33+
class PageTokenSuite extends AnyFunSuite with MockFileSystemClientUtils {
34+
35+
private val TEST_FILE_NAME = "test_file.json"
36+
private val TEST_ROW_INDEX = 42L
37+
private val TEST_SIDECAR_INDEX = Optional.of(java.lang.Long.valueOf(5L))
38+
private val TEST_KERNEL_VERSION = "4.0.0"
39+
private val TEST_TABLE_PATH = "/path/to/table"
40+
private val TEST_TABLE_VERSION = 5L
41+
private val TEST_PREDICATE_HASH = 123L
42+
private val TEST_LOG_SEGMENT_HASH = 456L
43+
44+
private val expectedPageToken = new PageToken(
45+
TEST_FILE_NAME,
46+
TEST_ROW_INDEX,
47+
TEST_SIDECAR_INDEX,
48+
TEST_KERNEL_VERSION,
49+
TEST_TABLE_PATH,
50+
TEST_TABLE_VERSION,
51+
TEST_PREDICATE_HASH,
52+
TEST_LOG_SEGMENT_HASH)
53+
54+
private val rowData: Map[Integer, Object] = new HashMap()
55+
rowData.put(0, TEST_FILE_NAME)
56+
rowData.put(1, TEST_ROW_INDEX.asInstanceOf[Object])
57+
rowData.put(2, TEST_SIDECAR_INDEX.get())
58+
rowData.put(3, TEST_KERNEL_VERSION)
59+
rowData.put(4, TEST_TABLE_PATH)
60+
rowData.put(5, TEST_TABLE_VERSION.asInstanceOf[Object])
61+
rowData.put(6, TEST_PREDICATE_HASH.asInstanceOf[Object])
62+
rowData.put(7, TEST_LOG_SEGMENT_HASH.asInstanceOf[Object])
63+
64+
val expectedRow = new GenericRow(PageToken.PAGE_TOKEN_SCHEMA, rowData)
65+
66+
test("PageToken.fromRow with valid data") {
67+
val pageToken = PageToken.fromRow(expectedRow)
68+
assert(pageToken.equals(expectedPageToken))
69+
}
70+
71+
test("PageToken.toRow with valid data") {
72+
val row = expectedPageToken.toRow
73+
assert(row.getSchema.equals(PageToken.PAGE_TOKEN_SCHEMA))
74+
75+
assert(row.getString(0) == TEST_FILE_NAME)
76+
assert(row.getLong(1) == TEST_ROW_INDEX)
77+
assert(Optional.of(row.getLong(2)) == TEST_SIDECAR_INDEX)
78+
assert(row.getString(3) == TEST_KERNEL_VERSION)
79+
assert(row.getString(4) == TEST_TABLE_PATH)
80+
assert(row.getLong(5) == TEST_TABLE_VERSION)
81+
assert(row.getLong(6) == TEST_PREDICATE_HASH)
82+
assert(row.getLong(7) == TEST_LOG_SEGMENT_HASH)
83+
}
84+
85+
test("E2E: PageToken round-trip: toRow -> fromRow") {
86+
val row = expectedPageToken.toRow
87+
val reconstructedPageToken = PageToken.fromRow(row)
88+
assert(reconstructedPageToken.equals(expectedPageToken))
89+
}
90+
91+
test("PageToken.fromRow throws exception when input row schema has invalid field name") {
92+
val invalidSchema = new StructType()
93+
.add("wrongFieldName", StringType.STRING)
94+
.add("lastReturnedRowIndex", LongType.LONG)
95+
.add("lastReadSidecarFileIdx", LongType.LONG)
96+
.add("kernelVersion", StringType.STRING)
97+
.add("tablePath", StringType.STRING)
98+
.add("tableVersion", LongType.LONG)
99+
.add("predicateHash", LongType.LONG)
100+
.add("logSegmentHash", LongType.LONG)
101+
102+
val invalidRowData: Map[Integer, Object] = new HashMap()
103+
invalidRowData.put(0, TEST_FILE_NAME)
104+
invalidRowData.put(1, TEST_ROW_INDEX.asInstanceOf[Object])
105+
invalidRowData.put(2, TEST_SIDECAR_INDEX)
106+
invalidRowData.put(3, TEST_KERNEL_VERSION)
107+
invalidRowData.put(4, TEST_TABLE_PATH)
108+
invalidRowData.put(5, TEST_TABLE_VERSION.asInstanceOf[Object])
109+
invalidRowData.put(6, TEST_PREDICATE_HASH.asInstanceOf[Object])
110+
invalidRowData.put(7, TEST_LOG_SEGMENT_HASH.asInstanceOf[Object])
111+
112+
val row = new GenericRow(invalidSchema, invalidRowData)
113+
val exception = intercept[IllegalArgumentException] {
114+
PageToken.fromRow(row)
115+
}
116+
assert(exception.getMessage.contains(
117+
"Invalid Page Token: input row schema does not match expected PageToken schema"))
118+
}
119+
120+
test("PageToken.fromRow throws exception when input row schema has wrong data type") {
121+
val invalidSchema = new StructType()
122+
.add("lastReadLogFileName", StringType.STRING)
123+
.add("lastReturnedRowIndex", LongType.LONG)
124+
.add("lastReadSidecarFileIdx", StringType.STRING) // should be long type
125+
.add("kernelVersion", StringType.STRING)
126+
.add("tablePath", StringType.STRING)
127+
.add("tableVersion", LongType.LONG)
128+
.add("predicateHash", LongType.LONG)
129+
.add("logSegmentHash", LongType.LONG)
130+
131+
val invalidRowData: Map[Integer, Object] = new HashMap()
132+
invalidRowData.put(0, TEST_FILE_NAME)
133+
invalidRowData.put(1, TEST_ROW_INDEX.asInstanceOf[Object])
134+
invalidRowData.put(2, TEST_SIDECAR_INDEX)
135+
invalidRowData.put(3, TEST_KERNEL_VERSION)
136+
invalidRowData.put(4, TEST_TABLE_PATH)
137+
invalidRowData.put(5, TEST_TABLE_VERSION.asInstanceOf[Object])
138+
invalidRowData.put(6, TEST_PREDICATE_HASH.asInstanceOf[Object])
139+
invalidRowData.put(7, TEST_LOG_SEGMENT_HASH.asInstanceOf[Object])
140+
141+
val row = new GenericRow(invalidSchema, invalidRowData)
142+
val exception = intercept[IllegalArgumentException] {
143+
PageToken.fromRow(row)
144+
}
145+
assert(exception.getMessage.contains(
146+
"Invalid Page Token: input row schema does not match expected PageToken schema"))
147+
}
148+
149+
test("PageToken.fromRow accepts the case sidecar field is null") {
150+
val nullSidecarData: Map[Integer, Object] = new HashMap(rowData)
151+
nullSidecarData.put(2, null)
152+
val nullSidecarRow = new GenericRow(PageToken.PAGE_TOKEN_SCHEMA, nullSidecarData)
153+
val pageToken = PageToken.fromRow(nullSidecarRow)
154+
assert(pageToken.getLastReadSidecarFileIdx == Optional.empty())
155+
}
156+
157+
test("PageToken.fromRow throws exception when required field is null") {
158+
val invalidData: Map[Integer, Object] = new HashMap(rowData)
159+
invalidData.put(3, null)
160+
val invalidRow = new GenericRow(PageToken.PAGE_TOKEN_SCHEMA, invalidData)
161+
val exception = intercept[IllegalArgumentException] {
162+
PageToken.fromRow(invalidRow)
163+
}
164+
assert(exception.getMessage.contains(
165+
"Invalid Page Token: required field"))
166+
}
167+
}

0 commit comments

Comments
 (0)