diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index 43c2445544..c1059ac062 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -28,10 +28,12 @@ import org.apache.fluss.metadata.MergeEngineType; import org.apache.fluss.utils.ArrayUtils; +import java.lang.reflect.Field; import java.time.Duration; import java.time.ZoneId; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -1576,6 +1578,17 @@ public class ConfigOptions { "The max size of the consumed memory for RocksDB batch write, " + "will flush just based on item count if this config set to 0."); + public static final ConfigOption<MemorySize> KV_SHARED_RATE_LIMITER_BYTES_PER_SEC = + key("kv.rocksdb.shared-rate-limiter.bytes-per-sec") + .memoryType() + .defaultValue(new MemorySize(Long.MAX_VALUE)) + .withDescription( + "The shared rate limit in bytes per second for RocksDB flush and compaction operations " + + "across all RocksDB instances in the TabletServer. " + + "All KV tablets share a single global RateLimiter to prevent disk IO from being saturated. " + + "The RateLimiter is always enabled. The default value is Long.MAX_VALUE (effectively unlimited). " + + "Set to a lower value (e.g., 100MB) to limit the rate."); + // -------------------------------------------------------------------------- // Provided configurable ColumnFamilyOptions within Fluss // -------------------------------------------------------------------------- @@ -1869,4 +1882,45 @@ public enum KvCompressionType { LZ4, ZSTD } + + // ------------------------------------------------------------------------ + // ConfigOptions Registry and Utilities + // ------------------------------------------------------------------------ + + /** + * Holder class for lazy initialization of the ConfigOptions registry. This ensures that the + * registry is initialized only when first accessed, and guarantees that all ConfigOption fields + * are already initialized (since static initialization happens in declaration order). + */ + private static class ConfigOptionsHolder { + private static final Map<String, ConfigOption<?>> CONFIG_OPTIONS_BY_KEY; + + static { + Map<String, ConfigOption<?>> options = new HashMap<>(); + Field[] fields = ConfigOptions.class.getFields(); + for (Field field : fields) { + if (!ConfigOption.class.isAssignableFrom(field.getType())) { + continue; + } + try { + ConfigOption<?> configOption = (ConfigOption<?>) field.get(null); + options.put(configOption.key(), configOption); + } catch (IllegalAccessException e) { + // Ignore fields that cannot be accessed + } + } + CONFIG_OPTIONS_BY_KEY = Collections.unmodifiableMap(options); + } + } + + /** + * Gets the ConfigOption for a given key. + * + * @param key the configuration key + * @return the ConfigOption if found, null otherwise + */ + @Internal + public static ConfigOption<?> getConfigOption(String key) { + return ConfigOptionsHolder.CONFIG_OPTIONS_BY_KEY.get(key); + } }
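A minimal usage sketch of the new registry lookup (illustrative snippet, not part of this patch; `key()` is used in the patch itself, while `defaultValue()` is assumed to exist as an accessor matching the builder seen above):

```java
// Resolve a raw key string back to its typed ConfigOption via the reflective registry.
ConfigOption<?> option =
        ConfigOptions.getConfigOption("kv.rocksdb.shared-rate-limiter.bytes-per-sec");
if (option != null) {
    System.out.println(option.key() + " -> default " + option.defaultValue());
}
```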
diff --git a/fluss-common/src/main/java/org/apache/fluss/config/cluster/ConfigValidator.java b/fluss-common/src/main/java/org/apache/fluss/config/cluster/ConfigValidator.java new file mode 100644 index 0000000000..deb9313230 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/config/cluster/ConfigValidator.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.config.cluster; + +import org.apache.fluss.annotation.PublicEvolving; +import org.apache.fluss.exception.ConfigException; + +import javax.annotation.Nullable; + +/** + * Validator for a single dynamic configuration key. + * + * <p>Unlike {@link ServerReconfigurable}, validators are stateless and only perform validation + * logic without requiring component instances. This allows coordinators to validate configurations + * for components they don't run (e.g., KvManager). + * + * <p>Example use case: CoordinatorServer needs to validate KV-related configurations even though it + * doesn't have a KvManager instance. A {@link ConfigValidator} can be registered on both + * CoordinatorServer (for validation) and TabletServer (for both validation and actual + * reconfiguration via {@link ServerReconfigurable}). + * + * <p>Each validator monitors a single configuration key. The validator will only be invoked when + * that specific key changes, improving validation efficiency. + * + * <p>This interface is designed to be stateless and thread-safe. Implementations should not rely on + * any mutable component state. + * + * <p>Type-safe validation: The validator receives strongly-typed values that have already + * been parsed and validated for basic type correctness. This avoids redundant string parsing and + * allows validators to focus on business logic validation. + * + * @param <T> the type of the configuration value being validated + */ +@PublicEvolving +public interface ConfigValidator<T> { + + /** + * Returns the configuration key this validator monitors. + * + * <p>The validator will only be invoked when this specific configuration key changes. This + * allows efficient filtering of validators and clear declaration of dependencies. + * + * @return the configuration key to monitor, must not be null or empty + */ + String configKey(); + + /** + * Validates a configuration value change. + * + * <p>This method is called when the monitored configuration key changes. It should check + * whether the new value is valid, potentially considering the old value and validation rules. + * + * <p>The method should be stateless and deterministic - given the same old and new values, it + * should always produce the same validation result. + * + * @param oldValue the previous value of the configuration key, null if the key was not set + * before + * @param newValue the new value of the configuration key, null if the key is being deleted + * @throws ConfigException if the configuration change is invalid, with a descriptive error + * message explaining why the change cannot be applied + */ + void validate(@Nullable T oldValue, @Nullable T newValue) throws ConfigException; +}
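For illustration, a minimal implementation of this contract might look like the following (hypothetical class, not part of this patch; imports elided):

```java
// Hypothetical stateless validator for the shared rate-limiter option introduced in this patch.
public class RateLimiterConfigValidator implements ConfigValidator<MemorySize> {

    @Override
    public String configKey() {
        return "kv.rocksdb.shared-rate-limiter.bytes-per-sec";
    }

    @Override
    public void validate(@Nullable MemorySize oldValue, @Nullable MemorySize newValue)
            throws ConfigException {
        // A null newValue means the key is being deleted (reset to default), which is allowed.
        if (newValue != null && newValue.getBytes() <= 0) {
            throw new ConfigException(
                    "kv.rocksdb.shared-rate-limiter.bytes-per-sec must be positive, got: "
                            + newValue);
        }
    }
}
```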
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigProcedure.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigProcedure.java new file mode 100644 index 0000000000..444f1dd13d --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigProcedure.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.procedure; + +import org.apache.fluss.config.cluster.ConfigEntry; + +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.ProcedureHint; +import org.apache.flink.table.procedure.ProcedureContext; +import org.apache.flink.types.Row; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * Procedure to get cluster configuration(s). + * + * <p>This procedure allows querying dynamic cluster configurations. It can retrieve: + * + * <ul> + *   <li>A specific configuration key + *   <li>All configurations (when key parameter is null or empty) + * </ul> + *
<p>
Usage examples: + * + *
<pre>{@code
+ * -- Get a specific configuration
+ * CALL sys.get_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec');
+ *
+ * -- Get all cluster configurations
+ * CALL sys.get_cluster_config();
+ * }</pre>
+ */ +public class GetClusterConfigProcedure extends ProcedureBase { + + @ProcedureHint( + output = + @DataTypeHint( + "ROW<config_key STRING, config_value STRING, config_source STRING>")) + public Row[] call(ProcedureContext context) throws Exception { + return getConfigs(null); + } + + @ProcedureHint( + argument = {@ArgumentHint(name = "config_key", type = @DataTypeHint("STRING"))}, + output = + @DataTypeHint( + "ROW<config_key STRING, config_value STRING, config_source STRING>")) + public Row[] call(ProcedureContext context, String configKey) throws Exception { + return getConfigs(configKey); + } + + private Row[] getConfigs(@Nullable String configKey) throws Exception { + try { + // Get all cluster configurations + Collection<ConfigEntry> configs = admin.describeClusterConfigs().get(); + + List<Row> results = new ArrayList<>(); + + if (configKey == null || configKey.isEmpty()) { + // Return all configurations + for (ConfigEntry entry : configs) { + results.add( + Row.of( + entry.key(), + entry.value(), + entry.source() != null ? entry.source().name() : "UNKNOWN")); + } + } else { + // Find specific configuration + for (ConfigEntry entry : configs) { + if (entry.key().equals(configKey)) { + results.add( + Row.of( + entry.key(), + entry.value(), + entry.source() != null + ? entry.source().name() + : "UNKNOWN")); + break; + } + } + } + + return results.toArray(new Row[0]); + + } catch (Exception e) { + throw new RuntimeException("Failed to get cluster config: " + e.getMessage(), e); + } + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java index 0767b3f23d..dcc5e223ee 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java @@ -69,7 +69,9 @@ private static Map<String, Class<? extends ProcedureBase>> initProcedureMap() { private enum ProcedureEnum { ADD_ACL("sys.add_acl", AddAclProcedure.class), DROP_ACL("sys.drop_acl", DropAclProcedure.class), - List_ACL("sys.list_acl", ListAclProcedure.class); + List_ACL("sys.list_acl", ListAclProcedure.class), + SET_CLUSTER_CONFIG("sys.set_cluster_config", SetClusterConfigProcedure.class), + GET_CLUSTER_CONFIG("sys.get_cluster_config", GetClusterConfigProcedure.class); private final String path; private final Class<? extends ProcedureBase> procedureClass; diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigProcedure.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigProcedure.java new file mode 100644 index 0000000000..ee4dbcf37e --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigProcedure.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.procedure; + +import org.apache.fluss.config.cluster.AlterConfig; +import org.apache.fluss.config.cluster.AlterConfigOpType; + +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.ProcedureHint; +import org.apache.flink.table.procedure.ProcedureContext; + +import javax.annotation.Nullable; + +import java.util.Collections; + +/** + * Procedure to set or delete cluster configuration dynamically. + * + *
<p>
This procedure allows modifying dynamic cluster configurations. The changes are: + * + *
<ul>
+ *   <li>Validated by the CoordinatorServer before persistence
+ *   <li>Persisted in ZooKeeper for durability
+ *   <li>Applied to all relevant servers (Coordinator and TabletServers)
+ *   <li>Preserved across server restarts
+ * </ul>
+ *
<p>
Usage examples: + * + *
<pre>{@code
+ * -- Set a configuration
+ * CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB');
+ * CALL sys.set_cluster_config('datalake.format', 'paimon');
+ *
+ * -- Delete a configuration (reset to default)
+ * CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', NULL);
+ * CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '');
+ * }</pre>
+ * + *
<p>
Note: Not all configurations support dynamic changes. The server will validate the + * change and reject it if the configuration cannot be modified dynamically or if the new value is + * invalid. + */ +public class SetClusterConfigProcedure extends ProcedureBase { + + @ProcedureHint(argument = {@ArgumentHint(name = "config_key", type = @DataTypeHint("STRING"))}) + public String[] call(ProcedureContext context, String configKey) throws Exception { + return performSet(configKey, null); + } + + @ProcedureHint( + argument = { + @ArgumentHint(name = "config_key", type = @DataTypeHint("STRING")), + @ArgumentHint(name = "config_value", type = @DataTypeHint("STRING")) + }) + public String[] call(ProcedureContext context, String configKey, String configValue) + throws Exception { + return performSet(configKey, configValue); + } + + private String[] performSet(String configKey, @Nullable String configValue) throws Exception { + + try { + // Validate config key + if (configKey == null || configKey.trim().isEmpty()) { + throw new IllegalArgumentException( + "Config key cannot be null or empty. " + + "Please specify a valid configuration key."); + } + + configKey = configKey.trim(); + + // Determine operation type + AlterConfigOpType opType; + String operationDesc; + + if (configValue == null || configValue.trim().isEmpty()) { + // Delete operation - reset to default + opType = AlterConfigOpType.DELETE; + operationDesc = "deleted (reset to default)"; + } else { + // Set operation + opType = AlterConfigOpType.SET; + operationDesc = String.format("set to '%s'", configValue); + } + + // Construct configuration modification operation. + AlterConfig alterConfig = new AlterConfig(configKey, configValue, opType); + + // Call Admin API to modify cluster configuration + // This will trigger validation on CoordinatorServer before persistence + admin.alterClusterConfigs(Collections.singletonList(alterConfig)).get(); + + return new String[] { + String.format( + "Successfully %s configuration '%s'. " + + "The change is persisted in ZooKeeper and applied to all servers.", + operationDesc, configKey) + }; + + } catch (IllegalArgumentException e) { + // Re-throw validation errors with original message + throw e; + } catch (Exception e) { + // Wrap other exceptions with more context + throw new RuntimeException( + String.format( + "Failed to set cluster config '%s': %s", configKey, e.getMessage()), + e); + } + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java index 92956432d8..61ed2c78dc 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java @@ -91,7 +91,12 @@ void testShowProcedures() throws Exception { try (CloseableIterator<Row> showProceduresIterator = tEnv.executeSql("show procedures").collect()) { List<String> expectedShowProceduresResult = - Arrays.asList("+I[sys.add_acl]", "+I[sys.drop_acl]", "+I[sys.list_acl]"); + Arrays.asList( + "+I[sys.add_acl]", + "+I[sys.drop_acl]", + "+I[sys.get_cluster_config]", + "+I[sys.list_acl]", + "+I[sys.set_cluster_config]"); // make sure no more results is unread.
assertResultsIgnoreOrder(showProceduresIterator, expectedShowProceduresResult, true); + } } @@ -259,6 +264,133 @@ void testDisableAuthorization() throws Exception { } } + + @Test + void testGetClusterConfig() throws Exception { + // Get specific config + try (CloseableIterator<Row> resultIterator = + tEnv.executeSql( + String.format( + "Call %s.sys.get_cluster_config('%s')", + CATALOG_NAME, + ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key())) + .collect()) { + List<Row> results = CollectionUtil.iteratorToList(resultIterator); + assertThat(results).hasSize(1); + Row row = results.get(0); + assertThat(row.getField(0)) + .isEqualTo(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()); + assertThat(row.getField(1)).isEqualTo("100 mb"); + assertThat(row.getField(2)).isNotNull(); // config_source + } + + // Get all configs + try (CloseableIterator<Row> resultIterator = + tEnv.executeSql(String.format("Call %s.sys.get_cluster_config()", CATALOG_NAME)) + .collect()) { + List<Row> results = CollectionUtil.iteratorToList(resultIterator); + assertThat(results).isNotEmpty(); + } + + // Get non-existent config + try (CloseableIterator<Row> resultIterator = + tEnv.executeSql( + String.format( + "Call %s.sys.get_cluster_config('non.existent.config')", + CATALOG_NAME)) + .collect()) { + List<Row> results = CollectionUtil.iteratorToList(resultIterator); + assertThat(results).hasSize(0); + } + + // reset cluster configs. + tEnv.executeSql( + String.format( + "Call %s.sys.set_cluster_config('%s')", + CATALOG_NAME, + ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key())) + .await(); + } + + @Test + void testSetClusterConfig() throws Exception { + // Test setting a valid config + try (CloseableIterator<Row> resultIterator = + tEnv.executeSql( + String.format( + "Call %s.sys.set_cluster_config('%s', '200MB')", + CATALOG_NAME, + ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key())) + .collect()) { + List<Row> results = CollectionUtil.iteratorToList(resultIterator); + assertThat(results).hasSize(1); + assertThat(results.get(0).getField(0)) + .asString() + .contains("Successfully set to '200MB'") + .contains(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()); + } + + // Verify the config was actually set + try (CloseableIterator<Row> resultIterator = + tEnv.executeSql( + String.format( + "Call %s.sys.get_cluster_config('%s')", + CATALOG_NAME, + ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key())) + .collect()) { + List<Row> results = CollectionUtil.iteratorToList(resultIterator); + assertThat(results).hasSize(1); + assertThat(results.get(0).getField(1)).isEqualTo("200MB"); + } + + // reset cluster configs.
+ tEnv.executeSql( + String.format( + "Call %s.sys.set_cluster_config('%s')", + CATALOG_NAME, + ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key())) + .await(); + } + + @Test + void testDeleteClusterConfig() throws Exception { + // First set a config + tEnv.executeSql( + String.format( + "Call %s.sys.set_cluster_config('%s', 'paimon')", + CATALOG_NAME, ConfigOptions.DATALAKE_FORMAT.key())) + .await(); + + // Delete the config (reset to default) - omitting the value parameter + try (CloseableIterator<Row> resultIterator = + tEnv.executeSql( + String.format( + "Call %s.sys.set_cluster_config('%s')", + CATALOG_NAME, ConfigOptions.DATALAKE_FORMAT.key())) + .collect()) { + List<Row> results = CollectionUtil.iteratorToList(resultIterator); + assertThat(results).hasSize(1); + assertThat(results.get(0).getField(0)) + .asString() + .contains("Successfully deleted") + .contains(ConfigOptions.DATALAKE_FORMAT.key()); + } + } + + @Test + void testSetClusterConfigValidation() throws Exception { + // Try to set an invalid config (not allowed for dynamic change) + assertThatThrownBy( + () -> + tEnv.executeSql( + String.format( + "Call %s.sys.set_cluster_config('invalid.config.key', 'value')", + CATALOG_NAME)) + .await()) + .rootCause() + .hasMessageContaining( + "The config key invalid.config.key is not allowed to be changed dynamically"); + } + private static Configuration initConfig() { Configuration conf = new Configuration(); conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3); @@ -272,6 +404,9 @@ private static Configuration initConfig() { conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE, MemorySize.parse("1mb")); conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, MemorySize.parse("1kb")); + + // Enable shared RocksDB rate limiter for testing + conf.set(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC, MemorySize.parse("100mb")); + // set security information. conf.setString(ConfigOptions.SERVER_SECURITY_PROTOCOL_MAP.key(), "CLIENT:sasl"); conf.setString("security.sasl.enabled.mechanisms", "plain"); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/DynamicConfigManager.java b/fluss-server/src/main/java/org/apache/fluss/server/DynamicConfigManager.java index 1580a1c183..a6b678c2ef 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/DynamicConfigManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/DynamicConfigManager.java @@ -21,6 +21,7 @@ import org.apache.fluss.config.Configuration; import org.apache.fluss.config.cluster.AlterConfig; import org.apache.fluss.config.cluster.ConfigEntry; +import org.apache.fluss.config.cluster.ConfigValidator; import org.apache.fluss.config.cluster.ServerReconfigurable; import org.apache.fluss.exception.ConfigException; import org.apache.fluss.server.authorizer.ZkNodeChangeNotificationWatcher; @@ -76,6 +77,19 @@ public void register(ServerReconfigurable serverReconfigurable) { dynamicServerConfig.register(serverReconfigurable); } + + /** + * Register a ConfigValidator for stateless validation. + * + *
<p>
Typically used by CoordinatorServer to validate configs for components it doesn't run + * (e.g., KvManager). Validators are stateless and only perform validation without requiring + * component instances. + * + * @param validator the config validator to register + */ + public void registerValidator(ConfigValidator<?> validator) { + dynamicServerConfig.registerValidator(validator); + } + public void close() { configChangeListener.stop(); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java b/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java index 7e767a1488..bb19aa3318 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java @@ -18,7 +18,10 @@ package org.apache.fluss.server; import org.apache.fluss.annotation.Internal; +import org.apache.fluss.config.ConfigOption; +import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; +import org.apache.fluss.config.cluster.ConfigValidator; import org.apache.fluss.config.cluster.ServerReconfigurable; import org.apache.fluss.exception.ConfigException; import org.apache.fluss.utils.MapUtils; @@ -26,15 +29,20 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT; +import static org.apache.fluss.config.ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC; import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock; import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock; @@ -49,13 +57,19 @@ class DynamicServerConfig { private static final Logger LOG = LoggerFactory.getLogger(DynamicServerConfig.class); private static final Set<String> ALLOWED_CONFIG_KEYS = - Collections.singleton(DATALAKE_FORMAT.key()); + new HashSet<>( + Arrays.asList( + DATALAKE_FORMAT.key(), KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key())); private static final Set<String> ALLOWED_CONFIG_PREFIXES = Collections.singleton("datalake."); private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final Map<Class<?>, ServerReconfigurable> serverReconfigures = MapUtils.newConcurrentHashMap(); + /** Registered stateless config validators, organized by config key for efficient lookup. */ + private final Map<String, List<ConfigValidator<?>>> configValidatorsByKey = + MapUtils.newConcurrentHashMap(); + /** The initial configuration items when the server starts from server.yaml. */ private final Map<String, String> initialConfigMap; @@ -83,6 +97,25 @@ void register(ServerReconfigurable serverReconfigurable) { serverReconfigures.put(serverReconfigurable.getClass(), serverReconfigurable); } + + /** + * Register a ConfigValidator for stateless validation. + * + *
<p>
This is typically used by CoordinatorServer to validate configs for components it doesn't + * run (e.g., KvManager). Validators are stateless and only perform validation without requiring + * component instances. + * + *
<p>
Validators are organized by config key for efficient lookup. Multiple validators can be + * registered for the same config key. + * + * @param validator the config validator to register + */ + void registerValidator(ConfigValidator<?> validator) { + String configKey = validator.configKey(); + configValidatorsByKey + .computeIfAbsent(configKey, k -> new CopyOnWriteArrayList<>()) + .add(validator); + } + /** * Update the dynamic configuration and apply to registered ServerReconfigurable. If skipping * error config, only the error one will be ignored. @@ -115,68 +148,300 @@ boolean isAllowedConfig(String key) { private void updateCurrentConfig(Map<String, String> newDynamicConfigs, boolean skipErrorConfig) throws Exception { - Map<String, String> newProps = new HashMap<>(initialConfigMap); - overrideProps(newProps, newDynamicConfigs); - Configuration newConfig = Configuration.fromMap(newProps); - Configuration oldConfig = currentConfig; - Set<ServerReconfigurable> appliedServerReconfigurableSet = new HashSet<>(); - if (!newProps.equals(currentConfigMap)) { - serverReconfigures - .values() - .forEach( - serverReconfigurable -> { - try { - serverReconfigurable.validate(newConfig); - } catch (ConfigException e) { - LOG.error( - "Validate new dynamic config error and will roll back all the applied config.", - e); - if (!skipErrorConfig) { - throw e; - } - } - }); - - Exception throwable = null; - for (ServerReconfigurable serverReconfigurable : serverReconfigures.values()) { - try { - serverReconfigurable.reconfigure(newConfig); - appliedServerReconfigurableSet.add(serverReconfigurable); - } catch (ConfigException e) { - LOG.error( - "Apply new dynamic error and will roll back all the applied config.", - e); - if (!skipErrorConfig) { - throwable = e; - break; - } - } + // Compute effective config changes (merge with initial configs) + Map<String, String> effectiveChanges = + computeEffectiveChanges(newDynamicConfigs, skipErrorConfig); + + // Early return if no effective changes + if (effectiveChanges.isEmpty()) { + LOG.info("No effective config changes detected for: {}", newDynamicConfigs); + return; + } + + // Build new configuration by merging initial + dynamic configs + Map<String, String> newConfigMap = buildConfigMap(effectiveChanges); + Configuration newConfig = Configuration.fromMap(newConfigMap); + + // Apply changes to all registered ServerReconfigurable instances + applyToServerReconfigurables(newConfig, skipErrorConfig); + + // Update internal state + updateInternalState(newConfig, newConfigMap, newDynamicConfigs); + LOG.info("Dynamic configs changed: {}", effectiveChanges); + } + + /** + * Computes effective config changes by validating new configs and handling deletions.
+ * + * @param newDynamicConfigs new dynamic configs from ZooKeeper + * @param skipErrorConfig whether to skip invalid configs + * @return map of config changes that passed validation + * @throws ConfigException if validation fails and skipErrorConfig is false + */ + private Map<String, String> computeEffectiveChanges( + Map<String, String> newDynamicConfigs, boolean skipErrorConfig) throws ConfigException { + Map<String, String> effectiveChanges = new HashMap<>(); + Set<String> skippedConfigs = new HashSet<>(); + + // Process deleted configs: restore to initial value or remove + processDeletions(newDynamicConfigs, effectiveChanges, skippedConfigs, skipErrorConfig); + + // Process added/modified configs + processModifications(newDynamicConfigs, effectiveChanges, skippedConfigs, skipErrorConfig); + + if (!skippedConfigs.isEmpty()) { + LOG.warn("Skipped invalid configs: {}", skippedConfigs); + } + + return effectiveChanges; + } + + /** Processes config deletions by restoring to initial values or removing them. */ + private void processDeletions( + Map<String, String> newDynamicConfigs, + Map<String, String> effectiveChanges, + Set<String> skippedConfigs, + boolean skipErrorConfig) + throws ConfigException { + for (String configKey : dynamicConfigs.keySet()) { + if (newDynamicConfigs.containsKey(configKey)) { + continue; // Not deleted + } + + String currentValue = currentConfigMap.get(configKey); + String initialValue = initialConfigMap.get(configKey); + + // Determine target value: initial value or null (removal) + String targetValue = initialValue; + + // Skip if no change needed (already at initial value) + if (Objects.equals(currentValue, targetValue)) { + continue; + } + + // Validate the change + if (validateConfigChange( + configKey, currentValue, targetValue, skippedConfigs, skipErrorConfig)) { + effectiveChanges.put(configKey, targetValue); } + } + } + + /** Processes config additions and modifications. */ + private void processModifications( + Map<String, String> newDynamicConfigs, + Map<String, String> effectiveChanges, + Set<String> skippedConfigs, + boolean skipErrorConfig) + throws ConfigException { + for (Map.Entry<String, String> entry : newDynamicConfigs.entrySet()) { + String configKey = entry.getKey(); + String newValue = entry.getValue(); + String currentValue = currentConfigMap.get(configKey); - // rollback to old config if there is an error. - if (throwable != null) { - appliedServerReconfigurableSet.forEach( - serverReconfigurable -> serverReconfigurable.reconfigure(oldConfig)); - throw throwable; + // Skip if value unchanged + if (Objects.equals(currentValue, newValue)) { + continue; } - currentConfig = newConfig; - currentConfigMap.clear(); - dynamicConfigs.clear(); - currentConfigMap.putAll(newProps); - dynamicConfigs.putAll(newDynamicConfigs); - LOG.info("Dynamic configs changed: {}", newDynamicConfigs); + // Validate and add to effective changes + if (validateConfigChange( + configKey, currentValue, newValue, skippedConfigs, skipErrorConfig)) { + effectiveChanges.put(configKey, newValue); + } } } - private void overrideProps(Map<String, String> props, Map<String, String> propsOverride) { - propsOverride.forEach( + /** + * Validates a single config change.
+ * + * @return true if validation passed, false if skipped due to error + * @throws ConfigException if validation fails and skipErrorConfig is false + */ + private boolean validateConfigChange( + String configKey, + String oldValue, + String newValue, + Set<String> skippedConfigs, + boolean skipErrorConfig) + throws ConfigException { + try { + validateSingleConfig(configKey, oldValue, newValue); + return true; + } catch (ConfigException e) { + LOG.error( + "Config validation failed for '{}': {} -> {}. {}", + configKey, + oldValue, + newValue, + e.getMessage()); + if (skipErrorConfig) { + skippedConfigs.add(configKey); + return false; + } else { + throw e; + } + } + } + + /** Builds final config map by merging initial configs with effective changes. */ + private Map<String, String> buildConfigMap(Map<String, String> effectiveChanges) { + Map<String, String> configMap = new HashMap<>(initialConfigMap); + effectiveChanges.forEach( (key, value) -> { if (value == null) { - props.remove(key); + configMap.remove(key); } else { - props.put(key, value); + configMap.put(key, value); } }); + return configMap; + } + + /** Updates internal state after successful reconfiguration. */ + private void updateInternalState( + Configuration newConfig, + Map<String, String> newConfigMap, + Map<String, String> newDynamicConfigs) { + currentConfig = newConfig; + currentConfigMap.clear(); + currentConfigMap.putAll(newConfigMap); + dynamicConfigs.clear(); + dynamicConfigs.putAll(newDynamicConfigs); + } + + /** + * Validates a single config entry including type parsing and business validation. + * + * @param configKey config key + * @param oldValueStr old value string + * @param newValueStr new value string + * @throws ConfigException if validation fails + */ + private void validateSingleConfig(String configKey, String oldValueStr, String newValueStr) + throws ConfigException { + // Get ConfigOption for type information + ConfigOption<?> configOption = ConfigOptions.getConfigOption(configKey); + + // For configs with allowed prefixes (like "datalake."), skip ConfigOption validation + // and rely on ServerReconfigurable's business validation + boolean hasPrefixConfig = false; + for (String prefix : ALLOWED_CONFIG_PREFIXES) { + if (configKey.startsWith(prefix)) { + hasPrefixConfig = true; + break; + } + } + + if (configOption == null && !hasPrefixConfig) { + throw new ConfigException( + String.format("No ConfigOption found for config key: %s", configKey)); + } + + // Parse and validate type only if ConfigOption exists + Object newValue = null; + if (configOption != null && newValueStr != null) { + Configuration tempConfig = new Configuration(); + tempConfig.setString(configKey, newValueStr); + try { + newValue = tempConfig.getOptional(configOption).get(); + } catch (Exception e) { + String causeMessage = + e.getMessage() != null ? e.getMessage() : e.getClass().getSimpleName(); + throw new ConfigException( + String.format( + "Cannot parse '%s' as %s for config '%s': %s", + newValueStr, + configOption.isList() + ? "List<" + configOption.getClazz().getSimpleName() + ">" + : configOption.getClazz().getSimpleName(), + configKey, + causeMessage), + e); + } + } + + // Business validation with registered validators (if any) + List<ConfigValidator<?>> validators = configValidatorsByKey.get(configKey); + if (validators != null && !validators.isEmpty() && configOption != null) { + Object oldValue = + oldValueStr != null + ?
currentConfig.getOptional(configOption).orElse(null) + : null; + for (ConfigValidator<?> validator : validators) { + invokeValidator(validator, oldValue, newValue); + } + } + } + + /** + * Applies new configuration to all ServerReconfigurable instances with rollback support. + * + * @param newConfig new configuration to apply + * @param skipErrorConfig whether to skip errors + * @throws Exception if apply fails and skipErrorConfig is false + */ + private void applyToServerReconfigurables(Configuration newConfig, boolean skipErrorConfig) + throws Exception { + Configuration oldConfig = currentConfig; + Set<ServerReconfigurable> appliedSet = new HashSet<>(); + + // Validate all first + for (ServerReconfigurable reconfigurable : serverReconfigures.values()) { + try { + reconfigurable.validate(newConfig); + } catch (ConfigException e) { + LOG.error( + "Validation failed for {}: {}", + reconfigurable.getClass().getSimpleName(), + e.getMessage(), + e); + if (!skipErrorConfig) { + throw e; + } + } + } + + // Apply to all instances + Exception throwable = null; + for (ServerReconfigurable reconfigurable : serverReconfigures.values()) { + try { + reconfigurable.reconfigure(newConfig); + appliedSet.add(reconfigurable); + } catch (ConfigException e) { + LOG.error( + "Reconfiguration failed for {}: {}", + reconfigurable.getClass().getSimpleName(), + e.getMessage(), + e); + if (!skipErrorConfig) { + throwable = e; + break; + } + } + } + + // Rollback if there was an error + if (throwable != null) { + appliedSet.forEach(r -> r.reconfigure(oldConfig)); + throw throwable; + } + } + + /** + * Invokes validator with strongly-typed values. + * + * @param validator the config validator to invoke + * @param oldValue the old typed value + * @param newValue the new typed value + * @throws ConfigException if validation fails + */ + @SuppressWarnings({"unchecked", "rawtypes"}) + private void invokeValidator(ConfigValidator<?> validator, Object oldValue, Object newValue) + throws ConfigException { + // Invoke validator with typed values + // We suppress unchecked warning because we trust that the validator + // is registered for the correct type matching the ConfigOption + ((ConfigValidator) validator).validate(oldValue, newValue); + } }
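As a wiring sketch of how these pieces fit together (hypothetical snippet, not part of this patch; `RateLimiterConfigValidator` is the illustrative class from earlier, and `zkClient`/`conf` are assumed to be in scope):

```java
// A coordinator registers only the stateless validator, since it has no KvManager instance;
// a tablet server would additionally register the stateful component via register(...).
DynamicConfigManager coordinatorConfigManager =
        new DynamicConfigManager(zkClient, conf, true);
coordinatorConfigManager.registerValidator(new RateLimiterConfigValidator());
coordinatorConfigManager.startup();
```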
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java index 83bba2c52f..cd546f4479 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java @@ -173,7 +173,10 @@ protected void startServices() throws Exception { this.lakeCatalogDynamicLoader = new LakeCatalogDynamicLoader(conf, pluginManager, true); this.dynamicConfigManager = new DynamicConfigManager(zkClient, conf, true); + + // Register server reconfigurable components dynamicConfigManager.register(lakeCatalogDynamicLoader); + dynamicConfigManager.startup(); this.coordinatorContext = new CoordinatorContext(); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java index b5318ef60b..4353530903 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java @@ -20,7 +20,10 @@ import org.apache.fluss.compression.ArrowCompressionInfo; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; +import org.apache.fluss.config.MemorySize; import org.apache.fluss.config.TableConfig; +import org.apache.fluss.config.cluster.ServerReconfigurable; +import org.apache.fluss.exception.ConfigException; import org.apache.fluss.exception.KvStorageException; import org.apache.fluss.fs.FileSystem; import org.apache.fluss.fs.FsPath; @@ -45,6 +48,9 @@ import org.apache.fluss.utils.MapUtils; import org.apache.fluss.utils.types.Tuple2; +import org.rocksdb.RateLimiter; +import org.rocksdb.RateLimiterMode; +import org.rocksdb.RocksDB; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,9 +71,42 @@ * the individual instances. */ @ThreadSafe -public final class KvManager extends TabletManagerBase { +public final class KvManager extends TabletManagerBase implements ServerReconfigurable { private static final Logger LOG = LoggerFactory.getLogger(KvManager.class); + + /** + * Default global rate limiter with unlimited rate (Long.MAX_VALUE bytes per second). + * + *
<p>
This is used by RocksDBResourceContainer when no rate limiter is explicitly provided, + * ensuring the API is safer and more robust by avoiding null checks throughout the code. + */ + private static final RateLimiter DEFAULT_RATE_LIMITER = createDefaultRateLimiter(); + + /** + * Creates a default rate limiter with unlimited rate (Long.MAX_VALUE bytes per second). + * + * @return a default rate limiter instance + */ + private static RateLimiter createDefaultRateLimiter() { + RocksDB.loadLibrary(); + // Create a rate limiter with unlimited rate (effectively no limit) + // Using default refill period and fairness values + return new RateLimiter(Long.MAX_VALUE); + } + + /** + * Returns the default global rate limiter with unlimited rate. + * + *
<p>
This method provides access to the default rate limiter for use in + * RocksDBResourceContainer when no rate limiter is explicitly provided. + * + * @return the default rate limiter instance + */ + public static RateLimiter getDefaultRateLimiter() { + return DEFAULT_RATE_LIMITER; + } + private final LogManager logManager; private final TabletServerMetricGroup serverMetricGroup; @@ -89,6 +128,16 @@ public final class KvManager extends TabletManagerBase { private final FileSystem remoteFileSystem; + /** + * The shared rate limiter for all RocksDB instances to control flush and compaction write rate. + */ + private final RateLimiter sharedRocksDBRateLimiter; + + /** Current shared rate limiter configuration in bytes per second. */ + private volatile long currentSharedRateLimitBytesPerSec; + + private volatile boolean isShutdown = false; + private KvManager( File dataDir, Configuration conf, @@ -105,6 +154,27 @@ private KvManager( this.remoteKvDir = FlussPaths.remoteKvDir(conf); this.remoteFileSystem = remoteKvDir.getFileSystem(); this.serverMetricGroup = tabletServerMetricGroup; + this.sharedRocksDBRateLimiter = createSharedRateLimiter(conf); + this.currentSharedRateLimitBytesPerSec = + conf.get(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC).getBytes(); + } + + private static RateLimiter createSharedRateLimiter(Configuration conf) { + long sharedRateLimitBytesPerSecond = + conf.get(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC).getBytes(); + + RocksDB.loadLibrary(); + // Always create a shared rate limiter with the configured rate limit. + // The rate limiter is always enabled with a default value of Long.MAX_VALUE (effectively + // unlimited). + // This avoids the overhead of dynamically enabling/disabling the rate limiter. + // refill_period_us is set to 100ms, fairness is set to 10 + return new RateLimiter( + sharedRateLimitBytesPerSecond, + RateLimiter.DEFAULT_REFILL_PERIOD_MICROS, + RateLimiter.DEFAULT_FAIRNESS, + RateLimiterMode.WRITES_ONLY, + false); } public static KvManager create( @@ -130,6 +200,7 @@ public void startup() { public void shutdown() { LOG.info("Shutting down KvManager"); + isShutdown = true; List kvs = new ArrayList<>(currentKvs.values()); for (KvTablet kvTablet : kvs) { try { @@ -140,6 +211,9 @@ public void shutdown() { } arrowBufferAllocator.close(); memorySegmentPool.close(); + if (sharedRocksDBRateLimiter != null) { + sharedRocksDBRateLimiter.close(); + } LOG.info("Shut down KvManager complete."); } @@ -176,6 +250,8 @@ public KvTablet getOrCreateKv( RowMerger merger = RowMerger.create(tableConfig, kvFormat); KvTablet tablet = KvTablet.create( + tablePath, + tableBucket, logTablet, tabletDir, conf, @@ -186,7 +262,8 @@ public KvTablet getOrCreateKv( merger, arrowCompressionInfo, schemaGetter, - tableConfig.getChangelogImage()); + tableConfig.getChangelogImage(), + sharedRocksDBRateLimiter); currentKvs.put(tableBucket, tablet); LOG.info( @@ -294,7 +371,8 @@ public KvTablet loadKv(File tabletDir, SchemaGetter schemaGetter) throws Excepti rowMerger, tableConfig.getArrowCompressionInfo(), schemaGetter, - tableConfig.getChangelogImage()); + tableConfig.getChangelogImage(), + sharedRocksDBRateLimiter); if (this.currentKvs.containsKey(tableBucket)) { throw new IllegalStateException( String.format( @@ -327,4 +405,55 @@ public void deleteRemoteKvSnapshot( e); } } + + // ============ ServerReconfigurable Implementation ============ + + @Override + public void validate(Configuration newConfig) throws ConfigException { + // Config validation is already handled by 
KvConfigValidator which is registered + // on both CoordinatorServer and TabletServer. Here we only need to check runtime state. + + // Check if KvManager is in a valid state to accept reconfiguration + if (isShutdown) { + throw new ConfigException("Cannot reconfigure KvManager during shutdown"); + } + + // All config value validations are delegated to KvConfigValidator + LOG.debug("KvManager runtime state validation passed for reconfiguration"); + } + + @Override + public void reconfigure(Configuration newConfig) throws ConfigException { + long newSharedRateLimitBytes = + newConfig.get(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC).getBytes(); + + // If value hasn't changed, skip + if (newSharedRateLimitBytes == currentSharedRateLimitBytesPerSec) { + LOG.debug( + "Shared RocksDB rate limiter config unchanged: {} bytes/sec", + newSharedRateLimitBytes); + return; + } + + long oldValue = currentSharedRateLimitBytesPerSec; + + try { + // Apply new configuration using RocksDB API (thread-safe) + // The rate limiter is always enabled, so we can safely reconfigure it + sharedRocksDBRateLimiter.setBytesPerSecond(newSharedRateLimitBytes); + currentSharedRateLimitBytesPerSec = newSharedRateLimitBytes; + + LOG.info( + "Shared RocksDB rate limiter reconfigured: {} bytes/sec ({}) -> {} bytes/sec ({})", + oldValue, + new MemorySize(oldValue).toHumanReadableString(), + newSharedRateLimitBytes, + new MemorySize(newSharedRateLimitBytes).toHumanReadableString()); + + } catch (Exception e) { + // If setting fails, throw ConfigException to trigger rollback + throw new ConfigException( + "Failed to reconfigure shared RocksDB rate limiter: " + e.getMessage(), e); + } + } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java index 1f1fb5f0f5..0dde0a53df 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java @@ -67,9 +67,8 @@ import org.apache.fluss.types.RowType; import org.apache.fluss.utils.BytesUtils; import org.apache.fluss.utils.FileUtils; -import org.apache.fluss.utils.FlussPaths; -import org.apache.fluss.utils.types.Tuple2; +import org.rocksdb.RateLimiter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -161,37 +160,6 @@ private KvTablet( this.changelogImage = changelogImage; } - public static KvTablet create( - LogTablet logTablet, - File kvTabletDir, - Configuration serverConf, - TabletServerMetricGroup serverMetricGroup, - BufferAllocator arrowBufferAllocator, - MemorySegmentPool memorySegmentPool, - KvFormat kvFormat, - RowMerger rowMerger, - ArrowCompressionInfo arrowCompressionInfo, - SchemaGetter schemaGetter, - ChangelogImage changelogImage) - throws IOException { - Tuple2 tablePathAndBucket = - FlussPaths.parseTabletDir(kvTabletDir); - return create( - tablePathAndBucket.f0, - tablePathAndBucket.f1, - logTablet, - kvTabletDir, - serverConf, - serverMetricGroup, - arrowBufferAllocator, - memorySegmentPool, - kvFormat, - rowMerger, - arrowCompressionInfo, - schemaGetter, - changelogImage); - } - public static KvTablet create( PhysicalTablePath tablePath, TableBucket tableBucket, @@ -205,9 +173,10 @@ public static KvTablet create( RowMerger rowMerger, ArrowCompressionInfo arrowCompressionInfo, SchemaGetter schemaGetter, - ChangelogImage changelogImage) + ChangelogImage changelogImage, + RateLimiter sharedRateLimiter) throws IOException { - RocksDBKv kv = buildRocksDBKv(serverConf, 
kvTabletDir); + RocksDBKv kv = buildRocksDBKv(serverConf, kvTabletDir, sharedRateLimiter); return new KvTablet( tablePath, tableBucket, @@ -226,10 +195,11 @@ public static KvTablet create( changelogImage); } - private static RocksDBKv buildRocksDBKv(Configuration configuration, File kvDir) + private static RocksDBKv buildRocksDBKv( + Configuration configuration, File kvDir, RateLimiter sharedRateLimiter) throws IOException { RocksDBResourceContainer rocksDBResourceContainer = - new RocksDBResourceContainer(configuration, kvDir); + new RocksDBResourceContainer(configuration, kvDir, false, sharedRateLimiter); RocksDBKvBuilder rocksDBKvBuilder = new RocksDBKvBuilder( kvDir, diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainer.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainer.java index d437c08782..4495613e65 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBResourceContainer.java @@ -22,6 +22,7 @@ import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.config.ReadableConfig; +import org.apache.fluss.server.kv.KvManager; import org.apache.fluss.utils.FileUtils; import org.apache.fluss.utils.IOUtils; @@ -33,6 +34,7 @@ import org.rocksdb.DBOptions; import org.rocksdb.InfoLogLevel; import org.rocksdb.PlainTableConfig; +import org.rocksdb.RateLimiter; import org.rocksdb.ReadOptions; import org.rocksdb.Statistics; import org.rocksdb.TableFormatConfig; @@ -48,6 +50,8 @@ import java.util.Collection; import java.util.List; +import static org.apache.fluss.utils.Preconditions.checkNotNull; + /* This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for * additional information regarding copyright ownership. */ @@ -73,22 +77,33 @@ public class RocksDBResourceContainer implements AutoCloseable { private final boolean enableStatistics; + /** The shared rate limiter for all RocksDB instances. */ + private final RateLimiter sharedRateLimiter; + /** The handles to be closed when the container is closed. */ private final ArrayList handlesToClose; @VisibleForTesting RocksDBResourceContainer() { - this(new Configuration(), null, false); + this(new Configuration(), null, false, KvManager.getDefaultRateLimiter()); } public RocksDBResourceContainer(ReadableConfig configuration, @Nullable File instanceBasePath) { - this(configuration, instanceBasePath, false); + this(configuration, instanceBasePath, false, KvManager.getDefaultRateLimiter()); } public RocksDBResourceContainer( ReadableConfig configuration, @Nullable File instanceBasePath, boolean enableStatistics) { + this(configuration, instanceBasePath, enableStatistics, KvManager.getDefaultRateLimiter()); + } + + public RocksDBResourceContainer( + ReadableConfig configuration, + @Nullable File instanceBasePath, + boolean enableStatistics, + RateLimiter sharedRateLimiter) { this.configuration = configuration; this.instanceRocksDBPath = @@ -96,6 +111,8 @@ public RocksDBResourceContainer( ? 
RocksDBKvBuilder.getInstanceRocksDBPath(instanceBasePath) : null; this.enableStatistics = enableStatistics; + this.sharedRateLimiter = + checkNotNull(sharedRateLimiter, "sharedRateLimiter must not be null"); this.handlesToClose = new ArrayList<>(); } @@ -117,6 +134,9 @@ public DBOptions getDbOptions() throws IOException { // add necessary default options opt = opt.setCreateIfMissing(true); + // set shared rate limiter + opt.setRateLimiter(sharedRateLimiter); + if (enableStatistics) { Statistics statistics = new Statistics(); opt.setStatistics(statistics); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java index bd185ec62c..8eed63c844 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java @@ -220,7 +220,6 @@ protected void startServices() throws Exception { new MetadataManager(zkClient, conf, lakeCatalogDynamicLoader); this.dynamicConfigManager = new DynamicConfigManager(zkClient, conf, false); dynamicConfigManager.register(lakeCatalogDynamicLoader); - dynamicConfigManager.startup(); this.metadataCache = new TabletServerMetadataCache(metadataManager); @@ -231,6 +230,11 @@ protected void startServices() throws Exception { this.kvManager = KvManager.create(conf, zkClient, logManager, tabletServerMetricGroup); kvManager.startup(); + // Register kvManager to dynamicConfigManager for dynamic reconfiguration + dynamicConfigManager.register(kvManager); + // Start dynamicConfigManager after all reconfigurable components are registered + dynamicConfigManager.startup(); + this.authorizer = AuthorizerLoader.createAuthorizer(conf, zkClient, pluginManager); if (authorizer != null) { authorizer.startup(); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java b/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java index b19ff40e15..2ba44ac41c 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java @@ -46,6 +46,7 @@ import static org.apache.fluss.metadata.DataLakeFormat.PAIMON; import static org.apache.fluss.testutils.common.CommonTestUtils.retry; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test for {@link DynamicConfigManager}. 
*/ @@ -173,7 +174,7 @@ void testUnknownLakeHouse() throws Exception { "unknown", AlterConfigOpType.SET)))) .hasMessageContaining( - "Could not parse value 'unknown' for key 'datalake.format'"); + "Cannot parse 'unknown' as DataLakeFormat for config 'datalake.format'"); assertThat(lakeCatalogDynamicLoader.getLakeCatalogContainer().getDataLakeFormat()) .isNull(); @@ -256,4 +257,94 @@ void testReStartupContainsNoMatchedDynamicConfig() throws Exception { .isEqualTo(PAIMON); } } + + @Test + void testPreventInvalidConfig() throws Exception { + // Test that generic type validation prevents invalid config values + Configuration configuration = new Configuration(); + configuration.setString(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(), "100MB"); + + DynamicConfigManager dynamicConfigManager = + new DynamicConfigManager(zookeeperClient, configuration, true); + dynamicConfigManager.startup(); + + // Try to set rate limiter to an invalid value - should be rejected by generic type + // validation + assertThatThrownBy( + () -> + dynamicConfigManager.alterConfigs( + Collections.singletonList( + new AlterConfig( + ConfigOptions + .KV_SHARED_RATE_LIMITER_BYTES_PER_SEC + .key(), + "invalid_value", + AlterConfigOpType.SET)))) + .isInstanceOf(ConfigException.class) + .hasMessageContaining( + "Cannot parse 'invalid_value' as MemorySize for config 'kv.rocksdb.shared-rate-limiter.bytes-per-sec'"); + } + + @Test + void testConfigValidatorAllowsValidChange() throws Exception { + // Test that generic type validation allows valid config values + Configuration configuration = new Configuration(); + configuration.setString(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(), "100MB"); + + DynamicConfigManager dynamicConfigManager = + new DynamicConfigManager(zookeeperClient, configuration, true); + dynamicConfigManager.startup(); + + // Adjust rate limiter value - should succeed + assertThatCode( + () -> + dynamicConfigManager.alterConfigs( + Collections.singletonList( + new AlterConfig( + ConfigOptions + .KV_SHARED_RATE_LIMITER_BYTES_PER_SEC + .key(), + "200MB", + AlterConfigOpType.SET)))) + .doesNotThrowAnyException(); + + // Verify config was persisted to ZK + Map zkConfig = zookeeperClient.fetchEntityConfig(); + assertThat(zkConfig.get(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key())) + .isEqualTo("200MB"); + } + + @Test + void testConfigValidatorWithMultipleValidators() throws Exception { + Configuration configuration = new Configuration(); + configuration.setString(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(), "100MB"); + + try (LakeCatalogDynamicLoader lakeCatalogDynamicLoader = + new LakeCatalogDynamicLoader(configuration, null, true)) { + DynamicConfigManager dynamicConfigManager = + new DynamicConfigManager(zookeeperClient, configuration, true); + + // Register reconfigurables - generic type validation works automatically + dynamicConfigManager.register(lakeCatalogDynamicLoader); + dynamicConfigManager.startup(); + + // Change multiple configs - generic validation applies to all + dynamicConfigManager.alterConfigs( + Arrays.asList( + new AlterConfig( + ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(), + "200MB", + AlterConfigOpType.SET), + new AlterConfig( + DATALAKE_FORMAT.key(), "paimon", AlterConfigOpType.SET))); + + // Verify both configs were applied + Map zkConfig = zookeeperClient.fetchEntityConfig(); + assertThat(zkConfig.get(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key())) + .isEqualTo("200MB"); + 
assertThat(zkConfig.get(DATALAKE_FORMAT.key())).isEqualTo("paimon"); + assertThat(lakeCatalogDynamicLoader.getLakeCatalogContainer().getDataLakeFormat()) + .isEqualTo(PAIMON); + } + } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java index 94d41bffe0..06db30b340 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java @@ -191,7 +191,8 @@ private KvTablet createKvTablet( rowMerger, DEFAULT_COMPRESSION, schemaGetter, - tableConf.getChangelogImage()); + tableConf.getChangelogImage(), + KvManager.getDefaultRateLimiter()); } @Test diff --git a/website/docs/engine-flink/datastream.mdx b/website/docs/engine-flink/datastream.mdx index c9aaa3798a..a578cdaacf 100644 --- a/website/docs/engine-flink/datastream.mdx +++ b/website/docs/engine-flink/datastream.mdx @@ -1,6 +1,6 @@ --- title: "DataStream API" -sidebar_position: 7 +sidebar_position: 8 --- # DataStream API diff --git a/website/docs/engine-flink/delta-joins.md b/website/docs/engine-flink/delta-joins.md index dea8367f08..deeb9ed74f 100644 --- a/website/docs/engine-flink/delta-joins.md +++ b/website/docs/engine-flink/delta-joins.md @@ -1,7 +1,7 @@ --- sidebar_label: Delta Joins title: Flink Delta Joins -sidebar_position: 6 +sidebar_position: 7 --- # The Delta Join diff --git a/website/docs/engine-flink/getting-started.md b/website/docs/engine-flink/getting-started.md index f8573454ce..f2527774c2 100644 --- a/website/docs/engine-flink/getting-started.md +++ b/website/docs/engine-flink/getting-started.md @@ -34,6 +34,7 @@ For Flink's Table API, Fluss supports the following features: | [SQL Show Partitions](ddl.md#show-partitions) | ✔️ | | | [SQL Add Partition](ddl.md#add-partition) | ✔️ | | | [SQL Drop Partition](ddl.md#drop-partition) | ✔️ | | +| [Procedures](procedures.md) | ✔️ | ACL management and cluster configuration | | [SQL Select](reads.md) | ✔️ | Support both streaming and batch mode. | | [SQL Limit](reads.md#limit-read) | ✔️ | Only for Log Table | | [SQL Insert Into](writes.md) | ✔️ | Support both streaming and batch mode. | diff --git a/website/docs/engine-flink/lookups.md b/website/docs/engine-flink/lookups.md index f4e5abab18..835ee94924 100644 --- a/website/docs/engine-flink/lookups.md +++ b/website/docs/engine-flink/lookups.md @@ -1,7 +1,7 @@ --- sidebar_label: Lookups title: Flink Lookup Joins -sidebar_position: 5 +sidebar_position: 6 --- # Flink Lookup Joins diff --git a/website/docs/engine-flink/options.md b/website/docs/engine-flink/options.md index d6462fbfa0..2fecb25dd6 100644 --- a/website/docs/engine-flink/options.md +++ b/website/docs/engine-flink/options.md @@ -1,6 +1,6 @@ --- title: Connector Options -sidebar_position: 8 +sidebar_position: 9 --- # Connector Options diff --git a/website/docs/engine-flink/procedures.md b/website/docs/engine-flink/procedures.md new file mode 100644 index 0000000000..ac63113346 --- /dev/null +++ b/website/docs/engine-flink/procedures.md @@ -0,0 +1,255 @@ +--- +sidebar_label: Procedures +title: Procedures +sidebar_position: 3 +--- + +# Procedures + +Fluss provides stored procedures to perform administrative and management operations through Flink SQL. All procedures are located in the `sys` namespace and can be invoked using the `CALL` statement.
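+
+For example, every procedure is invoked through the `CALL` statement with named arguments. A minimal sketch, using the `sys.list_acl` procedure documented below:
+
+```sql title="Flink SQL"
+-- Call a procedure in the sys namespace with named arguments
+CALL sys.list_acl(resource => 'ANY');
+```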
+ +## Available Procedures + +You can list all available procedures using: + +```sql title="Flink SQL" +SHOW PROCEDURES; +``` + +## Access Control Procedures + +Fluss provides procedures to manage Access Control Lists (ACLs) for security and authorization. See the [Security](../security/overview.md) documentation for more details. + +### add_acl + +Add an ACL entry to grant permissions to a principal. + +**Syntax:** + +```sql +CALL [catalog_name.]sys.add_acl( + resource => 'STRING', + permission => 'STRING', + principal => 'STRING', + operation => 'STRING', + host => 'STRING' -- optional, defaults to '*' +) +``` + +**Parameters:** + +- `resource` (required): The resource to grant permissions on. Can be `'CLUSTER'` for cluster-level permissions or a specific resource name (e.g., database or table name). +- `permission` (required): The permission type to grant. Valid values are `'ALLOW'` or `'DENY'`. +- `principal` (required): The principal to grant permissions to, in the format `'Type:Name'` (e.g., `'User:Alice'`). +- `operation` (required): The operation type to grant. Valid values include `'READ'`, `'WRITE'`, `'CREATE'`, `'DELETE'`, `'ALTER'`, `'DESCRIBE'`, `'CLUSTER_ACTION'`, `'IDEMPOTENT_WRITE'`. +- `host` (optional): The host from which the principal can access the resource. Defaults to `'*'` (all hosts). + +**Example:** + +```sql title="Flink SQL" +-- Use the Fluss catalog (replace 'fluss_catalog' with your catalog name if different) +USE fluss_catalog; + +-- Grant read permission to user Alice from any host +CALL sys.add_acl( + resource => 'CLUSTER', + permission => 'ALLOW', + principal => 'User:Alice', + operation => 'READ', + host => '*' +); + +-- Grant write permission to user Bob from a specific host +CALL sys.add_acl( + resource => 'my_database.my_table', + permission => 'ALLOW', + principal => 'User:Bob', + operation => 'WRITE', + host => '192.168.1.100' +); +``` + +### drop_acl + +Remove an ACL entry to revoke permissions. + +**Syntax:** + +```sql +CALL [catalog_name.]sys.drop_acl( + resource => 'STRING', + permission => 'STRING', + principal => 'STRING', + operation => 'STRING', + host => 'STRING' -- optional, defaults to '*' +) +``` + +**Parameters:** + +All parameters accept the same values as `add_acl`. You can use `'ANY'` as a wildcard value to match multiple entries for batch deletion. + +**Example:** + +```sql title="Flink SQL" +-- Use the Fluss catalog (replace 'fluss_catalog' with your catalog name if different) +USE fluss_catalog; + +-- Remove a specific ACL entry +CALL sys.drop_acl( + resource => 'CLUSTER', + permission => 'ALLOW', + principal => 'User:Alice', + operation => 'READ', + host => '*' +); + +-- Remove all ACL entries for a specific user +CALL sys.drop_acl( + resource => 'ANY', + permission => 'ANY', + principal => 'User:Alice', + operation => 'ANY', + host => 'ANY' +); +``` + +### list_acl + +List ACL entries matching the specified filters. + +**Syntax:** + +```sql +CALL [catalog_name.]sys.list_acl( + resource => 'STRING', + permission => 'STRING', -- optional, defaults to 'ANY' + principal => 'STRING', -- optional, defaults to 'ANY' + operation => 'STRING', -- optional, defaults to 'ANY' + host => 'STRING' -- optional, defaults to 'ANY' +) +``` + +**Parameters:** + +All parameters accept the same values as `add_acl`. Use `'ANY'` as a wildcard to match all values for that parameter. 
+ +**Returns:** An array of strings, each representing an ACL entry in the format: `resource="...";permission="...";principal="...";operation="...";host="..."` + +**Example:** + +```sql title="Flink SQL" +-- Use the Fluss catalog (replace 'fluss_catalog' with your catalog name if different) +USE fluss_catalog; + +-- List all ACL entries +CALL sys.list_acl(resource => 'ANY'); + +-- List all ACL entries for a specific user +CALL sys.list_acl( + resource => 'ANY', + principal => 'User:Alice' +); + +-- List all read permissions +CALL sys.list_acl( + resource => 'ANY', + operation => 'READ' +); +``` + +## Cluster Configuration Procedures + +Fluss provides procedures to dynamically manage cluster configurations without requiring a server restart. + +### get_cluster_config + +Retrieve cluster configuration values. + +**Syntax:** + +```sql +-- Get a specific configuration +CALL [catalog_name.]sys.get_cluster_config(config_key => 'STRING') + +-- Get all cluster configurations +CALL [catalog_name.]sys.get_cluster_config() +``` + +**Parameters:** + +- `config_key` (optional): The configuration key to retrieve. If omitted, returns all cluster configurations. + +**Returns:** A table with columns: +- `config_key`: The configuration key name +- `config_value`: The current value +- `config_source`: The source of the configuration (e.g., `DYNAMIC_CONFIG`, `STATIC_CONFIG`) + +**Example:** + +```sql title="Flink SQL" +-- Use the Fluss catalog (replace 'fluss_catalog' with your catalog name if different) +USE fluss_catalog; + +-- Get a specific configuration +CALL sys.get_cluster_config( + config_key => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec' +); + +-- Get all cluster configurations +CALL sys.get_cluster_config(); +``` + +### set_cluster_config + +Set or delete a cluster configuration dynamically. + +**Syntax:** + +```sql +-- Set a configuration value +CALL [catalog_name.]sys.set_cluster_config( + config_key => 'STRING', + config_value => 'STRING' +) + +-- Delete a configuration (reset to default) +CALL [catalog_name.]sys.set_cluster_config(config_key => 'STRING') +``` + +**Parameters:** + +- `config_key` (required): The configuration key to modify. +- `config_value` (optional): The new value to set. If omitted or empty, the configuration is deleted (reset to default). + +**Important Notes:** + +- Changes are validated before they are applied, and are persisted in ZooKeeper. +- Changes are automatically applied to all servers (Coordinator and TabletServers). +- Changes survive server restarts. +- Not all configurations support dynamic changes.
The server will reject invalid modifications. + +**Example:** + +```sql title="Flink SQL" +-- Use the Fluss catalog (replace 'fluss_catalog' with your catalog name if different) +USE fluss_catalog; + +-- Set RocksDB rate limiter +CALL sys.set_cluster_config( + config_key => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec', + config_value => '200MB' +); + +-- Set datalake format +CALL sys.set_cluster_config( + config_key => 'datalake.format', + config_value => 'paimon' +); + +-- Delete a configuration (reset to default) +CALL sys.set_cluster_config( + config_key => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec' +); +``` + diff --git a/website/docs/engine-flink/reads.md b/website/docs/engine-flink/reads.md index bcb2c24756..4cab45fbdd 100644 --- a/website/docs/engine-flink/reads.md +++ b/website/docs/engine-flink/reads.md @@ -1,7 +1,7 @@ --- sidebar_label: Reads title: Flink Reads -sidebar_position: 4 +sidebar_position: 5 --- # Flink Reads diff --git a/website/docs/engine-flink/writes.md b/website/docs/engine-flink/writes.md index 29af716fdb..354aa97475 100644 --- a/website/docs/engine-flink/writes.md +++ b/website/docs/engine-flink/writes.md @@ -1,7 +1,7 @@ --- sidebar_label: Writes title: Flink Writes -sidebar_position: 3 +sidebar_position: 4 --- # Flink Writes diff --git a/website/docs/maintenance/configuration.md b/website/docs/maintenance/configuration.md index f6988aacef..97da3457d0 100644 --- a/website/docs/maintenance/configuration.md +++ b/website/docs/maintenance/configuration.md @@ -157,6 +157,7 @@ during the Fluss cluster working. | kv.rocksdb.use-bloom-filter | Boolean | true | If true, every newly created SST file will contain a Bloom filter. It is enabled by default. | | kv.rocksdb.bloom-filter.bits-per-key | Double | 10.0 | Bits per key that bloom filter will use, this only take effect when bloom filter is used. The default value is 10.0. | | kv.rocksdb.bloom-filter.block-based-mode | Boolean | false | If true, RocksDB will use block-based filter instead of full filter, this only take effect when bloom filter is used. The default value is `false`. | +| kv.rocksdb.shared-rate-limiter.bytes-per-sec | MemorySize | Long.MAX_VALUE | The rate limit in bytes per second for RocksDB flush and compaction operations, shared across all RocksDB instances on the TabletServer. The rate limiter is always enabled. The default value is Long.MAX_VALUE (effectively unlimited). Set to a lower value (e.g., 100MB) to limit the rate. This configuration can be updated dynamically without server restart. See [Updating Configs](operations/updating-configs.md) for more details. | | kv.recover.log-record-batch.max-size | MemorySize | 16mb | The max fetch size for fetching log to apply to kv during recovering kv. | ## Metrics diff --git a/website/docs/maintenance/operations/updating-configs.md b/website/docs/maintenance/operations/updating-configs.md index 0643a46fec..8b7a6a1e10 100644 --- a/website/docs/maintenance/operations/updating-configs.md +++ b/website/docs/maintenance/operations/updating-configs.md @@ -15,9 +15,13 @@ From Fluss version 0.8 onwards, some of the server configs can be updated withou Currently, the supported dynamically updatable server configurations include: - `datalake.format`: Enable lakehouse storage by specifying the lakehouse format, e.g., `paimon`, `iceberg`. - Options with prefix `datalake.${datalake.format}` +- `kv.rocksdb.shared-rate-limiter.bytes-per-sec`: Control the RocksDB flush and compaction write rate shared across all RocksDB instances on the TabletServer.
The rate limiter is always enabled. Set to a lower value (e.g., 100MB) to limit the rate, or a very high value to effectively disable rate limiting. -You can update the configuration of a cluster with [Java client](apis/java-client.md). +You can update the configuration of a cluster with [Java client](#using-java-client) or [Flink Procedures](../../engine-flink/procedures.md#cluster-configuration-procedures). + +### Using Java Client + Here is a code snippet to demonstrate how to update the cluster configurations using the Java Client: ```java @@ -30,6 +34,11 @@ admin.alterClusterConfigs( admin.alterClusterConfigs( Collections.singletonList( new AlterConfig(DATALAKE_FORMAT.key(), "paimon", AlterConfigOpType.DELETE))); + +// Set RocksDB shared rate limiter to 200MB/sec +admin.alterClusterConfigs( + Collections.singletonList( + new AlterConfig(KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(), "200MB", AlterConfigOpType.SET))); ``` The `AlterConfig` class contains three properties: @@ -37,6 +46,9 @@ The `AlterConfig` class contains three properties: * `value`: The configuration value to be set (e.g., `paimon`) * `opType`: The operation type, either `AlterConfigOpType.SET` or `AlterConfigOpType.DELETE` +### Using Flink Stored Procedures + +For certain configurations, Fluss provides convenient Flink stored procedures that can be called directly from Flink SQL. See [Procedures](../../engine-flink/procedures.md#cluster-configuration-procedures) for detailed documentation on the `get_cluster_config` and `set_cluster_config` procedures.
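+
+As a sketch of the procedure-based route, the rate-limiter change from the Java snippet above can also be applied directly in Flink SQL with `set_cluster_config` (assuming a Fluss catalog is the current catalog):
+
+```sql title="Flink SQL"
+-- Set the shared RocksDB rate limiter to 200MB/sec, mirroring the Java example
+CALL sys.set_cluster_config(
+  config_key => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec',
+  config_value => '200MB'
+);
+```

## Updating Table Configs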