Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.CoreOptions.ExternalPathStrategy;
import org.apache.paimon.catalog.RenamingSnapshotCommit;
import org.apache.paimon.catalog.SnapshotCommit;
import org.apache.paimon.catalog.TableRollback;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.FileIO;
Expand All @@ -43,6 +44,7 @@
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.operation.SnapshotDeletion;
import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.operation.commit.CommitRollback;
import org.apache.paimon.operation.commit.ConflictDetection;
import org.apache.paimon.operation.commit.StrictModeChecker;
import org.apache.paimon.partition.PartitionExpireStrategy;
Expand Down Expand Up @@ -287,6 +289,11 @@ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) {
commitUser,
this::newScan,
options.commitStrictModeLastSafeSnapshot().orElse(null));
CommitRollback rollback = null;
TableRollback tableRollback = catalogEnvironment.catalogTableRollback();
if (tableRollback != null) {
rollback = new CommitRollback(tableRollback);
}
return new FileStoreCommitImpl(
snapshotCommit,
fileIO,
Expand Down Expand Up @@ -319,7 +326,8 @@ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) {
options.rowTrackingEnabled(),
options.commitDiscardDuplicateFiles(),
conflictDetection,
strictModeChecker);
strictModeChecker,
rollback);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.catalog;

import org.apache.paimon.table.Instant;

import javax.annotation.Nullable;

/** Rollback table to instant from snapshot. */
public interface TableRollback {

void rollbackTo(Instant instant, @Nullable Long fromSnapshot);
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.paimon.operation.commit.CommitCleaner;
import org.apache.paimon.operation.commit.CommitKindProvider;
import org.apache.paimon.operation.commit.CommitResult;
import org.apache.paimon.operation.commit.CommitRollback;
import org.apache.paimon.operation.commit.CommitScanner;
import org.apache.paimon.operation.commit.ConflictDetection;
import org.apache.paimon.operation.commit.ManifestEntryChanges;
Expand Down Expand Up @@ -139,6 +140,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
private final ManifestFile manifestFile;
private final ManifestList manifestList;
private final IndexManifestFile indexManifestFile;
@Nullable private final CommitRollback rollback;
private final CommitScanner scanner;
private final int numBucket;
private final MemorySize manifestTargetSize;
Expand Down Expand Up @@ -196,7 +198,8 @@ public FileStoreCommitImpl(
boolean rowTrackingEnabled,
boolean discardDuplicateFiles,
ConflictDetection conflictDetection,
@Nullable StrictModeChecker strictModeChecker) {
@Nullable StrictModeChecker strictModeChecker,
@Nullable CommitRollback rollback) {
this.snapshotCommit = snapshotCommit;
this.fileIO = fileIO;
this.schemaManager = schemaManager;
Expand All @@ -210,6 +213,7 @@ public FileStoreCommitImpl(
this.manifestFile = manifestFileFactory.create();
this.manifestList = manifestListFactory.create();
this.indexManifestFile = indexManifestFileFactory.create();
this.rollback = rollback;
this.scanner = new CommitScanner(scan, indexManifestFile, options);
this.numBucket = numBucket;
this.manifestTargetSize = manifestTargetSize;
Expand Down Expand Up @@ -314,10 +318,13 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
if (appendCommitCheckConflict) {
checkAppendFiles = true;
}

boolean allowRollback = false;
if (containsFileDeletionOrDeletionVectors(
appendSimpleEntries, changes.appendIndexFiles)) {
commitKind = CommitKind.OVERWRITE;
checkAppendFiles = true;
allowRollback = true;
}

attempts +=
Expand All @@ -330,6 +337,7 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
committable.watermark(),
committable.properties(),
CommitKindProvider.provider(commitKind),
allowRollback,
checkAppendFiles,
null);
generatedSnapshot += 1;
Expand All @@ -348,6 +356,7 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
committable.watermark(),
committable.properties(),
CommitKindProvider.provider(CommitKind.COMPACT),
false,
true,
null);
generatedSnapshot += 1;
Expand Down Expand Up @@ -513,6 +522,7 @@ public int overwritePartition(
committable.watermark(),
committable.properties(),
CommitKindProvider.provider(CommitKind.COMPACT),
false,
true,
null);
generatedSnapshot += 1;
Expand Down Expand Up @@ -653,6 +663,7 @@ public void commitStatistics(Statistics stats, long commitIdentifier) {
Collections.emptyMap(),
CommitKindProvider.provider(CommitKind.ANALYZE),
false,
false,
statsFileName);
}

