Skip to content

Commit 24fc423

Browse files
committed
[core] Support dedicated full compact to external paths for pk table
1 parent 3ca5ac6 commit 24fc423

File tree

9 files changed

+117
-16
lines changed

9 files changed

+117
-16
lines changed

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,12 @@
356356
<td>Boolean</td>
357357
<td>Optional endInput check partition expire used in case of batch mode or bounded stream.</td>
358358
</tr>
359+
<tr>
360+
<td><h5>external.compact</h5></td>
361+
<td style="word-wrap: break-word;">false</td>
362+
<td>Boolean</td>
363+
<td>Whether to execute full compaction to external paths.</td>
364+
</tr>
359365
<tr>
360366
<td><h5>fields.default-aggregate-function</h5></td>
361367
<td style="word-wrap: break-word;">(none)</td>

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,12 @@ public class CoreOptions implements Serializable {
151151
+ ExternalPathStrategy.SPECIFIC_FS
152152
+ ", should be the prefix scheme of the external path, now supported are s3 and oss.");
153153

154+
public static final ConfigOption<Boolean> COMPACTION_EXTERNAL =
155+
key("compaction.external")
156+
.booleanType()
157+
.defaultValue(false)
158+
.withDescription("Whether to execute full compaction to external paths.");
159+
154160
@ExcludeFromDocumentation("Internal use only")
155161
public static final ConfigOption<String> PATH =
156162
key("path")
@@ -2457,6 +2463,10 @@ public String externalSpecificFS() {
24572463
return options.get(DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS);
24582464
}
24592465

2466+
public Boolean externalCompact() {
2467+
return options.get(COMPACTION_EXTERNAL);
2468+
}
2469+
24602470
public String partitionTimestampFormatter() {
24612471
return options.get(PARTITION_TIMESTAMP_FORMATTER);
24622472
}

paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ static Optional<CompactUnit> pickFullCompaction(
5555
int numLevels,
5656
List<LevelSortedRun> runs,
5757
@Nullable RecordLevelExpire recordLevelExpire,
58-
@Nullable DeletionVectorsMaintainer dvMaintainer) {
58+
@Nullable DeletionVectorsMaintainer dvMaintainer,
59+
boolean externalCompact) {
5960
int maxLevel = numLevels - 1;
6061
if (runs.isEmpty()) {
6162
// no sorted run, no need to compact
@@ -64,7 +65,10 @@ static Optional<CompactUnit> pickFullCompaction(
6465
List<DataFileMeta> filesToBeCompacted = new ArrayList<>();
6566

6667
for (DataFileMeta file : runs.get(0).run().files()) {
67-
if (recordLevelExpire != null && recordLevelExpire.isExpireFile(file)) {
68+
if (externalCompact) {
69+
// add all files when it is an external compaction
70+
filesToBeCompacted.add(file);
71+
} else if (recordLevelExpire != null && recordLevelExpire.isExpireFile(file)) {
6872
// check record level expire for large files
6973
filesToBeCompacted.add(file);
7074
} else if (dvMaintainer != null

paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public class MergeTreeCompactManager extends CompactFutureManager {
6565
@Nullable private final DeletionVectorsMaintainer dvMaintainer;
6666
private final boolean lazyGenDeletionFile;
6767
private final boolean needLookup;
68+
private final boolean externalCompact;
6869

6970
@Nullable private final RecordLevelExpire recordLevelExpire;
7071

@@ -80,7 +81,8 @@ public MergeTreeCompactManager(
8081
@Nullable DeletionVectorsMaintainer dvMaintainer,
8182
boolean lazyGenDeletionFile,
8283
boolean needLookup,
83-
@Nullable RecordLevelExpire recordLevelExpire) {
84+
@Nullable RecordLevelExpire recordLevelExpire,
85+
boolean externalCompact) {
8486
this.executor = executor;
8587
this.levels = levels;
8688
this.strategy = strategy;
@@ -93,6 +95,7 @@ public MergeTreeCompactManager(
9395
this.lazyGenDeletionFile = lazyGenDeletionFile;
9496
this.recordLevelExpire = recordLevelExpire;
9597
this.needLookup = needLookup;
98+
this.externalCompact = externalCompact;
9699

97100
MetricUtils.safeCall(this::reportMetrics, LOG);
98101
}
@@ -135,7 +138,11 @@ public void triggerCompaction(boolean fullCompaction) {
135138
}
136139
optionalUnit =
137140
CompactStrategy.pickFullCompaction(
138-
levels.numberOfLevels(), runs, recordLevelExpire, dvMaintainer);
141+
levels.numberOfLevels(),
142+
runs,
143+
recordLevelExpire,
144+
dvMaintainer,
145+
externalCompact);
139146
} else {
140147
if (taskFuture != null) {
141148
return;
@@ -210,7 +217,8 @@ private void submitCompaction(CompactUnit unit, boolean dropDelete) {
210217
metricsReporter,
211218
compactDfSupplier,
212219
dvMaintainer,
213-
recordLevelExpire);
220+
recordLevelExpire,
221+
externalCompact);
214222
if (LOG.isDebugEnabled()) {
215223
LOG.debug(
216224
"Pick these files (name, level, size) for compaction: {}",

paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public class MergeTreeCompactTask extends CompactTask {
5555
private int upgradeFilesNum;
5656

5757
@Nullable private final RecordLevelExpire recordLevelExpire;
58+
private final boolean externalCompact;
5859
@Nullable private final DeletionVectorsMaintainer dvMaintainer;
5960

6061
public MergeTreeCompactTask(
@@ -67,7 +68,8 @@ public MergeTreeCompactTask(
6768
@Nullable CompactionMetrics.Reporter metricsReporter,
6869
Supplier<CompactDeletionFile> compactDfSupplier,
6970
@Nullable DeletionVectorsMaintainer dvMaintainer,
70-
@Nullable RecordLevelExpire recordLevelExpire) {
71+
@Nullable RecordLevelExpire recordLevelExpire,
72+
boolean externalCompact) {
7173
super(metricsReporter);
7274
this.minFileSize = minFileSize;
7375
this.rewriter = rewriter;
@@ -78,6 +80,7 @@ public MergeTreeCompactTask(
7880
this.dropDelete = dropDelete;
7981
this.maxLevel = maxLevel;
8082
this.recordLevelExpire = recordLevelExpire;
83+
this.externalCompact = externalCompact;
8184

8285
this.upgradeFilesNum = 0;
8386
}
@@ -127,21 +130,23 @@ protected String logMetric(
127130
private void upgrade(DataFileMeta file, CompactResult toUpdate) throws Exception {
128131
if (file.level() == outputLevel) {
129132
if (isContainExpiredRecords(file)
133+
|| externalCompact
130134
|| (dvMaintainer != null
131135
&& dvMaintainer.deletionVectorOf(file.fileName()).isPresent())) {
132136
/*
133137
* 1. if the large file in maxLevel has expired records, we need to rewrite it.
134-
* 2. if the large file in maxLevel has corresponding deletion vector, we need to rewrite it.
138+
* 2. if it is an external compaction, we need to rewrite all files.
139+
* 3. if the large file in maxLevel has corresponding deletion vector, we need to rewrite it.
135140
*/
136141
rewriteFile(file, toUpdate);
137142
}
138143
return;
139144
}
140145

141146
if (outputLevel != maxLevel || file.deleteRowCount().map(d -> d == 0).orElse(false)) {
142-
if (isContainExpiredRecords(file)) {
143-
// if the file which could be directly upgraded has expired records, we need to
144-
// rewrite it
147+
if (isContainExpiredRecords(file) || externalCompact) {
148+
// if the file which could be directly upgraded has expired records, or it is an
149+
// external compaction, we need to rewrite it
145150
rewriteFile(file, toUpdate);
146151
} else {
147152
CompactResult upgradeResult = rewriter.upgrade(outputLevel, file);

paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,8 @@ private CompactManager createCompactManager(
290290
dvMaintainer,
291291
options.prepareCommitWaitCompaction(),
292292
options.needLookup(),
293-
recordLevelExpire);
293+
recordLevelExpire,
294+
options.externalCompact());
294295
}
295296
}
296297

paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -542,7 +542,8 @@ private MergeTreeCompactManager createCompactManager(
542542
null,
543543
false,
544544
options.needLookup(),
545-
null);
545+
null,
546+
false);
546547
}
547548

548549
static class MockFailResultCompactionManager extends MergeTreeCompactManager {
@@ -566,7 +567,8 @@ public MockFailResultCompactionManager(
566567
null,
567568
false,
568569
false,
569-
null);
570+
null,
571+
false);
570572
}
571573

572574
protected CompactResult obtainCompactResult()

paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,8 @@ public void testIsCompacting() {
208208
null,
209209
false,
210210
true,
211-
null);
211+
null,
212+
false);
212213

213214
MergeTreeCompactManager defaultManager =
214215
new MergeTreeCompactManager(
@@ -223,7 +224,8 @@ public void testIsCompacting() {
223224
null,
224225
false,
225226
false,
226-
null);
227+
null,
228+
false);
227229

228230
assertThat(lookupManager.compactNotCompleted()).isTrue();
229231
assertThat(defaultManager.compactNotCompleted()).isFalse();
@@ -259,7 +261,8 @@ private void innerTest(
259261
null,
260262
false,
261263
false,
262-
null);
264+
null,
265+
false);
263266
manager.triggerCompaction(false);
264267
manager.getCompactionResult(true);
265268
List<LevelMinMax> outputs =

paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkProceduresE2eTest.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,4 +121,66 @@ public void testCompact() throws Exception {
121121
// check that second part of test data are compacted
122122
checkResult("20221205, 1, 101", "20221206, 1, 101");
123123
}
124+
125+
@Test
126+
public void testExternalCompact() throws Exception {
127+
// create hive catalog to test catalog loading
128+
String warehouse = HDFS_ROOT + "/" + UUID.randomUUID() + ".warehouse";
129+
String externalPath = HDFS_ROOT + "/" + UUID.randomUUID() + ".warehouse/external";
130+
String catalogDdl =
131+
String.format(
132+
"CREATE CATALOG ts_catalog WITH (\n"
133+
+ " 'type' = 'paimon',\n"
134+
+ " 'warehouse' = '%s',\n"
135+
+ " 'metastore' = 'hive',\n"
136+
+ " 'uri' = 'thrift://hive-metastore:9083'\n"
137+
+ ");",
138+
warehouse);
139+
String useCatalogCmd = "USE CATALOG ts_catalog;";
140+
141+
String tableDdl =
142+
"CREATE TABLE IF NOT EXISTS ts_table (\n"
143+
+ " dt STRING,\n"
144+
+ " k INT,\n"
145+
+ " v INT,\n"
146+
+ " PRIMARY KEY (dt, k) NOT ENFORCED\n"
147+
+ ") PARTITIONED BY (dt) WITH (\n"
148+
+ " 'data-file.external-paths' = '"
149+
+ externalPath
150+
+ "',\n"
151+
+ " 'bucket' = '1',\n"
152+
+ " 'write-only' = 'true'\n"
153+
+ ");";
154+
155+
String insert1 =
156+
"INSERT INTO ts_table VALUES ('2023-01-13', 0, 0), ('2023-01-14', 0, 0), ('2023-01-13', 0, 0), "
157+
+ "('2023-01-15', 0, 1), ('2023-01-15', 0, 1), ('2023-01-15', 0, 1), "
158+
+ "('2023-01-16', 1, 0), ('2023-01-17', 1, 0), ('2023-01-18', 1, 0), "
159+
+ "('2023-01-19', 1, 1), ('2023-01-20', 1, 1), ('2023-01-21', 1, 1);";
160+
161+
String insert2 =
162+
"INSERT INTO ts_table VALUES ('2023-01-13', 0, 1), ('2023-01-14', 0, 1), ('2023-01-13', 0, 1), "
163+
+ "('2023-01-15', 0, 2), ('2023-01-15', 0, 2), ('2023-01-15', 0, 2), "
164+
+ "('2023-01-16', 1, 3), ('2023-01-17', 1, 3), ('2023-01-18', 1, 3), "
165+
+ "('2023-01-16', 2, 3), ('2023-01-17', 2, 3), ('2023-01-18', 2, 3), ";
166+
167+
runBatchSql(
168+
"SET 'table.dml-sync' = 'true';\n" + insert1, catalogDdl, useCatalogCmd, tableDdl);
169+
runBatchSql(
170+
"SET 'table.dml-sync' = 'true';\n" + insert2, catalogDdl, useCatalogCmd, tableDdl);
171+
172+
Thread.sleep(5000);
173+
174+
// execute external compact procedure
175+
String callStatement;
176+
if (System.getProperty("test.flink.main.version").compareTo("1.18") == 0) {
177+
callStatement = "CALL sys.compact('default.ts_table');";
178+
} else {
179+
callStatement =
180+
"CALL sys.compact(table => 'default.ts_table', partition_idle_time =>'1s'," +
181+
" options => 'external.compact=true,data-file.external-paths.strategy=specific-fs,data-file.external-paths.specific-fs=hdfs');;";
182+
}
183+
184+
runStreamingSql(callStatement, catalogDdl, useCatalogCmd);
185+
}
124186
}

0 commit comments

Comments
 (0)