
Commit c38eede

[core][flink] Compact action support read high level files from overwrite upgrade (#6868)
1 parent 816a76f commit c38eede

3 files changed: +235 −3 lines

paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java

Lines changed: 4 additions & 1 deletion

@@ -41,6 +41,7 @@
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Optional;
@@ -114,7 +115,9 @@ public boolean shouldWaitForPreparingCheckpoint() {
 
     @Override
     public void addNewFile(DataFileMeta file) {
-        levels.addLevel0File(file);
+        // if overwrite an empty partition, the snapshot will be changed to APPEND, then its files
+        // might be upgraded to high level, thus we should use #update
+        levels.update(Collections.emptyList(), Collections.singletonList(file));
         MetricUtils.safeCall(this::reportMetrics, LOG);
     }
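
Why the one-line change in MergeTreeCompactManager matters: addNewFile previously assumed every incoming file was a freshly written level-0 file, but when an empty partition is overwritten the snapshot's commit kind becomes APPEND and the committed files may already carry a higher level, so they must be registered at that level. The sketch below is illustrative only and is not Paimon's real Levels class (whose update method takes before/after file lists, as the diff shows); the class, field, and method names here are hypothetical.

import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the idea behind Levels; not Paimon's API.
class LevelsSketch {
    // files.get(i) holds the file names currently tracked at level i.
    private final List<List<String>> files = new ArrayList<>();

    LevelsSketch(int maxLevel) {
        for (int i = 0; i <= maxLevel; i++) {
            files.add(new ArrayList<>());
        }
    }

    // Old behaviour: every new file is pinned to level 0.
    void addLevel0File(String fileName) {
        files.get(0).add(fileName);
    }

    // New behaviour: register the file at the level its metadata reports,
    // so files upgraded by an overwrite-turned-APPEND commit land correctly.
    void update(String fileName, int level) {
        files.get(level).add(fileName);
    }

    public static void main(String[] args) {
        LevelsSketch levels = new LevelsSketch(5);
        levels.update("upgraded-file.orc", 5); // placed at its actual level
        levels.addLevel0File("new-file.orc");  // the old path would also put the upgraded file here
        System.out.println("level 0: " + levels.files.get(0));
        System.out.println("level 5: " + levels.files.get(5));
    }
}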

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java

Lines changed: 92 additions & 2 deletions

@@ -27,11 +27,14 @@
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.utils.CommonTestUtils;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TraceableFileIO;
 
@@ -51,8 +54,10 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 
+import static org.apache.paimon.utils.CommonTestUtils.waitUtil;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** IT cases for {@link CompactAction}. */
@@ -208,7 +213,7 @@ public void testStreamingCompact() throws Exception {
 
         // assert dedicated compact job will expire snapshots
         SnapshotManager snapshotManager = table.snapshotManager();
-        CommonTestUtils.waitUtil(
+        waitUtil(
                 () ->
                         snapshotManager.latestSnapshotId() - 2
                                 == snapshotManager.earliestSnapshotId(),
@@ -735,6 +740,91 @@ public void testWrongUsage() throws Exception {
                 .hasMessage("sort compact do not support 'partition_idle_time'.");
     }
 
+    @Test
+    public void testStreamingCompactWithEmptyOverwriteUpgrade() throws Exception {
+        Map<String, String> tableOptions = new HashMap<>();
+        tableOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
+        tableOptions.put(CoreOptions.COMPACTION_FORCE_UP_LEVEL_0.key(), "true");
+        tableOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
+
+        DataType[] fieldTypes = new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.INT()};
+        RowType rowType = RowType.of(fieldTypes, new String[] {"k", "v", "pt"});
+        FileStoreTable table =
+                createFileStoreTable(
+                        rowType,
+                        Collections.singletonList("pt"),
+                        Arrays.asList("k", "pt"),
+                        Collections.emptyList(),
+                        tableOptions);
+        SnapshotManager snapshotManager = table.snapshotManager();
+
+        StreamExecutionEnvironment env =
+                streamExecutionEnvironmentBuilder().checkpointIntervalMs(500).build();
+        createAction(
+                        CompactAction.class,
+                        "compact",
+                        "--database",
+                        database,
+                        "--table",
+                        tableName,
+                        "--catalog_conf",
+                        "warehouse=" + warehouse)
+                .withStreamExecutionEnvironment(env)
+                .build();
+        env.executeAsync();
+
+        StreamWriteBuilder streamWriteBuilder =
+                table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = streamWriteBuilder.newWrite();
+        commit = streamWriteBuilder.newCommit();
+
+        writeData(rowData(1, 100, 1), rowData(2, 200, 1), rowData(1, 100, 2));
+
+        waitUtil(
+                () -> {
+                    Snapshot latest = snapshotManager.latestSnapshot();
+                    return latest != null && latest.commitKind() == Snapshot.CommitKind.COMPACT;
+                },
+                Duration.ofSeconds(10),
+                Duration.ofMillis(100));
+
+        long snapshotId1 = snapshotManager.latestSnapshotId();
+
+        // overwrite empty partition and let it upgrade
+        String newCommitUser = UUID.randomUUID().toString();
+        try (TableWriteImpl<?> newWrite = table.newWrite(newCommitUser);
+                TableCommitImpl newCommit =
+                        table.newCommit(newCommitUser)
+                                .withOverwrite(Collections.singletonMap("pt", "3"))) {
+            newWrite.write(rowData(1, 100, 3));
+            newWrite.write(rowData(2, 200, 3));
+            newCommit.commit(newWrite.prepareCommit(false, 1));
+        }
+        // write level 0 file to trigger compaction
+        writeData(rowData(1, 101, 3));
+
+        waitUtil(
+                () -> {
+                    Snapshot latest = snapshotManager.latestSnapshot();
+                    return latest.id() > snapshotId1
+                            && latest.commitKind() == Snapshot.CommitKind.COMPACT;
+                },
+                Duration.ofSeconds(10),
+                Duration.ofMillis(100));
+
+        validateResult(
+                table,
+                rowType,
+                table.newStreamScan(),
+                Arrays.asList(
+                        "+I[1, 100, 1]",
+                        "+I[1, 100, 2]",
+                        "+I[1, 101, 3]",
+                        "+I[2, 200, 1]",
+                        "+I[2, 200, 3]"),
+                60_000);
+    }
+
     private void runAction(boolean isStreaming) throws Exception {
         runAction(isStreaming, false, Collections.emptyList());
     }

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java

Lines changed: 139 additions & 0 deletions

@@ -18,29 +18,49 @@
 
 package org.apache.paimon.flink.sink;
 
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.FlinkRowData;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.operation.WriteRestore;
+import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.TableTestBase;
 import org.apache.paimon.table.sink.SinkRecord;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.PlanImpl;
+import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.table.system.CompactBucketsTable;
+import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SerializationUtils;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.table.data.RowData;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import javax.annotation.Nullable;
+
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link StoreCompactOperator}. */
 public class StoreCompactOperatorTest extends TableTestBase {
@@ -86,6 +106,91 @@ public void testCompactExactlyOnce(boolean streamingMode) throws Exception {
         assertThat(compactRememberStoreWrite.compactTime).isEqualTo(3);
     }
 
+    @Test
+    public void testStreamingCompactConflictWithOverwrite() throws Exception {
+        Schema schema =
+                Schema.newBuilder()
+                        .column("pt", DataTypes.INT())
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.INT())
+                        .partitionKeys("pt")
+                        .primaryKey("pt", "a")
+                        .option("bucket", "1")
+                        .build();
+        Identifier identifier = identifier();
+        catalog.createTable(identifier, schema, false);
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+
+        String writeJobCommitUser = UUID.randomUUID().toString();
+        String compactJobCommitUser = UUID.randomUUID().toString();
+
+        CompactBucketsTable compactBucketsTable = new CompactBucketsTable(table, true);
+        StreamDataTableScan scan = compactBucketsTable.newStreamScan();
+        InnerTableRead read = compactBucketsTable.newRead();
+
+        CheckpointConfig checkpointConfig = new CheckpointConfig();
+        checkpointConfig.setCheckpointInterval(500);
+        StoreCompactOperator.Factory operatorFactory =
+                new StoreCompactOperator.Factory(
+                        table,
+                        StoreSinkWrite.createWriteProvider(
+                                table, checkpointConfig, true, false, false),
+                        compactJobCommitUser,
+                        true);
+
+        TypeSerializer<Committable> serializer =
+                new CommittableTypeInfo().createSerializer(new ExecutionConfig());
+        OneInputStreamOperatorTestHarness<RowData, Committable> harness =
+                new OneInputStreamOperatorTestHarness<>(operatorFactory);
+        harness.setup(serializer);
+        harness.initializeEmptyState();
+        harness.open();
+        StoreCompactOperator operator = (StoreCompactOperator) harness.getOperator();
+
+        FileStoreTable writeOnlyTable = table.copy(Collections.singletonMap("write-only", "true"));
+
+        // write base data
+        batchWriteAndCommit(writeOnlyTable, writeJobCommitUser, null, GenericRow.of(1, 1, 100));
+        read.createReader(scan.plan())
+                .forEachRemaining(
+                        row -> {
+                            try {
+                                harness.processElement(new StreamRecord<>(new FlinkRowData(row)));
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+
+        List<Committable> committables1 = operator.prepareCommit(true, 1);
+        commit(table, compactJobCommitUser, committables1, 1);
+        assertThat(table.snapshotManager().latestSnapshot().commitKind())
+                .isEqualTo(Snapshot.CommitKind.COMPACT);
+
+        // overwrite and insert
+        batchWriteAndCommit(
+                writeOnlyTable,
+                writeJobCommitUser,
+                Collections.singletonMap("pt", "1"),
+                GenericRow.of(1, 2, 200));
+        batchWriteAndCommit(writeOnlyTable, writeJobCommitUser, null, GenericRow.of(1, 3, 300));
+        assertThat(table.snapshotManager().latestSnapshot().id()).isEqualTo(4);
+        TableScan.Plan plan = scan.plan();
+        assertThat(((PlanImpl) plan).snapshotId()).isEqualTo(4);
+        read.createReader(plan)
+                .forEachRemaining(
+                        row -> {
+                            try {
+                                harness.processElement(new StreamRecord<>(new FlinkRowData(row)));
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+
+        List<Committable> committables2 = operator.prepareCommit(true, 2);
+        assertThatThrownBy(() -> commit(table, compactJobCommitUser, committables2, 2))
+                .hasMessageContaining("File deletion conflicts detected! Give up committing.");
+    }
+
     private RowData data(int bucket) {
         GenericRow genericRow =
                 GenericRow.of(
@@ -96,6 +201,40 @@ private RowData data(int bucket) {
         return new FlinkRowData(genericRow);
     }
 
+    private void batchWriteAndCommit(
+            FileStoreTable table,
+            String commitUser,
+            @Nullable Map<String, String> overwritePartition,
+            InternalRow... rows)
+            throws Exception {
+        try (TableWriteImpl<?> write = table.newWrite(commitUser);
+                TableCommitImpl commit =
+                        table.newCommit(commitUser).withOverwrite(overwritePartition)) {
+            for (InternalRow row : rows) {
+                write.write(row);
+            }
+            commit.commit(write.prepareCommit());
+        }
+    }
+
+    private void commit(
+            FileStoreTable table,
+            String commitUser,
+            List<Committable> committables,
+            long checkpointId)
+            throws Exception {
+        try (TableCommitImpl commit = table.newCommit(commitUser)) {
+            StoreCommitter committer =
+                    new StoreCommitter(
+                            table,
+                            commit,
+                            Committer.createContext(commitUser, null, true, false, null, 1, 1));
+            ManifestCommittable manifestCommittable =
+                    committer.combine(checkpointId, System.currentTimeMillis(), committables);
+            committer.commit(Collections.singletonList(manifestCommittable));
+        }
+    }
+
     private static class CompactRememberStoreWrite implements StoreSinkWrite {
 
         private final boolean streamingMode;