Expand All @@ -679,6 +690,7 @@ private int tryCommit(
@Nullable Long watermark,
Map<String, String> properties,
CommitKindProvider commitKindProvider,
boolean allowRollback,
boolean detectConflicts,
@Nullable String statsFileName) {
int retryCount = 0;
Expand All @@ -698,6 +710,7 @@ private int tryCommit(
watermark,
properties,
commitKind,
allowRollback,
latestSnapshot,
detectConflicts,
statsFileName);
Expand Down Expand Up @@ -750,6 +763,7 @@ private int tryOverwritePartition(
watermark,
properties,
commitKindProvider,
false,
true,
null);
}
Expand All @@ -764,6 +778,7 @@ CommitResult tryCommitOnce(
@Nullable Long watermark,
Map<String, String> properties,
CommitKind commitKind,
boolean allowRollback,
@Nullable Snapshot latestSnapshot,
boolean detectConflicts,
@Nullable String newStatsFileName) {
Expand Down Expand Up @@ -821,7 +836,9 @@ CommitResult tryCommitOnce(
// latestSnapshotId is different from the snapshot id we've checked for conflicts,
// so we have to check again
List<BinaryRow> changedPartitions = changedPartitions(deltaFiles, indexFiles);
if (retryResult != null && retryResult.latestSnapshot != null) {
if (retryResult != null
&& retryResult.latestSnapshot != null
&& retryResult.baseDataFiles != null) {
baseDataFiles = new ArrayList<>(retryResult.baseDataFiles);
List<SimpleFileEntry> incremental =
scanner.readIncrementalChanges(
Expand All @@ -845,12 +862,21 @@ CommitResult tryCommitOnce(
.filter(entry -> !baseIdentifiers.contains(entry.identifier()))
.collect(Collectors.toList());
}
conflictDetection.checkNoConflictsOrFail(
latestSnapshot,
baseDataFiles,
SimpleFileEntry.from(deltaFiles),
indexFiles,
commitKind);
Optional<RuntimeException> exception =
conflictDetection.checkConflicts(
latestSnapshot,
baseDataFiles,
SimpleFileEntry.from(deltaFiles),
indexFiles,
commitKind);
if (exception.isPresent()) {
if (allowRollback && rollback != null) {
if (rollback.tryToRollback(latestSnapshot)) {
return RetryCommitResult.ofEmpty(exception.get());
}
}
throw exception.get();
}
}

Snapshot newSnapshot;
Expand Down Expand Up @@ -979,7 +1005,7 @@ CommitResult tryCommitOnce(
} catch (Exception e) {
// commit exception, not sure about the situation and should not clean up the files
LOG.warn("Retry commit for exception.", e);
return new RetryCommitResult(latestSnapshot, baseDataFiles, e);
return RetryCommitResult.ofContext(latestSnapshot, baseDataFiles, e);
}

if (!success) {
Expand All @@ -996,7 +1022,7 @@ CommitResult tryCommitOnce(
commitTime);
commitCleaner.cleanUpNoReuseTmpManifests(
baseManifestList, mergeBeforeManifests, mergeAfterManifests);
return new RetryCommitResult(latestSnapshot, baseDataFiles, null);
return RetryCommitResult.ofContext(latestSnapshot, baseDataFiles, null);
}

LOG.info(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.operation.commit;

import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.TableRollback;
import org.apache.paimon.table.Instant;

/** Commit rollback to rollback 'COMPACT' commits for resolving conflicts. */
public class CommitRollback {

private final TableRollback tableRollback;

public CommitRollback(TableRollback tableRollback) {
this.tableRollback = tableRollback;
}

public boolean tryToRollback(Snapshot latestSnapshot) {
if (latestSnapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
long latest = latestSnapshot.id();
try {
tableRollback.rollbackTo(Instant.snapshot(latest - 1), latest);
return true;
} catch (Exception ignored) {
}
}
return false;
}
}
Loading