Skip to content

Commit 457f71c

Browse files
LinMingQiangdanzhewuju
authored and committed
[core] Add dry_run parameters to purge_files Procedure and display the list of deleted directories (apache#5342)
1 parent 12e3d24 commit 457f71c

File tree

7 files changed

+145
-28
lines changed

7 files changed

+145
-28
lines changed

docs/content/flink/procedures.md

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -488,20 +488,27 @@ All available procedures are listed below.
488488
<td>
489489
-- for Flink 1.18<br/>
490490
-- clear table with purge files directly.<br/>
491-
CALL [catalog.]sys.purge_files('identifier')<br/><br/>
491+
CALL [catalog.]sys.purge_files('identifier')<br/>
492+
-- only check what dirs will be deleted, but not really delete them.<br/>
493+
CALL [catalog.]sys.purge_files('identifier', true)<br/><br/>
492494
-- for Flink 1.19 and later<br/>
493495
-- clear table with purge files directly.<br/>
494-
CALL [catalog.]sys.purge_files(`table` => 'default.T')<br/><br/>
496+
CALL [catalog.]sys.purge_files(`table` => 'default.T')<br/>
497+
-- only check what dirs will be deleted, but not really delete them.<br/>
498+
CALL [catalog.]sys.purge_files(`table` => 'default.T', `dry_run` => true)<br/><br/>
495499
</td>
496500
<td>
497501
To clear table with purge files directly. Argument:
498502
<li>table: the target table identifier. Cannot be empty.</li>
503+
<li>dry_run (optional): only check what dirs will be deleted, but not really delete them. Default is false.</li>
499504
</td>
500505
<td>
501506
-- for Flink 1.18<br/>
502-
CALL sys.purge_files('default.T')<br/><br/>
507+
CALL sys.purge_files('default.T')<br/>
508+
CALL sys.purge_files('default.T', true)<br/><br/>
503509
-- for Flink 1.19 and later<br/>
504-
CALL sys.purge_files(`table` => 'default.T')
510+
CALL sys.purge_files(`table` => 'default.T')<br/>
511+
CALL sys.purge_files(`table` => 'default.T', `dry_run` => true)
505512
</td>
506513
</tr>
507514
<tr>

docs/content/spark/procedures.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,9 +200,11 @@ This section introduce all available spark procedures about paimon.
200200
<td>
201201
To clear table with purge files directly. Argument:
202202
<li>table: the target table identifier. Cannot be empty.</li>
203+
<li>dry_run (optional): only check what dirs will be deleted, but not really delete them. Default is false.</li>
203204
</td>
204205
<td>
205-
CALL sys.purge_files(table => 'default.T')<br/><br/>
206+
CALL sys.purge_files(table => 'default.T')<br/>
207+
CALL sys.purge_files(table => 'default.T', dry_run => true)
206208
</td>
207209
</tr>
208210
<tr>

paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,15 @@
2121
import org.apache.paimon.catalog.Catalog;
2222
import org.apache.paimon.catalog.Identifier;
2323
import org.apache.paimon.fs.FileIO;
24+
import org.apache.paimon.fs.FileStatus;
2425
import org.apache.paimon.fs.Path;
2526
import org.apache.paimon.table.FileStoreTable;
2627
import org.apache.paimon.table.Table;
2728

2829
import org.apache.flink.table.procedure.ProcedureContext;
2930

3031
import java.io.IOException;
32+
import java.util.ArrayList;
3133
import java.util.Arrays;
3234

3335
/** A procedure to purge files for a table. */
@@ -36,27 +38,38 @@ public class PurgeFilesProcedure extends ProcedureBase {
3638

3739
public String[] call(ProcedureContext procedureContext, String tableId)
3840
throws Catalog.TableNotExistException {
41+
return call(procedureContext, tableId, false);
42+
}
43+
44+
public String[] call(ProcedureContext procedureContext, String tableId, boolean dryRun)
45+
throws Catalog.TableNotExistException {
3946
Table table = catalog.getTable(Identifier.fromString(tableId));
4047
FileStoreTable fileStoreTable = (FileStoreTable) table;
4148
FileIO fileIO = fileStoreTable.fileIO();
4249
Path tablePath = fileStoreTable.snapshotManager().tablePath();
50+
ArrayList<String> deleteDir;
4351
try {
44-
Arrays.stream(fileIO.listStatus(tablePath))
52+
FileStatus[] fileStatuses = fileIO.listStatus(tablePath);
53+
deleteDir = new ArrayList<>(fileStatuses.length);
54+
Arrays.stream(fileStatuses)
4555
.filter(f -> !f.getPath().getName().contains("schema"))
4656
.forEach(
4757
fileStatus -> {
4858
try {
49-
fileIO.delete(fileStatus.getPath(), true);
59+
deleteDir.add(fileStatus.getPath().getName());
60+
if (!dryRun) {
61+
fileIO.delete(fileStatus.getPath(), true);
62+
}
5063
} catch (IOException e) {
5164
throw new RuntimeException(e);
5265
}
5366
});
5467
} catch (IOException e) {
5568
throw new RuntimeException(e);
5669
}
57-
return new String[] {
58-
String.format("Success purge files with table: %s.", fileStoreTable.name())
59-
};
70+
return deleteDir.isEmpty()
71+
? new String[] {"There are no dir to be deleted."}
72+
: deleteDir.toArray(new String[0]);
6073
}
6174

6275
@Override

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.paimon.catalog.Catalog;
2222
import org.apache.paimon.catalog.Identifier;
2323
import org.apache.paimon.fs.FileIO;
24+
import org.apache.paimon.fs.FileStatus;
2425
import org.apache.paimon.fs.Path;
2526
import org.apache.paimon.table.FileStoreTable;
2627
import org.apache.paimon.table.Table;
@@ -29,8 +30,10 @@
2930
import org.apache.flink.table.annotation.DataTypeHint;
3031
import org.apache.flink.table.annotation.ProcedureHint;
3132
import org.apache.flink.table.procedure.ProcedureContext;
33+
import org.apache.flink.types.Row;
3234

3335
import java.io.IOException;
36+
import java.util.ArrayList;
3437
import java.util.Arrays;
3538

3639
/**
@@ -45,30 +48,42 @@ public class PurgeFilesProcedure extends ProcedureBase {
4548

4649
public static final String IDENTIFIER = "purge_files";
4750

48-
@ProcedureHint(argument = {@ArgumentHint(name = "table", type = @DataTypeHint("STRING"))})
49-
public String[] call(ProcedureContext procedureContext, String tableId)
51+
@ProcedureHint(
52+
argument = {
53+
@ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
54+
@ArgumentHint(name = "dry_run", type = @DataTypeHint("BOOLEAN"), isOptional = true)
55+
})
56+
public @DataTypeHint("ROW<purged_file_path STRING>") Row[] call(
57+
ProcedureContext procedureContext, String tableId, Boolean dryRun)
5058
throws Catalog.TableNotExistException {
5159
Table table = catalog.getTable(Identifier.fromString(tableId));
5260
FileStoreTable fileStoreTable = (FileStoreTable) table;
5361
FileIO fileIO = fileStoreTable.fileIO();
5462
Path tablePath = fileStoreTable.snapshotManager().tablePath();
63+
ArrayList<String> deleteDir;
5564
try {
56-
Arrays.stream(fileIO.listStatus(tablePath))
65+
FileStatus[] fileStatuses = fileIO.listStatus(tablePath);
66+
deleteDir = new ArrayList<>(fileStatuses.length);
67+
Arrays.stream(fileStatuses)
5768
.filter(f -> !f.getPath().getName().contains("schema"))
5869
.forEach(
5970
fileStatus -> {
6071
try {
61-
fileIO.delete(fileStatus.getPath(), true);
72+
deleteDir.add(fileStatus.getPath().getName());
73+
if (dryRun == null || !dryRun) {
74+
fileIO.delete(fileStatus.getPath(), true);
75+
}
6276
} catch (IOException e) {
6377
throw new RuntimeException(e);
6478
}
6579
});
6680
} catch (IOException e) {
6781
throw new RuntimeException(e);
6882
}
69-
return new String[] {
70-
String.format("Success purge files with table: %s.", fileStoreTable.name())
71-
};
83+
84+
return deleteDir.isEmpty()
85+
? new Row[] {Row.of("There are no dir to be deleted.")}
86+
: deleteDir.stream().map(Row::of).toArray(Row[]::new);
7287
}
7388

7489
@Override

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/PurgeFilesProcedureITCase.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,4 +45,39 @@ public void testPurgeFiles() throws Exception {
4545
sql("INSERT INTO T VALUES (2, 'a')");
4646
assertThat(sql("select * from `T`")).containsExactly(Row.of(2, "a"));
4747
}
48+
49+
@Test
50+
public void testPurgeFilesDryRun() {
51+
sql(
52+
"CREATE TABLE T (id INT, name STRING,"
53+
+ " PRIMARY KEY (id) NOT ENFORCED)"
54+
+ " WITH ('bucket'='1')");
55+
// There are no dir to delete.
56+
assertThat(
57+
sql("CALL sys.purge_files(`table` => 'default.T', `dry_run` => true)")
58+
.stream()
59+
.map(row -> row.getField(0)))
60+
.containsExactlyInAnyOrder("There are no dir to be deleted.");
61+
62+
sql("INSERT INTO T VALUES (1, 'a')");
63+
assertThat(sql("select * from `T`")).containsExactly(Row.of(1, "a"));
64+
65+
// dry run.
66+
assertThat(
67+
sql("CALL sys.purge_files(`table` => 'default.T', `dry_run` => true)")
68+
.stream()
69+
.map(row -> row.getField(0)))
70+
.containsExactlyInAnyOrder("snapshot", "bucket-0", "manifest");
71+
72+
assertThat(sql("select * from `T`")).containsExactly(Row.of(1, "a"));
73+
74+
assertThat(
75+
sql("CALL sys.purge_files(`table` => 'default.T')").stream()
76+
.map(row -> row.getField(0)))
77+
.containsExactlyInAnyOrder("snapshot", "bucket-0", "manifest");
78+
assertThat(sql("select * from `T`")).containsExactly();
79+
80+
sql("INSERT INTO T VALUES (2, 'a')");
81+
assertThat(sql("select * from `T`")).containsExactly(Row.of(2, "a"));
82+
}
4883
}

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/PurgeFilesProcedure.java

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.paimon.spark.procedure;
2020

2121
import org.apache.paimon.fs.FileIO;
22+
import org.apache.paimon.fs.FileStatus;
2223
import org.apache.paimon.fs.Path;
2324
import org.apache.paimon.table.FileStoreTable;
2425

@@ -31,20 +32,25 @@
3132
import org.apache.spark.unsafe.types.UTF8String;
3233

3334
import java.io.IOException;
35+
import java.util.ArrayList;
3436
import java.util.Arrays;
3537

38+
import static org.apache.spark.sql.types.DataTypes.BooleanType;
3639
import static org.apache.spark.sql.types.DataTypes.StringType;
3740

3841
/** A procedure to purge files for a table. */
3942
public class PurgeFilesProcedure extends BaseProcedure {
4043

4144
private static final ProcedureParameter[] PARAMETERS =
42-
new ProcedureParameter[] {ProcedureParameter.required("table", StringType)};
45+
new ProcedureParameter[] {
46+
ProcedureParameter.required("table", StringType),
47+
ProcedureParameter.optional("dry_run", BooleanType)
48+
};
4349

4450
private static final StructType OUTPUT_TYPE =
4551
new StructType(
4652
new StructField[] {
47-
new StructField("result", StringType, true, Metadata.empty())
53+
new StructField("purged_file_path", StringType, true, Metadata.empty())
4854
});
4955

5056
private PurgeFilesProcedure(TableCatalog tableCatalog) {
@@ -64,20 +70,27 @@ public StructType outputType() {
6470
@Override
6571
public InternalRow[] call(InternalRow args) {
6672
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
73+
boolean dryRun = !args.isNullAt(1) && args.getBoolean(1);
6774

6875
return modifyPaimonTable(
6976
tableIdent,
7077
table -> {
7178
FileStoreTable fileStoreTable = (FileStoreTable) table;
7279
FileIO fileIO = fileStoreTable.fileIO();
7380
Path tablePath = fileStoreTable.snapshotManager().tablePath();
81+
ArrayList<String> deleteDir;
7482
try {
75-
Arrays.stream(fileIO.listStatus(tablePath))
83+
FileStatus[] fileStatuses = fileIO.listStatus(tablePath);
84+
deleteDir = new ArrayList<>(fileStatuses.length);
85+
Arrays.stream(fileStatuses)
7686
.filter(f -> !f.getPath().getName().contains("schema"))
7787
.forEach(
7888
fileStatus -> {
7989
try {
80-
fileIO.delete(fileStatus.getPath(), true);
90+
deleteDir.add(fileStatus.getPath().getName());
91+
if (!dryRun) {
92+
fileIO.delete(fileStatus.getPath(), true);
93+
}
8194
} catch (IOException e) {
8295
throw new RuntimeException(e);
8396
}
@@ -87,13 +100,14 @@ public InternalRow[] call(InternalRow args) {
87100
throw new RuntimeException(e);
88101
}
89102

90-
InternalRow outputRow =
91-
newInternalRow(
92-
UTF8String.fromString(
93-
String.format(
94-
"Success purge files with table: %s.",
95-
fileStoreTable.name())));
96-
return new InternalRow[] {outputRow};
103+
return deleteDir.isEmpty()
104+
? new InternalRow[] {
105+
newInternalRow(
106+
UTF8String.fromString("There are no dir to be deleted."))
107+
}
108+
: deleteDir.stream()
109+
.map(x -> newInternalRow(UTF8String.fromString(x)))
110+
.toArray(InternalRow[]::new);
97111
});
98112
}
99113

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/PurgeFilesProcedureTest.scala

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,4 +40,35 @@ class PurgeFilesProcedureTest extends PaimonSparkTestBase {
4040
checkAnswer(spark.sql("select * from test.T"), Row("2", "aa") :: Nil)
4141
}
4242

43+
test("Paimon procedure: purge files test with dry run.") {
44+
spark.sql(s"""
45+
|CREATE TABLE T (id STRING, name STRING)
46+
|USING PAIMON
47+
|""".stripMargin)
48+
49+
// There are no dir to be deleted.
50+
checkAnswer(
51+
spark.sql("CALL paimon.sys.purge_files(table => 'test.T')"),
52+
Row("There are no dir to be deleted.") :: Nil
53+
)
54+
55+
spark.sql("insert into T select '1', 'aa'");
56+
checkAnswer(spark.sql("select * from test.T"), Row("1", "aa") :: Nil)
57+
58+
// dry run.
59+
checkAnswer(
60+
spark.sql("CALL paimon.sys.purge_files(table => 'test.T', dry_run => true)"),
61+
Row("snapshot") :: Row("bucket-0") :: Row("manifest") :: Nil
62+
)
63+
checkAnswer(spark.sql("select * from test.T"), Row("1", "aa") :: Nil)
64+
65+
// Do delete.
66+
spark.sql("CALL paimon.sys.purge_files(table => 'test.T')")
67+
checkAnswer(spark.sql("select * from test.T"), Nil)
68+
69+
// insert new data.
70+
spark.sql("insert into T select '2', 'aa'");
71+
checkAnswer(spark.sql("select * from test.T"), Row("2", "aa") :: Nil)
72+
}
73+
4374
}

0 commit comments

Comments
 (0)