Skip to content

Commit 223fe8d

Browse files
committed
fix comment
Signed-off-by: Pei Yu <[email protected]>
1 parent f37f72a commit 223fe8d

File tree

5 files changed

+139
-58
lines changed

5 files changed

+139
-58
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigsProcedure.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,13 @@ public Row[] call(ProcedureContext context) throws Exception {
6868
}
6969

7070
@ProcedureHint(
71-
argument = {@ArgumentHint(name = "config_key", type = @DataTypeHint("STRING"))},
71+
argument = {@ArgumentHint(name = "config_keys", type = @DataTypeHint("STRING"))},
7272
isVarArgs = true,
7373
output =
7474
@DataTypeHint(
7575
"ROW<config_key STRING, config_value STRING, config_source STRING>"))
76-
public Row[] call(ProcedureContext context, String... configKey) throws Exception {
77-
return getConfigs(configKey);
76+
public Row[] call(ProcedureContext context, String... configKeys) throws Exception {
77+
return getConfigs(configKeys);
7878
}
7979

8080
private Row[] getConfigs(@Nullable String... configKeys) throws Exception {

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@
2929
import java.util.List;
3030

3131
/**
32-
* Procedure to reset cluster configuration dynamically.
32+
* Procedure to reset cluster configurations to their default values.
3333
*
34-
* <p>This procedure allows modifying dynamic cluster configurations. The changes are:
34+
* <p>This procedure reverts the configurations to their initial system defaults. The changes are:
3535
*
3636
* <ul>
3737
* <li>Validated by the CoordinatorServer before persistence
@@ -51,8 +51,8 @@
5151
*
5252
* </pre>
5353
*
54-
* <p><b>Note:</b> Not all configurations support dynamic changes. The server will validate the
55-
* change and reject it if the configuration cannot be reset dynamically.
54+
* <p><b>Note:</b> In theory, an operation like <b>Reset to default value<b/> should always succeed,
55+
* as the default value should be a valid one
5656
*/
5757
public class ResetClusterConfigsProcedure extends ProcedureBase {
5858

@@ -69,7 +69,7 @@ public String[] call(ProcedureContext context, String... configKeys) throws Exce
6969
}
7070

7171
List<AlterConfig> configList = new ArrayList<>();
72-
StringBuilder resultMessage = new StringBuilder();
72+
List<String> resultMessage = new ArrayList();
7373

7474
for (String key : configKeys) {
7575
String configKey = key.trim();
@@ -84,7 +84,7 @@ public String[] call(ProcedureContext context, String... configKeys) throws Exce
8484
AlterConfig alterConfig =
8585
new AlterConfig(configKey, null, AlterConfigOpType.DELETE);
8686
configList.add(alterConfig);
87-
resultMessage.append(
87+
resultMessage.add(
8888
String.format(
8989
"Successfully %s configuration '%s'. ", operationDesc, configKey));
9090
}
@@ -93,9 +93,7 @@ public String[] call(ProcedureContext context, String... configKeys) throws Exce
9393
// This will trigger validation on CoordinatorServer before persistence
9494
admin.alterClusterConfigs(configList).get();
9595

96-
return new String[] {
97-
resultMessage + "The change is persisted in ZooKeeper and applied to all servers."
98-
};
96+
return resultMessage.toArray(new String[0]);
9997
} catch (IllegalArgumentException e) {
10098
// Re-throw validation errors with original message
10199
throw e;

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigsProcedure.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,6 @@
4949
*
5050
* -- Set multiple configurations at one time
5151
* CALL sys.set_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB','datalake.format', 'paimon');
52-
*
53-
* -- Delete a configuration (reset to default)
54-
* CALL sys.set_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '');
5552
* </pre>
5653
*
5754
* <p><b>Note:</b> Not all configurations support dynamic changes. The server will validate the
@@ -78,7 +75,7 @@ public String[] call(ProcedureContext context, String... configPairs) throws Exc
7875
+ "Please specify a valid configuration pairs.");
7976
}
8077
List<AlterConfig> configList = new ArrayList<>();
81-
StringBuilder resultMessage = new StringBuilder();
78+
List<String> resultMessage = new ArrayList<>();
8279

