Skip to content

Commit 9e6f6a4

Browse files
[kv] Support shared RocksDB rate limiter (#2178)
1 parent f731fc6 commit 9e6f6a4

File tree

25 files changed

+1380
-106
lines changed

25 files changed

+1380
-106
lines changed

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@
2828
import org.apache.fluss.metadata.MergeEngineType;
2929
import org.apache.fluss.utils.ArrayUtils;
3030

31+
import java.lang.reflect.Field;
3132
import java.time.Duration;
3233
import java.time.ZoneId;
3334
import java.util.Arrays;
3435
import java.util.Collections;
36+
import java.util.HashMap;
3537
import java.util.List;
3638
import java.util.Map;
3739

@@ -1576,6 +1578,17 @@ public class ConfigOptions {
15761578
"The max size of the consumed memory for RocksDB batch write, "
15771579
+ "will flush just based on item count if this config set to 0.");
15781580

1581+
public static final ConfigOption<MemorySize> KV_SHARED_RATE_LIMITER_BYTES_PER_SEC =
1582+
key("kv.rocksdb.shared-rate-limiter.bytes-per-sec")
1583+
.memoryType()
1584+
.defaultValue(new MemorySize(Long.MAX_VALUE))
1585+
.withDescription(
1586+
"The shared rate limit in bytes per second for RocksDB flush and compaction operations "
1587+
+ "across all RocksDB instances in the TabletServer. "
1588+
+ "All KV tablets share a single global RateLimiter to prevent disk IO from being saturated. "
1589+
+ "The RateLimiter is always enabled. The default value is Long.MAX_VALUE (effectively unlimited). "
1590+
+ "Set to a lower value (e.g., 100MB) to limit the rate.");
1591+
15791592
// --------------------------------------------------------------------------
15801593
// Provided configurable ColumnFamilyOptions within Fluss
15811594
// --------------------------------------------------------------------------
@@ -1869,4 +1882,45 @@ public enum KvCompressionType {
18691882
LZ4,
18701883
ZSTD
18711884
}
1885+
1886+
// ------------------------------------------------------------------------
1887+
// ConfigOptions Registry and Utilities
1888+
// ------------------------------------------------------------------------
1889+
1890+
/**
1891+
* Holder class for lazy initialization of ConfigOptions registry. This ensures that the
1892+
* registry is initialized only when first accessed, and guarantees that all ConfigOption fields
1893+
* are already initialized (since static initialization happens in declaration order).
1894+
*/
1895+
private static class ConfigOptionsHolder {
1896+
private static final Map<String, ConfigOption<?>> CONFIG_OPTIONS_BY_KEY;
1897+
1898+
static {
1899+
Map<String, ConfigOption<?>> options = new HashMap<>();
1900+
Field[] fields = ConfigOptions.class.getFields();
1901+
for (Field field : fields) {
1902+
if (!ConfigOption.class.isAssignableFrom(field.getType())) {
1903+
continue;
1904+
}
1905+
try {
1906+
ConfigOption<?> configOption = (ConfigOption<?>) field.get(null);
1907+
options.put(configOption.key(), configOption);
1908+
} catch (IllegalAccessException e) {
1909+
// Ignore fields that cannot be accessed
1910+
}
1911+
}
1912+
CONFIG_OPTIONS_BY_KEY = Collections.unmodifiableMap(options);
1913+
}
1914+
}
1915+
1916+
/**
1917+
* Gets the ConfigOption for a given key.
1918+
*
1919+
* @param key the configuration key
1920+
* @return the ConfigOption if found, null otherwise
1921+
*/
1922+
@Internal
1923+
public static ConfigOption<?> getConfigOption(String key) {
1924+
return ConfigOptionsHolder.CONFIG_OPTIONS_BY_KEY.get(key);
1925+
}
18721926
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.config.cluster;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.exception.ConfigException;
22+
23+
import javax.annotation.Nullable;
24+
25+
/**
26+
* Validator for a single dynamic configuration key.
27+
*
28+
* <p>Unlike {@link ServerReconfigurable}, validators are stateless and only perform validation
29+
* logic without requiring component instances. This allows coordinators to validate configurations
30+
* for components they don't run (e.g., KvManager).
31+
*
32+
* <p>Example use case: CoordinatorServer needs to validate KV-related configurations even though it
33+
* doesn't have a KvManager instance. A {@link ConfigValidator} can be registered on both
34+
* CoordinatorServer (for validation) and TabletServer (for both validation and actual
35+
* reconfiguration via {@link ServerReconfigurable}).
36+
*
37+
* <p>Each validator monitors a single configuration key. The validator will only be invoked when
38+
* that specific key changes, improving validation efficiency.
39+
*
40+
* <p>This interface is designed to be stateless and thread-safe. Implementations should not rely on
41+
* any mutable component state.
42+
*
43+
* <p><b>Type-safe validation:</b> The validator receives strongly-typed values that have already
44+
* been parsed and validated for basic type correctness. This avoids redundant string parsing and
45+
* allows validators to focus on business logic validation.
46+
*
47+
* @param <T> the type of the configuration value being validated
48+
*/
49+
@PublicEvolving
50+
public interface ConfigValidator<T> {
51+
52+
/**
53+
* Returns the configuration key this validator monitors.
54+
*
55+
* <p>The validator will only be invoked when this specific configuration key changes. This
56+
* allows efficient filtering of validators and clear declaration of dependencies.
57+
*
58+
* @return the configuration key to monitor, must not be null or empty
59+
*/
60+
String configKey();
61+
62+
/**
63+
* Validates a configuration value change.
64+
*
65+
* <p>This method is called when the monitored configuration key changes. It should check
66+
* whether the new value is valid, potentially considering the old value and validation rules.
67+
*
68+
* <p>The method should be stateless and deterministic - given the same old and new values, it
69+
* should always produce the same validation result.
70+
*
71+
* @param oldValue the previous value of the configuration key, null if the key was not set
72+
* before
73+
* @param newValue the new value of the configuration key, null if the key is being deleted
74+
* @throws ConfigException if the configuration change is invalid, with a descriptive error
75+
* message explaining why the change cannot be applied
76+
*/
77+
void validate(@Nullable T oldValue, @Nullable T newValue) throws ConfigException;
78+
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.flink.procedure;
19+
20+
import org.apache.fluss.config.cluster.ConfigEntry;
21+
22+
import org.apache.flink.table.annotation.ArgumentHint;
23+
import org.apache.flink.table.annotation.DataTypeHint;
24+
import org.apache.flink.table.annotation.ProcedureHint;
25+
import org.apache.flink.table.procedure.ProcedureContext;
26+
import org.apache.flink.types.Row;
27+
28+
import javax.annotation.Nullable;
29+
30+
import java.util.ArrayList;
31+
import java.util.Collection;
32+
import java.util.List;
33+
34+
/**
35+
* Procedure to get cluster configuration(s).
36+
*
37+
* <p>This procedure allows querying dynamic cluster configurations. It can retrieve:
38+
*
39+
* <ul>
40+
* <li>A specific configuration key
41+
* <li>All configurations (when key parameter is null or empty)
42+
* </ul>
43+
*
44+
* <p>Usage examples:
45+
*
46+
* <pre>
47+
* -- Get a specific configuration
48+
* CALL sys.get_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec');
49+
*
50+
* -- Get all cluster configurations
51+
* CALL sys.get_cluster_config();
52+
* </pre>
53+
*/
54+
public class GetClusterConfigProcedure extends ProcedureBase {
55+
56+
@ProcedureHint(
57+
output =
58+
@DataTypeHint(
59+
"ROW<config_key STRING, config_value STRING, config_source STRING>"))
60+
public Row[] call(ProcedureContext context) throws Exception {
61+
return getConfigs(null);
62+
}
63+
64+
@ProcedureHint(
65+
argument = {@ArgumentHint(name = "config_key", type = @DataTypeHint("STRING"))},
66+
output =
67+
@DataTypeHint(
68+
"ROW<config_key STRING, config_value STRING, config_source STRING>"))
69+
public Row[] call(ProcedureContext context, String configKey) throws Exception {
70+
return getConfigs(configKey);
71+
}
72+
73+
private Row[] getConfigs(@Nullable String configKey) throws Exception {
74+
try {
75+
// Get all cluster configurations
76+
Collection<ConfigEntry> configs = admin.describeClusterConfigs().get();
77+
78+
List<Row> results = new ArrayList<>();
79+
80+
if (configKey == null || configKey.isEmpty()) {
81+
// Return all configurations
82+
for (ConfigEntry entry : configs) {
83+
results.add(
84+
Row.of(
85+
entry.key(),
86+
entry.value(),
87+
entry.source() != null ? entry.source().name() : "UNKNOWN"));
88+
}
89+
} else {
90+
// Find specific configuration
91+
for (ConfigEntry entry : configs) {
92+
if (entry.key().equals(configKey)) {
93+
results.add(
94+
Row.of(
95+
entry.key(),
96+
entry.value(),
97+
entry.source() != null
98+
? entry.source().name()
99+
: "UNKNOWN"));
100+
break;
101+
}
102+
}
103+
}
104+
105+
return results.toArray(new Row[0]);
106+
107+
} catch (Exception e) {
108+
throw new RuntimeException("Failed to get cluster config: " + e.getMessage(), e);
109+
}
110+
}
111+
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,9 @@ private static Map<String, Class<? extends ProcedureBase>> initProcedureMap() {
6969
private enum ProcedureEnum {
7070
ADD_ACL("sys.add_acl", AddAclProcedure.class),
7171
DROP_ACL("sys.drop_acl", DropAclProcedure.class),
72-
List_ACL("sys.list_acl", ListAclProcedure.class);
72+
List_ACL("sys.list_acl", ListAclProcedure.class),
73+
SET_CLUSTER_CONFIG("sys.set_cluster_config", SetClusterConfigProcedure.class),
74+
GET_CLUSTER_CONFIG("sys.get_cluster_config", GetClusterConfigProcedure.class);
7375

7476
private final String path;
7577
private final Class<? extends ProcedureBase> procedureClass;

0 commit comments

Comments
 (0)