Skip to content

Commit 974f577

Browse files
authored
[core] Add pre check when drop partition for chain table (#7109)
1 parent cca171f commit 974f577

File tree

7 files changed

+385
-22
lines changed

7 files changed

+385
-22
lines changed

paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.paimon.manifest.ManifestList;
3737
import org.apache.paimon.metastore.AddPartitionCommitCallback;
3838
import org.apache.paimon.metastore.AddPartitionTagCallback;
39+
import org.apache.paimon.metastore.ChainTableCommitPreCallback;
3940
import org.apache.paimon.metastore.ChainTableOverwriteCommitCallback;
4041
import org.apache.paimon.metastore.TagPreviewCommitCallback;
4142
import org.apache.paimon.operation.ChangelogDeletion;
@@ -59,6 +60,7 @@
5960
import org.apache.paimon.table.PartitionHandler;
6061
import org.apache.paimon.table.sink.CallbackUtils;
6162
import org.apache.paimon.table.sink.CommitCallback;
63+
import org.apache.paimon.table.sink.CommitPreCallback;
6264
import org.apache.paimon.table.sink.TagCallback;
6365
import org.apache.paimon.tag.SuccessFileTagCallback;
6466
import org.apache.paimon.tag.TagAutoManager;
@@ -333,7 +335,8 @@ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) {
333335
options.commitDiscardDuplicateFiles(),
334336
conflictDetectFactory,
335337
strictModeChecker,
336-
rollback);
338+
rollback,
339+
createCommitPreCallbacks(commitUser, table));
337340
}
338341

339342
@Override
@@ -431,6 +434,15 @@ private List<CommitCallback> createCommitCallbacks(String commitUser, FileStoreT
431434
return callbacks;
432435
}
433436

