Skip to content

Commit bf45e37

Browse files
authored
[FLINK-38433][state/forst] Avoid delete ForSt's directory when there happened to be an existing one (#27064)
(cherry picked from commit a644a82)
1 parent 53c5d71 commit bf45e37

File tree

2 files changed

+14
-7
lines changed

2 files changed

+14
-7
lines changed

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ public final class ForStResourceContainer implements AutoCloseable {
8686

8787
@Nullable private final Path remoteForStPath;
8888

89+
private boolean remotePathNewlyCreated;
90+
8991
@Nullable private final Path localBasePath;
9092

9193
@Nullable private final Path localForStPath;
@@ -369,7 +371,7 @@ public int getWriteIoParallelism() {
369371
*/
370372
public void prepareDirectories() throws Exception {
371373
if (remoteBasePath != null && remoteForStPath != null) {
372-
prepareDirectories(remoteBasePath, remoteForStPath);
374+
remotePathNewlyCreated = prepareDirectories(remoteBasePath, remoteForStPath);
373375
}
374376
if (localBasePath != null && localForStPath != null) {
375377
prepareDirectories(
@@ -402,23 +404,26 @@ public void prepareDirectories() throws Exception {
402404
return forStFileSystem;
403405
}
404406

405-
private static void prepareDirectories(Path basePath, Path dbPath) throws IOException {
407+
private static boolean prepareDirectories(Path basePath, Path dbPath) throws IOException {
408+
boolean allNewlyCreated = true;
406409
FileSystem fileSystem = basePath.getFileSystem();
407410
if (fileSystem.exists(basePath)) {
408411
if (!fileSystem.getFileStatus(basePath).isDir()) {
409412
throw new IOException("Not a directory: " + basePath);
410413
}
414+
allNewlyCreated = false;
411415
} else if (!fileSystem.mkdirs(basePath)) {
412416
throw new IOException(
413417
String.format("Could not create ForSt directory at %s.", basePath));
414418
}
415419
if (fileSystem.exists(dbPath)) {
416-
fileSystem.delete(dbPath, true);
417-
}
418-
if (!fileSystem.mkdirs(dbPath)) {
420+
LOG.info("Reusing previous ForSt db directory at {}.", dbPath);
421+
allNewlyCreated = false;
422+
} else if (!fileSystem.mkdirs(dbPath)) {
419423
throw new IOException(
420424
String.format("Could not create ForSt db directory at %s.", dbPath));
421425
}
426+
return allNewlyCreated;
422427
}
423428

424429
/**
@@ -436,7 +441,7 @@ public void clearDirectories() throws Exception {
436441
}
437442

438443
public void forceClearRemoteDirectories() throws Exception {
439-
if (remoteBasePath != null) {
444+
if (remoteBasePath != null && remotePathNewlyCreated) {
440445
clearDirectories(remoteBasePath);
441446
}
442447
}

flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,9 @@ public void testDirectoryResources() throws Exception {
336336

337337
assertTrue(new File(remoteBasePath.getPath()).exists());
338338
optionsContainer.forceClearRemoteDirectories();
339-
assertFalse(new File(remoteBasePath.getPath()).exists());
339+
340+
// Do not delete remote directory because it is not created by ForStResourceContainer
341+
assertTrue(new File(remoteBasePath.getPath()).exists());
340342
}
341343
}
342344

0 commit comments

Comments
 (0)