Skip to content

Commit 8d6e8ce

Browse files
committed
[core] Support dedicated full compact to external paths
1 parent b0bb00f commit 8d6e8ce

File tree

15 files changed

+144
-25
lines changed

15 files changed

+144
-25
lines changed

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,12 @@
182182
<td>Double</td>
183183
<td>Ratio of the deleted rows in a data file to be forced compacted for append-only table.</td>
184184
</tr>
185+
<tr>
186+
<td><h5>compaction.force-compact-all-files</h5></td>
187+
<td style="word-wrap: break-word;">false</td>
188+
<td>Boolean</td>
189+
<td>Whether to force pick all files for a full compaction. Usually seen in a compaction task to external paths.</td>
190+
</tr>
185191
<tr>
186192
<td><h5>compaction.force-up-level-0</h5></td>
187193
<td style="word-wrap: break-word;">false</td>

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,13 @@ public class CoreOptions implements Serializable {
151151
+ ExternalPathStrategy.SPECIFIC_FS
152152
+ ", should be the prefix scheme of the external path, now supported are s3 and oss.");
153153

154+
public static final ConfigOption<Boolean> COMPACTION_FORCE_COMPACT_ALL_FILES =
155+
key("compaction.force-compact-all-files")
156+
.booleanType()
157+
.defaultValue(false)
158+
.withDescription(
159+
"Whether to force pick all files for a full compaction. Usually seen in a compaction task to external paths.");
160+
154161
@ExcludeFromDocumentation("Internal use only")
155162
public static final ConfigOption<String> PATH =
156163
key("path")
@@ -2466,6 +2473,10 @@ public String externalSpecificFS() {
24662473
return options.get(DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS);
24672474
}
24682475

2476+
public Boolean forceCompactAllFiles() {
2477+
return options.get(COMPACTION_FORCE_COMPACT_ALL_FILES);
2478+
}
2479+
24692480
public String partitionTimestampFormatter() {
24702481
return options.get(PARTITION_TIMESTAMP_FORMATTER);
24712482
}

paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ public class BucketedAppendCompactManager extends CompactFutureManager {
6060
private final PriorityQueue<DataFileMeta> toCompact;
6161
private final int minFileNum;
6262
private final long targetFileSize;
63+
private final boolean forceCompactAllFiles;
6364
private final CompactRewriter rewriter;
6465

6566
private List<DataFileMeta> compacting;
@@ -72,6 +73,7 @@ public BucketedAppendCompactManager(
7273
@Nullable DeletionVectorsMaintainer dvMaintainer,
7374
int minFileNum,
7475
long targetFileSize,
76+
boolean forceCompactAllFiles,
7577
CompactRewriter rewriter,
7678
@Nullable CompactionMetrics.Reporter metricsReporter) {
7779
this.executor = executor;
@@ -80,6 +82,7 @@ public BucketedAppendCompactManager(
8082
this.toCompact.addAll(restored);
8183
this.minFileNum = minFileNum;
8284
this.targetFileSize = targetFileSize;
85+
this.forceCompactAllFiles = forceCompactAllFiles;
8386
this.rewriter = rewriter;
8487
this.metricsReporter = metricsReporter;
8588
}
@@ -98,9 +101,10 @@ private void triggerFullCompaction() {
98101
taskFuture == null,
99102
"A compaction task is still running while the user "
100103
+ "forces a new compaction. This is unexpected.");
101-
// if deletion vector enables, always trigger compaction.
102-
if (toCompact.isEmpty()
103-
|| (dvMaintainer == null && toCompact.size() < FULL_COMPACT_MIN_FILE)) {
104+
// if all files are force picked or deletion vector enables, always trigger compaction.
105+
if (!forceCompactAllFiles
106+
&& (toCompact.isEmpty()
107+
|| (dvMaintainer == null && toCompact.size() < FULL_COMPACT_MIN_FILE))) {
104108
return;
105109
}
106110

@@ -114,6 +118,7 @@ private void triggerFullCompaction() {
114118
dvMaintainer,
115119
toCompact,
116120
targetFileSize,
121+
forceCompactAllFiles,
117122
rewriter,
118123
metricsReporter));
119124
recordCompactionsQueuedRequest();
@@ -238,25 +243,28 @@ public static class FullCompactTask extends CompactTask {
238243
private final DeletionVectorsMaintainer dvMaintainer;
239244
private final LinkedList<DataFileMeta> toCompact;
240245
private final long targetFileSize;
246+
private final boolean forceCompactAllFiles;
241247
private final CompactRewriter rewriter;
242248

243249
public FullCompactTask(
244250
DeletionVectorsMaintainer dvMaintainer,
245251
Collection<DataFileMeta> inputs,
246252
long targetFileSize,
253+
boolean forceCompactAllFiles,
247254
CompactRewriter rewriter,
248255
@Nullable CompactionMetrics.Reporter metricsReporter) {
249256
super(metricsReporter);
250257
this.dvMaintainer = dvMaintainer;
251258
this.toCompact = new LinkedList<>(inputs);
252259
this.targetFileSize = targetFileSize;
260+
this.forceCompactAllFiles = forceCompactAllFiles;
253261
this.rewriter = rewriter;
254262
}
255263

256264
@Override
257265
protected CompactResult doCompact() throws Exception {
258266
// remove large files
259-
while (!toCompact.isEmpty()) {
267+
while (!forceCompactAllFiles && !toCompact.isEmpty()) {
260268
DataFileMeta file = toCompact.peekFirst();
261269
// the data file with deletion file always need to be compacted.
262270
if (file.fileSize() >= targetFileSize && !hasDeletionFile(file)) {
@@ -281,7 +289,8 @@ protected CompactResult doCompact() throws Exception {
281289
small++;
282290
}
283291
}
284-
if (small > big && toCompact.size() >= FULL_COMPACT_MIN_FILE) {
292+
if (forceCompactAllFiles
293+
|| (small > big && toCompact.size() >= FULL_COMPACT_MIN_FILE)) {
285294
return compact(null, toCompact, rewriter);
286295
} else {
287296
return result(emptyList(), emptyList());

paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ static Optional<CompactUnit> pickFullCompaction(
5555
int numLevels,
5656
List<LevelSortedRun> runs,
5757
@Nullable RecordLevelExpire recordLevelExpire,
58-
@Nullable DeletionVectorsMaintainer dvMaintainer) {
58+
@Nullable DeletionVectorsMaintainer dvMaintainer,
59+
boolean forceCompactAllFiles) {
5960
int maxLevel = numLevels - 1;
6061
if (runs.isEmpty()) {
6162
// no sorted run, no need to compact
@@ -64,7 +65,10 @@ static Optional<CompactUnit> pickFullCompaction(
6465
List<DataFileMeta> filesToBeCompacted = new ArrayList<>();
6566

6667
for (DataFileMeta file : runs.get(0).run().files()) {
67-
if (recordLevelExpire != null && recordLevelExpire.isExpireFile(file)) {
68+
if (forceCompactAllFiles) {
69+
// add all files when force compacted
70+
filesToBeCompacted.add(file);
71+
} else if (recordLevelExpire != null && recordLevelExpire.isExpireFile(file)) {
6872
// check record level expire for large files
6973
filesToBeCompacted.add(file);
7074
} else if (dvMaintainer != null

paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public class MergeTreeCompactManager extends CompactFutureManager {
6565
@Nullable private final DeletionVectorsMaintainer dvMaintainer;
6666
private final boolean lazyGenDeletionFile;
6767
private final boolean needLookup;
68+
private final boolean forceCompactAllFiles;
6869

6970
@Nullable private final RecordLevelExpire recordLevelExpire;
7071

@@ -80,7 +81,8 @@ public MergeTreeCompactManager(
8081
@Nullable DeletionVectorsMaintainer dvMaintainer,
8182
boolean lazyGenDeletionFile,
8283
boolean needLookup,
83-
@Nullable RecordLevelExpire recordLevelExpire) {
84+
@Nullable RecordLevelExpire recordLevelExpire,
85+
boolean forceCompactAllFiles) {
8486
this.executor = executor;
8587
this.levels = levels;
8688
this.strategy = strategy;
@@ -93,6 +95,7 @@ public MergeTreeCompactManager(
9395
this.lazyGenDeletionFile = lazyGenDeletionFile;
9496
this.recordLevelExpire = recordLevelExpire;
9597
this.needLookup = needLookup;
98+
this.forceCompactAllFiles = forceCompactAllFiles;
9699

97100
MetricUtils.safeCall(this::reportMetrics, LOG);
98101
}
@@ -135,7 +138,11 @@ public void triggerCompaction(boolean fullCompaction) {
135138
}
136139
optionalUnit =
137140
CompactStrategy.pickFullCompaction(
138-
levels.numberOfLevels(), runs, recordLevelExpire, dvMaintainer);
141+
levels.numberOfLevels(),
142+
runs,
143+
recordLevelExpire,
144+
dvMaintainer,
145+
forceCompactAllFiles);
139146
} else {
140147
if (taskFuture != null) {
141148
return;
@@ -210,7 +217,8 @@ private void submitCompaction(CompactUnit unit, boolean dropDelete) {
210217
metricsReporter,
211218
compactDfSupplier,
212219
dvMaintainer,
213-
recordLevelExpire);
220+
recordLevelExpire,
221+
forceCompactAllFiles);
214222
if (LOG.isDebugEnabled()) {
215223
LOG.debug(
216224
"Pick these files (name, level, size) for compaction: {}",

paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public class MergeTreeCompactTask extends CompactTask {
5555
private int upgradeFilesNum;
5656

5757
@Nullable private final RecordLevelExpire recordLevelExpire;
58+
private final boolean forceCompactAllFiles;
5859
@Nullable private final DeletionVectorsMaintainer dvMaintainer;
5960

6061
public MergeTreeCompactTask(
@@ -67,7 +68,8 @@ public MergeTreeCompactTask(
6768
@Nullable CompactionMetrics.Reporter metricsReporter,
6869
Supplier<CompactDeletionFile> compactDfSupplier,
6970
@Nullable DeletionVectorsMaintainer dvMaintainer,
70-
@Nullable RecordLevelExpire recordLevelExpire) {
71+
@Nullable RecordLevelExpire recordLevelExpire,
72+
boolean forceCompactAllFiles) {
7173
super(metricsReporter);
7274
this.minFileSize = minFileSize;
7375
this.rewriter = rewriter;
@@ -78,6 +80,7 @@ public MergeTreeCompactTask(
7880
this.dropDelete = dropDelete;
7981
this.maxLevel = maxLevel;
8082
this.recordLevelExpire = recordLevelExpire;
83+
this.forceCompactAllFiles = forceCompactAllFiles;
8184

8285
this.upgradeFilesNum = 0;
8386
}
@@ -126,22 +129,24 @@ protected String logMetric(
126129

127130
private void upgrade(DataFileMeta file, CompactResult toUpdate) throws Exception {
128131
if (file.level() == outputLevel) {
129-
if (isContainExpiredRecords(file)
132+
if (forceCompactAllFiles
133+
|| isContainExpiredRecords(file)
130134
|| (dvMaintainer != null
131135
&& dvMaintainer.deletionVectorOf(file.fileName()).isPresent())) {
132136
/*
133-
* 1. if the large file in maxLevel has expired records, we need to rewrite it.
134-
* 2. if the large file in maxLevel has corresponding deletion vector, we need to rewrite it.
137+
* 1. if files are force picked, we need to rewrite all files.
138+
* 2. if the large file in maxLevel has expired records, we need to rewrite it.
139+
* 3. if the large file in maxLevel has corresponding deletion vector, we need to rewrite it.
135140
*/
136141
rewriteFile(file, toUpdate);
137142
}
138143
return;
139144
}
140145

141146
if (outputLevel != maxLevel || file.deleteRowCount().map(d -> d == 0).orElse(false)) {
142-
if (isContainExpiredRecords(file)) {
143-
// if the file which could be directly upgraded has expired records, we need to
144-
// rewrite it
147+
if (forceCompactAllFiles || isContainExpiredRecords(file)) {
148+
// if all files are force picked, or the file which could be directly upgraded has
149+
// expired records, we need to rewrite it
145150
rewriteFile(file, toUpdate);
146151
} else {
147152
CompactResult upgradeResult = rewriter.upgrade(outputLevel, file);

paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ protected CompactManager getCompactManager(
9191
dvMaintainer,
9292
options.compactionMinFileNum(),
9393
options.targetFileSize(false),
94+
options.forceCompactAllFiles(),
9495
files -> compactRewrite(partition, bucket, dvFactory, files),
9596
compactionMetrics == null
9697
? null

paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,8 @@ private CompactManager createCompactManager(
290290
dvMaintainer,
291291
options.prepareCommitWaitCompaction(),
292292
options.needLookup(),
293-
recordLevelExpire);
293+
recordLevelExpire,
294+
options.forceCompactAllFiles());
294295
}
295296
}
296297

paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -652,6 +652,7 @@ private Pair<AppendOnlyWriter, List<DataFileMeta>> createWriter(
652652
null,
653653
MIN_FILE_NUM,
654654
targetFileSize,
655+
false,
655656
compactBefore -> {
656657
latch.await();
657658
return compactBefore.isEmpty()

paimon-core/src/test/java/org/apache/paimon/append/BucketedAppendCompactManagerTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ private void innerTest(
207207
null,
208208
minFileNum,
209209
targetFileSize,
210+
false,
210211
null, // not used
211212
null);
212213
Optional<List<DataFileMeta>> actual = manager.pickCompactBefore();

0 commit comments

Comments
 (0)