437+
private List<CommitPreCallback> createCommitPreCallbacks(
438+
String commitUser, FileStoreTable table) {
439+
List<CommitPreCallback> callbacks = new ArrayList<>();
440+
if (options.isChainTable()) {
441+
callbacks.add(new ChainTableCommitPreCallback(table));
442+
}
443+
return callbacks;
444+
}
445+
434446
@Override
435447
@Nullable
436448
public PartitionExpire newPartitionExpire(String commitUser, FileStoreTable table) {
Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.metastore;
20+
21+
import org.apache.paimon.CoreOptions;
22+
import org.apache.paimon.Snapshot;
23+
import org.apache.paimon.Snapshot.CommitKind;
24+
import org.apache.paimon.codegen.CodeGenUtils;
25+
import org.apache.paimon.codegen.RecordComparator;
26+
import org.apache.paimon.data.BinaryRow;
27+
import org.apache.paimon.manifest.FileKind;
28+
import org.apache.paimon.manifest.IndexManifestEntry;
29+
import org.apache.paimon.manifest.ManifestEntry;
30+
import org.apache.paimon.manifest.PartitionEntry;
31+
import org.apache.paimon.manifest.SimpleFileEntry;
32+
import org.apache.paimon.operation.commit.ManifestEntryChanges;
33+
import org.apache.paimon.predicate.Predicate;
34+
import org.apache.paimon.predicate.PredicateBuilder;
35+
import org.apache.paimon.table.FileStoreTable;
36+
import org.apache.paimon.table.sink.CommitPreCallback;
37+
import org.apache.paimon.table.source.snapshot.SnapshotReader;
38+
import org.apache.paimon.types.RowType;
39+
import org.apache.paimon.utils.ChainTableUtils;
40+
import org.apache.paimon.utils.InternalRowPartitionComputer;
41+
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
42+
43+
import org.slf4j.Logger;
44+
import org.slf4j.LoggerFactory;
45+
46+
import java.util.List;
47+
import java.util.Optional;
48+
import java.util.stream.Collectors;
49+
50+
/**
51+
* {@link CommitPreCallback} implementation for chain tables.
52+
*
53+
* <p>This callback performs a pre-check before dropping partitions on the snapshot branch of a
54+
* chain table. It verifies that a snapshot partition being dropped is either followed by no delta
55+
* partitions in the chain interval or has a previous snapshot partition that can serve as its
56+
* predecessor.
57+
*
58+
* <p>The callback is only executed when all of following conditions are met:
59+
*
60+
* <ul>
61+
* <li>The table is configured as a chain table and the current branch is the snapshot branch (see
62+
* {@link ChainTableUtils#isScanFallbackSnapshotBranch(CoreOptions)}).
63+
* <li>The committed snapshot kind is {@link CommitKind#OVERWRITE}.
64+
* <li>All table and index manifest entries in the commit are {@link FileKind#DELETE deletes}.
65+
* </ul>
66+
*
67+
* <p>If the validation fails for any of the affected partitions, a {@link RuntimeException} is
68+
* thrown and the commit is aborted.
69+
*
70+
* <p>This implementation keeps only references to the table and its options and does not maintain
71+
* mutable state between invocations.
72+
*/
73+
public class ChainTableCommitPreCallback implements CommitPreCallback {
74+
75+
private static final Logger LOG = LoggerFactory.getLogger(ChainTableCommitPreCallback.class);
76+
77+
private transient FileStoreTable table;
78+
private transient CoreOptions coreOptions;
79+
80+
public ChainTableCommitPreCallback(FileStoreTable table) {
81+
this.table = table;
82+
this.coreOptions = table.coreOptions();
83+
}
84+
85+
@Override
86+
public void call(
87+
List<SimpleFileEntry> baseFiles,
88+
List<ManifestEntry> deltaFiles,
89+
List<IndexManifestEntry> indexFiles,
90+
Snapshot snapshot) {
91+
if (!ChainTableUtils.isScanFallbackSnapshotBranch(coreOptions)) {
92+
return;
93+
}
94+
if (snapshot.commitKind() != CommitKind.OVERWRITE) {
95+
return;
96+
}
97+
if (!isPureDeleteCommit(deltaFiles, indexFiles)) {
98+
return;
99+
}
100+
List<BinaryRow> changedPartitions =
101+
ManifestEntryChanges.changedPartitions(deltaFiles, indexFiles);
102+
FileStoreTable candidateTable = ChainTableUtils.resolveChainPrimaryTable(table);
103+
FileStoreTable deltaTable =
104+
candidateTable.switchToBranch(coreOptions.scanFallbackDeltaBranch());
105+
RowType partitionType = table.schema().logicalPartitionType();
106+
RowDataToObjectArrayConverter partitionConverter =
107+
new RowDataToObjectArrayConverter(partitionType);
108+
InternalRowPartitionComputer partitionComputer =
109+
new InternalRowPartitionComputer(
110+
coreOptions.partitionDefaultName(),
111+
partitionType,
112+
table.schema().partitionKeys().toArray(new String[0]),
113+
coreOptions.legacyPartitionName());
114+
RecordComparator partitionComparator =
115+
CodeGenUtils.newRecordComparator(partitionType.getFieldTypes());
116+
List<BinaryRow> snapshotPartitions =
117+
table.newSnapshotReader().partitionEntries().stream()
118+
.map(PartitionEntry::partition)
119+
.sorted(partitionComparator)
120+
.collect(Collectors.toList());
121+
SnapshotReader deltaSnapshotReader = deltaTable.newSnapshotReader();
122+
PredicateBuilder builder = new PredicateBuilder(partitionType);
123+
for (BinaryRow partition : changedPartitions) {
124+
Optional<BinaryRow> preSnapshotPartition =
125+
findPreSnapshotPartition(snapshotPartitions, partition, partitionComparator);
126+
Optional<BinaryRow> nextSnapshotPartition =
127+
findNextSnapshotPartition(snapshotPartitions, partition, partitionComparator);
128+
Predicate deltaFollowingPredicate =
129+
ChainTableUtils.createTriangularPredicate(
130+
partition, partitionConverter, builder::equal, builder::greaterThan);
131+
List<BinaryRow> deltaFollowingPartitions =
132+
deltaSnapshotReader.withPartitionFilter(deltaFollowingPredicate)
133+
.partitionEntries().stream()
134+
.map(PartitionEntry::partition)
135+
.filter(
136+
deltaPartition ->
137+
isBeforeNextSnapshotPartition(
138+
deltaPartition,
139+
nextSnapshotPartition,
140+
partitionComparator))
141+
.collect(Collectors.toList());
142+
boolean canDrop =
143+
deltaFollowingPartitions.isEmpty() || preSnapshotPartition.isPresent();
144+
LOG.info(
145+
"Drop partition, partition={}, canDrop={}, preSnapshotPartition={}, nextSnapshotPartition={}",
146+
partitionComputer.generatePartValues(partition),
147+
canDrop,
148+
generatePartitionValues(preSnapshotPartition, partitionComputer),
149+
generatePartitionValues(nextSnapshotPartition, partitionComputer));
150+
if (!canDrop) {
151+
throw new RuntimeException("Snapshot partition cannot be dropped.");
152+
}
153+
}
154+
}
155+
156+
private boolean isPureDeleteCommit(
157+
List<ManifestEntry> deltaFiles, List<IndexManifestEntry> indexFiles) {
158+
return deltaFiles.stream().allMatch(f -> f.kind() == FileKind.DELETE)
159+
&& indexFiles.stream().allMatch(f -> f.kind() == FileKind.DELETE);
160+
}
161+
162+
private Optional<BinaryRow> findPreSnapshotPartition(
163+
List<BinaryRow> snapshotPartitions,
164+
BinaryRow partition,
165+
RecordComparator partitionComparator) {
166+
BinaryRow pre = null;
167+
for (BinaryRow snapshotPartition : snapshotPartitions) {
168+
if (partitionComparator.compare(snapshotPartition, partition) < 0) {
169+
pre = snapshotPartition;
170+
} else {
171+
break;
172+
}
173+
}
174+
return Optional.ofNullable(pre);
175+
}
176+
177+
private Optional<BinaryRow> findNextSnapshotPartition(
178+
List<BinaryRow> snapshotPartitions,
179+
BinaryRow partition,
180+
RecordComparator partitionComparator) {
181+
for (BinaryRow snapshotPartition : snapshotPartitions) {
182+
if (partitionComparator.compare(snapshotPartition, partition) > 0) {
183+
return Optional.of(snapshotPartition);
184+
}
185+
}
186+
return Optional.empty();
187+
}
188+
189+
private boolean isBeforeNextSnapshotPartition(
190+
BinaryRow partition,
191+
Optional<BinaryRow> nextSnapshotPartition,
192+
RecordComparator partitionComparator) {
193+
return !nextSnapshotPartition.isPresent()
194+
|| partitionComparator.compare(partition, nextSnapshotPartition.get()) < 0;
195+
}
196+
197+
private String generatePartitionValues(
198+
Optional<BinaryRow> partition, InternalRowPartitionComputer partitionComputer) {
199+
if (!partition.isPresent()) {
200+
return "<none>";
201+
}
202+
return partitionComputer.generatePartValues(partition.get()).toString();
203+
}
204+
205+
@Override
206+
public void close() throws Exception {}
207+
}

paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableOverwriteCommitCallback.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@
2626
import org.apache.paimon.manifest.ManifestCommittable;
2727
import org.apache.paimon.manifest.ManifestEntry;
2828
import org.apache.paimon.manifest.SimpleFileEntry;
29-
import org.apache.paimon.table.ChainGroupReadTable;
30-
import org.apache.paimon.table.FallbackReadFileStoreTable;
3129
import org.apache.paimon.table.FileStoreTable;
3230
import org.apache.paimon.table.sink.BatchTableCommit;
3331
import org.apache.paimon.table.sink.CommitCallback;
@@ -78,13 +76,7 @@ public void call(
7876
return;
7977
}
8078

81-
// Find the underlying table for writing snapshot branch.
82-
FileStoreTable candidateTable = table;
83-
if (table instanceof FallbackReadFileStoreTable) {
84-
candidateTable =
85-
((ChainGroupReadTable) ((FallbackReadFileStoreTable) table).fallback())
86-
.wrapped();
87-
}
79+
FileStoreTable candidateTable = ChainTableUtils.resolveChainPrimaryTable(table);
8880

8981
FileStoreTable snapshotTable =
9082
candidateTable.switchToBranch(coreOptions.scanFallbackSnapshotBranch());

paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import org.apache.paimon.table.sink.CommitCallback;
6767
import org.apache.paimon.table.sink.CommitMessage;
6868
import org.apache.paimon.table.sink.CommitMessageImpl;
69+
import org.apache.paimon.table.sink.CommitPreCallback;
6970
import org.apache.paimon.types.RowType;
7071
import org.apache.paimon.utils.DataFilePathFactories;
7172
import org.apache.paimon.utils.FileStorePathFactory;
@@ -166,6 +167,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
166167
private CommitMetrics commitMetrics;
167168
private boolean appendCommitCheckConflict = false;
168169

170+
private final List<CommitPreCallback> commitPreCallbacks;
171+
169172
public FileStoreCommitImpl(
170173
SnapshotCommit snapshotCommit,
171174
FileIO fileIO,
@@ -199,7 +202,8 @@ public FileStoreCommitImpl(
199202
boolean discardDuplicateFiles,
200203
ConflictDetection.Factory conflictDetectFactory,
201204
@Nullable StrictModeChecker strictModeChecker,
202-
@Nullable CommitRollback rollback) {
205+
@Nullable CommitRollback rollback,
206+
List<CommitPreCallback> commitPreCallbacks) {
203207
this.snapshotCommit = snapshotCommit;
204208
this.fileIO = fileIO;
205209
this.schemaManager = schemaManager;
@@ -241,6 +245,7 @@ public FileStoreCommitImpl(
241245
this.strictModeChecker = strictModeChecker;
242246
this.conflictDetection = conflictDetectFactory.create(scanner);
243247
this.commitCleaner = new CommitCleaner(manifestList, manifestFile, indexManifestFile);
248+
this.commitPreCallbacks = commitPreCallbacks;
244249
}
245250

246251
@Override
@@ -1005,6 +1010,11 @@ CommitResult tryCommitOnce(
10051010
}
10061011

10071012
boolean success;
1013+
final List<SimpleFileEntry> finalBaseFiles = baseDataFiles;
1014+
final List<ManifestEntry> finalDeltaFiles = deltaFiles;
1015+
commitPreCallbacks.forEach(
1016+
callback ->
1017+
callback.call(finalBaseFiles, finalDeltaFiles, indexFiles, newSnapshot));
10081018
try {
10091019
success = commitSnapshotImpl(newSnapshot, deltaStatistics);
10101020
} catch (Exception e) {
@@ -1041,8 +1051,6 @@ CommitResult tryCommitOnce(
10411051
if (strictModeChecker != null) {
10421052
strictModeChecker.update(newSnapshotId);
10431053
}
1044-
final List<SimpleFileEntry> finalBaseFiles = baseDataFiles;
1045-
final List<ManifestEntry> finalDeltaFiles = deltaFiles;
10461054
commitCallbacks.forEach(
10471055
callback ->
10481056
callback.call(finalBaseFiles, finalDeltaFiles, indexFiles, newSnapshot));
@@ -1185,6 +1193,7 @@ private boolean commitSnapshotImpl(Snapshot newSnapshot, List<PartitionEntry> de
11851193

11861194
@Override
11871195
public void close() {
1196+
IOUtils.closeAllQuietly(commitPreCallbacks);
11881197
IOUtils.closeAllQuietly(commitCallbacks);
11891198
IOUtils.closeQuietly(snapshotCommit);
11901199
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.table.sink;
20+
21+
import org.apache.paimon.Snapshot;
22+
import org.apache.paimon.manifest.IndexManifestEntry;
23+
import org.apache.paimon.manifest.ManifestEntry;
24+
import org.apache.paimon.manifest.SimpleFileEntry;
25+
26+
import java.util.List;
27+
28+
/**
29+
* Callback which is invoked before a snapshot is committed.
30+
*
31+
* <p>This hook allows additional validation or bookkeeping to be performed before a snapshot
32+
* becomes visible. Implementations can inspect the files that are about to be committed as well as
33+
* the {@link Snapshot} metadata and decide whether the commit should proceed.
34+
*
35+
* <p>If {@link #call(List, List, List, Snapshot)} throws a {@link RuntimeException}, the commit is
36+
* aborted and the snapshot will not be committed. Implementations are expected to be fast and
37+
* either side effect free or idempotent, because a single logical commit may be retried and this
38+
* callback might therefore be invoked multiple times for the same logical changes.
39+
*
40+
* <p>Implementations may optionally override {@link AutoCloseable#close()} if they hold external
41+
* resources that need to be released when the surrounding {@link org.apache.paimon.table.Table}
42+
* commit is closed.
43+
*/
44+
public interface CommitPreCallback extends AutoCloseable {
45+
46+
void call(
47+
List<SimpleFileEntry> baseFiles,
48+
List<ManifestEntry> deltaFiles,
49+
List<IndexManifestEntry> indexFiles,
50+
Snapshot snapshot);
51+
}

0 commit comments

Comments
 (0)