Skip to content

Commit 3e7c8eb

Browse files
authored
[FLINK-38347][Checkpoint] Tolerate RPC loss for cleanup in file-merging manager (#26980)
(cherry picked from commit f3bd4a4)
1 parent e107aad commit 3e7c8eb

File tree

2 files changed

+54
-17
lines changed

2 files changed

+54
-17
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -507,10 +507,10 @@ public void notifyCheckpointStart(SubtaskKey subtaskKey, long checkpointId) {
507507
managedSharedStateDirHandles.computeIfPresent(
508508
subtaskKey,
509509
(k, v) -> {
510-
v.increaseRefCountWhenCheckpointStart(checkpointId);
510+
v.addReferenceWhenCheckpointStart(checkpointId);
511511
return v;
512512
});
513-
managedExclusiveStateDirHandle.increaseRefCountWhenCheckpointStart(checkpointId);
513+
managedExclusiveStateDirHandle.addReferenceWhenCheckpointStart(checkpointId);
514514
}
515515
}
516516

@@ -534,10 +534,10 @@ public void notifyCheckpointAborted(SubtaskKey subtaskKey, long checkpointId) th
534534
managedSharedStateDirHandles.computeIfPresent(
535535
subtaskKey,
536536
(k, v) -> {
537-
v.decreaseRefCountWhenCheckpointAbort(checkpointId);
537+
v.removeReferenceWhenCheckpointAbort(checkpointId);
538538
return v;
539539
});
540-
managedExclusiveStateDirHandle.decreaseRefCountWhenCheckpointAbort(checkpointId);
540+
managedExclusiveStateDirHandle.removeReferenceWhenCheckpointAbort(checkpointId);
541541
}
542542

543543
synchronized (lock) {
@@ -948,20 +948,22 @@ boolean isCheckpointDiscard(long checkpointId) {
948948
}
949949

950950
/**
951-
* This class wrap DirectoryStreamStateHandle with reference count by ongoing checkpoint. If an
952-
* ongoing checkpoint which reference the directory handle complete, we will stop tracking the
953-
* handle, because the ownership of the handle is handover to JobManager.
951+
* This class wraps DirectoryStreamStateHandle with references from ongoing checkpoints. If an
952+
* ongoing checkpoint that references the directory handle completes or is subsumed, we stop
953+
* tracking the handle, because the ownership of the handle is handed over to JobManager.
954+
* JobManager acknowledges the handle and will clean up the directory when it is no longer
955+
* needed.
954956
*/
955957
protected static class DirectoryHandleWithReferenceTrack {
956958

957959
private final DirectoryStreamStateHandle directoryHandle;
958-
// reference count by ongoing checkpoint
959-
private final AtomicLong ongoingRefCount;
960+
// reference by ongoing checkpoint
961+
private final Set<Long> refCheckpointIds;
960962
private boolean tracking;
961963

962964
DirectoryHandleWithReferenceTrack(DirectoryStreamStateHandle directoryHandle, boolean own) {
963965
this.directoryHandle = directoryHandle;
964-
this.ongoingRefCount = new AtomicLong(0);
966+
this.refCheckpointIds = new HashSet<>();
965967
this.tracking = own;
966968
}
967969

@@ -974,23 +976,23 @@ DirectoryStreamStateHandle getHandle() {
974976
return directoryHandle;
975977
}
976978

977-
void increaseRefCountWhenCheckpointStart(long checkpointId) {
979+
void addReferenceWhenCheckpointStart(long checkpointId) {
978980
if (tracking) {
979981
LOG.debug(
980-
"checkpoint:{} start, increase ref-count to file-merging managed shared dir : {}",
982+
"checkpoint:{} start, add reference to file-merging managed shared dir : {}",
981983
checkpointId,
982984
directoryHandle.getDirectory());
983-
ongoingRefCount.incrementAndGet();
985+
refCheckpointIds.add(checkpointId);
984986
}
985987
}
986988

987-
void decreaseRefCountWhenCheckpointAbort(long checkpointId) {
989+
void removeReferenceWhenCheckpointAbort(long checkpointId) {
988990
if (tracking) {
989991
LOG.debug(
990-
"checkpoint:{} aborted, decrease ref-count to file-merging managed shared dir : {}",
992+
"checkpoint:{} aborted, remove reference to file-merging managed shared dir : {}",
991993
checkpointId,
992994
directoryHandle.getDirectory());
993-
ongoingRefCount.decrementAndGet();
995+
refCheckpointIds.remove(checkpointId);
994996
}
995997
}
996998

@@ -1001,6 +1003,7 @@ void handoverOwnershipWhenCheckpointComplete(long checkpointId) {
10011003
checkpointId,
10021004
directoryHandle.getDirectory());
10031005
tracking = false;
1006+
refCheckpointIds.clear();
10041007
}
10051008
}
10061009

@@ -1011,11 +1014,12 @@ void handoverOwnershipWhenCheckpointSubsumed(long checkpointId) {
10111014
checkpointId,
10121015
directoryHandle.getDirectory());
10131016
tracking = false;
1017+
refCheckpointIds.clear();
10141018
}
10151019
}
10161020

10171021
void tryCleanupQuietly() {
1018-
if (tracking && ongoingRefCount.get() == 0 && directoryHandle != null) {
1022+
if (tracking && refCheckpointIds.isEmpty() && directoryHandle != null) {
10191023
try {
10201024
directoryHandle.discardState();
10211025
} catch (Exception e) {

flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -646,6 +646,39 @@ public void testManagedDirCleanup() throws Exception {
646646
assertThat(fs.exists(sharedDirOfSubtask1)).isTrue();
647647
assertThat(fs.exists(sharedDirOfSubtask2)).isTrue();
648648
assertThat(fs.exists(exclusiveDir)).isTrue();
649+
650+
// 4. Test that managed dirs are not cleaned up when an RPC is lost
651+
emptyCheckpointBaseDir();
652+
try (FileMergingSnapshotManagerBase fmsm =
653+
(FileMergingSnapshotManagerBase)
654+
createFileMergingSnapshotManager(
655+
checkpointBaseDir,
656+
32,
657+
PhysicalFilePool.Type.BLOCKING,
658+
Float.MAX_VALUE)) {
659+
660+
fmsm.registerSubtaskForSharedStates(subtaskKey1);
661+
fmsm.registerSubtaskForSharedStates(subtaskKey2);
662+
663+
// record reference from checkpoint 1
664+
fmsm.notifyCheckpointStart(subtaskKey1, 1L);
665+
fmsm.notifyCheckpointStart(subtaskKey2, 1L);
666+
667+
// the "checkpoint 1 complete" RPC is lost
668+
// the "checkpoint 2 start" RPC is lost
669+
670+
// checkpoint 2 aborted
671+
fmsm.notifyCheckpointAborted(subtaskKey1, 2L);
672+
fmsm.notifyCheckpointAborted(subtaskKey2, 2L);
673+
674+
assertThat(fs.exists(sharedDirOfSubtask1)).isTrue();
675+
assertThat(fs.exists(sharedDirOfSubtask2)).isTrue();
676+
exclusiveDir = new Path(taskOwnedStateDir, fmsm.getId());
677+
assertThat(fs.exists(exclusiveDir)).isTrue();
678+
}
679+
assertThat(fs.exists(sharedDirOfSubtask1)).isTrue();
680+
assertThat(fs.exists(sharedDirOfSubtask2)).isTrue();
681+
assertThat(fs.exists(exclusiveDir)).isTrue();
649682
}
650683

651684
private void emptyCheckpointBaseDir() throws IOException {

0 commit comments

Comments
 (0)