Skip to content

Commit 5883c0a

Browse files
committed
Refactor CALL procedure "sys.xxx_cluster_config" to support multiple conifg keys
Signed-off-by: Pei Yu <[email protected]>
1 parent 09653a4 commit 5883c0a

File tree

5 files changed

+171
-62
lines changed

5 files changed

+171
-62
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/GetClusterConfigsProcedure.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,13 @@ public Row[] call(ProcedureContext context) throws Exception {
6868
}
6969

7070
@ProcedureHint(
71-
argument = {@ArgumentHint(name = "config_key", type = @DataTypeHint("STRING"))},
71+
argument = {@ArgumentHint(name = "config_keys", type = @DataTypeHint("STRING"))},
7272
isVarArgs = true,
7373
output =
7474
@DataTypeHint(
7575
"ROW<config_key STRING, config_value STRING, config_source STRING>"))
76-
public Row[] call(ProcedureContext context, String... configKey) throws Exception {
77-
return getConfigs(configKey);
76+
public Row[] call(ProcedureContext context, String... configKeys) throws Exception {
77+
return getConfigs(configKeys);
7878
}
7979

8080
private Row[] getConfigs(@Nullable String... configKeys) throws Exception {
@@ -109,8 +109,6 @@ private Row[] getConfigs(@Nullable String... configKeys) throws Exception {
109109
entry.source() != null
110110
? entry.source().name()
111111
: "UNKNOWN"));
112-
} else {
113-
results.add(Row.of());
114112
}
115113
}
116114
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/ResetClusterConfigsProcedure.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@
2929
import java.util.List;
3030

3131
/**
32-
* Procedure to reset cluster configuration dynamically.
32+
* Procedure to reset cluster configurations to their default values.
3333
*
34-
* <p>This procedure allows modifying dynamic cluster configurations. The changes are:
34+
* <p>This procedure reverts the configurations to their initial system defaults. The changes are:
3535
*
3636
* <ul>
3737
* <li>Validated by the CoordinatorServer before persistence
@@ -47,12 +47,12 @@
4747
* CALL sys.reset_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec');
4848
*
4949
* -- reset multiple configurations at one time
50-
* CALL sys.reset_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec','datalake.format');
50+
* CALL sys.reset_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', 'datalake.format');
5151
*
5252
* </pre>
5353
*
54-
* <p><b>Note:</b> Not all configurations support dynamic changes. The server will validate the
55-
* change and reject it if the configuration cannot be reset dynamically.
54+
* <p><b>Note:</b> In theory, an operation like <b>Reset to default value</b> should always succeed,
55+
* as the default value should be a valid one
5656
*/
5757
public class ResetClusterConfigsProcedure extends ProcedureBase {
5858

@@ -64,27 +64,27 @@ public String[] call(ProcedureContext context, String... configKeys) throws Exce
6464
// Validate config key
6565
if (configKeys.length == 0) {
6666
throw new IllegalArgumentException(
67-
"config_pairs cannot be null or empty. "
67+
"config_keys cannot be null or empty. "
6868
+ "Please specify valid configuration keys.");
6969
}
7070

7171
List<AlterConfig> configList = new ArrayList<>();
72-
StringBuilder resultMessage = new StringBuilder();
72+
List<String> resultMessage = new ArrayList();
7373

7474
for (String key : configKeys) {
7575
String configKey = key.trim();
7676
if (configKey.isEmpty()) {
7777
throw new IllegalArgumentException(
7878
"Config key cannot be null or empty. "
79-
+ "Please specify valid configuration key.");
79+
+ "Please specify a valid configuration key.");
8080
}
8181

8282
String operationDesc = "deleted (reset to default)";
8383

8484
AlterConfig alterConfig =
8585
new AlterConfig(configKey, null, AlterConfigOpType.DELETE);
8686
configList.add(alterConfig);
87-
resultMessage.append(
87+
resultMessage.add(
8888
String.format(
8989
"Successfully %s configuration '%s'. ", operationDesc, configKey));
9090
}
@@ -93,9 +93,7 @@ public String[] call(ProcedureContext context, String... configKeys) throws Exce
9393
// This will trigger validation on CoordinatorServer before persistence
9494
admin.alterClusterConfigs(configList).get();
9595

96-
return new String[] {
97-
resultMessage + "The change is persisted in ZooKeeper and applied to all servers."
98-
};
96+
return resultMessage.toArray(new String[0]);
9997
} catch (IllegalArgumentException e) {
10098
// Re-throw validation errors with original message
10199
throw e;

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/SetClusterConfigsProcedure.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,6 @@
4949
*
5050
* -- Set multiple configurations at one time
5151
* CALL sys.set_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '200MB','datalake.format', 'paimon');
52-
*
53-
* -- Delete a configuration (reset to default)
54-
* CALL sys.set_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec', '');
5552
* </pre>
5653
*
5754
* <p><b>Note:</b> Not all configurations support dynamic changes. The server will validate the
@@ -78,7 +75,7 @@ public String[] call(ProcedureContext context, String... configPairs) throws Exc
7875
+ "Please specify a valid configuration pairs.");
7976
}
8077
List<AlterConfig> configList = new ArrayList<>();
81-
StringBuilder resultMessage = new StringBuilder();
78+
List<String> resultMessage = new ArrayList<>();
8279