8380
for (int i = 0; i < configPairs.length; i += 2) {
8481
String configKey = configPairs[i].trim();
@@ -95,7 +92,7 @@ public String[] call(ProcedureContext context, String... configPairs) throws Exc
9592
AlterConfig alterConfig =
9693
new AlterConfig(configKey, configValue, AlterConfigOpType.SET);
9794
configList.add(alterConfig);
98-
resultMessage.append(
95+
resultMessage.add(
9996
String.format(
10097
"Successfully %s configuration '%s'. ", operationDesc, configKey));
10198
}
@@ -104,9 +101,7 @@ public String[] call(ProcedureContext context, String... configPairs) throws Exc
104101
// This will trigger validation on CoordinatorServer before persistence
105102
admin.alterClusterConfigs(configList).get();
106103

107-
return new String[] {
108-
resultMessage + "The change is persisted in ZooKeeper and applied to all servers."
109-
};
104+
return resultMessage.toArray(new String[0]);
110105
} catch (IllegalArgumentException e) {
111106
// Re-throw validation errors with original message
112107
throw e;

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java

Lines changed: 65 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,31 @@ void testGetClusterConfigs() throws Exception {
284284
assertThat(row.getField(2)).isNotNull(); // config_source
285285
}
286286

287+
// Get multiple config
288+
try (CloseableIterator<Row> resultIterator =
289+
tEnv.executeSql(
290+
String.format(
291+
"Call %s.sys.get_cluster_configs('%s', '%s')",
292+
CATALOG_NAME,
293+
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(),
294+
ConfigOptions.DATALAKE_FORMAT.key()))
295+
.collect()) {
296+
List<Row> results = CollectionUtil.iteratorToList(resultIterator);
297+
assertThat(results).hasSize(2);
298+
// the first row
299+
Row row0 = results.get(0);
300+
assertThat(row0.getField(0))
301+
.isEqualTo(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key());
302+
assertThat(row0.getField(1)).isEqualTo("100 mb");
303+
assertThat(row0.getField(2)).isNotNull(); // config_source
304+
305+
// the second row
306+
Row row1 = results.get(1);
307+
assertThat(row1.getField(0)).isEqualTo(ConfigOptions.DATALAKE_FORMAT.key());
308+
assertThat(row1.getField(1)).isEqualTo("paimon");
309+
assertThat(row1.getField(2)).isNotNull(); // config_source
310+
}
311+
287312
// Get all configs
288313
try (CloseableIterator<Row> resultIterator =
289314
tEnv.executeSql(String.format("Call %s.sys.get_cluster_configs()", CATALOG_NAME))
@@ -340,11 +365,14 @@ void testSetClusterConfigs() throws Exception {
340365
ConfigOptions.DATALAKE_FORMAT.key()))
341366
.collect()) {
342367
List<Row> results = CollectionUtil.iteratorToList(resultIterator);
343-
assertThat(results).hasSize(1);
368+
assertThat(results).hasSize(2);
344369
assertThat(results.get(0).getField(0))
345370
.asString()
346371
.contains("Successfully set to '300MB'")
347-
.contains(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key())
372+
.contains(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key());
373+
374+
assertThat(results.get(1).getField(0))
375+
.asString()
348376
.contains("Successfully set to 'paimon'")
349377
.contains(ConfigOptions.DATALAKE_FORMAT.key());
350378
}
@@ -365,9 +393,10 @@ void testSetClusterConfigs() throws Exception {
365393
// reset cluster configs.
366394
tEnv.executeSql(
367395
String.format(
368-
"Call %s.sys.reset_cluster_configs('%s')",
396+
"Call %s.sys.reset_cluster_configs('%s', '%s')",
369397
CATALOG_NAME,
370-
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
398+
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(),
399+
ConfigOptions.DATALAKE_FORMAT.key()))
371400
.await();
372401
}
373402

