diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index fc5c04477bc1..19b8eae1bec8 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -146,6 +146,24 @@
String |
Specifies the commit user prefix. |
+
+ compaction.lookup-delay.l0-file-trigger |
+ 5 |
+ Integer |
+ The L0 file trigger for a delayed lookup compaction. A delayed lookup compaction will only be performed when L0 files reach this config value. |
+
+
+ compaction.lookup-delay.partition-threshold |
+ (none) |
+ Duration |
+ The threshold of partitioned pk table to perform delayed lookup compaction. Delayed lookup compaction can be configured with a less frequent compaction strategy to sacrifice timeliness for overall resource usage saving. This option is only valid for a partitioned pk table when CoreOptions#needLookup() is true. |
+
+
+ compaction.lookup-delay.stop-trigger |
+ 10 |
+ Integer |
+ The stop trigger for a delayed lookup compaction. For every stop trigger, a forced lookup compaction will be performed to flush L0 files to higher level. |
+
compaction.max-size-amplification-percent |
200 |
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 9e0c679b576e..a3f46bb1d321 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -627,6 +627,33 @@ public class CoreOptions implements Serializable {
text("Default value of Bucketed Append Table is '5'."))
.build());
+ public static final ConfigOption LOOKUP_DELAY_PARTITION_THRESHOLD =
+ key("compaction.lookup-delay.partition-threshold")
+ .durationType()
+ .noDefaultValue()
+ .withDescription(
+ "The threshold of partitioned pk table to perform delayed lookup compaction. "
+ + "Delayed lookup compaction can be configured with a less frequent "
+ + "compaction strategy to sacrifice timeliness for overall resource usage saving. "
+ + "This option is only valid for a partitioned pk table when "
+ + "CoreOptions#needLookup() is true.");
+
+ public static final ConfigOption LOOKUP_DELAY_L0_FILE_TRIGGER =
+ key("compaction.lookup-delay.l0-file-trigger")
+ .intType()
+ .defaultValue(5)
+ .withDescription(
+ "The L0 file trigger for a delayed lookup compaction. A delayed lookup compaction "
+ + "will only be performed when L0 files reach this config value.");
+
+ public static final ConfigOption LOOKUP_DELAY_STOP_TRIGGER =
+ key("compaction.lookup-delay.stop-trigger")
+ .intType()
+ .defaultValue(LOOKUP_DELAY_L0_FILE_TRIGGER.defaultValue() * 2)
+ .withDescription(
+ "The stop trigger for a delayed lookup compaction. For every stop trigger, "
+ + "a forced lookup compaction will be performed to flush L0 files to higher level.");
+
public static final ConfigOption CHANGELOG_PRODUCER =
key("changelog-producer")
.enumType(ChangelogProducer.class)
@@ -2043,6 +2070,19 @@ public Duration optimizedCompactionInterval() {
return options.get(COMPACTION_OPTIMIZATION_INTERVAL);
}
+ @Nullable
+ public Duration lookupDelayPartitionThreshold() {
+ return options.get(LOOKUP_DELAY_PARTITION_THRESHOLD);
+ }
+
+ public int lookupDelayL0FileTrigger() {
+ return options.get(LOOKUP_DELAY_L0_FILE_TRIGGER);
+ }
+
+ public int lookupDelayStopTrigger() {
+ return options.get(LOOKUP_DELAY_STOP_TRIGGER);
+ }
+
public int numSortedRunStopTrigger() {
Integer stopTrigger = options.get(NUM_SORTED_RUNS_STOP_TRIGGER);
if (stopTrigger == null) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 4c313dd655c0..6668544e3b43 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -215,6 +215,11 @@ public boolean isCompacting() {
return compactManager.isCompacting();
}
+ @Override
+ public boolean hasDelayedCompact() {
+ return false;
+ }
+
@VisibleForTesting
void flush(boolean waitForLatestCompaction, boolean forcedFullCompaction) throws Exception {
List flushedFiles = sinkWriter.flush();
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
index 02f469624c2a..c854069995d7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
@@ -186,6 +186,11 @@ public Optional getCompactionResult(boolean blocking)
return result;
}
+ @Override
+ public boolean hasDelayedCompact() {
+ return false;
+ }
+
@VisibleForTesting
Optional> pickCompactBefore() {
if (toCompact.isEmpty()) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/compact/CompactManager.java b/paimon-core/src/main/java/org/apache/paimon/compact/CompactManager.java
index 88897bd860b8..572afc2430fe 100644
--- a/paimon-core/src/main/java/org/apache/paimon/compact/CompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactManager.java
@@ -54,4 +54,9 @@ Optional getCompactionResult(boolean blocking)
/** Check if a compaction is in progress, or if a compaction result remains to be fetched. */
boolean isCompacting();
+
+ /**
+ * Check if the {@code CompactManager} has delayed compacts need to be triggered in the future.
+ */
+ boolean hasDelayedCompact();
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/compact/NoopCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/compact/NoopCompactManager.java
index 6f84105f389a..1e0a4361329c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/compact/NoopCompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/compact/NoopCompactManager.java
@@ -75,6 +75,11 @@ public boolean isCompacting() {
return false;
}
+ @Override
+ public boolean hasDelayedCompact() {
+ return false;
+ }
+
@Override
public void close() throws IOException {}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
index 1c805e764a77..8bca37df705d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
@@ -142,7 +142,7 @@ private long newSequenceNumber() {
}
@VisibleForTesting
- CompactManager compactManager() {
+ public CompactManager compactManager() {
return compactManager;
}
@@ -283,6 +283,11 @@ public boolean isCompacting() {
return compactManager.isCompacting();
}
+ @Override
+ public boolean hasDelayedCompact() {
+ return compactManager.hasDelayedCompact();
+ }
+
@Override
public void sync() throws Exception {
trySyncLatestCompaction(true);
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DelayedLookupCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DelayedLookupCompactManager.java
new file mode 100644
index 000000000000..fd9c3be2bb58
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DelayedLookupCompactManager.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.mergetree.Levels;
+import org.apache.paimon.operation.metrics.CompactionMetrics;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Comparator;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Compact manager to perform the delayed lookup compaction. */
+public class DelayedLookupCompactManager extends MergeTreeCompactManager {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DelayedLookupCompactManager.class);
+ private final CompactStrategy strategy;
+ private final LocalDateTime currentPartitionDate;
+ private final Duration delayPartitionThreshold;
+ private final int l0FileTriggerThreshold;
+ private final int stopTriggerThreshold;
+
+ private final AtomicInteger triggerCount = new AtomicInteger(0);
+
+ public DelayedLookupCompactManager(
+ ExecutorService executor,
+ Levels levels,
+ CompactStrategy strategy,
+ Comparator keyComparator,
+ long compactionFileSize,
+ int numSortedRunStopTrigger,
+ CompactRewriter rewriter,
+ @Nullable CompactionMetrics.Reporter metricsReporter,
+ @Nullable DeletionVectorsMaintainer dvMaintainer,
+ boolean lazyGenDeletionFile,
+ LocalDateTime currentPartitionDate,
+ Duration delayPartitionThreshold,
+ int l0FileTriggerThreshold,
+ int stopTriggerThreshold) {
+ super(
+ executor,
+ levels,
+ strategy,
+ keyComparator,
+ compactionFileSize,
+ numSortedRunStopTrigger,
+ rewriter,
+ metricsReporter,
+ dvMaintainer,
+ lazyGenDeletionFile);
+ this.strategy = strategy;
+ this.currentPartitionDate = currentPartitionDate;
+ this.delayPartitionThreshold = delayPartitionThreshold;
+ this.l0FileTriggerThreshold = l0FileTriggerThreshold;
+ this.stopTriggerThreshold = stopTriggerThreshold;
+ }
+
+ @Override
+ public void triggerCompaction(boolean fullCompaction) {
+ if (isDelayedCompactPartition()) {
+ if (shouldTriggerDelayedLookupCompact()) {
+ triggerCount.set(0);
+ super.triggerCompaction(fullCompaction);
+ } else {
+ LOG.info(
+ "Skip to trigger this lookup compaction because the compaction trigger count {} has not reached "
+ + "the stop trigger threshold {} or l0 file size {} is less than l0 file trigger threshold {}.",
+ triggerCount.get(),
+ stopTriggerThreshold,
+ level0Files(),
+ l0FileTriggerThreshold);
+ triggerCount.incrementAndGet();
+ }
+ } else {
+ super.triggerCompaction(fullCompaction);
+ }
+ }
+
+ @Override
+ public boolean hasDelayedCompact() {
+ return strategy instanceof ForceUpLevel0Compaction && level0Files() > 0;
+ }
+
+ @VisibleForTesting
+ public boolean shouldTriggerDelayedLookupCompact() {
+ if (level0Files() >= l0FileTriggerThreshold) {
+ LOG.info("Trigger delayed lookup compact for L0 file count: {}", level0Files());
+ return true;
+ } else if (triggerCount.get() >= stopTriggerThreshold) {
+ LOG.info("Trigger delayed lookup compact for stopTrigger count: {}", triggerCount);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @VisibleForTesting
+ public boolean isDelayedCompactPartition() {
+ if (delayPartitionThreshold == null) {
+ return false;
+ }
+ LocalDateTime delayedCompactPartition = LocalDateTime.now().minus(delayPartitionThreshold);
+ // For delayedCompactPartitionThreshold=20250120, any data insert into partitions<=20250120
+ // should be delayed compact.
+ boolean result = !currentPartitionDate.isAfter(delayedCompactPartition);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Current partition Date: {}, delayed compact partition threshold: {}, delayed compact partition result: {}.",
+ currentPartitionDate.format(DateTimeFormatter.BASIC_ISO_DATE),
+ delayedCompactPartition.format(DateTimeFormatter.BASIC_ISO_DATE),
+ result);
+ }
+ return result;
+ }
+
+ private int level0Files() {
+ return levels().level0().size();
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
index 8ae29201301c..8099c9b9d0c6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
@@ -240,6 +240,11 @@ public Optional getCompactionResult(boolean blocking)
return result;
}
+ @Override
+ public boolean hasDelayedCompact() {
+ return false;
+ }
+
private void reportLevel0FileCount() {
if (metricsReporter != null) {
metricsReporter.reportLevel0FileCount(levels.level0().size());
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 14dfe75a6e35..39af6940819a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -290,9 +290,12 @@ Function, Boolean> createConflictAwareWriterCleanChecker(
//
// Condition 2: No compaction is in progress. That is, no more changelog will be
// produced.
+ //
+ // Condition 3: The writer has no delayed compaction.
return writerContainer ->
writerContainer.lastModifiedCommitIdentifier < latestCommittedIdentifier
- && !writerContainer.writer.isCompacting();
+ && !writerContainer.writer.isCompacting()
+ && !writerContainer.writer.hasDelayedCompact();
}
protected static
@@ -397,7 +400,7 @@ public Map> getActiveBuckets() {
return result;
}
- protected WriterContainer getWriterWrapper(BinaryRow partition, int bucket) {
+ public WriterContainer getWriterWrapper(BinaryRow partition, int bucket) {
Map> buckets = writers.get(partition);
if (buckets == null) {
buckets = new HashMap<>();
@@ -552,7 +555,7 @@ protected WriterContainer(
}
@VisibleForTesting
- Map>> writers() {
+ public Map>> writers() {
return writers;
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index d061e181618b..681cbbc4b1b1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -52,6 +52,7 @@
import org.apache.paimon.mergetree.MergeTreeWriter;
import org.apache.paimon.mergetree.compact.CompactRewriter;
import org.apache.paimon.mergetree.compact.CompactStrategy;
+import org.apache.paimon.mergetree.compact.DelayedLookupCompactManager;
import org.apache.paimon.mergetree.compact.ForceUpLevel0Compaction;
import org.apache.paimon.mergetree.compact.FullChangelogMergeTreeCompactRewriter;
import org.apache.paimon.mergetree.compact.LookupMergeTreeCompactRewriter;
@@ -62,6 +63,7 @@
import org.apache.paimon.mergetree.compact.MergeTreeCompactRewriter;
import org.apache.paimon.mergetree.compact.UniversalCompaction;
import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionTimeExtractor;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
@@ -79,6 +81,7 @@
import javax.annotation.Nullable;
+import java.time.LocalDateTime;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@@ -107,6 +110,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite {
private final RowType keyType;
private final RowType valueType;
private final RowType partitionType;
+ private final PartitionTimeExtractor partitionTimeExtractor;
private final String commitUser;
@Nullable private final RecordLevelExpire recordLevelExpire;
@Nullable private Cache lookupFileCache;
@@ -142,6 +146,7 @@ public KeyValueFileStoreWrite(
tableName);
this.fileIO = fileIO;
this.partitionType = partitionType;
+ this.partitionTimeExtractor = new PartitionTimeExtractor(options);
this.keyType = keyType;
this.valueType = valueType;
this.commitUser = commitUser;
@@ -252,19 +257,42 @@ private CompactManager createCompactManager(
userDefinedSeqComparator,
levels,
dvMaintainer);
- return new MergeTreeCompactManager(
- compactExecutor,
- levels,
- compactStrategy,
- keyComparator,
- options.compactionFileSize(true),
- options.numSortedRunStopTrigger(),
- rewriter,
- compactionMetrics == null
- ? null
- : compactionMetrics.createReporter(partition, bucket),
- dvMaintainer,
- options.prepareCommitWaitCompaction());
+
+ if (options.needLookup() && options.lookupDelayPartitionThreshold() != null) {
+ LocalDateTime partitionTime =
+ partitionTimeExtractor.extract(partition, partitionType);
+ return new DelayedLookupCompactManager(
+ compactExecutor,
+ levels,
+ compactStrategy,
+ keyComparator,
+ options.compactionFileSize(true),
+ options.numSortedRunStopTrigger(),
+ rewriter,
+ compactionMetrics == null
+ ? null
+ : compactionMetrics.createReporter(partition, bucket),
+ dvMaintainer,
+ options.prepareCommitWaitCompaction(),
+ partitionTime,
+ options.lookupDelayPartitionThreshold(),
+ options.lookupDelayL0FileTrigger(),
+ options.lookupDelayStopTrigger());
+ } else {
+ return new MergeTreeCompactManager(
+ compactExecutor,
+ levels,
+ compactStrategy,
+ keyComparator,
+ options.compactionFileSize(true),
+ options.numSortedRunStopTrigger(),
+ rewriter,
+ compactionMetrics == null
+ ? null
+ : compactionMetrics.createReporter(partition, bucket),
+ dvMaintainer,
+ options.prepareCommitWaitCompaction());
+ }
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java
index 0016619e2162..9af1789131b7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java
@@ -18,6 +18,11 @@
package org.apache.paimon.partition;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.RowDataToObjectArrayConverter;
+
import javax.annotation.Nullable;
import java.time.LocalDate;
@@ -30,6 +35,7 @@
import java.time.format.SignStyle;
import java.time.temporal.ChronoField;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
@@ -79,11 +85,22 @@ public class PartitionTimeExtractor {
@Nullable private final String pattern;
@Nullable private final String formatter;
+ public PartitionTimeExtractor(CoreOptions options) {
+ this(options.partitionTimestampPattern(), options.partitionTimestampFormatter());
+ }
+
public PartitionTimeExtractor(@Nullable String pattern, @Nullable String formatter) {
this.pattern = pattern;
this.formatter = formatter;
}
+ public LocalDateTime extract(BinaryRow partition, RowType partitionType) {
+ RowDataToObjectArrayConverter toObjectArrayConverter =
+ new RowDataToObjectArrayConverter(partitionType);
+ Object[] array = toObjectArrayConverter.convert(partition);
+ return extract(partitionType.getFieldNames(), Arrays.asList(array));
+ }
+
public LocalDateTime extract(LinkedHashMap spec) {
return extract(new ArrayList<>(spec.keySet()), new ArrayList<>(spec.values()));
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketWriter.java b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketWriter.java
index 47cafb9645f9..31c261eb75cd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketWriter.java
@@ -88,6 +88,11 @@ public boolean isCompacting() {
return false;
}
+ @Override
+ public boolean hasDelayedCompact() {
+ return false;
+ }
+
@Override
public void sync() throws Exception {}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/RecordWriter.java b/paimon-core/src/main/java/org/apache/paimon/utils/RecordWriter.java
index 2ccf4cc861f6..c8591dafb176 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/RecordWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/RecordWriter.java
@@ -67,6 +67,11 @@ public interface RecordWriter {
/** Check if a compaction is in progress, or if a compaction result remains to be fetched. */
boolean isCompacting();
+ /**
+ * Check if the {@code RecordWriter} has delayed compacts need to be triggered in the future.
+ */
+ boolean hasDelayedCompact();
+
/**
* Sync the writer. The structure related to file reading and writing is thread unsafe, there
* are asynchronous threads inside the writer, which should be synced before reading data.
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/DelayedLookupCompactManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/DelayedLookupCompactManagerTest.java
new file mode 100644
index 000000000000..bf7d4ce62344
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/DelayedLookupCompactManagerTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.mergetree.Levels;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link MergeTreeCompactManager}. */
+public class DelayedLookupCompactManagerTest extends MergeTreeCompactManagerTest {
+
+ @Test
+ public void testDelayedLookupCompactionBeforeThreshold()
+ throws ExecutionException, InterruptedException {
+ List inputs =
+ Arrays.asList(
+ new LevelMinMax(0, 1, 5),
+ new LevelMinMax(0, 3, 6),
+ new LevelMinMax(0, 6, 8),
+ new LevelMinMax(1, 1, 4),
+ new LevelMinMax(1, 6, 8),
+ new LevelMinMax(1, 10, 10),
+ new LevelMinMax(2, 1, 3),
+ new LevelMinMax(2, 4, 6));
+ List files = new ArrayList<>();
+ for (int i = 0; i < inputs.size(); i++) {
+ LevelMinMax minMax = inputs.get(i);
+ files.add(minMax.toFile(i));
+ }
+
+ Levels levels = new Levels(comparator, files, 3);
+
+ CompactStrategy strategy = new ForceUpLevel0Compaction(new UniversalCompaction(0, 0, 5));
+
+ DelayedLookupCompactManager oldManager =
+ new DelayedLookupCompactManager(
+ service,
+ levels,
+ strategy,
+ comparator,
+ 2,
+ Integer.MAX_VALUE,
+ new TestRewriter(true),
+ null,
+ null,
+ false,
+ LocalDateTime.now().minusDays(2),
+ Duration.ofDays(2),
+ 5,
+ 5);
+
+ // delayed lookup compaction not triggered
+ assertThat(oldManager.isDelayedCompactPartition()).isTrue();
+ assertThat(oldManager.shouldTriggerDelayedLookupCompact()).isFalse();
+ assertThat(oldManager.hasDelayedCompact()).isTrue();
+
+ // trigger delayed lookup compaction by stop trigger
+ oldManager.triggerCompaction(false);
+ assertThat(oldManager.getCompactionResult(true)).isEmpty();
+
+ oldManager.triggerCompaction(false);
+ oldManager.triggerCompaction(false);
+ oldManager.triggerCompaction(false);
+ oldManager.triggerCompaction(false);
+
+ assertThat(oldManager.shouldTriggerDelayedLookupCompact()).isTrue();
+ oldManager.triggerCompaction(false);
+ assertThat(oldManager.getCompactionResult(true)).isPresent();
+ assertThat(oldManager.hasDelayedCompact()).isFalse();
+
+ // trigger delayed lookup compaction by l0 file count
+ oldManager.addNewFile((new LevelMinMax(0, 11, 12)).toFile(inputs.size()));
+ oldManager.addNewFile((new LevelMinMax(0, 13, 14)).toFile(inputs.size() + 1));
+ oldManager.addNewFile((new LevelMinMax(0, 15, 16)).toFile(inputs.size() + 2));
+ oldManager.addNewFile((new LevelMinMax(0, 17, 18)).toFile(inputs.size() + 3));
+ assertThat(oldManager.shouldTriggerDelayedLookupCompact()).isFalse();
+ oldManager.addNewFile((new LevelMinMax(0, 15, 16)).toFile(inputs.size() + 4));
+ oldManager.addNewFile((new LevelMinMax(0, 17, 18)).toFile(inputs.size() + 6));
+ assertThat(oldManager.shouldTriggerDelayedLookupCompact()).isTrue();
+ oldManager.triggerCompaction(false);
+ assertThat(oldManager.getCompactionResult(true)).isPresent();
+ }
+
+ @Test
+ public void testDelayedLookupCompactionAfterThreshold()
+ throws ExecutionException, InterruptedException {
+ List inputs =
+ Arrays.asList(
+ new LevelMinMax(0, 1, 5),
+ new LevelMinMax(0, 3, 6),
+ new LevelMinMax(0, 6, 8),
+ new LevelMinMax(1, 1, 4),
+ new LevelMinMax(1, 6, 8),
+ new LevelMinMax(1, 10, 10),
+ new LevelMinMax(2, 1, 3),
+ new LevelMinMax(2, 4, 6));
+ List files = new ArrayList<>();
+ for (int i = 0; i < inputs.size(); i++) {
+ LevelMinMax minMax = inputs.get(i);
+ files.add(minMax.toFile(i));
+ }
+
+ Levels levels = new Levels(comparator, files, 3);
+
+ CompactStrategy strategy = new ForceUpLevel0Compaction(new UniversalCompaction(0, 0, 5));
+
+ DelayedLookupCompactManager newManager =
+ new DelayedLookupCompactManager(
+ service,
+ levels,
+ strategy,
+ comparator,
+ 2,
+ Integer.MAX_VALUE,
+ new TestRewriter(true),
+ null,
+ null,
+ false,
+ LocalDateTime.now().minusDays(1),
+ Duration.ofDays(2),
+ 5,
+ 5);
+
+ // compaction will trigger immediately for new partitions
+ assertThat(newManager.isDelayedCompactPartition()).isFalse();
+ assertThat(newManager.shouldTriggerDelayedLookupCompact()).isFalse();
+ newManager.triggerCompaction(false);
+ assertThat(newManager.getCompactionResult(true)).isPresent();
+ assertThat(newManager.hasDelayedCompact()).isFalse();
+
+ // stop-trigger not working for new partitions
+ newManager.triggerCompaction(false);
+ newManager.triggerCompaction(false);
+ newManager.triggerCompaction(false);
+ newManager.triggerCompaction(false);
+ newManager.triggerCompaction(false);
+ newManager.triggerCompaction(false);
+ assertThat(newManager.shouldTriggerDelayedLookupCompact()).isFalse();
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
index 8484525c82c8..8d7ec962de78 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
@@ -49,9 +49,9 @@
/** Test for {@link MergeTreeCompactManager}. */
public class MergeTreeCompactManagerTest {
- private final Comparator comparator = Comparator.comparingInt(o -> o.getInt(0));
+ final Comparator comparator = Comparator.comparingInt(o -> o.getInt(0));
- private static ExecutorService service;
+ static ExecutorService service;
@BeforeAll
public static void before() {
@@ -223,11 +223,11 @@ private CompactStrategy testStrategy() {
return (numLevels, runs) -> Optional.of(CompactUnit.fromLevelRuns(numLevels - 1, runs));
}
- private static class TestRewriter extends AbstractCompactRewriter {
+ static class TestRewriter extends AbstractCompactRewriter {
private final boolean expectedDropDelete;
- private TestRewriter(boolean expectedDropDelete) {
+ TestRewriter(boolean expectedDropDelete) {
this.expectedDropDelete = expectedDropDelete;
}
@@ -262,25 +262,25 @@ public CompactResult rewrite(
}
}
- private static class LevelMinMax {
+ static class LevelMinMax {
- private final int level;
- private final int min;
- private final int max;
+ protected final int level;
+ protected final int min;
+ protected final int max;
- private LevelMinMax(DataFileMeta file) {
+ LevelMinMax(DataFileMeta file) {
this.level = file.level();
this.min = file.minKey().getInt(0);
this.max = file.maxKey().getInt(0);
}
- private LevelMinMax(int level, int min, int max) {
+ LevelMinMax(int level, int min, int max) {
this.level = level;
this.min = min;
this.max = max;
}
- private DataFileMeta toFile(long maxSequence) {
+ DataFileMeta toFile(long maxSequence) {
return newFile(level, min, max, maxSequence);
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreWriteTest.java
new file mode 100644
index 000000000000..140c9c8913f6
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreWriteTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.TestFileStore;
+import org.apache.paimon.TestKeyValueGenerator;
+import org.apache.paimon.compact.CompactManager;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.mergetree.MergeTreeWriter;
+import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.paimon.mergetree.compact.DelayedLookupCompactManager;
+import org.apache.paimon.partition.PartitionTimeExtractor;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link KeyValueFileStoreWrite}. */
+public class KeyValueFileStoreWriteTest {
+
+ private static final int NUM_BUCKETS = 10;
+
+ @TempDir java.nio.file.Path tempDir;
+
+ private IOManager ioManager;
+
+ @BeforeEach
+ public void before() throws IOException {
+ this.ioManager = new IOManagerImpl(tempDir.toString());
+ }
+
+ @Test
+ public void testDelayedLookupPartition() throws Exception {
+ SchemaManager schemaManager =
+ new SchemaManager(LocalFileIO.create(), new Path(tempDir.toUri()));
+ Map options = new HashMap<>();
+ Duration lookupDelayPartitionThreshold = Duration.ofDays(1L);
+ options.put(CoreOptions.LOOKUP_DELAY_PARTITION_THRESHOLD.key(), "1 d");
+ options.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true");
+ options.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), "yyyyMMdd H");
+ options.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), "$dt $hr");
+
+ TableSchema schema =
+ schemaManager.createTable(
+ new Schema(
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(),
+ TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
+ TestKeyValueGenerator.getPrimaryKeys(
+ TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
+ options,
+ null));
+ TestFileStore store =
+ new TestFileStore.Builder(
+ "avro",
+ tempDir.toString(),
+ NUM_BUCKETS,
+ TestKeyValueGenerator.DEFAULT_PART_TYPE,
+ TestKeyValueGenerator.KEY_TYPE,
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+ TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
+ DeduplicateMergeFunction.factory(),
+ schema)
+ .build();
+
+ KeyValueFileStoreWrite write = (KeyValueFileStoreWrite) store.newWrite();
+ write.withIOManager(ioManager);
+ TestKeyValueGenerator gen = new TestKeyValueGenerator();
+ PartitionTimeExtractor partitionTimeExtractor = new PartitionTimeExtractor(store.options());
+
+ for (int i = 0; i < 100; i++) {
+ KeyValue keyValue = gen.next();
+ BinaryRow partition = gen.getPartition(keyValue);
+ LocalDateTime partitionTime =
+ partitionTimeExtractor.extract(
+ partition, TestKeyValueGenerator.DEFAULT_PART_TYPE);
+ AbstractFileStoreWrite.WriterContainer writerContainer =
+ write.createWriterContainer(gen.getPartition(keyValue), 1, false);
+ MergeTreeWriter writer = (MergeTreeWriter) writerContainer.writer;
+
+ CompactManager compactManager = writer.compactManager();
+ assertThat(compactManager).isInstanceOf(DelayedLookupCompactManager.class);
+
+ DelayedLookupCompactManager delayedLookupCompactManager =
+ (DelayedLookupCompactManager) compactManager;
+ assertThat(delayedLookupCompactManager.isDelayedCompactPartition())
+ .isEqualTo(
+ !partitionTime.isAfter(
+ LocalDateTime.now().minus(lookupDelayPartitionThreshold)));
+ }
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DelayedCompactStoreSinkWrite.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DelayedCompactStoreSinkWrite.java
new file mode 100644
index 000000000000..856b9d853945
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DelayedCompactStoreSinkWrite.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.operation.AbstractFileStoreWrite;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.RecordWriter;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/** {@link StoreSinkWrite} for tables that need delayed compaction trigger. */
+public class DelayedCompactStoreSinkWrite extends StoreSinkWriteImpl {
+
+ private final String tableName;
+
+ private static final String HAS_DELAYED_COMPACT_BUCKETS_STATE_NAME =
+ "paimon_has_delayed_compact_buckets";
+
+ public DelayedCompactStoreSinkWrite(
+ FileStoreTable table,
+ String commitUser,
+ StoreSinkWriteState state,
+ IOManager ioManager,
+ boolean ignorePreviousFiles,
+ boolean waitCompaction,
+ boolean isStreaming,
+ @Nullable MemorySegmentPool memoryPool,
+ MetricGroup metricGroup) {
+ super(
+ table,
+ commitUser,
+ state,
+ ioManager,
+ ignorePreviousFiles,
+ waitCompaction,
+ isStreaming,
+ memoryPool,
+ metricGroup);
+
+ this.tableName = table.name();
+ List lateCompactBucketsStateValues =
+ state.get(tableName, HAS_DELAYED_COMPACT_BUCKETS_STATE_NAME);
+ if (lateCompactBucketsStateValues != null) {
+ for (StoreSinkWriteState.StateValue stateValue : lateCompactBucketsStateValues) {
+ ((AbstractFileStoreWrite>) write.getWrite())
+ .getWriterWrapper(stateValue.partition(), stateValue.bucket());
+ stateValue.bucket();
+ }
+ }
+ }
+
+ @Override
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public void snapshotState() throws Exception {
+ super.snapshotState();
+
+ Map> writerContainerMap =
+ ((AbstractFileStoreWrite) write.getWrite()).writers();
+ List delayedCompactBucketList = new ArrayList<>();
+
+ for (Map.Entry>
+ partitionEntry : writerContainerMap.entrySet()) {
+ for (Map.Entry bucketEntry :
+ partitionEntry.getValue().entrySet()) {
+ RecordWriter writer = bucketEntry.getValue().writer;
+ if (writer.hasDelayedCompact()) {
+ delayedCompactBucketList.add(
+ new StoreSinkWriteState.StateValue(
+ partitionEntry.getKey(), bucketEntry.getKey(), new byte[0]));
+ }
+ }
+ }
+
+ state.put(tableName, HAS_DELAYED_COMPACT_BUCKETS_STATE_NAME, delayedCompactBucketList);
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index fb0f8184b591..b475da892d49 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -142,20 +142,36 @@ private StoreSinkWrite.Provider createWriteProvider(
}
}
- if (coreOptions.needLookup() && !coreOptions.prepareCommitWaitCompaction()) {
- return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
- assertNoSinkMaterializer.run();
- return new AsyncLookupSinkWrite(
- table,
- commitUser,
- state,
- ioManager,
- ignorePreviousFiles,
- waitCompaction,
- isStreaming,
- memoryPool,
- metricGroup);
- };
+ if (coreOptions.needLookup()) {
+ if (coreOptions.lookupDelayPartitionThreshold() != null) {
+ return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
+ assertNoSinkMaterializer.run();
+ return new DelayedCompactStoreSinkWrite(
+ table,
+ commitUser,
+ state,
+ ioManager,
+ ignorePreviousFiles,
+ waitCompaction,
+ isStreaming,
+ memoryPool,
+ metricGroup);
+ };
+ } else if (!coreOptions.prepareCommitWaitCompaction()) {
+ return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
+ assertNoSinkMaterializer.run();
+ return new AsyncLookupSinkWrite(
+ table,
+ commitUser,
+ state,
+ ioManager,
+ ignorePreviousFiles,
+ waitCompaction,
+ isStreaming,
+ memoryPool,
+ metricGroup);
+ };
+ }
}
return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
index 83af15745078..f70001336484 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.sink;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.utils.InternalRowTypeSerializer;
@@ -56,6 +57,9 @@
import org.junit.jupiter.api.io.TempDir;
import java.lang.reflect.Field;
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -242,6 +246,141 @@ public void testAsyncLookupWithFailure() throws Exception {
.containsExactlyInAnyOrder("+I[1, 10, 101]", "+I[2, 20, 200]", "+I[3, 30, 301]");
}
+ @Test
+ public void testDelayedLookupWithFailure() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.STRING(), DataTypes.INT()},
+ new String[] {"pt", "k", "v"});
+
+ String partitionFormatter = "yyyyMMdd";
+ Duration partitionThreshold = Duration.ofDays(1);
+ int lookupStopTrigger = 5;
+
+ Options options = new Options();
+ options.set("bucket", "1");
+ options.set("changelog-producer", "lookup");
+ options.set(CoreOptions.LOOKUP_DELAY_PARTITION_THRESHOLD, Duration.ofDays(1));
+ options.set(CoreOptions.LOOKUP_DELAY_STOP_TRIGGER, lookupStopTrigger);
+ options.set(CoreOptions.PARTITION_TIMESTAMP_FORMATTER, partitionFormatter);
+ options.set(CoreOptions.PARTITION_TIMESTAMP_PATTERN, "$k");
+
+ FileStoreTable fileStoreTable =
+ createFileStoreTable(
+ rowType, Arrays.asList("pt", "k"), Collections.singletonList("k"), options);
+
+ // compaction will be triggered immediately since this is delayed lookup compaction
+ RowDataStoreWriteOperator.Factory operatorFactory =
+ getDelayedCompactWriteOperatorFactory(fileStoreTable, false);
+ OneInputStreamOperatorTestHarness harness =
+ createHarness(operatorFactory);
+
+ TableCommitImpl commit = fileStoreTable.newCommit(commitUser);
+
+ TypeSerializer serializer =
+ new CommittableTypeInfo().createSerializer(new ExecutionConfig());
+ harness.setup(serializer);
+ harness.open();
+
+ String partition1 =
+ generatePartitionValue(partitionFormatter, partitionThreshold.minusDays(1));
+ String partition2 = generatePartitionValue(partitionFormatter, partitionThreshold);
+ String partition3 =
+ generatePartitionValue(partitionFormatter, partitionThreshold.plusDays(1));
+
+ // write basic records
+ harness.processElement(GenericRow.of(1, BinaryString.fromString(partition1), 100), 1);
+ harness.processElement(GenericRow.of(2, BinaryString.fromString(partition2), 200), 2);
+ harness.processElement(GenericRow.of(3, BinaryString.fromString(partition3), 300), 3);
+ harness.prepareSnapshotPreBarrier(1);
+ harness.snapshot(1, 10);
+ harness.notifyOfCompletedCheckpoint(1);
+ commitAll(harness, commit, 1);
+
+ harness.processElement(GenericRow.of(1, BinaryString.fromString(partition1), 101), 11);
+ harness.processElement(GenericRow.of(3, BinaryString.fromString(partition3), 301), 13);
+ harness.prepareSnapshotPreBarrier(2);
+ OperatorSubtaskState state = harness.snapshot(2, 20);
+ harness.notifyOfCompletedCheckpoint(2);
+ commitAll(harness, commit, 2);
+
+ // operator is closed due to failure
+ harness.close();
+
+ // restore operator to trigger not delayed compaction
+ operatorFactory = getDelayedCompactWriteOperatorFactory(fileStoreTable, true);
+ harness = createHarness(operatorFactory);
+ harness.setup(serializer);
+ harness.initializeState(state);
+ harness.open();
+
+ // write nothing, wait for compaction
+ harness.prepareSnapshotPreBarrier(3);
+ harness.snapshot(3, 30);
+ harness.notifyOfCompletedCheckpoint(3);
+ commitAll(harness, commit, 3);
+
+ harness.close();
+
+ // check partition1 result
+ ReadBuilder readBuilder = fileStoreTable.newReadBuilder();
+ StreamTableScan scan = readBuilder.newStreamScan();
+ List splits = scan.plan().splits();
+ TableRead read = readBuilder.newRead();
+ RecordReader reader = read.createReader(splits);
+ List actual = new ArrayList<>();
+ reader.forEachRemaining(
+ row ->
+ actual.add(
+ String.format(
+ "%s[%d, %s, %d]",
+ row.getRowKind().shortString(),
+ row.getInt(0),
+ row.getString(1).toString(),
+ row.getInt(2))));
+ assertThat(actual).containsExactlyInAnyOrder(String.format("+I[1, %s, 101]", partition1));
+
+ // restore operator to trigger delayed compaction
+ operatorFactory = getDelayedCompactWriteOperatorFactory(fileStoreTable, true);
+ harness = createHarness(operatorFactory);
+ harness.setup(serializer);
+ harness.initializeState(state);
+ harness.open();
+
+ // trigger delayed compaction by stop trigger
+ for (int i = 0; i <= lookupStopTrigger; i++) {
+ harness.prepareSnapshotPreBarrier(i + 4);
+ harness.snapshot(i + 4, i + 40);
+ harness.notifyOfCompletedCheckpoint(i + 4);
+ commitAll(harness, commit, i + 4);
+ }
+
+ harness.close();
+ commit.close();
+
+ // check all partition result
+ readBuilder = fileStoreTable.newReadBuilder();
+ scan = readBuilder.newStreamScan();
+ splits = scan.plan().splits();
+ read = readBuilder.newRead();
+ reader = read.createReader(splits);
+ List finalResult = new ArrayList<>();
+ reader.forEachRemaining(
+ row ->
+ finalResult.add(
+ String.format(
+ "%s[%d, %s, %d]",
+ row.getRowKind().shortString(),
+ row.getInt(0),
+ row.getString(1).toString(),
+ row.getInt(2))));
+ assertThat(finalResult)
+ .containsExactlyInAnyOrder(
+ String.format("+I[1, %s, 101]", partition1),
+ String.format("+I[2, %s, 200]", partition2),
+ String.format("+I[3, %s, 301]", partition3));
+ }
+
@Test
public void testChangelog() throws Exception {
testChangelog(false);
@@ -449,6 +588,25 @@ private RowDataStoreWriteOperator.Factory getAsyncLookupWriteOperatorFactory(
commitUser);
}
+ private RowDataStoreWriteOperator.Factory getDelayedCompactWriteOperatorFactory(
+ FileStoreTable fileStoreTable, boolean waitCompaction) {
+ return new RowDataStoreWriteOperator.Factory(
+ fileStoreTable,
+ null,
+ (table, commitUser, state, ioManager, memoryPool, metricGroup) ->
+ new DelayedCompactStoreSinkWrite(
+ table,
+ commitUser,
+ state,
+ ioManager,
+ false,
+ waitCompaction,
+ true,
+ memoryPool,
+ metricGroup),
+ commitUser);
+ }
+
@SuppressWarnings("unchecked")
private void commitAll(
OneInputStreamOperatorTestHarness harness,
@@ -482,4 +640,10 @@ private OneInputStreamOperatorTestHarness createHarnes
operatorFactory,
internalRowInternalTypeInfo.createSerializer(new ExecutionConfig()));
}
+
+ private String generatePartitionValue(String partitionFormatter, Duration minusDays) {
+ return LocalDateTime.now()
+ .minus(minusDays)
+ .format(DateTimeFormatter.ofPattern(partitionFormatter));
+ }
}