Skip to content

Commit 245cc54

Browse files
committed
fix claude review comments
1 parent d76f88a commit 245cc54

File tree

2 files changed

+29
-15
lines changed

2 files changed

+29
-15
lines changed

fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/KvSnapshotManager.java

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,13 @@ public void downloadSnapshot(long snapshotId) throws Exception {
240240
}
241241
CompletedSnapshot completedSnapshot =
242242
snapshotContext.getCompletedSnapshotProvider(tableBucket, snapshotId);
243+
if (completedSnapshot == null) {
244+
LOG.warn(
245+
"Snapshot {} not found for bucket {}, skip downloading.",
246+
snapshotId,
247+
tableBucket);
248+
return;
249+
}
243250
incrementalDownloadSnapshot(completedSnapshot);
244251
standbySnapshotSize = completedSnapshot.getSnapshotSize();
245252
}
@@ -253,23 +260,26 @@ public void downloadSnapshot(long snapshotId) throws Exception {
253260
* @return the latest snapshot
254261
*/
255262
public Optional<CompletedSnapshot> downloadLatestSnapshot() throws Exception {
256-
// standbyInitializing is used to prevent concurrent download.
263+
// standbyInitializing is used to prevent concurrent download via
264+
// downloadSnapshot(snapshotId).
257265
standbyInitializing = true;
266+
try {
267+
// Note: no isStandby check here. This method is called from both:
268+
// 1. initKvTablet() during leader initialization - isStandby is already false
269+
// 2. becomeStandbyAsync() during standby initialization - isStandby is true
270+
// The isStandby guard is only needed in downloadSnapshot(snapshotId) which is
271+
// called from the notification path exclusively for standby replicas.
272+
Optional<CompletedSnapshot> latestSnapshot = getLatestSnapshot();
273+
if (latestSnapshot.isPresent()) {
274+
CompletedSnapshot completedSnapshot = latestSnapshot.get();
275+
incrementalDownloadSnapshot(completedSnapshot);
276+
standbySnapshotSize = completedSnapshot.getSnapshotSize();
277+
}
258278

259-
// Note: no isStandby check here. This method is called from both:
260-
// 1. initKvTablet() during leader initialization - isStandby is already false
261-
// 2. becomeStandbyAsync() during standby initialization - isStandby is true
262-
// The isStandby guard is only needed in downloadSnapshot(snapshotId) which is
263-
// called from the notification path exclusively for standby replicas.
264-
Optional<CompletedSnapshot> latestSnapshot = getLatestSnapshot();
265-
if (latestSnapshot.isPresent()) {
266-
CompletedSnapshot completedSnapshot = latestSnapshot.get();
267-
incrementalDownloadSnapshot(completedSnapshot);
268-
standbySnapshotSize = completedSnapshot.getSnapshotSize();
279+
return latestSnapshot;
280+
} finally {
281+
standbyInitializing = false;
269282
}
270-
271-
standbyInitializing = false;
272-
return latestSnapshot;
273283
}
274284

275285
private void incrementalDownloadSnapshot(CompletedSnapshot completedSnapshot) throws Exception {

fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1052,7 +1052,6 @@ public void notifyKvSnapshotOffset(
10521052
() -> {
10531053
try {
10541054
replica.downloadSnapshot(snapshotId);
1055-
updateMinRetainOffset(replica, minRetainOffset);
10561055
LOG.debug(
10571056
"Successfully downloaded snapshot {} for standby replica {}.",
10581057
snapshotId,
@@ -1063,6 +1062,11 @@ public void notifyKvSnapshotOffset(
10631062
snapshotId,
10641063
tb,
10651064
e);
1065+
} finally {
1066+
// Always update minRetainOffset regardless of download success/failure.
1067+
// If we skip this, log segments may never be cleaned up when download
1068+
// keeps failing.
1069+
updateMinRetainOffset(replica, minRetainOffset);
10661070
}
10671071
});
10681072
} else {

0 commit comments

Comments
 (0)