Skip to content

Commit bd916bd

Browse files
Zakellyrkhachatryan
authored andcommitted
[FLINK-38574][checkpoint] Avoid reusing re-uploaded sst files when checkpoint notification is delayed (#27157)
1 parent 16a9d2f commit bd916bd

File tree

3 files changed

+127
-14
lines changed

3 files changed

+127
-14
lines changed

flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java

Lines changed: 84 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import java.util.List;
6868
import java.util.Map;
6969
import java.util.Optional;
70+
import java.util.SortedMap;
7071
import java.util.UUID;
7172
import java.util.stream.Collectors;
7273

@@ -392,22 +393,94 @@ public void release() {
392393
}
393394

394395
protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT =
395-
new PreviousSnapshot(Collections.emptyList());
396+
new PreviousSnapshot(null, -1L);
396397

397398
/** Previous snapshot with uploaded sst files. */
398399
protected static class PreviousSnapshot {
399400

400401
@Nonnull private final Map<String, StreamStateHandle> confirmedSstFiles;
401402

402-
protected PreviousSnapshot(@Nullable Collection<HandleAndLocalPath> confirmedSstFiles) {
403+
/**
404+
* Constructor of PreviousSnapshot. Giving a map of uploaded sst files in previous
405+
* checkpoints, prune the sst files which have been re-uploaded in the following
406+
* checkpoints. The prune logic is used to resolve the mismatch between TM and JM due to
407+
* notification delay. Following steps for example:
408+
*
409+
* <ul>
410+
* <li>1) checkpoint 1 uses file 00001.SST uploaded as xxx.sst.
411+
* <li>2) checkpoint 2 uses the same file 00001.SST but re-uploads it as yyy.sst because
412+
* CP 1 wasn't yet confirmed.
413+
* <li>3) TM get a confirmation of checkpoint 1.
414+
* <li>4) JM completes checkpoint 2 and subsumes checkpoint 1 - removing xxx.sst.
415+
* <li>5) checkpoint 3 tries to re-use file 00001.SST uploaded as xxx.sst in checkpoint 1,
416+
* but it was deleted in (4) by JM.
417+
* </ul>
418+
*
419+
* @param currentUploadedSstFiles the sst files uploaded in previous checkpoints.
420+
* @param lastCompletedCheckpoint the last completed checkpoint id.
421+
*/
422+
protected PreviousSnapshot(
423+
@Nullable SortedMap<Long, Collection<HandleAndLocalPath>> currentUploadedSstFiles,
424+
long lastCompletedCheckpoint) {
403425
this.confirmedSstFiles =
404-
confirmedSstFiles != null
405-
? confirmedSstFiles.stream()
426+
currentUploadedSstFiles != null
427+
? pruneFirstCheckpointSstFiles(
428+
currentUploadedSstFiles, lastCompletedCheckpoint)
429+
: Collections.emptyMap();
430+
}
431+
432+
/**
433+
* The last completed checkpoint's uploaded sst files are all included, then for each
434+
* following checkpoint, if a sst file has been re-uploaded, remove it from the first
435+
* checkpoint's sst files.
436+
*
437+
* @param currentUploadedSstFiles the sst files uploaded in the following checkpoint.
438+
* @param lastCompletedCheckpoint the last completed checkpoint id.
439+
*/
440+
private Map<String, StreamStateHandle> pruneFirstCheckpointSstFiles(
441+
@Nonnull SortedMap<Long, Collection<HandleAndLocalPath>> currentUploadedSstFiles,
442+
long lastCompletedCheckpoint) {
443+
Map<String, StreamStateHandle> prunedSstFiles = null;
444+
int removedCount = 0;
445+
for (Map.Entry<Long, Collection<HandleAndLocalPath>> entry :
446+
currentUploadedSstFiles.entrySet()) {
447+
// Iterate checkpoints in ascending order of checkpoint id.
448+
if (entry.getKey() == lastCompletedCheckpoint) {
449+
// The first checkpoint's uploaded sst files are all included.
450+
prunedSstFiles =
451+
entry.getValue().stream()
406452
.collect(
407453
Collectors.toMap(
408454
HandleAndLocalPath::getLocalPath,
409-
HandleAndLocalPath::getHandle))
410-
: Collections.emptyMap();
455+
HandleAndLocalPath::getHandle));
456+
} else if (prunedSstFiles == null) {
457+
// The last completed checkpoint's uploaded sst files are not existed.
458+
// So we skip the pruning process.
459+
break;
460+
} else if (!prunedSstFiles.isEmpty()) {
461+
// Prune sst files which have been re-uploaded in the following checkpoints.
462+
for (HandleAndLocalPath handleAndLocalPath : entry.getValue()) {
463+
if (!(handleAndLocalPath.getHandle()
464+
instanceof PlaceholderStreamStateHandle)) {
465+
// If it's not a placeholder handle, it means the sst file has been
466+
// re-uploaded in the following checkpoint.
467+
if (prunedSstFiles.remove(handleAndLocalPath.getLocalPath()) != null) {
468+
removedCount++;
469+
}
470+
}
471+
}
472+
}
473+
}
474+
if (removedCount > 0 && LOG.isTraceEnabled()) {
475+
LOG.trace(
476+
"Removed {} re-uploaded sst files from base file set for incremental "
477+
+ "checkpoint. Base checkpoint id: {}",
478+
removedCount,
479+
currentUploadedSstFiles.firstKey());
480+
}
481+
return (prunedSstFiles != null && !prunedSstFiles.isEmpty())
482+
? Collections.unmodifiableMap(prunedSstFiles)
483+
: Collections.emptyMap();
411484
}
412485

413486
protected Optional<StreamStateHandle> getUploaded(String filename) {
@@ -425,5 +498,10 @@ protected Optional<StreamStateHandle> getUploaded(String filename) {
425498
return Optional.empty();
426499
}
427500
}
501+
502+
@Override
503+
public String toString() {
504+
return "PreviousSnapshot{" + "confirmedSstFiles=" + confirmedSstFiles + '}';
505+
}
428506
}
429507
}

flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -195,30 +195,29 @@ protected PreviousSnapshot snapshotMetaData(
195195
long checkpointId, @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
196196

197197
final long lastCompletedCheckpoint;
198-
final Collection<HandleAndLocalPath> confirmedSstFiles;
198+
final SortedMap<Long, Collection<HandleAndLocalPath>> currentUploadedSstFiles;
199199

200200
// use the last completed checkpoint as the comparison base.
201201
synchronized (uploadedSstFiles) {
202202
lastCompletedCheckpoint = lastCompletedCheckpointId;
203-
confirmedSstFiles = uploadedSstFiles.get(lastCompletedCheckpoint);
204-
LOG.trace(
205-
"Use confirmed SST files for checkpoint {}: {}",
206-
checkpointId,
207-
confirmedSstFiles);
203+
currentUploadedSstFiles =
204+
new TreeMap<>(uploadedSstFiles.tailMap(lastCompletedCheckpoint));
208205
}
206+
PreviousSnapshot previousSnapshot =
207+
new PreviousSnapshot(currentUploadedSstFiles, lastCompletedCheckpoint);
209208
LOG.trace(
210209
"Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} "
211210
+ "assuming the following (shared) confirmed files as base: {}.",
212211
checkpointId,
213212
lastCompletedCheckpoint,
214-
confirmedSstFiles);
213+
previousSnapshot);
215214

216215
// snapshot meta data to save
217216
for (Map.Entry<String, RocksDbKvStateInfo> stateMetaInfoEntry :
218217
kvStateInformation.entrySet()) {
219218
stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().metaInfo.snapshot());
220219
}
221-
return new PreviousSnapshot(confirmedSstFiles);
220+
return previousSnapshot;
222221
}
223222

224223
/**
@@ -343,6 +342,7 @@ private long uploadSnapshotFiles(
343342
List<Path> miscFilePaths = new ArrayList<>(files.length);
344343

345344
createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths);
345+
LOG.info("Will re-use {} SST files. {}", sstFiles.size(), sstFiles);
346346

347347
final CheckpointedStateScope stateScope =
348348
sharingFilesStrategy == SnapshotType.SharingFilesStrategy.NO_SHARING

flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,41 @@ void testCheckpointIsIncremental() throws Exception {
9696
}
9797
}
9898

99+
@Test
100+
void testCheckpointIsIncrementalWithLateNotification() throws Exception {
101+
102+
try (CloseableRegistry closeableRegistry = new CloseableRegistry();
103+
RocksIncrementalSnapshotStrategy<?> checkpointSnapshotStrategy =
104+
createSnapshotStrategy()) {
105+
FsCheckpointStreamFactory checkpointStreamFactory = createFsCheckpointStreamFactory();
106+
107+
// make and checkpoint with id 1
108+
snapshot(1L, checkpointSnapshotStrategy, checkpointStreamFactory, closeableRegistry);
109+
110+
// make and checkpoint with id 2
111+
snapshot(2L, checkpointSnapshotStrategy, checkpointStreamFactory, closeableRegistry);
112+
113+
// Late notify checkpoint with id 1
114+
checkpointSnapshotStrategy.notifyCheckpointComplete(1L);
115+
116+
// make checkpoint with id 3, based on checkpoint 1
117+
IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle3 =
118+
snapshot(
119+
3L,
120+
checkpointSnapshotStrategy,
121+
checkpointStreamFactory,
122+
closeableRegistry);
123+
124+
// Late notify checkpoint with id 2
125+
checkpointSnapshotStrategy.notifyCheckpointComplete(2L);
126+
127+
// 3rd checkpoint is based on 1st checkpoint, BUT the 2nd checkpoint re-uploaded the 1st
128+
// one, so it should be based on nothing, thus this is effectively a full checkpoint.
129+
assertThat(incrementalRemoteKeyedStateHandle3.getStateSize())
130+
.isEqualTo(incrementalRemoteKeyedStateHandle3.getCheckpointedSize());
131+
}
132+
}
133+
99134
public RocksIncrementalSnapshotStrategy<?> createSnapshotStrategy()
100135
throws IOException, RocksDBException {
101136

0 commit comments

Comments
 (0)