8380
for (int i = 0; i < configPairs.length; i += 2) {
8481
String configKey = configPairs[i].trim();
@@ -95,7 +92,7 @@ public String[] call(ProcedureContext context, String... configPairs) throws Exc
9592
AlterConfig alterConfig =
9693
new AlterConfig(configKey, configValue, AlterConfigOpType.SET);
9794
configList.add(alterConfig);
98-
resultMessage.append(
95+
resultMessage.add(
9996
String.format(
10097
"Successfully %s configuration '%s'. ", operationDesc, configKey));
10198
}
@@ -104,9 +101,7 @@ public String[] call(ProcedureContext context, String... configPairs) throws Exc
104101
// This will trigger validation on CoordinatorServer before persistence
105102
admin.alterClusterConfigs(configList).get();
106103

107-
return new String[] {
108-
resultMessage + "The change is persisted in ZooKeeper and applied to all servers."
109-
};
104+
return resultMessage.toArray(new String[0]);
110105
} catch (IllegalArgumentException e) {
111106
// Re-throw validation errors with original message
112107
throw e;

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/procedure/FlinkProcedureITCase.java

Lines changed: 94 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,31 @@ void testGetClusterConfigs() throws Exception {
284284
assertThat(row.getField(2)).isNotNull(); // config_source
285285
}
286286

287+
// Get multiple config
288+
try (CloseableIterator<Row> resultIterator =
289+
tEnv.executeSql(
290+
String.format(
291+
"Call %s.sys.get_cluster_configs('%s', '%s')",
292+
CATALOG_NAME,
293+
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(),
294+
ConfigOptions.DATALAKE_FORMAT.key()))
295+
.collect()) {
296+
List<Row> results = CollectionUtil.iteratorToList(resultIterator);
297+
assertThat(results).hasSize(2);
298+
// the first row
299+
Row row0 = results.get(0);
300+
assertThat(row0.getField(0))
301+
.isEqualTo(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key());
302+
assertThat(row0.getField(1)).isEqualTo("100 mb");
303+
assertThat(row0.getField(2)).isNotNull(); // config_source
304+
305+
// the second row
306+
Row row1 = results.get(1);
307+
assertThat(row1.getField(0)).isEqualTo(ConfigOptions.DATALAKE_FORMAT.key());
308+
assertThat(row1.getField(1)).isEqualTo("paimon");
309+
assertThat(row1.getField(2)).isNotNull(); // config_source
310+
}
311+
287312
// Get all configs
288313
try (CloseableIterator<Row> resultIterator =
289314
tEnv.executeSql(String.format("Call %s.sys.get_cluster_configs()", CATALOG_NAME))
@@ -340,11 +365,14 @@ void testSetClusterConfigs() throws Exception {
340365
ConfigOptions.DATALAKE_FORMAT.key()))
341366
.collect()) {
342367
List<Row> results = CollectionUtil.iteratorToList(resultIterator);
343-
assertThat(results).hasSize(1);
368+
assertThat(results).hasSize(2);
344369
assertThat(results.get(0).getField(0))
345370
.asString()
346371
.contains("Successfully set to '300MB'")
347-
.contains(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key())
372+
.contains(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key());
373+
374+
assertThat(results.get(1).getField(0))
375+
.asString()
348376
.contains("Successfully set to 'paimon'")
349377
.contains(ConfigOptions.DATALAKE_FORMAT.key());
350378
}
@@ -365,9 +393,10 @@ void testSetClusterConfigs() throws Exception {
365393
// reset cluster configs.
366394
tEnv.executeSql(
367395
String.format(
368-
"Call %s.sys.reset_cluster_configs('%s')",
396+
"Call %s.sys.reset_cluster_configs('%s', '%s')",
369397
CATALOG_NAME,
370-
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
398+
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(),
399+
ConfigOptions.DATALAKE_FORMAT.key()))
371400
.await();
372401
}
373402

