Skip to content

Commit f5e5ada

Browse files
authored
[spark] Support compact_database procedure (#6328) (#6910)
1 parent 61d7443 commit f5e5ada

File tree

3 files changed

+423
-0
lines changed

3 files changed

+423
-0
lines changed

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.paimon.spark.procedure.AlterFunctionProcedure;
2222
import org.apache.paimon.spark.procedure.AlterViewDialectProcedure;
2323
import org.apache.paimon.spark.procedure.ClearConsumersProcedure;
24+
import org.apache.paimon.spark.procedure.CompactDatabaseProcedure;
2425
import org.apache.paimon.spark.procedure.CompactManifestProcedure;
2526
import org.apache.paimon.spark.procedure.CompactProcedure;
2627
import org.apache.paimon.spark.procedure.CopyFilesProcedure;
@@ -96,6 +97,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
9697
procedureBuilders.put("create_global_index", CreateGlobalIndexProcedure::builder);
9798
procedureBuilders.put("delete_branch", DeleteBranchProcedure::builder);
9899
procedureBuilders.put("compact", CompactProcedure::builder);
100+
procedureBuilders.put("compact_database", CompactDatabaseProcedure::builder);
99101
procedureBuilders.put("rescale", RescaleProcedure::builder);
100102
procedureBuilders.put("migrate_database", MigrateDatabaseProcedure::builder);
101103
procedureBuilders.put("migrate_table", MigrateTableProcedure::builder);
Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.spark.procedure;
20+
21+
import org.apache.paimon.catalog.Catalog;
22+
import org.apache.paimon.catalog.Identifier;
23+
import org.apache.paimon.spark.catalog.WithPaimonCatalog;
24+
import org.apache.paimon.table.FileStoreTable;
25+
import org.apache.paimon.table.Table;
26+
import org.apache.paimon.utils.StringUtils;
27+
28+
import org.apache.spark.sql.catalyst.InternalRow;
29+
import org.apache.spark.sql.connector.catalog.TableCatalog;
30+
import org.apache.spark.sql.types.DataTypes;
31+
import org.apache.spark.sql.types.Metadata;
32+
import org.apache.spark.sql.types.StructField;
33+
import org.apache.spark.sql.types.StructType;
34+
import org.apache.spark.unsafe.types.UTF8String;
35+
import org.slf4j.Logger;
36+
import org.slf4j.LoggerFactory;
37+
38+
import java.util.List;
39+
import java.util.regex.Matcher;
40+
import java.util.regex.Pattern;
41+
42+
import static org.apache.spark.sql.types.DataTypes.StringType;
43+
44+
/**
45+
* Compact database procedure. Usage:
46+
*
47+
* <pre><code>
48+
* -- compact all databases
49+
* CALL sys.compact_database()
50+
*
51+
* -- compact some databases (accept regular expression)
52+
* CALL sys.compact_database(including_databases => 'db1|db2')
53+
*
54+
* -- compact some tables (accept regular expression)
55+
* CALL sys.compact_database(including_databases => 'db1', including_tables => 'table1|table2')
56+
*
57+
* -- exclude some tables (accept regular expression)
58+
* CALL sys.compact_database(including_databases => 'db1', including_tables => '.*', excluding_tables => 'ignore_table')
59+
*
60+
* -- set table options ('k=v,...')
61+
* CALL sys.compact_database(including_databases => 'db1', options => 'key1=value1,key2=value2')
62+
* </code></pre>
63+
*/
64+
public class CompactDatabaseProcedure extends BaseProcedure {
65+
66+
private static final Logger LOG = LoggerFactory.getLogger(CompactDatabaseProcedure.class);
67+
68+
private static final ProcedureParameter[] PARAMETERS =
69+
new ProcedureParameter[] {
70+
ProcedureParameter.optional("including_databases", StringType),
71+
ProcedureParameter.optional("including_tables", StringType),
72+
ProcedureParameter.optional("excluding_tables", StringType),
73+
ProcedureParameter.optional("options", StringType),
74+
};
75+
76+
private static final StructType OUTPUT_TYPE =
77+
new StructType(
78+
new StructField[] {
79+
new StructField("result", DataTypes.StringType, true, Metadata.empty())
80+
});
81+
82+
protected CompactDatabaseProcedure(TableCatalog tableCatalog) {
83+
super(tableCatalog);
84+
}
85+
86+
@Override
87+
public ProcedureParameter[] parameters() {
88+
return PARAMETERS;
89+
}
90+
91+
@Override
92+
public StructType outputType() {
93+
return OUTPUT_TYPE;
94+
}
95+
96+
@Override
97+
public InternalRow[] call(InternalRow args) {
98+
String includingDatabases = args.isNullAt(0) ? ".*" : args.getString(0);
99+
String includingTables = args.isNullAt(1) ? ".*" : args.getString(1);
100+
String excludingTables = args.isNullAt(2) ? null : args.getString(2);
101+
String options = args.isNullAt(3) ? null : args.getString(3);
102+
103+
Pattern databasePattern = Pattern.compile(includingDatabases);
104+
Pattern includingPattern = Pattern.compile(includingTables);
105+
Pattern excludingPattern =
106+
StringUtils.isNullOrWhitespaceOnly(excludingTables)
107+
? null
108+
: Pattern.compile(excludingTables);
109+
110+
Catalog paimonCatalog = ((WithPaimonCatalog) tableCatalog()).paimonCatalog();
111+
112+
int successCount = 0;
113+
int failedCount = 0;
114+
115+
try {
116+
List<String> databases = paimonCatalog.listDatabases();
117+
for (String databaseName : databases) {
118+
Matcher databaseMatcher = databasePattern.matcher(databaseName);
119+
if (!databaseMatcher.matches()) {
120+
LOG.debug("Database '{}' is excluded by pattern.", databaseName);
121+
continue;
122+
}
123+
124+
List<String> tables = paimonCatalog.listTables(databaseName);
125+
for (String tableName : tables) {
126+
String fullTableName = String.format("%s.%s", databaseName, tableName);
127+
128+
if (!shouldCompactTable(fullTableName, includingPattern, excludingPattern)) {
129+
LOG.debug("Table '{}' is excluded by pattern.", fullTableName);
130+
continue;
131+
}
132+
133+
try {
134+
Table table =
135+
paimonCatalog.getTable(Identifier.create(databaseName, tableName));
136+
if (!(table instanceof FileStoreTable)) {
137+
LOG.warn(
138+
"Only FileStoreTable supports compact action. "
139+
+ "Table '{}' type is '{}'.",
140+
fullTableName,
141+
table.getClass().getName());
142+
continue;
143+
}
144+
145+
compactTable(fullTableName, options);
146+
successCount++;
147+
LOG.info("Successfully compacted table: {}", fullTableName);
148+
} catch (Exception e) {
149+
failedCount++;
150+
LOG.error("Failed to compact table: {}", fullTableName, e);
151+
}
152+
}
153+
}
154+
} catch (Catalog.DatabaseNotExistException e) {
155+
throw new RuntimeException(e);
156+
}
157+
158+
String result =
159+
String.format(
160+
"Compact database finished. Success: %d, Failed: %d",
161+
successCount, failedCount);
162+
return new InternalRow[] {newInternalRow(UTF8String.fromString(result))};
163+
}
164+
165+
private boolean shouldCompactTable(
166+
String fullTableName, Pattern includingPattern, Pattern excludingPattern) {
167+
boolean shouldCompact = includingPattern.matcher(fullTableName).matches();
168+
if (excludingPattern != null) {
169+
shouldCompact = shouldCompact && !excludingPattern.matcher(fullTableName).matches();
170+
}
171+
return shouldCompact;
172+
}
173+
174+
private void compactTable(String tableName, String options) throws Exception {
175+
LOG.info("Start to compact table: {}", tableName);
176+
177+
// Build CompactProcedure and call it for each table
178+
CompactProcedure compactProcedure =
179+
(CompactProcedure)
180+
CompactProcedure.builder().withTableCatalog(tableCatalog()).build();
181+
182+
// Create InternalRow with the parameters for CompactProcedure
183+
// Parameters: table, partitions, compact_strategy, order_strategy, order_by, where,
184+
// options, partition_idle_time
185+
InternalRow compactArgs =
186+
newInternalRow(
187+
UTF8String.fromString(tableName), // table
188+
null, // partitions
189+
null, // compact_strategy
190+
null, // order_strategy
191+
null, // order_by
192+
null, // where
193+
options == null ? null : UTF8String.fromString(options), // options
194+
null // partition_idle_time
195+
);
196+
197+
InternalRow[] result = compactProcedure.call(compactArgs);
198+
199+
if (result.length > 0 && !result[0].getBoolean(0)) {
200+
throw new RuntimeException("Compact failed for table: " + tableName);
201+
}
202+
}
203+
204+
public static ProcedureBuilder builder() {
205+
return new BaseProcedure.Builder<CompactDatabaseProcedure>() {
206+
@Override
207+
public CompactDatabaseProcedure doBuild() {
208+
return new CompactDatabaseProcedure(tableCatalog());
209+
}
210+
};
211+
}
212+
213+
@Override
214+
public String description() {
215+
return "This procedure executes compact action on all tables in database(s).";
216+
}
217+
}

0 commit comments

Comments
 (0)