Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ private static Configuration initConfig() {

conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE, MemorySize.parse("1mb"));
conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, MemorySize.parse("1kb"));

conf.set(ConfigOptions.MAX_PARTITION_NUM, 10);
return conf;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.alibaba.fluss.exception.SchemaNotExistException;
import com.alibaba.fluss.exception.TableNotExistException;
import com.alibaba.fluss.exception.TableNotPartitionedException;
import com.alibaba.fluss.exception.TooManyPartitionsException;
import com.alibaba.fluss.fs.FsPath;
import com.alibaba.fluss.fs.FsPathAndFileName;
import com.alibaba.fluss.metadata.DataLakeFormat;
Expand Down Expand Up @@ -776,6 +777,38 @@ void testBootstrapServerConfigAsTabletServer() throws Exception {
}
}

@Test
void testAddTooManyPartitions() throws Exception {
    // NOTE(review): this test relies on the test cluster being configured with a
    // max.partition.num of 10 — confirm against the cluster configuration used by this suite.
    Schema schema =
            Schema.newBuilder()
                    .column("id", DataTypes.STRING())
                    .column("name", DataTypes.STRING())
                    .column("age", DataTypes.STRING())
                    .build();
    TableDescriptor descriptor =
            TableDescriptor.builder()
                    .schema(schema)
                    .distributedBy(3, "id")
                    .partitionedBy("age")
                    .build();
    TablePath targetTable =
            TablePath.of(
                    DEFAULT_TABLE_PATH.getDatabaseName(),
                    "test_add_too_many_partitioned_table");
    admin.createTable(targetTable, descriptor, true).get();

    // Create partitions age=0 .. age=9, i.e. exactly 10 partitions.
    for (int partitionValue = 0; partitionValue < 10; partitionValue++) {
        admin.createPartition(
                        targetTable,
                        newPartitionSpec("age", String.valueOf(partitionValue)),
                        false)
                .get();
    }

    // The 11th partition must be rejected with TooManyPartitionsException as the cause.
    assertThatThrownBy(
                    () ->
                            admin.createPartition(
                                            targetTable, newPartitionSpec("age", "11"), false)
                                    .get())
            .cause()
            .isInstanceOf(TooManyPartitionsException.class);
}

