@@ -1372,7 +1372,7 @@ tablePath, newPartitionSpec("age", "4"), false)
.get())
.cause()
.isInstanceOf(TooManyBucketsException.class)
.hasMessageContaining("exceeding the maximum of 30 buckets");
.hasMessageContaining("exceeding the database-level maximum of 30 buckets");
}

/**
@@ -1403,7 +1403,169 @@ public void testBucketLimitForNonPartitionedTable() throws Exception {
assertThatThrownBy(() -> admin.createTable(tablePath, nonPartitionedTable, false).get())
.cause()
.isInstanceOf(TooManyBucketsException.class)
.hasMessageContaining("exceeds the maximum limit");
.hasMessageContaining("exceeds the database-level maximum limit");
}

/** Database-level bucket limit should be enforced for non-partitioned tables. */
@Test
void testDatabaseBucketLimitForNonPartitionedTable() throws Exception {
String dbName = "db_bucket_limit_np";
admin.createDatabase(dbName, DatabaseDescriptor.EMPTY, true).get();

admin.alterClusterConfigs(
Collections.singletonList(
new AlterConfig(
"database.limits." + dbName + ".max.bucket.num",
"12",
AlterConfigOpType.SET)))
.get();

TablePath tablePath = TablePath.of(dbName, "t_np_bucket_limit");
TableDescriptor tooManyBuckets =
TableDescriptor.builder()
.schema(
Schema.newBuilder()
.column("id", DataTypes.STRING())
.column("name", DataTypes.STRING())
.build())
.distributedBy(20, "id")
.build();

assertThatThrownBy(() -> admin.createTable(tablePath, tooManyBuckets, false).get())
.cause()
.isInstanceOf(TooManyBucketsException.class);
Reviewer comment (Member): assert the message as well; the message should make it clear to the user that the given database has exceeded the allowed number of buckets. A sketch follows.
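A possible shape for the suggested assertion, as a sketch only; the message fragments below are assumed wording, not taken from the server implementation:

        assertThatThrownBy(() -> admin.createTable(tablePath, tooManyBuckets, false).get())
                .cause()
                .isInstanceOf(TooManyBucketsException.class)
                // hypothetical fragments, align them with the actual exception text
                .hasMessageContaining("database")
                .hasMessageContaining("exceed");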


TableDescriptor ok =
TableDescriptor.builder()
.schema(
Schema.newBuilder()
.column("id", DataTypes.STRING())
.column("name", DataTypes.STRING())
.build())
.distributedBy(12, "id")
.build();
admin.createTable(tablePath, ok, true).get();
}

/** Database-level bucket limit should cap total buckets for partitioned tables. */
@Test
void testDatabaseBucketLimitForPartitionedTable() throws Exception {
String dbName = "db_bucket_limit_part";
admin.createDatabase(dbName, DatabaseDescriptor.EMPTY, true).get();

admin.alterClusterConfigs(
Collections.singletonList(
new AlterConfig(
"database.limits." + dbName + ".max.bucket.num",
"25",
AlterConfigOpType.SET)))
.get();

TablePath tablePath = TablePath.of(dbName, "t_part_bucket_limit");
TableDescriptor td =
TableDescriptor.builder()
.schema(
Schema.newBuilder()
.column("id", DataTypes.STRING())
.column("age", DataTypes.STRING())
.build())
.distributedBy(10, "id")
.partitionedBy("age")
.build();
admin.createTable(tablePath, td, true).get();

admin.createPartition(tablePath, newPartitionSpec("age", "1"), false).get();
admin.createPartition(tablePath, newPartitionSpec("age", "2"), false).get();

assertThatThrownBy(
() ->
admin.createPartition(
tablePath, newPartitionSpec("age", "3"), false)
.get())
.cause()
.isInstanceOf(TooManyBucketsException.class);
Reviewer comment (Member): ditto, assert the database-level message here as well; the sketch above applies to this createPartition assertion too.

}

    /** Database-level bucket limit should cap total buckets for auto-partitioned tables. */
@Test
void testDatabaseBucketLimitForAutoPartitionedTable() throws Exception {
String dbName = "db_bucket_limit_auto_part";
admin.createDatabase(dbName, DatabaseDescriptor.EMPTY, true).get();

admin.alterClusterConfigs(
Collections.singletonList(
new AlterConfig(
"database.limits." + dbName + ".max.bucket.num",
"25",
AlterConfigOpType.SET)))
.get();

TablePath tablePath = TablePath.of(dbName, "t_auto_part_bucket_limit");
TableDescriptor td =
TableDescriptor.builder()
.schema(
Schema.newBuilder()
.column("id", DataTypes.STRING())
.column("pt", DataTypes.STRING())
.build())
.distributedBy(10, "id")
.partitionedBy("pt")
.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true)
.property(
ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT,
AutoPartitionTimeUnit.YEAR)
.build();

        // with the default auto-partition settings, 7 partitions are pre-created,
        // so 7 * 10 buckets already exceeds the limit of 25 at table creation
assertThatThrownBy(() -> admin.createTable(tablePath, td, true).get())
.cause()
.isInstanceOf(TooManyBucketsException.class)
.hasMessageContaining(
"The number of buckets to retain must be less than the total number of buckets.");
}

/** Database-level bucket limit should only affect the target database. */
@Test
void testDatabaseBucketLimitIsolationAcrossDatabases() throws Exception {
String db1 = "db_bucket_limit_d1";
String db2 = "db_bucket_limit_d2";
admin.createDatabase(db1, DatabaseDescriptor.EMPTY, true).get();
admin.createDatabase(db2, DatabaseDescriptor.EMPTY, true).get();

admin.alterClusterConfigs(
Collections.singletonList(
new AlterConfig(
"database.limits." + db1 + ".max.bucket.num",
"12",
AlterConfigOpType.SET)))
.get();

TablePath t1 = TablePath.of(db1, "t_np_bucket_limit_d1");
TableDescriptor tooManyForDb1 =
TableDescriptor.builder()
.schema(
Schema.newBuilder()
.column("id", DataTypes.STRING())
.column("name", DataTypes.STRING())
.build())
.distributedBy(20, "id")
.build();
assertThatThrownBy(() -> admin.createTable(t1, tooManyForDb1, false).get())
.cause()
.isInstanceOf(TooManyBucketsException.class);

TablePath t2 = TablePath.of(db2, "t_np_bucket_limit_d2");
TableDescriptor okForDb2 =
TableDescriptor.builder()
.schema(
Schema.newBuilder()
.column("id", DataTypes.STRING())
.column("name", DataTypes.STRING())
.build())
.distributedBy(20, "id")
.build();
// cluster max.bucket.num is 30 in base config, so 20 should succeed in db2
admin.createTable(t2, okForDb2, true).get();
}

/** Test that creating a table with system columns throws InvalidTableException. */
@@ -17,13 +17,19 @@

package org.apache.fluss.flink.catalog;

+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
import org.apache.fluss.cluster.ServerNode;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.cluster.AlterConfig;
+import org.apache.fluss.config.cluster.AlterConfigOpType;
import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.exception.InvalidAlterTableException;
import org.apache.fluss.exception.InvalidConfigException;
import org.apache.fluss.exception.InvalidTableException;
+import org.apache.fluss.exception.TooManyBucketsException;
import org.apache.fluss.metadata.DataLakeFormat;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.server.testutils.FlussClusterExtension;
@@ -448,6 +454,91 @@ void testAutoPartitionedTable() throws Exception {
assertResultsIgnoreOrder(showPartitionIterator, expectedShowPartitionsResult, true);
}

@Test
void testDbLevelBucketLimitForNonPartitionedTableWithCatalog() throws Exception {
String dbName = "db_bucket_limit_np_sql";
tEnv.executeSql("create database " + dbName);
tEnv.executeSql("use " + dbName);

try (Connection conn =
ConnectionFactory.createConnection(FLUSS_CLUSTER_EXTENSION.getClientConfig())) {
Admin admin = conn.getAdmin();
admin.alterClusterConfigs(
java.util.Collections.singletonList(
new AlterConfig(
"database.limits." + dbName + ".max.bucket.num",
"12",
AlterConfigOpType.SET)))
.get();
}

assertThatThrownBy(
() ->
tEnv.executeSql(
"create table t_np_limit (a int, b string) with ('bucket.num' = '20')"))
.rootCause()
.isInstanceOf(TooManyBucketsException.class);

tEnv.executeSql("create table t_np_ok (a int, b string) with ('bucket.num' = '12')");
}

@Test
void testDbLevelBucketLimitForPartitionedTableWithCatalog() throws Exception {
String dbName = "db_bucket_limit_part_sql";
tEnv.executeSql("create database " + dbName);
tEnv.executeSql("use " + dbName);

try (Connection conn =
ConnectionFactory.createConnection(FLUSS_CLUSTER_EXTENSION.getClientConfig())) {
Admin admin = conn.getAdmin();
admin.alterClusterConfigs(
java.util.Collections.singletonList(
new AlterConfig(
"database.limits." + dbName + ".max.bucket.num",
"25",
AlterConfigOpType.SET)))
.get();
}

tEnv.executeSql(
"create table t_part_limit (a int, b string) partitioned by (b) with ('bucket.num' = '10')");

tEnv.executeSql("alter table t_part_limit add partition (b = '1')");
tEnv.executeSql("alter table t_part_limit add partition (b = '2')");

assertThatThrownBy(
() -> tEnv.executeSql("alter table t_part_limit add partition (b = '3')"))
.rootCause()
.isInstanceOf(TooManyBucketsException.class);
}

@Test
void testDbLevelBucketLimitForAutoPartitionedTableWithCatalog() throws Exception {
String dbName = "db_bucket_limit_auto_sql";
tEnv.executeSql("create database " + dbName);
tEnv.executeSql("use " + dbName);

try (Connection conn =
ConnectionFactory.createConnection(FLUSS_CLUSTER_EXTENSION.getClientConfig())) {
Admin admin = conn.getAdmin();
admin.alterClusterConfigs(
java.util.Collections.singletonList(
new AlterConfig(
"database.limits." + dbName + ".max.bucket.num",
"25",
AlterConfigOpType.SET)))
.get();
}

assertThatThrownBy(
() ->
tEnv.executeSql(
"create table t_auto_limit (a int, b string) partitioned by (b) "
+ "with ('bucket.num' = '10', 'table.auto-partition.enabled' = 'true', 'table.auto-partition.time-unit' = 'year')"))
.rootCause()
.isInstanceOf(TooManyBucketsException.class);
}

@Test
void testInvalidAutoPartitionedTableWithMultiPartitionKeys() {
// 1. test invalid auto partition table.
@@ -35,6 +35,7 @@
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.server.DynamicConfigManager;
import org.apache.fluss.server.coordinator.LakeCatalogDynamicLoader;
import org.apache.fluss.server.coordinator.MetadataManager;
import org.apache.fluss.server.testutils.FlussClusterExtension;
@@ -225,7 +226,8 @@ public static Map<Long, String> createPartitions(
new MetadataManager(
zkClient,
new Configuration(),
-                        new LakeCatalogDynamicLoader(new Configuration(), null, false));
+                        new LakeCatalogDynamicLoader(new Configuration(), null, false),
+                        new DynamicConfigManager(zkClient, new Configuration(), true));
TableInfo tableInfo = metadataManager.getTable(tablePath);
Map<Long, String> newPartitionIds = new HashMap<>();
for (String partition : partitionsToCreate) {
@@ -45,12 +45,14 @@
* them to these {@link ServerReconfigurable} instances.
*/
@Internal
-class DynamicServerConfig {
+public class DynamicServerConfig {

private static final Logger LOG = LoggerFactory.getLogger(DynamicServerConfig.class);
private static final Set<String> ALLOWED_CONFIG_KEYS =
Collections.singleton(DATALAKE_FORMAT.key());
-    private static final Set<String> ALLOWED_CONFIG_PREFIXES = Collections.singleton("datalake.");
+    public static final String DATABASE_LIMITS_PREFIX = "database.limits.";
+    private static final Set<String> ALLOWED_CONFIG_PREFIXES =
+            new HashSet<>(java.util.Arrays.asList("datalake.", DATABASE_LIMITS_PREFIX));

private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Map<Class<? extends ServerReconfigurable>, ServerReconfigurable>
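For illustration, a minimal sketch of how a key could be validated against these allow-lists; isAllowedKey is a hypothetical helper and is not part of this diff:

    // Hypothetical helper: accepts exact allowed keys or any key under an allowed prefix,
    // so a key such as "database.limits.my_db.max.bucket.num" matches DATABASE_LIMITS_PREFIX.
    private static boolean isAllowedKey(String key) {
        return ALLOWED_CONFIG_KEYS.contains(key)
                || ALLOWED_CONFIG_PREFIXES.stream().anyMatch(key::startsWith);
    }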
@@ -187,7 +187,8 @@ protected void startServices() throws Exception {
this.lakeTableTieringManager = new LakeTableTieringManager();

MetadataManager metadataManager =
-                new MetadataManager(zkClient, conf, lakeCatalogDynamicLoader);
+                new MetadataManager(
+                        zkClient, conf, lakeCatalogDynamicLoader, dynamicConfigManager);
this.coordinatorService =
new CoordinatorService(
conf,
@@ -313,7 +314,7 @@ private void registerCoordinatorLeader() throws Exception {

private void createDefaultDatabase() {
MetadataManager metadataManager =
-                new MetadataManager(zkClient, conf, lakeCatalogDynamicLoader);
+                new MetadataManager(zkClient, conf, lakeCatalogDynamicLoader, dynamicConfigManager);
List<String> databases = metadataManager.listDatabases();
if (databases.isEmpty()) {
metadataManager.createDatabase(DEFAULT_DATABASE, DatabaseDescriptor.EMPTY, true);