Skip to content

Commit cd96a4d

Browse files
authored
[core] Introduce commit strict mode to avoid data loss in rescale job (apache#5727)
1 parent b13100d commit cd96a4d

File tree

6 files changed

+141
-5
lines changed

6 files changed

+141
-5
lines changed

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,12 @@
158158
<td>Integer</td>
159159
<td>Maximum number of retries when commit failed.</td>
160160
</tr>
161+
<tr>
162+
<td><h5>commit.strict-mode.last-safe-snapshot</h5></td>
163+
<td style="word-wrap: break-word;">(none)</td>
164+
<td>Long</td>
165+
<td>If set, committer will check if there are other commit user's COMPACT / OVERWRITE snapshot, starting from the snapshot after this one. If found, commit will be aborted. If the value of this option is -1, committer will not check for its first commit.</td>
166+
</tr>
161167
<tr>
162168
<td><h5>commit.timeout</h5></td>
163169
<td style="word-wrap: break-word;">(none)</td>

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1744,6 +1744,15 @@ public class CoreOptions implements Serializable {
17441744
"If true, it disables explicit type casting. For ex: it disables converting LONG type to INT type. "
17451745
+ "Users can enable this option to disable explicit type casting");
17461746

1747+
public static final ConfigOption<Long> COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT =
1748+
ConfigOptions.key("commit.strict-mode.last-safe-snapshot")
1749+
.longType()
1750+
.noDefaultValue()
1751+
.withDescription(
1752+
"If set, committer will check if there are other commit user's COMPACT / OVERWRITE snapshot, "
1753+
+ "starting from the snapshot after this one. If found, commit will be aborted. "
1754+
+ "If the value of this option is -1, committer will not check for its first commit.");
1755+
17471756
private final Options options;
17481757

17491758
public CoreOptions(Map<String, String> options) {
@@ -2717,6 +2726,10 @@ public boolean aggregationRemoveRecordOnDelete() {
27172726
return options.get(AGGREGATION_REMOVE_RECORD_ON_DELETE);
27182727
}
27192728

2729+
public Optional<Long> commitStrictModeLastSafeSnapshot() {
2730+
return options.getOptional(COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT);
2731+
}
2732+
27202733
/** Specifies the merge engine for table with primary key. */
27212734
public enum MergeEngine implements DescribedEnum {
27222735
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),

paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,8 @@ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) {
321321
options.scanManifestParallelism(),
322322
createCommitCallbacks(commitUser, table),
323323
options.commitMaxRetries(),
324-
options.commitTimeout());
324+
options.commitTimeout(),
325+
options.commitStrictModeLastSafeSnapshot().orElse(null));
325326
}
326327

327328
@Override

paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
141141
private final BucketMode bucketMode;
142142
private final long commitTimeout;
143143
private final int commitMaxRetries;
144+
@Nullable private Long strictModeLastSafeSnapshot;
144145
private final InternalRowPartitionComputer partitionComputer;
145146

