Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ private static Configuration initConfig() {

conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE, MemorySize.parse("1mb"));
conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, MemorySize.parse("1kb"));

conf.set(ConfigOptions.MAX_PARTITION_NUM, 10);
return conf;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.alibaba.fluss.exception.SchemaNotExistException;
import com.alibaba.fluss.exception.TableNotExistException;
import com.alibaba.fluss.exception.TableNotPartitionedException;
import com.alibaba.fluss.exception.TooManyPartitionsException;
import com.alibaba.fluss.fs.FsPath;
import com.alibaba.fluss.fs.FsPathAndFileName;
import com.alibaba.fluss.metadata.DataLakeFormat;
Expand Down Expand Up @@ -776,6 +777,38 @@ void testBootstrapServerConfigAsTabletServer() throws Exception {
}
}

@Test
void testAddTooManyPartitions() throws Exception {
String dbName = DEFAULT_TABLE_PATH.getDatabaseName();
TableDescriptor partitionedTable =
TableDescriptor.builder()
.schema(
Schema.newBuilder()
.column("id", DataTypes.STRING())
.column("name", DataTypes.STRING())
.column("age", DataTypes.STRING())
.build())
.distributedBy(3, "id")
.partitionedBy("age")
.build();
TablePath tablePath = TablePath.of(dbName, "test_add_too_many_partitioned_table");
admin.createTable(tablePath, partitionedTable, true).get();

// add 10 partitions.
for (int i = 0; i < 10; i++) {
admin.createPartition(tablePath, newPartitionSpec("age", String.valueOf(i)), false)
.get();
}
// add out of limit partition
assertThatThrownBy(
() ->
admin.createPartition(
tablePath, newPartitionSpec("age", "11"), false)
.get())
.cause()
.isInstanceOf(TooManyPartitionsException.class);
}

