Skip to content

Commit 1851707

Browse files
[hotfix] Fix earliestSnapshot stability (#5096)
1 parent a5c9111 commit 1851707

File tree

2 files changed

+22
-15
lines changed

2 files changed

+22
-15
lines changed

paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ public class SnapshotManager implements Serializable {
7171
private static final String CHANGELOG_PREFIX = "changelog-";
7272
public static final String EARLIEST = "EARLIEST";
7373
public static final String LATEST = "LATEST";
74+
private static final int EARLIEST_SNAPSHOT_DEFAULT_RETRY_NUM = 3;
7475
private static final int READ_HINT_RETRY_NUM = 3;
7576
private static final int READ_HINT_RETRY_INTERVAL = 1;
7677

@@ -221,10 +222,11 @@ public boolean longLivedChangelogExists(long snapshotId) {
221222
}
222223

223224
public @Nullable Snapshot earliestSnapshot() {
224-
return earliestSnapshot(false);
225+
return earliestSnapshot(false, null);
225226
}
226227

227-
private @Nullable Snapshot earliestSnapshot(boolean includeChangelog) {
228+
private @Nullable Snapshot earliestSnapshot(
229+
boolean includeChangelog, @Nullable Long stopSnapshotId) {
228230
Long snapshotId = null;
229231
if (includeChangelog) {
230232
snapshotId = earliestLongLivedChangelogId();
@@ -236,24 +238,25 @@ public boolean longLivedChangelogExists(long snapshotId) {
236238
return null;
237239
}
238240

241+
if (stopSnapshotId == null) {
242+
stopSnapshotId = snapshotId + EARLIEST_SNAPSHOT_DEFAULT_RETRY_NUM;
243+
}
244+
239245
FunctionWithException<Long, Snapshot, FileNotFoundException> snapshotFunction =
240246
includeChangelog ? this::tryGetChangelogOrSnapshot : this::tryGetSnapshot;
241247

242-
// The loss of the earliest snapshot is an event of small probability, so the retry number
243-
// here need not be too large.
244-
int retry = 0;
245248
do {
246249
try {
247250
return snapshotFunction.apply(snapshotId);
248251
} catch (FileNotFoundException e) {
249-
if (retry++ >= 3) {
250-
throw new RuntimeException(e);
252+
snapshotId++;
253+
if (snapshotId > stopSnapshotId) {
254+
return null;
251255
}
252256
LOG.warn(
253257
"The earliest snapshot or changelog was once identified but disappeared. "
254258
+ "It might have been expired by other jobs operating on this table. "
255259
+ "Searching for the second earliest snapshot or changelog instead. ");
256-
snapshotId++;
257260
}
258261
} while (true);
259262
}
@@ -332,9 +335,9 @@ private Snapshot tryGetChangelogOrSnapshot(long snapshotId) throws FileNotFoundE
332335
return null;
333336
}
334337

335-
Snapshot earliestSnapshot = earliestSnapshot(startFromChangelog);
338+
Snapshot earliestSnapshot = earliestSnapshot(startFromChangelog, latest);
336339
if (earliestSnapshot == null) {
337-
return null;
340+
return latest - 1;
338341
}
339342

340343
if (earliestSnapshot.timeMillis() >= timestampMills) {
@@ -363,7 +366,7 @@ private Snapshot tryGetChangelogOrSnapshot(long snapshotId) throws FileNotFoundE
363366
return null;
364367
}
365368

366-
Snapshot earliestSnapShot = earliestSnapshot();
369+
Snapshot earliestSnapShot = earliestSnapshot(false, latest);
367370
if (earliestSnapShot == null || earliestSnapShot.timeMillis() > timestampMills) {
368371
return earliestSnapShot;
369372
}
@@ -428,7 +431,7 @@ private Snapshot tryGetChangelogOrSnapshot(long snapshotId) throws FileNotFoundE
428431
return null;
429432
}
430433

431-
Snapshot earliestSnapShot = earliestSnapshot();
434+
Snapshot earliestSnapShot = earliestSnapshot(false, latest);
432435
if (earliestSnapShot == null) {
433436
return null;
434437
}
@@ -493,7 +496,7 @@ private Snapshot tryGetChangelogOrSnapshot(long snapshotId) throws FileNotFoundE
493496
return null;
494497
}
495498

496-
Snapshot earliestSnapShot = earliestSnapshot();
499+
Snapshot earliestSnapShot = earliestSnapshot(false, latest);
497500
if (earliestSnapShot == null) {
498501
return null;
499502
}

paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,11 @@ public void testEarlierThanTimeMillis(boolean isRaceCondition) throws IOExceptio
129129

130130
if (millis.get(numSnapshots - 1) < time) {
131131
if (isRaceCondition && millis.size() == 1) {
132-
assertThat(actual).isNull();
132+
if (tries == 0) {
133+
assertThat(actual).isLessThanOrEqualTo(firstSnapshotId);
134+
} else {
135+
assertThat(actual).isNull();
136+
}
133137
} else {
134138
assertThat(actual).isEqualTo(firstSnapshotId + numSnapshots - 1);
135139
}
@@ -138,7 +142,7 @@ public void testEarlierThanTimeMillis(boolean isRaceCondition) throws IOExceptio
138142
if (millis.get(i) >= time) {
139143
if (isRaceCondition && i == 0) {
140144
// The first snapshot expired during invocation
141-
if (millis.size() == 1) {
145+
if (millis.size() == 1 && tries > 0) {
142146
assertThat(actual).isNull();
143147
} else {
144148
assertThat(actual).isLessThanOrEqualTo(firstSnapshotId);

0 commit comments

Comments
 (0)