Skip to content

Commit 5972d3b

Browse files
committed
fix and add tests
1 parent 86482e0 commit 5972d3b

File tree

7 files changed

+152
-25
lines changed

7 files changed

+152
-25
lines changed

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3193,6 +3193,12 @@ public boolean clusteringIncrementalEnabled() {
31933193
return options.get(CLUSTERING_INCREMENTAL);
31943194
}
31953195

3196+
public boolean bucketClusterEnabled() {
3197+
return !bucketAppendOrdered()
3198+
&& !deletionVectorsEnabled()
3199+
&& clusteringIncrementalEnabled();
3200+
}
3201+
31963202
public Duration clusteringHistoryPartitionIdleTime() {
31973203
return options.get(CLUSTERING_HISTORY_PARTITION_IDLE_TIME);
31983204
}

paimon-core/src/main/java/org/apache/paimon/append/cluster/BucketedAppendClusterManager.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -183,12 +183,10 @@ public static class BucketedAppendClusterTask extends CompactTask {
183183

184184
private final List<DataFileMeta> toCluster;
185185
private final int outputLevel;
186-
private final BucketedAppendClusterManager.CompactRewriter rewriter;
186+
private final CompactRewriter rewriter;
187187

188188
public BucketedAppendClusterTask(
189-
List<DataFileMeta> toCluster,
190-
int outputLevel,
191-
BucketedAppendClusterManager.CompactRewriter rewriter) {
189+
List<DataFileMeta> toCluster, int outputLevel, CompactRewriter rewriter) {
192190
super(null);
193191
this.toCluster = toCluster;
194192
this.outputLevel = outputLevel;

paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,6 @@
4040
import java.util.concurrent.ExecutorService;
4141
import java.util.function.Function;
4242

43-
import static org.apache.paimon.utils.Preconditions.checkArgument;
44-
4543
/** {@link BaseAppendFileStoreWrite} for {@link org.apache.paimon.table.BucketMode#HASH_FIXED}. */
4644
public class BucketedAppendFileStoreWrite extends BaseAppendFileStoreWrite {
4745

@@ -101,12 +99,7 @@ protected CompactManager getCompactManager(
10199
@Nullable BucketedDvMaintainer dvMaintainer) {
102100
if (options.writeOnly()) {
103101
return new NoopCompactManager();
104-
} else if (!options.bucketAppendOrdered()
105-
&& !options.deletionVectorsEnabled()
106-
&& options.clusteringIncrementalEnabled()) {
107-
checkArgument(
108-
ioManager != null,
109-
"BucketedAppendClusterManager must be used with a valid IOManager.");
102+
} else if (options.bucketClusterEnabled()) {
110103
return new BucketedAppendClusterManager(
111104
compactExecutor,
112105
restoredFiles,
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.paimon.append.cluster;

/**
 * Test for {@link BucketedAppendClusterManager}.
 *
 * <p>NOTE(review): currently an empty placeholder — no test cases yet. Task-level behavior is
 * covered by {@code BucketedAppendClusterTaskTest}; manager-level tests should be added here.
 */
public class BucketedAppendClusterManagerTest {}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.append.cluster;
20+
21+
import org.apache.paimon.catalog.Catalog;
22+
import org.apache.paimon.catalog.FileSystemCatalog;
23+
import org.apache.paimon.catalog.Identifier;
24+
import org.apache.paimon.compact.CompactResult;
25+
import org.apache.paimon.data.BinaryRow;
26+
import org.apache.paimon.data.GenericRow;
27+
import org.apache.paimon.data.InternalRow;
28+
import org.apache.paimon.disk.IOManager;
29+
import org.apache.paimon.fs.Path;
30+
import org.apache.paimon.fs.local.LocalFileIO;
31+
import org.apache.paimon.io.DataFileMeta;
32+
import org.apache.paimon.operation.BaseAppendFileStoreWrite;
33+
import org.apache.paimon.reader.RecordReaderIterator;
34+
import org.apache.paimon.schema.Schema;
35+
import org.apache.paimon.table.AppendOnlyFileStoreTable;
36+
import org.apache.paimon.table.FileStoreTable;
37+
import org.apache.paimon.table.sink.StreamTableCommit;
38+
import org.apache.paimon.types.DataTypes;
39+
40+
import org.junit.jupiter.api.Test;
41+
import org.junit.jupiter.api.io.TempDir;
42+
43+
import java.util.ArrayList;
44+
import java.util.List;
45+
46+
import static org.assertj.core.api.Assertions.assertThat;
47+
48+
/** Test for {@link BucketedAppendClusterManager.BucketedAppendClusterTask}. */
49+
public class BucketedAppendClusterTaskTest {
50+
51+
@TempDir java.nio.file.Path tempDir;
52+
@TempDir java.nio.file.Path ioManagerTempDir;
53+
54+
@Test
55+
public void testTask() throws Exception {
56+
FileStoreTable table = createFileStoreTable();
57+
58+
BaseAppendFileStoreWrite write =
59+
(BaseAppendFileStoreWrite)
60+
table.store()
61+
.newWrite("ss")
62+
.withIOManager(IOManager.create(ioManagerTempDir.toString()));
63+
StreamTableCommit commit = table.newStreamWriteBuilder().newCommit();
64+
65+
for (int i = 0; i < 3; i++) {
66+
for (int j = 0; j < 3; j++) {
67+
write.write(BinaryRow.EMPTY_ROW, 0, GenericRow.of(0, i, j));
68+
commit.commit(i, write.prepareCommit(false, i));
69+
}
70+
}
71+
72+
List<DataFileMeta> toCluster =
73+
table.newSnapshotReader().read().dataSplits().get(0).dataFiles();
74+
75+
BucketedAppendClusterManager.BucketedAppendClusterTask task =
76+
new BucketedAppendClusterManager.BucketedAppendClusterTask(
77+
toCluster, 5, files -> write.clusterRewrite(BinaryRow.EMPTY_ROW, 0, files));
78+
79+
CompactResult result = task.doCompact();
80+
assertThat(result.before().size()).isEqualTo(9);
81+
assertThat(result.after().size()).isEqualTo(1);
82+
List<String> rows = new ArrayList<>();
83+
try (RecordReaderIterator<InternalRow> clusterRows =
84+
new RecordReaderIterator<>(
85+
((AppendOnlyFileStoreTable) table)
86+
.store()
87+
.newRead()
88+
.createReader(BinaryRow.EMPTY_ROW, 0, result.after(), null))) {
89+
while (clusterRows.hasNext()) {
90+
InternalRow row = clusterRows.next();
91+
rows.add(String.format("%d,%d", row.getInt(1), row.getInt(2)));
92+
}
93+
}
94+
95+
assertThat(rows)
96+
.containsExactly("0,0", "0,1", "1,0", "1,1", "0,2", "1,2", "2,0", "2,1", "2,2");
97+
}
98+
99+
private FileStoreTable createFileStoreTable() throws Exception {
100+
Catalog catalog = new FileSystemCatalog(LocalFileIO.create(), new Path(tempDir.toString()));
101+
Schema schema =
102+
Schema.newBuilder()
103+
.column("f0", DataTypes.INT())
104+
.column("f1", DataTypes.INT())
105+
.column("f2", DataTypes.INT())
106+
.option("bucket", "1")
107+
.option("bucket-key", "f0")
108+
.option("clustering.columns", "f1,f2")
109+
.option("clustering.strategy", "zorder")
110+
.build();
111+
Identifier identifier = Identifier.create("default", "test");
112+
catalog.createDatabase("default", false);
113+
catalog.createTable(identifier, schema, false);
114+
return (FileStoreTable) catalog.getTable(identifier);
115+
}
116+
}

paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -57,18 +57,6 @@ public class IncrementalClusterManagerTest {
5757

5858
@TempDir java.nio.file.Path tempDir;
5959

60-
@Test
61-
public void testNonUnAwareBucketTable() {
62-
Map<String, String> options = new HashMap<>();
63-
options.put(CoreOptions.BUCKET.key(), "1");
64-
options.put(CoreOptions.BUCKET_KEY.key(), "f0");
65-
66-
assertThatThrownBy(() -> createTable(options, Collections.emptyList()))
67-
.isInstanceOf(IllegalArgumentException.class)
68-
.hasMessageContaining(
69-
"Cannot define bucket for incremental clustering table, it only support bucket = -1");
70-
}
71-
7260
@Test
7361
public void testNonClusterIncremental() throws Exception {
7462
Map<String, String> options = new HashMap<>();

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,11 @@ protected void buildForBucketedTableCompact(
164164
StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming)
165165
throws Exception {
166166
if (fullCompaction == null) {
167-
fullCompaction = !isStreaming;
167+
if (table.coreOptions().bucketClusterEnabled()) {
168+
fullCompaction = false;
169+
} else {
170+
fullCompaction = !isStreaming;
171+
}
168172
} else {
169173
checkArgument(
170174
!(fullCompaction && isStreaming),

0 commit comments

Comments
 (0)