private void assertHasTabletServerNumber(int tabletServerNumber) {
CoordinatorGateway coordinatorGateway = FLUSS_CLUSTER_EXTENSION.newCoordinatorClient();
retry(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,14 @@ public class ConfigOptions {
"The interval of auto partition check. "
+ "The default value is 10 minutes.");

public static final ConfigOption<Integer> MAX_PARTITION_NUM =
key("max.partition.num")
.intType()
.defaultValue(1000)
.withDescription(
"Limits the maximum number of partitions that can be created for a partitioned table "
+ "to avoid creating too many partitions.");

// ------------------------------------------------------------------------
// ConfigOptions for Coordinator Server
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2025 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.exception;

/** This exception is thrown if the number of table partitions is exceed max.partition.num. */
public class TooManyPartitionsException extends ApiException {

private static final long serialVersionUID = 1L;

public TooManyPartitionsException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import com.alibaba.fluss.exception.TableNotExistException;
import com.alibaba.fluss.exception.TableNotPartitionedException;
import com.alibaba.fluss.exception.TimeoutException;
import com.alibaba.fluss.exception.TooManyPartitionsException;
import com.alibaba.fluss.exception.UnknownServerException;
import com.alibaba.fluss.exception.UnknownTableOrBucketException;
import com.alibaba.fluss.exception.UnknownWriterIdException;
Expand Down Expand Up @@ -187,7 +188,9 @@ public enum Errors {
LEADER_NOT_AVAILABLE_EXCEPTION(
44,
"There is no currently available leader for the given partition.",
LeaderNotAvailableException::new);
LeaderNotAvailableException::new),
PARTITION_MAX_NUM_EXCEPTION(
45, "Exceed the maximum number of partitions.", TooManyPartitionsException::new);

private static final Logger LOG = LoggerFactory.getLogger(Errors.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.exception.PartitionAlreadyExistsException;
import com.alibaba.fluss.exception.PartitionNotExistException;
import com.alibaba.fluss.exception.TooManyPartitionsException;
import com.alibaba.fluss.metadata.ResolvedPartitionSpec;
import com.alibaba.fluss.metadata.TableInfo;
import com.alibaba.fluss.metadata.TablePath;
Expand Down Expand Up @@ -250,6 +251,11 @@ private void createPartitions(
"Auto partitioning skip to create partition {} for table [{}] as the partition is exist.",
partition,
tablePath);
} catch (TooManyPartitionsException t) {
LOG.warn(
"Auto partitioning skip to create partition {} for table [{}], "
+ "because exceed the maximum number of partitions.",
tablePath);
}

currentPartitions.add(partition.getPartitionName());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should move this into the above try catch, otherwise, the state of currentPartitions is not correct.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.alibaba.fluss.server.coordinator;

import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.exception.DatabaseAlreadyExistException;
import com.alibaba.fluss.exception.DatabaseNotEmptyException;
Expand All @@ -27,6 +28,7 @@
import com.alibaba.fluss.exception.TableAlreadyExistException;
import com.alibaba.fluss.exception.TableNotExistException;
import com.alibaba.fluss.exception.TableNotPartitionedException;
import com.alibaba.fluss.exception.TooManyPartitionsException;
import com.alibaba.fluss.metadata.DatabaseDescriptor;
import com.alibaba.fluss.metadata.DatabaseInfo;
import com.alibaba.fluss.metadata.ResolvedPartitionSpec;
Expand Down Expand Up @@ -65,6 +67,7 @@ public class MetadataManager {

private final ZooKeeperClient zookeeperClient;
private @Nullable final Map<String, String> defaultTableLakeOptions;
private final Configuration conf;

/**
* Creates a new metadata manager.
Expand All @@ -75,6 +78,7 @@ public class MetadataManager {
public MetadataManager(ZooKeeperClient zookeeperClient, Configuration conf) {
this.zookeeperClient = zookeeperClient;
this.defaultTableLakeOptions = LakeStorageUtils.generateDefaultTableLakeOptions(conf);
this.conf = conf;
}

public void createDatabase(
Expand Down Expand Up @@ -343,6 +347,23 @@ public void createPartition(
partition.getPartitionQualifiedName(), tablePath));
}

try {
int partitionNumber = zookeeperClient.getPartitionNumber(tablePath);
int configMaxSize = conf.get(ConfigOptions.MAX_PARTITION_NUM);
if (partitionNumber + 1 > configMaxSize) {
throw new TooManyPartitionsException(
String.format(
"Exceed the maximum number of partitions for table %s", tablePath));
}
} catch (TooManyPartitionsException e) {
throw e;
} catch (Exception e) {
LOG.error(
"Get the number of partition from zookeeper failed for table [{}]",
tablePath,
e);
}

try {
long partitionId = zookeeperClient.getPartitionIdAndIncrement();
// register partition assignments to zk first
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,13 @@ public Optional<TablePartition> getPartition(TablePath tablePath, String partiti
return getOrEmpty(path).map(PartitionZNode::decode);
}

/** Get partition num of a table in ZK. */
public int getPartitionNumber(TablePath tablePath) throws Exception {
String path = PartitionsZNode.path(tablePath);
Stat stat = zkClient.checkExists().forPath(path);
return stat.getNumChildren();
}

/** Create a partition for a table in ZK. */
public void registerPartition(
TablePath tablePath, long tableId, String partitionName, long partitionId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.alibaba.fluss.config.AutoPartitionTimeUnit;
import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.exception.TooManyPartitionsException;
import com.alibaba.fluss.metadata.Schema;
import com.alibaba.fluss.metadata.TableDescriptor;
import com.alibaba.fluss.metadata.TableInfo;
Expand Down Expand Up @@ -54,6 +55,7 @@

import static com.alibaba.fluss.metadata.ResolvedPartitionSpec.fromPartitionName;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Test for {@link AutoPartitionManager}. */
class AutoPartitionManagerTest {
Expand Down Expand Up @@ -106,6 +108,7 @@ static Stream<Arguments> parameters() {
"2024091007",
"2024091008",
"2024091009")
.manualCreatedTooManyPartitions("2024091010")
.build()),
Arguments.of(
TestParams.builder(AutoPartitionTimeUnit.DAY)
Expand All @@ -130,6 +133,7 @@ static Stream<Arguments> parameters() {
"20240916",
"20240917",
"20240918")
.manualCreatedTooManyPartitions("20240919")
.build()),
Arguments.of(
TestParams.builder(AutoPartitionTimeUnit.MONTH)
Expand All @@ -144,6 +148,7 @@ static Stream<Arguments> parameters() {
.advanceClock2(c -> c.plusMonths(2))
.expectedPartitionsFinal(
"202412", "202501", "202502", "202503", "202504", "202505")
.manualCreatedTooManyPartitions("202506")
.build()),
Arguments.of(
TestParams.builder(AutoPartitionTimeUnit.QUARTER)
Expand All @@ -158,6 +163,7 @@ static Stream<Arguments> parameters() {
.advanceClock2(c -> c.plusMonths(2 * 3))
.expectedPartitionsFinal(
"20252", "20253", "20254", "20261", "20262", "20263")
.manualCreatedTooManyPartitions("20264")
.build()),
Arguments.of(
TestParams.builder(AutoPartitionTimeUnit.YEAR)
Expand All @@ -172,6 +178,7 @@ static Stream<Arguments> parameters() {
.advanceClock2(c -> c.plusYears(2))
.expectedPartitionsFinal(
"2027", "2028", "2029", "2030", "2031", "2032")
.manualCreatedTooManyPartitions("2033")
.build()));
}

Expand Down Expand Up @@ -242,6 +249,26 @@ void testAddPartitionedTable(TestParams params) throws Exception {
periodicExecutor.triggerPeriodicScheduledTasks();
partitions = zookeeperClient.getPartitionNameAndIds(tablePath);
assertThat(partitions.keySet()).containsExactlyInAnyOrder(params.expectedPartitionsFinal);

// create too many partitions
int expectPartitionNumber = params.expectedPartitionsFinal.length;
Configuration config = new Configuration();
config.set(ConfigOptions.MAX_PARTITION_NUM, expectPartitionNumber);
MetadataManager metadataManager = new MetadataManager(zookeeperClient, config);

assertThatThrownBy(
() ->
metadataManager.createPartition(
tablePath,
tableId,
partitionAssignment,
fromPartitionName(
table.getPartitionKeys(),
params.manualCreatedTooManyPartitions),
false))
.isInstanceOf(TooManyPartitionsException.class);
int afterPartitionNumber = zookeeperClient.getPartitionNumber(tablePath);
assertThat(afterPartitionNumber).isEqualTo(expectPartitionNumber);
}

private static class TestParams {
Expand All @@ -254,6 +281,7 @@ private static class TestParams {
final String[] expectedPartitionsAfterAdvance;
final Duration advanceDuration2;
final String[] expectedPartitionsFinal;
final String manualCreatedTooManyPartitions;

private TestParams(
AutoPartitionTimeUnit timeUnit,
Expand All @@ -264,7 +292,8 @@ private TestParams(
Duration advanceDuration,
String[] expectedPartitionsAfterAdvance,
Duration advanceDuration2,
String[] expectedPartitionsFinal) {
String[] expectedPartitionsFinal,
String manualCreatedTooManyPartitions) {
this.timeUnit = timeUnit;
this.startTimeMs = startTimeMs;
this.manualCreatedPartition = manualCreatedPartition;
Expand All @@ -274,6 +303,7 @@ private TestParams(
this.expectedPartitionsAfterAdvance = expectedPartitionsAfterAdvance;
this.advanceDuration2 = advanceDuration2;
this.expectedPartitionsFinal = expectedPartitionsFinal;
this.manualCreatedTooManyPartitions = manualCreatedTooManyPartitions;
}

@Override
Expand All @@ -296,6 +326,7 @@ private static class TestParamsBuilder {
String[] expectedPartitionsAfterAdvance;
long advanceSeconds2;
String[] expectedPartitionsFinal;
String manualCreatedTooManyPartitions;

TestParamsBuilder(AutoPartitionTimeUnit timeUnit) {
this.timeUnit = timeUnit;
Expand Down Expand Up @@ -349,6 +380,12 @@ public TestParamsBuilder expectedPartitionsFinal(String... expectedPartitionsFin
return this;
}

public TestParamsBuilder manualCreatedTooManyPartitions(
String manualCreatedTooManyPartitions) {
this.manualCreatedTooManyPartitions = manualCreatedTooManyPartitions;
return this;
}

public TestParams build() {
return new TestParams(
timeUnit,
Expand All @@ -359,7 +396,8 @@ public TestParams build() {
Duration.ofSeconds(advanceSeconds),
expectedPartitionsAfterAdvance,
Duration.ofSeconds(advanceSeconds2),
expectedPartitionsFinal);
expectedPartitionsFinal,
manualCreatedTooManyPartitions);
}
}

Expand Down