Skip to content

Commit 61cd250

Browse files
committed
fix
1 parent 018d512 commit 61cd250

File tree

4 files changed

+54
-26
lines changed

4 files changed

+54
-26
lines changed

paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.paimon.operation.commit.ConflictDetection;
5050
import org.apache.paimon.operation.commit.ManifestEntryChanges;
5151
import org.apache.paimon.operation.commit.RetryCommitResult;
52+
import org.apache.paimon.operation.commit.RetryCommitResult.CommitFailRetryResult;
5253
import org.apache.paimon.operation.commit.RowTrackingCommitUtils.RowTrackingAssigned;
5354
import org.apache.paimon.operation.commit.StrictModeChecker;
5455
import org.apache.paimon.operation.commit.SuccessCommitResult;
@@ -786,13 +787,15 @@ CommitResult tryCommitOnce(
786787

787788
// Check if the commit has been completed. At this point, there will be no more repeated
788789
// commits and just return success
789-
if (retryResult != null && latestSnapshot != null) {
790+
if (retryResult instanceof CommitFailRetryResult && latestSnapshot != null) {
791+
CommitFailRetryResult commitFailRetry = (CommitFailRetryResult) retryResult;
790792
Map<Long, Snapshot> snapshotCache = new HashMap<>();
791793
snapshotCache.put(latestSnapshot.id(), latestSnapshot);
792794
long startCheckSnapshot = Snapshot.FIRST_SNAPSHOT_ID;
793-
if (retryResult.latestSnapshot != null) {
794-
snapshotCache.put(retryResult.latestSnapshot.id(), retryResult.latestSnapshot);
795-
startCheckSnapshot = retryResult.latestSnapshot.id() + 1;
795+
if (commitFailRetry.latestSnapshot != null) {
796+
snapshotCache.put(
797+
commitFailRetry.latestSnapshot.id(), commitFailRetry.latestSnapshot);
798+
startCheckSnapshot = commitFailRetry.latestSnapshot.id() + 1;
796799
}
797800
for (long i = startCheckSnapshot; i <= latestSnapshot.id(); i++) {
798801
Snapshot snapshot = snapshotCache.computeIfAbsent(i, snapshotManager::snapshot);
@@ -836,13 +839,17 @@ CommitResult tryCommitOnce(
836839
// latestSnapshotId is different from the snapshot id we've checked for conflicts,
837840
// so we have to check again
838841
List<BinaryRow> changedPartitions = changedPartitions(deltaFiles, indexFiles);
839-
if (retryResult != null
840-
&& retryResult.latestSnapshot != null
841-
&& retryResult.baseDataFiles != null) {
842-
baseDataFiles = new ArrayList<>(retryResult.baseDataFiles);
842+
CommitFailRetryResult commitFailRetry =
843+
retryResult instanceof CommitFailRetryResult
844+
? (CommitFailRetryResult) retryResult
845+
: null;
846+
if (commitFailRetry != null
847+
&& commitFailRetry.latestSnapshot != null
848+
&& commitFailRetry.baseDataFiles != null) {
849+
baseDataFiles = new ArrayList<>(commitFailRetry.baseDataFiles);
843850
List<SimpleFileEntry> incremental =
844851
scanner.readIncrementalChanges(
845-
retryResult.latestSnapshot, latestSnapshot, changedPartitions);
852+
commitFailRetry.latestSnapshot, latestSnapshot, changedPartitions);
846853
if (!incremental.isEmpty()) {
847854
baseDataFiles.addAll(incremental);
848855
baseDataFiles = new ArrayList<>(FileEntry.mergeEntries(baseDataFiles));
@@ -872,7 +879,7 @@ CommitResult tryCommitOnce(
872879
if (exception.isPresent()) {
873880
if (allowRollback && rollback != null) {
874881
if (rollback.tryToRollback(latestSnapshot)) {
875-
return RetryCommitResult.ofEmpty(exception.get());
882+
return RetryCommitResult.forRollback(exception.get());
876883
}
877884
}
878885
throw exception.get();
@@ -1005,7 +1012,7 @@ CommitResult tryCommitOnce(
10051012
} catch (Exception e) {
10061013
// commit exception, not sure about the situation and should not clean up the files
10071014
LOG.warn("Retry commit for exception.", e);
1008-
return RetryCommitResult.ofContext(latestSnapshot, baseDataFiles, e);
1015+
return RetryCommitResult.forCommitFail(latestSnapshot, baseDataFiles, e);
10091016
}
10101017

10111018
if (!success) {
@@ -1022,7 +1029,7 @@ CommitResult tryCommitOnce(
10221029
commitTime);
10231030
commitCleaner.cleanUpNoReuseTmpManifests(
10241031
baseManifestList, mergeBeforeManifests, mergeAfterManifests);
1025-
return RetryCommitResult.ofContext(latestSnapshot, baseDataFiles, null);
1032+
return RetryCommitResult.forCommitFail(latestSnapshot, baseDataFiles, null);
10261033
}
10271034

10281035
LOG.info(

paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,32 +26,49 @@
2626
import java.util.List;
2727

2828
/** Need to retry commit of {@link CommitResult}. */
29-
public class RetryCommitResult implements CommitResult {
29+
public abstract class RetryCommitResult implements CommitResult {
3030

31-
public final @Nullable Snapshot latestSnapshot;
32-
public final @Nullable List<SimpleFileEntry> baseDataFiles;
3331
public final Exception exception;
3432

35-
private RetryCommitResult(
36-
@Nullable Snapshot latestSnapshot,
37-
@Nullable List<SimpleFileEntry> baseDataFiles,
38-
Exception exception) {
39-
this.latestSnapshot = latestSnapshot;
40-
this.baseDataFiles = baseDataFiles;
33+
private RetryCommitResult(Exception exception) {
4134
this.exception = exception;
4235
}
4336

44-
public static RetryCommitResult ofContext(
37+
public static RetryCommitResult forCommitFail(
4538
Snapshot snapshot, List<SimpleFileEntry> baseDataFiles, Exception exception) {
46-
return new RetryCommitResult(snapshot, baseDataFiles, exception);
39+
return new CommitFailRetryResult(snapshot, baseDataFiles, exception);
4740
}
4841

49-
public static RetryCommitResult ofEmpty(Exception exception) {
50-
return new RetryCommitResult(null, null, exception);
42+
public static RetryCommitResult forRollback(Exception exception) {
43+
return new RollbackRetryResult(exception);
5144
}
5245

5346
@Override
5447
public boolean isSuccess() {
5548
return false;
5649
}
50+
51+
/** Retry result for commit failing. */
52+
public static class CommitFailRetryResult extends RetryCommitResult {
53+
54+
public final @Nullable Snapshot latestSnapshot;
55+
public final @Nullable List<SimpleFileEntry> baseDataFiles;
56+
57+
private CommitFailRetryResult(
58+
@Nullable Snapshot latestSnapshot,
59+
@Nullable List<SimpleFileEntry> baseDataFiles,
60+
Exception exception) {
61+
super(exception);
62+
this.latestSnapshot = latestSnapshot;
63+
this.baseDataFiles = baseDataFiles;
64+
}
65+
}
66+
67+
/** Retry result for rollback. */
68+
public static class RollbackRetryResult extends RetryCommitResult {
69+
70+
private RollbackRetryResult(Exception exception) {
71+
super(exception);
72+
}
73+
}
5774
}

paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1025,7 +1025,7 @@ public void testCommitTwiceWithDifferentKind() throws Exception {
10251025
null);
10261026
// Compact
10271027
commit.tryCommitOnce(
1028-
RetryCommitResult.ofContext(firstLatest, Collections.emptyList(), null),
1028+
RetryCommitResult.forCommitFail(firstLatest, Collections.emptyList(), null),
10291029
Collections.emptyList(),
10301030
Collections.emptyList(),
10311031
Collections.emptyList(),

paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3056,6 +3056,10 @@ private void doTestConflictRollback(boolean insertMiddle) throws Exception {
30563056
}
30573057

30583058
// do delete commit after
3059+
// expire snapshots first
3060+
SnapshotManager snapshotManager = ((FileStoreTable) table).snapshotManager();
3061+
snapshotManager.deleteSnapshot(1);
3062+
snapshotManager.deleteSnapshot(2);
30593063
try (BatchTableCommit commit = writeBuilder.newCommit()) {
30603064
List<CommitMessage> messages = singletonList(deleteCommitMessage);
30613065
if (insertMiddle) {

0 commit comments

Comments
 (0)