@@ -44,6 +44,7 @@
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
 import org.apache.flink.streaming.api.operators.InputSelection;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.CloseableIterator;
@@ -64,7 +65,6 @@
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
-import java.util.stream.Collectors;
 
 import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
 import static org.apache.flink.util.Preconditions.checkState;
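The newly imported `DiscardingSink` (the Sink V2 variant, which replaces the deprecated SinkFunction-based `org.apache.flink.streaming.api.functions.sink.DiscardingSink`) is used at the end of this diff to terminate the empty-directory cleanup branch, a stream that exists only for its side effects. For readers unfamiliar with the pattern, a minimal self-contained sketch; the pipeline contents are illustrative and not part of this PR:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;

public class DiscardingSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("dir1", "dir2") // hypothetical inputs, not from this PR
                .map(
                        dir -> {
                            System.out.println("pretend we deleted " + dir); // side effect only
                            return dir;
                        })
                .returns(Types.STRING)
                // Every Flink stream must terminate in a sink; DiscardingSink
                // satisfies that requirement while dropping all records.
                .sinkTo(new DiscardingSink<>())
                .name("end");
        env.execute("discarding-sink-sketch");
    }
}
```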
@@ -109,6 +109,7 @@ public DataStream<CleanOrphanFilesResult> doOrphanClean(StreamExecutionEnvironme
         // here, and subsequently, we will not count their orphan files.
         DataStream<CleanOrphanFilesResult> branchSnapshotDirDeleted =
                 env.fromCollection(branches)
+                        .name("branch-source")
                         .process(
                                 new ProcessFunction<String, Tuple2<Long, Long>>() {
                                     @Override
@@ -128,6 +129,7 @@ public void processElement(
                                                         deletedFilesLenInBytes.get()));
                                     }
                                 })
+                        .name("clean-branch-snapshot")
                         .keyBy(tuple -> 1)
                         .reduce(
                                 (ReduceFunction<Tuple2<Long, Long>>)
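The `.name(...)` calls threaded through this diff give each operator an explicit, stable label. Without them Flink derives names from the function classes, which for anonymous `ProcessFunction`s makes the web UI, logs, and metrics hard to read. A minimal sketch of the idiom; the pipeline itself is illustrative, not from this PR:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OperatorNamingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("main", "branch1", "branch2")
                .name("branch-source") // label shown for the source in the web UI
                .map(branch -> "snapshot-of-" + branch)
                .returns(Types.STRING) // lambda generics need an explicit type hint
                .name("collect-snapshots") // names the map operator just built
                .print()
                .name("debug-print"); // sinks can be named too
        env.execute("operator-naming-sketch");
    }
}
```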
@@ -136,14 +138,17 @@ public void processElement(
                                                 value1.f0 + value2.f0,
                                                 value1.f1 + value2.f1))
                         .setParallelism(1)
-                        .map(tuple -> new CleanOrphanFilesResult(tuple.f0, tuple.f1));
+                        .name("aggregate-branch-snapshot-deletion")
+                        .map(tuple -> new CleanOrphanFilesResult(tuple.f0, tuple.f1))
+                        .name("branch-snapshot-deletion-result");
 
         // branch and manifest file
         final OutputTag<Tuple2<String, String>> manifestOutputTag =
                 new OutputTag<Tuple2<String, String>>("manifest-output") {};
 
         SingleOutputStreamOperator<String> usedManifestFiles =
                 env.fromCollection(branches)
+                        .name("branch-source")
                         .process(
                                 new ProcessFunction<String, Tuple2<String, String>>() {
                                     @Override
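The `keyBy(tuple -> 1)` / `reduce(...)` / `setParallelism(1)` chain above is the standard DataStream trick for a global aggregate: keying every record to the same constant routes all of them to one reducer, which folds the per-branch counts into a single total. The same pattern in isolation, with hypothetical inputs standing in for the (deleted-file-count, deleted-bytes) pairs:

```java
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class GlobalReduceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(
                        Tuple2.of(1L, 10L), // illustrative (count, bytes) pairs
                        Tuple2.of(2L, 20L),
                        Tuple2.of(3L, 30L))
                // Constant key: every record reaches the same reducer, so the
                // last element emitted by the rolling reduce is the global sum.
                .keyBy(tuple -> 1)
                .reduce(
                        (ReduceFunction<Tuple2<Long, Long>>)
                                (a, b) -> Tuple2.of(a.f0 + b.f0, a.f1 + b.f1))
                .setParallelism(1)
                .print();
        env.execute("global-reduce-sketch");
    }
}
```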
@@ -158,6 +163,7 @@ public void processElement(
                                             }
                                         }
                                 })
+                        .name("collect-snapshots")
                         .rebalance()
                         .process(
                                 new ProcessFunction<Tuple2<String, String>, String>() {
@@ -180,14 +186,15 @@ public void processElement(
                                                 collectWithoutDataFile(
                                                         branch, snapshot, out::collect, manifestConsumer);
                                     }
-                                });
+                                })
+                        .name("collect-manifests");
 
         DataStream<String> usedFiles =
                 usedManifestFiles
                         .getSideOutput(manifestOutputTag)
                         .keyBy(tuple2 -> tuple2.f0 + ":" + tuple2.f1)
                         .transform(
-                                "datafile-reader",
+                                "collect-used-files",
                                 STRING_TYPE_INFO,
                                 new BoundedOneInputOperator<Tuple2<String, String>, String>() {
 
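`usedManifestFiles` relies on Flink side outputs: the main output carries one stream while `ctx.output(manifestOutputTag, ...)` feeds a second, differently typed stream that is read back via `getSideOutput`. The trailing `{}` in `new OutputTag<Tuple2<String, String>>("manifest-output") {}` creates an anonymous subclass so the generic type survives erasure. A minimal sketch of the mechanism, with illustrative names:

```java
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Anonymous subclass ({} suffix) so Flink can capture the element type.
        final OutputTag<String> evenTag = new OutputTag<String>("even-output") {};

        SingleOutputStreamOperator<String> odds =
                env.fromElements(1, 2, 3, 4, 5)
                        .process(
                                new ProcessFunction<Integer, String>() {
                                    @Override
                                    public void processElement(
                                            Integer value, Context ctx, Collector<String> out) {
                                        if (value % 2 == 0) {
                                            ctx.output(evenTag, "even: " + value); // side output
                                        } else {
                                            out.collect("odd: " + value); // main output
                                        }
                                    }
                                });

        odds.print().name("odd-sink");
        odds.getSideOutput(evenTag).print().name("even-sink");
        env.execute("side-output-sketch");
    }
}
```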
@@ -235,28 +242,109 @@ public void endInput() throws IOException {
                                 });
 
         usedFiles = usedFiles.union(usedManifestFiles);
-        DataStream<Tuple2<String, Long>> candidates =
+
+        final OutputTag<Path> emptyDirOutputTag = new OutputTag<Path>("empty-dir-output") {};
+        SingleOutputStreamOperator<Tuple2<String, Long>> candidates =
                 env.fromCollection(Collections.singletonList(1), TypeInformation.of(Integer.class))
                         .process(
-                                new ProcessFunction<Integer, Tuple2<String, Long>>() {
+                                new ProcessFunction<Integer, String>() {
                                     @Override
                                     public void processElement(
                                             Integer i,
-                                            ProcessFunction<Integer, Tuple2<String, Long>>.Context
+                                            ProcessFunction<Integer, String>.Context ctx,
+                                            Collector<String> out) {
+                                        FileStorePathFactory pathFactory =
+                                                table.store().pathFactory();
+                                        listPaimonFileDirs(
+                                                        table.fullName(),
+                                                        pathFactory.manifestPath().toString(),
+                                                        pathFactory.indexPath().toString(),
+                                                        pathFactory.statisticsPath().toString(),
+                                                        pathFactory.dataFilePath().toString(),
+                                                        partitionKeysNum,
+                                                        table.coreOptions().dataFileExternalPaths())
+                                                .stream()
+                                                .map(Path::toUri)
+                                                .map(Object::toString)
+                                                .forEach(out::collect);
+                                    }
+                                })
+                        .name("list-dirs")
+                        .forceNonParallel()
+                        .process(
+                                new ProcessFunction<String, Tuple2<String, Long>>() {
+                                    @Override
+                                    public void processElement(
+                                            String dir,
+                                            ProcessFunction<String, Tuple2<String, Long>>.Context
                                                     ctx,
                                             Collector<Tuple2<String, Long>> out) {
-                                        listPaimonFilesForTable(out);
+                                        Path dirPath = new Path(dir);
+                                        List<FileStatus> files = tryBestListingDirs(dirPath);
+                                        for (FileStatus file : files) {
+                                            if (oldEnough(file)) {
+                                                out.collect(
+                                                        Tuple2.of(
+                                                                file.getPath().toUri().toString(),
+                                                                file.getLen()));
+                                            }
+                                        }
+                                        if (files.isEmpty()) {
+                                            ctx.output(emptyDirOutputTag, dirPath);
+                                        }
                                     }
                                 })
-                        .setParallelism(1);
+                        .name("collect-candidate-files");
+
+        candidates
+                .getSideOutput(emptyDirOutputTag)
+                .transform(
+                        "clean-empty-dirs",
+                        STRING_TYPE_INFO,
+                        new BoundedOneInputOperator<Path, String>() {
+
+                            private Set<Path> emptyDirs = new HashSet<>();
+
+                            @Override
+                            public void processElement(StreamRecord<Path> element) {
+                                emptyDirs.add(element.getValue());
+                            }
+
+                            @Override
+                            public void endInput() throws IOException {
+                                // delete empty dir
+                                while (!emptyDirs.isEmpty()) {
+                                    Set<Path> newEmptyDir = new HashSet<>();
+                                    for (Path emptyDir : emptyDirs) {
+                                        try {
+                                            if (fileIO.delete(emptyDir, false)) {
+                                                LOG.info("Clean empty dir: {}", emptyDir);
+                                                output.collect(
+                                                        new StreamRecord<>(emptyDir.toString()));
+                                                // recursive cleaning
+                                                newEmptyDir.add(emptyDir.getParent());
+                                            }
+                                        } catch (IOException ignored) {
LOG.warn("Clean empty dir failed: {}", emptyDir);
}
}
emptyDirs = newEmptyDir;
}
}
})
.forceNonParallel()
.sinkTo(new DiscardingSink<>())
.name("end")
.setParallelism(1)
.setMaxParallelism(1);

DataStream<CleanOrphanFilesResult> deleted =
usedFiles
.keyBy(f -> f)
.connect(
candidates.keyBy(pathAndSize -> new Path(pathAndSize.f0).getName()))
.transform(
"files_join",
"join-used-and-candidate-files",
TypeInformation.of(CleanOrphanFilesResult.class),
new BoundedTwoInputOperator<
String, Tuple2<String, Long>, CleanOrphanFilesResult>() {
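The `endInput()` above removes empty directories to a fixed point: whenever a directory is successfully deleted, its parent is enqueued for the next round, so chains of newly emptied ancestors disappear without recursion. Unlike the removed helper further down, the parent is only enqueued when `fileIO.delete` reports success. The same loop in isolation, sketched against `java.nio.file` rather than Paimon's `FileIO`; class and method names are illustrative:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Set;

public class EmptyDirCleanupSketch {
    /** Deletes the given empty dirs, then any ancestors that become empty in turn. */
    static void cleanEmptyDirs(Set<Path> emptyDirs) {
        while (!emptyDirs.isEmpty()) {
            Set<Path> next = new HashSet<>();
            for (Path dir : emptyDirs) {
                try {
                    // Files.delete fails on non-empty directories, mirroring
                    // fileIO.delete(dir, /* recursive= */ false).
                    Files.delete(dir);
                    System.out.println("Cleaned empty dir: " + dir);
                    if (dir.getParent() != null) {
                        next.add(dir.getParent()); // parent may now be empty too
                    }
                } catch (IOException ignored) {
                    // Non-empty or already gone: stop walking up this chain.
                }
            }
            emptyDirs = next;
        }
    }
}
```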
@@ -323,50 +411,6 @@ public void processElement2(
         return deleted;
     }
 
-    private void listPaimonFilesForTable(Collector<Tuple2<String, Long>> out) {
-        FileStorePathFactory pathFactory = table.store().pathFactory();
-        List<String> dirs =
-                listPaimonFileDirs(
-                                table.fullName(),
-                                pathFactory.manifestPath().toString(),
-                                pathFactory.indexPath().toString(),
-                                pathFactory.statisticsPath().toString(),
-                                pathFactory.dataFilePath().toString(),
-                                partitionKeysNum,
-                                table.coreOptions().dataFileExternalPaths())
-                        .stream()
-                        .map(Path::toUri)
-                        .map(Object::toString)
-                        .collect(Collectors.toList());
-        Set<Path> emptyDirs = new HashSet<>();
-        for (String dir : dirs) {
-            Path dirPath = new Path(dir);
-            List<FileStatus> files = tryBestListingDirs(dirPath);
-            for (FileStatus file : files) {
-                if (oldEnough(file)) {
-                    out.collect(new Tuple2<>(file.getPath().toUri().toString(), file.getLen()));
-                }
-            }
-            if (files.isEmpty()) {
-                emptyDirs.add(dirPath);
-            }
-        }
-
-        // delete empty dir
-        while (!emptyDirs.isEmpty()) {
-            Set<Path> newEmptyDir = new HashSet<>();
-            for (Path emptyDir : emptyDirs) {
-                try {
-                    fileIO.delete(emptyDir, false);
-                    // recursive cleaning
-                    newEmptyDir.add(emptyDir.getParent());
-                } catch (IOException ignored) {
-                }
-            }
-            emptyDirs = newEmptyDir;
-        }
-    }
-
     public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
             StreamExecutionEnvironment env,
             Catalog catalog,