Skip to content

Commit 588d7f2

Browse files
authored
[flink] Introduce dedecate job for orphan files clean (#4181)
1 parent 5564403 commit 588d7f2

File tree

28 files changed

+1109
-764
lines changed

28 files changed

+1109
-764
lines changed

paimon-common/src/main/java/org/apache/paimon/options/Options.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,10 @@ public synchronized void remove(String key) {
153153
data.remove(key);
154154
}
155155

156+
public synchronized void remove(ConfigOption<?> option) {
157+
data.remove(option.key());
158+
}
159+
156160
public synchronized boolean containsKey(String key) {
157161
return data.containsKey(key);
158162
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.utils;
20+
21+
import java.io.Serializable;
22+
import java.util.function.Consumer;
23+
24+
/**
25+
* This interface is basically Java's {@link Consumer} interface enhanced with the {@link
26+
* Serializable}.
27+
*
28+
* @param <T> type of the consumed elements.
29+
*/
30+
@FunctionalInterface
31+
public interface SerializableConsumer<T> extends Consumer<T>, Serializable {}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.utils;
20+
21+
import java.io.Serializable;
22+
import java.util.function.Predicate;
23+
24+
/**
25+
* This interface is basically Java's {@link Predicate} interface enhanced with the {@link
26+
* Serializable}.
27+
*/
28+
@FunctionalInterface
29+
public interface SerializablePredicate<T> extends Predicate<T>, Serializable {}
Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.operation;
20+
21+
import org.apache.paimon.CoreOptions;
22+
import org.apache.paimon.catalog.Catalog;
23+
import org.apache.paimon.catalog.Identifier;
24+
import org.apache.paimon.fs.FileStatus;
25+
import org.apache.paimon.fs.Path;
26+
import org.apache.paimon.manifest.ManifestEntry;
27+
import org.apache.paimon.manifest.ManifestFile;
28+
import org.apache.paimon.table.FileStoreTable;
29+
import org.apache.paimon.table.Table;
30+
import org.apache.paimon.utils.SerializableConsumer;
31+
32+
import javax.annotation.Nullable;
33+
34+
import java.io.IOException;
35+
import java.util.ArrayList;
36+
import java.util.Collections;
37+
import java.util.HashMap;
38+
import java.util.HashSet;
39+
import java.util.Iterator;
40+
import java.util.List;
41+
import java.util.Map;
42+
import java.util.Set;
43+
import java.util.concurrent.ExecutionException;
44+
import java.util.concurrent.ExecutorService;
45+
import java.util.concurrent.Executors;
46+
import java.util.concurrent.Future;
47+
import java.util.concurrent.ThreadPoolExecutor;
48+
import java.util.concurrent.TimeUnit;
49+
import java.util.function.Function;
50+
import java.util.stream.Collectors;
51+
52+
import static org.apache.paimon.utils.Preconditions.checkArgument;
53+
import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
54+
import static org.apache.paimon.utils.ThreadPoolUtils.randomlyExecute;
55+
56+
/** Local {@link OrphanFilesClean}, it will use thread pool to execute deletion. */
57+
public class LocalOrphanFilesClean extends OrphanFilesClean {
58+
59+
private final ThreadPoolExecutor executor;
60+
61+
private static final int SHOW_LIMIT = 200;
62+
63+
private final List<Path> deleteFiles;
64+
65+
public LocalOrphanFilesClean(FileStoreTable table) {
66+
this(table, System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1));
67+
}
68+
69+
public LocalOrphanFilesClean(FileStoreTable table, long olderThanMillis) {
70+
this(table, olderThanMillis, path -> table.fileIO().deleteQuietly(path));
71+
}
72+
73+
public LocalOrphanFilesClean(
74+
FileStoreTable table, long olderThanMillis, SerializableConsumer<Path> fileCleaner) {
75+
super(table, olderThanMillis, fileCleaner);
76+
this.deleteFiles = new ArrayList<>();
77+
this.executor =
78+
createCachedThreadPool(
79+
table.coreOptions().deleteFileThreadNum(), "ORPHAN_FILES_CLEAN");
80+
}
81+
82+
public List<Path> clean() throws IOException, ExecutionException, InterruptedException {
83+
List<String> branches = validBranches();
84+
85+
// specially handle to clear snapshot dir
86+
cleanSnapshotDir(branches, deleteFiles::add);
87+
88+
// delete candidate files
89+
Map<String, Path> candidates = getCandidateDeletingFiles();
90+
91+
// find used files
92+
Set<String> usedFiles =
93+
branches.stream()
94+
.flatMap(branch -> getUsedFiles(branch).stream())
95+
.collect(Collectors.toSet());
96+
97+
// delete unused files
98+
Set<String> deleted = new HashSet<>(candidates.keySet());
99+
deleted.removeAll(usedFiles);
100+
deleted.stream().map(candidates::get).forEach(fileCleaner);
101+
deleteFiles.addAll(deleted.stream().map(candidates::get).collect(Collectors.toList()));
102+
103+
return deleteFiles;
104+
}
105+
106+
private List<String> getUsedFiles(String branch) {
107+
List<String> usedFiles = new ArrayList<>();
108+
ManifestFile manifestFile =
109+
table.switchToBranch(branch).store().manifestFileFactory().create();
110+
try {
111+
List<String> manifests = new ArrayList<>();
112+
collectWithoutDataFile(
113+
branch, usedFiles::add, manifest -> manifests.add(manifest.fileName()));
114+
usedFiles.addAll(retryReadingDataFiles(manifestFile, manifests));
115+
} catch (IOException e) {
116+
throw new RuntimeException(e);
117+
}
118+
return usedFiles;
119+
}
120+
121+
/**
122+
* Get all the candidate deleting files in the specified directories and filter them by
123+
* olderThanMillis.
124+
*/
125+
private Map<String, Path> getCandidateDeletingFiles() {
126+
List<Path> fileDirs = listPaimonFileDirs();
127+
Function<Path, List<Path>> processor =
128+
path ->
129+
tryBestListingDirs(path).stream()
130+
.filter(this::oldEnough)
131+
.map(FileStatus::getPath)
132+
.collect(Collectors.toList());
133+
Iterator<Path> allPaths = randomlyExecute(executor, processor, fileDirs);
134+
Map<String, Path> result = new HashMap<>();
135+
while (allPaths.hasNext()) {
136+
Path next = allPaths.next();
137+
result.put(next.getName(), next);
138+
}
139+
return result;
140+
}
141+
142+
private List<String> retryReadingDataFiles(
143+
ManifestFile manifestFile, List<String> manifestNames) throws IOException {
144+
List<String> dataFiles = new ArrayList<>();
145+
for (String manifestName : manifestNames) {
146+
retryReadingFiles(
147+
() -> manifestFile.readWithIOException(manifestName),
148+
Collections.<ManifestEntry>emptyList())
149+
.stream()
150+
.map(ManifestEntry::file)
151+
.forEach(
152+
f -> {
153+
dataFiles.add(f.fileName());
154+
dataFiles.addAll(f.extraFiles());
155+
});
156+
}
157+
return dataFiles;
158+
}
159+
160+
public static List<String> showDeletedFiles(List<Path> deleteFiles, int showLimit) {
161+
int showSize = Math.min(deleteFiles.size(), showLimit);
162+
List<String> result = new ArrayList<>();
163+
if (deleteFiles.size() > showSize) {
164+
result.add(
165+
String.format(
166+
"Total %s files, only %s lines are displayed.",
167+
deleteFiles.size(), showSize));
168+
}
169+
for (int i = 0; i < showSize; i++) {
170+
result.add(deleteFiles.get(i).toUri().getPath());
171+
}
172+
return result;
173+
}
174+
175+
public static List<LocalOrphanFilesClean> createOrphanFilesCleans(
176+
Catalog catalog,
177+
String databaseName,
178+
@Nullable String tableName,
179+
long olderThanMillis,
180+
SerializableConsumer<Path> fileCleaner,
181+
@Nullable Integer parallelism)
182+
throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
183+
List<LocalOrphanFilesClean> orphanFilesCleans = new ArrayList<>();
184+
List<String> tableNames = Collections.singletonList(tableName);
185+
if (tableName == null || "*".equals(tableName)) {
186+
tableNames = catalog.listTables(databaseName);
187+
}
188+
189+
Map<String, String> dynamicOptions =
190+
parallelism == null
191+
? Collections.emptyMap()
192+
: new HashMap<String, String>() {
193+
{
194+
put(
195+
CoreOptions.DELETE_FILE_THREAD_NUM.key(),
196+
parallelism.toString());
197+
}
198+
};
199+
200+
for (String t : tableNames) {
201+
Identifier identifier = new Identifier(databaseName, t);
202+
Table table = catalog.getTable(identifier).copy(dynamicOptions);
203+
checkArgument(
204+
table instanceof FileStoreTable,
205+
"Only FileStoreTable supports remove-orphan-files action. The table type is '%s'.",
206+
table.getClass().getName());
207+
208+
orphanFilesCleans.add(
209+
new LocalOrphanFilesClean(
210+
(FileStoreTable) table, olderThanMillis, fileCleaner));
211+
}
212+
213+
return orphanFilesCleans;
214+
}
215+
216+
public static String[] executeOrphanFilesClean(List<LocalOrphanFilesClean> tableCleans) {
217+
ExecutorService executorService =
218+
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
219+
List<Future<List<Path>>> tasks = new ArrayList<>();
220+
for (LocalOrphanFilesClean clean : tableCleans) {
221+
tasks.add(executorService.submit(clean::clean));
222+
}
223+
224+
List<Path> cleanOrphanFiles = new ArrayList<>();
225+
for (Future<List<Path>> task : tasks) {
226+
try {
227+
cleanOrphanFiles.addAll(task.get());
228+
} catch (InterruptedException e) {
229+
Thread.currentThread().interrupt();
230+
throw new RuntimeException(e);
231+
} catch (ExecutionException e) {
232+
throw new RuntimeException(e);
233+
}
234+
}
235+
236+
executorService.shutdownNow();
237+
return showDeletedFiles(cleanOrphanFiles, SHOW_LIMIT).toArray(new String[0]);
238+
}
239+
}

0 commit comments

Comments
 (0)