diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
index a98282f45d02..bbf1c2170e36 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
@@ -44,6 +44,7 @@
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
 import org.apache.flink.streaming.api.operators.InputSelection;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.CloseableIterator;
@@ -64,7 +65,6 @@
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
-import java.util.stream.Collectors;
 
 import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -109,6 +109,7 @@ public DataStream<CleanOrphanFilesResult> doOrphanClean(StreamExecutionEnvironme
         // here, and subsequently, we will not count their orphan files.
         DataStream<Tuple2<Long, Long>> branchSnapshotDirDeleted =
                 env.fromCollection(branches)
+                        .name("branch-source")
                         .process(
                                 new ProcessFunction<String, Tuple2<Long, Long>>() {
                                     @Override
@@ -128,6 +129,7 @@ public void processElement(
                                                         deletedFilesLenInBytes.get()));
                                     }
                                 })
+                        .name("clean-branch-snapshot")
                         .keyBy(tuple -> 1)
                         .reduce(
                                 (ReduceFunction<Tuple2<Long, Long>>)
@@ -136,7 +138,9 @@ public void processElement(
                                                         value1.f0 + value2.f0,
                                                         value1.f1 + value2.f1))
                         .setParallelism(1)
-                        .map(tuple -> new CleanOrphanFilesResult(tuple.f0, tuple.f1));
+                        .name("aggregate-branch-snapshot-deletion")
+                        .map(tuple -> new CleanOrphanFilesResult(tuple.f0, tuple.f1))
+                        .name("branch-snapshot-deletion-result");
 
         // branch and manifest file
         final OutputTag<Tuple2<String, String>> manifestOutputTag =
@@ -144,6 +148,7 @@ public void processElement(
 
         SingleOutputStreamOperator<String> usedManifestFiles =
                 env.fromCollection(branches)
+                        .name("branch-source")
                         .process(
                                 new ProcessFunction<String, Tuple2<String, String>>() {
                                     @Override
@@ -158,6 +163,7 @@ public void processElement(
                                         }
                                     }
                                 })
+                        .name("collect-snapshots")
                         .rebalance()
                         .process(
                                 new ProcessFunction<Tuple2<String, String>, String>() {
@@ -180,14 +186,15 @@ public void processElement(
                                         collectWithoutDataFile(
                                                 branch, snapshot, out::collect, manifestConsumer);
                                     }
-                                });
+                                })
+                        .name("collect-manifests");
 
         DataStream<String> usedFiles =
                 usedManifestFiles
                         .getSideOutput(manifestOutputTag)
                         .keyBy(tuple2 -> tuple2.f0 + ":" + tuple2.f1)
                         .transform(
-                                "datafile-reader",
+                                "collect-used-files",
                                 STRING_TYPE_INFO,
                                 new BoundedOneInputOperator<
                                         Tuple2<String, String>, String>() {
@@ -235,20 +242,101 @@ public void endInput() throws IOException {
                                 });
 
         usedFiles = usedFiles.union(usedManifestFiles);
-        DataStream<Tuple2<String, Long>> candidates =
+
+        final OutputTag<Path> emptyDirOutputTag = new OutputTag<Path>("empty-dir-output") {};
+        SingleOutputStreamOperator<Tuple2<String, Long>> candidates =
                 env.fromCollection(Collections.singletonList(1), TypeInformation.of(Integer.class))
                         .process(
-                                new ProcessFunction<Integer, Tuple2<String, Long>>() {
+                                new ProcessFunction<Integer, String>() {
                                     @Override
                                     public void processElement(
                                             Integer i,
-                                            ProcessFunction<Integer, Tuple2<String, Long>>.Context
+                                            ProcessFunction<Integer, String>.Context
                                                     ctx,
+                                            Collector<String> out) {
+                                        FileStorePathFactory pathFactory =
+                                                table.store().pathFactory();
+                                        listPaimonFileDirs(
+                                                        table.fullName(),
+                                                        pathFactory.manifestPath().toString(),
+                                                        pathFactory.indexPath().toString(),
+                                                        pathFactory.statisticsPath().toString(),
+                                                        pathFactory.dataFilePath().toString(),
+                                                        partitionKeysNum,
+                                                        table.coreOptions().dataFileExternalPaths())
+                                                .stream()
+                                                .map(Path::toUri)
+                                                .map(Object::toString)
+                                                .forEach(out::collect);
+                                    }
+                                })
+                        .name("list-dirs")
+                        .forceNonParallel()
+                        .process(
+                                new ProcessFunction<String, Tuple2<String, Long>>() {
+                                    @Override
+                                    public void processElement(
+                                            String dir,
+                                            ProcessFunction<String, Tuple2<String, Long>>.Context ctx,
                                             Collector<Tuple2<String, Long>> out) {
-                                        listPaimonFilesForTable(out);
+                                        Path dirPath = new Path(dir);
+                                        List<FileStatus> files = tryBestListingDirs(dirPath);
+                                        for (FileStatus file : files) {
+                                            if (oldEnough(file)) {
+                                                out.collect(
+                                                        Tuple2.of(
+                                                                file.getPath().toUri().toString(),
+                                                                file.getLen()));
+                                            }
+                                        }
+                                        if (files.isEmpty()) {
+                                            ctx.output(emptyDirOutputTag, dirPath);
+                                        }
                                     }
                                 })
-                        .setParallelism(1);
+                        .name("collect-candidate-files");
+
+        candidates
+                .getSideOutput(emptyDirOutputTag)
+                .transform(
+                        "clean-empty-dirs",
+                        STRING_TYPE_INFO,
+                        new BoundedOneInputOperator<Path, String>() {
+
+                            private Set<Path> emptyDirs = new HashSet<>();
+
+                            @Override
+                            public void processElement(StreamRecord<Path> element) {
+                                emptyDirs.add(element.getValue());
+                            }
+
+                            @Override
+                            public void endInput() throws IOException {
+                                // delete empty dir
+                                while (!emptyDirs.isEmpty()) {
+                                    Set<Path> newEmptyDir = new HashSet<>();
+                                    for (Path emptyDir : emptyDirs) {
+                                        try {
+                                            if (fileIO.delete(emptyDir, false)) {
+                                                LOG.info("Clean empty dir: {}", emptyDir);
+                                                output.collect(
+                                                        new StreamRecord<>(emptyDir.toString()));
+                                                // recursive cleaning
+                                                newEmptyDir.add(emptyDir.getParent());
+                                            }
+                                        } catch (IOException ignored) {
+                                            LOG.warn("Clean empty dir failed: {}", emptyDir);
+                                        }
+                                    }
+                                    emptyDirs = newEmptyDir;
+                                }
+                            }
+                        })
+                .forceNonParallel()
+                .sinkTo(new DiscardingSink<>())
+                .name("end")
+                .setParallelism(1)
+                .setMaxParallelism(1);
 
         DataStream<CleanOrphanFilesResult> deleted =
                 usedFiles
@@ -256,7 +344,7 @@ public void processElement(
                         .connect(
                                 candidates.keyBy(pathAndSize -> new Path(pathAndSize.f0).getName()))
                         .transform(
-                                "files_join",
+                                "join-used-and-candidate-files",
                                 TypeInformation.of(CleanOrphanFilesResult.class),
                                 new BoundedTwoInputOperator<
                                         String, Tuple2<String, Long>, CleanOrphanFilesResult>() {
@@ -323,50 +411,6 @@ public void processElement2(
         return deleted;
     }
 
-    private void listPaimonFilesForTable(Collector<Tuple2<String, Long>> out) {
-        FileStorePathFactory pathFactory = table.store().pathFactory();
-        List<String> dirs =
-                listPaimonFileDirs(
-                                table.fullName(),
-                                pathFactory.manifestPath().toString(),
-                                pathFactory.indexPath().toString(),
-                                pathFactory.statisticsPath().toString(),
-                                pathFactory.dataFilePath().toString(),
-                                partitionKeysNum,
-                                table.coreOptions().dataFileExternalPaths())
-                        .stream()
-                        .map(Path::toUri)
-                        .map(Object::toString)
-                        .collect(Collectors.toList());
-        Set<Path> emptyDirs = new HashSet<>();
-        for (String dir : dirs) {
-            Path dirPath = new Path(dir);
-            List<FileStatus> files = tryBestListingDirs(dirPath);
-            for (FileStatus file : files) {
-                if (oldEnough(file)) {
-                    out.collect(new Tuple2<>(file.getPath().toUri().toString(), file.getLen()));
-                }
-            }
-            if (files.isEmpty()) {
-                emptyDirs.add(dirPath);
-            }
-        }
-
-        // delete empty dir
-        while (!emptyDirs.isEmpty()) {
-            Set<Path> newEmptyDir = new HashSet<>();
-            for (Path emptyDir : emptyDirs) {
-                try {
-                    fileIO.delete(emptyDir, false);
-                    // recursive cleaning
-                    newEmptyDir.add(emptyDir.getParent());
-                } catch (IOException ignored) {
-                }
-            }
-            emptyDirs = newEmptyDir;
-        }
-    }
-
     public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
             StreamExecutionEnvironment env,
             Catalog catalog,
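Note on the "clean-empty-dirs" operator: its endInput() runs a bottom-up fixpoint loop, where a directory's parent is only retried after the directory itself was actually deleted. Below is a minimal standalone sketch of that loop outside Flink; the FileIO interface and parentOf function are illustrative stand-ins (not the patch's types), and it assumes a non-recursive delete that returns false or throws on a non-empty directory.

    import java.io.IOException;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.function.UnaryOperator;

    final class EmptyDirCleanupSketch {

        /** Hypothetical stand-in for the table's file I/O abstraction. */
        interface FileIO {
            boolean delete(String path, boolean recursive) throws IOException;
        }

        /** Deletes the given empty dirs, then retries parents until nothing changes. */
        static int clean(FileIO fileIO, Set<String> emptyDirs, UnaryOperator<String> parentOf) {
            int deleted = 0;
            while (!emptyDirs.isEmpty()) {
                Set<String> parents = new HashSet<>();
                for (String dir : emptyDirs) {
                    try {
                        // Non-recursive delete: assumed to fail on non-empty dirs,
                        // so only genuinely empty directories are removed.
                        if (fileIO.delete(dir, false)) {
                            deleted++;
                            String parent = parentOf.apply(dir);
                            if (parent != null) {
                                // The parent may have just become empty; retry it
                                // in the next round.
                                parents.add(parent);
                            }
                        }
                    } catch (IOException ignored) {
                        // Best effort: a dir that cannot be deleted is skipped.
                    }
                }
                emptyDirs = parents;
            }
            return deleted;
        }
    }

The loop terminates because a path is only re-added after a successful delete, so each round either removes at least one directory or leaves the retry set empty.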