@@ -392,11 +421,15 @@ void testResetClusterConfigs() throws Exception {
392421
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
393422
.collect()) {
394423
List<Row> results = CollectionUtil.iteratorToList(resultIterator);
395-
assertThat(results).hasSize(1);
424+
assertThat(results).hasSize(2);
396425
assertThat(results.get(0).getField(0))
397426
.asString()
398427
.contains("Successfully deleted")
399-
.contains(ConfigOptions.DATALAKE_FORMAT.key())
428+
.contains(ConfigOptions.DATALAKE_FORMAT.key());
429+
430+
assertThat(results.get(1).getField(0))
431+
.asString()
432+
.contains("Successfully deleted")
400433
.contains(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key());
401434
}
402435
}
@@ -429,6 +462,18 @@ void testSetClusterConfigValidation() throws Exception {
429462
.rootCause()
430463
.hasMessageContaining(
431464
"config_pairs must be set in pairs. Please specify a valid configuration pairs");
465+
466+
// Try to no parameters passed
467+
assertThatThrownBy(
468+
() ->
469+
tEnv.executeSql(
470+
String.format(
471+
"Call %s.sys.set_cluster_configs()",
472+
CATALOG_NAME))
473+
.await())
474+
.rootCause()
475+
.hasMessageContaining(
476+
"config_pairs cannot be null or empty. Please specify a valid configuration pairs");
432477
}
433478

434479
@Test
@@ -438,12 +483,24 @@ void testResetClusterConfigValidation() throws Exception {
438483
() ->
439484
tEnv.executeSql(
440485
String.format(
441-
"Call %s.sys.reset_cluster_configs('')",
486+
"Call %s.sys.reset_cluster_configs('invalid.config.key')",
487+
CATALOG_NAME))
488+
.await())
489+
.rootCause()
490+
.hasMessageContaining(
491+
"The config key invalid.config.key is not allowed to be changed dynamically");
492+
493+
// Try to no parameters passed
494+
assertThatThrownBy(
495+
() ->
496+
tEnv.executeSql(
497+
String.format(
498+
"Call %s.sys.reset_cluster_configs()",
442499
CATALOG_NAME))
443500
.await())
444501
.rootCause()
445502
.hasMessageContaining(
446-
"Config key cannot be null or empty. Please specify a valid configuration key");
503+
"config_keys cannot be null or empty. Please specify valid configuration keys");
447504
}
448505

449506
@ParameterizedTest

website/docs/engine-flink/procedures.md

