Commit c20198b

[core] Support incremental clustering for append bucketed table (apache#6961)
1 parent f6e0196

16 files changed, +1294 -19 lines

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 6 additions & 0 deletions
@@ -3193,6 +3193,12 @@ public boolean clusteringIncrementalEnabled() {
         return options.get(CLUSTERING_INCREMENTAL);
     }
 
+    public boolean bucketClusterEnabled() {
+        return !bucketAppendOrdered()
+                && !deletionVectorsEnabled()
+                && clusteringIncrementalEnabled();
+    }
+
     public Duration clusteringHistoryPartitionIdleTime() {
         return options.get(CLUSTERING_HISTORY_PARTITION_IDLE_TIME);
     }
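
The new helper gates bucket clustering on three existing switches: the bucket must not be append-ordered, deletion vectors must be disabled, and incremental clustering must be enabled. The snippet below is a hedged sketch, not code from this commit, of how a write path could branch on the combined flag when choosing a compact manager; the class name and the two supplier parameters are illustrative stand-ins.

import java.util.function.Supplier;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.compact.CompactManager;

/** Illustrative sketch only: branch on CoreOptions#bucketClusterEnabled. */
final class CompactManagerChoice {

    private CompactManagerChoice() {}

    static CompactManager choose(
            CoreOptions options,
            Supplier<CompactManager> clusterManagerFactory,
            Supplier<CompactManager> plainManagerFactory) {
        // True only when the bucket is not append-ordered, deletion vectors are
        // off, and incremental clustering is enabled (see the diff above).
        return options.bucketClusterEnabled()
                ? clusterManagerFactory.get()
                : plainManagerFactory.get();
    }
}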

paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java

Lines changed: 2 additions & 1 deletion
@@ -141,7 +141,8 @@ public BaseAppendFileStoreWrite newWrite(String commitUser, @Nullable Integer wr
                 newScan(),
                 options,
                 dvMaintainerFactory,
-                tableName);
+                tableName,
+                schemaManager);
     }
 }

paimon-core/src/main/java/org/apache/paimon/append/cluster/BucketedAppendClusterManager.java

Lines changed: 219 additions & 0 deletions
@@ -0,0 +1,219 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.paimon.append.cluster;

import org.apache.paimon.AppendOnlyFileStore;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.compact.CompactFutureManager;
import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.compact.CompactTask;
import org.apache.paimon.compact.CompactUnit;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.mergetree.LevelSortedRun;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.utils.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

/** Cluster manager for {@link AppendOnlyFileStore}. */
public class BucketedAppendClusterManager extends CompactFutureManager {

    private static final Logger LOG = LoggerFactory.getLogger(BucketedAppendClusterManager.class);

    private final ExecutorService executor;
    private final BucketedAppendLevels levels;
    private final IncrementalClusterStrategy strategy;
    private final CompactRewriter rewriter;

    public BucketedAppendClusterManager(
            ExecutorService executor,
            List<DataFileMeta> restored,
            SchemaManager schemaManager,
            List<String> clusterKeys,
            int maxSizeAmp,
            int sizeRatio,
            int numRunCompactionTrigger,
            int numLevels,
            CompactRewriter rewriter) {
        this.executor = executor;
        this.levels = new BucketedAppendLevels(restored, numLevels);
        this.strategy =
                new IncrementalClusterStrategy(
                        schemaManager, clusterKeys, maxSizeAmp, sizeRatio, numRunCompactionTrigger);
        this.rewriter = rewriter;
    }

    @Override
    public boolean shouldWaitForLatestCompaction() {
        return false;
    }

    @Override
    public boolean shouldWaitForPreparingCheckpoint() {
        return false;
    }

    @Override
    public void addNewFile(DataFileMeta file) {
        levels.addLevel0File(file);
    }

    @Override
    public List<DataFileMeta> allFiles() {
        return levels.allFiles();
    }

    @Override
    public void triggerCompaction(boolean fullCompaction) {
        Optional<CompactUnit> optionalUnit;
        List<LevelSortedRun> runs = levels.levelSortedRuns();
        if (fullCompaction) {
            Preconditions.checkState(
                    taskFuture == null,
                    "A compaction task is still running while the user "
                            + "forces a new compaction. This is unexpected.");
            if (LOG.isDebugEnabled()) {
                LOG.debug(
                        "Trigger forced full compaction. Picking from the following runs\n{}",
                        runs);
            }
            optionalUnit = strategy.pick(levels.numberOfLevels(), runs, true);
        } else {
            if (taskFuture != null) {
                return;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Trigger normal compaction. Picking from the following runs\n{}", runs);
            }
            optionalUnit =
                    strategy.pick(levels.numberOfLevels(), runs, false)
                            .filter(unit -> !unit.files().isEmpty());
        }

        optionalUnit.ifPresent(
                unit -> {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(
                                "Submit compaction with files (name, level, size): "
                                        + levels.levelSortedRuns().stream()
                                                .flatMap(lsr -> lsr.run().files().stream())
                                                .map(
                                                        file ->
                                                                String.format(
                                                                        "(%s, %d, %d)",
                                                                        file.fileName(),
                                                                        file.level(),
                                                                        file.fileSize()))
                                                .collect(Collectors.joining(", ")));
                    }
                    submitCompaction(unit);
                });
    }

    private void submitCompaction(CompactUnit unit) {

        BucketedAppendClusterTask task =
                new BucketedAppendClusterTask(unit.files(), unit.outputLevel(), rewriter);

        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    "Pick these files (name, level, size) for {} compaction: {}",
                    task.getClass().getSimpleName(),
                    unit.files().stream()
                            .map(
                                    file ->
                                            String.format(
                                                    "(%s, %d, %d)",
                                                    file.fileName(), file.level(), file.fileSize()))
                            .collect(Collectors.joining(", ")));
        }
        taskFuture = executor.submit(task);
    }

    @Override
    public Optional<CompactResult> getCompactionResult(boolean blocking)
            throws ExecutionException, InterruptedException {
        Optional<CompactResult> result = innerGetCompactionResult(blocking);
        result.ifPresent(
                r -> {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(
                                "Update levels in compact manager with these changes:\nBefore:\n{}\nAfter:\n{}",
                                r.before(),
                                r.after());
                    }
                    levels.update(r.before(), r.after());
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(
                                "Levels in compact manager updated. Current runs are\n{}",
                                levels.levelSortedRuns());
                    }
                });
        return result;
    }

    @Override
    public void close() throws IOException {}

    @VisibleForTesting
    public BucketedAppendLevels levels() {
        return levels;
    }

    /** A {@link CompactTask} impl for clustering of append bucketed table. */
    public static class BucketedAppendClusterTask extends CompactTask {

        private final List<DataFileMeta> toCluster;
        private final int outputLevel;
        private final CompactRewriter rewriter;

        public BucketedAppendClusterTask(
                List<DataFileMeta> toCluster, int outputLevel, CompactRewriter rewriter) {
            super(null);
            this.toCluster = toCluster;
            this.outputLevel = outputLevel;
            this.rewriter = rewriter;
        }

        @Override
        protected CompactResult doCompact() throws Exception {
            List<DataFileMeta> rewrite = rewriter.rewrite(toCluster);
            return new CompactResult(toCluster, upgrade(rewrite));
        }

        protected List<DataFileMeta> upgrade(List<DataFileMeta> files) {
            return files.stream()
                    .map(file -> file.upgrade(outputLevel))
                    .collect(Collectors.toList());
        }
    }

    /** Compact rewriter for append-only table. */
    public interface CompactRewriter {
        List<DataFileMeta> rewrite(List<DataFileMeta> compactBefore) throws Exception;
    }
}
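
As a hedged usage sketch (not part of this commit), one clustering round can be driven through the manager's public methods: addNewFile registers a freshly written level-0 file, triggerCompaction asks the strategy to pick a compact unit and submits a BucketedAppendClusterTask, and getCompactionResult folds the finished task's before/after file sets back into the level structure. The helper below is illustrative and assumes a manager already constructed with its executor, restored files, schema manager, cluster keys and rewriter.

import java.util.Optional;

import org.apache.paimon.append.cluster.BucketedAppendClusterManager;
import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.io.DataFileMeta;

/** Hedged usage sketch: one clustering round through BucketedAppendClusterManager. */
final class ClusterRoundSketch {

    private ClusterRoundSketch() {}

    static void runOneRound(BucketedAppendClusterManager manager, DataFileMeta newLevel0File)
            throws Exception {
        // Newly written files enter the structure at level 0.
        manager.addNewFile(newLevel0File);

        // Normal (non-forced) trigger: the strategy may or may not pick a unit.
        manager.triggerCompaction(false);

        // Block until the submitted task (if any) finishes; the manager also
        // updates its levels with the result's before/after file sets.
        Optional<CompactResult> result = manager.getCompactionResult(true);
        result.ifPresent(
                r ->
                        System.out.printf(
                                "clustered %d file(s) into %d file(s)%n",
                                r.before().size(), r.after().size()));
    }
}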

paimon-core/src/main/java/org/apache/paimon/append/cluster/BucketedAppendLevels.java

Lines changed: 137 additions & 0 deletions
@@ -0,0 +1,137 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.paimon.append.cluster;

import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.mergetree.LevelSortedRun;
import org.apache.paimon.mergetree.SortedRun;
import org.apache.paimon.utils.Preconditions;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static java.util.Collections.emptyList;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** A class which stores all level files in append bucketed table. */
public class BucketedAppendLevels {

    private final HashSet<DataFileMeta> level0;

    private final List<SortedRun> levels;

    public BucketedAppendLevels(List<DataFileMeta> inputFiles, int numLevels) {
        int restoredNumLevels =
                Math.max(
                        numLevels,
                        inputFiles.stream().mapToInt(DataFileMeta::level).max().orElse(-1) + 1);
        checkArgument(restoredNumLevels > 1, "Number of levels must be at least 2.");
        this.level0 = new HashSet<>();
        this.levels = new ArrayList<>();
        for (int i = 1; i < restoredNumLevels; i++) {
            levels.add(SortedRun.empty());
        }

        Map<Integer, List<DataFileMeta>> levelMap = new HashMap<>();
        for (DataFileMeta file : inputFiles) {
            levelMap.computeIfAbsent(file.level(), level -> new ArrayList<>()).add(file);
        }
        levelMap.forEach((level, files) -> updateLevel(level, emptyList(), files));

        Preconditions.checkState(
                level0.size() + levels.stream().mapToInt(r -> r.files().size()).sum()
                        == inputFiles.size(),
                "Number of files stored in Levels does not equal to the size of inputFiles. This is unexpected.");
    }

    public void addLevel0File(DataFileMeta file) {
        checkArgument(file.level() == 0);
        level0.add(file);
    }

    public SortedRun runOfLevel(int level) {
        checkArgument(level > 0, "Level0 does not have one single sorted run.");
        return levels.get(level - 1);
    }

    public int numberOfLevels() {
        return levels.size() + 1;
    }

    public int maxLevel() {
        return levels.size();
    }

    public List<DataFileMeta> allFiles() {
        List<DataFileMeta> files = new ArrayList<>();
        List<LevelSortedRun> runs = levelSortedRuns();
        for (LevelSortedRun run : runs) {
            files.addAll(run.run().files());
        }
        return files;
    }

    public List<LevelSortedRun> levelSortedRuns() {
        List<LevelSortedRun> runs = new ArrayList<>();
        level0.forEach(file -> runs.add(new LevelSortedRun(0, SortedRun.fromSingle(file))));
        for (int i = 0; i < levels.size(); i++) {
            SortedRun run = levels.get(i);
            if (run.nonEmpty()) {
                runs.add(new LevelSortedRun(i + 1, run));
            }
        }
        return runs;
    }

    public void update(List<DataFileMeta> before, List<DataFileMeta> after) {
        Map<Integer, List<DataFileMeta>> groupedBefore = groupByLevel(before);
        Map<Integer, List<DataFileMeta>> groupedAfter = groupByLevel(after);
        for (int i = 0; i < numberOfLevels(); i++) {
            updateLevel(
                    i,
                    groupedBefore.getOrDefault(i, emptyList()),
                    groupedAfter.getOrDefault(i, emptyList()));
        }
    }

    private void updateLevel(int level, List<DataFileMeta> before, List<DataFileMeta> after) {
        if (before.isEmpty() && after.isEmpty()) {
            return;
        }

        if (level == 0) {
            before.forEach(level0::remove);
            level0.addAll(after);
        } else {
            List<DataFileMeta> files = new ArrayList<>(runOfLevel(level).files());
            files.removeAll(before);
            files.addAll(after);
            levels.set(level - 1, SortedRun.fromSorted(files));
        }
    }

    private Map<Integer, List<DataFileMeta>> groupByLevel(List<DataFileMeta> files) {
        return files.stream()
                .collect(Collectors.groupingBy(DataFileMeta::level, Collectors.toList()));
    }
}
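
A hedged sketch (not part of this commit) of how the resulting layout can be inspected: every level-0 file is exposed as its own sorted run, while each non-empty level >= 1 contributes exactly one sorted run, which is the shape the clustering strategy picks from. The helper name below is illustrative.

import java.util.List;

import org.apache.paimon.append.cluster.BucketedAppendLevels;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.mergetree.LevelSortedRun;

/** Hedged sketch: print one line per sorted run tracked by BucketedAppendLevels. */
final class LevelsInspectionSketch {

    private LevelsInspectionSketch() {}

    static void printRuns(List<DataFileMeta> restoredFiles, int numLevels) {
        // The constructor places each restored file at its recorded level and
        // requires the resulting number of levels to be at least 2.
        BucketedAppendLevels levels = new BucketedAppendLevels(restoredFiles, numLevels);
        for (LevelSortedRun run : levels.levelSortedRuns()) {
            // Level 0: one run per file. Level >= 1: at most one run per level.
            System.out.printf(
                    "level %d holds %d file(s)%n", run.level(), run.run().files().size());
        }
    }
}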
