Skip to content

Commit a4f3596

Browse files
committed
[flink] Filter by directory during orphan file cleaning to reduce memory usage
1 parent e2ab2a0 commit a4f3596

File tree

8 files changed

+51
-6
lines changed

8 files changed

+51
-6
lines changed

paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public LocalOrphanFilesClean(FileStoreTable table, long olderThanMillis) {
8585
}
8686

8787
public LocalOrphanFilesClean(FileStoreTable table, long olderThanMillis, boolean dryRun) {
88-
super(table, olderThanMillis, dryRun);
88+
super(table, olderThanMillis, dryRun, false);
8989
this.deleteFiles = new ArrayList<>();
9090
this.executor =
9191
createCachedThreadPool(

paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,14 +96,17 @@ public abstract class OrphanFilesClean implements Serializable {
9696
protected final boolean dryRun;
9797
protected final int partitionKeysNum;
9898
protected final Path location;
99+
protected final boolean folderBasedCheck;
99100

100-
public OrphanFilesClean(FileStoreTable table, long olderThanMillis, boolean dryRun) {
101+
public OrphanFilesClean(
102+
FileStoreTable table, long olderThanMillis, boolean dryRun, boolean folderBasedCheck) {
101103
this.table = table;
102104
this.fileIO = table.fileIO();
103105
this.partitionKeysNum = table.partitionKeys().size();
104106
this.location = table.location();
105107
this.olderThanMillis = olderThanMillis;
106108
this.dryRun = dryRun;
109+
this.folderBasedCheck = folderBasedCheck;
107110
}
108111

109112
protected List<String> validBranches() {
@@ -368,6 +371,10 @@ private List<Path> listFileDirs(Path dir, int level) {
368371
List<FileStatus> dirs = tryBestListingDirs(dir);
369372

370373
if (level == 0) {
374+
if (folderBasedCheck) {
375+
return filterDirs(
376+
dirs, p -> p.getName().startsWith(BUCKET_PATH_PREFIX), this::oldEnough);
377+
}
371378
// return bucket paths
372379
return filterDirs(dirs, p -> p.getName().startsWith(BUCKET_PATH_PREFIX));
373380
}
@@ -395,6 +402,23 @@ private List<Path> filterDirs(List<FileStatus> statuses, Predicate<Path> filter)
395402
return filtered;
396403
}
397404

405+
private List<Path> filterDirs(
406+
List<FileStatus> statuses,
407+
Predicate<Path> nameFilter,
408+
Predicate<FileStatus> timeFilter) {
409+
List<Path> filtered = new ArrayList<>();
410+
411+
for (FileStatus status : statuses) {
412+
Path path = status.getPath();
413+
if (nameFilter.test(path) && timeFilter.test(status)) {
414+
filtered.add(path);
415+
}
416+
// ignore unknown dirs and dirs rejected by the time filter
417+
}
418+
419+
return filtered;
420+
}
421+
398422
/**
399423
* If failed to list directory, just return an empty result because it's OK to not delete them.
400424
*/

paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ public String[] call(
9797
catalog,
9898
olderThanMillis(olderThan),
9999
dryRun,
100+
false,
100101
parallelism,
101102
databaseName,
102103
tableName);

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public class RemoveOrphanFilesAction extends ActionBase {
3434

3535
private String olderThan = null;
3636
private boolean dryRun = false;
37+
private boolean folderBasedCheck = false;
3738

3839
public RemoveOrphanFilesAction(
3940
String databaseName,
@@ -54,13 +55,18 @@ public void dryRun() {
5455
this.dryRun = true;
5556
}
5657

58+
public void folderBasedCheck() {
59+
this.folderBasedCheck = true;
60+
}
61+
5762
@Override
5863
public void run() throws Exception {
5964
executeDatabaseOrphanFiles(
6065
env,
6166
catalog,
6267
olderThanMillis(olderThan),
6368
dryRun,
69+
folderBasedCheck,
6470
parallelism == null ? null : Integer.parseInt(parallelism),
6571
databaseName,
6672
tableName);

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionFactory.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public class RemoveOrphanFilesActionFactory implements ActionFactory {
2626
public static final String IDENTIFIER = "remove_orphan_files";
2727
private static final String OLDER_THAN = "older_than";
2828
private static final String DRY_RUN = "dry_run";
29+
private static final String FOLDER_BASED_CHECK = "folder_based_check";
2930
private static final String PARALLELISM = "parallelism";
3031

3132
@Override
@@ -50,6 +51,11 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
5051
action.dryRun();
5152
}
5253

54+
if (params.has(FOLDER_BASED_CHECK)
55+
&& Boolean.parseBoolean(params.get(FOLDER_BASED_CHECK))) {
56+
action.folderBasedCheck();
57+
}
58+
5359
return Optional.of(action);
5460
}
5561

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,9 @@ public FlinkOrphanFilesClean(
8181
FileStoreTable table,
8282
long olderThanMillis,
8383
boolean dryRun,
84+
boolean folderBasedCheck,
8485
@Nullable Integer parallelism) {
85-
super(table, olderThanMillis, dryRun);
86+
super(table, olderThanMillis, dryRun, folderBasedCheck);
8687
this.parallelism = parallelism;
8788
}
8889

@@ -372,10 +373,13 @@ public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
372373
Catalog catalog,
373374
long olderThanMillis,
374375
boolean dryRun,
376+
boolean folderBasedCheck,
375377
@Nullable Integer parallelism,
376378
String databaseName,
377379
@Nullable String tableName)
378380
throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
381+
LOG.info("execute orphan file clean using folderBasedCheck:{}", folderBasedCheck);
382+
379383
List<String> tableNames = Collections.singletonList(tableName);
380384
if (tableName == null || "*".equals(tableName)) {
381385
tableNames = catalog.listTables(databaseName);
@@ -393,7 +397,11 @@ public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
393397

394398
DataStream<CleanOrphanFilesResult> clean =
395399
new FlinkOrphanFilesClean(
396-
(FileStoreTable) table, olderThanMillis, dryRun, parallelism)
400+
(FileStoreTable) table,
401+
olderThanMillis,
402+
dryRun,
403+
folderBasedCheck,
404+
parallelism)
397405
.doOrphanClean(env);
398406
if (clean != null) {
399407
orphanFilesCleans.add(clean);

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ public String[] call(
8585
catalog,
8686
olderThanMillis(olderThan),
8787
dryRun != null && dryRun,
88+
false,
8889
parallelism,
8990
databaseName,
9091
tableName);

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import org.apache.paimon.operation.{CleanOrphanFilesResult, OrphanFilesClean}
2626
import org.apache.paimon.operation.OrphanFilesClean.retryReadingFiles
2727
import org.apache.paimon.table.FileStoreTable
2828
import org.apache.paimon.utils.FileStorePathFactory.BUCKET_PATH_PREFIX
29-
import org.apache.paimon.utils.SerializableConsumer
3029

3130
import org.apache.spark.internal.Logging
3231
import org.apache.spark.sql.{functions, Dataset, PaimonSparkSession, SparkSession}
@@ -47,7 +46,7 @@ case class SparkOrphanFilesClean(
4746
parallelism: Int,
4847
dryRunPara: Boolean,
4948
@transient spark: SparkSession)
50-
extends OrphanFilesClean(specifiedTable, specifiedOlderThanMillis, dryRunPara)
49+
extends OrphanFilesClean(specifiedTable, specifiedOlderThanMillis, dryRunPara, false)
5150
with SQLConfHelper
5251
with Logging {
5352

0 commit comments

Comments
 (0)