146147
private boolean ignoreEmptyCommit;
@@ -174,7 +175,8 @@ public FileStoreCommitImpl(
174175
@Nullable Integer manifestReadParallelism,
175176
List<CommitCallback> commitCallbacks,
176177
int commitMaxRetries,
177-
long commitTimeout) {
178+
long commitTimeout,
179+
@Nullable Long strictModeLastSafeSnapshot) {
178180
this.snapshotCommit = snapshotCommit;
179181
this.fileIO = fileIO;
180182
this.schemaManager = schemaManager;
@@ -203,6 +205,7 @@ public FileStoreCommitImpl(
203205
this.commitCallbacks = commitCallbacks;
204206
this.commitMaxRetries = commitMaxRetries;
205207
this.commitTimeout = commitTimeout;
208+
this.strictModeLastSafeSnapshot = strictModeLastSafeSnapshot;
206209
this.partitionComputer =
207210
new InternalRowPartitionComputer(
208211
options.partitionDefaultName(),
@@ -894,6 +897,28 @@ CommitResult tryCommitOnce(
894897
long newSnapshotId =
895898
latestSnapshot == null ? Snapshot.FIRST_SNAPSHOT_ID : latestSnapshot.id() + 1;
896899

900+
if (strictModeLastSafeSnapshot != null && strictModeLastSafeSnapshot >= 0) {
901+
for (long id = strictModeLastSafeSnapshot + 1; id < newSnapshotId; id++) {
902+
Snapshot snapshot = snapshotManager.snapshot(id);
903+
if ((snapshot.commitKind() == Snapshot.CommitKind.COMPACT
904+
|| snapshot.commitKind() == Snapshot.CommitKind.OVERWRITE)
905+
&& !snapshot.commitUser().equals(commitUser)) {
906+
throw new RuntimeException(
907+
String.format(
908+
"When trying to commit snapshot %d, "
909+
+ "commit user %s has found a %s snapshot (id: %d) by another user %s. "
910+
+ "Giving up committing as %s is set.",
911+
newSnapshotId,
912+
commitUser,
913+
snapshot.commitKind().name(),
914+
id,
915+
snapshot.commitUser(),
916+
CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key()));
917+
}
918+
}
919+
strictModeLastSafeSnapshot = newSnapshotId - 1;
920+
}
921+
897922
if (LOG.isDebugEnabled()) {
898923
LOG.debug("Ready to commit table files to snapshot {}", newSnapshotId);
899924
for (ManifestEntry entry : deltaFiles) {
@@ -1086,6 +1111,9 @@ CommitResult tryCommitOnce(
10861111
commitUser,
10871112
identifier,
10881113
commitKind.name());
1114+
if (strictModeLastSafeSnapshot != null) {
1115+
strictModeLastSafeSnapshot = newSnapshot.id();
1116+
}
10891117
commitCallbacks.forEach(callback -> callback.call(deltaFiles, indexFiles, newSnapshot));
10901118
return new SuccessResult();
10911119
}

paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,4 +313,72 @@ public void testGiveUpCommitWhenTotalBucketsChanged() throws Exception {
313313
.hasMessageContaining("changed from 1 to 2 without overwrite");
314314
}
315315
}
316+
317+
@Test
318+
public void testStrictMode() throws Exception {
319+
String path = tempDir.toString();
320+
RowType rowType =
321+
RowType.of(
322+
new DataType[] {DataTypes.INT(), DataTypes.BIGINT()},
323+
new String[] {"k", "v"});
324+
325+
Options options = new Options();
326+
options.set(CoreOptions.PATH, path);
327+
options.set(CoreOptions.BUCKET, 1);
328+
options.set(CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER, 10);
329+
TableSchema tableSchema =
330+
SchemaUtils.forceCommit(
331+
new SchemaManager(LocalFileIO.create(), new Path(path)),
332+
new Schema(
333+
rowType.getFields(),
334+
Collections.emptyList(),
335+
Collections.singletonList("k"),
336+
options.toMap(),
337+
""));
338+
FileStoreTable table =
339+
FileStoreTableFactory.create(
340+
LocalFileIO.create(),
341+
new Path(path),
342+
tableSchema,
343+
CatalogEnvironment.empty());
344+
String user1 = UUID.randomUUID().toString();
345+
TableWriteImpl<?> write1 = table.newWrite(user1);
346+
TableCommitImpl commit1 = table.newCommit(user1);
347+
348+
Map<String, String> newOptions = new HashMap<>();
349+
newOptions.put(CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key(), "-1");
350+
table = table.copy(newOptions);
351+
String user2 = UUID.randomUUID().toString();
352+
TableWriteImpl<?> write2 = table.newWrite(user2);
353+
TableCommitImpl commit2 = table.newCommit(user2);
354+
355+
// the option is set to -1 for user2, so user2's first commit is not checked
356+
357+
write1.write(GenericRow.of(0, 0L));
358+
write1.compact(BinaryRow.EMPTY_ROW, 0, true);
359+
commit1.commit(1, write1.prepareCommit(true, 1));
360+
361+
write2.write(GenericRow.of(1, 1L));
362+
commit2.commit(1, write2.prepareCommit(false, 1));
363+
364+
// APPEND commits from another user are ignored by the strict mode check
365+
366+
write1.write(GenericRow.of(2, 2L));
367+
commit1.commit(2, write1.prepareCommit(false, 2));
368+
369+
write2.write(GenericRow.of(3, 3L));
370+
commit2.commit(2, write2.prepareCommit(false, 2));
371+
372+
// a COMPACT commit from another user must make user2's next commit fail
373+
374+
write1.write(GenericRow.of(4, 4L));
375+
write1.compact(BinaryRow.EMPTY_ROW, 0, true);
376+
commit1.commit(3, write1.prepareCommit(true, 3));
377+
378+
write2.write(GenericRow.of(5, 5L));
379+
assertThatThrownBy(() -> commit2.commit(3, write2.prepareCommit(false, 3)))
380+
.isInstanceOf(RuntimeException.class)
381+
.hasMessageContaining(
382+
"Giving up committing as commit.strict-mode.last-safe-snapshot is set.");
383+
}
316384
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.paimon.flink.action;
2020

2121
import org.apache.paimon.CoreOptions;
22+
import org.apache.paimon.Snapshot;
2223
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
2324
import org.apache.paimon.flink.source.FlinkSourceBuilder;
2425
import org.apache.paimon.manifest.ManifestEntry;
@@ -41,6 +42,7 @@
4142
import java.util.HashMap;
4243
import java.util.Iterator;
4344
import java.util.Map;
45+
import java.util.Optional;
4446

4547
/** Action to rescale one partition of a table. */
4648
public class RescaleAction extends TableActionBase {
@@ -81,6 +83,21 @@ public void build() throws Exception {
8183
env.configure(flinkConf);
8284

8385
FileStoreTable fileStoreTable = (FileStoreTable) table;
86+
Optional<Snapshot> optionalSnapshot = fileStoreTable.latestSnapshot();
87+
if (!optionalSnapshot.isPresent()) {
88+
throw new IllegalArgumentException(
89+
"Table " + table.fullName() + " has no snapshot. No need to rescale.");
90+
}
91+
Snapshot snapshot = optionalSnapshot.get();
92+
93+
// If someone commits while the rescale job is running, this commit will be lost.
94+
// So we use strict mode to make sure nothing is lost.
95+
Map<String, String> dynamicOptions = new HashMap<>();
96+
dynamicOptions.put(
97+
CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key(),
98+
String.valueOf(snapshot.id()));
99+
fileStoreTable = fileStoreTable.copy(dynamicOptions);
100+
84101
RowType partitionType = fileStoreTable.schema().logicalPartitionType();
85102
Predicate partitionPredicate =
86103
PartitionPredicate.createPartitionPredicate(
@@ -94,7 +111,9 @@ public void build() throws Exception {
94111
.env(env)
95112
.sourceBounded(true)
96113
.sourceParallelism(
97-
scanParallelism == null ? currentBucketNum() : scanParallelism)
114+
scanParallelism == null
115+
? currentBucketNum(snapshot)
116+
: scanParallelism)
98117
.predicate(partitionPredicate)
99118
.build();
100119

@@ -118,14 +137,15 @@ public void build() throws Exception {
118137
@Override
119138
public void run() throws Exception {
120139
build();
121-
env.execute("Rescale Postpone Bucket : " + table.fullName());
140+
env.execute("Rescale : " + table.fullName());
122141
}
123142

124-
private int currentBucketNum() {
143+
private int currentBucketNum(Snapshot snapshot) {
125144
FileStoreTable fileStoreTable = (FileStoreTable) table;
126145
Iterator<ManifestEntry> it =
127146
fileStoreTable
128147
.newSnapshotReader()
148+
.withSnapshot(snapshot)
129149
.withPartitionFilter(partition)
130150
.readFileIterator();
131151
Preconditions.checkArgument(

0 commit comments

Comments
 (0)