Skip to content

Commit 3030d4e

Browse files
committed
[core] Introduce DelayedLookupCompactManager to support delayed lookup compaction
1 parent d6fa421 commit 3030d4e

File tree

11 files changed

+751
-39
lines changed

11 files changed

+751
-39
lines changed

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,24 @@
146146
<td>String</td>
147147
<td>Specifies the commit user prefix.</td>
148148
</tr>
149+
<tr>
150+
<td><h5>compaction.lookup-delay.l0-file-trigger</h5></td>
151+
<td style="word-wrap: break-word;">5</td>
152+
<td>Integer</td>
153+
<td>The L0 file trigger for a delayed lookup compaction. A delayed lookup compaction will only be performed when L0 files reach this trigger.</td>
154+
</tr>
155+
<tr>
156+
<td><h5>compaction.lookup-delay.partition-threshold</h5></td>
157+
<td style="word-wrap: break-word;">(none)</td>
158+
<td>Duration</td>
159+
<td>The threshold of partitioned pk table to perform delayed lookup compaction. Delayed lookup compaction can be configured with a less frequent compaction strategy to sacrifice timeliness for overall resource usage saving. This option is only valid for a partitioned pk table when needLookup() is true.</td>
160+
</tr>
161+
<tr>
162+
<td><h5>compaction.lookup-delay.stop-trigger</h5></td>
163+
<td style="word-wrap: break-word;">10</td>
164+
<td>Integer</td>
165+
<td>The stop trigger for a delayed lookup compaction. For every stop trigger, a forced lookup compaction will be performed to flush L0 files to higher level.</td>
166+
</tr>
149167
<tr>
150168
<td><h5>compaction.max-size-amplification-percent</h5></td>
151169
<td style="word-wrap: break-word;">200</td>

paimon-common/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -627,6 +627,32 @@ public class CoreOptions implements Serializable {
627627
text("Default value of Bucketed Append Table is '5'."))
628628
.build());
629629

630+
/**
 * Duration threshold that enables delayed lookup compaction. It has no default value, so the
 * option is unset unless the user configures it. DelayedLookupCompactManager subtracts this
 * duration from the current time and delays compaction for partitions whose time is not after
 * the result.
 */
public static final ConfigOption<Duration> LOOKUP_DELAY_PARTITION_THRESHOLD =
631+
key("compaction.lookup-delay.partition-threshold")
632+
.durationType()
633+
.noDefaultValue()
634+
.withDescription(
635+
"The threshold of partitioned pk table to perform delayed lookup compaction. "
636+
+ "Delayed lookup compaction can be configured with a less frequent "
637+
+ "compaction strategy to sacrifice timeliness for overall resource usage saving. "
638+
+ "This option is only valid for a partitioned pk table when needLookup() is true.");
639+
640+
/**
 * Number of level-0 files (default 5) at which a delayed lookup compaction is triggered; the
 * comparison in DelayedLookupCompactManager is "current L0 file count >= this value".
 */
public static final ConfigOption<Integer> LOOKUP_DELAY_L0_FILE_TRIGGER =
641+
key("compaction.lookup-delay.l0-file-trigger")
642+
.intType()
643+
.defaultValue(5)
644+
.withDescription(
645+
"The L0 file trigger for a delayed lookup compaction. A delayed lookup compaction "
646+
+ "will only be performed when L0 files reach this trigger.");
647+
648+
/**
 * Number of skipped compaction requests after which a delayed lookup compaction is forced. The
 * default is derived as twice the default of LOOKUP_DELAY_L0_FILE_TRIGGER (i.e. 10); it does
 * not follow a user-configured L0 file trigger.
 */
public static final ConfigOption<Integer> LOOKUP_DELAY_STOP_TRIGGER =
649+
key("compaction.lookup-delay.stop-trigger")
650+
.intType()
651+
.defaultValue(LOOKUP_DELAY_L0_FILE_TRIGGER.defaultValue() * 2)
652+
.withDescription(
653+
"The stop trigger for a delayed lookup compaction. For every stop trigger, "
654+
+ "a forced lookup compaction will be performed to flush L0 files to higher level.");
655+
630656
public static final ConfigOption<ChangelogProducer> CHANGELOG_PRODUCER =
631657
key("changelog-producer")
632658
.enumType(ChangelogProducer.class)
@@ -2043,6 +2069,19 @@ public Duration optimizedCompactionInterval() {
20432069
return options.get(COMPACTION_OPTIMIZATION_INTERVAL);
20442070
}
20452071

