Skip to content

Commit 7a671c7

Browse files
authored
[refactor] Move #earlierThanTimeMills to TimeTravelUtil (#5197)
1 parent 7bfb13f commit 7a671c7

File tree

3 files changed

+119
-116
lines changed

3 files changed

+119
-116
lines changed

paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java

Lines changed: 6 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,12 @@
1919
package org.apache.paimon.table.source.snapshot;
2020

2121
import org.apache.paimon.CoreOptions;
22-
import org.apache.paimon.Snapshot;
2322
import org.apache.paimon.utils.ChangelogManager;
24-
import org.apache.paimon.utils.FunctionWithException;
2523
import org.apache.paimon.utils.SnapshotManager;
2624

2725
import org.slf4j.Logger;
2826
import org.slf4j.LoggerFactory;
2927

30-
import javax.annotation.Nullable;
31-
32-
import java.io.FileNotFoundException;
33-
34-
import static org.apache.paimon.utils.SnapshotManager.EARLIEST_SNAPSHOT_DEFAULT_RETRY_NUM;
35-
3628
/**
3729
* {@link StartingScanner} for the {@link CoreOptions.StartupMode#FROM_TIMESTAMP} startup mode of a
3830
* streaming read.
@@ -56,7 +48,7 @@ public ContinuousFromTimestampStartingScanner(
5648
this.startupMillis = startupMillis;
5749
this.startFromChangelog = changelogDecoupled;
5850
this.startingSnapshotId =
59-
earlierThanTimeMills(
51+
TimeTravelUtil.earlierThanTimeMills(
6052
snapshotManager, changelogManager, startupMillis, startFromChangelog);
6153
}
6254

@@ -71,113 +63,15 @@ public StartingContext startingContext() {
7163

7264
@Override
7365
public Result scan(SnapshotReader snapshotReader) {
74-
Long startingSnapshotId =
75-
earlierThanTimeMills(
76-
snapshotManager, changelogManager, startupMillis, startFromChangelog);
66+
if (startingSnapshotId == null) {
67+
startingSnapshotId =
68+
TimeTravelUtil.earlierThanTimeMills(
69+
snapshotManager, changelogManager, startupMillis, startFromChangelog);
70+
}
7771
if (startingSnapshotId == null) {
7872
LOG.debug("There is currently no snapshot. Waiting for snapshot generation.");
7973
return new NoSnapshot();
8074
}
8175
return new NextSnapshot(startingSnapshotId + 1);
8276
}
83-
84-
/**
85-
* Returns the latest snapshot earlier than the timestamp mills. A non-existent snapshot may be
86-
* returned if all snapshots are equal to or later than the timestamp mills.
87-
*/
88-
public static @Nullable Long earlierThanTimeMills(
89-
SnapshotManager snapshotManager,
90-
ChangelogManager changelogManager,
91-
long timestampMills,
92-
boolean startFromChangelog) {
93-
Long latest = snapshotManager.latestSnapshotId();
94-
if (latest == null) {
95-
return null;
96-
}
97-
98-
Snapshot earliestSnapshot =
99-
earliestSnapshot(snapshotManager, changelogManager, startFromChangelog, latest);
100-
if (earliestSnapshot == null) {
101-
return latest - 1;
102-
}
103-
104-
if (earliestSnapshot.timeMillis() >= timestampMills) {
105-
return earliestSnapshot.id() - 1;
106-
}
107-
108-
long earliest = earliestSnapshot.id();
109-
while (earliest < latest) {
110-
long mid = (earliest + latest + 1) / 2;
111-
Snapshot snapshot =
112-
startFromChangelog
113-
? changelogOrSnapshot(snapshotManager, changelogManager, mid)
114-
: snapshotManager.snapshot(mid);
115-
if (snapshot.timeMillis() < timestampMills) {
116-
earliest = mid;
117-
} else {
118-
latest = mid - 1;
119-
}
120-
}
121-
return earliest;
122-
}
123-
124-
private static @Nullable Snapshot earliestSnapshot(
125-
SnapshotManager snapshotManager,
126-
ChangelogManager changelogManager,
127-
boolean includeChangelog,
128-
@Nullable Long stopSnapshotId) {
129-
Long snapshotId = null;
130-
if (includeChangelog) {
131-
snapshotId = changelogManager.earliestLongLivedChangelogId();
132-
}
133-
if (snapshotId == null) {
134-
snapshotId = snapshotManager.earliestSnapshotId();
135-
}
136-
if (snapshotId == null) {
137-
return null;
138-
}
139-
140-
if (stopSnapshotId == null) {
141-
stopSnapshotId = snapshotId + EARLIEST_SNAPSHOT_DEFAULT_RETRY_NUM;
142-
}
143-
144-
FunctionWithException<Long, Snapshot, FileNotFoundException> snapshotFunction =
145-
includeChangelog
146-
? s -> tryGetChangelogOrSnapshot(snapshotManager, changelogManager, s)
147-
: snapshotManager::tryGetSnapshot;
148-
149-
do {
150-
try {
151-
return snapshotFunction.apply(snapshotId);
152-
} catch (FileNotFoundException e) {
153-
snapshotId++;
154-
if (snapshotId > stopSnapshotId) {
155-
return null;
156-
}
157-
LOG.warn(
158-
"The earliest snapshot or changelog was once identified but disappeared. "
159-
+ "It might have been expired by other jobs operating on this table. "
160-
+ "Searching for the second earliest snapshot or changelog instead. ");
161-
}
162-
} while (true);
163-
}
164-
165-
private static Snapshot tryGetChangelogOrSnapshot(
166-
SnapshotManager snapshotManager, ChangelogManager changelogManager, long snapshotId)
167-
throws FileNotFoundException {
168-
if (changelogManager.longLivedChangelogExists(snapshotId)) {
169-
return changelogManager.tryGetChangelog(snapshotId);
170-
} else {
171-
return snapshotManager.tryGetSnapshot(snapshotId);
172-
}
173-
}
174-
175-
private static Snapshot changelogOrSnapshot(
176-
SnapshotManager snapshotManager, ChangelogManager changelogManager, long snapshotId) {
177-
if (changelogManager.longLivedChangelogExists(snapshotId)) {
178-
return changelogManager.changelog(snapshotId);
179-
} else {
180-
return snapshotManager.snapshot(snapshotId);
181-
}
182-
}
18377
}

paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,30 @@
2323
import org.apache.paimon.schema.SchemaManager;
2424
import org.apache.paimon.schema.TableSchema;
2525
import org.apache.paimon.table.FileStoreTable;
26+
import org.apache.paimon.utils.ChangelogManager;
27+
import org.apache.paimon.utils.FunctionWithException;
2628
import org.apache.paimon.utils.SnapshotManager;
2729
import org.apache.paimon.utils.SnapshotNotExistException;
2830
import org.apache.paimon.utils.TagManager;
2931

32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
34+
35+
import javax.annotation.Nullable;
36+
37+
import java.io.FileNotFoundException;
3038
import java.util.ArrayList;
3139
import java.util.List;
3240

3341
import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
3442
import static org.apache.paimon.utils.Preconditions.checkArgument;
43+
import static org.apache.paimon.utils.SnapshotManager.EARLIEST_SNAPSHOT_DEFAULT_RETRY_NUM;
3544

3645
/** The util class of resolve snapshot from scan params for time travel. */
3746
public class TimeTravelUtil {
3847

48+
private static final Logger LOG = LoggerFactory.getLogger(TimeTravelUtil.class);
49+
3950
private static final String[] SCAN_KEYS = {
4051
CoreOptions.SCAN_SNAPSHOT_ID.key(),
4152
CoreOptions.SCAN_TAG_NAME.key(),
@@ -127,6 +138,106 @@ private static Snapshot resolveSnapshotByTagName(
127138
return tagManager.getOrThrow(tagName).trimToSnapshot();
128139
}
129140

141+
/**
142+
* Returns the latest snapshot earlier than the timestamp mills. A non-existent snapshot may be
143+
* returned if all snapshots are equal to or later than the timestamp mills.
144+
*/
145+
public static @Nullable Long earlierThanTimeMills(
146+
SnapshotManager snapshotManager,
147+
ChangelogManager changelogManager,
148+
long timestampMills,
149+
boolean startFromChangelog) {
150+
Long latest = snapshotManager.latestSnapshotId();
151+
if (latest == null) {
152+
return null;
153+
}
154+
155+
Snapshot earliestSnapshot =
156+
earliestSnapshot(snapshotManager, changelogManager, startFromChangelog, latest);
157+
if (earliestSnapshot == null) {
158+
return latest - 1;
159+
}
160+
161+
if (earliestSnapshot.timeMillis() >= timestampMills) {
162+
return earliestSnapshot.id() - 1;
163+
}
164+
165+
long earliest = earliestSnapshot.id();
166+
while (earliest < latest) {
167+
long mid = (earliest + latest + 1) / 2;
168+
Snapshot snapshot =
169+
startFromChangelog
170+
? changelogOrSnapshot(snapshotManager, changelogManager, mid)
171+
: snapshotManager.snapshot(mid);
172+
if (snapshot.timeMillis() < timestampMills) {
173+
earliest = mid;
174+
} else {
175+
latest = mid - 1;
176+
}
177+
}
178+
return earliest;
179+
}
180+
181+
private static @Nullable Snapshot earliestSnapshot(
182+
SnapshotManager snapshotManager,
183+
ChangelogManager changelogManager,
184+
boolean includeChangelog,
185+
@Nullable Long stopSnapshotId) {
186+
Long snapshotId = null;
187+
if (includeChangelog) {
188+
snapshotId = changelogManager.earliestLongLivedChangelogId();
189+
}
190+
if (snapshotId == null) {
191+
snapshotId = snapshotManager.earliestSnapshotId();
192+
}
193+
if (snapshotId == null) {
194+
return null;
195+
}
196+
197+
if (stopSnapshotId == null) {
198+
stopSnapshotId = snapshotId + EARLIEST_SNAPSHOT_DEFAULT_RETRY_NUM;
199+
}
200+
201+
FunctionWithException<Long, Snapshot, FileNotFoundException> snapshotFunction =
202+
includeChangelog
203+
? s -> tryGetChangelogOrSnapshot(snapshotManager, changelogManager, s)
204+
: snapshotManager::tryGetSnapshot;
205+
206+
do {
207+
try {
208+
return snapshotFunction.apply(snapshotId);
209+
} catch (FileNotFoundException e) {
210+
snapshotId++;
211+
if (snapshotId > stopSnapshotId) {
212+
return null;
213+
}
214+
LOG.warn(
215+
"The earliest snapshot or changelog was once identified but disappeared. "
216+
+ "It might have been expired by other jobs operating on this table. "
217+
+ "Searching for the second earliest snapshot or changelog instead. ");
218+
}
219+
} while (true);
220+
}
221+
222+
private static Snapshot tryGetChangelogOrSnapshot(
223+
SnapshotManager snapshotManager, ChangelogManager changelogManager, long snapshotId)
224+
throws FileNotFoundException {
225+
if (changelogManager.longLivedChangelogExists(snapshotId)) {
226+
return changelogManager.tryGetChangelog(snapshotId);
227+
} else {
228+
return snapshotManager.tryGetSnapshot(snapshotId);
229+
}
230+
}
231+
232+
private static Snapshot changelogOrSnapshot(
233+
SnapshotManager snapshotManager, ChangelogManager changelogManager, long snapshotId) {
234+
if (changelogManager.longLivedChangelogExists(snapshotId)) {
235+
return changelogManager.changelog(snapshotId);
236+
} else {
237+
return snapshotManager.snapshot(snapshotId);
238+
}
239+
}
240+
130241
public static void checkRescaleBucketForIncrementalTagQuery(
131242
SchemaManager schemaManager, Snapshot start, Snapshot end) {
132243
if (start.schemaId() != end.schemaId()) {

paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import org.apache.paimon.fs.FileIO;
2424
import org.apache.paimon.fs.Path;
2525
import org.apache.paimon.fs.local.LocalFileIO;
26-
import org.apache.paimon.table.source.snapshot.ContinuousFromTimestampStartingScanner;
26+
import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
2727

2828
import org.assertj.core.api.Assertions;
2929
import org.junit.jupiter.api.Test;
@@ -129,9 +129,7 @@ public void testEarlierThanTimeMillis(boolean isRaceCondition) throws IOExceptio
129129
// pick a random time equal to one of the snapshots
130130
time = millis.get(random.nextInt(numSnapshots));
131131
}
132-
Long actual =
133-
ContinuousFromTimestampStartingScanner.earlierThanTimeMills(
134-
snapshotManager, null, time, false);
132+
Long actual = TimeTravelUtil.earlierThanTimeMills(snapshotManager, null, time, false);
135133

136134
if (millis.get(numSnapshots - 1) < time) {
137135
if (isRaceCondition && millis.size() == 1) {

0 commit comments

Comments
 (0)