Skip to content

Commit 7d0346d

Browse files
authored
[core] Rollback 'COMPACT' commit for row-level operations (#6968)
1 parent 214dcc5 commit 7d0346d

File tree

10 files changed

+346
-46
lines changed

10 files changed

+346
-46
lines changed

paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.paimon.CoreOptions.ExternalPathStrategy;
2222
import org.apache.paimon.catalog.RenamingSnapshotCommit;
2323
import org.apache.paimon.catalog.SnapshotCommit;
24+
import org.apache.paimon.catalog.TableRollback;
2425
import org.apache.paimon.data.InternalRow;
2526
import org.apache.paimon.format.FileFormat;
2627
import org.apache.paimon.fs.FileIO;
@@ -43,6 +44,7 @@
4344
import org.apache.paimon.operation.PartitionExpire;
4445
import org.apache.paimon.operation.SnapshotDeletion;
4546
import org.apache.paimon.operation.TagDeletion;
47+
import org.apache.paimon.operation.commit.CommitRollback;
4648
import org.apache.paimon.operation.commit.ConflictDetection;
4749
import org.apache.paimon.operation.commit.StrictModeChecker;
4850
import org.apache.paimon.partition.PartitionExpireStrategy;
@@ -288,6 +290,11 @@ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) {
288290
commitUser,
289291
this::newScan,
290292
options.commitStrictModeLastSafeSnapshot().orElse(null));
293+
CommitRollback rollback = null;
294+
TableRollback tableRollback = catalogEnvironment.catalogTableRollback();
295+
if (tableRollback != null) {
296+
rollback = new CommitRollback(tableRollback);
297+
}
291298
return new FileStoreCommitImpl(
292299
snapshotCommit,
293300
fileIO,
@@ -320,7 +327,8 @@ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) {
320327
options.rowTrackingEnabled(),
321328
options.commitDiscardDuplicateFiles(),
322329
conflictDetection,
323-
strictModeChecker);
330+
strictModeChecker,
331+
rollback);
324332
}
325333

326334
@Override
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.catalog;
20+
21+
import org.apache.paimon.table.Instant;
22+
23+
import javax.annotation.Nullable;
24+
25+
/** Rollback table to instant from snapshot. */
26+
public interface TableRollback {
27+
28+
void rollbackTo(Instant instant, @Nullable Long fromSnapshot);
29+
}

paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java

Lines changed: 49 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,12 @@
4343
import org.apache.paimon.operation.commit.CommitChangesProvider;
4444
import org.apache.paimon.operation.commit.CommitCleaner;
4545
import org.apache.paimon.operation.commit.CommitResult;
46+
import org.apache.paimon.operation.commit.CommitRollback;
4647
import org.apache.paimon.operation.commit.CommitScanner;
4748
import org.apache.paimon.operation.commit.ConflictDetection;
4849
import org.apache.paimon.operation.commit.ManifestEntryChanges;
4950
import org.apache.paimon.operation.commit.RetryCommitResult;
51+
import org.apache.paimon.operation.commit.RetryCommitResult.CommitFailRetryResult;
5052
import org.apache.paimon.operation.commit.RowTrackingCommitUtils.RowTrackingAssigned;
5153
import org.apache.paimon.operation.commit.StrictModeChecker;
5254
import org.apache.paimon.operation.commit.SuccessCommitResult;
@@ -138,6 +140,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
138140
private final ManifestFile manifestFile;
139141
private final ManifestList manifestList;
140142
private final IndexManifestFile indexManifestFile;
143+
@Nullable private final CommitRollback rollback;
141144
private final CommitScanner scanner;
142145
private final int numBucket;
143146
private final MemorySize manifestTargetSize;
@@ -195,7 +198,8 @@ public FileStoreCommitImpl(
195198
boolean rowTrackingEnabled,
196199
boolean discardDuplicateFiles,
197200
ConflictDetection conflictDetection,
198-
@Nullable StrictModeChecker strictModeChecker) {
201+
@Nullable StrictModeChecker strictModeChecker,
202+
@Nullable CommitRollback rollback) {
199203
this.snapshotCommit = snapshotCommit;
200204
this.fileIO = fileIO;
201205
this.schemaManager = schemaManager;
@@ -209,6 +213,7 @@ public FileStoreCommitImpl(
209213
this.manifestFile = manifestFileFactory.create();
210214
this.manifestList = manifestListFactory.create();
211215
this.indexManifestFile = indexManifestFileFactory.create();
216+
this.rollback = rollback;
212217
this.scanner = new CommitScanner(scan, indexManifestFile, options);
213218
this.numBucket = numBucket;
214219
this.manifestTargetSize = manifestTargetSize;
@@ -313,10 +318,13 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
313318
if (appendCommitCheckConflict) {
314319
checkAppendFiles = true;
315320
}
321+
322+
boolean allowRollback = false;
316323
if (containsFileDeletionOrDeletionVectors(
317324
appendSimpleEntries, changes.appendIndexFiles)) {
318325
commitKind = CommitKind.OVERWRITE;
319326
checkAppendFiles = true;
327+
allowRollback = true;
320328
}
321329

322330
attempts +=
@@ -329,6 +337,7 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
329337
committable.watermark(),
330338
committable.properties(),
331339
commitKind,
340+
allowRollback,
332341
checkAppendFiles,
333342
null);
334343
generatedSnapshot += 1;
@@ -347,6 +356,7 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
347356
committable.watermark(),
348357
committable.properties(),
349358
CommitKind.COMPACT,
359+
false,
350360
true,
351361
null);
352362
generatedSnapshot += 1;
@@ -512,6 +522,7 @@ public int overwritePartition(
512522
committable.watermark(),
513523
committable.properties(),
514524
CommitKind.COMPACT,
525+
false,
515526
true,
516527
null);
517528
generatedSnapshot += 1;
@@ -652,6 +663,7 @@ public void commitStatistics(Statistics stats, long commitIdentifier) {
652663
Collections.emptyMap(),
653664
CommitKind.ANALYZE,
654665
false,
666+
false,
655667
statsFileName);
656668
}
657669

