From d6fa4218f7c7c36a56b3b82920f770366fd7e763 Mon Sep 17 00:00:00 2001 From: Xiangyu Feng Date: Sat, 22 Feb 2025 21:20:52 +0800 Subject: [PATCH 1/2] [core][flink] Introduce DelayedCompactStoreSinkWrite and delayed compact interface --- .../paimon/append/AppendOnlyWriter.java | 5 + .../append/BucketedAppendCompactManager.java | 5 + .../apache/paimon/compact/CompactManager.java | 5 + .../paimon/compact/NoopCompactManager.java | 5 + .../paimon/mergetree/MergeTreeWriter.java | 5 + .../compact/MergeTreeCompactManager.java | 5 + .../operation/AbstractFileStoreWrite.java | 9 +- .../paimon/postpone/PostponeBucketWriter.java | 5 + .../org/apache/paimon/utils/RecordWriter.java | 5 + .../sink/DelayedCompactStoreSinkWrite.java | 101 ++++++++++++++++++ 10 files changed, 147 insertions(+), 3 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DelayedCompactStoreSinkWrite.java diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index 4c313dd655c0..d132c9ddfa77 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -215,6 +215,11 @@ public boolean isCompacting() { return compactManager.isCompacting(); } + @Override + public boolean hasDelayedCompact() { + return compactManager.hasDelayedCompact(); + } + @VisibleForTesting void flush(boolean waitForLatestCompaction, boolean forcedFullCompaction) throws Exception { List flushedFiles = sinkWriter.flush(); diff --git a/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java index 02f469624c2a..c854069995d7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java @@ -186,6 +186,11 @@ public Optional getCompactionResult(boolean blocking) return result; } + @Override + public boolean hasDelayedCompact() { + return false; + } + @VisibleForTesting Optional> pickCompactBefore() { if (toCompact.isEmpty()) { diff --git a/paimon-core/src/main/java/org/apache/paimon/compact/CompactManager.java b/paimon-core/src/main/java/org/apache/paimon/compact/CompactManager.java index 88897bd860b8..572afc2430fe 100644 --- a/paimon-core/src/main/java/org/apache/paimon/compact/CompactManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactManager.java @@ -54,4 +54,9 @@ Optional getCompactionResult(boolean blocking) /** Check if a compaction is in progress, or if a compaction result remains to be fetched. */ boolean isCompacting(); + + /** + * Check if the {@code CompactManager} has delayed compacts need to be triggered in the future. + */ + boolean hasDelayedCompact(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/compact/NoopCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/compact/NoopCompactManager.java index 6f84105f389a..1e0a4361329c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/compact/NoopCompactManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/compact/NoopCompactManager.java @@ -75,6 +75,11 @@ public boolean isCompacting() { return false; } + @Override + public boolean hasDelayedCompact() { + return false; + } + @Override public void close() throws IOException {} } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java index 1c805e764a77..97b337f02b97 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java @@ -283,6 +283,11 @@ public boolean isCompacting() { return compactManager.isCompacting(); } + @Override + public boolean hasDelayedCompact() { + return compactManager.hasDelayedCompact(); + } + @Override public void sync() throws Exception { trySyncLatestCompaction(true); diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java index 8ae29201301c..8099c9b9d0c6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java @@ -240,6 +240,11 @@ public Optional getCompactionResult(boolean blocking) return result; } + @Override + public boolean hasDelayedCompact() { + return false; + } + private void reportLevel0FileCount() { if (metricsReporter != null) { metricsReporter.reportLevel0FileCount(levels.level0().size()); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java index 14dfe75a6e35..39af6940819a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java @@ -290,9 +290,12 @@ Function, Boolean> createConflictAwareWriterCleanChecker( // // Condition 2: No compaction is in progress. That is, no more changelog will be // produced. + // + // Condition 3: The writer has no delayed compaction. return writerContainer -> writerContainer.lastModifiedCommitIdentifier < latestCommittedIdentifier - && !writerContainer.writer.isCompacting(); + && !writerContainer.writer.isCompacting() + && !writerContainer.writer.hasDelayedCompact(); } protected static @@ -397,7 +400,7 @@ public Map> getActiveBuckets() { return result; } - protected WriterContainer getWriterWrapper(BinaryRow partition, int bucket) { + public WriterContainer getWriterWrapper(BinaryRow partition, int bucket) { Map> buckets = writers.get(partition); if (buckets == null) { buckets = new HashMap<>(); @@ -552,7 +555,7 @@ protected WriterContainer( } @VisibleForTesting - Map>> writers() { + public Map>> writers() { return writers; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketWriter.java b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketWriter.java index 47cafb9645f9..31c261eb75cd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketWriter.java @@ -88,6 +88,11 @@ public boolean isCompacting() { return false; } + @Override + public boolean hasDelayedCompact() { + return false; + } + @Override public void sync() throws Exception {} diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/RecordWriter.java b/paimon-core/src/main/java/org/apache/paimon/utils/RecordWriter.java index 2ccf4cc861f6..c8591dafb176 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/RecordWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/RecordWriter.java @@ -67,6 +67,11 @@ public interface RecordWriter { /** Check if a compaction is in progress, or if a compaction result remains to be fetched. */ boolean isCompacting(); + /** + * Check if the {@code RecordWriter} has delayed compacts need to be triggered in the future. + */ + boolean hasDelayedCompact(); + /** * Sync the writer. The structure related to file reading and writing is thread unsafe, there * are asynchronous threads inside the writer, which should be synced before reading data. diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DelayedCompactStoreSinkWrite.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DelayedCompactStoreSinkWrite.java new file mode 100644 index 000000000000..856b9d853945 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DelayedCompactStoreSinkWrite.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.memory.MemorySegmentPool; +import org.apache.paimon.operation.AbstractFileStoreWrite; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.RecordWriter; + +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** {@link StoreSinkWrite} for tables that need delayed compaction trigger. */ +public class DelayedCompactStoreSinkWrite extends StoreSinkWriteImpl { + + private final String tableName; + + private static final String HAS_DELAYED_COMPACT_BUCKETS_STATE_NAME = + "paimon_has_delayed_compact_buckets"; + + public DelayedCompactStoreSinkWrite( + FileStoreTable table, + String commitUser, + StoreSinkWriteState state, + IOManager ioManager, + boolean ignorePreviousFiles, + boolean waitCompaction, + boolean isStreaming, + @Nullable MemorySegmentPool memoryPool, + MetricGroup metricGroup) { + super( + table, + commitUser, + state, + ioManager, + ignorePreviousFiles, + waitCompaction, + isStreaming, + memoryPool, + metricGroup); + + this.tableName = table.name(); + List lateCompactBucketsStateValues = + state.get(tableName, HAS_DELAYED_COMPACT_BUCKETS_STATE_NAME); + if (lateCompactBucketsStateValues != null) { + for (StoreSinkWriteState.StateValue stateValue : lateCompactBucketsStateValues) { + ((AbstractFileStoreWrite) write.getWrite()) + .getWriterWrapper(stateValue.partition(), stateValue.bucket()); + stateValue.bucket(); + } + } + } + + @Override + @SuppressWarnings({"rawtypes", "unchecked"}) + public void snapshotState() throws Exception { + super.snapshotState(); + + Map> writerContainerMap = + ((AbstractFileStoreWrite) write.getWrite()).writers(); + List delayedCompactBucketList = new ArrayList<>(); + + for (Map.Entry> + partitionEntry : writerContainerMap.entrySet()) { + for (Map.Entry bucketEntry : + partitionEntry.getValue().entrySet()) { + RecordWriter writer = bucketEntry.getValue().writer; + if (writer.hasDelayedCompact()) { + delayedCompactBucketList.add( + new StoreSinkWriteState.StateValue( + partitionEntry.getKey(), bucketEntry.getKey(), new byte[0])); + } + } + } + + state.put(tableName, HAS_DELAYED_COMPACT_BUCKETS_STATE_NAME, delayedCompactBucketList); + } +} From 0e8eed98f7418307ae6873a274f164522483079d Mon Sep 17 00:00:00 2001 From: Xiangyu Feng Date: Mon, 24 Feb 2025 11:40:19 +0800 Subject: [PATCH 2/2] [core] Introduce DelayedLookupCompactManager to support delayed lookup compaction --- .../generated/core_configuration.html | 18 ++ .../java/org/apache/paimon/CoreOptions.java | 40 +++++ .../paimon/append/AppendOnlyWriter.java | 2 +- .../paimon/mergetree/MergeTreeWriter.java | 2 +- .../compact/DelayedLookupCompactManager.java | 145 +++++++++++++++ .../operation/KeyValueFileStoreWrite.java | 54 ++++-- .../partition/PartitionTimeExtractor.java | 17 ++ .../DelayedLookupCompactManagerTest.java | 166 ++++++++++++++++++ .../compact/MergeTreeCompactManagerTest.java | 22 +-- .../operation/KeyValueFileStoreWriteTest.java | 124 +++++++++++++ .../apache/paimon/flink/sink/FlinkSink.java | 44 +++-- .../paimon/flink/sink/WriterOperatorTest.java | 164 +++++++++++++++++ 12 files changed, 758 insertions(+), 40 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DelayedLookupCompactManager.java create mode 100644 paimon-core/src/test/java/org/apache/paimon/mergetree/compact/DelayedLookupCompactManagerTest.java create mode 100644 paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreWriteTest.java diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index fc5c04477bc1..19b8eae1bec8 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -146,6 +146,24 @@ String Specifies the commit user prefix. + +
compaction.lookup-delay.l0-file-trigger
+ 5 + Integer + The L0 file trigger for a delayed lookup compaction. A delayed lookup compaction will only be performed when L0 files reach this config value. + + +
compaction.lookup-delay.partition-threshold
+ (none) + Duration + The threshold of partitioned pk table to perform delayed lookup compaction. Delayed lookup compaction can be configured with a less frequent compaction strategy to sacrifice timeliness for overall resource usage saving. This option is only valid for a partitioned pk table when CoreOptions#needLookup() is true. + + +
compaction.lookup-delay.stop-trigger
+ 10 + Integer + The stop trigger for a delayed lookup compaction. For every stop trigger, a forced lookup compaction will be performed to flush L0 files to higher level. +
compaction.max-size-amplification-percent
200 diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 9e0c679b576e..a3f46bb1d321 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -627,6 +627,33 @@ public class CoreOptions implements Serializable { text("Default value of Bucketed Append Table is '5'.")) .build()); + public static final ConfigOption LOOKUP_DELAY_PARTITION_THRESHOLD = + key("compaction.lookup-delay.partition-threshold") + .durationType() + .noDefaultValue() + .withDescription( + "The threshold of partitioned pk table to perform delayed lookup compaction. " + + "Delayed lookup compaction can be configured with a less frequent " + + "compaction strategy to sacrifice timeliness for overall resource usage saving. " + + "This option is only valid for a partitioned pk table when " + + "CoreOptions#needLookup() is true."); + + public static final ConfigOption LOOKUP_DELAY_L0_FILE_TRIGGER = + key("compaction.lookup-delay.l0-file-trigger") + .intType() + .defaultValue(5) + .withDescription( + "The L0 file trigger for a delayed lookup compaction. A delayed lookup compaction " + + "will only be performed when L0 files reach this config value."); + + public static final ConfigOption LOOKUP_DELAY_STOP_TRIGGER = + key("compaction.lookup-delay.stop-trigger") + .intType() + .defaultValue(LOOKUP_DELAY_L0_FILE_TRIGGER.defaultValue() * 2) + .withDescription( + "The stop trigger for a delayed lookup compaction. For every stop trigger, " + + "a forced lookup compaction will be performed to flush L0 files to higher level."); + public static final ConfigOption CHANGELOG_PRODUCER = key("changelog-producer") .enumType(ChangelogProducer.class) @@ -2043,6 +2070,19 @@ public Duration optimizedCompactionInterval() { return options.get(COMPACTION_OPTIMIZATION_INTERVAL); } + @Nullable + public Duration lookupDelayPartitionThreshold() { + return options.get(LOOKUP_DELAY_PARTITION_THRESHOLD); + } + + public int lookupDelayL0FileTrigger() { + return options.get(LOOKUP_DELAY_L0_FILE_TRIGGER); + } + + public int lookupDelayStopTrigger() { + return options.get(LOOKUP_DELAY_STOP_TRIGGER); + } + public int numSortedRunStopTrigger() { Integer stopTrigger = options.get(NUM_SORTED_RUNS_STOP_TRIGGER); if (stopTrigger == null) { diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index d132c9ddfa77..6668544e3b43 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -217,7 +217,7 @@ public boolean isCompacting() { @Override public boolean hasDelayedCompact() { - return compactManager.hasDelayedCompact(); + return false; } @VisibleForTesting diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java index 97b337f02b97..8bca37df705d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java @@ -142,7 +142,7 @@ private long newSequenceNumber() { } @VisibleForTesting - CompactManager compactManager() { + public CompactManager compactManager() { return compactManager; } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DelayedLookupCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DelayedLookupCompactManager.java new file mode 100644 index 000000000000..fd9c3be2bb58 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DelayedLookupCompactManager.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.mergetree.compact; + +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; +import org.apache.paimon.mergetree.Levels; +import org.apache.paimon.operation.metrics.CompactionMetrics; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Comparator; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; + +/** Compact manager to perform the delayed lookup compaction. */ +public class DelayedLookupCompactManager extends MergeTreeCompactManager { + + private static final Logger LOG = LoggerFactory.getLogger(DelayedLookupCompactManager.class); + private final CompactStrategy strategy; + private final LocalDateTime currentPartitionDate; + private final Duration delayPartitionThreshold; + private final int l0FileTriggerThreshold; + private final int stopTriggerThreshold; + + private final AtomicInteger triggerCount = new AtomicInteger(0); + + public DelayedLookupCompactManager( + ExecutorService executor, + Levels levels, + CompactStrategy strategy, + Comparator keyComparator, + long compactionFileSize, + int numSortedRunStopTrigger, + CompactRewriter rewriter, + @Nullable CompactionMetrics.Reporter metricsReporter, + @Nullable DeletionVectorsMaintainer dvMaintainer, + boolean lazyGenDeletionFile, + LocalDateTime currentPartitionDate, + Duration delayPartitionThreshold, + int l0FileTriggerThreshold, + int stopTriggerThreshold) { + super( + executor, + levels, + strategy, + keyComparator, + compactionFileSize, + numSortedRunStopTrigger, + rewriter, + metricsReporter, + dvMaintainer, + lazyGenDeletionFile); + this.strategy = strategy; + this.currentPartitionDate = currentPartitionDate; + this.delayPartitionThreshold = delayPartitionThreshold; + this.l0FileTriggerThreshold = l0FileTriggerThreshold; + this.stopTriggerThreshold = stopTriggerThreshold; + } + + @Override + public void triggerCompaction(boolean fullCompaction) { + if (isDelayedCompactPartition()) { + if (shouldTriggerDelayedLookupCompact()) { + triggerCount.set(0); + super.triggerCompaction(fullCompaction); + } else { + LOG.info( + "Skip to trigger this lookup compaction because the compaction trigger count {} has not reached " + + "the stop trigger threshold {} or l0 file size {} is less than l0 file trigger threshold {}.", + triggerCount.get(), + stopTriggerThreshold, + level0Files(), + l0FileTriggerThreshold); + triggerCount.incrementAndGet(); + } + } else { + super.triggerCompaction(fullCompaction); + } + } + + @Override + public boolean hasDelayedCompact() { + return strategy instanceof ForceUpLevel0Compaction && level0Files() > 0; + } + + @VisibleForTesting + public boolean shouldTriggerDelayedLookupCompact() { + if (level0Files() >= l0FileTriggerThreshold) { + LOG.info("Trigger delayed lookup compact for L0 file count: {}", level0Files()); + return true; + } else if (triggerCount.get() >= stopTriggerThreshold) { + LOG.info("Trigger delayed lookup compact for stopTrigger count: {}", triggerCount); + return true; + } else { + return false; + } + } + + @VisibleForTesting + public boolean isDelayedCompactPartition() { + if (delayPartitionThreshold == null) { + return false; + } + LocalDateTime delayedCompactPartition = LocalDateTime.now().minus(delayPartitionThreshold); + // For delayedCompactPartitionThreshold=20250120, any data insert into partitions<=20250120 + // should be delayed compact. + boolean result = !currentPartitionDate.isAfter(delayedCompactPartition); + if (LOG.isDebugEnabled()) { + LOG.debug( + "Current partition Date: {}, delayed compact partition threshold: {}, delayed compact partition result: {}.", + currentPartitionDate.format(DateTimeFormatter.BASIC_ISO_DATE), + delayedCompactPartition.format(DateTimeFormatter.BASIC_ISO_DATE), + result); + } + return result; + } + + private int level0Files() { + return levels().level0().size(); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index d061e181618b..681cbbc4b1b1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -52,6 +52,7 @@ import org.apache.paimon.mergetree.MergeTreeWriter; import org.apache.paimon.mergetree.compact.CompactRewriter; import org.apache.paimon.mergetree.compact.CompactStrategy; +import org.apache.paimon.mergetree.compact.DelayedLookupCompactManager; import org.apache.paimon.mergetree.compact.ForceUpLevel0Compaction; import org.apache.paimon.mergetree.compact.FullChangelogMergeTreeCompactRewriter; import org.apache.paimon.mergetree.compact.LookupMergeTreeCompactRewriter; @@ -62,6 +63,7 @@ import org.apache.paimon.mergetree.compact.MergeTreeCompactRewriter; import org.apache.paimon.mergetree.compact.UniversalCompaction; import org.apache.paimon.options.Options; +import org.apache.paimon.partition.PartitionTimeExtractor; import org.apache.paimon.schema.KeyValueFieldsExtractor; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; @@ -79,6 +81,7 @@ import javax.annotation.Nullable; +import java.time.LocalDateTime; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -107,6 +110,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite { private final RowType keyType; private final RowType valueType; private final RowType partitionType; + private final PartitionTimeExtractor partitionTimeExtractor; private final String commitUser; @Nullable private final RecordLevelExpire recordLevelExpire; @Nullable private Cache lookupFileCache; @@ -142,6 +146,7 @@ public KeyValueFileStoreWrite( tableName); this.fileIO = fileIO; this.partitionType = partitionType; + this.partitionTimeExtractor = new PartitionTimeExtractor(options); this.keyType = keyType; this.valueType = valueType; this.commitUser = commitUser; @@ -252,19 +257,42 @@ private CompactManager createCompactManager( userDefinedSeqComparator, levels, dvMaintainer); - return new MergeTreeCompactManager( - compactExecutor, - levels, - compactStrategy, - keyComparator, - options.compactionFileSize(true), - options.numSortedRunStopTrigger(), - rewriter, - compactionMetrics == null - ? null - : compactionMetrics.createReporter(partition, bucket), - dvMaintainer, - options.prepareCommitWaitCompaction()); + + if (options.needLookup() && options.lookupDelayPartitionThreshold() != null) { + LocalDateTime partitionTime = + partitionTimeExtractor.extract(partition, partitionType); + return new DelayedLookupCompactManager( + compactExecutor, + levels, + compactStrategy, + keyComparator, + options.compactionFileSize(true), + options.numSortedRunStopTrigger(), + rewriter, + compactionMetrics == null + ? null + : compactionMetrics.createReporter(partition, bucket), + dvMaintainer, + options.prepareCommitWaitCompaction(), + partitionTime, + options.lookupDelayPartitionThreshold(), + options.lookupDelayL0FileTrigger(), + options.lookupDelayStopTrigger()); + } else { + return new MergeTreeCompactManager( + compactExecutor, + levels, + compactStrategy, + keyComparator, + options.compactionFileSize(true), + options.numSortedRunStopTrigger(), + rewriter, + compactionMetrics == null + ? null + : compactionMetrics.createReporter(partition, bucket), + dvMaintainer, + options.prepareCommitWaitCompaction()); + } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java index 0016619e2162..9af1789131b7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java +++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java @@ -18,6 +18,11 @@ package org.apache.paimon.partition; +import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.RowDataToObjectArrayConverter; + import javax.annotation.Nullable; import java.time.LocalDate; @@ -30,6 +35,7 @@ import java.time.format.SignStyle; import java.time.temporal.ChronoField; import java.util.ArrayList; +import java.util.Arrays; import java.util.LinkedHashMap; import java.util.List; import java.util.Locale; @@ -79,11 +85,22 @@ public class PartitionTimeExtractor { @Nullable private final String pattern; @Nullable private final String formatter; + public PartitionTimeExtractor(CoreOptions options) { + this(options.partitionTimestampPattern(), options.partitionTimestampFormatter()); + } + public PartitionTimeExtractor(@Nullable String pattern, @Nullable String formatter) { this.pattern = pattern; this.formatter = formatter; } + public LocalDateTime extract(BinaryRow partition, RowType partitionType) { + RowDataToObjectArrayConverter toObjectArrayConverter = + new RowDataToObjectArrayConverter(partitionType); + Object[] array = toObjectArrayConverter.convert(partition); + return extract(partitionType.getFieldNames(), Arrays.asList(array)); + } + public LocalDateTime extract(LinkedHashMap spec) { return extract(new ArrayList<>(spec.keySet()), new ArrayList<>(spec.values())); } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/DelayedLookupCompactManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/DelayedLookupCompactManagerTest.java new file mode 100644 index 000000000000..bf7d4ce62344 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/DelayedLookupCompactManagerTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.mergetree.compact; + +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.mergetree.Levels; + +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link MergeTreeCompactManager}. */ +public class DelayedLookupCompactManagerTest extends MergeTreeCompactManagerTest { + + @Test + public void testDelayedLookupCompactionBeforeThreshold() + throws ExecutionException, InterruptedException { + List inputs = + Arrays.asList( + new LevelMinMax(0, 1, 5), + new LevelMinMax(0, 3, 6), + new LevelMinMax(0, 6, 8), + new LevelMinMax(1, 1, 4), + new LevelMinMax(1, 6, 8), + new LevelMinMax(1, 10, 10), + new LevelMinMax(2, 1, 3), + new LevelMinMax(2, 4, 6)); + List files = new ArrayList<>(); + for (int i = 0; i < inputs.size(); i++) { + LevelMinMax minMax = inputs.get(i); + files.add(minMax.toFile(i)); + } + + Levels levels = new Levels(comparator, files, 3); + + CompactStrategy strategy = new ForceUpLevel0Compaction(new UniversalCompaction(0, 0, 5)); + + DelayedLookupCompactManager oldManager = + new DelayedLookupCompactManager( + service, + levels, + strategy, + comparator, + 2, + Integer.MAX_VALUE, + new TestRewriter(true), + null, + null, + false, + LocalDateTime.now().minusDays(2), + Duration.ofDays(2), + 5, + 5); + + // delayed lookup compaction not triggered + assertThat(oldManager.isDelayedCompactPartition()).isTrue(); + assertThat(oldManager.shouldTriggerDelayedLookupCompact()).isFalse(); + assertThat(oldManager.hasDelayedCompact()).isTrue(); + + // trigger delayed lookup compaction by stop trigger + oldManager.triggerCompaction(false); + assertThat(oldManager.getCompactionResult(true)).isEmpty(); + + oldManager.triggerCompaction(false); + oldManager.triggerCompaction(false); + oldManager.triggerCompaction(false); + oldManager.triggerCompaction(false); + + assertThat(oldManager.shouldTriggerDelayedLookupCompact()).isTrue(); + oldManager.triggerCompaction(false); + assertThat(oldManager.getCompactionResult(true)).isPresent(); + assertThat(oldManager.hasDelayedCompact()).isFalse(); + + // trigger delayed lookup compaction by l0 file count + oldManager.addNewFile((new LevelMinMax(0, 11, 12)).toFile(inputs.size())); + oldManager.addNewFile((new LevelMinMax(0, 13, 14)).toFile(inputs.size() + 1)); + oldManager.addNewFile((new LevelMinMax(0, 15, 16)).toFile(inputs.size() + 2)); + oldManager.addNewFile((new LevelMinMax(0, 17, 18)).toFile(inputs.size() + 3)); + assertThat(oldManager.shouldTriggerDelayedLookupCompact()).isFalse(); + oldManager.addNewFile((new LevelMinMax(0, 15, 16)).toFile(inputs.size() + 4)); + oldManager.addNewFile((new LevelMinMax(0, 17, 18)).toFile(inputs.size() + 6)); + assertThat(oldManager.shouldTriggerDelayedLookupCompact()).isTrue(); + oldManager.triggerCompaction(false); + assertThat(oldManager.getCompactionResult(true)).isPresent(); + } + + @Test + public void testDelayedLookupCompactionAfterThreshold() + throws ExecutionException, InterruptedException { + List inputs = + Arrays.asList( + new LevelMinMax(0, 1, 5), + new LevelMinMax(0, 3, 6), + new LevelMinMax(0, 6, 8), + new LevelMinMax(1, 1, 4), + new LevelMinMax(1, 6, 8), + new LevelMinMax(1, 10, 10), + new LevelMinMax(2, 1, 3), + new LevelMinMax(2, 4, 6)); + List files = new ArrayList<>(); + for (int i = 0; i < inputs.size(); i++) { + LevelMinMax minMax = inputs.get(i); + files.add(minMax.toFile(i)); + } + + Levels levels = new Levels(comparator, files, 3); + + CompactStrategy strategy = new ForceUpLevel0Compaction(new UniversalCompaction(0, 0, 5)); + + DelayedLookupCompactManager newManager = + new DelayedLookupCompactManager( + service, + levels, + strategy, + comparator, + 2, + Integer.MAX_VALUE, + new TestRewriter(true), + null, + null, + false, + LocalDateTime.now().minusDays(1), + Duration.ofDays(2), + 5, + 5); + + // compaction will trigger immediately for new partitions + assertThat(newManager.isDelayedCompactPartition()).isFalse(); + assertThat(newManager.shouldTriggerDelayedLookupCompact()).isFalse(); + newManager.triggerCompaction(false); + assertThat(newManager.getCompactionResult(true)).isPresent(); + assertThat(newManager.hasDelayedCompact()).isFalse(); + + // stop-trigger not working for new partitions + newManager.triggerCompaction(false); + newManager.triggerCompaction(false); + newManager.triggerCompaction(false); + newManager.triggerCompaction(false); + newManager.triggerCompaction(false); + newManager.triggerCompaction(false); + assertThat(newManager.shouldTriggerDelayedLookupCompact()).isFalse(); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java index 8484525c82c8..8d7ec962de78 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java @@ -49,9 +49,9 @@ /** Test for {@link MergeTreeCompactManager}. */ public class MergeTreeCompactManagerTest { - private final Comparator comparator = Comparator.comparingInt(o -> o.getInt(0)); + final Comparator comparator = Comparator.comparingInt(o -> o.getInt(0)); - private static ExecutorService service; + static ExecutorService service; @BeforeAll public static void before() { @@ -223,11 +223,11 @@ private CompactStrategy testStrategy() { return (numLevels, runs) -> Optional.of(CompactUnit.fromLevelRuns(numLevels - 1, runs)); } - private static class TestRewriter extends AbstractCompactRewriter { + static class TestRewriter extends AbstractCompactRewriter { private final boolean expectedDropDelete; - private TestRewriter(boolean expectedDropDelete) { + TestRewriter(boolean expectedDropDelete) { this.expectedDropDelete = expectedDropDelete; } @@ -262,25 +262,25 @@ public CompactResult rewrite( } } - private static class LevelMinMax { + static class LevelMinMax { - private final int level; - private final int min; - private final int max; + protected final int level; + protected final int min; + protected final int max; - private LevelMinMax(DataFileMeta file) { + LevelMinMax(DataFileMeta file) { this.level = file.level(); this.min = file.minKey().getInt(0); this.max = file.maxKey().getInt(0); } - private LevelMinMax(int level, int min, int max) { + LevelMinMax(int level, int min, int max) { this.level = level; this.min = min; this.max = max; } - private DataFileMeta toFile(long maxSequence) { + DataFileMeta toFile(long maxSequence) { return newFile(level, min, max, maxSequence); } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreWriteTest.java new file mode 100644 index 000000000000..140c9c8913f6 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreWriteTest.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.KeyValue; +import org.apache.paimon.TestFileStore; +import org.apache.paimon.TestKeyValueGenerator; +import org.apache.paimon.compact.CompactManager; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.disk.IOManager; +import org.apache.paimon.disk.IOManagerImpl; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.mergetree.MergeTreeWriter; +import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction; +import org.apache.paimon.mergetree.compact.DelayedLookupCompactManager; +import org.apache.paimon.partition.PartitionTimeExtractor; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.TableSchema; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link KeyValueFileStoreWrite}. */ +public class KeyValueFileStoreWriteTest { + + private static final int NUM_BUCKETS = 10; + + @TempDir java.nio.file.Path tempDir; + + private IOManager ioManager; + + @BeforeEach + public void before() throws IOException { + this.ioManager = new IOManagerImpl(tempDir.toString()); + } + + @Test + public void testDelayedLookupPartition() throws Exception { + SchemaManager schemaManager = + new SchemaManager(LocalFileIO.create(), new Path(tempDir.toUri())); + Map options = new HashMap<>(); + Duration lookupDelayPartitionThreshold = Duration.ofDays(1L); + options.put(CoreOptions.LOOKUP_DELAY_PARTITION_THRESHOLD.key(), "1 d"); + options.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true"); + options.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), "yyyyMMdd H"); + options.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), "$dt $hr"); + + TableSchema schema = + schemaManager.createTable( + new Schema( + TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(), + TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(), + TestKeyValueGenerator.getPrimaryKeys( + TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED), + options, + null)); + TestFileStore store = + new TestFileStore.Builder( + "avro", + tempDir.toString(), + NUM_BUCKETS, + TestKeyValueGenerator.DEFAULT_PART_TYPE, + TestKeyValueGenerator.KEY_TYPE, + TestKeyValueGenerator.DEFAULT_ROW_TYPE, + TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR, + DeduplicateMergeFunction.factory(), + schema) + .build(); + + KeyValueFileStoreWrite write = (KeyValueFileStoreWrite) store.newWrite(); + write.withIOManager(ioManager); + TestKeyValueGenerator gen = new TestKeyValueGenerator(); + PartitionTimeExtractor partitionTimeExtractor = new PartitionTimeExtractor(store.options()); + + for (int i = 0; i < 100; i++) { + KeyValue keyValue = gen.next(); + BinaryRow partition = gen.getPartition(keyValue); + LocalDateTime partitionTime = + partitionTimeExtractor.extract( + partition, TestKeyValueGenerator.DEFAULT_PART_TYPE); + AbstractFileStoreWrite.WriterContainer writerContainer = + write.createWriterContainer(gen.getPartition(keyValue), 1, false); + MergeTreeWriter writer = (MergeTreeWriter) writerContainer.writer; + + CompactManager compactManager = writer.compactManager(); + assertThat(compactManager).isInstanceOf(DelayedLookupCompactManager.class); + + DelayedLookupCompactManager delayedLookupCompactManager = + (DelayedLookupCompactManager) compactManager; + assertThat(delayedLookupCompactManager.isDelayedCompactPartition()) + .isEqualTo( + !partitionTime.isAfter( + LocalDateTime.now().minus(lookupDelayPartitionThreshold))); + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index fb0f8184b591..b475da892d49 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -142,20 +142,36 @@ private StoreSinkWrite.Provider createWriteProvider( } } - if (coreOptions.needLookup() && !coreOptions.prepareCommitWaitCompaction()) { - return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> { - assertNoSinkMaterializer.run(); - return new AsyncLookupSinkWrite( - table, - commitUser, - state, - ioManager, - ignorePreviousFiles, - waitCompaction, - isStreaming, - memoryPool, - metricGroup); - }; + if (coreOptions.needLookup()) { + if (coreOptions.lookupDelayPartitionThreshold() != null) { + return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> { + assertNoSinkMaterializer.run(); + return new DelayedCompactStoreSinkWrite( + table, + commitUser, + state, + ioManager, + ignorePreviousFiles, + waitCompaction, + isStreaming, + memoryPool, + metricGroup); + }; + } else if (!coreOptions.prepareCommitWaitCompaction()) { + return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> { + assertNoSinkMaterializer.run(); + return new AsyncLookupSinkWrite( + table, + commitUser, + state, + ioManager, + ignorePreviousFiles, + waitCompaction, + isStreaming, + memoryPool, + metricGroup); + }; + } } return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java index 83af15745078..f70001336484 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.sink; import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.flink.utils.InternalRowTypeSerializer; @@ -56,6 +57,9 @@ import org.junit.jupiter.api.io.TempDir; import java.lang.reflect.Field; +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -242,6 +246,141 @@ public void testAsyncLookupWithFailure() throws Exception { .containsExactlyInAnyOrder("+I[1, 10, 101]", "+I[2, 20, 200]", "+I[3, 30, 301]"); } + @Test + public void testDelayedLookupWithFailure() throws Exception { + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT(), DataTypes.STRING(), DataTypes.INT()}, + new String[] {"pt", "k", "v"}); + + String partitionFormatter = "yyyyMMdd"; + Duration partitionThreshold = Duration.ofDays(1); + int lookupStopTrigger = 5; + + Options options = new Options(); + options.set("bucket", "1"); + options.set("changelog-producer", "lookup"); + options.set(CoreOptions.LOOKUP_DELAY_PARTITION_THRESHOLD, Duration.ofDays(1)); + options.set(CoreOptions.LOOKUP_DELAY_STOP_TRIGGER, lookupStopTrigger); + options.set(CoreOptions.PARTITION_TIMESTAMP_FORMATTER, partitionFormatter); + options.set(CoreOptions.PARTITION_TIMESTAMP_PATTERN, "$k"); + + FileStoreTable fileStoreTable = + createFileStoreTable( + rowType, Arrays.asList("pt", "k"), Collections.singletonList("k"), options); + + // compaction will be triggered immediately since this is delayed lookup compaction + RowDataStoreWriteOperator.Factory operatorFactory = + getDelayedCompactWriteOperatorFactory(fileStoreTable, false); + OneInputStreamOperatorTestHarness harness = + createHarness(operatorFactory); + + TableCommitImpl commit = fileStoreTable.newCommit(commitUser); + + TypeSerializer serializer = + new CommittableTypeInfo().createSerializer(new ExecutionConfig()); + harness.setup(serializer); + harness.open(); + + String partition1 = + generatePartitionValue(partitionFormatter, partitionThreshold.minusDays(1)); + String partition2 = generatePartitionValue(partitionFormatter, partitionThreshold); + String partition3 = + generatePartitionValue(partitionFormatter, partitionThreshold.plusDays(1)); + + // write basic records + harness.processElement(GenericRow.of(1, BinaryString.fromString(partition1), 100), 1); + harness.processElement(GenericRow.of(2, BinaryString.fromString(partition2), 200), 2); + harness.processElement(GenericRow.of(3, BinaryString.fromString(partition3), 300), 3); + harness.prepareSnapshotPreBarrier(1); + harness.snapshot(1, 10); + harness.notifyOfCompletedCheckpoint(1); + commitAll(harness, commit, 1); + + harness.processElement(GenericRow.of(1, BinaryString.fromString(partition1), 101), 11); + harness.processElement(GenericRow.of(3, BinaryString.fromString(partition3), 301), 13); + harness.prepareSnapshotPreBarrier(2); + OperatorSubtaskState state = harness.snapshot(2, 20); + harness.notifyOfCompletedCheckpoint(2); + commitAll(harness, commit, 2); + + // operator is closed due to failure + harness.close(); + + // restore operator to trigger not delayed compaction + operatorFactory = getDelayedCompactWriteOperatorFactory(fileStoreTable, true); + harness = createHarness(operatorFactory); + harness.setup(serializer); + harness.initializeState(state); + harness.open(); + + // write nothing, wait for compaction + harness.prepareSnapshotPreBarrier(3); + harness.snapshot(3, 30); + harness.notifyOfCompletedCheckpoint(3); + commitAll(harness, commit, 3); + + harness.close(); + + // check partition1 result + ReadBuilder readBuilder = fileStoreTable.newReadBuilder(); + StreamTableScan scan = readBuilder.newStreamScan(); + List splits = scan.plan().splits(); + TableRead read = readBuilder.newRead(); + RecordReader reader = read.createReader(splits); + List actual = new ArrayList<>(); + reader.forEachRemaining( + row -> + actual.add( + String.format( + "%s[%d, %s, %d]", + row.getRowKind().shortString(), + row.getInt(0), + row.getString(1).toString(), + row.getInt(2)))); + assertThat(actual).containsExactlyInAnyOrder(String.format("+I[1, %s, 101]", partition1)); + + // restore operator to trigger delayed compaction + operatorFactory = getDelayedCompactWriteOperatorFactory(fileStoreTable, true); + harness = createHarness(operatorFactory); + harness.setup(serializer); + harness.initializeState(state); + harness.open(); + + // trigger delayed compaction by stop trigger + for (int i = 0; i <= lookupStopTrigger; i++) { + harness.prepareSnapshotPreBarrier(i + 4); + harness.snapshot(i + 4, i + 40); + harness.notifyOfCompletedCheckpoint(i + 4); + commitAll(harness, commit, i + 4); + } + + harness.close(); + commit.close(); + + // check all partition result + readBuilder = fileStoreTable.newReadBuilder(); + scan = readBuilder.newStreamScan(); + splits = scan.plan().splits(); + read = readBuilder.newRead(); + reader = read.createReader(splits); + List finalResult = new ArrayList<>(); + reader.forEachRemaining( + row -> + finalResult.add( + String.format( + "%s[%d, %s, %d]", + row.getRowKind().shortString(), + row.getInt(0), + row.getString(1).toString(), + row.getInt(2)))); + assertThat(finalResult) + .containsExactlyInAnyOrder( + String.format("+I[1, %s, 101]", partition1), + String.format("+I[2, %s, 200]", partition2), + String.format("+I[3, %s, 301]", partition3)); + } + @Test public void testChangelog() throws Exception { testChangelog(false); @@ -449,6 +588,25 @@ private RowDataStoreWriteOperator.Factory getAsyncLookupWriteOperatorFactory( commitUser); } + private RowDataStoreWriteOperator.Factory getDelayedCompactWriteOperatorFactory( + FileStoreTable fileStoreTable, boolean waitCompaction) { + return new RowDataStoreWriteOperator.Factory( + fileStoreTable, + null, + (table, commitUser, state, ioManager, memoryPool, metricGroup) -> + new DelayedCompactStoreSinkWrite( + table, + commitUser, + state, + ioManager, + false, + waitCompaction, + true, + memoryPool, + metricGroup), + commitUser); + } + @SuppressWarnings("unchecked") private void commitAll( OneInputStreamOperatorTestHarness harness, @@ -482,4 +640,10 @@ private OneInputStreamOperatorTestHarness createHarnes operatorFactory, internalRowInternalTypeInfo.createSerializer(new ExecutionConfig())); } + + private String generatePartitionValue(String partitionFormatter, Duration minusDays) { + return LocalDateTime.now() + .minus(minusDays) + .format(DateTimeFormatter.ofPattern(partitionFormatter)); + } }