diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/admin/ClientToServerITCaseBase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/admin/ClientToServerITCaseBase.java index b9b9e4472f..25fc9bfdee 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/admin/ClientToServerITCaseBase.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/admin/ClientToServerITCaseBase.java @@ -115,6 +115,8 @@ private static Configuration initConfig() { conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE, MemorySize.parse("1mb")); conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, MemorySize.parse("1kb")); + + conf.set(ConfigOptions.MAX_PARTITION_NUM, 10); return conf; } diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java index efeabb780e..9535891d0c 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java @@ -39,6 +39,7 @@ import com.alibaba.fluss.exception.SchemaNotExistException; import com.alibaba.fluss.exception.TableNotExistException; import com.alibaba.fluss.exception.TableNotPartitionedException; +import com.alibaba.fluss.exception.TooManyPartitionsException; import com.alibaba.fluss.fs.FsPath; import com.alibaba.fluss.fs.FsPathAndFileName; import com.alibaba.fluss.metadata.DataLakeFormat; @@ -776,6 +777,38 @@ void testBootstrapServerConfigAsTabletServer() throws Exception { } } + @Test + void testAddTooManyPartitions() throws Exception { + String dbName = DEFAULT_TABLE_PATH.getDatabaseName(); + TableDescriptor partitionedTable = + TableDescriptor.builder() + .schema( + Schema.newBuilder() + .column("id", DataTypes.STRING()) + .column("name", DataTypes.STRING()) + .column("age", DataTypes.STRING()) + .build()) + .distributedBy(3, "id") + .partitionedBy("age") + .build(); + TablePath tablePath = TablePath.of(dbName, "test_add_too_many_partitioned_table"); + admin.createTable(tablePath, partitionedTable, true).get(); + + // add 10 partitions. + for (int i = 0; i < 10; i++) { + admin.createPartition(tablePath, newPartitionSpec("age", String.valueOf(i)), false) + .get(); + } + // add out of limit partition + assertThatThrownBy( + () -> + admin.createPartition( + tablePath, newPartitionSpec("age", "11"), false) + .get()) + .cause() + .isInstanceOf(TooManyPartitionsException.class); + } + private void assertHasTabletServerNumber(int tabletServerNumber) { CoordinatorGateway coordinatorGateway = FLUSS_CLUSTER_EXTENSION.newCoordinatorClient(); retry( diff --git a/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java index 4d19b8e6cd..394993731c 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java @@ -119,6 +119,14 @@ public class ConfigOptions { "The interval of auto partition check. " + "The default value is 10 minutes."); + public static final ConfigOption MAX_PARTITION_NUM = + key("max.partition.num") + .intType() + .defaultValue(1000) + .withDescription( + "Limits the maximum number of partitions that can be created for a partitioned table " + + "to avoid creating too many partitions."); + // ------------------------------------------------------------------------ // ConfigOptions for Coordinator Server // ------------------------------------------------------------------------ diff --git a/fluss-common/src/main/java/com/alibaba/fluss/exception/TooManyPartitionsException.java b/fluss-common/src/main/java/com/alibaba/fluss/exception/TooManyPartitionsException.java new file mode 100644 index 0000000000..a8604a7baf --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/exception/TooManyPartitionsException.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.exception; + +import com.alibaba.fluss.annotation.PublicEvolving; + +/** + * This exception is thrown if the number of table partitions is exceed max.partition.num. + * + * @since 0.7 + */ +@PublicEvolving +public class TooManyPartitionsException extends ApiException { + + private static final long serialVersionUID = 1L; + + public TooManyPartitionsException(String message) { + super(message); + } +} diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java index 5f3e76f802..2e4cdf4edb 100644 --- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java +++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java @@ -58,6 +58,7 @@ import com.alibaba.fluss.exception.TableNotExistException; import com.alibaba.fluss.exception.TableNotPartitionedException; import com.alibaba.fluss.exception.TimeoutException; +import com.alibaba.fluss.exception.TooManyPartitionsException; import com.alibaba.fluss.exception.UnknownServerException; import com.alibaba.fluss.exception.UnknownTableOrBucketException; import com.alibaba.fluss.exception.UnknownWriterIdException; @@ -187,7 +188,9 @@ public enum Errors { LEADER_NOT_AVAILABLE_EXCEPTION( 44, "There is no currently available leader for the given partition.", - LeaderNotAvailableException::new); + LeaderNotAvailableException::new), + PARTITION_MAX_NUM_EXCEPTION( + 45, "Exceed the maximum number of partitions.", TooManyPartitionsException::new); private static final Logger LOG = LoggerFactory.getLogger(Errors.class); diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/AutoPartitionManager.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/AutoPartitionManager.java index 733f96498a..568c5180c5 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/AutoPartitionManager.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/AutoPartitionManager.java @@ -23,6 +23,7 @@ import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.exception.PartitionAlreadyExistsException; import com.alibaba.fluss.exception.PartitionNotExistException; +import com.alibaba.fluss.exception.TooManyPartitionsException; import com.alibaba.fluss.metadata.ResolvedPartitionSpec; import com.alibaba.fluss.metadata.TableInfo; import com.alibaba.fluss.metadata.TablePath; @@ -245,16 +246,29 @@ private void createPartitions( try { metadataManager.createPartition( tablePath, tableId, partitionAssignment, partition, false); + currentPartitions.add(partition.getPartitionName()); + LOG.info( + "Auto partitioning created partition {} for table [{}].", + partition, + tablePath); } catch (PartitionAlreadyExistsException e) { LOG.info( "Auto partitioning skip to create partition {} for table [{}] as the partition is exist.", partition, tablePath); + } catch (TooManyPartitionsException t) { + LOG.warn( + "Auto partitioning skip to create partition {} for table [{}], " + + "because exceed the maximum number of partitions.", + partition, + tablePath); + } catch (Exception e) { + LOG.error( + "Auto partitioning failed to create partition {} for table [{}].", + partition, + tablePath, + e); } - - currentPartitions.add(partition.getPartitionName()); - LOG.info( - "Auto partitioning created partition {} for table [{}].", partition, tablePath); } } diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/MetadataManager.java index 257f5ebc3f..0d1a2beb6b 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/MetadataManager.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/MetadataManager.java @@ -16,6 +16,7 @@ package com.alibaba.fluss.server.coordinator; +import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.exception.DatabaseAlreadyExistException; import com.alibaba.fluss.exception.DatabaseNotEmptyException; @@ -27,6 +28,7 @@ import com.alibaba.fluss.exception.TableAlreadyExistException; import com.alibaba.fluss.exception.TableNotExistException; import com.alibaba.fluss.exception.TableNotPartitionedException; +import com.alibaba.fluss.exception.TooManyPartitionsException; import com.alibaba.fluss.metadata.DatabaseDescriptor; import com.alibaba.fluss.metadata.DatabaseInfo; import com.alibaba.fluss.metadata.ResolvedPartitionSpec; @@ -65,6 +67,7 @@ public class MetadataManager { private final ZooKeeperClient zookeeperClient; private @Nullable final Map defaultTableLakeOptions; + private final int maxPartitionNum; /** * Creates a new metadata manager. @@ -75,6 +78,7 @@ public class MetadataManager { public MetadataManager(ZooKeeperClient zookeeperClient, Configuration conf) { this.zookeeperClient = zookeeperClient; this.defaultTableLakeOptions = LakeStorageUtils.generateDefaultTableLakeOptions(conf); + maxPartitionNum = conf.get(ConfigOptions.MAX_PARTITION_NUM); } public void createDatabase( @@ -343,6 +347,24 @@ public void createPartition( partition.getPartitionQualifiedName(), tablePath)); } + try { + int partitionNumber = zookeeperClient.getPartitionNumber(tablePath); + if (partitionNumber + 1 > maxPartitionNum) { + throw new TooManyPartitionsException( + String.format( + "Exceed the maximum number of partitions for table %s, only allow %s partitions.", + tablePath, maxPartitionNum)); + } + } catch (TooManyPartitionsException e) { + throw e; + } catch (Exception e) { + throw new FlussRuntimeException( + String.format( + "Get the number of partition from zookeeper failed for table %s", + tablePath), + e); + } + try { long partitionId = zookeeperClient.getPartitionIdAndIncrement(); // register partition assignments to zk first @@ -352,10 +374,10 @@ public void createPartition( LOG.info( "Register partition {} to zookeeper for table [{}].", partitionName, tablePath); } catch (Exception e) { - LOG.error( - "Register partition to zookeeper failed to create partition {} for table [{}]", - partitionName, - tablePath, + throw new FlussRuntimeException( + String.format( + "Register partition to zookeeper failed to create partition %s for table [%s]", + partitionName, tablePath), e); } } diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperClient.java b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperClient.java index 76d355df21..9c6e697547 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperClient.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperClient.java @@ -408,6 +408,16 @@ public Optional getPartition(TablePath tablePath, String partiti return getOrEmpty(path).map(PartitionZNode::decode); } + /** Get partition num of a table in ZK. */ + public int getPartitionNumber(TablePath tablePath) throws Exception { + String path = PartitionsZNode.path(tablePath); + Stat stat = zkClient.checkExists().forPath(path); + if (stat == null) { + return 0; + } + return stat.getNumChildren(); + } + /** Create a partition for a table in ZK. */ public void registerPartition( TablePath tablePath, long tableId, String partitionName, long partitionId) diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/AutoPartitionManagerTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/AutoPartitionManagerTest.java index a280de5717..b061664a32 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/AutoPartitionManagerTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/AutoPartitionManagerTest.java @@ -38,6 +38,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -191,7 +192,7 @@ void testAddPartitionedTable(TestParams params) throws Exception { periodicExecutor); autoPartitionManager.start(); - TableInfo table = createPartitionedTable(params.timeUnit); + TableInfo table = createPartitionedTable(2, 4, params.timeUnit); TablePath tablePath = table.getTablePath(); autoPartitionManager.addAutoPartitionTable(table); // the first auto-partition task is a non-periodic task @@ -244,6 +245,80 @@ void testAddPartitionedTable(TestParams params) throws Exception { assertThat(partitions.keySet()).containsExactlyInAnyOrder(params.expectedPartitionsFinal); } + @Test + void testMaxPartitions() throws Exception { + int expectPartitionNumber = 10; + Configuration config = new Configuration(); + config.set(ConfigOptions.MAX_PARTITION_NUM, expectPartitionNumber); + MetadataManager metadataManager = new MetadataManager(zookeeperClient, config); + + ZonedDateTime startTime = + LocalDateTime.parse("2024-09-10T00:00:00").atZone(ZoneId.systemDefault()); + long startMs = startTime.toInstant().toEpochMilli(); + ManualClock clock = new ManualClock(startMs); + ManuallyTriggeredScheduledExecutorService periodicExecutor = + new ManuallyTriggeredScheduledExecutorService(); + + AutoPartitionManager autoPartitionManager = + new AutoPartitionManager( + new TestingMetadataCache(3), + metadataManager, + new Configuration(), + clock, + periodicExecutor); + autoPartitionManager.start(); + + // create a partitioned with -1 retention to never auto-drop partitions + TableInfo table = createPartitionedTable(-1, 4, AutoPartitionTimeUnit.DAY); + TablePath tablePath = table.getTablePath(); + autoPartitionManager.addAutoPartitionTable(table); + // the first auto-partition task is a non-periodic task + periodicExecutor.triggerPeriodicScheduledTasks(); + + Map partitions = zookeeperClient.getPartitionNameAndIds(tablePath); + // pre-create 4 partitions including current partition + assertThat(partitions.keySet()) + .containsExactlyInAnyOrder("20240910", "20240911", "20240912", "20240913"); + + // manually create 4 future partitions. + int replicaFactor = table.getTableConfig().getReplicationFactor(); + Map bucketAssignments = + TableAssignmentUtils.generateAssignment( + table.getNumBuckets(), replicaFactor, new int[] {0, 1, 2}) + .getBucketAssignments(); + long tableId = table.getTableId(); + PartitionAssignment partitionAssignment = + new PartitionAssignment(tableId, bucketAssignments); + for (int i = 20250101; i <= 20250104; i++) { + metadataManager.createPartition( + tablePath, + tableId, + partitionAssignment, + fromPartitionName(table.getPartitionKeys(), i + ""), + false); + // mock the partition is created in zk. + autoPartitionManager.addPartition(tableId, i + ""); + } + + clock.advanceTime(Duration.ofDays(4)); + periodicExecutor.triggerPeriodicScheduledTasks(); + partitions = zookeeperClient.getPartitionNameAndIds(tablePath); + assertThat(partitions.keySet()) + .containsExactlyInAnyOrder( + "20240910", + "20240911", + "20240912", + "20240913", + // only 20240914, 20240915 are created in this round + "20240914", + "20240915", + // 20250101 ~ 20250102 are retained + "20250101", + "20250102", + "20250103", + "20250104"); + } + private static class TestParams { final AutoPartitionTimeUnit timeUnit; final long startTimeMs; @@ -365,7 +440,9 @@ public TestParams build() { // ------------------------------------------------------------------------------------------- - private TableInfo createPartitionedTable(AutoPartitionTimeUnit timeUnit) throws Exception { + private TableInfo createPartitionedTable( + int partitionRetentionNum, int partitionPreCreateNum, AutoPartitionTimeUnit timeUnit) + throws Exception { long tableId = 1; TablePath tablePath = TablePath.of("db", "test_partition_" + UUID.randomUUID()); TableDescriptor descriptor = @@ -384,8 +461,12 @@ private TableInfo createPartitionedTable(AutoPartitionTimeUnit timeUnit) throws .property(ConfigOptions.TABLE_REPLICATION_FACTOR, 3) .property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true) .property(ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, timeUnit) - .property(ConfigOptions.TABLE_AUTO_PARTITION_NUM_RETENTION, 2) - .property(ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE, 4) + .property( + ConfigOptions.TABLE_AUTO_PARTITION_NUM_RETENTION, + partitionRetentionNum) + .property( + ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE, + partitionPreCreateNum) .build(); long currentMillis = System.currentTimeMillis(); TableInfo tableInfo = diff --git a/website/docs/maintenance/configuration.md b/website/docs/maintenance/configuration.md index 56ef99db7e..f4dabdf175 100644 --- a/website/docs/maintenance/configuration.md +++ b/website/docs/maintenance/configuration.md @@ -50,6 +50,7 @@ during the Fluss cluster working. | remote.fs.write-buffer-size | MemorySize | 4kb | The default size of the write buffer for writing the local files to remote file systems. | | plugin.classloader.parent-first-patterns.default | String | java.,
com.alibaba.fluss.,
javax.annotation.,
org.slf4j,
org.apache.log4j,
org.apache.logging,
org.apache.commons.logging,
ch.qos.logback | A (semicolon-separated) list of patterns that specifies which classes should always be resolved through the plugin parent ClassLoader first. A pattern is a simple prefix that is checked against the fully qualified class name. This setting should generally not be modified. | | auto-partition.check.interval | Duration | 10min | The interval of auto partition check. The default value is 10 minutes. | +| max.partition.num | Integer | 1000 | Limits the maximum number of partitions that can be created for a partitioned table to avoid creating too many partitions. | ## CoordinatorServer