Skip to content

Commit 2504732

Browse files
[core] Fix race condition for earliest snapshot (#4930)
1 parent 951d0dd commit 2504732

File tree

3 files changed

+212
-47
lines changed

3 files changed

+212
-47
lines changed

paimon-core/src/main/java/org/apache/paimon/Changelog.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import javax.annotation.Nullable;
3030

31+
import java.io.FileNotFoundException;
3132
import java.io.IOException;
3233
import java.util.Map;
3334

@@ -105,9 +106,19 @@ public static Changelog fromJson(String json) {
105106
}
106107

107108
public static Changelog fromPath(FileIO fileIO, Path path) {
109+
try {
110+
return tryFromPath(fileIO, path);
111+
} catch (FileNotFoundException e) {
112+
throw new RuntimeException("Fails to read changelog from path " + path, e);
113+
}
114+
}
115+
116+
public static Changelog tryFromPath(FileIO fileIO, Path path) throws FileNotFoundException {
108117
try {
109118
String json = fileIO.readFileUtf8(path);
110119
return Changelog.fromJson(json);
120+
} catch (FileNotFoundException e) {
121+
throw e;
111122
} catch (IOException e) {
112123
throw new RuntimeException("Fails to read changelog from path " + path, e);
113124
}

paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java

Lines changed: 80 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,11 @@ public Changelog changelog(long snapshotId) {
168168
return Changelog.fromPath(fileIO, changelogPath);
169169
}
170170

171+
private Changelog tryGetChangelog(long snapshotId) throws FileNotFoundException {
172+
Path changelogPath = longLivedChangelogPath(snapshotId);
173+
return Changelog.tryFromPath(fileIO, changelogPath);
174+
}
175+
171176
public Changelog longLivedChangelog(long snapshotId) {
172177
return Changelog.fromPath(fileIO, longLivedChangelogPath(snapshotId));
173178
}
@@ -216,8 +221,41 @@ public boolean longLivedChangelogExists(long snapshotId) {
216221
}
217222

218223
public @Nullable Snapshot earliestSnapshot() {
219-
Long snapshotId = earliestSnapshotId();
220-
return snapshotId == null ? null : snapshot(snapshotId);
224+
return earliestSnapshot(false);
225+
}
226+
227+
private @Nullable Snapshot earliestSnapshot(boolean includeChangelog) {
228+
Long snapshotId = null;
229+
if (includeChangelog) {
230+
snapshotId = earliestLongLivedChangelogId();
231+
}
232+
if (snapshotId == null) {
233+
snapshotId = earliestSnapshotId();
234+
}
235+
if (snapshotId == null) {
236+
return null;
237+
}
238+
239+
FunctionWithException<Long, Snapshot, FileNotFoundException> snapshotFunction =
240+
includeChangelog ? this::tryGetChangelogOrSnapshot : this::tryGetSnapshot;
241+
242+
// The loss of the earliest snapshot is an event of small probability, so the retry number
243+
// here need not be too large.
244+
int retry = 0;
245+
do {
246+
try {
247+
return snapshotFunction.apply(snapshotId);
248+
} catch (FileNotFoundException e) {
249+
if (retry++ >= 3) {
250+
throw new RuntimeException(e);
251+
}
252+
LOG.warn(
253+
"The earliest snapshot or changelog was once identified but disappeared. "
254+
+ "It might have been expired by other jobs operating on this table. "
255+
+ "Searching for the second earliest snapshot or changelog instead. ");
256+
snapshotId++;
257+
}
258+
} while (true);
221259
}
222260

223261
public @Nullable Long earliestSnapshotId() {
@@ -276,28 +314,34 @@ private Snapshot changelogOrSnapshot(long snapshotId) {
276314
}
277315
}
278316

317+
private Snapshot tryGetChangelogOrSnapshot(long snapshotId) throws FileNotFoundException {
318+
if (longLivedChangelogExists(snapshotId)) {
319+
return tryGetChangelog(snapshotId);
320+
} else {
321+
return tryGetSnapshot(snapshotId);
322+
}
323+
}
324+
279325
/**
280326
* Returns the latest snapshot earlier than the timestamp mills. A non-existent snapshot may be
281327
* returned if all snapshots are equal to or later than the timestamp mills.
282328
*/
283329
public @Nullable Long earlierThanTimeMills(long timestampMills, boolean startFromChangelog) {
284-
Long earliestSnapshot = earliestSnapshotId();
285-
Long earliest;
286-
if (startFromChangelog) {
287-
Long earliestChangelog = earliestLongLivedChangelogId();
288-
earliest = earliestChangelog == null ? earliestSnapshot : earliestChangelog;
289-
} else {
290-
earliest = earliestSnapshot;
291-
}
292330
Long latest = latestSnapshotId();
293-
if (earliest == null || latest == null) {
331+
if (latest == null) {
332+
return null;
333+
}
334+
335+
Snapshot earliestSnapshot = earliestSnapshot(startFromChangelog);
336+
if (earliestSnapshot == null) {
294337
return null;
295338
}
296339

297-
if (changelogOrSnapshot(earliest).timeMillis() >= timestampMills) {
298-
return earliest - 1;
340+
if (earliestSnapshot.timeMillis() >= timestampMills) {
341+
return earliestSnapshot.id() - 1;
299342
}
300343

344+
long earliest = earliestSnapshot.id();
301345
while (earliest < latest) {
302346
long mid = (earliest + latest + 1) / 2;
303347
if (changelogOrSnapshot(mid).timeMillis() < timestampMills) {
@@ -314,16 +358,17 @@ private Snapshot changelogOrSnapshot(long snapshotId) {
314358
* mills. If there is no such a snapshot, returns null.
315359
*/
316360
public @Nullable Snapshot earlierOrEqualTimeMills(long timestampMills) {
317-
Long earliest = earliestSnapshotId();
318361
Long latest = latestSnapshotId();
319-
if (earliest == null || latest == null) {
362+
if (latest == null) {
320363
return null;
321364
}
322365

323-
Snapshot earliestSnapShot = snapshot(earliest);
324-
if (earliestSnapShot.timeMillis() > timestampMills) {
366+
Snapshot earliestSnapShot = earliestSnapshot();
367+
if (earliestSnapShot == null || earliestSnapShot.timeMillis() > timestampMills) {
325368
return earliestSnapShot;
326369
}
370+
long earliest = earliestSnapShot.id();
371+
327372
Snapshot finalSnapshot = null;
328373
while (earliest <= latest) {
329374
long mid = earliest + (latest - earliest) / 2; // Avoid overflow
@@ -376,16 +421,22 @@ private Snapshot changelogOrSnapshot(long snapshotId) {
376421
}
377422

378423
public @Nullable Snapshot earlierOrEqualWatermark(long watermark) {
379-
Long earliest = earliestSnapshotId();
380424
Long latest = latestSnapshotId();
381425
// If latest == Long.MIN_VALUE don't need next binary search for watermark
382426
// which can reduce IO cost with snapshot
383-
if (earliest == null || latest == null || snapshot(latest).watermark() == Long.MIN_VALUE) {
427+
if (latest == null || snapshot(latest).watermark() == Long.MIN_VALUE) {
384428
return null;
385429
}
430+
431+
Snapshot earliestSnapShot = earliestSnapshot();
432+
if (earliestSnapShot == null) {
433+
return null;
434+
}
435+
long earliest = earliestSnapShot.id();
436+
386437
Long earliestWatermark = null;
387438
// find the first snapshot with watermark
388-
if ((earliestWatermark = snapshot(earliest).watermark()) == null) {
439+
if ((earliestWatermark = earliestSnapShot.watermark()) == null) {
389440
while (earliest < latest) {
390441
earliest++;
391442
earliestWatermark = snapshot(earliest).watermark();
@@ -435,16 +486,22 @@ private Snapshot changelogOrSnapshot(long snapshotId) {
435486
}
436487

437488
public @Nullable Snapshot laterOrEqualWatermark(long watermark) {
438-
Long earliest = earliestSnapshotId();
439489
Long latest = latestSnapshotId();
440490
// If latest == Long.MIN_VALUE don't need next binary search for watermark
441491
// which can reduce IO cost with snapshot
442-
if (earliest == null || latest == null || snapshot(latest).watermark() == Long.MIN_VALUE) {
492+
if (latest == null || snapshot(latest).watermark() == Long.MIN_VALUE) {
443493
return null;
444494
}
495+
496+
Snapshot earliestSnapShot = earliestSnapshot();
497+
if (earliestSnapShot == null) {
498+
return null;
499+
}
500+
long earliest = earliestSnapShot.id();
501+
445502
Long earliestWatermark = null;
446503
// find the first snapshot with watermark
447-
if ((earliestWatermark = snapshot(earliest).watermark()) == null) {
504+
if ((earliestWatermark = earliestSnapShot.watermark()) == null) {
448505
while (earliest < latest) {
449506
earliest++;
450507
earliestWatermark = snapshot(earliest).watermark();

0 commit comments

Comments
 (0)