6 changes: 6 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -3193,6 +3193,12 @@ public boolean clusteringIncrementalEnabled() {
return options.get(CLUSTERING_INCREMENTAL);
}

public boolean bucketClusterEnabled() {
return !bucketAppendOrdered()
&& !deletionVectorsEnabled()
&& clusteringIncrementalEnabled();
}

public Duration clusteringHistoryPartitionIdleTime() {
return options.get(CLUSTERING_HISTORY_PARTITION_IDLE_TIME);
}
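
The new flag is simply the conjunction of three existing CoreOptions checks: the append bucket must not be ordered, deletion vectors must be off, and incremental clustering must be on. A minimal, hypothetical caller-side sketch of how such a flag would gate the bucketed clustering path (the surrounding write-path wiring is assumed here and is not part of this hunk):

    // Hypothetical gating check; `options` is an existing CoreOptions instance.
    if (options.bucketClusterEnabled()) {
        // Bucketed incremental clustering applies: build the cluster manager for this bucket.
    } else {
        // Fall back to the regular append compaction path.
    }
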
@@ -141,7 +141,8 @@ public BaseAppendFileStoreWrite newWrite(String commitUser, @Nullable Integer wr
newScan(),
options,
dvMaintainerFactory,
tableName);
tableName,
schemaManager);
}
}

@@ -0,0 +1,219 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.append.cluster;

import org.apache.paimon.AppendOnlyFileStore;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.compact.CompactFutureManager;
import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.compact.CompactTask;
import org.apache.paimon.compact.CompactUnit;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.mergetree.LevelSortedRun;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.utils.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

/** Cluster manager for {@link AppendOnlyFileStore}. */
public class BucketedAppendClusterManager extends CompactFutureManager {

private static final Logger LOG = LoggerFactory.getLogger(BucketedAppendClusterManager.class);

private final ExecutorService executor;
private final BucketedAppendLevels levels;
private final IncrementalClusterStrategy strategy;
private final CompactRewriter rewriter;

public BucketedAppendClusterManager(
ExecutorService executor,
List<DataFileMeta> restored,
SchemaManager schemaManager,
List<String> clusterKeys,
int maxSizeAmp,
int sizeRatio,
int numRunCompactionTrigger,
int numLevels,
CompactRewriter rewriter) {
this.executor = executor;
this.levels = new BucketedAppendLevels(restored, numLevels);
this.strategy =
new IncrementalClusterStrategy(
schemaManager, clusterKeys, maxSizeAmp, sizeRatio, numRunCompactionTrigger);
this.rewriter = rewriter;
}

@Override
public boolean shouldWaitForLatestCompaction() {
return false;
}

@Override
public boolean shouldWaitForPreparingCheckpoint() {
return false;
}

@Override
public void addNewFile(DataFileMeta file) {
levels.addLevel0File(file);
}

@Override
public List<DataFileMeta> allFiles() {
return levels.allFiles();
}

@Override
public void triggerCompaction(boolean fullCompaction) {
Optional<CompactUnit> optionalUnit;
List<LevelSortedRun> runs = levels.levelSortedRuns();
if (fullCompaction) {
Preconditions.checkState(
taskFuture == null,
"A compaction task is still running while the user "
+ "forces a new compaction. This is unexpected.");
if (LOG.isDebugEnabled()) {
LOG.debug(
"Trigger forced full compaction. Picking from the following runs\n{}",
runs);
}
optionalUnit = strategy.pick(levels.numberOfLevels(), runs, true);
} else {
if (taskFuture != null) {
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Trigger normal compaction. Picking from the following runs\n{}", runs);
}
optionalUnit =
strategy.pick(levels.numberOfLevels(), runs, false)
.filter(unit -> !unit.files().isEmpty());
}

optionalUnit.ifPresent(
unit -> {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Submit compaction with files (name, level, size): "
+ levels.levelSortedRuns().stream()
.flatMap(lsr -> lsr.run().files().stream())
.map(
file ->
String.format(
"(%s, %d, %d)",
file.fileName(),
file.level(),
file.fileSize()))
.collect(Collectors.joining(", ")));
}
submitCompaction(unit);
});
}

private void submitCompaction(CompactUnit unit) {

BucketedAppendClusterTask task =
new BucketedAppendClusterTask(unit.files(), unit.outputLevel(), rewriter);

if (LOG.isDebugEnabled()) {
LOG.debug(
"Pick these files (name, level, size) for {} compaction: {}",
task.getClass().getSimpleName(),
unit.files().stream()
.map(
file ->
String.format(
"(%s, %d, %d)",
file.fileName(), file.level(), file.fileSize()))
.collect(Collectors.joining(", ")));
}
taskFuture = executor.submit(task);
}

@Override
public Optional<CompactResult> getCompactionResult(boolean blocking)
throws ExecutionException, InterruptedException {
Optional<CompactResult> result = innerGetCompactionResult(blocking);
result.ifPresent(
r -> {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Update levels in compact manager with these changes:\nBefore:\n{}\nAfter:\n{}",
r.before(),
r.after());
}
levels.update(r.before(), r.after());
if (LOG.isDebugEnabled()) {
LOG.debug(
"Levels in compact manager updated. Current runs are\n{}",
levels.levelSortedRuns());
}
});
return result;
}

@Override
public void close() throws IOException {}

@VisibleForTesting
public BucketedAppendLevels levels() {
return levels;
}

/** A {@link CompactTask} implementation for clustering of a bucketed append table. */
public static class BucketedAppendClusterTask extends CompactTask {

private final List<DataFileMeta> toCluster;
private final int outputLevel;
private final CompactRewriter rewriter;

public BucketedAppendClusterTask(
List<DataFileMeta> toCluster, int outputLevel, CompactRewriter rewriter) {
super(null);
this.toCluster = toCluster;
this.outputLevel = outputLevel;
this.rewriter = rewriter;
}

@Override
protected CompactResult doCompact() throws Exception {
List<DataFileMeta> rewrite = rewriter.rewrite(toCluster);
return new CompactResult(toCluster, upgrade(rewrite));
}

protected List<DataFileMeta> upgrade(List<DataFileMeta> files) {
return files.stream()
.map(file -> file.upgrade(outputLevel))
.collect(Collectors.toList());
}
}

/** Compact rewriter for append-only tables. */
public interface CompactRewriter {
List<DataFileMeta> rewrite(List<DataFileMeta> compactBefore) throws Exception;
}
}
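
The manager follows the usual CompactFutureManager pattern: at most one task future at a time, a normal trigger submits only when nothing is pending (a forced full compaction asserts that instead), and getCompactionResult either polls or blocks on that future before folding the result back into the levels. Below is a tiny standalone model of that lifecycle using plain java.util.concurrent rather than Paimon's task types; all names are illustrative and not part of the PR's API.

    import java.util.List;
    import java.util.Optional;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    class ClusterLifecycleSketch {
        private final ExecutorService executor = Executors.newSingleThreadExecutor();
        private Future<List<String>> taskFuture; // stand-in for the manager's pending task future

        void trigger(List<String> pickedFiles) {
            if (taskFuture != null) {
                return; // a normal trigger never stacks a second task on a running one
            }
            taskFuture = executor.submit(() -> pickedFiles); // the real task rewrites and upgrades files
        }

        Optional<List<String>> getResult(boolean blocking) throws Exception {
            if (taskFuture == null || (!blocking && !taskFuture.isDone())) {
                return Optional.empty();
            }
            List<String> result = taskFuture.get(); // blocks only if the task is still running
            taskFuture = null;
            return Optional.of(result); // the real manager now calls levels.update(before, after)
        }
    }
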
@@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.append.cluster;

import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.mergetree.LevelSortedRun;
import org.apache.paimon.mergetree.SortedRun;
import org.apache.paimon.utils.Preconditions;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static java.util.Collections.emptyList;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** A class which stores all level files of a bucketed append table. */
public class BucketedAppendLevels {

private final HashSet<DataFileMeta> level0;

private final List<SortedRun> levels;

public BucketedAppendLevels(List<DataFileMeta> inputFiles, int numLevels) {
int restoredNumLevels =
Math.max(
numLevels,
inputFiles.stream().mapToInt(DataFileMeta::level).max().orElse(-1) + 1);
checkArgument(restoredNumLevels > 1, "Number of levels must be at least 2.");
this.level0 = new HashSet<>();
this.levels = new ArrayList<>();
for (int i = 1; i < restoredNumLevels; i++) {
levels.add(SortedRun.empty());
}

Map<Integer, List<DataFileMeta>> levelMap = new HashMap<>();
for (DataFileMeta file : inputFiles) {
levelMap.computeIfAbsent(file.level(), level -> new ArrayList<>()).add(file);
}
levelMap.forEach((level, files) -> updateLevel(level, emptyList(), files));

Preconditions.checkState(
level0.size() + levels.stream().mapToInt(r -> r.files().size()).sum()
== inputFiles.size(),
"Number of files stored in Levels does not equal to the size of inputFiles. This is unexpected.");
}

public void addLevel0File(DataFileMeta file) {
checkArgument(file.level() == 0);
level0.add(file);
}

public SortedRun runOfLevel(int level) {
checkArgument(level > 0, "Level 0 does not consist of a single sorted run.");
return levels.get(level - 1);
}

public int numberOfLevels() {
return levels.size() + 1;
}

public int maxLevel() {
return levels.size();
}

public List<DataFileMeta> allFiles() {
List<DataFileMeta> files = new ArrayList<>();
List<LevelSortedRun> runs = levelSortedRuns();
for (LevelSortedRun run : runs) {
files.addAll(run.run().files());
}
return files;
}

public List<LevelSortedRun> levelSortedRuns() {
List<LevelSortedRun> runs = new ArrayList<>();
level0.forEach(file -> runs.add(new LevelSortedRun(0, SortedRun.fromSingle(file))));
for (int i = 0; i < levels.size(); i++) {
SortedRun run = levels.get(i);
if (run.nonEmpty()) {
runs.add(new LevelSortedRun(i + 1, run));
}
}
return runs;
}

public void update(List<DataFileMeta> before, List<DataFileMeta> after) {
Map<Integer, List<DataFileMeta>> groupedBefore = groupByLevel(before);
Map<Integer, List<DataFileMeta>> groupedAfter = groupByLevel(after);
for (int i = 0; i < numberOfLevels(); i++) {
updateLevel(
i,
groupedBefore.getOrDefault(i, emptyList()),
groupedAfter.getOrDefault(i, emptyList()));
}
}

private void updateLevel(int level, List<DataFileMeta> before, List<DataFileMeta> after) {
if (before.isEmpty() && after.isEmpty()) {
return;
}

if (level == 0) {
before.forEach(level0::remove);
level0.addAll(after);
} else {
List<DataFileMeta> files = new ArrayList<>(runOfLevel(level).files());
files.removeAll(before);
files.addAll(after);
levels.set(level - 1, SortedRun.fromSorted(files));
}
}

private Map<Integer, List<DataFileMeta>> groupByLevel(List<DataFileMeta> files) {
return files.stream()
.collect(Collectors.groupingBy(DataFileMeta::level, Collectors.toList()));
}
}
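
The invariant this class maintains is the usual LSM-style one for this write path: level 0 holds freshly written files, each counted as its own sorted run, while every level >= 1 is kept as a single sorted run, and update(before, after) removes the compacted inputs and inserts the rewritten outputs level by level. A tiny standalone model of that bookkeeping with plain Java collections instead of DataFileMeta/SortedRun (all names are illustrative, not Paimon API):

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    class LevelsSketch {
        private final Set<String> level0 = new HashSet<>();               // each file here is its own run
        private final List<List<String>> upperLevels = new ArrayList<>(); // one sorted run per level >= 1

        LevelsSketch(int numLevels) {
            for (int i = 1; i < numLevels; i++) {
                upperLevels.add(new ArrayList<>());
            }
        }

        void addLevel0File(String file) {
            level0.add(file);
        }

        // Mirror of update()/updateLevel(): drop the rewritten inputs, insert the outputs at their level.
        void update(int level, List<String> before, List<String> after) {
            if (level == 0) {
                level0.removeAll(before);
                level0.addAll(after);
            } else {
                List<String> run = upperLevels.get(level - 1);
                run.removeAll(before);
                run.addAll(after);
            }
        }

        int sortedRunCount() {
            int runs = level0.size(); // every level-0 file counts as a separate sorted run
            for (List<String> run : upperLevels) {
                if (!run.isEmpty()) {
                    runs++;
                }
            }
            return runs;
        }
    }
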