@@ -392,11 +421,15 @@ void testResetClusterConfigs() throws Exception {
392421
ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key()))
393422
.collect()) {
394423
List<Row> results = CollectionUtil.iteratorToList(resultIterator);
395-
assertThat(results).hasSize(1);
424+
assertThat(results).hasSize(2);
396425
assertThat(results.get(0).getField(0))
397426
.asString()
398427
.contains("Successfully deleted")
399-
.contains(ConfigOptions.DATALAKE_FORMAT.key())
428+
.contains(ConfigOptions.DATALAKE_FORMAT.key());
429+
430+
assertThat(results.get(1).getField(0))
431+
.asString()
432+
.contains("Successfully deleted")
400433
.contains(ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key());
401434
}
402435
}
@@ -428,7 +461,61 @@ void testSetClusterConfigValidation() throws Exception {
428461
.await())
429462
.rootCause()
430463
.hasMessageContaining(
431-
"config_pairs must be set in pairs. Please specify a valid configuration pairs.");
464+
"config_pairs must be set in pairs. Please specify a valid configuration pairs");
465+
466+
// Try to no parameters passed
467+
assertThatThrownBy(
468+
() ->
469+
tEnv.executeSql(
470+
String.format(
471+
"Call %s.sys.set_cluster_configs()",
472+
CATALOG_NAME))
473+
.await())
474+
.rootCause()
475+
.hasMessageContaining(
476+
"config_pairs cannot be null or empty. Please specify a valid configuration pairs");
477+
478+
// Try to mismatched key-value pairs in the input parameters.
479+
assertThatThrownBy(
480+
() ->
481+
tEnv.executeSql(
482+
String.format(
483+
"Call %s.sys.set_cluster_configs('%s', 'paimon')",
484+
CATALOG_NAME,
485+
ConfigOptions
486+
.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC
487+
.key()))
488+
.await())
489+
.rootCause()
490+
.hasMessageContaining(
491+
"Could not parse value 'paimon' for key 'kv.rocksdb.shared-rate-limiter.bytes-per-sec'");
492+
}
493+
494+
@Test
495+
void testResetClusterConfigValidation() throws Exception {
496+
// Try to reset an invalid config
497+
assertThatThrownBy(
498+
() ->
499+
tEnv.executeSql(
500+
String.format(
501+
"Call %s.sys.reset_cluster_configs('invalid.config.key')",
502+
CATALOG_NAME))
503+
.await())
504+
.rootCause()
505+
.hasMessageContaining(
506+
"The config key invalid.config.key is not allowed to be changed dynamically");
507+
508+
// Try to no parameters passed
509+
assertThatThrownBy(
510+
() ->
511+
tEnv.executeSql(
512+
String.format(
513+
"Call %s.sys.reset_cluster_configs()",
514+
CATALOG_NAME))
515+
.await())
516+
.rootCause()
517+
.hasMessageContaining(
518+
"config_keys cannot be null or empty. Please specify valid configuration keys");
432519
}
433520

434521
@ParameterizedTest

0 commit comments

Comments
 (0)