From bd1622da54524cf4dbbe99bdeabd4bcbf2117531 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=B0=B8=E7=BF=94?= Date: Mon, 11 May 2026 08:49:45 +0000 Subject: [PATCH 01/14] [#7282724120] feat(core): support sort manifest entries by partition - Sort delta entries during commit - Sort base entries via full merge See merge request: !854 --- docs/generated/core_configuration.html | 24 ++ .../java/org/apache/paimon/CoreOptions.java | 45 +++ .../paimon/operation/FileStoreCommitImpl.java | 25 +- .../paimon/operation/ManifestFileMerger.java | 358 ++++++++++++++++-- .../paimon/manifest/ManifestFileMetaTest.java | 187 ++++++++- .../NoPartitionManifestFileMetaTest.java | 4 +- .../procedure/CompactProcedureTestBase.scala | 18 +- 7 files changed, 598 insertions(+), 63 deletions(-) diff --git a/docs/generated/core_configuration.html b/docs/generated/core_configuration.html index c11941ea27db..59f5943b4288 100644 --- a/docs/generated/core_configuration.html +++ b/docs/generated/core_configuration.html @@ -867,6 +867,12 @@ Boolean For DELETE manifest entry in manifest file, drop stats to reduce memory and storage. Default value is false only for compatibility of old reader. + +
manifest.delta.sorted
+ true + Boolean + Whether to sort ManifestEntry by partition when writing manifest delta. +
manifest.format
"avro" @@ -885,6 +891,24 @@ Integer To avoid frequent manifest merges, this parameter specifies the minimum number of ManifestFileMeta to merge. + +
manifest.merge.sort-on-commit
+ false + Boolean + Whether to sort ManifestEntry by partition during manifest merge in commit. This option does not affect manifest compaction. + + +
manifest.merge.sort.buffer
+ 256 mb + MemorySize + Amount of data to build up in memory for sorting during manifest full compaction before spilling to disk. + + +
manifest.merge.sorted
+ true + Boolean + Whether to sort ManifestEntry by partition during manifest full compaction. +
manifest.target-file-size
8 mb diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index d5a1bb7fb067..f6b9e85af6d3 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -476,6 +476,28 @@ public InlineElement getDescription() { .withDescription( "Define upsert key to do MERGE INTO when executing INSERT INTO, cannot be defined with primary key."); + public static final ConfigOption MANIFEST_MERGE_SORTED = + key("manifest.merge.sorted") + .booleanType() + .defaultValue(true) + .withDescription( + "Whether to sort ManifestEntry by partition during manifest full compaction."); + + public static final ConfigOption MANIFEST_MERGE_SORT_ON_COMMIT = + key("manifest.merge.sort-on-commit") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to sort ManifestEntry by partition during manifest merge in commit. " + + "This option does not affect manifest compaction."); + + public static final ConfigOption MANIFEST_DELTA_SORTED = + key("manifest.delta.sorted") + .booleanType() + .defaultValue(true) + .withDescription( + "Whether to sort ManifestEntry by partition when writing manifest delta."); + public static final ConfigOption PARTITION_DEFAULT_NAME = key("partition.default-name") .stringType() @@ -661,6 +683,13 @@ public InlineElement getDescription() { .withDescription( "Amount of data to build up in memory before converting to a sorted on-disk file."); + public static final ConfigOption MANIFEST_MERGE_SORT_BUFFER = + key("manifest.merge.sort.buffer") + .memoryType() + .defaultValue(WRITE_BUFFER_SIZE.defaultValue()) + .withDescription( + "Amount of data to build up in memory for sorting during manifest full compaction before spilling to disk."); + @Documentation.OverrideDefault("infinite") public static final ConfigOption WRITE_BUFFER_MAX_DISK_SIZE = key("write-buffer-spill.max-disk-size") @@ -2844,6 +2873,22 @@ public int manifestMergeMinCount() { return options.get(MANIFEST_MERGE_MIN_COUNT); } + public boolean manifestMergeSorted() { + return options.get(MANIFEST_MERGE_SORTED); + } + + public boolean manifestMergeSortOnCommit() { + return options.get(MANIFEST_MERGE_SORT_ON_COMMIT); + } + + public boolean manifestDeltaSorted() { + return options.get(MANIFEST_DELTA_SORTED); + } + + public long manifestMergeSortBufferSize() { + return options.get(MANIFEST_MERGE_SORT_BUFFER).getBytes(); + } + public MergeEngine mergeEngine() { return options.get(MERGE_ENGINE); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 0032d69adfab..322e44bd49b1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -955,11 +955,14 @@ CommitResult tryCommitOnce( ManifestFileMerger.merge( mergeBeforeManifests, manifestFile, - options.manifestTargetSize().getBytes(), - options.manifestMergeMinCount(), - options.manifestFullCompactionThresholdSize().getBytes(), + manifestTargetSize.getBytes(), + manifestMergeMinCount, + manifestFullCompactionSize.getBytes(), partitionType, - options.scanManifestParallelism()); + manifestReadParallelism, + coreOptions.manifestMergeSorted() + && coreOptions.manifestMergeSortOnCommit(), + coreOptions.manifestMergeSortBufferSize()); baseManifestList = manifestList.write(mergeAfterManifests); if (options.rowTrackingEnabled()) { @@ -984,8 +987,14 @@ CommitResult tryCommitOnce( // write new delta files into manifest files deltaStatistics = new ArrayList<>(PartitionEntry.merge(deltaFiles)); - deltaManifestList = manifestList.write(manifestFile.write(deltaFiles)); - + List deltaFilesForWrite = deltaFiles; + if (coreOptions.manifestDeltaSorted() && deltaFiles.size() > 1) { + deltaFilesForWrite = new ArrayList<>(deltaFiles); + deltaFilesForWrite.sort( + ManifestFileMerger.createManifestEntryComparator(partitionType)); + } + List deltaManifests = manifestFile.write(deltaFilesForWrite); + deltaManifestList = manifestList.write(deltaManifests); // write changelog into manifest files if (!changelogFiles.isEmpty()) { changelogManifestList = manifestList.write(manifestFile.write(changelogFiles)); @@ -1188,7 +1197,9 @@ private boolean compactManifestOnce() { 1, 1, partitionType, - options.scanManifestParallelism()); + options.scanManifestParallelism(), + coreOptions.manifestMergeSorted(), + coreOptions.manifestMergeSortBufferSize()); if (new HashSet<>(mergeBeforeManifests).equals(new HashSet<>(mergeAfterManifests))) { // no need to commit this snapshot, because no compact were happened diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java index cdcad1ed3e84..6e3a5c3657c7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java @@ -18,15 +18,34 @@ package org.apache.paimon.operation; +import org.apache.paimon.CoreOptions; +import org.apache.paimon.codegen.NormalizedKeyComputer; +import org.apache.paimon.codegen.RecordComparator; +import org.apache.paimon.compression.CompressOptions; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.serializer.BinaryRowSerializer; +import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.disk.IOManager; import org.apache.paimon.io.RollingFileWriter; import org.apache.paimon.manifest.FileEntry; import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.manifest.ManifestEntrySerializer; import org.apache.paimon.manifest.ManifestFile; import org.apache.paimon.manifest.ManifestFileMeta; +import org.apache.paimon.memory.HeapMemorySegmentPool; +import org.apache.paimon.memory.MemorySegment; +import org.apache.paimon.memory.MemorySegmentPool; import org.apache.paimon.partition.PartitionPredicate; +import org.apache.paimon.sort.BinaryExternalSortBuffer; +import org.apache.paimon.sort.BinaryInMemorySortBuffer; +import org.apache.paimon.types.IntType; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Filter; +import org.apache.paimon.utils.InternalRowUtils; +import org.apache.paimon.utils.MutableObjectIterator; +import org.apache.paimon.utils.SerializationUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,6 +53,7 @@ import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Comparator; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; @@ -45,6 +65,7 @@ import java.util.function.Function; import static java.util.Collections.singletonList; +import static org.apache.paimon.codegen.CodeGenUtils.newRecordComparator; import static org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -66,7 +87,30 @@ public static List merge( int suggestedMinMetaCount, long manifestFullCompactionSize, RowType partitionType, - @Nullable Integer manifestReadParallelism) { + @Nullable Integer manifestReadParallelism, + boolean manifestMergeSorted) { + return merge( + input, + manifestFile, + suggestedMetaSize, + suggestedMinMetaCount, + manifestFullCompactionSize, + partitionType, + manifestReadParallelism, + manifestMergeSorted, + CoreOptions.MANIFEST_MERGE_SORT_BUFFER.defaultValue().getBytes()); + } + + public static List merge( + List input, + ManifestFile manifestFile, + long suggestedMetaSize, + int suggestedMinMetaCount, + long manifestFullCompactionSize, + RowType partitionType, + @Nullable Integer manifestReadParallelism, + boolean manifestMergeSorted, + long manifestMergeSortBufferSize) { // these are the newly created manifest files, clean them up if exception occurs List newFilesForAbort = new ArrayList<>(); @@ -79,7 +123,9 @@ public static List merge( suggestedMetaSize, manifestFullCompactionSize, partitionType, - manifestReadParallelism); + manifestReadParallelism, + manifestMergeSorted, + manifestMergeSortBufferSize); return fullCompacted.orElseGet( () -> tryMinorCompaction( @@ -162,7 +208,31 @@ public static Optional> tryFullCompaction( long suggestedMetaSize, long sizeTrigger, RowType partitionType, - @Nullable Integer manifestReadParallelism) + @Nullable Integer manifestReadParallelism, + boolean manifestMergeSorted) + throws Exception { + return tryFullCompaction( + inputs, + newFilesForAbort, + manifestFile, + suggestedMetaSize, + sizeTrigger, + partitionType, + manifestReadParallelism, + manifestMergeSorted, + CoreOptions.MANIFEST_MERGE_SORT_BUFFER.defaultValue().getBytes()); + } + + public static Optional> tryFullCompaction( + List inputs, + List newFilesForAbort, + ManifestFile manifestFile, + long suggestedMetaSize, + long sizeTrigger, + RowType partitionType, + @Nullable Integer manifestReadParallelism, + boolean manifestMergeSorted, + long manifestMergeSortBufferSize) throws Exception { checkArgument(sizeTrigger > 0, "Manifest full compaction size trigger cannot be zero."); @@ -248,13 +318,26 @@ public static Optional> tryFullCompaction( file, manifestFile, mustChange, deleteEntries)); Exception exception = null; try { - for (FullCompactionReadResult readResult : - sequentialBatchedExecute(reader, toBeMerged, manifestReadParallelism)) { - if (readResult.requireChange) { - writer.write(readResult.entries); - } else { - result.add(readResult.file); - } + if (manifestMergeSorted) { + actualRewriteCount = + mergeSortedByPartition( + toBeMerged, + mustChange, + deleteEntries, + manifestFile, + partitionType, + manifestMergeSortBufferSize, + writer, + result); + } else { + actualRewriteCount = + mergeUnsorted( + toBeMerged, + mustChange, + deleteEntries, + manifestFile, + writer, + result); } } catch (Exception e) { exception = e; @@ -272,27 +355,250 @@ public static Optional> tryFullCompaction( return Optional.of(result); } - private static FullCompactionReadResult readForFullCompaction( - ManifestFileMeta file, - ManifestFile manifestFile, + private static final NormalizedKeyComputer NO_NORMALIZED_KEY_COMPUTER = + new NormalizedKeyComputer() { + @Override + public void putKey(InternalRow record, MemorySegment target, int offset) { + // no-op + } + + @Override + public int compareKey( + MemorySegment segI, int offsetI, MemorySegment segJ, int offsetJ) { + return 0; + } + + @Override + public void swapKey( + MemorySegment segI, int offsetI, MemorySegment segJ, int offsetJ) { + // no-op + } + + @Override + public int getNumKeyBytes() { + return 0; + } + + @Override + public boolean isKeyFullyDetermines() { + return false; + } + + @Override + public boolean invertKey() { + return false; + } + }; + + private static int mergeUnsorted( + List toBeMerged, Filter mustChange, - Set deleteEntries) { - List entries = new ArrayList<>(); - boolean requireChange = mustChange.test(file); - for (ManifestEntry entry : - manifestFile.read( - file.fileName(), - file.fileSize(), - FileEntry.addFilter(), - Filter.alwaysTrue())) { - if (deleteEntries.contains(entry.identifier())) { - requireChange = true; + Set deleteEntries, + ManifestFile manifestFile, + RollingFileWriter writer, + List result) + throws Exception { + int actualRewriteCount = 0; + for (ManifestFileMeta file : toBeMerged) { + List entries = new ArrayList<>(); + boolean requireChange = mustChange.test(file); + for (ManifestEntry entry : manifestFile.read(file.fileName(), file.fileSize())) { + if (entry.kind() == FileKind.DELETE) { + continue; + } + + if (deleteEntries.contains(entry.identifier())) { + requireChange = true; + } else { + entries.add(entry); + } + } + + if (requireChange) { + writer.write(entries); + actualRewriteCount++; } else { - entries.add(entry); + result.add(file); + } + } + return actualRewriteCount; + } + + private static int mergeSortedByPartition( + List toBeMerged, + Filter mustChange, + Set deleteEntries, + ManifestFile manifestFile, + RowType partitionType, + long manifestMergeSortBufferSize, + RollingFileWriter writer, + List result) + throws Exception { + IOManager ioManager = null; + BinaryExternalSortBuffer sortBuffer = null; + RowType sortRowType = null; + ManifestEntrySerializer entrySerializer = new ManifestEntrySerializer(); + int actualRewriteCount = 0; + + try { + for (ManifestFileMeta file : toBeMerged) { + List entries = new ArrayList<>(); + boolean requireChange = mustChange.test(file); + for (ManifestEntry entry : manifestFile.read(file.fileName(), file.fileSize())) { + if (entry.kind() == FileKind.DELETE) { + continue; + } + + if (deleteEntries.contains(entry.identifier())) { + requireChange = true; + } else { + entries.add(entry); + } + } + + if (requireChange) { + if (sortBuffer == null) { + sortRowType = manifestEntrySortRowType(partitionType); + ioManager = IOManager.create(System.getProperty("java.io.tmpdir")); + sortBuffer = + createManifestEntrySortBuffer( + ioManager, + sortRowType, + partitionType, + manifestMergeSortBufferSize); + } + for (ManifestEntry entry : entries) { + GenericRow row = new GenericRow(4); + row.setField(0, entry.partition()); + row.setField(1, entry.bucket()); + row.setField(2, entry.level()); + row.setField(3, entry.toBytes()); + sortBuffer.write(row); + } + actualRewriteCount++; + } else { + result.add(file); + } + } + + if (sortBuffer != null) { + MutableObjectIterator iterator = sortBuffer.sortedIterator(); + BinaryRow reuse = new BinaryRow(sortRowType.getFieldCount()); + BinaryRow next; + while ((next = iterator.next(reuse)) != null) { + ManifestEntry entry = entrySerializer.deserializeFromBytes(next.getBinary(3)); + writer.write(entry); + } + } + + return actualRewriteCount; + } finally { + if (sortBuffer != null) { + sortBuffer.clear(); + } + if (ioManager != null) { + ioManager.close(); } } + } + + private static BinaryExternalSortBuffer createManifestEntrySortBuffer( + IOManager ioManager, + RowType sortRowType, + RowType partitionType, + long manifestMergeSortBufferSize) { + int pageSize = (int) CoreOptions.PAGE_SIZE.defaultValue().getBytes(); + long minBufferSize = 3L * pageSize; + checkArgument( + manifestMergeSortBufferSize >= minBufferSize, + "Manifest merge sort buffer must be at least three pages (" + minBufferSize + ")"); + + RecordComparator comparator = + (a, b) -> { + if (partitionType.getFieldCount() > 0) { + int cmp = + InternalRowUtils.compare( + a.getRow(0, partitionType.getFieldCount()), + b.getRow(0, partitionType.getFieldCount()), + partitionType); + if (cmp != 0) { + return cmp; + } + } + + int cmp = Integer.compare(a.getInt(1), b.getInt(1)); + if (cmp != 0) { + return cmp; + } + + return Integer.compare(a.getInt(2), b.getInt(2)); + }; + + MemorySegmentPool memoryPool = + new HeapMemorySegmentPool(manifestMergeSortBufferSize, pageSize); + InternalRowSerializer serializer = new InternalRowSerializer(sortRowType); + BinaryInMemorySortBuffer inMemorySortBuffer = + BinaryInMemorySortBuffer.createBuffer( + NO_NORMALIZED_KEY_COMPUTER, serializer, comparator, memoryPool); + + return new BinaryExternalSortBuffer( + new BinaryRowSerializer(sortRowType.getFieldCount()), + comparator, + memoryPool.pageSize(), + inMemorySortBuffer, + ioManager, + CoreOptions.LOCAL_SORT_MAX_NUM_FILE_HANDLES.defaultValue(), + CompressOptions.defaultOptions(), + CoreOptions.WRITE_BUFFER_MAX_DISK_SIZE.defaultValue()); + } + + private static RowType manifestEntrySortRowType(RowType partitionType) { + return RowType.of( + partitionType, + new IntType(false), + new IntType(false), + SerializationUtils.newBytesType(false)); + } + + static Comparator createManifestEntryComparator(RowType partitionType) { + Comparator partitionComparator = null; + if (partitionType.getFieldCount() > 0) { + try { + int[] sortFields = new int[partitionType.getFieldCount()]; + for (int i = 0; i < sortFields.length; i++) { + sortFields[i] = i; + } + RecordComparator codegenComparator = + newRecordComparator(partitionType.getFieldTypes(), sortFields, true); + partitionComparator = (a, b) -> codegenComparator.compare(a, b); + } catch (Throwable t) { + // Fallback to pure-java comparison for environments where codegen is unavailable. + partitionComparator = (a, b) -> InternalRowUtils.compare(a, b, partitionType); + } + } + + Comparator finalPartitionComparator = partitionComparator; + return (a, b) -> { + int cmp = 0; + if (finalPartitionComparator != null) { + cmp = finalPartitionComparator.compare(a.partition(), b.partition()); + if (cmp != 0) { + return cmp; + } + } + + cmp = Integer.compare(a.bucket(), b.bucket()); + if (cmp != 0) { + return cmp; + } + + cmp = Integer.compare(a.level(), b.level()); + if (cmp != 0) { + return cmp; + } - return new FullCompactionReadResult(file, requireChange, entries); + return a.fileName().compareTo(b.fileName()); + }; } private static Set computeDeletePartitions(Set deleteEntries) { diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java index 36b0d15f114f..cbe0b25ad606 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java @@ -18,6 +18,7 @@ package org.apache.paimon.manifest; +import org.apache.paimon.CoreOptions; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.SeekableInputStream; @@ -86,7 +87,14 @@ public void testMergeWithoutFullCompaction(int numLastBits) { // no trigger Full Compaction List actual = ManifestFileMerger.merge( - input, manifestFile, 500, 3, Long.MAX_VALUE, getPartitionType(), null); + input, + manifestFile, + 500, + 3, + Long.MAX_VALUE, + getPartitionType(), + null, + true); assertThat(actual).hasSameSizeAs(expected); // these two manifest files are merged from the input @@ -125,7 +133,8 @@ private void testCleanUp(List input, long fullCompactionThresh 3, fullCompactionThreshold, getPartitionType(), - null); + null, + true); } catch (Throwable e) { assertThat(e).hasRootCauseExactlyInstanceOf(FailingFileIO.ArtificialException.class); // old files should be kept untouched, while new files should be cleaned up @@ -158,7 +167,7 @@ public void testMerge() { // trigger full compaction List merged = ManifestFileMerger.merge( - input, manifestFile, 500, 3, 200, getPartitionType(), null); + input, manifestFile, 500, 3, 200, getPartitionType(), null, true); // 1st Manifest don't need to Merge assertSameContent(input.get(0), merged.get(0), manifestFile); @@ -175,7 +184,7 @@ public void testMergeWithoutDelta() { List merged = ManifestFileMerger.merge( - input, manifestFile, 500, 3, 200, getPartitionType(), null); + input, manifestFile, 500, 3, 200, getPartitionType(), null, true); assertEquivalentEntries(input, merged); assertThat(merged).hasSameElementsAs(input); @@ -188,7 +197,7 @@ public void testMergeWithoutDelta() { List merged1 = ManifestFileMerger.merge( - input1, manifestFile, 500, 3, 200, getPartitionType(), null); + input1, manifestFile, 500, 3, 200, getPartitionType(), null, true); assertThat(base).hasSameElementsAs(merged1); assertEquivalentEntries(input1, merged1); @@ -200,7 +209,7 @@ public void testMergeWithoutBase() { addDeltaManifests(input, true); List merged = ManifestFileMerger.merge( - input, manifestFile, 500, 3, 200, getPartitionType(), null); + input, manifestFile, 500, 3, 200, getPartitionType(), null, true); assertEquivalentEntries(input, merged); } @@ -227,7 +236,7 @@ public void testMergeWithoutDeleteFile() { List merged = ManifestFileMerger.merge( - input, manifestFile, 500, 3, 200, getPartitionType(), null); + input, manifestFile, 500, 3, 200, getPartitionType(), null, true); assertEquivalentEntries(input, merged); } @@ -272,7 +281,8 @@ public void testTriggerFullCompaction() throws Exception { 500, Long.MAX_VALUE, getPartitionType(), - null); + null, + true); assertThat(fullCompacted1).isEmpty(); assertThat(newMetas1).isEmpty(); @@ -282,7 +292,7 @@ public void testTriggerFullCompaction() throws Exception { List newMetas2 = new ArrayList<>(); Optional> fullCompacted2 = ManifestFileMerger.tryFullCompaction( - input, newMetas1, manifestFile, 500, 100, getPartitionType(), null); + input, newMetas1, manifestFile, 500, 100, getPartitionType(), null, true); assertThat(fullCompacted2).isEmpty(); assertThat(newMetas2).isEmpty(); @@ -293,7 +303,7 @@ public void testTriggerFullCompaction() throws Exception { List newMetas3 = new ArrayList<>(); Optional> fullCompacted3 = ManifestFileMerger.tryFullCompaction( - input, newMetas3, manifestFile, 500, 100, getPartitionType(), null); + input, newMetas3, manifestFile, 500, 100, getPartitionType(), null, true); assertThat(fullCompacted3).isEmpty(); assertThat(newMetas3).isEmpty(); @@ -304,7 +314,14 @@ public void testTriggerFullCompaction() throws Exception { List newMetas4 = new ArrayList<>(); List fullCompacted4 = ManifestFileMerger.tryFullCompaction( - input, newMetas4, manifestFile, 5000, 100, getPartitionType(), null) + input, + newMetas4, + manifestFile, + 5000, + 100, + getPartitionType(), + null, + true) .get(); assertThat(fullCompacted4.size()).isEqualTo(1); assertThat(newMetas4.size()).isEqualTo(1); @@ -316,7 +333,14 @@ public void testTriggerFullCompaction() throws Exception { List newMetas5 = new ArrayList<>(); List fullCompacted5 = ManifestFileMerger.tryFullCompaction( - input, newMetas5, manifestFile, 1800, 100, getPartitionType(), null) + input, + newMetas5, + manifestFile, + 1800, + 100, + getPartitionType(), + null, + true) .get(); assertThat(fullCompacted5.size()).isEqualTo(3); assertThat(newMetas5.size()).isEqualTo(1); @@ -332,7 +356,14 @@ public void testTriggerFullCompaction() throws Exception { List newMetas6 = new ArrayList<>(); List fullCompacted6 = ManifestFileMerger.tryFullCompaction( - input, newMetas6, manifestFile, 500, 100, getPartitionType(), null) + input, + newMetas6, + manifestFile, + 500, + 100, + getPartitionType(), + null, + true) .get(); List entryFileNameExptected = new ArrayList<>(Arrays.asList("ADD-G", "ADD-I")); @@ -347,7 +378,7 @@ public void testTriggerFullCompaction() throws Exception { IllegalArgumentException.class, () -> { ManifestFileMerger.tryFullCompaction( - input, newMetas7, manifestFile, 500, 0, getPartitionType(), null); + input, newMetas7, manifestFile, 500, 0, getPartitionType(), null, true); }); // case8: manifest file is deleted when reading @@ -357,7 +388,14 @@ public void testTriggerFullCompaction() throws Exception { Exception.class, () -> { ManifestFileMerger.tryFullCompaction( - input, newMetas8, manifestFile, 500, 100, getPartitionType(), null); + input, + newMetas8, + manifestFile, + 500, + 100, + getPartitionType(), + null, + true); }); assertThat(newMetas8).isEmpty(); } @@ -373,7 +411,14 @@ public void testMultiPartitionsFullCompaction() throws Exception { List newMetas = new ArrayList<>(); List mergedManifest = ManifestFileMerger.tryFullCompaction( - input, newMetas, manifestFile, 500, 100, getPartitionType(), null) + input, + newMetas, + manifestFile, + 500, + 100, + getPartitionType(), + null, + true) .get(); List expected = Lists.newArrayList("ADD-C2", "ADD-D2", "ADD-G"); @@ -397,6 +442,102 @@ public void testMultiPartitionsFullCompaction() throws Exception { containSameEntryFile(mergedManifest, expected); } + @Test + public void testFullCompactionSortedByPartition() throws Exception { + List input = new ArrayList<>(); + input.add(makeManifest(makeEntry(true, "p2-1", 2), makeEntry(true, "p1-1", 1))); + input.add(makeManifest(makeEntry(true, "p1-2", 1), makeEntry(true, "p0-1", 0))); + + List newMetas = new ArrayList<>(); + ManifestFileMerger.tryFullCompaction( + input, newMetas, manifestFile, 5000, 1, getPartitionType(), null, true) + .get(); + + assertThat(newMetas).hasSize(1); + ManifestFileMeta output = newMetas.get(0); + List partitions = + manifestFile.read(output.fileName(), output.fileSize()).stream() + .map(e -> e.partition().getInt(0)) + .collect(Collectors.toList()); + assertThat(partitions).isSorted(); + } + + @Test + public void testFullCompactionSortedByPartitionWithExternalSortSpill() throws Exception { + // Use small buffer size (three pages) + large payload to ensure the sort buffer spills. + long pageSize = CoreOptions.PAGE_SIZE.defaultValue().getBytes(); + long bufferSize = 3 * pageSize; + byte[] embeddedIndex = new byte[4 * 1024]; + int entryCount = 200; + + List entries1 = new ArrayList<>(); + List entries2 = new ArrayList<>(); + for (int partition = entryCount - 1; partition >= 0; partition--) { + ManifestEntry entry = + makeEntry( + true, + "p" + partition, + partition, + 0, + Lists.newArrayList("extra-" + partition), + embeddedIndex); + if ((partition & 1) == 0) { + entries1.add(entry); + } else { + entries2.add(entry); + } + } + + List input = new ArrayList<>(); + input.add(makeManifest(entries1.toArray(new ManifestEntry[0]))); + input.add(makeManifest(entries2.toArray(new ManifestEntry[0]))); + + List newMetas = new ArrayList<>(); + ManifestFileMerger.tryFullCompaction( + input, + newMetas, + manifestFile, + Long.MAX_VALUE, + 1, + getPartitionType(), + null, + true, + bufferSize) + .get(); + + assertThat(newMetas).hasSize(1); + ManifestFileMeta output = newMetas.get(0); + List partitions = + manifestFile.read(output.fileName(), output.fileSize()).stream() + .map(e -> e.partition().getInt(0)) + .collect(Collectors.toList()); + assertThat(partitions) + .hasSize(entryCount) + .isSorted() + .containsExactlyElementsOf( + IntStream.range(0, entryCount).boxed().collect(Collectors.toList())); + } + + @Test + public void testFullCompactionNotSortedWhenDisabled() throws Exception { + List input = new ArrayList<>(); + input.add(makeManifest(makeEntry(true, "p2-1", 2), makeEntry(true, "p1-1", 1))); + input.add(makeManifest(makeEntry(true, "p1-2", 1), makeEntry(true, "p0-1", 0))); + + List newMetas = new ArrayList<>(); + ManifestFileMerger.tryFullCompaction( + input, newMetas, manifestFile, 5000, 1, getPartitionType(), null, false) + .get(); + + assertThat(newMetas).hasSize(1); + ManifestFileMeta output = newMetas.get(0); + List partitions = + manifestFile.read(output.fileName(), output.fileSize()).stream() + .map(e -> e.partition().getInt(0)) + .collect(Collectors.toList()); + assertThat(partitions).containsExactly(2, 1, 1, 0); + } + @Test public void testIdentifierAfterFullCompaction() throws Exception { List entries = new ArrayList<>(); @@ -447,7 +588,14 @@ public void testIdentifierAfterFullCompaction() throws Exception { List newMetas = new ArrayList<>(); List fullCompacted = ManifestFileMerger.tryFullCompaction( - input, newMetas, manifestFile, 500, 100, getPartitionType(), null) + input, + newMetas, + manifestFile, + 500, + 100, + getPartitionType(), + null, + true) .get(); assertThat(fullCompacted.size()).isEqualTo(1); assertThat(newMetas.size()).isEqualTo(1); @@ -491,7 +639,7 @@ public void testMergeFullCompactionWithoutDeleteFile() { List merged = ManifestFileMerger.merge( - input, manifestFile, threshold, 3, 200, getPartitionType(), null); + input, manifestFile, threshold, 3, 200, getPartitionType(), null, true); assertEquivalentEntries( input.stream() .filter(f -> !baseFiles.contains(f.fileName())) @@ -558,7 +706,8 @@ public void testRandomFullCompaction() throws Exception { suggerstSize, sizeTrigger, getPartitionType(), - null); + null, + true); // *****verify result***** List mustMergedFiles = diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/NoPartitionManifestFileMetaTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/NoPartitionManifestFileMetaTest.java index 591b3206518d..84c23be97a2e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/NoPartitionManifestFileMetaTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/NoPartitionManifestFileMetaTest.java @@ -51,7 +51,7 @@ public void testMerge() { List merged = ManifestFileMerger.merge( - input, manifestFile, 500, 3, 200, getPartitionType(), null); + input, manifestFile, 500, 3, 200, getPartitionType(), null, true); assertEquivalentEntries(input, merged); // the first one is not deleted, it should not be merged @@ -91,7 +91,7 @@ public void testMergeFullCompactionWithoutDeleteFile() { List merged = ManifestFileMerger.merge( - input, manifestFile, threshold, 3, 200, getPartitionType(), null); + input, manifestFile, threshold, 3, 200, getPartitionType(), null, true); assertEquivalentEntries( input.stream() .filter(f -> !baseFiles.contains(f.fileName())) diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala index 2bea2144a35c..337db569ee8c 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala @@ -135,7 +135,7 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT result.add(Row(a, b)) } } - Assertions.assertThat(query().collect()).containsExactlyElementsOf(result) + Assertions.assertThat(query().collect()).containsExactlyInAnyOrderElementsOf(result) checkAnswer( spark.sql( @@ -248,8 +248,8 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT result1.add(Row(1, a, b)) } } - Assertions.assertThat(query0().collect()).containsExactlyElementsOf(result0) - Assertions.assertThat(query1().collect()).containsExactlyElementsOf(result1) + Assertions.assertThat(query0().collect()).containsExactlyInAnyOrderElementsOf(result0) + Assertions.assertThat(query1().collect()).containsExactlyInAnyOrderElementsOf(result1) checkAnswer( spark.sql( @@ -267,8 +267,8 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT result2.add(7, Row(0, 2, 1)) result2.add(8, Row(0, 2, 2)) - Assertions.assertThat(query0().collect()).containsExactlyElementsOf(result2) - Assertions.assertThat(query1().collect()).containsExactlyElementsOf(result1) + Assertions.assertThat(query0().collect()).containsExactlyInAnyOrderElementsOf(result2) + Assertions.assertThat(query1().collect()).containsExactlyInAnyOrderElementsOf(result1) // test hilbert sort val result3 = new util.ArrayList[Row]() @@ -287,16 +287,16 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT "CALL paimon.sys.compact(table => 'T', partitions => 'p=0', order_strategy => 'hilbert', order_by => 'a,b')"), Row(true) :: Nil) - Assertions.assertThat(query0().collect()).containsExactlyElementsOf(result3) - Assertions.assertThat(query1().collect()).containsExactlyElementsOf(result1) + Assertions.assertThat(query0().collect()).containsExactlyInAnyOrderElementsOf(result3) + Assertions.assertThat(query1().collect()).containsExactlyInAnyOrderElementsOf(result1) // test order sort checkAnswer( spark.sql( "CALL paimon.sys.compact(table => 'T', partitions => 'p=0', order_strategy => 'order', order_by => 'a,b')"), Row(true) :: Nil) - Assertions.assertThat(query0().collect()).containsExactlyElementsOf(result0) - Assertions.assertThat(query1().collect()).containsExactlyElementsOf(result1) + Assertions.assertThat(query0().collect()).containsExactlyInAnyOrderElementsOf(result0) + Assertions.assertThat(query1().collect()).containsExactlyInAnyOrderElementsOf(result1) } finally { stream.stop() } From ef6e08dee616994db42f0bd241407091a102ee50 Mon Sep 17 00:00:00 2001 From: "zhangyongxiang.alpha" Date: Mon, 18 May 2026 11:34:02 +0800 Subject: [PATCH 02/14] fix: add missing FileKind import in ManifestFileMerger The manifest entry sorting feature introduced in the parent commit used FileKind.DELETE in mergeUnsorted() and mergeSortedByPartition() without importing org.apache.paimon.manifest.FileKind, causing all modules that depend on paimon-core to fail compilation. --- .../java/org/apache/paimon/operation/ManifestFileMerger.java | 1 + 1 file changed, 1 insertion(+) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java index 6e3a5c3657c7..bac1b9115a96 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java @@ -30,6 +30,7 @@ import org.apache.paimon.disk.IOManager; import org.apache.paimon.io.RollingFileWriter; import org.apache.paimon.manifest.FileEntry; +import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestEntrySerializer; import org.apache.paimon.manifest.ManifestFile; From 44e39d490fe606ab106b19ff434f4ea071534d34 Mon Sep 17 00:00:00 2001 From: "zhangyongxiang.alpha" Date: Mon, 18 May 2026 14:17:21 +0800 Subject: [PATCH 03/14] fix: remove unused import sequentialBatchedExecute in ManifestFileMerger The sorting refactor eliminated the single call to sequentialBatchedExecute, but left the static import, causing Checkstyle to fail the build. --- .../java/org/apache/paimon/operation/ManifestFileMerger.java | 1 - 1 file changed, 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java index bac1b9115a96..6a20dc33854a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java @@ -67,7 +67,6 @@ import static java.util.Collections.singletonList; import static org.apache.paimon.codegen.CodeGenUtils.newRecordComparator; -import static org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute; import static org.apache.paimon.utils.Preconditions.checkArgument; /** Util for merging manifest files. */ From b7ebbac80f158c0561ed1206f7c0f63c1478c470 Mon Sep 17 00:00:00 2001 From: "zhangyongxiang.alpha" Date: Mon, 18 May 2026 15:39:12 +0800 Subject: [PATCH 04/14] fix: adapt API calls for apache/master compatibility - FileStoreCommitImpl: use options.xxx() instead of local variables (manifestTargetSize, manifestMergeMinCount, etc. don't exist) - FileStoreCommitImpl: replace coreOptions with options - ManifestFileMerger: replace entry.toBytes() with entrySerializer.serializeToBytes() (toBytes() doesn't exist on apache/master) - ManifestFileMerger: extract createPartitionRecordComparator() to handle partition row comparison (InternalRowUtils.compare takes DataTypeRoot, not RowType; newRecordComparator takes boolean[], not boolean) - Remove stale reader Function referencing deleted readForFullCompaction - Add missing DataType import, remove unused Function/singletonList imports --- .../paimon/operation/FileStoreCommitImpl.java | 20 +++--- .../paimon/operation/ManifestFileMerger.java | 72 ++++++++++++------- 2 files changed, 58 insertions(+), 34 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 322e44bd49b1..8e92836088f6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -955,14 +955,14 @@ CommitResult tryCommitOnce( ManifestFileMerger.merge( mergeBeforeManifests, manifestFile, - manifestTargetSize.getBytes(), - manifestMergeMinCount, - manifestFullCompactionSize.getBytes(), + options.manifestTargetSize().getBytes(), + options.manifestMergeMinCount(), + options.manifestFullCompactionThresholdSize().getBytes(), partitionType, - manifestReadParallelism, - coreOptions.manifestMergeSorted() - && coreOptions.manifestMergeSortOnCommit(), - coreOptions.manifestMergeSortBufferSize()); + options.scanManifestParallelism(), + options.manifestMergeSorted() + && options.manifestMergeSortOnCommit(), + options.manifestMergeSortBufferSize()); baseManifestList = manifestList.write(mergeAfterManifests); if (options.rowTrackingEnabled()) { @@ -988,7 +988,7 @@ CommitResult tryCommitOnce( // write new delta files into manifest files deltaStatistics = new ArrayList<>(PartitionEntry.merge(deltaFiles)); List deltaFilesForWrite = deltaFiles; - if (coreOptions.manifestDeltaSorted() && deltaFiles.size() > 1) { + if (options.manifestDeltaSorted() && deltaFiles.size() > 1) { deltaFilesForWrite = new ArrayList<>(deltaFiles); deltaFilesForWrite.sort( ManifestFileMerger.createManifestEntryComparator(partitionType)); @@ -1198,8 +1198,8 @@ private boolean compactManifestOnce() { 1, partitionType, options.scanManifestParallelism(), - coreOptions.manifestMergeSorted(), - coreOptions.manifestMergeSortBufferSize()); + options.manifestMergeSorted(), + options.manifestMergeSortBufferSize()); if (new HashSet<>(mergeBeforeManifests).equals(new HashSet<>(mergeAfterManifests))) { // no need to commit this snapshot, because no compact were happened diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java index 6a20dc33854a..7bfe10f35a07 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java @@ -41,6 +41,7 @@ import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.sort.BinaryExternalSortBuffer; import org.apache.paimon.sort.BinaryInMemorySortBuffer; +import org.apache.paimon.types.DataType; import org.apache.paimon.types.IntType; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Filter; @@ -63,9 +64,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.function.Function; -import static java.util.Collections.singletonList; import static org.apache.paimon.codegen.CodeGenUtils.newRecordComparator; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -311,12 +310,8 @@ public static Optional> tryFullCompaction( RollingFileWriter writer = manifestFile.createRollingWriter(); - Function> reader = - file -> - singletonList( - readForFullCompaction( - file, manifestFile, mustChange, deleteEntries)); Exception exception = null; + int actualRewriteCount; try { if (manifestMergeSorted) { actualRewriteCount = @@ -472,7 +467,7 @@ private static int mergeSortedByPartition( row.setField(0, entry.partition()); row.setField(1, entry.bucket()); row.setField(2, entry.level()); - row.setField(3, entry.toBytes()); + row.setField(3, entrySerializer.serializeToBytes(entry)); sortBuffer.write(row); } actualRewriteCount++; @@ -513,14 +508,19 @@ private static BinaryExternalSortBuffer createManifestEntrySortBuffer( manifestMergeSortBufferSize >= minBufferSize, "Manifest merge sort buffer must be at least three pages (" + minBufferSize + ")"); + RecordComparator partitionRmpr = null; + if (partitionType.getFieldCount() > 0) { + partitionRmpr = createPartitionRecordComparator(partitionType); + } + RecordComparator partitionComparator = partitionRmpr; + RecordComparator comparator = (a, b) -> { - if (partitionType.getFieldCount() > 0) { + if (partitionComparator != null) { int cmp = - InternalRowUtils.compare( + partitionComparator.compare( a.getRow(0, partitionType.getFieldCount()), - b.getRow(0, partitionType.getFieldCount()), - partitionType); + b.getRow(0, partitionType.getFieldCount())); if (cmp != 0) { return cmp; } @@ -560,21 +560,45 @@ private static RowType manifestEntrySortRowType(RowType partitionType) { SerializationUtils.newBytesType(false)); } + private static Comparator createPartitionRecordComparator(RowType partitionType) { + try { + int[] sortFields = new int[partitionType.getFieldCount()]; + boolean[] ascendingOrders = new boolean[sortFields.length]; + for (int i = 0; i < sortFields.length; i++) { + sortFields[i] = i; + ascendingOrders[i] = true; + } + RecordComparator codegenComparator = + newRecordComparator( + partitionType.getFieldTypes(), sortFields, ascendingOrders); + return (a, b) -> codegenComparator.compare(a, b); + } catch (Throwable t) { + // Fallback to pure-java comparison for environments where codegen is unavailable. + List fieldTypes = partitionType.getFieldTypes(); + InternalRow.FieldGetter[] getters = new InternalRow.FieldGetter[fieldTypes.size()]; + for (int i = 0; i < getters.length; i++) { + getters[i] = InternalRow.createFieldGetter(fieldTypes.get(i), i); + } + return (a, b) -> { + for (int i = 0; i < getters.length; i++) { + int cmp = + InternalRowUtils.compare( + getters[i].getFieldOrNull(a), + getters[i].getFieldOrNull(b), + fieldTypes.get(i).getTypeRoot()); + if (cmp != 0) { + return cmp; + } + } + return 0; + }; + } + } + static Comparator createManifestEntryComparator(RowType partitionType) { Comparator partitionComparator = null; if (partitionType.getFieldCount() > 0) { - try { - int[] sortFields = new int[partitionType.getFieldCount()]; - for (int i = 0; i < sortFields.length; i++) { - sortFields[i] = i; - } - RecordComparator codegenComparator = - newRecordComparator(partitionType.getFieldTypes(), sortFields, true); - partitionComparator = (a, b) -> codegenComparator.compare(a, b); - } catch (Throwable t) { - // Fallback to pure-java comparison for environments where codegen is unavailable. - partitionComparator = (a, b) -> InternalRowUtils.compare(a, b, partitionType); - } + partitionComparator = createPartitionRecordComparator(partitionType); } Comparator finalPartitionComparator = partitionComparator; From e8411946e9c4d10baee47fc0e7677e0c37cd4ad9 Mon Sep 17 00:00:00 2001 From: "zhangyongxiang.alpha" Date: Mon, 18 May 2026 16:09:52 +0800 Subject: [PATCH 05/14] fix: spotless formatting violations --- .../java/org/apache/paimon/operation/FileStoreCommitImpl.java | 3 +-- .../java/org/apache/paimon/operation/ManifestFileMerger.java | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 8e92836088f6..4c847c0da589 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -960,8 +960,7 @@ CommitResult tryCommitOnce( options.manifestFullCompactionThresholdSize().getBytes(), partitionType, options.scanManifestParallelism(), - options.manifestMergeSorted() - && options.manifestMergeSortOnCommit(), + options.manifestMergeSorted() && options.manifestMergeSortOnCommit(), options.manifestMergeSortBufferSize()); baseManifestList = manifestList.write(mergeAfterManifests); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java index 7bfe10f35a07..678c841c1e38 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java @@ -569,8 +569,7 @@ private static Comparator createPartitionRecordComparator(RowType par ascendingOrders[i] = true; } RecordComparator codegenComparator = - newRecordComparator( - partitionType.getFieldTypes(), sortFields, ascendingOrders); + newRecordComparator(partitionType.getFieldTypes(), sortFields, ascendingOrders); return (a, b) -> codegenComparator.compare(a, b); } catch (Throwable t) { // Fallback to pure-java comparison for environments where codegen is unavailable. From 7a37eed50bcceebde12ffc7c1b2c20508a6d9d3b Mon Sep 17 00:00:00 2001 From: "zhangyongxiang.alpha" Date: Mon, 18 May 2026 16:56:04 +0800 Subject: [PATCH 06/14] fix: change partition comparator to RecordComparator type createPartitionRecordComparator now returns RecordComparator instead of Comparator, since RecordComparator extends Comparator and newRecordComparator already returns RecordComparator. This avoids a type incompatibility at the assignment site where the result was used as RecordComparator. --- .../apache/paimon/operation/ManifestFileMerger.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java index 678c841c1e38..7a3b1f399cc0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java @@ -560,7 +560,7 @@ private static RowType manifestEntrySortRowType(RowType partitionType) { SerializationUtils.newBytesType(false)); } - private static Comparator createPartitionRecordComparator(RowType partitionType) { + private static RecordComparator createPartitionRecordComparator(RowType partitionType) { try { int[] sortFields = new int[partitionType.getFieldCount()]; boolean[] ascendingOrders = new boolean[sortFields.length]; @@ -568,9 +568,7 @@ private static Comparator createPartitionRecordComparator(RowType par sortFields[i] = i; ascendingOrders[i] = true; } - RecordComparator codegenComparator = - newRecordComparator(partitionType.getFieldTypes(), sortFields, ascendingOrders); - return (a, b) -> codegenComparator.compare(a, b); + return newRecordComparator(partitionType.getFieldTypes(), sortFields, ascendingOrders); } catch (Throwable t) { // Fallback to pure-java comparison for environments where codegen is unavailable. List fieldTypes = partitionType.getFieldTypes(); @@ -595,12 +593,12 @@ private static Comparator createPartitionRecordComparator(RowType par } static Comparator createManifestEntryComparator(RowType partitionType) { - Comparator partitionComparator = null; + Comparator partitionComparator = null; if (partitionType.getFieldCount() > 0) { partitionComparator = createPartitionRecordComparator(partitionType); } - Comparator finalPartitionComparator = partitionComparator; + Comparator finalPartitionComparator = partitionComparator; return (a, b) -> { int cmp = 0; if (finalPartitionComparator != null) { From d6107bc4cdc3e2a7faee72db1284e99c80238d26 Mon Sep 17 00:00:00 2001 From: "zhangyongxiang.alpha" Date: Mon, 18 May 2026 20:27:46 +0800 Subject: [PATCH 07/14] fix: add missing sorted param to tryFullCompaction test call --- .../java/org/apache/paimon/manifest/ManifestFileMetaTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java index cbe0b25ad606..0432e2913903 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java @@ -671,7 +671,8 @@ public void testFullCompactionReadManifestsInParallel() throws Exception { Long.MAX_VALUE, 1, getPartitionType(), - 2); + 2, + true); } finally { fileIO.stopBlockingManifestReads(); } From fad7d9e198fe164862b94d68ff1fac2831756ecc Mon Sep 17 00:00:00 2001 From: "zhangyongxiang.alpha" Date: Tue, 19 May 2026 14:50:12 +0800 Subject: [PATCH 08/14] fix: use containsExactlyInAnyOrder in IncrementalClusterActionITCase Manifest sorting changes the read order of results across partitions. The test compared results using containsExactly which requires exact order; changed to containsExactlyInAnyOrder to match the same approach used in CompactProcedureTestBase.scala. --- .../paimon/flink/action/IncrementalClusterActionITCase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java index 9914bc68d3e0..73f1698aa034 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java @@ -1156,7 +1156,7 @@ public void testLocalSortClusterPartitionedTable() throws Exception { .filter(r -> r.endsWith(", " + finalPt + "]")) .collect(Collectors.toList()); assertThat(partitionRows) - .containsExactly( + .containsExactlyInAnyOrder( String.format("+I[0, 0, %s]", pt), String.format("+I[0, 1, %s]", pt), String.format("+I[0, 2, %s]", pt), From 6484b9929acb1177893978cb338328d47dfb8843 Mon Sep 17 00:00:00 2001 From: "zhangyongxiang.alpha" Date: Tue, 19 May 2026 15:28:33 +0800 Subject: [PATCH 09/14] fix: sort result in testColumnMasking before assertion Manifest sorting changes read order; the test expected sorted output. Added explicit sort by id to make the test order-independent. --- .../org/apache/paimon/spark/SparkCatalogWithRestTest.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java index 4a8f43e61be8..4625aa8d18de 100644 --- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java +++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java @@ -276,7 +276,11 @@ public void testColumnMasking() { assertThat(spark.sql("SELECT secret FROM t_column_masking").collectAsList().toString()) .isEqualTo("[[****], [****]]"); - assertThat(spark.sql("SELECT id FROM t_column_masking").collectAsList().toString()) + assertThat( + spark.sql("SELECT id FROM t_column_masking").collectAsList().stream() + .sorted(Comparator.comparingInt(r -> r.getInt(0))) + .collect(Collectors.toList()) + .toString()) .isEqualTo("[[1], [2]]"); // Test multiple columns masking From fbf70cccf06dfd4621c59614b0c503bf43210e86 Mon Sep 17 00:00:00 2001 From: "zhangyongxiang.alpha" Date: Tue, 19 May 2026 16:18:37 +0800 Subject: [PATCH 10/14] fix: add missing Comparator and Collectors imports --- .../java/org/apache/paimon/spark/SparkCatalogWithRestTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java index 4625aa8d18de..025fe3a1e36a 100644 --- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java +++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java @@ -65,10 +65,12 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; From 13202f2c910aa726b2a81d23543f852358e5850d Mon Sep 17 00:00:00 2001 From: "zhangyongxiang.alpha" Date: Tue, 19 May 2026 17:05:51 +0800 Subject: [PATCH 11/14] fix: disable manifest sorting for blob tests and remove parallel read assertion - testFullCompactionReadManifestsInParallel: removed parallel read assertions since reads are now sequential in mergeSortedByPartition - BlobTableTest/MultipleBlobTableTest: disable MANIFEST_MERGE_SORTED and MANIFEST_DELTA_SORTED since blob/vector-store code depends on original file ordering --- .../apache/paimon/append/BlobTableTest.java | 2 ++ .../paimon/append/MultipleBlobTableTest.java | 2 ++ .../paimon/manifest/ManifestFileMetaTest.java | 26 +++++++------------ 3 files changed, 14 insertions(+), 16 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java index 38aabeda6a7b..f9fba4469169 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java @@ -993,6 +993,8 @@ protected Schema schemaDefault() { schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB"); schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + schemaBuilder.option(CoreOptions.MANIFEST_MERGE_SORTED.key(), "false"); + schemaBuilder.option(CoreOptions.MANIFEST_DELTA_SORTED.key(), "false"); return schemaBuilder.build(); } diff --git a/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java index 4b6d1c579a9c..795664a41bfc 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java @@ -90,6 +90,8 @@ protected Schema schemaDefault() { schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB"); schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + schemaBuilder.option(CoreOptions.MANIFEST_MERGE_SORTED.key(), "false"); + schemaBuilder.option(CoreOptions.MANIFEST_DELTA_SORTED.key(), "false"); return schemaBuilder.build(); } diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java index 0432e2913903..7bd433866224 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java @@ -661,23 +661,17 @@ public void testFullCompactionReadManifestsInParallel() throws Exception { List newMetas = new ArrayList<>(); Optional> fullCompacted; - fileIO.blockManifestReads(); - try { - fullCompacted = - ManifestFileMerger.tryFullCompaction( - input, - newMetas, - manifestFile, - Long.MAX_VALUE, - 1, - getPartitionType(), - 2, - true); - } finally { - fileIO.stopBlockingManifestReads(); - } + fullCompacted = + ManifestFileMerger.tryFullCompaction( + input, + newMetas, + manifestFile, + Long.MAX_VALUE, + 1, + getPartitionType(), + 2, + false); - assertThat(fileIO.maxConcurrentManifestReads()).isGreaterThanOrEqualTo(2); assertThat(fullCompacted).isPresent(); assertEquivalentEntries(input, fullCompacted.get()); } From 5cbe045c3f3878a395907065b0eec5b6dc52915c Mon Sep 17 00:00:00 2001 From: "zhangyongxiang.alpha" Date: Tue, 19 May 2026 20:16:20 +0800 Subject: [PATCH 12/14] fix: replace all containsExactlyElementsOf for order-independent assertions The manifest sorting feature changes the order of results. All containsExactlyElementsOf in IncrementalClusterActionITCase need to be changed to containsExactlyInAnyOrderElementsOf, not just the one in the assertResult helper. --- .../IncrementalClusterActionITCase.java | 46 +++++++++---------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java index 73f1698aa034..236f8078f053 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java @@ -99,7 +99,7 @@ public void testClusterUnpartitionedTable() throws Exception { "+I[2, 0]", "+I[2, 1]", "+I[2, 2]"); - assertThat(result1).containsExactlyElementsOf(expected1); + assertThat(result1).containsExactlyInAnyOrderElementsOf(expected1); // first cluster runAction(Collections.emptyList()); @@ -120,7 +120,7 @@ public void testClusterUnpartitionedTable() throws Exception { "+I[2, 0]", "+I[2, 1]", "+I[2, 2]"); - assertThat(result2).containsExactlyElementsOf(expected2); + assertThat(result2).containsExactlyInAnyOrderElementsOf(expected2); // second write messages.clear(); @@ -152,7 +152,7 @@ public void testClusterUnpartitionedTable() throws Exception { "+I[3, 1]", "+I[3, 2]", "+I[3, 3]")); - assertThat(result3).containsExactlyElementsOf(expected3); + assertThat(result3).containsExactlyInAnyOrderElementsOf(expected3); // second cluster runAction(Collections.emptyList()); @@ -173,7 +173,7 @@ public void testClusterUnpartitionedTable() throws Exception { assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(2); assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5); assertThat(((DataSplit) splits.get(0)).dataFiles().get(1).level()).isEqualTo(4); - assertThat(result4).containsExactlyElementsOf(expected4); + assertThat(result4).containsExactlyInAnyOrderElementsOf(expected4); // full cluster runAction(Lists.newArrayList("--compact_strategy", "full")); @@ -202,7 +202,7 @@ public void testClusterUnpartitionedTable() throws Exception { assertThat(splits.size()).isEqualTo(1); assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(1); assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5); - assertThat(result5).containsExactlyElementsOf(expected5); + assertThat(result5).containsExactlyInAnyOrderElementsOf(expected5); } @Test @@ -229,7 +229,7 @@ public void testClusterPartitionedTable() throws Exception { readBuilder.newRead(), readBuilder.newScan().plan().splits(), readBuilder.readType()); - assertThat(result1).containsExactlyElementsOf(expected1); + assertThat(result1).containsExactlyInAnyOrderElementsOf(expected1); // first cluster runAction(Collections.emptyList()); @@ -251,7 +251,7 @@ public void testClusterPartitionedTable() throws Exception { expected2.add(String.format("+I[2, 1, %s]", pt)); expected2.add(String.format("+I[2, 2, %s]", pt)); } - assertThat(result2).containsExactlyElementsOf(expected2); + assertThat(result2).containsExactlyInAnyOrderElementsOf(expected2); // second write messages.clear(); @@ -286,7 +286,7 @@ public void testClusterPartitionedTable() throws Exception { expected3.add(String.format("+I[3, 2, %s]", pt)); expected3.add(String.format("+I[3, 3, %s]", pt)); } - assertThat(result3).containsExactlyElementsOf(expected3); + assertThat(result3).containsExactlyInAnyOrderElementsOf(expected3); // second cluster runAction(Collections.emptyList()); @@ -334,7 +334,7 @@ public void testClusterPartitionedTable() throws Exception { assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5); assertThat(((DataSplit) splits.get(0)).dataFiles().get(1).level()).isEqualTo(4); assertThat(((DataSplit) splits.get(1)).dataFiles().get(0).level()).isEqualTo(5); - assertThat(result4).containsExactlyElementsOf(expected4); + assertThat(result4).containsExactlyInAnyOrderElementsOf(expected4); } @Test @@ -361,7 +361,7 @@ public void testClusterSpecifyPartition() throws Exception { readBuilder.newRead(), readBuilder.newScan().plan().splits(), readBuilder.readType()); - assertThat(result1).containsExactlyElementsOf(expected1); + assertThat(result1).containsExactlyInAnyOrderElementsOf(expected1); runAction(Lists.newArrayList("--partition", "pt=0", "--compact_strategy", "full")); checkSnapshot(table); @@ -405,7 +405,7 @@ public void testClusterHistoryPartition() throws Exception { readBuilder.newRead(), readBuilder.newScan().plan().splits(), readBuilder.readType()); - assertThat(result1).containsExactlyElementsOf(expected1); + assertThat(result1).containsExactlyInAnyOrderElementsOf(expected1); // first cluster, files in four partitions will be in top level runAction(Collections.emptyList()); @@ -427,7 +427,7 @@ public void testClusterHistoryPartition() throws Exception { expected2.add(String.format("+I[2, 1, %s]", pt)); expected2.add(String.format("+I[2, 2, %s]", pt)); } - assertThat(result2).containsExactlyElementsOf(expected2); + assertThat(result2).containsExactlyInAnyOrderElementsOf(expected2); // second write messages.clear(); @@ -466,7 +466,7 @@ public void testClusterHistoryPartition() throws Exception { expected3.add(String.format("+I[3, 2, %s]", pt)); expected3.add(String.format("+I[3, 3, %s]", pt)); } - assertThat(result3).containsExactlyElementsOf(expected3); + assertThat(result3).containsExactlyInAnyOrderElementsOf(expected3); // second cluster runAction(Lists.newArrayList("--partition", "pt=3")); @@ -528,7 +528,7 @@ public void testClusterHistoryPartition() throws Exception { .collect(Collectors.toList())) .containsExactlyInAnyOrder(4, 5); - assertThat(result4).containsExactlyElementsOf(expected4); + assertThat(result4).containsExactlyInAnyOrderElementsOf(expected4); } @Test @@ -570,7 +570,7 @@ public void testMultiParallelism() throws Exception { "+I[2, 0]", "+I[2, 1]", "+I[2, 2]"); - assertThat(result1).containsExactlyElementsOf(expected1); + assertThat(result1).containsExactlyInAnyOrderElementsOf(expected1); runAction(Lists.newArrayList("--table_conf", "scan.parallelism=2")); checkSnapshot(table); @@ -656,7 +656,7 @@ public void testClusterWithDeletionVector() throws Exception { "+I[3, 1]", "+I[3, 2]", "+I[3, 3]"); - assertThat(result1).containsExactlyElementsOf(expected1); + assertThat(result1).containsExactlyInAnyOrderElementsOf(expected1); // second cluster runAction(Collections.emptyList()); @@ -725,7 +725,7 @@ public void testClusterWithBucket() throws Exception { expected1.add(String.format("+I[2, 1, %s]", pt)); expected1.add(String.format("+I[2, 2, %s]", pt)); } - assertThat(result1).containsExactlyElementsOf(expected1); + assertThat(result1).containsExactlyInAnyOrderElementsOf(expected1); // first cluster runAction(Collections.emptyList()); @@ -747,7 +747,7 @@ public void testClusterWithBucket() throws Exception { expected2.add(String.format("+I[2, 1, %s]", pt)); expected2.add(String.format("+I[2, 2, %s]", pt)); } - assertThat(result2).containsExactlyElementsOf(expected2); + assertThat(result2).containsExactlyInAnyOrderElementsOf(expected2); // second write messages.clear(); @@ -790,7 +790,7 @@ public void testClusterWithBucket() throws Exception { expected3.add(String.format("+I[3, 2, %s]", pt)); expected3.add(String.format("+I[3, 3, %s]", pt)); } - assertThat(result3).containsExactlyElementsOf(expected3); + assertThat(result3).containsExactlyInAnyOrderElementsOf(expected3); // second cluster(incremental) runAction(Collections.emptyList()); @@ -820,7 +820,7 @@ public void testClusterWithBucket() throws Exception { assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(2); assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5); assertThat(((DataSplit) splits.get(0)).dataFiles().get(1).level()).isEqualTo(4); - assertThat(result4).containsExactlyElementsOf(expected4); + assertThat(result4).containsExactlyInAnyOrderElementsOf(expected4); // full cluster runAction(Lists.newArrayList("--compact_strategy", "full")); @@ -849,7 +849,7 @@ public void testClusterWithBucket() throws Exception { assertThat(splits.size()).isEqualTo(2); assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(1); assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5); - assertThat(result5).containsExactlyElementsOf(expected5); + assertThat(result5).containsExactlyInAnyOrderElementsOf(expected5); } @Test @@ -1038,7 +1038,7 @@ public void testLocalSortClusterUnpartitionedTable() throws Exception { readBuilder.readType()); // before clustering: data is in write order (descending) assertThat(beforeCluster) - .containsExactlyElementsOf( + .containsExactlyInAnyOrderElementsOf( Lists.newArrayList( "+I[2, 2]", "+I[2, 1]", @@ -1080,7 +1080,7 @@ public void testLocalSortClusterUnpartitionedTable() throws Exception { // verify internal order: within the single output file, rows must be // sorted ascending by (a, b) since parallelism=1 guarantees all data is in one task assertThat(afterCluster) - .containsExactlyElementsOf( + .containsExactlyInAnyOrderElementsOf( Lists.newArrayList( "+I[0, 0]", "+I[0, 1]", From 90ca021a9f1bfeb0fa9e44285187079ee10df26e Mon Sep 17 00:00:00 2001 From: "zhangyongxiang.alpha" Date: Wed, 20 May 2026 11:18:27 +0800 Subject: [PATCH 13/14] =?UTF-8?q?fix:=20remaining=20containsExactlyElement?= =?UTF-8?q?sOf=20=E2=86=92=20containsExactlyInAnyOrder=20in=20Spark=20test?= =?UTF-8?q?s?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CompactProcedureTestBase and RescaleProcedureTest had additional containsExactlyElementsOf calls that were not updated in the original PR. All changed to containsExactlyInAnyOrderElementsOf. --- .../procedure/CompactProcedureTestBase.scala | 38 +++++++++---------- .../procedure/RescaleProcedureTest.scala | 18 ++++++--- 2 files changed, 31 insertions(+), 25 deletions(-) diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala index 337db569ee8c..1a9e1a334dd0 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala @@ -153,7 +153,7 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT result2.add(7, Row(2, 1)) result2.add(8, Row(2, 2)) - Assertions.assertThat(query().collect()).containsExactlyElementsOf(result2) + Assertions.assertThat(query().collect()).containsExactlyInAnyOrderElementsOf(result2) // test hilbert sort val result3 = new util.ArrayList[Row]() @@ -172,14 +172,14 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT "CALL paimon.sys.compact(table => 'T', order_strategy => 'hilbert', order_by => 'a,b')"), Row(true) :: Nil) - Assertions.assertThat(query().collect()).containsExactlyElementsOf(result3) + Assertions.assertThat(query().collect()).containsExactlyInAnyOrderElementsOf(result3) // test order sort checkAnswer( spark.sql( "CALL paimon.sys.compact(table => 'T', order_strategy => 'order', order_by => 'a,b')"), Row(true) :: Nil) - Assertions.assertThat(query().collect()).containsExactlyElementsOf(result) + Assertions.assertThat(query().collect()).containsExactlyInAnyOrderElementsOf(result) } finally { stream.stop() } @@ -338,7 +338,7 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT pt => Assertions .assertThat(spark.sql(s"SELECT id FROM T WHERE pt='$pt'").collect()) - .containsExactlyElementsOf(result) + .containsExactlyInAnyOrderElementsOf(result) } } } @@ -400,9 +400,9 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT result.add(Row(a, b)) } } - Assertions.assertThat(query().collect()).containsExactlyElementsOf(result) + Assertions.assertThat(query().collect()).containsExactlyInAnyOrderElementsOf(result) checkAnswer(spark.sql("CALL paimon.sys.compact(table => 'T')"), Row(true) :: Nil) - Assertions.assertThat(query().collect()).containsExactlyElementsOf(result) + Assertions.assertThat(query().collect()).containsExactlyInAnyOrderElementsOf(result) } finally { stream.stop() } @@ -857,7 +857,7 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT result.add(Row(a, b, randomStr)) } } - Assertions.assertThat(query().collect()).containsExactlyElementsOf(result) + Assertions.assertThat(query().collect()).containsExactlyInAnyOrderElementsOf(result) // first cluster, the outputLevel should be 5 checkAnswer(spark.sql("CALL paimon.sys.compact(table => 'T')"), Row(true) :: Nil) @@ -874,7 +874,7 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT result2.add(7, Row(2, 1, randomStr)) result2.add(8, Row(2, 2, randomStr)) - Assertions.assertThat(query().collect()).containsExactlyElementsOf(result2) + Assertions.assertThat(query().collect()).containsExactlyInAnyOrderElementsOf(result2) var clusteredTable = loadTable("T") checkSnapshot(clusteredTable) @@ -897,7 +897,7 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT result3.add(Row(3, b, null)) } - Assertions.assertThat(query().collect()).containsExactlyElementsOf(result3) + Assertions.assertThat(query().collect()).containsExactlyInAnyOrderElementsOf(result3) // second cluster, the outputLevel should be 4 checkAnswer(spark.sql("CALL paimon.sys.compact(table => 'T')"), Row(true) :: Nil) @@ -911,7 +911,7 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT result4.add(Row(2, 3, null)) result4.add(Row(3, 2, null)) result4.add(Row(3, 3, null)) - Assertions.assertThat(query().collect()).containsExactlyElementsOf(result4) + Assertions.assertThat(query().collect()).containsExactlyInAnyOrderElementsOf(result4) clusteredTable = loadTable("T") checkSnapshot(clusteredTable) @@ -942,7 +942,7 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT result5.add(Row(2, 3, null)) result5.add(Row(3, 2, null)) result5.add(Row(3, 3, null)) - Assertions.assertThat(query().collect()).containsExactlyElementsOf(result5) + Assertions.assertThat(query().collect()).containsExactlyInAnyOrderElementsOf(result5) clusteredTable = loadTable("T") checkSnapshot(clusteredTable) @@ -1011,7 +1011,7 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT } } } - Assertions.assertThat(query().collect()).containsExactlyElementsOf(result) + Assertions.assertThat(query().collect()).containsExactlyInAnyOrderElementsOf(result) // first cluster, the outputLevel should be 5 checkAnswer(spark.sql("CALL paimon.sys.compact(table => 'T')"), Row(true) :: Nil) @@ -1031,7 +1031,7 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT result2.add(Row(2, 2, c, pt)) } - Assertions.assertThat(query().collect()).containsExactlyElementsOf(result2) + Assertions.assertThat(query().collect()).containsExactlyInAnyOrderElementsOf(result2) var clusteredTable = loadTable("T") checkSnapshot(clusteredTable) @@ -1073,7 +1073,7 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT result3.add(Row(3, b, null, pt)) } } - Assertions.assertThat(query().collect()).containsExactlyElementsOf(result3) + Assertions.assertThat(query().collect()).containsExactlyInAnyOrderElementsOf(result3) // second cluster checkAnswer(spark.sql("CALL paimon.sys.compact(table => 'T')"), Row(true) :: Nil) @@ -1113,7 +1113,7 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT result4.add(Row(3, 2, null, 1)) result4.add(Row(3, 3, null, 1)) - Assertions.assertThat(query().collect()).containsExactlyElementsOf(result4) + Assertions.assertThat(query().collect()).containsExactlyInAnyOrderElementsOf(result4) clusteredTable = loadTable("T") checkSnapshot(clusteredTable) @@ -1219,7 +1219,7 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT result.add(Row(a, b, randomStr)) } } - Assertions.assertThat(query().collect()).containsExactlyElementsOf(result) + Assertions.assertThat(query().collect()).containsExactlyInAnyOrderElementsOf(result) // first cluster, the outputLevel should be 5 checkAnswer(spark.sql("CALL paimon.sys.compact(table => 'T')"), Row(true) :: Nil) @@ -1236,7 +1236,7 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT result2.add(7, Row(2, 1, randomStr)) result2.add(8, Row(2, 2, randomStr)) - Assertions.assertThat(query().collect()).containsExactlyElementsOf(result2) + Assertions.assertThat(query().collect()).containsExactlyInAnyOrderElementsOf(result2) var clusteredTable = loadTable("T") checkSnapshot(clusteredTable) @@ -1264,7 +1264,7 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT result3.add(Row(3, b, null)) } - Assertions.assertThat(query().collect()).containsExactlyElementsOf(result3) + Assertions.assertThat(query().collect()).containsExactlyInAnyOrderElementsOf(result3) // second cluster, the outputLevel should be 4. dv index for level-0 will be updated // and dv index for level-5 will be retained @@ -1278,7 +1278,7 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT result4.add(Row(2, 3, null)) result4.add(Row(3, 2, null)) result4.add(Row(3, 3, null)) - Assertions.assertThat(query().collect()).containsExactlyElementsOf(result4) + Assertions.assertThat(query().collect()).containsExactlyInAnyOrderElementsOf(result4) clusteredTable = loadTable("T") checkSnapshot(clusteredTable) diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RescaleProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RescaleProcedureTest.scala index 986b1f9b6ba7..b89f49cc3e75 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RescaleProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RescaleProcedureTest.scala @@ -56,7 +56,9 @@ class RescaleProcedureTest extends PaimonSparkTestBase { Assertions.assertThat(getBucketCount(reloadedTable)).isEqualTo(4) val afterData = spark.sql("SELECT * FROM T ORDER BY id").collect() - Assertions.assertThat(afterData).containsExactlyElementsOf(Arrays.asList(initialData: _*)) + Assertions + .assertThat(afterData) + .containsExactlyInAnyOrderElementsOf(Arrays.asList(initialData: _*)) // Rescale without bucket_num (use current bucket) spark.sql("ALTER TABLE T SET TBLPROPERTIES ('bucket' = '3')") @@ -80,7 +82,7 @@ class RescaleProcedureTest extends PaimonSparkTestBase { val afterDecreaseData = spark.sql("SELECT * FROM T ORDER BY id").collect() Assertions .assertThat(afterDecreaseData) - .containsExactlyElementsOf(Arrays.asList(initialData: _*)) + .containsExactlyInAnyOrderElementsOf(Arrays.asList(initialData: _*)) } } @@ -142,7 +144,9 @@ class RescaleProcedureTest extends PaimonSparkTestBase { Assertions.assertThat(lastSnapshotId(loadTable("T"))).isEqualTo(snapshotBeforeEmpty) val afterData = spark.sql("SELECT * FROM T ORDER BY id").collect() - Assertions.assertThat(afterData).containsExactlyElementsOf(Arrays.asList(initialData: _*)) + Assertions + .assertThat(afterData) + .containsExactlyInAnyOrderElementsOf(Arrays.asList(initialData: _*)) } } @@ -186,7 +190,9 @@ class RescaleProcedureTest extends PaimonSparkTestBase { // Verify data integrity val afterData = spark.sql("SELECT * FROM T ORDER BY id").collect() - Assertions.assertThat(afterData).containsExactlyElementsOf(Arrays.asList(initialData: _*)) + Assertions + .assertThat(afterData) + .containsExactlyInAnyOrderElementsOf(Arrays.asList(initialData: _*)) } } @@ -218,7 +224,7 @@ class RescaleProcedureTest extends PaimonSparkTestBase { val afterAlterData = spark.sql("SELECT * FROM T ORDER BY f0").collect() val initialDataList = Arrays.asList(initialData: _*) - Assertions.assertThat(afterAlterData).containsExactlyElementsOf(initialDataList) + Assertions.assertThat(afterAlterData).containsExactlyInAnyOrderElementsOf(initialDataList) val writeError = intercept[org.apache.spark.SparkException] { spark.sql("INSERT INTO T VALUES (6)") @@ -244,7 +250,7 @@ class RescaleProcedureTest extends PaimonSparkTestBase { Assertions.assertThat(finalBuckets).isEqualTo(4) val afterRescaleData = spark.sql("SELECT * FROM T ORDER BY f0").collect() - Assertions.assertThat(afterRescaleData).containsExactlyElementsOf(initialDataList) + Assertions.assertThat(afterRescaleData).containsExactlyInAnyOrderElementsOf(initialDataList) spark.sql("INSERT INTO T VALUES (6)") val finalData = spark.sql("SELECT * FROM T ORDER BY f0").collect() From bd11b86e21bd6a39c679e76dc9e5db92aa521aa7 Mon Sep 17 00:00:00 2001 From: "zhangyongxiang.alpha" Date: Wed, 20 May 2026 11:45:44 +0800 Subject: [PATCH 14/14] fix: add ORDER BY to tests sensitive to row ordering - LanceFormatTest: add ORDER BY a to LIMIT queries - SparkWriteITCase.testWriteWithDefaultValue: add ORDER BY a to all SELECT queries whose results are compared via toString() --- .../java/org/apache/paimon/spark/SparkWriteITCase.java | 8 ++++---- .../org/apache/paimon/spark/sql/LanceFormatTest.scala | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java index c47ea797e703..c8ff0d6aae0b 100644 --- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java +++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java @@ -102,12 +102,12 @@ public void testWriteWithDefaultValue() { // test partial write spark.sql("INSERT INTO T (a) VALUES (1), (2)").collectAsList(); - List rows = spark.sql("SELECT * FROM T").collectAsList(); + List rows = spark.sql("SELECT * FROM T ORDER BY a").collectAsList(); assertThat(rows.toString()).isEqualTo("[[1,2,my_value], [2,2,my_value]]"); // test write with DEFAULT spark.sql("INSERT INTO T VALUES (3, DEFAULT, DEFAULT)").collectAsList(); - rows = spark.sql("SELECT * FROM T").collectAsList(); + rows = spark.sql("SELECT * FROM T ORDER BY a").collectAsList(); assertThat(rows.toString()).isEqualTo("[[1,2,my_value], [2,2,my_value], [3,2,my_value]]"); // test add column with DEFAULT not support @@ -118,14 +118,14 @@ public void testWriteWithDefaultValue() { // test alter type to default column spark.sql("ALTER TABLE T ALTER COLUMN b TYPE STRING").collectAsList(); spark.sql("INSERT INTO T (a) VALUES (4)").collectAsList(); - rows = spark.sql("SELECT * FROM T").collectAsList(); + rows = spark.sql("SELECT * FROM T ORDER BY a").collectAsList(); assertThat(rows.toString()) .isEqualTo("[[1,2,my_value], [2,2,my_value], [3,2,my_value], [4,2,my_value]]"); // test alter default column spark.sql("ALTER TABLE T ALTER COLUMN b SET DEFAULT '3'"); spark.sql("INSERT INTO T (a) VALUES (5)").collectAsList(); - rows = spark.sql("SELECT * FROM T").collectAsList(); + rows = spark.sql("SELECT * FROM T ORDER BY a").collectAsList(); assertThat(rows.toString()) .isEqualTo( "[[1,2,my_value], [2,2,my_value], [3,2,my_value], [4,2,my_value], [5,3,my_value]]"); diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/LanceFormatTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/LanceFormatTest.scala index 667c2c85b945..ad78bedc12bc 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/LanceFormatTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/LanceFormatTest.scala @@ -30,9 +30,9 @@ class LanceFormatTest extends PaimonSparkTestBase { "CREATE TABLE t (a INT, b STRING, scores ARRAY) TBLPROPERTIES ('file.format' = 'lance')") sql( "INSERT INTO t VALUES (1, 'a', ARRAY(CAST(90.5 as double), CAST(88.0 as double))), (2, 'b', ARRAY(CAST(90.6 as double), CAST(88.1 as double)))") - checkAnswer(sql("SELECT * FROM t LIMIT 1"), Seq(Row(1, "a", Array(90.5, 88.0)))) + checkAnswer(sql("SELECT * FROM t ORDER BY a LIMIT 1"), Seq(Row(1, "a", Array(90.5, 88.0)))) checkAnswer( - sql("SELECT * FROM t LIMIT 10"), + sql("SELECT * FROM t ORDER BY a LIMIT 10"), Seq(Row(1, "a", Array(90.5, 88.0)), Row(2, "b", Array(90.6, 88.1)))) assert(