Skip to content

Commit ce277f8

Browse files
authored
[server] Support max partition number of a table (#693)
1 parent 0c77120 commit ce277f8

File tree

10 files changed

+221
-13
lines changed

10 files changed

+221
-13
lines changed

fluss-client/src/test/java/com/alibaba/fluss/client/admin/ClientToServerITCaseBase.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ private static Configuration initConfig() {
115115

116116
conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE, MemorySize.parse("1mb"));
117117
conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, MemorySize.parse("1kb"));
118+
119+
conf.set(ConfigOptions.MAX_PARTITION_NUM, 10);
118120
return conf;
119121
}
120122

fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import com.alibaba.fluss.exception.SchemaNotExistException;
4040
import com.alibaba.fluss.exception.TableNotExistException;
4141
import com.alibaba.fluss.exception.TableNotPartitionedException;
42+
import com.alibaba.fluss.exception.TooManyPartitionsException;
4243
import com.alibaba.fluss.fs.FsPath;
4344
import com.alibaba.fluss.fs.FsPathAndFileName;
4445
import com.alibaba.fluss.metadata.DataLakeFormat;
@@ -776,6 +777,38 @@ void testBootstrapServerConfigAsTabletServer() throws Exception {
776777
}
777778
}
778779

780+
@Test
781+
void testAddTooManyPartitions() throws Exception {
782+
String dbName = DEFAULT_TABLE_PATH.getDatabaseName();
783+
TableDescriptor partitionedTable =
784+
TableDescriptor.builder()
785+
.schema(
786+
Schema.newBuilder()
787+
.column("id", DataTypes.STRING())
788+
.column("name", DataTypes.STRING())
789+
.column("age", DataTypes.STRING())
790+
.build())
791+
.distributedBy(3, "id")
792+
.partitionedBy("age")
793+
.build();
794+
TablePath tablePath = TablePath.of(dbName, "test_add_too_many_partitioned_table");
795+
admin.createTable(tablePath, partitionedTable, true).get();
796+
797+
// add 10 partitions.
798+
for (int i = 0; i < 10; i++) {
799+
admin.createPartition(tablePath, newPartitionSpec("age", String.valueOf(i)), false)
800+
.get();
801+
}
802+
// adding one more partition should exceed the maximum number of partitions
803+
assertThatThrownBy(
804+
() ->
805+
admin.createPartition(
806+
tablePath, newPartitionSpec("age", "11"), false)
807+
.get())
808+
.cause()
809+
.isInstanceOf(TooManyPartitionsException.class);
810+
}
811+
779812
private void assertHasTabletServerNumber(int tabletServerNumber) {
780813
CoordinatorGateway coordinatorGateway = FLUSS_CLUSTER_EXTENSION.newCoordinatorClient();
781814
retry(

fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,14 @@ public class ConfigOptions {
119119
"The interval of auto partition check. "
120120
+ "The default value is 10 minutes.");
121121

122+
public static final ConfigOption<Integer> MAX_PARTITION_NUM =
123+
key("max.partition.num")
124+
.intType()
125+
.defaultValue(1000)
126+
.withDescription(
127+
"Limits the maximum number of partitions that can be created for a partitioned table "
128+
+ "to avoid creating too many partitions.");
129+
122130
// ------------------------------------------------------------------------
123131
// ConfigOptions for Coordinator Server
124132
// ------------------------------------------------------------------------
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.exception;
18+
19+
import com.alibaba.fluss.annotation.PublicEvolving;
20+
21+
/**
22+
* This exception is thrown if creating a partition would make the number of partitions of a table exceed {@code max.partition.num}.
23+
*
24+
* @since 0.7
25+
*/
26+
@PublicEvolving
27+
public class TooManyPartitionsException extends ApiException {
28+
29+
private static final long serialVersionUID = 1L;
30+
31+
public TooManyPartitionsException(String message) {
32+
super(message);
33+
}
34+
}

fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import com.alibaba.fluss.exception.TableNotExistException;
5959
import com.alibaba.fluss.exception.TableNotPartitionedException;
6060
import com.alibaba.fluss.exception.TimeoutException;
61+
import com.alibaba.fluss.exception.TooManyPartitionsException;
6162
import com.alibaba.fluss.exception.UnknownServerException;
6263
import com.alibaba.fluss.exception.UnknownTableOrBucketException;
6364
import com.alibaba.fluss.exception.UnknownWriterIdException;
@@ -187,7 +188,9 @@ public enum Errors {
187188
LEADER_NOT_AVAILABLE_EXCEPTION(
188189
44,
189190
"There is no currently available leader for the given partition.",
190-
LeaderNotAvailableException::new);
191+
LeaderNotAvailableException::new),
192+
PARTITION_MAX_NUM_EXCEPTION(
193+
45, "Exceed the maximum number of partitions.", TooManyPartitionsException::new);
191194

192195
private static final Logger LOG = LoggerFactory.getLogger(Errors.class);
193196

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/AutoPartitionManager.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.alibaba.fluss.config.Configuration;
2424
import com.alibaba.fluss.exception.PartitionAlreadyExistsException;
2525
import com.alibaba.fluss.exception.PartitionNotExistException;
26+
import com.alibaba.fluss.exception.TooManyPartitionsException;
2627
import com.alibaba.fluss.metadata.ResolvedPartitionSpec;
2728
import com.alibaba.fluss.metadata.TableInfo;
2829
import com.alibaba.fluss.metadata.TablePath;
@@ -245,16 +246,29 @@ private void createPartitions(
245246
try {
246247
metadataManager.createPartition(
247248
tablePath, tableId, partitionAssignment, partition, false);
249+
currentPartitions.add(partition.getPartitionName());
250+
LOG.info(
251+
"Auto partitioning created partition {} for table [{}].",
252+
partition,
253+
tablePath);
248254
} catch (PartitionAlreadyExistsException e) {
249255
LOG.info(
250256
"Auto partitioning skip to create partition {} for table [{}] as the partition is exist.",
251257
partition,
252258
tablePath);
259+
} catch (TooManyPartitionsException t) {
260+
LOG.warn(
261+
"Auto partitioning skip to create partition {} for table [{}], "
262+
+ "because exceed the maximum number of partitions.",
263+
partition,
264+
tablePath);
265+
} catch (Exception e) {
266+
LOG.error(
267+
"Auto partitioning failed to create partition {} for table [{}].",
268+
partition,
269+
tablePath,
270+
e);
253271
}
254-
255-
currentPartitions.add(partition.getPartitionName());
256-
LOG.info(
257-
"Auto partitioning created partition {} for table [{}].", partition, tablePath);
258272
}
259273
}
260274

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/MetadataManager.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.alibaba.fluss.server.coordinator;
1818

19+
import com.alibaba.fluss.config.ConfigOptions;
1920
import com.alibaba.fluss.config.Configuration;
2021
import com.alibaba.fluss.exception.DatabaseAlreadyExistException;
2122
import com.alibaba.fluss.exception.DatabaseNotEmptyException;
@@ -27,6 +28,7 @@
2728
import com.alibaba.fluss.exception.TableAlreadyExistException;
2829
import com.alibaba.fluss.exception.TableNotExistException;
2930
import com.alibaba.fluss.exception.TableNotPartitionedException;
31+
import com.alibaba.fluss.exception.TooManyPartitionsException;
3032
import com.alibaba.fluss.metadata.DatabaseDescriptor;
3133
import com.alibaba.fluss.metadata.DatabaseInfo;
3234
import com.alibaba.fluss.metadata.ResolvedPartitionSpec;
@@ -65,6 +67,7 @@ public class MetadataManager {
6567

6668
private final ZooKeeperClient zookeeperClient;
6769
private @Nullable final Map<String, String> defaultTableLakeOptions;
70+
private final int maxPartitionNum;
6871

6972
/**
7073
* Creates a new metadata manager.
@@ -75,6 +78,7 @@ public class MetadataManager {
7578
public MetadataManager(ZooKeeperClient zookeeperClient, Configuration conf) {
7679
this.zookeeperClient = zookeeperClient;
7780
this.defaultTableLakeOptions = LakeStorageUtils.generateDefaultTableLakeOptions(conf);
81+
maxPartitionNum = conf.get(ConfigOptions.MAX_PARTITION_NUM);
7882
}
7983

8084
public void createDatabase(
@@ -343,6 +347,24 @@ public void createPartition(
343347
partition.getPartitionQualifiedName(), tablePath));
344348
}
345349

350+
try {
351+
int partitionNumber = zookeeperClient.getPartitionNumber(tablePath);
352+
if (partitionNumber + 1 > maxPartitionNum) {
353+
throw new TooManyPartitionsException(
354+
String.format(
355+
"Exceed the maximum number of partitions for table %s, only allow %s partitions.",
356+
tablePath, maxPartitionNum));
357+
}
358+
} catch (TooManyPartitionsException e) {
359+
throw e;
360+
} catch (Exception e) {
361+
throw new FlussRuntimeException(
362+
String.format(
363+
"Get the number of partition from zookeeper failed for table %s",
364+
tablePath),
365+
e);
366+
}
367+
346368
try {
347369
long partitionId = zookeeperClient.getPartitionIdAndIncrement();
348370
// register partition assignments to zk first
@@ -352,10 +374,10 @@ public void createPartition(
352374
LOG.info(
353375
"Register partition {} to zookeeper for table [{}].", partitionName, tablePath);
354376
} catch (Exception e) {
355-
LOG.error(
356-
"Register partition to zookeeper failed to create partition {} for table [{}]",
357-
partitionName,
358-
tablePath,
377+
throw new FlussRuntimeException(
378+
String.format(
379+
"Register partition to zookeeper failed to create partition %s for table [%s]",
380+
partitionName, tablePath),
359381
e);
360382
}
361383
}

fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperClient.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,16 @@ public Optional<TablePartition> getPartition(TablePath tablePath, String partiti
408408
return getOrEmpty(path).map(PartitionZNode::decode);
409409
}
410410

411+
/** Get the number of partitions of a table in ZK (0 if its partitions znode is absent). */
412+
public int getPartitionNumber(TablePath tablePath) throws Exception {
413+
String path = PartitionsZNode.path(tablePath);
414+
Stat stat = zkClient.checkExists().forPath(path);
415+
if (stat == null) {
416+
return 0;
417+
}
418+
return stat.getNumChildren();
419+
}
420+
411421
/** Create a partition for a table in ZK. */
412422
public void registerPartition(
413423
TablePath tablePath, long tableId, String partitionName, long partitionId)

fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/AutoPartitionManagerTest.java

Lines changed: 85 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838

3939
import org.junit.jupiter.api.AfterEach;
4040
import org.junit.jupiter.api.BeforeAll;
41+
import org.junit.jupiter.api.Test;
4142
import org.junit.jupiter.api.extension.RegisterExtension;
4243
import org.junit.jupiter.params.ParameterizedTest;
4344
import org.junit.jupiter.params.provider.Arguments;
@@ -191,7 +192,7 @@ void testAddPartitionedTable(TestParams params) throws Exception {
191192
periodicExecutor);
192193
autoPartitionManager.start();
193194

194-
TableInfo table = createPartitionedTable(params.timeUnit);
195+
TableInfo table = createPartitionedTable(2, 4, params.timeUnit);
195196
TablePath tablePath = table.getTablePath();
196197
autoPartitionManager.addAutoPartitionTable(table);
197198
// the first auto-partition task is a non-periodic task
@@ -244,6 +245,80 @@ void testAddPartitionedTable(TestParams params) throws Exception {
244245
assertThat(partitions.keySet()).containsExactlyInAnyOrder(params.expectedPartitionsFinal);
245246
}
246247

248+
@Test
249+
void testMaxPartitions() throws Exception {
250+
int expectPartitionNumber = 10;
251+
Configuration config = new Configuration();
252+
config.set(ConfigOptions.MAX_PARTITION_NUM, expectPartitionNumber);
253+
MetadataManager metadataManager = new MetadataManager(zookeeperClient, config);
254+
255+
ZonedDateTime startTime =
256+
LocalDateTime.parse("2024-09-10T00:00:00").atZone(ZoneId.systemDefault());
257+
long startMs = startTime.toInstant().toEpochMilli();
258+
ManualClock clock = new ManualClock(startMs);
259+
ManuallyTriggeredScheduledExecutorService periodicExecutor =
260+
new ManuallyTriggeredScheduledExecutorService();
261+
262+
AutoPartitionManager autoPartitionManager =
263+
new AutoPartitionManager(
264+
new TestingMetadataCache(3),
265+
metadataManager,
266+
new Configuration(),
267+
clock,
268+
periodicExecutor);
269+
autoPartitionManager.start();
270+
271+
// create a partitioned table with -1 retention so that partitions are never auto-dropped
272+
TableInfo table = createPartitionedTable(-1, 4, AutoPartitionTimeUnit.DAY);
273+
TablePath tablePath = table.getTablePath();
274+
autoPartitionManager.addAutoPartitionTable(table);
275+
// the first auto-partition task is a non-periodic task
276+
periodicExecutor.triggerPeriodicScheduledTasks();
277+
278+
Map<String, Long> partitions = zookeeperClient.getPartitionNameAndIds(tablePath);
279+
// pre-create 4 partitions including current partition
280+
assertThat(partitions.keySet())
281+
.containsExactlyInAnyOrder("20240910", "20240911", "20240912", "20240913");
282+
283+
// manually create 4 future partitions.
284+
int replicaFactor = table.getTableConfig().getReplicationFactor();
285+
Map<Integer, BucketAssignment> bucketAssignments =
286+
TableAssignmentUtils.generateAssignment(
287+
table.getNumBuckets(), replicaFactor, new int[] {0, 1, 2})
288+
.getBucketAssignments();
289+
long tableId = table.getTableId();
290+
PartitionAssignment partitionAssignment =
291+
new PartitionAssignment(tableId, bucketAssignments);
292+
for (int i = 20250101; i <= 20250104; i++) {
293+
metadataManager.createPartition(
294+
tablePath,
295+
tableId,
296+
partitionAssignment,
297+
fromPartitionName(table.getPartitionKeys(), i + ""),
298+
false);
299+
// mock the partition is created in zk.
300+
autoPartitionManager.addPartition(tableId, i + "");
301+
}
302+
303+
clock.advanceTime(Duration.ofDays(4));
304+
periodicExecutor.triggerPeriodicScheduledTasks();
305+
partitions = zookeeperClient.getPartitionNameAndIds(tablePath);
306+
assertThat(partitions.keySet())
307+
.containsExactlyInAnyOrder(
308+
"20240910",
309+
"20240911",
310+
"20240912",
311+
"20240913",
312+
// only 20240914, 20240915 are created in this round, as the limit of 10 partitions is reached
313+
"20240914",
314+
"20240915",
315+
// 20250101 ~ 20250104 are retained
316+
"20250101",
317+
"20250102",
318+
"20250103",
319+
"20250104");
320+
}
321+
247322
private static class TestParams {
248323
final AutoPartitionTimeUnit timeUnit;
249324
final long startTimeMs;
@@ -365,7 +440,9 @@ public TestParams build() {
365440

366441
// -------------------------------------------------------------------------------------------
367442

368-
private TableInfo createPartitionedTable(AutoPartitionTimeUnit timeUnit) throws Exception {
443+
private TableInfo createPartitionedTable(
444+
int partitionRetentionNum, int partitionPreCreateNum, AutoPartitionTimeUnit timeUnit)
445+
throws Exception {
369446
long tableId = 1;
370447
TablePath tablePath = TablePath.of("db", "test_partition_" + UUID.randomUUID());
371448
TableDescriptor descriptor =
@@ -384,8 +461,12 @@ private TableInfo createPartitionedTable(AutoPartitionTimeUnit timeUnit) throws
384461
.property(ConfigOptions.TABLE_REPLICATION_FACTOR, 3)
385462
.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true)
386463
.property(ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, timeUnit)
387-
.property(ConfigOptions.TABLE_AUTO_PARTITION_NUM_RETENTION, 2)
388-
.property(ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE, 4)
464+
.property(
465+
ConfigOptions.TABLE_AUTO_PARTITION_NUM_RETENTION,
466+
partitionRetentionNum)
467+
.property(
468+
ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE,
469+
partitionPreCreateNum)
389470
.build();
390471
long currentMillis = System.currentTimeMillis();
391472
TableInfo tableInfo =

website/docs/maintenance/configuration.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ during the Fluss cluster working.
5050
| remote.fs.write-buffer-size | MemorySize | 4kb | The default size of the write buffer for writing the local files to remote file systems. |
5151
| plugin.classloader.parent-first-patterns.default | String | java.,<br/>com.alibaba.fluss.,<br/>javax.annotation.,<br/>org.slf4j,<br/>org.apache.log4j,<br/>org.apache.logging,<br/>org.apache.commons.logging,<br/>ch.qos.logback | A (semicolon-separated) list of patterns that specifies which classes should always be resolved through the plugin parent ClassLoader first. A pattern is a simple prefix that is checked against the fully qualified class name. This setting should generally not be modified. |
5252
| auto-partition.check.interval | Duration | 10min | The interval of auto partition check. The default value is 10 minutes. |
53+
| max.partition.num | Integer | 1000 | Limits the maximum number of partitions that can be created for a partitioned table to avoid creating too many partitions. |
5354
5455
## CoordinatorServer
5556

0 commit comments

Comments
 (0)