Skip to content

Commit 09653a4

Browse files
committed
Refactor CALL procedure "sys.xxx_cluster_config" to support multiple conifg keys
Signed-off-by: Pei Yu <125331682@qq.com>
1 parent f90b33e commit 09653a4

File tree

5 files changed

+240
-87
lines changed

5 files changed

+240
-87
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigProcedure.java renamed to fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigsProcedure.java

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,54 +30,61 @@
3030
import java.util.ArrayList;
3131
import java.util.Collection;
3232
import java.util.List;
33+
import java.util.Map;
34+
import java.util.function.Function;
35+
import java.util.stream.Collectors;
3336

3437
/**
3538
* Procedure to get cluster configuration(s).
3639
*
3740
* <p>This procedure allows querying dynamic cluster configurations. It can retrieve:
3841
*
3942
* <ul>
40-
* <li>A specific configuration key
43+
* <li>multiple configurations
4144
* <li>All configurations (when key parameter is null or empty)
4245
* </ul>
4346
*
4447
* <p>Usage examples:
4548
*
4649
* <pre>
4750
* -- Get a specific configuration
48-
* CALL sys.get_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec');
51+
* CALL sys.get_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec');
52+
*
53+
* -- Get multiple configurations
54+
* CALL sys.get_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', 'datalake.format');
4955
*
5056
* -- Get all cluster configurations
51-
* CALL sys.get_cluster_config();
57+
* CALL sys.get_cluster_configs();
5258
* </pre>
5359
*/
54-
public class GetClusterConfigProcedure extends ProcedureBase {
60+
public class GetClusterConfigsProcedure extends ProcedureBase {
5561

5662
@ProcedureHint(
5763
output =
5864
@DataTypeHint(
5965
"ROW<config_key STRING, config_value STRING, config_source STRING>"))
6066
public Row[] call(ProcedureContext context) throws Exception {
61-
return getConfigs(null);
67+
return getConfigs();
6268
}
6369

6470
@ProcedureHint(
6571
argument = {@ArgumentHint(name = "config_key", type = @DataTypeHint("STRING"))},
72+
isVarArgs = true,
6673
output =
6774
@DataTypeHint(
6875
"ROW<config_key STRING, config_value STRING, config_source STRING>"))
69-
public Row[] call(ProcedureContext context, String configKey) throws Exception {
76+
public Row[] call(ProcedureContext context, String... configKey) throws Exception {
7077
return getConfigs(configKey);
7178
}
7279

73-
private Row[] getConfigs(@Nullable String configKey) throws Exception {
80+
private Row[] getConfigs(@Nullable String... configKeys) throws Exception {
7481
try {
7582
// Get all cluster configurations
7683
Collection<ConfigEntry> configs = admin.describeClusterConfigs().get();
7784

7885
List<Row> results = new ArrayList<>();
7986

80-
if (configKey == null || configKey.isEmpty()) {
87+
if (configKeys == null || configKeys.length == 0) {
8188
// Return all configurations
8289
for (ConfigEntry entry : configs) {
8390
results.add(
@@ -87,17 +94,23 @@ private Row[] getConfigs(@Nullable String configKey) throws Exception {
8794
entry.source() != null ? entry.source().name() : "UNKNOWN"));
8895
}
8996
} else {
90-
// Find specific configuration
91-
for (ConfigEntry entry : configs) {
92-
if (entry.key().equals(configKey)) {
97+
// Find configurations
98+
// The order of the results is the same as that of the key.
99+
Map<String, ConfigEntry> configEntryMap =
100+
configs.stream()
101+
.collect(Collectors.toMap(ConfigEntry::key, Function.identity()));
102+
for (String key : configKeys) {
103+
ConfigEntry entry = configEntryMap.get(key);
104+
if (null != entry) {
93105
results.add(
94106
Row.of(
95107
entry.key(),
96108
entry.value(),
97109
entry.source() != null
98110
? entry.source().name()
99111
: "UNKNOWN"));
100-
break;
112+
} else {
113+
results.add(Row.of());
101114
}
102115
}
103116
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ProcedureManager.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,9 @@ private enum ProcedureEnum {
7070
ADD_ACL("sys.add_acl", AddAclProcedure.class),
7171
DROP_ACL("sys.drop_acl", DropAclProcedure.class),
7272
List_ACL("sys.list_acl", ListAclProcedure.class),
73-
SET_CLUSTER_CONFIG("sys.set_cluster_config", SetClusterConfigProcedure.class),
74-
GET_CLUSTER_CONFIG("sys.get_cluster_config", GetClusterConfigProcedure.class),
73+
SET_CLUSTER_CONFIGS("sys.set_cluster_configs", SetClusterConfigsProcedure.class),
74+
GET_CLUSTER_CONFIGS("sys.get_cluster_configs", GetClusterConfigsProcedure.class),
75+
RESET_CLUSTER_CONFIGS("sys.reset_cluster_configs", ResetClusterConfigsProcedure.class),
7576
ADD_SERVER_TAG("sys.add_server_tag", AddServerTagProcedure.class),
7677
REMOVE_SERVER_TAG("sys.remove_server_tag", RemoveServerTagProcedure.class);
7778

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.flink.procedure;
19+
20+
import org.apache.fluss.config.cluster.AlterConfig;
21+
import org.apache.fluss.config.cluster.AlterConfigOpType;
22+
23+
import org.apache.flink.table.annotation.ArgumentHint;
24+
import org.apache.flink.table.annotation.DataTypeHint;
25+
import org.apache.flink.table.annotation.ProcedureHint;
26+
import org.apache.flink.table.procedure.ProcedureContext;
27+
28+
import java.util.ArrayList;
29+
import java.util.List;
30+
31+
/**
32+
* Procedure to reset cluster configuration dynamically.
33+
*
34+
* <p>This procedure allows modifying dynamic cluster configurations. The changes are:
35+
*
36+
* <ul>
37+
* <li>Validated by the CoordinatorServer before persistence
38+
* <li>Persisted in ZooKeeper for durability
39+
* <li>Applied to all relevant servers (Coordinator and TabletServers)
40+
* <li>Survive server restarts
41+
* </ul>
42+
*
43+
* <p>Usage examples:
44+
*
45+
* <pre>
46+
* -- reset a configuration
47+
* CALL sys.reset_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec');
48+
*
49+
* -- reset multiple configurations at one time
50+
* CALL sys.reset_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec','datalake.format');
51+
*
52+
* </pre>
53+
*
54+
* <p><b>Note:</b> Not all configurations support dynamic changes. The server will validate the
55+
* change and reject it if the configuration cannot be reset dynamically.
56+
*/
57+
public class ResetClusterConfigsProcedure extends ProcedureBase {
58+
59+
@ProcedureHint(
60+
argument = {@ArgumentHint(name = "config_keys", type = @DataTypeHint("STRING"))},
61+
isVarArgs = true)
62+
public String[] call(ProcedureContext context, String... configKeys) throws Exception {
63+
try {
64+
// Validate config key
65+
if (configKeys.length == 0) {
66+
throw new IllegalArgumentException(
67+
"config_pairs cannot be null or empty. "
68+
+ "Please specify valid configuration keys.");
69+
}
70+
71+
List<AlterConfig> configList = new ArrayList<>();
72+
StringBuilder resultMessage = new StringBuilder();
73+
74+
for (String key : configKeys) {
75+
String configKey = key.trim();
76+
if (configKey.isEmpty()) {
77+
throw new IllegalArgumentException(
78+
"Config key cannot be null or empty. "
79+
+ "Please specify valid configuration key.");
80+
}
81+
82+
String operationDesc = "deleted (reset to default)";
83+
84+
AlterConfig alterConfig =
85+
new AlterConfig(configKey, null, AlterConfigOpType.DELETE);
86+
configList.add(alterConfig);
87+
resultMessage.append(
88+
String.format(
89+
"Successfully %s configuration '%s'. ", operationDesc, configKey));
90+
}
91+
92+
// Call Admin API to modify cluster configuration
93+
// This will trigger validation on CoordinatorServer before persistence
94+
admin.alterClusterConfigs(configList).get();
95+
96+
return new String[] {
97+
resultMessage + "The change is persisted in ZooKeeper and applied to all servers."
98+
};
99+
} catch (IllegalArgumentException e) {
100+
// Re-throw validation errors with original message
101+
throw e;
102+
} catch (Exception e) {
103+
// Wrap other exceptions with more context
104+
throw new RuntimeException(
105+
String.format("Failed to reset cluster config: %s", e.getMessage()), e);
106+
}
107+
}
108+
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigProcedure.java renamed to fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigsProcedure.java

Lines changed: 44 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,8 @@
2525
import org.apache.flink.table.annotation.ProcedureHint;
2626
import org.apache.flink.table.procedure.ProcedureContext;
2727

28-
import javax.annotation.Nullable;
29-
30-
import java.util.Collections;
28+
import java.util.ArrayList;
29+
import java.util.List;
3130

3231
/**
3332
* Procedure to set or delete cluster configuration dynamically.
@@ -45,84 +44,76 @@
4544
*
4645
* <pre>
4746
* -- Set a configuration
48-
* CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB');
49-
* CALL sys.set_cluster_config('datalake.format', 'paimon');
47+
* CALL sys.set_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB');
48+
* CALL sys.set_cluster_configs('datalake.format', 'paimon');
49+
*
50+
* -- Set multiple configurations at one time
51+
* CALL sys.set_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB','datalake.format', 'paimon');
5052
*
5153
* -- Delete a configuration (reset to default)
52-
* CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', NULL);
53-
* CALL sys.set_cluster_config('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '');
54+
* CALL sys.set_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '');
5455
* </pre>
5556
*
5657
* <p><b>Note:</b> Not all configurations support dynamic changes. The server will validate the
5758
* change and reject it if the configuration cannot be modified dynamically or if the new value is
5859
* invalid.
5960
*/
60-
public class SetClusterConfigProcedure extends ProcedureBase {
61-
62-
@ProcedureHint(argument = {@ArgumentHint(name = "config_key", type = @DataTypeHint("STRING"))})
63-
public String[] call(ProcedureContext context, String configKey) throws Exception {
64-
return performSet(configKey, null);
65-
}
61+
public class SetClusterConfigsProcedure extends ProcedureBase {
6662

6763
@ProcedureHint(
68-
argument = {
69-
@ArgumentHint(name = "config_key", type = @DataTypeHint("STRING")),
70-
@ArgumentHint(name = "config_value", type = @DataTypeHint("STRING"))
71-
})
72-
public String[] call(ProcedureContext context, String configKey, String configValue)
73-
throws Exception {
74-
return performSet(configKey, configValue);
75-
}
76-
77-
private String[] performSet(String configKey, @Nullable String configValue) throws Exception {
78-
64+
argument = {@ArgumentHint(name = "config_pairs", type = @DataTypeHint("STRING"))},
65+
isVarArgs = true)
66+
public String[] call(ProcedureContext context, String... configPairs) throws Exception {
7967
try {
8068
// Validate config key
81-
if (configKey == null || configKey.trim().isEmpty()) {
69+
if (configPairs.length == 0) {
8270
throw new IllegalArgumentException(
83-
"Config key cannot be null or empty. "
84-
+ "Please specify a valid configuration key.");
71+
"config_pairs cannot be null or empty. "
72+
+ "Please specify a valid configuration pairs.");
8573
}
8674

87-
configKey = configKey.trim();
88-
89-
// Determine operation type
90-
AlterConfigOpType opType;
91-
String operationDesc;
92-
93-
if (configValue == null || configValue.trim().isEmpty()) {
94-
// Delete operation - reset to default
95-
opType = AlterConfigOpType.DELETE;
96-
operationDesc = "deleted (reset to default)";
97-
} else {
98-
// Set operation
99-
opType = AlterConfigOpType.SET;
100-
operationDesc = String.format("set to '%s'", configValue);
75+
if (configPairs.length % 2 != 0) {
76+
throw new IllegalArgumentException(
77+
"config_pairs must be set in pairs. "
78+
+ "Please specify a valid configuration pairs.");
79+
}
80+
List<AlterConfig> configList = new ArrayList<>();
81+
StringBuilder resultMessage = new StringBuilder();
82+
83+
for (int i = 0; i < configPairs.length; i += 2) {
84+
String configKey = configPairs[i].trim();
85+
if (configKey.isEmpty()) {
86+
throw new IllegalArgumentException(
87+
"Config key cannot be null or empty. "
88+
+ "Please specify a valid configuration key.");
89+
}
90+
String configValue = configPairs[i + 1];
91+
92+
String operationDesc = String.format("set to '%s'", configValue);
93+
94+
// Construct configuration modification operation.
95+
AlterConfig alterConfig =
96+
new AlterConfig(configKey, configValue, AlterConfigOpType.SET);
97+
configList.add(alterConfig);
98+
resultMessage.append(
99+
String.format(
100+
"Successfully %s configuration '%s'. ", operationDesc, configKey));
101101
}
102-
103-
// Construct configuration modification operation.
104-
AlterConfig alterConfig = new AlterConfig(configKey, configValue, opType);
105102

106103
// Call Admin API to modify cluster configuration
107104
// This will trigger validation on CoordinatorServer before persistence
108-
admin.alterClusterConfigs(Collections.singletonList(alterConfig)).get();
105+
admin.alterClusterConfigs(configList).get();
109106

110107
return new String[] {
111-
String.format(
112-
"Successfully %s configuration '%s'. "
113-
+ "The change is persisted in ZooKeeper and applied to all servers.",
114-
operationDesc, configKey)
108+
resultMessage + "The change is persisted in ZooKeeper and applied to all servers."
115109
};
116-
117110
} catch (IllegalArgumentException e) {
118111
// Re-throw validation errors with original message
119112
throw e;
120113
} catch (Exception e) {
121114
// Wrap other exceptions with more context
122115
throw new RuntimeException(
123-
String.format(
124-
"Failed to set cluster config '%s': %s", configKey, e.getMessage()),
125-
e);
116+
String.format("Failed to set cluster config: %s", e.getMessage()), e);
126117
}
127118
}
128119
}

0 commit comments

Comments
 (0)