Lines changed: 61 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -162,23 +162,23 @@ CALL sys.list_acl(
162162

163163
Fluss provides procedures to dynamically manage cluster configurations without requiring a server restart.
164164

165-
### get_cluster_config
165+
### get_cluster_configs
166166

167167
Retrieve cluster configuration values.
168168

169169
**Syntax:**
170170

171171
```sql
172-
-- Get a specific configuration
173-
CALL [catalog_name.]sys.get_cluster_config(config_key => 'STRING')
172+
-- Get multiple configurations
173+
CALL [catalog_name.]sys.get_cluster_configs(config_keys => 'key1' [, 'key2', ...])
174174

175175
-- Get all cluster configurations
176-
CALL [catalog_name.]sys.get_cluster_config()
176+
CALL [catalog_name.]sys.get_cluster_configs()
177177
```
178178

179179
**Parameters:**
180180

181-
- `config_key` (optional): The configuration key to retrieve. If omitted, returns all cluster configurations.
181+
- `config_keys` (optional): The configuration keys to retrieve. If omitted, returns all cluster configurations.
182182

183183
**Returns:** A table with columns:
184184
- `config_key`: The configuration key name
@@ -192,35 +192,35 @@ CALL [catalog_name.]sys.get_cluster_config()
192192
USE fluss_catalog;
193193

194194
-- Get a specific configuration
195-
CALL sys.get_cluster_config(
196-
config_key => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec'
195+
CALL sys.get_cluster_configs(
196+
config_keys => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec'
197+
);
198+
199+
-- Get multiple configuration
200+
CALL sys.get_cluster_configs(
201+
config_keys => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec', 'datalake.format'
197202
);
198203

199204
-- Get all cluster configurations
200-
CALL sys.get_cluster_config();
205+
CALL sys.get_cluster_configs();
201206
```
202207

203-
### set_cluster_config
208+
### set_cluster_configs
204209

205-
Set or delete a cluster configuration dynamically.
210+
Set cluster configurations dynamically.
206211

207212
**Syntax:**
208213

209214
```sql
210-
-- Set a configuration value
211-
CALL [catalog_name.]sys.set_cluster_config(
212-
config_key => 'STRING',
213-
config_value => 'STRING'
215+
-- Set configuration values
216+
CALL [catalog_name.]sys.set_cluster_configs(
217+
config_pairs => 'key1', 'value1' [, 'key2', 'value2' ...]
214218
)
215-
216-
-- Delete a configuration (reset to default)
217-
CALL [catalog_name.]sys.set_cluster_config(config_key => 'STRING')
218219
```
219220

220221
**Parameters:**
221222

222-
- `config_key` (required): The configuration key to modify.
223-
- `config_value` (optional): The new value to set. If omitted or empty, the configuration is deleted (reset to default).
223+
- `config_pairs`(required): For key-value pairs in configuration items, the number of parameters must be even.
224224

225225
**Important Notes:**
226226

@@ -236,20 +236,51 @@ CALL [catalog_name.]sys.set_cluster_config(config_key => 'STRING')
236236
USE fluss_catalog;
237237

238238
-- Set RocksDB rate limiter
239-
CALL sys.set_cluster_config(
240-
config_key => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec',
241-
config_value => '200MB'
239+
CALL sys.set_cluster_configs(
240+
config_pairs => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB'
242241
);
243242

244-
-- Set datalake format
245-
CALL sys.set_cluster_config(
246-
config_key => 'datalake.format',
247-
config_value => 'paimon'
243+
-- Set RocksDB rate limiter and datalake format
244+
CALL sys.set_cluster_configs(
245+
config_pairs => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB', 'datalake.format','paimon'
248246
);
247+
```
249248

250-
-- Delete a configuration (reset to default)
251-
CALL sys.set_cluster_config(
252-
config_key => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec'
253-
);
249+
### reset_cluster_configs
250+
251+
reset cluster configurations dynamically.
252+
253+
**Syntax:**
254+
255+
```sql
256+
-- reset configuration values
257+
CALL [catalog_name.]sys.reset_cluster_configs(config_keys => 'key1' [, 'key2', ...])
254258
```
255259

260+
**Parameters:**
261+
262+
- `config_keys`(required): The configuration keys to reset.
263+
264+
**Important Notes:**
265+
266+
- Changes are validated before being applied and persisted in ZooKeeper
267+
- Changes are automatically applied to all servers (Coordinator and TabletServers)
268+
- Changes survive server restarts
269+
- Not all configurations support dynamic changes. The server will reject invalid modifications
270+
271+
**Example:**
272+
273+
```sql title="Flink SQL"
274+
-- Use the Fluss catalog (replace 'fluss_catalog' with your catalog name if different)
275+
USE fluss_catalog;
276+
277+
-- Reset a specific configuration
278+
CALL sys.reset_cluster_configs(
279+
config_keys => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec'
280+
);
281+
282+
-- Reset RocksDB rate limiter and datalake format
283+
CALL sys.reset_cluster_configs(
284+
config_keys => 'kv.rocksdb.shared-rate-limiter.bytes-per-sec', 'datalake.format'
285+
);
286+
```

0 commit comments

Comments
 (0)