@@ -678,6 +690,7 @@ private int tryCommit(
678690
@Nullable Long watermark,
679691
Map<String, String> properties,
680692
CommitKind commitKind,
693+
boolean allowRollback,
681694
boolean detectConflicts,
682695
@Nullable String statsFileName) {
683696
int retryCount = 0;
@@ -696,6 +709,7 @@ private int tryCommit(
696709
watermark,
697710
properties,
698711
commitKind,
712+
allowRollback,
699713
latestSnapshot,
700714
detectConflicts,
701715
statsFileName);
@@ -742,6 +756,7 @@ private int tryOverwritePartition(
742756
watermark,
743757
properties,
744758
CommitKind.OVERWRITE,
759+
false,
745760
true,
746761
null);
747762
}
@@ -756,20 +771,23 @@ CommitResult tryCommitOnce(
756771
@Nullable Long watermark,
757772
Map<String, String> properties,
758773
CommitKind commitKind,
774+
boolean allowRollback,
759775
@Nullable Snapshot latestSnapshot,
760776
boolean detectConflicts,
761777
@Nullable String newStatsFileName) {
762778
long startMillis = System.currentTimeMillis();
763779

764780
// Check if the commit has been completed. At this point, there will be no more repeated
765781
// commits and just return success
766-
if (retryResult != null && latestSnapshot != null) {
782+
if (retryResult instanceof CommitFailRetryResult && latestSnapshot != null) {
783+
CommitFailRetryResult commitFailRetry = (CommitFailRetryResult) retryResult;
767784
Map<Long, Snapshot> snapshotCache = new HashMap<>();
768785
snapshotCache.put(latestSnapshot.id(), latestSnapshot);
769786
long startCheckSnapshot = Snapshot.FIRST_SNAPSHOT_ID;
770-
if (retryResult.latestSnapshot != null) {
771-
snapshotCache.put(retryResult.latestSnapshot.id(), retryResult.latestSnapshot);
772-
startCheckSnapshot = retryResult.latestSnapshot.id() + 1;
787+
if (commitFailRetry.latestSnapshot != null) {
788+
snapshotCache.put(
789+
commitFailRetry.latestSnapshot.id(), commitFailRetry.latestSnapshot);
790+
startCheckSnapshot = commitFailRetry.latestSnapshot.id() + 1;
773791
}
774792
for (long i = startCheckSnapshot; i <= latestSnapshot.id(); i++) {
775793
Snapshot snapshot = snapshotCache.computeIfAbsent(i, snapshotManager::snapshot);
@@ -813,11 +831,17 @@ CommitResult tryCommitOnce(
813831
// latestSnapshotId is different from the snapshot id we've checked for conflicts,
814832
// so we have to check again
815833
List<BinaryRow> changedPartitions = changedPartitions(deltaFiles, indexFiles);
816-
if (retryResult != null && retryResult.latestSnapshot != null) {
817-
baseDataFiles = new ArrayList<>(retryResult.baseDataFiles);
834+
CommitFailRetryResult commitFailRetry =
835+
retryResult instanceof CommitFailRetryResult
836+
? (CommitFailRetryResult) retryResult
837+
: null;
838+
if (commitFailRetry != null
839+
&& commitFailRetry.latestSnapshot != null
840+
&& commitFailRetry.baseDataFiles != null) {
841+
baseDataFiles = new ArrayList<>(commitFailRetry.baseDataFiles);
818842
List<SimpleFileEntry> incremental =
819843
scanner.readIncrementalChanges(
820-
retryResult.latestSnapshot, latestSnapshot, changedPartitions);
844+
commitFailRetry.latestSnapshot, latestSnapshot, changedPartitions);
821845
if (!incremental.isEmpty()) {
822846
baseDataFiles.addAll(incremental);
823847
baseDataFiles = new ArrayList<>(FileEntry.mergeEntries(baseDataFiles));
@@ -837,12 +861,21 @@ CommitResult tryCommitOnce(
837861
.filter(entry -> !baseIdentifiers.contains(entry.identifier()))
838862
.collect(Collectors.toList());
839863
}
840-
conflictDetection.checkNoConflictsOrFail(
841-
latestSnapshot,
842-
baseDataFiles,
843-
SimpleFileEntry.from(deltaFiles),
844-
indexFiles,
845-
commitKind);
864+
Optional<RuntimeException> exception =
865+
conflictDetection.checkConflicts(
866+
latestSnapshot,
867+
baseDataFiles,
868+
SimpleFileEntry.from(deltaFiles),
869+
indexFiles,
870+
commitKind);
871+
if (exception.isPresent()) {
872+
if (allowRollback && rollback != null) {
873+
if (rollback.tryToRollback(latestSnapshot)) {
874+
return RetryCommitResult.forRollback(exception.get());
875+
}
876+
}
877+
throw exception.get();
878+
}
846879
}
847880

848881
Snapshot newSnapshot;
@@ -971,7 +1004,7 @@ CommitResult tryCommitOnce(
9711004
} catch (Exception e) {
9721005
// commit exception, not sure about the situation and should not clean up the files
9731006
LOG.warn("Retry commit for exception.", e);
974-
return new RetryCommitResult(latestSnapshot, baseDataFiles, e);
1007+
return RetryCommitResult.forCommitFail(latestSnapshot, baseDataFiles, e);
9751008
}
9761009

9771010
if (!success) {
@@ -988,7 +1021,7 @@ CommitResult tryCommitOnce(
9881021
commitTime);
9891022
commitCleaner.cleanUpNoReuseTmpManifests(
9901023
baseManifestList, mergeBeforeManifests, mergeAfterManifests);
991-
return new RetryCommitResult(latestSnapshot, baseDataFiles, null);
1024+
return RetryCommitResult.forCommitFail(latestSnapshot, baseDataFiles, null);
9921025
}
9931026

9941027
LOG.info(
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.operation.commit;
20+
21+
import org.apache.paimon.Snapshot;
22+
import org.apache.paimon.catalog.TableRollback;
23+
import org.apache.paimon.table.Instant;
24+
25+
/** Commit rollback to rollback 'COMPACT' commits for resolving conflicts. */
26+
public class CommitRollback {
27+
28+
private final TableRollback tableRollback;
29+
30+
public CommitRollback(TableRollback tableRollback) {
31+
this.tableRollback = tableRollback;
32+
}
33+
34+
public boolean tryToRollback(Snapshot latestSnapshot) {
35+
if (latestSnapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
36+
long latest = latestSnapshot.id();
37+
try {
38+
tableRollback.rollbackTo(Instant.snapshot(latest - 1), latest);
39+
return true;
40+
} catch (Exception ignored) {
41+
}
42+
}
43+
return false;
44+
}
45+
}

0 commit comments

Comments
 (0)