2072+
@Nullable
2073+
public Duration lookupDelayPartitionThreshold() {
2074+
return options.get(LOOKUP_DELAY_PARTITION_THRESHOLD);
2075+
}
2076+
2077+
public int lookupDelayL0FileTrigger() {
2078+
return options.get(LOOKUP_DELAY_L0_FILE_TRIGGER);
2079+
}
2080+
2081+
public int lookupDelayStopTrigger() {
2082+
return options.get(LOOKUP_DELAY_STOP_TRIGGER);
2083+
}
2084+
20462085
public int numSortedRunStopTrigger() {
20472086
Integer stopTrigger = options.get(NUM_SORTED_RUNS_STOP_TRIGGER);
20482087
if (stopTrigger == null) {

paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ private long newSequenceNumber() {
142142
}
143143

144144
@VisibleForTesting
145-
CompactManager compactManager() {
145+
public CompactManager compactManager() {
146146
return compactManager;
147147
}
148148

paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DelayedLookupCompactManager.java

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.mergetree.compact;
20+
21+
import org.apache.paimon.annotation.VisibleForTesting;
22+
import org.apache.paimon.data.InternalRow;
23+
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
24+
import org.apache.paimon.mergetree.Levels;
25+
import org.apache.paimon.operation.metrics.CompactionMetrics;
26+
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
import javax.annotation.Nullable;
31+
32+
import java.time.Duration;
33+
import java.time.LocalDateTime;
34+
import java.time.format.DateTimeFormatter;
35+
import java.util.Comparator;
36+
import java.util.concurrent.ExecutorService;
37+
import java.util.concurrent.atomic.AtomicInteger;
38+
39+
/** Compact manager to perform the delayed lookup compaction. */
40+
public class DelayedLookupCompactManager extends MergeTreeCompactManager {
41+
42+
private static final Logger LOG = LoggerFactory.getLogger(DelayedLookupCompactManager.class);
43+
private final CompactStrategy strategy;
44+
private final LocalDateTime currentPartitionDate;
45+
private final Duration partitionThreshold;
46+
private final int l0FileTrigger;
47+
private final int stopTrigger;
48+
49+
private final AtomicInteger triggerCount = new AtomicInteger(0);
50+
51+
public DelayedLookupCompactManager(
52+
ExecutorService executor,
53+
Levels levels,
54+
CompactStrategy strategy,
55+
Comparator<InternalRow> keyComparator,
56+
long compactionFileSize,
57+
int numSortedRunStopTrigger,
58+
CompactRewriter rewriter,
59+
@Nullable CompactionMetrics.Reporter metricsReporter,
60+
@Nullable DeletionVectorsMaintainer dvMaintainer,
61+
boolean lazyGenDeletionFile,
62+
LocalDateTime currentPartitionDate,
63+
Duration partitionThreshold,
64+
int l0FileTrigger,
65+
int stopTrigger) {
66+
super(
67+
executor,
68+
levels,
69+
strategy,
70+
keyComparator,
71+
compactionFileSize,
72+
numSortedRunStopTrigger,
73+
rewriter,
74+
metricsReporter,
75+
dvMaintainer,
76+
lazyGenDeletionFile);
77+
this.strategy = strategy;
78+
this.currentPartitionDate = currentPartitionDate;
79+
this.partitionThreshold = partitionThreshold;
80+
this.l0FileTrigger = l0FileTrigger;
81+
this.stopTrigger = stopTrigger;
82+
}
83+
84+
@Override
85+
public void triggerCompaction(boolean fullCompaction) {
86+
if (isDelayedCompactPartition()) {
87+
if (shouldTriggerDelayedLookupCompact()) {
88+
triggerCount.set(0);
89+
super.triggerCompaction(fullCompaction);
90+
} else {
91+
LOG.info(
92+
"Skip to trigger the delayed lookup compaction for trigger count {} and l0 file size {}.",
93+
triggerCount.get(),
94+
l0FileTrigger);
95+
triggerCount.incrementAndGet();
96+
}
97+
} else {
98+
super.triggerCompaction(fullCompaction);
99+
}
100+
}
101+
102+
@Override
103+
public boolean hasDelayedCompact() {
104+
return strategy instanceof ForceUpLevel0Compaction && level0Files() > 0;
105+
}
106+
107+
@VisibleForTesting
108+
public boolean shouldTriggerDelayedLookupCompact() {
109+
if (level0Files() >= l0FileTrigger) {
110+
LOG.info("Trigger delayed lookup compact for L0 file count: {}", level0Files());
111+
return true;
112+
} else if (triggerCount.get() >= stopTrigger) {
113+
LOG.info("Trigger delayed lookup compact for stopTrigger count: {}", triggerCount);
114+
return true;
115+
} else {
116+
return false;
117+
}
118+
}
119+
120+
@VisibleForTesting
121+
public boolean isDelayedCompactPartition() {
122+
if (partitionThreshold == null) {
123+
return false;
124+
}
125+
LocalDateTime delayedCompactPartition = LocalDateTime.now().minus(partitionThreshold);
126+
// For delayedCompactPartition=20250120, any data insert into partitions<=20250120 should be
127+
// delayed compact.
128+
boolean result = !currentPartitionDate.isAfter(delayedCompactPartition);
129+
LOG.debug(
130+
"Current partition Date: {}, delayed compact partition Date: {}, delayed compact partition result: {}.",
131+
currentPartitionDate.format(DateTimeFormatter.BASIC_ISO_DATE),
132+
delayedCompactPartition.format(DateTimeFormatter.BASIC_ISO_DATE),
133+
result);
134+
return result;
135+
}
136+
137+
private int level0Files() {
138+
return levels().level0().size();
139+
}
140+
}

paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.apache.paimon.mergetree.MergeTreeWriter;
5353
import org.apache.paimon.mergetree.compact.CompactRewriter;
5454
import org.apache.paimon.mergetree.compact.CompactStrategy;
55+
import org.apache.paimon.mergetree.compact.DelayedLookupCompactManager;
5556
import org.apache.paimon.mergetree.compact.ForceUpLevel0Compaction;
5657
import org.apache.paimon.mergetree.compact.FullChangelogMergeTreeCompactRewriter;
5758
import org.apache.paimon.mergetree.compact.LookupMergeTreeCompactRewriter;
@@ -62,6 +63,7 @@
6263
import org.apache.paimon.mergetree.compact.MergeTreeCompactRewriter;
6364
import org.apache.paimon.mergetree.compact.UniversalCompaction;
6465
import org.apache.paimon.options.Options;
66+
import org.apache.paimon.partition.PartitionTimeExtractor;
6567
import org.apache.paimon.schema.KeyValueFieldsExtractor;
6668
import org.apache.paimon.schema.SchemaManager;
6769
import org.apache.paimon.schema.TableSchema;
@@ -79,6 +81,7 @@
7981

8082
import javax.annotation.Nullable;
8183

84+
import java.time.LocalDateTime;
8285
import java.util.Comparator;
8386
import java.util.List;
8487
import java.util.Map;
@@ -107,6 +110,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
107110
private final RowType keyType;
108111
private final RowType valueType;
109112
private final RowType partitionType;
113+
private final PartitionTimeExtractor partitionTimeExtractor;
110114
private final String commitUser;
111115
@Nullable private final RecordLevelExpire recordLevelExpire;
112116
@Nullable private Cache<String, LookupFile> lookupFileCache;
@@ -142,6 +146,7 @@ public KeyValueFileStoreWrite(
142146
tableName);
143147
this.fileIO = fileIO;
144148
this.partitionType = partitionType;
149+
this.partitionTimeExtractor = new PartitionTimeExtractor(options);
145150
this.keyType = keyType;
146151
this.valueType = valueType;
147152
this.commitUser = commitUser;
@@ -252,19 +257,42 @@ private CompactManager createCompactManager(
252257
userDefinedSeqComparator,
253258
levels,
254259
dvMaintainer);
255-
return new MergeTreeCompactManager(
256-
compactExecutor,
257-
levels,
258-
compactStrategy,
259-
keyComparator,
260-
options.compactionFileSize(true),
261-
options.numSortedRunStopTrigger(),
262-
rewriter,
263-
compactionMetrics == null
264-
? null
265-
: compactionMetrics.createReporter(partition, bucket),
266-
dvMaintainer,
267-
options.prepareCommitWaitCompaction());
260+
261+
if (options.needLookup() && options.lookupDelayPartitionThreshold() != null) {
262+
LocalDateTime partitionTime =
263+
partitionTimeExtractor.extract(partition, partitionType);
264+
return new DelayedLookupCompactManager(
265+
compactExecutor,
266+
levels,
267+
compactStrategy,
268+
keyComparator,
269+
options.compactionFileSize(true),
270+
options.numSortedRunStopTrigger(),
271+
rewriter,
272+
compactionMetrics == null
273+
? null
274+
: compactionMetrics.createReporter(partition, bucket),
275+
dvMaintainer,
276+
options.prepareCommitWaitCompaction(),
277+
partitionTime,
278+
options.lookupDelayPartitionThreshold(),
279+
options.lookupDelayL0FileTrigger(),
280+
options.lookupDelayStopTrigger());
281+
} else {
282+
return new MergeTreeCompactManager(
283+
compactExecutor,
284+
levels,
285+
compactStrategy,
286+
keyComparator,
287+
options.compactionFileSize(true),
288+
options.numSortedRunStopTrigger(),
289+
rewriter,
290+
compactionMetrics == null
291+
? null
292+
: compactionMetrics.createReporter(partition, bucket),
293+
dvMaintainer,
294+
options.prepareCommitWaitCompaction());
295+
}
268296
}
269297
}
270298

paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@
1818

1919
package org.apache.paimon.partition;
2020

21+
import org.apache.paimon.CoreOptions;
22+
import org.apache.paimon.data.BinaryRow;
23+
import org.apache.paimon.types.RowType;
24+
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
25+
2126
import javax.annotation.Nullable;
2227

2328
import java.time.LocalDate;
@@ -30,6 +35,7 @@
3035
import java.time.format.SignStyle;
3136
import java.time.temporal.ChronoField;
3237
import java.util.ArrayList;
38+
import java.util.Arrays;
3339
import java.util.LinkedHashMap;
3440
import java.util.List;
3541
import java.util.Locale;
@@ -79,11 +85,22 @@ public class PartitionTimeExtractor {
7985
@Nullable private final String pattern;
8086
@Nullable private final String formatter;
8187

88+
/**
 * Creates an extractor from the partition timestamp pattern and formatter configured in the
 * given options.
 */
public PartitionTimeExtractor(CoreOptions options) {
    this(
            options.partitionTimestampPattern(),
            options.partitionTimestampFormatter());
}
91+
8292
/**
 * Creates an extractor.
 *
 * @param pattern timestamp pattern, may be {@code null}
 * @param formatter timestamp formatter, may be {@code null}
 */
public PartitionTimeExtractor(@Nullable String pattern, @Nullable String formatter) {
    this.formatter = formatter;
    this.pattern = pattern;
}
8696

97+
public LocalDateTime extract(BinaryRow partition, RowType partitionType) {
98+
RowDataToObjectArrayConverter toObjectArrayConverter =
99+
new RowDataToObjectArrayConverter(partitionType);
100+
Object[] array = toObjectArrayConverter.convert(partition);
101+
return extract(partitionType.getFieldNames(), Arrays.asList(array));
102+
}
103+
87104
public LocalDateTime extract(LinkedHashMap<String, String> spec) {
88105
return extract(new ArrayList<>(spec.keySet()), new ArrayList<>(spec.values()));
89106
}

0 commit comments

Comments
 (0)