Commit abc3a84

[flink] Enable data-evolution compaction on flink (#6875)
1 parent fd8b159 commit abc3a84

12 files changed, +765 −81 lines
paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java

Lines changed: 11 additions & 0 deletions
@@ -26,6 +26,7 @@
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.EndOfScanException;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.utils.RangeHelper;
@@ -44,6 +45,7 @@
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Compact coordinator to compact data evolution table. */
 public class DataEvolutionCompactCoordinator {
@@ -108,9 +110,14 @@ List<ManifestEntry> scan() {
             List<ManifestEntry> targetEntries =
                     currentMetas.stream()
                             .flatMap(meta -> snapshotReader.readManifest(meta).stream())
+                            // we don't need stats for compaction
+                            .map(ManifestEntry::copyWithoutStats)
                             .collect(Collectors.toList());
             result.addAll(targetEntries);
         }
+        if (result.isEmpty()) {
+            throw new EndOfScanException();
+        }
         return result;
     }
 }
@@ -199,6 +206,10 @@ List<DataEvolutionCompactTask> compactPlan(List<ManifestEntry> input) {
 
         long weightSum = 0L;
         for (List<DataFileMeta> fileGroup : groupedFiles) {
+            checkArgument(
+                    rangeHelper.areAllRangesSame(fileGroup),
+                    "Data files %s should all have the same row id range.",
+                    dataFiles);
             long currentGroupWeight =
                     fileGroup.stream()
                             .mapToLong(d -> Math.max(d.fileSize(), openFileCost))
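
With this change, an exhausted scan signals completion by throwing EndOfScanException instead of returning an empty plan forever. A minimal sketch of a caller-side planning loop under that contract (the coordinator instance is assumed to be already constructed for the target table; compare the updated tests below):

import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask;
import org.apache.paimon.table.source.EndOfScanException;

import java.util.ArrayList;
import java.util.List;

List<DataEvolutionCompactTask> allTasks = new ArrayList<>();
try {
    while (true) {
        // plan() returns the next batch of compact tasks; once the scan
        // finds no manifest entries left, it surfaces EndOfScanException
        List<DataEvolutionCompactTask> tasks = coordinator.plan();
        if (tasks.isEmpty()) {
            break; // this round produced nothing to compact
        }
        allTasks.addAll(tasks);
    }
} catch (EndOfScanException ignore) {
    // planning finished: the scan has been fully consumed
}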

paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java

Lines changed: 2 additions & 3 deletions
@@ -80,7 +80,7 @@ public boolean isBlobTask() {
         return blobTask;
     }
 
-    public CommitMessage doCompact(FileStoreTable table) throws Exception {
+    public CommitMessage doCompact(FileStoreTable table, String commitUser) throws Exception {
         if (blobTask) {
             // TODO: support blob file compaction
             throw new UnsupportedOperationException("Blob task is not supported");
@@ -107,8 +107,7 @@ public CommitMessage doCompact(FileStoreTable table) throws Exception {
                         .build();
         RecordReader<InternalRow> reader =
                 store.newDataEvolutionRead().withReadType(readWriteType).createReader(dataSplit);
-        AppendFileStoreWrite storeWrite =
-                (AppendFileStoreWrite) store.newWrite("Compact-Data-Evolution");
+        AppendFileStoreWrite storeWrite = (AppendFileStoreWrite) store.newWrite(commitUser);
         storeWrite.withWriteType(readWriteType);
         RecordWriter<InternalRow> writer = storeWrite.createWriter(partition, 0);
 
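A note on the signature change: doCompact no longer hard-codes "Compact-Data-Evolution" as the commit user but takes it from the caller, so rewritten files are committed under the same commit user as the surrounding job. A one-line usage sketch, with commitUser being whatever identifier the job commits under:

CommitMessage message = task.doCompact(table, commitUser);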

paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java

Lines changed: 15 additions & 7 deletions
@@ -56,6 +56,7 @@
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.EndOfScanException;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.types.DataField;
@@ -913,11 +914,15 @@ public void testCompactCoordinator() throws Exception {
         // Each plan() call processes one manifest group
         List<DataEvolutionCompactTask> allTasks = new ArrayList<>();
         List<DataEvolutionCompactTask> tasks;
-        while (!(tasks = coordinator.plan()).isEmpty() || allTasks.isEmpty()) {
-            allTasks.addAll(tasks);
-            if (tasks.isEmpty()) {
-                break;
+        try {
+            while (!(tasks = coordinator.plan()).isEmpty() || allTasks.isEmpty()) {
+                allTasks.addAll(tasks);
+                if (tasks.isEmpty()) {
+                    break;
+                }
             }
+        } catch (EndOfScanException ignore) {
+
         }
 
         // Verify no exceptions were thrown and tasks list is valid (may be empty)
@@ -940,10 +945,13 @@ public void testCompact() throws Exception {
         // Each plan() call processes one manifest group
         List<CommitMessage> commitMessages = new ArrayList<>();
         List<DataEvolutionCompactTask> tasks;
-        while (!(tasks = coordinator.plan()).isEmpty()) {
-            for (DataEvolutionCompactTask task : tasks) {
-                commitMessages.add(task.doCompact(table));
+        try {
+            while (!(tasks = coordinator.plan()).isEmpty()) {
+                for (DataEvolutionCompactTask task : tasks) {
+                    commitMessages.add(task.doCompact(table, "test-commit"));
+                }
             }
+        } catch (EndOfScanException ignore) {
         }
 
         table.newBatchWriteBuilder().newCommit().commit(commitMessages);

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java

Lines changed: 15 additions & 4 deletions
@@ -23,6 +23,7 @@
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.compact.AppendTableCompact;
+import org.apache.paimon.flink.compact.DataEvolutionTableCompact;
 import org.apache.paimon.flink.compact.IncrementalClusterCompact;
 import org.apache.paimon.flink.postpone.PostponeBucketCompactSplitSource;
 import org.apache.paimon.flink.postpone.RewritePostponeBucketCommittableOperator;
@@ -100,9 +101,6 @@ public CompactAction(
                             "Only FileStoreTable supports compact action. The table type is '%s'.",
                             table.getClass().getName()));
         }
-        checkArgument(
-                !((FileStoreTable) table).coreOptions().dataEvolutionEnabled(),
-                "Compact action does not support data evolution table yet. ");
         HashMap<String, String> dynamicOptions = new HashMap<>(tableConf);
         dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "false");
         table = table.copy(dynamicOptions);
@@ -146,7 +144,10 @@ protected boolean buildImpl() throws Exception {
         if (fileStoreTable.coreOptions().bucket() == BucketMode.POSTPONE_BUCKET) {
             buildForPostponeBucketCompaction(env, fileStoreTable, isStreaming);
         } else if (fileStoreTable.bucketMode() == BucketMode.BUCKET_UNAWARE) {
-            if (fileStoreTable.coreOptions().clusteringIncrementalEnabled()) {
+
+            if (fileStoreTable.coreOptions().dataEvolutionEnabled()) {
+                buildForDataEvolutionTableCompact(env, fileStoreTable, isStreaming);
+            } else if (fileStoreTable.coreOptions().clusteringIncrementalEnabled()) {
                 new IncrementalClusterCompact(
                                 env, fileStoreTable, partitionPredicate, fullCompaction)
                         .build();
@@ -205,6 +206,16 @@ protected void buildForAppendTableCompact(
         builder.build();
     }
 
+    protected void buildForDataEvolutionTableCompact(
+            StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming)
+            throws Exception {
+        checkArgument(!isStreaming, "Data evolution table compact only supports batch mode for now.");
+        DataEvolutionTableCompact builder =
+                new DataEvolutionTableCompact(env, identifier.getFullName(), table);
+        builder.withPartitionPredicate(getPartitionPredicate());
+        builder.build();
+    }
+
     protected PartitionPredicate getPartitionPredicate() throws Exception {
         checkArgument(
                 partitions == null || whereSql == null,
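
Design note: in the BUCKET_UNAWARE branch the data-evolution check now takes precedence over incremental clustering, and the constructor-level guard that rejected data-evolution tables is removed; the remaining restriction, enforced by the checkArgument in the new method, is that data-evolution compaction runs in batch mode only.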
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/DataEvolutionTableCompact.java

Lines changed: 82 additions & 0 deletions

@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.compact;
+
+import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.sink.DataEvolutionTableCompactSink;
+import org.apache.paimon.flink.source.DataEvolutionTableCompactSource;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
+
+import javax.annotation.Nullable;
+
+/** Builder for the data-evolution table Flink compaction job. */
+public class DataEvolutionTableCompact {
+
+    private final transient StreamExecutionEnvironment env;
+    private final String tableIdentifier;
+    private final FileStoreTable table;
+
+    @Nullable private PartitionPredicate partitionPredicate;
+
+    public DataEvolutionTableCompact(
+            StreamExecutionEnvironment env, String tableIdentifier, FileStoreTable table) {
+        this.env = env;
+        this.tableIdentifier = tableIdentifier;
+        this.table = table;
+    }
+
+    public void withPartitionPredicate(PartitionPredicate partitionPredicate) {
+        this.partitionPredicate = partitionPredicate;
+    }
+
+    public void build() {
+        DataEvolutionTableCompactSource source =
+                new DataEvolutionTableCompactSource(table, partitionPredicate);
+        DataStreamSource<DataEvolutionCompactTask> sourceStream =
+                DataEvolutionTableCompactSource.buildSource(env, source, tableIdentifier);
+
+        sinkFromSource(sourceStream);
+    }
+
+    private void sinkFromSource(DataStreamSource<DataEvolutionCompactTask> input) {
+        Options conf = Options.fromMap(table.options());
+        Integer compactionWorkerParallelism =
+                conf.get(FlinkConnectorOptions.UNAWARE_BUCKET_COMPACTION_PARALLELISM);
+        PartitionTransformation<DataEvolutionCompactTask> transformation =
+                new PartitionTransformation<>(
+                        input.getTransformation(), new RebalancePartitioner<>());
+        if (compactionWorkerParallelism != null) {
+            transformation.setParallelism(compactionWorkerParallelism);
+        } else {
+            transformation.setParallelism(env.getParallelism());
+        }
+
+        DataStream<DataEvolutionCompactTask> rebalanced = new DataStream<>(env, transformation);
+        DataEvolutionTableCompactSink.sink(table, rebalanced);
+    }
+}
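
Below is a minimal sketch of wiring a batch compaction job with this builder. The identifier "my_db.my_table" and the table variable are placeholders; the parallelism override is optional and, as sinkFromSource above shows, reuses the unaware-bucket compaction parallelism option:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.compact.DataEvolutionTableCompact;
import org.apache.paimon.table.FileStoreTable;

import java.util.HashMap;
import java.util.Map;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH); // data-evolution compaction is batch-only

// Optional: cap the compaction worker parallelism through the table options.
Map<String, String> dynamicOptions = new HashMap<>();
dynamicOptions.put(FlinkConnectorOptions.UNAWARE_BUCKET_COMPACTION_PARALLELISM.key(), "4");
FileStoreTable compactTable = table.copy(dynamicOptions); // table loaded from the catalog

DataEvolutionTableCompact compact =
        new DataEvolutionTableCompact(env, "my_db.my_table", compactTable);
compact.withPartitionPredicate(null); // or a PartitionPredicate to narrow the scope
compact.build();
env.execute("Compact data-evolution table: my_db.my_table");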
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DataEvolutionCompactionTaskSimpleSerializer.java

Lines changed: 66 additions & 0 deletions

@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask;
+import org.apache.paimon.append.dataevolution.DataEvolutionCompactTaskSerializer;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/** {@link SimpleVersionedSerializer} for {@link DataEvolutionCompactTask}. */
+public class DataEvolutionCompactionTaskSimpleSerializer
+        implements SimpleVersionedSerializer<DataEvolutionCompactTask> {
+
+    private final DataEvolutionCompactTaskSerializer compactionTaskSerializer;
+
+    public DataEvolutionCompactionTaskSimpleSerializer(
+            DataEvolutionCompactTaskSerializer compactionTaskSerializer) {
+        this.compactionTaskSerializer = compactionTaskSerializer;
+    }
+
+    @Override
+    public int getVersion() {
+        return 1;
+    }
+
+    @Override
+    public byte[] serialize(DataEvolutionCompactTask compactionTask) throws IOException {
+        byte[] wrapped = compactionTaskSerializer.serialize(compactionTask);
+        int version = compactionTaskSerializer.getVersion();
+
+        return ByteBuffer.allocate(wrapped.length + 4).put(wrapped).putInt(version).array();
+    }
+
+    @Override
+    public DataEvolutionCompactTask deserialize(int compactionTaskVersion, byte[] bytes)
+            throws IOException {
+        if (compactionTaskVersion != getVersion()) {
+            throw new RuntimeException("Can not deserialize version: " + compactionTaskVersion);
+        }
+
+        ByteBuffer buffer = ByteBuffer.wrap(bytes);
+        byte[] wrapped = new byte[bytes.length - 4];
+        buffer.get(wrapped);
+        int version = buffer.getInt();
+        return compactionTaskSerializer.deserialize(version, wrapped);
+    }
+}
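
For reference, a round-trip sketch through the simple serializer (assuming DataEvolutionCompactTaskSerializer has a no-argument constructor, and task is a DataEvolutionCompactTask to ship):

import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask;
import org.apache.paimon.append.dataevolution.DataEvolutionCompactTaskSerializer;
import org.apache.paimon.flink.sink.DataEvolutionCompactionTaskSimpleSerializer;

DataEvolutionCompactionTaskSimpleSerializer serializer =
        new DataEvolutionCompactionTaskSimpleSerializer(new DataEvolutionCompactTaskSerializer());
byte[] bytes = serializer.serialize(task);
DataEvolutionCompactTask copy = serializer.deserialize(serializer.getVersion(), bytes);

Note the wire layout: the wrapped task bytes come first and the inner serializer's version is appended as a trailing 4-byte int; deserialize() recovers the payload as bytes.length - 4 and reads the inner version from the tail.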
