diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 6944c0283fdd..2e044afd05bf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -21,6 +21,7 @@
 import org.apache.paimon.CoreOptions.ExternalPathStrategy;
 import org.apache.paimon.catalog.RenamingSnapshotCommit;
 import org.apache.paimon.catalog.SnapshotCommit;
+import org.apache.paimon.catalog.TableRollback;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.fs.FileIO;
@@ -43,6 +44,7 @@
 import org.apache.paimon.operation.PartitionExpire;
 import org.apache.paimon.operation.SnapshotDeletion;
 import org.apache.paimon.operation.TagDeletion;
+import org.apache.paimon.operation.commit.CommitRollback;
 import org.apache.paimon.operation.commit.ConflictDetection;
 import org.apache.paimon.operation.commit.StrictModeChecker;
 import org.apache.paimon.partition.PartitionExpireStrategy;
@@ -287,6 +289,11 @@ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) {
                         commitUser,
                         this::newScan,
                         options.commitStrictModeLastSafeSnapshot().orElse(null));
+        CommitRollback rollback = null;
+        TableRollback tableRollback = catalogEnvironment.catalogTableRollback();
+        if (tableRollback != null) {
+            rollback = new CommitRollback(tableRollback);
+        }
         return new FileStoreCommitImpl(
                 snapshotCommit,
                 fileIO,
@@ -319,7 +326,8 @@ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) {
                 options.rowTrackingEnabled(),
                 options.commitDiscardDuplicateFiles(),
                 conflictDetection,
-                strictModeChecker);
+                strictModeChecker,
+                rollback);
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/TableRollback.java b/paimon-core/src/main/java/org/apache/paimon/catalog/TableRollback.java
new file mode 100644
index 000000000000..5cb2a521924d
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/TableRollback.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.catalog;
+
+import org.apache.paimon.table.Instant;
+
+import javax.annotation.Nullable;
+
+/** Rolls back a table to the given {@link Instant} from the given snapshot. */
+public interface TableRollback {
+
+    void rollbackTo(Instant instant, @Nullable Long fromSnapshot);
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index c4f433d3a80e..18b47b6ad558 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -44,6 +44,7 @@
 import org.apache.paimon.operation.commit.CommitCleaner;
 import org.apache.paimon.operation.commit.CommitKindProvider;
 import org.apache.paimon.operation.commit.CommitResult;
+import org.apache.paimon.operation.commit.CommitRollback;
 import org.apache.paimon.operation.commit.CommitScanner;
 import org.apache.paimon.operation.commit.ConflictDetection;
 import org.apache.paimon.operation.commit.ManifestEntryChanges;
@@ -139,6 +140,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
     private final ManifestFile manifestFile;
     private final ManifestList manifestList;
     private final IndexManifestFile indexManifestFile;
+    @Nullable private final CommitRollback rollback;
     private final CommitScanner scanner;
     private final int numBucket;
     private final MemorySize manifestTargetSize;
@@ -196,7 +198,8 @@ public FileStoreCommitImpl(
             boolean rowTrackingEnabled,
             boolean discardDuplicateFiles,
             ConflictDetection conflictDetection,
-            @Nullable StrictModeChecker strictModeChecker) {
+            @Nullable StrictModeChecker strictModeChecker,
+            @Nullable CommitRollback rollback) {
         this.snapshotCommit = snapshotCommit;
         this.fileIO = fileIO;
         this.schemaManager = schemaManager;
@@ -210,6 +213,7 @@ public FileStoreCommitImpl(
         this.manifestFile = manifestFileFactory.create();
         this.manifestList = manifestListFactory.create();
         this.indexManifestFile = indexManifestFileFactory.create();
+        this.rollback = rollback;
         this.scanner = new CommitScanner(scan, indexManifestFile, options);
         this.numBucket = numBucket;
         this.manifestTargetSize = manifestTargetSize;
@@ -314,10 +318,13 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
         if (appendCommitCheckConflict) {
             checkAppendFiles = true;
         }
+
+        boolean allowRollback = false;
         if (containsFileDeletionOrDeletionVectors(
                 appendSimpleEntries, changes.appendIndexFiles)) {
             commitKind = CommitKind.OVERWRITE;
             checkAppendFiles = true;
+            allowRollback = true;
         }
 
         attempts +=
@@ -330,6 +337,7 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
                         committable.watermark(),
                         committable.properties(),
                         CommitKindProvider.provider(commitKind),
+                        allowRollback,
                         checkAppendFiles,
                         null);
         generatedSnapshot += 1;
@@ -348,6 +356,7 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
                             committable.watermark(),
                             committable.properties(),
                             CommitKindProvider.provider(CommitKind.COMPACT),
+                            false,
                             true,
                             null);
             generatedSnapshot += 1;
@@ -513,6 +522,7 @@ public int overwritePartition(
                         committable.watermark(),
                         committable.properties(),
                         CommitKindProvider.provider(CommitKind.COMPACT),
+                        false,
                         true,
                         null);
         generatedSnapshot += 1;
@@ -653,6 +663,7 @@ public void commitStatistics(Statistics stats, long commitIdentifier) {
                 Collections.emptyMap(),
                 CommitKindProvider.provider(CommitKind.ANALYZE),
                 false,
+                false,
                 statsFileName);
     }
 
@@ -679,6 +690,7 @@ private int tryCommit(
             @Nullable Long watermark,
             Map<String, String> properties,
             CommitKindProvider commitKindProvider,
+            boolean allowRollback,
             boolean detectConflicts,
             @Nullable String statsFileName) {
         int retryCount = 0;
@@ -698,6 +710,7 @@ private int tryCommit(
                             watermark,
                             properties,
                             commitKind,
+                            allowRollback,
                             latestSnapshot,
                             detectConflicts,
                             statsFileName);
@@ -750,6 +763,7 @@ private int tryOverwritePartition(
                 watermark,
                 properties,
                 commitKindProvider,
+                false,
                 true,
                 null);
     }
@@ -764,6 +778,7 @@ CommitResult tryCommitOnce(
             @Nullable Long watermark,
             Map<String, String> properties,
             CommitKind commitKind,
+            boolean allowRollback,
             @Nullable Snapshot latestSnapshot,
             boolean detectConflicts,
             @Nullable String newStatsFileName) {
@@ -821,7 +836,9 @@ CommitResult tryCommitOnce(
             // latestSnapshotId is different from the snapshot id we've checked for conflicts,
             // so we have to check again
             List<BinaryRow> changedPartitions = changedPartitions(deltaFiles, indexFiles);
-            if (retryResult != null && retryResult.latestSnapshot != null) {
+            if (retryResult != null
+                    && retryResult.latestSnapshot != null
+                    && retryResult.baseDataFiles != null) {
                 baseDataFiles = new ArrayList<>(retryResult.baseDataFiles);
                 List<SimpleFileEntry> incremental =
                         scanner.readIncrementalChanges(
@@ -845,12 +862,21 @@ CommitResult tryCommitOnce(
                                 .filter(entry -> !baseIdentifiers.contains(entry.identifier()))
                                 .collect(Collectors.toList());
             }
-            conflictDetection.checkNoConflictsOrFail(
-                    latestSnapshot,
-                    baseDataFiles,
-                    SimpleFileEntry.from(deltaFiles),
-                    indexFiles,
-                    commitKind);
+            Optional<RuntimeException> exception =
+                    conflictDetection.checkConflicts(
+                            latestSnapshot,
+                            baseDataFiles,
+                            SimpleFileEntry.from(deltaFiles),
+                            indexFiles,
+                            commitKind);
+            if (exception.isPresent()) {
+                if (allowRollback && rollback != null) {
+                    if (rollback.tryToRollback(latestSnapshot)) {
+                        return RetryCommitResult.ofEmpty(exception.get());
+                    }
+                }
+                throw exception.get();
+            }
         }
 
         Snapshot newSnapshot;
@@ -979,7 +1005,7 @@ CommitResult tryCommitOnce(
         } catch (Exception e) {
             // commit exception, not sure about the situation and should not clean up the files
             LOG.warn("Retry commit for exception.", e);
-            return new RetryCommitResult(latestSnapshot, baseDataFiles, e);
+            return RetryCommitResult.ofContext(latestSnapshot, baseDataFiles, e);
         }
 
         if (!success) {
@@ -996,7 +1022,7 @@ CommitResult tryCommitOnce(
                     commitTime);
             commitCleaner.cleanUpNoReuseTmpManifests(
                     baseManifestList, mergeBeforeManifests, mergeAfterManifests);
-            return new RetryCommitResult(latestSnapshot, baseDataFiles, null);
+            return RetryCommitResult.ofContext(latestSnapshot, baseDataFiles, null);
         }
 
         LOG.info(
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitRollback.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitRollback.java
new file mode 100644
index 000000000000..683b6555a651
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitRollback.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation.commit;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.TableRollback;
+import org.apache.paimon.table.Instant;
+
+/** Rolls back conflicting 'COMPACT' commits during commit so that the conflict can be resolved. */
+public class CommitRollback {
+
+    private final TableRollback tableRollback;
+
+    public CommitRollback(TableRollback tableRollback) {
+        this.tableRollback = tableRollback;
+    }
+
+    public boolean tryToRollback(Snapshot latestSnapshot) {
+        if (latestSnapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
+            long latest = latestSnapshot.id();
+            try {
+                tableRollback.rollbackTo(Instant.snapshot(latest - 1), latest);
+                return true;
+            } catch (Exception ignored) {
+            }
+        }
+        return false;
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
index 73dd5734f7a3..3adfbc277fbc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
@@ -51,6 +51,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -102,7 +103,7 @@ public void withPartitionExpire(PartitionExpire partitionExpire) {
         this.partitionExpire = partitionExpire;
     }
 
-    public void checkNoConflictsOrFail(
+    public Optional<RuntimeException> checkConflicts(
             Snapshot snapshot,
             List<SimpleFileEntry> baseEntries,
             List<SimpleFileEntry> deltaEntries,
@@ -126,14 +127,20 @@ public void checkNoConflictsOrFail(
                 deltaEntries =
                         buildDeltaEntriesWithDV(baseEntries, deltaEntries, deltaIndexEntries);
             } catch (Throwable e) {
-                throw conflictException(commitUser, baseEntries, deltaEntries).apply(e);
+                return Optional.of(
+                        conflictException(commitUser, baseEntries, deltaEntries).apply(e));
             }
         }
 
         List<SimpleFileEntry> allEntries = new ArrayList<>(baseEntries);
         allEntries.addAll(deltaEntries);
 
-        checkBucketKeepSame(baseEntries, deltaEntries, commitKind, allEntries, baseCommitUser);
+        Optional<RuntimeException> exception =
+                checkBucketKeepSame(
+                        baseEntries, deltaEntries, commitKind, allEntries, baseCommitUser);
+        if (exception.isPresent()) {
+            return exception;
+        }
 
         Function<Throwable, RuntimeException> conflictException =
                 conflictException(baseCommitUser, baseEntries, deltaEntries);
@@ -142,21 +149,24 @@ public void checkNoConflictsOrFail(
             // merge manifest entries and also check if the files we want to delete are still there
             mergedEntries = FileEntry.mergeEntries(allEntries);
         } catch (Throwable e) {
-            throw conflictException.apply(e);
+            return Optional.of(conflictException.apply(e));
        }
 
-        checkNoDeleteInMergedEntries(mergedEntries, conflictException);
-        checkKeyRangeNoConflicts(baseEntries, deltaEntries, mergedEntries, baseCommitUser);
+        exception = checkDeleteInEntries(mergedEntries, conflictException);
+        if (exception.isPresent()) {
+            return exception;
+        }
+        return checkKeyRange(baseEntries, deltaEntries, mergedEntries, baseCommitUser);
     }
 
-    private void checkBucketKeepSame(
+    private Optional<RuntimeException> checkBucketKeepSame(
            List<SimpleFileEntry> baseEntries,
            List<SimpleFileEntry> deltaEntries,
            CommitKind commitKind,
            List<SimpleFileEntry> allEntries,
            String baseCommitUser) {
         if (commitKind == CommitKind.OVERWRITE) {
-            return;
+            return Optional.empty();
         }
 
         // total buckets within the same partition should remain the same
@@ -190,18 +200,19 @@ private void checkBucketKeepSame(
                             deltaEntries,
                             null);
             LOG.warn("", conflictException.getLeft());
-            throw conflictException.getRight();
+            return Optional.of(conflictException.getRight());
         }
+        return Optional.empty();
     }
 
-    private void checkKeyRangeNoConflicts(
+    private Optional<RuntimeException> checkKeyRange(
             List<SimpleFileEntry> baseEntries,
             List<SimpleFileEntry> deltaEntries,
             Collection<SimpleFileEntry> mergedEntries,
             String baseCommitUser) {
         // fast exit for file store without keys
         if (keyComparator == null) {
-            return;
+            return Optional.empty();
         }
 
         // group entries by partitions, buckets and levels
@@ -235,10 +246,11 @@ private void checkKeyRangeNoConflicts(
                                 null);
 
                 LOG.warn("", conflictException.getLeft());
-                throw conflictException.getRight();
+                return Optional.of(conflictException.getRight());
                 }
             }
         }
+        return Optional.empty();
     }
 
     private Function<Throwable, RuntimeException> conflictException(
@@ -262,7 +274,7 @@ private boolean checkForDeletionVector() {
         return deletionVectorsEnabled && bucketMode.equals(BucketMode.BUCKET_UNAWARE);
     }
 
-    private void checkNoDeleteInMergedEntries(
+    private Optional<RuntimeException> checkDeleteInEntries(
             Collection<SimpleFileEntry> mergedEntries,
             Function<Throwable, RuntimeException> exceptionFunction) {
         try {
@@ -274,12 +286,17 @@ private void checkDeleteInEntries(
                             tableName);
             }
         } catch (Throwable e) {
-            assertConflictForPartitionExpire(mergedEntries);
-            throw exceptionFunction.apply(e);
+            Optional<RuntimeException> exception = assertConflictForPartitionExpire(mergedEntries);
+            if (exception.isPresent()) {
+                return exception;
+            }
+            return Optional.of(exceptionFunction.apply(e));
         }
+        return Optional.empty();
     }
 
-    private void assertConflictForPartitionExpire(Collection<SimpleFileEntry> mergedEntries) {
+    private Optional<RuntimeException> assertConflictForPartitionExpire(
+            Collection<SimpleFileEntry> mergedEntries) {
         if (partitionExpire != null && partitionExpire.isValueExpiration()) {
             Set<BinaryRow> deletedPartitions = new HashSet<>();
             for (SimpleFileEntry entry : mergedEntries) {
@@ -295,13 +312,15 @@ private void assertConflictForPartitionExpire(Collection merged
                                             partToSimpleString(
                                                     partitionType, partition, "-", 200))
                             .collect(Collectors.toList());
-            throw new RuntimeException(
-                    "You are writing data to expired partitions, and you can filter this data to avoid job failover."
-                            + " Otherwise, continuous expired records will cause the job to failover restart continuously."
-                            + " Expired partitions are: "
-                            + expiredPartitions);
+            return Optional.of(
+                    new RuntimeException(
+                            "You are writing data to expired partitions, and you can filter this data to avoid job failover."
+                                    + " Otherwise, continuous expired records will cause the job to failover restart continuously."
+                                    + " Expired partitions are: "
+                                    + expiredPartitions));
             }
         }
+        return Optional.empty();
     }
 
     static List<SimpleFileEntry> buildBaseEntriesWithDV(
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java
index e64049ea63c2..d07a4f9c1214 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java
@@ -21,22 +21,35 @@
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.manifest.SimpleFileEntry;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
 
 /** Need to retry commit of {@link CommitResult}. */
 public class RetryCommitResult implements CommitResult {
 
-    public final Snapshot latestSnapshot;
-    public final List<SimpleFileEntry> baseDataFiles;
+    public final @Nullable Snapshot latestSnapshot;
+    public final @Nullable List<SimpleFileEntry> baseDataFiles;
     public final Exception exception;
 
-    public RetryCommitResult(
-            Snapshot latestSnapshot, List<SimpleFileEntry> baseDataFiles, Exception exception) {
+    private RetryCommitResult(
+            @Nullable Snapshot latestSnapshot,
+            @Nullable List<SimpleFileEntry> baseDataFiles,
+            Exception exception) {
         this.latestSnapshot = latestSnapshot;
         this.baseDataFiles = baseDataFiles;
         this.exception = exception;
     }
 
+    public static RetryCommitResult ofContext(
+            Snapshot snapshot, List<SimpleFileEntry> baseDataFiles, Exception exception) {
+        return new RetryCommitResult(snapshot, baseDataFiles, exception);
+    }
+
+    public static RetryCommitResult ofEmpty(Exception exception) {
+        return new RetryCommitResult(null, null, exception);
+    }
+
     @Override
     public boolean isSuccess() {
         return false;
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
index 0f61f8fa0978..713115b322d2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
@@ -28,6 +28,7 @@
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.catalog.RenamingSnapshotCommit;
 import org.apache.paimon.catalog.SnapshotCommit;
+import org.apache.paimon.catalog.TableRollback;
 import org.apache.paimon.operation.Lock;
 import org.apache.paimon.table.source.TableQueryAuth;
 import org.apache.paimon.tag.SnapshotLoaderImpl;
@@ -113,6 +114,21 @@ public SnapshotCommit snapshotCommit(SnapshotManager snapshotManager) {
         return snapshotCommit;
     }
 
+    @Nullable
+    public TableRollback catalogTableRollback() {
+        if (catalogLoader != null && supportsVersionManagement) {
+            Catalog catalog = catalogLoader.load();
+            return (instant, fromSnapshot) -> {
+                try {
+                    catalog.rollbackTo(identifier, instant, fromSnapshot);
+                } catch (Catalog.TableNotExistException e) {
+                    throw new RuntimeException(e);
+                }
+            };
+        }
+        return null;
+    }
+
     @Nullable
     public SnapshotLoader snapshotLoader() {
         if (catalogLoader == null) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index 69812952bfad..580662607941 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -929,6 +929,7 @@ private void cleanBucket(TestFileStore store, BinaryRow partition, int bucket) {
                 null,
                 Collections.emptyMap(),
                 Snapshot.CommitKind.APPEND,
+                false,
                 store.snapshotManager().latestSnapshot(),
                 true,
                 null);
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index 75a8271ae16d..5b822015597f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -1019,12 +1019,13 @@ public void testCommitTwiceWithDifferentKind() throws Exception {
                 null,
                 Collections.emptyMap(),
                 Snapshot.CommitKind.APPEND,
+                false,
                 firstLatest,
                 true,
                 null);
 
         // Compact
         commit.tryCommitOnce(
-                new RetryCommitResult(firstLatest, Collections.emptyList(), null),
+                RetryCommitResult.ofContext(firstLatest, Collections.emptyList(), null),
                 Collections.emptyList(),
                 Collections.emptyList(),
                 Collections.emptyList(),
@@ -1032,6 +1033,7 @@ public void testCommitTwiceWithDifferentKind() throws Exception {
                 null,
                 Collections.emptyMap(),
                 Snapshot.CommitKind.COMPACT,
+                false,
                 store.snapshotManager().latestSnapshot(),
                 true,
                 null);
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index 288c925ceb23..e6123f132784 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -22,6 +22,7 @@
 import org.apache.paimon.PagedList;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.TableType;
+import org.apache.paimon.append.AppendCompactTask;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogTestBase;
@@ -37,6 +38,11 @@
 import org.apache.paimon.function.Function;
 import org.apache.paimon.function.FunctionChange;
 import org.apache.paimon.function.FunctionDefinition;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.operation.BaseAppendFileStoreWrite;
+import org.apache.paimon.operation.FileStoreWrite;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.Partition;
 import org.apache.paimon.partition.PartitionStatistics;
@@ -58,8 +64,10 @@
 import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.TableWriteImpl;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
@@ -98,6 +106,7 @@
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
@@ -107,6 +116,7 @@
 import static org.apache.paimon.CoreOptions.TYPE;
 import static org.apache.paimon.TableType.OBJECT_TABLE;
 import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
+import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
 import static org.apache.paimon.rest.RESTApi.PAGE_TOKEN;
 import static org.apache.paimon.rest.RESTCatalogOptions.DLF_OSS_ENDPOINT;
 import static org.apache.paimon.rest.auth.DLFToken.TOKEN_DATE_FORMATTER;
@@ -2969,6 +2979,109 @@ public void testReadSystemTablesWithExternalTable(@TempDir java.nio.file.Path pa
         DEFAULT_TABLE_SCHEMA.options().remove(CoreOptions.PATH.key());
     }
 
+    @Test
+    public void testConflictRollback() throws Exception {
+        doTestConflictRollback(false);
+    }
+
+    @Test
+    public void testConflictRollbackFail() throws Exception {
+        doTestConflictRollback(true);
+    }
+
+    private void doTestConflictRollback(boolean insertMiddle) throws Exception {
+        Identifier identifier =
+                Identifier.create("test_conflict_rollback", "test_conflict_rollback");
+        catalog.createDatabase(identifier.getDatabaseName(), true);
+        catalog.createTable(
+                identifier,
+                new Schema(
+                        Lists.newArrayList(new DataField(0, "col1", DataTypes.INT())),
+                        emptyList(),
+                        emptyList(),
+                        new HashMap<>(),
+                        ""),
+                true);
+        Table table = catalog.getTable(identifier);
+
+        // write 5 files
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        List<DataFileMeta> files = new ArrayList<>();
+        for (int i = 0; i < 5; i++) {
+            try (BatchTableWrite write = writeBuilder.newWrite();
+                    BatchTableCommit commit = writeBuilder.newCommit()) {
+                write.write(GenericRow.of(i));
+                List<CommitMessage> commitMessages = write.prepareCommit();
+                commit.commit(commitMessages);
+                DataFileMeta file =
+                        ((CommitMessageImpl) commitMessages.get(0))
+                                .newFilesIncrement()
+                                .newFiles()
+                                .get(0);
+                files.add(file);
+            }
+        }
+
+        // delete write
+        DataFileMeta file = files.get(0);
+        CommitMessageImpl deleteCommitMessage =
+                new CommitMessageImpl(
+                        EMPTY_ROW,
+                        0,
+                        -1,
+                        new DataIncrement(emptyList(), singletonList(file), emptyList()),
+                        new CompactIncrement(emptyList(), emptyList(), emptyList()));
+
+        // compact write
+        CommitMessage compactCommitMessage;
+        try (BatchTableWrite write = writeBuilder.newWrite()) {
+            AppendCompactTask compactTask = new AppendCompactTask(EMPTY_ROW, files);
+            FileStoreWrite fileStoreWrite = ((TableWriteImpl) write).getWrite();
+            compactCommitMessage =
+                    compactTask.doCompact(
+                            (FileStoreTable) table, (BaseAppendFileStoreWrite) fileStoreWrite);
+        }
+
+        // do compact commit first
+        try (BatchTableCommit commit = writeBuilder.newCommit()) {
+            commit.commit(singletonList(compactCommitMessage));
+        }
+
+        if (insertMiddle) {
+            try (BatchTableWrite write = writeBuilder.newWrite();
+                    BatchTableCommit commit = writeBuilder.newCommit()) {
+                write.write(GenericRow.of(0));
+                commit.commit(write.prepareCommit());
+            }
+        }
+
+        // do delete commit after
+        try (BatchTableCommit commit = writeBuilder.newCommit()) {
+            List<CommitMessage> messages = singletonList(deleteCommitMessage);
+            if (insertMiddle) {
+                assertThatThrownBy(() -> commit.commit(messages))
+                        .hasMessageContaining("File deletion conflicts detected");
+            } else {
+                // should rollback compact commit
+                commit.commit(messages);
+            }
+        }
+
+        // scan for rollback success
+        if (!insertMiddle) {
+            ReadBuilder readBuilder = table.newReadBuilder();
+            List<Integer> result = new ArrayList<>();
+            readBuilder
+                    .newRead()
+                    .createReader(readBuilder.newScan().plan())
+                    .forEachRemaining(r -> result.add(r.getInt(0)));
+            assertThat(result).containsExactlyInAnyOrder(1, 2, 3, 4);
+        }
+
+        // clear
+        catalog.dropDatabase(identifier.getDatabaseName(), false, true);
+    }
+
     protected void createTable(
             Identifier identifier, Map<String, String> options, List<String> partitionKeys)
             throws Exception {
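Note (not part of the patch): the sketch below is a minimal, self-contained illustration of the decision the new retry path takes in FileStoreCommitImpl.tryCommitOnce — when a file-deleting append commit conflicts with the latest snapshot, the rollback is attempted only if that snapshot is a 'COMPACT' commit, and any rollback failure falls through to the original conflict exception. Paimon types are replaced by hypothetical stand-ins (SnapshotStub, RollbackAction) so the logic can be read in isolation; only the class and method names from the patch (CommitRollback.tryToRollback, TableRollback.rollbackTo) are mirrored.

// Illustrative sketch only. Stand-in types are hypothetical; they mimic the shape of
// org.apache.paimon.Snapshot and org.apache.paimon.catalog.TableRollback.
public class ConflictRollbackSketch {

    enum CommitKind { APPEND, COMPACT, OVERWRITE }

    /** Stand-in for a Paimon snapshot: an id plus the kind of commit that produced it. */
    record SnapshotStub(long id, CommitKind commitKind) {}

    /** Stand-in for the catalog-backed rollback action introduced by the patch. */
    interface RollbackAction {
        void rollbackTo(long targetSnapshotId, long fromSnapshotId) throws Exception;
    }

    /** Mirrors the COMPACT-only guard in CommitRollback.tryToRollback. */
    static boolean tryToRollback(SnapshotStub latest, RollbackAction action) {
        if (latest.commitKind() != CommitKind.COMPACT) {
            return false; // never roll back commits that change data
        }
        try {
            // roll the table back to the snapshot just before the conflicting compaction
            action.rollbackTo(latest.id() - 1, latest.id());
            return true;
        } catch (Exception ignored) {
            return false; // rollback failed: caller rethrows the original conflict
        }
    }

    public static void main(String[] args) {
        SnapshotStub compact = new SnapshotStub(42, CommitKind.COMPACT);
        SnapshotStub append = new SnapshotStub(43, CommitKind.APPEND);
        RollbackAction log = (target, from) ->
                System.out.printf("rollback to snapshot %d (from %d)%n", target, from);

        System.out.println(tryToRollback(compact, log)); // true: compaction rolled back, commit retried
        System.out.println(tryToRollback(append, log));  // false: conflict exception is thrown as before
    }
}

In the patch itself this decision is reached only when allowRollback is set (the committable deletes files or deletion vectors) and a CommitRollback was wired in from the catalog via CatalogEnvironment.catalogTableRollback(); in every other case the behavior is unchanged and the conflict exception is thrown.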