Skip to content

Commit bb453c2

Browse files
authored
[core] add parallelism argument for remove orphan files (#4044)
1 parent cb97fc8 commit bb453c2

File tree

10 files changed

+111
-20
lines changed

10 files changed

+111
-20
lines changed

docs/content/flink/procedures.md

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,8 @@ All available procedures are listed below.
214214
<td>
215215
CALL [catalog.]sys.remove_orphan_files('identifier')<br/><br/>
216216
CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan')<br/><br/>
217-
CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan', 'dryRun')
217+
CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan', 'dryRun')<br/><br/>
218+
CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan', 'dryRun', 'parallelism')
218219
</td>
219220
<td>
220221
To remove the orphan data files and metadata files. Arguments:
@@ -223,10 +224,12 @@ All available procedures are listed below.
223224
deletes orphan files older than 1 day by default. This argument can modify the interval.
224225
</li>
225226
<li>dryRun: when true, view only orphan files, don't actually remove files. Default is false.</li>
227+
<li>parallelism: the maximum number of threads used to delete files concurrently. Defaults to the number of processors available to the Java virtual machine.</li>
226228
</td>
227229
<td>CALL remove_orphan_files('default.T', '2023-10-31 12:00:00')<br/><br/>
228230
CALL remove_orphan_files('default.*', '2023-10-31 12:00:00')<br/><br/>
229-
CALL remove_orphan_files('default.T', '2023-10-31 12:00:00', true)
231+
CALL remove_orphan_files('default.T', '2023-10-31 12:00:00', true)<br/><br/>
232+
CALL remove_orphan_files('default.T', '2023-10-31 12:00:00', false, '5')
230233
</td>
231234
</tr>
232235
<tr>

docs/content/maintenance/manage-snapshots.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,8 @@ CALL sys.remove_orphan_files(table => "my_db.*", [older_than => "2023-10-31 12:0
314314
--database <database-name> \
315315
--table <table-name> \
316316
[--older_than <timestamp>] \
317-
[--dry_run <false/true>]
317+
[--dry_run <false/true>] \
318+
[--parallelism <parallelism>]
318319
```
319320
320321
To avoid deleting files that are newly added by other writing jobs, this action only deletes orphan files older than

docs/content/spark/procedures.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,11 +161,13 @@ This section introduce all available spark procedures about paimon.
161161
<li>table: the target table identifier. Cannot be empty, you can use database_name.* to clean whole database.</li>
162162
<li>older_than: to avoid deleting newly written files, this procedure only deletes orphan files older than 1 day by default. This argument can modify the interval.</li>
163163
<li>dry_run: when true, view only orphan files, don't actually remove files. Default is false.</li>
164+
<li>parallelism: the maximum number of threads used to delete files concurrently. Defaults to the number of processors available to the Java virtual machine.</li>
164165
</td>
165166
<td>
166167
CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00')<br/><br/>
167168
CALL sys.remove_orphan_files(table => 'default.*', older_than => '2023-10-31 12:00:00')<br/><br/>
168-
CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true)
169+
CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true)<br/><br/>
170+
CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true, parallelism => '5')
169171
</td>
170172
</tr>
171173
<tr>

paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,7 @@ public class OrphanFilesClean {
9595

9696
private static final Logger LOG = LoggerFactory.getLogger(OrphanFilesClean.class);
9797

98-
private static final ThreadPoolExecutor EXECUTOR =
99-
createCachedThreadPool(
100-
Runtime.getRuntime().availableProcessors(), "ORPHAN_FILES_CLEAN");
98+
private final ThreadPoolExecutor executor;
10199

102100
private static final int READ_FILE_RETRY_NUM = 3;
103101
private static final int READ_FILE_RETRY_INTERVAL = 5;
@@ -130,6 +128,9 @@ public OrphanFilesClean(FileStoreTable table) {
130128
} catch (IOException ignored) {
131129
}
132130
};
131+
this.executor =
132+
createCachedThreadPool(
133+
table.coreOptions().deleteFileThreadNum(), "ORPHAN_FILES_CLEAN");
133134
}
134135

135136
public OrphanFilesClean olderThan(String timestamp) {
@@ -204,7 +205,7 @@ private Set<String> getUsedFiles(FileStoreTable branchTable) throws IOException
204205

205206
return Sets.newHashSet(
206207
randomlyExecute(
207-
EXECUTOR, snapshot -> getUsedFiles(branchTable, snapshot), readSnapshots));
208+
executor, snapshot -> getUsedFiles(branchTable, snapshot), readSnapshots));
208209
}
209210

210211
private List<String> getUsedFiles(FileStoreTable branchTable, Snapshot snapshot) {
@@ -234,7 +235,7 @@ private Map<String, Path> getCandidateDeletingFiles() {
234235
.filter(this::oldEnough)
235236
.map(FileStatus::getPath)
236237
.collect(Collectors.toList());
237-
Iterator<Path> allPaths = randomlyExecute(EXECUTOR, processor, fileDirs);
238+
Iterator<Path> allPaths = randomlyExecute(executor, processor, fileDirs);
238239
Map<String, Path> result = new HashMap<>();
239240
while (allPaths.hasNext()) {
240241
Path next = allPaths.next();
@@ -525,7 +526,7 @@ private List<Path> listAndCleanDataDirs(Path dir, int level) {
525526

526527
// dive into the next partition level
527528
return Lists.newArrayList(
528-
randomlyExecute(EXECUTOR, p -> listAndCleanDataDirs(p, level - 1), partitionPaths));
529+
randomlyExecute(executor, p -> listAndCleanDataDirs(p, level - 1), partitionPaths));
529530
}
530531

531532
private List<Path> filterAndCleanDataDirs(
@@ -581,7 +582,10 @@ public static List<String> showDeletedFiles(List<Path> deleteFiles, int showLimi
581582
}
582583

583584
public static List<OrphanFilesClean> createOrphanFilesCleans(
584-
Catalog catalog, String databaseName, @Nullable String tableName)
585+
Catalog catalog,
586+
Map<String, String> tableConfig,
587+
String databaseName,
588+
@Nullable String tableName)
585589
throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
586590
List<OrphanFilesClean> orphanFilesCleans = new ArrayList<>();
587591
List<String> tableNames = Collections.singletonList(tableName);
@@ -591,7 +595,7 @@ public static List<OrphanFilesClean> createOrphanFilesCleans(
591595

592596
for (String t : tableNames) {
593597
Identifier identifier = new Identifier(databaseName, t);
594-
Table table = catalog.getTable(identifier);
598+
Table table = catalog.getTable(identifier).copy(tableConfig);
595599
checkArgument(
596600
table instanceof FileStoreTable,
597601
"Only FileStoreTable supports remove-orphan-files action. The table type is '%s'.",

paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,17 @@
1818

1919
package org.apache.paimon.flink.procedure;
2020

21+
import org.apache.paimon.CoreOptions;
2122
import org.apache.paimon.catalog.Identifier;
2223
import org.apache.paimon.operation.OrphanFilesClean;
2324
import org.apache.paimon.utils.StringUtils;
2425

2526
import org.apache.flink.table.procedure.ProcedureContext;
2627

28+
import java.util.Collections;
29+
import java.util.HashMap;
2730
import java.util.List;
31+
import java.util.Map;
2832

2933
import static org.apache.paimon.operation.OrphanFilesClean.executeOrphanFilesClean;
3034

@@ -58,12 +62,32 @@ public String[] call(ProcedureContext procedureContext, String tableId, String o
5862
/**
 * Overload without a parallelism argument: delegates to the five-argument overload, passing an
 * empty string as the parallelism.
 */
public String[] call(
5963
ProcedureContext procedureContext, String tableId, String olderThan, boolean dryRun)
6064
throws Exception {
65+
return call(procedureContext, tableId, olderThan, dryRun, "");
66+
}
67+
68+
public String[] call(
69+
ProcedureContext procedureContext,
70+
String tableId,
71+
String olderThan,
72+
boolean dryRun,
73+
String parallelism)
74+
throws Exception {
6175
Identifier identifier = Identifier.fromString(tableId);
6276
String databaseName = identifier.getDatabaseName();
6377
String tableName = identifier.getObjectName();
6478

79+
Map<String, String> dynamicOptions =
80+
// A blank parallelism means the caller did not set one: pass no dynamic option in that case.
StringUtils.isNullOrWhitespaceOnly(parallelism)
81+
? Collections.emptyMap()
82+
: new HashMap<String, String>() {
83+
{
84+
put(CoreOptions.DELETE_FILE_THREAD_NUM.key(), parallelism);
85+
}
86+
};
87+
6588
List<OrphanFilesClean> tableCleans =
66-
OrphanFilesClean.createOrphanFilesCleans(catalog, databaseName, tableName);
89+
OrphanFilesClean.createOrphanFilesCleans(
90+
catalog, dynamicOptions, databaseName, tableName);
6791

6892
if (!StringUtils.isNullOrWhitespaceOnly(olderThan)) {
6993
tableCleans.forEach(clean -> clean.olderThan(olderThan));

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,13 @@ public RemoveOrphanFilesAction(
3737
String warehouse,
3838
String databaseName,
3939
@Nullable String tableName,
40-
Map<String, String> catalogConfig)
40+
Map<String, String> catalogConfig,
41+
Map<String, String> dynamicOptions)
4142
throws Catalog.TableNotExistException, Catalog.DatabaseNotExistException {
4243
super(warehouse, catalogConfig);
4344
this.tableCleans =
44-
OrphanFilesClean.createOrphanFilesCleans(catalog, databaseName, tableName);
45+
OrphanFilesClean.createOrphanFilesCleans(
46+
catalog, dynamicOptions, databaseName, tableName);
4547
}
4648

4749
public void olderThan(String olderThan) {

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionFactory.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818

1919
package org.apache.paimon.flink.action;
2020

21+
import org.apache.paimon.CoreOptions;
22+
23+
import java.util.HashMap;
2124
import java.util.Map;
2225
import java.util.Optional;
2326

@@ -29,6 +32,7 @@ public class RemoveOrphanFilesActionFactory implements ActionFactory {
2932
public static final String IDENTIFIER = "remove_orphan_files";
3033
private static final String OLDER_THAN = "older_than";
3134
private static final String DRY_RUN = "dry_run";
35+
private static final String PARALLELISM = "parallelism";
3236

3337
@Override
3438
public String identifier() {
@@ -44,10 +48,16 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
4448
String table = params.get(TABLE);
4549

4650
Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
51+
Map<String, String> dynamicOptions = new HashMap<>();
52+
if (params.has(PARALLELISM)) {
53+
dynamicOptions.put(CoreOptions.DELETE_FILE_THREAD_NUM.key(), params.get(PARALLELISM));
54+
}
4755

4856
RemoveOrphanFilesAction action;
4957
try {
50-
action = new RemoveOrphanFilesAction(warehouse, database, table, catalogConfig);
58+
action =
59+
new RemoveOrphanFilesAction(
60+
warehouse, database, table, catalogConfig, dynamicOptions);
5161
} catch (Exception e) {
5262
throw new RuntimeException(e);
5363
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.paimon.flink.procedure;
2020

21+
import org.apache.paimon.CoreOptions;
2122
import org.apache.paimon.catalog.Identifier;
2223
import org.apache.paimon.operation.OrphanFilesClean;
2324
import org.apache.paimon.utils.StringUtils;
@@ -27,7 +28,10 @@
2728
import org.apache.flink.table.annotation.ProcedureHint;
2829
import org.apache.flink.table.procedure.ProcedureContext;
2930

31+
import java.util.Collections;
32+
import java.util.HashMap;
3033
import java.util.List;
34+
import java.util.Map;
3135

3236
import static org.apache.paimon.operation.OrphanFilesClean.executeOrphanFilesClean;
3337

@@ -56,13 +60,18 @@ public class RemoveOrphanFilesProcedure extends ProcedureBase {
5660
name = "older_than",
5761
type = @DataTypeHint("STRING"),
5862
isOptional = true),
59-
@ArgumentHint(name = "dry_run", type = @DataTypeHint("BOOLEAN"), isOptional = true)
63+
@ArgumentHint(name = "dry_run", type = @DataTypeHint("BOOLEAN"), isOptional = true),
64+
@ArgumentHint(
65+
name = "parallelism",
66+
type = @DataTypeHint("STRING"),
67+
isOptional = true)
6068
})
6169
public String[] call(
6270
ProcedureContext procedureContext,
6371
String tableId,
6472
String nullableOlderThan,
65-
Boolean dryRun)
73+
Boolean dryRun,
74+
String parallelism)
6675
throws Exception {
6776
final String olderThan = notnull(nullableOlderThan);
6877
if (dryRun == null) {
@@ -73,8 +82,18 @@ public String[] call(
7382
String databaseName = identifier.getDatabaseName();
7483
String tableName = identifier.getObjectName();
7584

85+
Map<String, String> dynamicOptions =
86+
// A blank parallelism means the caller did not set one: pass no dynamic option in that case.
StringUtils.isNullOrWhitespaceOnly(parallelism)
87+
? Collections.emptyMap()
88+
: new HashMap<String, String>() {
89+
{
90+
put(CoreOptions.DELETE_FILE_THREAD_NUM.key(), parallelism);
91+
}
92+
};
93+
7694
List<OrphanFilesClean> tableCleans =
77-
OrphanFilesClean.createOrphanFilesCleans(catalog, databaseName, tableName);
95+
OrphanFilesClean.createOrphanFilesCleans(
96+
catalog, dynamicOptions, databaseName, tableName);
7897

7998
if (!StringUtils.isNullOrWhitespaceOnly(olderThan)) {
8099
tableCleans.forEach(clean -> clean.olderThan(olderThan));

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,11 @@ public void testRemoveDatabaseOrphanFilesITCase(boolean isNamedArgument) throws
185185
RemoveOrphanFilesAction action2 = createAction(RemoveOrphanFilesAction.class, args);
186186
assertThatCode(action2::run).doesNotThrowAnyException();
187187

188+
args.add("--parallelism");
189+
args.add("5");
190+
RemoveOrphanFilesAction action3 = createAction(RemoveOrphanFilesAction.class, args);
191+
assertThatCode(action3::run).doesNotThrowAnyException();
192+
188193
String withoutOlderThan =
189194
String.format(
190195
isNamedArgument
@@ -195,6 +200,11 @@ public void testRemoveDatabaseOrphanFilesITCase(boolean isNamedArgument) throws
195200
CloseableIterator<Row> withoutOlderThanCollect = callProcedure(withoutOlderThan);
196201
assertThat(ImmutableList.copyOf(withoutOlderThanCollect).size()).isEqualTo(0);
197202

203+
String withParallelism =
204+
String.format("CALL sys.remove_orphan_files('%s.%s','',true,'5')", database, "*");
205+
CloseableIterator<Row> withParallelismCollect = callProcedure(withParallelism);
206+
assertThat(ImmutableList.copyOf(withParallelismCollect).size()).isEqualTo(0);
207+
198208
String withDryRun =
199209
String.format(
200210
isNamedArgument

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.paimon.spark.procedure;
2020

21+
import org.apache.paimon.CoreOptions;
2122
import org.apache.paimon.operation.OrphanFilesClean;
2223
import org.apache.paimon.spark.catalog.WithPaimonCatalog;
2324
import org.apache.paimon.utils.Preconditions;
@@ -34,7 +35,10 @@
3435

3536
import java.util.ArrayList;
3637
import java.util.Arrays;
38+
import java.util.Collections;
39+
import java.util.HashMap;
3740
import java.util.List;
41+
import java.util.Map;
3842

3943
import static org.apache.paimon.operation.OrphanFilesClean.executeOrphanFilesClean;
4044
import static org.apache.spark.sql.types.DataTypes.BooleanType;
@@ -58,7 +62,8 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
5862
new ProcedureParameter[] {
5963
ProcedureParameter.required("table", StringType),
6064
ProcedureParameter.optional("older_than", StringType),
61-
ProcedureParameter.optional("dry_run", BooleanType)
65+
ProcedureParameter.optional("dry_run", BooleanType),
66+
ProcedureParameter.optional("parallelism", StringType)
6267
};
6368

6469
private static final StructType OUTPUT_TYPE =
@@ -85,6 +90,16 @@ public StructType outputType() {
8590
public InternalRow[] call(InternalRow args) {
8691
org.apache.paimon.catalog.Identifier identifier;
8792
String tableId = args.getString(0);
93+
String parallelism = args.isNullAt(3) ? null : args.getString(3);
94+
Map<String, String> dynamicOptions =
95+
// A blank parallelism means the caller did not set one: pass no dynamic option in that case.
StringUtils.isNullOrWhitespaceOnly(parallelism)
96+
? Collections.emptyMap()
97+
: new HashMap<String, String>() {
98+
{
99+
put(CoreOptions.DELETE_FILE_THREAD_NUM.key(), parallelism);
100+
}
101+
};
102+
88103
Preconditions.checkArgument(
89104
tableId != null && !tableId.isEmpty(),
90105
"Cannot handle an empty tableId for argument %s",
@@ -104,6 +119,7 @@ public InternalRow[] call(InternalRow args) {
104119
tableCleans =
105120
OrphanFilesClean.createOrphanFilesCleans(
106121
((WithPaimonCatalog) tableCatalog()).paimonCatalog(),
122+
dynamicOptions,
107123
identifier.getDatabaseName(),
108124
identifier.getObjectName());
109125
} catch (Exception e) {

0 commit comments

Comments
 (0)