private void assertHasTabletServerNumber(int tabletServerNumber) {
CoordinatorGateway coordinatorGateway = FLUSS_CLUSTER_EXTENSION.newCoordinatorClient();
retry(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,14 @@ public class ConfigOptions {
"The interval of auto partition check. "
+ "The default value is 10 minutes.");

/**
 * Upper bound on the number of partitions of a single partitioned table ({@code
 * max.partition.num}). Defaults to 1000.
 */
public static final ConfigOption<Integer> MAX_PARTITION_NUM =
key("max.partition.num")
.intType()
.defaultValue(1000)
.withDescription(
"Limits the maximum number of partitions that can be created for a partitioned table "
+ "to avoid creating too many partitions.");

// ------------------------------------------------------------------------
// ConfigOptions for Coordinator Server
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2025 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.exception;

import com.alibaba.fluss.annotation.PublicEvolving;

/**
 * Thrown when the number of partitions of a table would exceed the configured maximum ({@code
 * max.partition.num}).
 *
 * @since 0.7
 */
@PublicEvolving
public class TooManyPartitionsException extends ApiException {

private static final long serialVersionUID = 1L;

/**
 * Creates the exception with the given detail message.
 *
 * @param message the detail message describing the violated limit
 */
public TooManyPartitionsException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import com.alibaba.fluss.exception.TableNotExistException;
import com.alibaba.fluss.exception.TableNotPartitionedException;
import com.alibaba.fluss.exception.TimeoutException;
import com.alibaba.fluss.exception.TooManyPartitionsException;
import com.alibaba.fluss.exception.UnknownServerException;
import com.alibaba.fluss.exception.UnknownTableOrBucketException;
import com.alibaba.fluss.exception.UnknownWriterIdException;
Expand Down Expand Up @@ -187,7 +188,9 @@ public enum Errors {
LEADER_NOT_AVAILABLE_EXCEPTION(
44,
"There is no currently available leader for the given partition.",
LeaderNotAvailableException::new);
LeaderNotAvailableException::new),
PARTITION_MAX_NUM_EXCEPTION(
45, "Exceed the maximum number of partitions.", TooManyPartitionsException::new);

private static final Logger LOG = LoggerFactory.getLogger(Errors.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.exception.PartitionAlreadyExistsException;
import com.alibaba.fluss.exception.PartitionNotExistException;
import com.alibaba.fluss.exception.TooManyPartitionsException;
import com.alibaba.fluss.metadata.ResolvedPartitionSpec;
import com.alibaba.fluss.metadata.TableInfo;
import com.alibaba.fluss.metadata.TablePath;
Expand Down Expand Up @@ -245,16 +246,29 @@ private void createPartitions(
try {
metadataManager.createPartition(
tablePath, tableId, partitionAssignment, partition, false);
currentPartitions.add(partition.getPartitionName());
LOG.info(
"Auto partitioning created partition {} for table [{}].",
partition,
tablePath);
} catch (PartitionAlreadyExistsException e) {
LOG.info(
"Auto partitioning skip to create partition {} for table [{}] as the partition is exist.",
partition,
tablePath);
} catch (TooManyPartitionsException t) {
LOG.warn(
"Auto partitioning skip to create partition {} for table [{}], "
+ "because exceed the maximum number of partitions.",
partition,
tablePath);
} catch (Exception e) {
LOG.error(
"Auto partitioning failed to create partition {} for table [{}].",
partition,
tablePath,
e);
}

currentPartitions.add(partition.getPartitionName());
LOG.info(
"Auto partitioning created partition {} for table [{}].", partition, tablePath);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.alibaba.fluss.server.coordinator;

import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.exception.DatabaseAlreadyExistException;
import com.alibaba.fluss.exception.DatabaseNotEmptyException;
Expand All @@ -27,6 +28,7 @@
import com.alibaba.fluss.exception.TableAlreadyExistException;
import com.alibaba.fluss.exception.TableNotExistException;
import com.alibaba.fluss.exception.TableNotPartitionedException;
import com.alibaba.fluss.exception.TooManyPartitionsException;
import com.alibaba.fluss.metadata.DatabaseDescriptor;
import com.alibaba.fluss.metadata.DatabaseInfo;
import com.alibaba.fluss.metadata.ResolvedPartitionSpec;
Expand Down Expand Up @@ -65,6 +67,7 @@ public class MetadataManager {

private final ZooKeeperClient zookeeperClient;
private @Nullable final Map<String, String> defaultTableLakeOptions;
private final int maxPartitionNum;

/**
* Creates a new metadata manager.
Expand All @@ -75,6 +78,7 @@ public class MetadataManager {
public MetadataManager(ZooKeeperClient zookeeperClient, Configuration conf) {
    this.zookeeperClient = zookeeperClient;
    this.defaultTableLakeOptions = LakeStorageUtils.generateDefaultTableLakeOptions(conf);
    // The partition limit is read once here and kept in a final field.
    this.maxPartitionNum = conf.get(ConfigOptions.MAX_PARTITION_NUM);
}

public void createDatabase(
Expand Down Expand Up @@ -343,6 +347,24 @@ public void createPartition(
partition.getPartitionQualifiedName(), tablePath));
}

try {
int partitionNumber = zookeeperClient.getPartitionNumber(tablePath);
if (partitionNumber + 1 > maxPartitionNum) {
throw new TooManyPartitionsException(
String.format(
"Exceed the maximum number of partitions for table %s, only allow %s partitions.",
tablePath, maxPartitionNum));
}
} catch (TooManyPartitionsException e) {
throw e;
} catch (Exception e) {
throw new FlussRuntimeException(
String.format(
"Get the number of partition from zookeeper failed for table %s",
tablePath),
e);
}

try {
long partitionId = zookeeperClient.getPartitionIdAndIncrement();
// register partition assignments to zk first
Expand All @@ -352,10 +374,10 @@ public void createPartition(
LOG.info(
"Register partition {} to zookeeper for table [{}].", partitionName, tablePath);
} catch (Exception e) {
LOG.error(
"Register partition to zookeeper failed to create partition {} for table [{}]",
partitionName,
tablePath,
throw new FlussRuntimeException(
String.format(
"Register partition to zookeeper failed to create partition %s for table [%s]",
partitionName, tablePath),
e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,16 @@ public Optional<TablePartition> getPartition(TablePath tablePath, String partiti
return getOrEmpty(path).map(PartitionZNode::decode);
}

/**
 * Returns the number of partitions registered in ZK for the given table.
 *
 * <p>The value is the child count of the table's partitions znode. If that znode does not
 * exist, the table is reported as having zero partitions.
 *
 * @param tablePath the table whose partitions are counted
 * @return the number of children of the partitions znode, or 0 if the znode is absent
 * @throws Exception if the lookup in ZK fails
 */
public int getPartitionNumber(TablePath tablePath) throws Exception {
    Stat partitionsStat = zkClient.checkExists().forPath(PartitionsZNode.path(tablePath));
    return partitionsStat == null ? 0 : partitionsStat.getNumChildren();
}

/** Create a partition for a table in ZK. */
public void registerPartition(
TablePath tablePath, long tableId, String partitionName, long partitionId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
Expand Down Expand Up @@ -191,7 +192,7 @@ void testAddPartitionedTable(TestParams params) throws Exception {
periodicExecutor);
autoPartitionManager.start();

TableInfo table = createPartitionedTable(params.timeUnit);
TableInfo table = createPartitionedTable(2, 4, params.timeUnit);
TablePath tablePath = table.getTablePath();
autoPartitionManager.addAutoPartitionTable(table);
// the first auto-partition task is a non-periodic task
Expand Down Expand Up @@ -244,6 +245,80 @@ void testAddPartitionedTable(TestParams params) throws Exception {
assertThat(partitions.keySet()).containsExactlyInAnyOrder(params.expectedPartitionsFinal);
}

/**
 * Verifies that auto partitioning stops creating partitions once the table has reached the
 * configured max.partition.num, while already existing partitions stay in place.
 */
@Test
void testMaxPartitions() throws Exception {
// used as the max.partition.num limit, and therefore also the expected final partition count
int expectPartitionNumber = 10;
Configuration config = new Configuration();
config.set(ConfigOptions.MAX_PARTITION_NUM, expectPartitionNumber);
// the limit is configured on the MetadataManager, which performs the partition creation
MetadataManager metadataManager = new MetadataManager(zookeeperClient, config);

ZonedDateTime startTime =
LocalDateTime.parse("2024-09-10T00:00:00").atZone(ZoneId.systemDefault());
long startMs = startTime.toInstant().toEpochMilli();
ManualClock clock = new ManualClock(startMs);
ManuallyTriggeredScheduledExecutorService periodicExecutor =
new ManuallyTriggeredScheduledExecutorService();

AutoPartitionManager autoPartitionManager =
new AutoPartitionManager(
new TestingMetadataCache(3),
metadataManager,
new Configuration(),
clock,
periodicExecutor);
autoPartitionManager.start();

// create a partitioned table with -1 retention so that partitions are never auto-dropped
TableInfo table = createPartitionedTable(-1, 4, AutoPartitionTimeUnit.DAY);
TablePath tablePath = table.getTablePath();
autoPartitionManager.addAutoPartitionTable(table);
// the first auto-partition task is a non-periodic task
periodicExecutor.triggerPeriodicScheduledTasks();

Map<String, Long> partitions = zookeeperClient.getPartitionNameAndIds(tablePath);
// pre-create 4 partitions including current partition
assertThat(partitions.keySet())
.containsExactlyInAnyOrder("20240910", "20240911", "20240912", "20240913");

// manually create 4 future partitions, bringing the table to 8 partitions in total.
int replicaFactor = table.getTableConfig().getReplicationFactor();
Map<Integer, BucketAssignment> bucketAssignments =
TableAssignmentUtils.generateAssignment(
table.getNumBuckets(), replicaFactor, new int[] {0, 1, 2})
.getBucketAssignments();
long tableId = table.getTableId();
PartitionAssignment partitionAssignment =
new PartitionAssignment(tableId, bucketAssignments);
for (int i = 20250101; i <= 20250104; i++) {
metadataManager.createPartition(
tablePath,
tableId,
partitionAssignment,
fromPartitionName(table.getPartitionKeys(), i + ""),
false);
// tell the auto-partition manager about the partition that was just created.
autoPartitionManager.addPartition(tableId, i + "");
}

// move 4 days ahead so that the next round has new partitions to pre-create
clock.advanceTime(Duration.ofDays(4));
periodicExecutor.triggerPeriodicScheduledTasks();
partitions = zookeeperClient.getPartitionNameAndIds(tablePath);
assertThat(partitions.keySet())
.containsExactlyInAnyOrder(
"20240910",
"20240911",
"20240912",
"20240913",
// only 20240914, 20240915 are created in this round, because the table then
// holds 10 partitions, which is the configured limit
"20240914",
"20240915",
// the manually created 20250101 ~ 20250104 are retained
"20250101",
"20250102",
"20250103",
"20250104");
}

private static class TestParams {
final AutoPartitionTimeUnit timeUnit;
final long startTimeMs;
Expand Down Expand Up @@ -365,7 +440,9 @@ public TestParams build() {

// -------------------------------------------------------------------------------------------

private TableInfo createPartitionedTable(AutoPartitionTimeUnit timeUnit) throws Exception {
private TableInfo createPartitionedTable(
int partitionRetentionNum, int partitionPreCreateNum, AutoPartitionTimeUnit timeUnit)
throws Exception {
long tableId = 1;
TablePath tablePath = TablePath.of("db", "test_partition_" + UUID.randomUUID());
TableDescriptor descriptor =
Expand All @@ -384,8 +461,12 @@ private TableInfo createPartitionedTable(AutoPartitionTimeUnit timeUnit) throws
.property(ConfigOptions.TABLE_REPLICATION_FACTOR, 3)
.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true)
.property(ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, timeUnit)
.property(ConfigOptions.TABLE_AUTO_PARTITION_NUM_RETENTION, 2)
.property(ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE, 4)
.property(
ConfigOptions.TABLE_AUTO_PARTITION_NUM_RETENTION,
partitionRetentionNum)
.property(
ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE,
partitionPreCreateNum)
.build();
long currentMillis = System.currentTimeMillis();
TableInfo tableInfo =
Expand Down
1 change: 1 addition & 0 deletions website/docs/maintenance/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ during the Fluss cluster working.
| remote.fs.write-buffer-size | MemorySize | 4kb | The default size of the write buffer for writing the local files to remote file systems. |
| plugin.classloader.parent-first-patterns.default | String | java.,<br/>com.alibaba.fluss.,<br/>javax.annotation.,<br/>org.slf4j,<br/>org.apache.log4j,<br/>org.apache.logging,<br/>org.apache.commons.logging,<br/>ch.qos.logback | A (semicolon-separated) list of patterns that specifies which classes should always be resolved through the plugin parent ClassLoader first. A pattern is a simple prefix that is checked against the fully qualified class name. This setting should generally not be modified. |
| auto-partition.check.interval | Duration | 10min | The interval of auto partition check. The default value is 10 minutes. |
| max.partition.num | Integer | 1000 | Limits the maximum number of partitions that can be created for a partitioned table to avoid creating too many partitions. |

## CoordinatorServer

Expand Down