Skip to content

Commit 381951b

Browse files
committed
improvements + cleanup
Signed-off-by: Attila Mészáros <[email protected]>
1 parent 3ac4339 commit 381951b

File tree

9 files changed

+56
-51
lines changed

9 files changed

+56
-51
lines changed

examples/kubernetes-client-examples/src/main/java/org/apache/flink/examples/Basic.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public static void main(String[] args) {
4949
flinkDeploymentSpec.setImage("flink:1.19");
5050
Map<String, String> flinkConfiguration =
5151
Map.ofEntries(entry("taskmanager.numberOfTaskSlots", "2"));
52-
flinkDeploymentSpec.getFlinkConfiguration().set(flinkConfiguration);
52+
flinkDeploymentSpec.setConfiguration(flinkConfiguration);
5353
flinkDeployment.setSpec(flinkDeploymentSpec);
5454
flinkDeploymentSpec.setServiceAccount("flink");
5555
JobManagerSpec jobManagerSpec = new JobManagerSpec();

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/AbstractFlinkSpec.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,20 @@
1818
package org.apache.flink.kubernetes.operator.api.spec;
1919

2020
import org.apache.flink.annotation.Experimental;
21+
import org.apache.flink.configuration.Configuration;
2122
import org.apache.flink.kubernetes.operator.api.diff.DiffType;
2223
import org.apache.flink.kubernetes.operator.api.diff.Diffable;
2324
import org.apache.flink.kubernetes.operator.api.diff.SpecDiff;
2425

26+
import com.fasterxml.jackson.annotation.JsonIgnore;
2527
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
2628
import lombok.AllArgsConstructor;
2729
import lombok.Data;
2830
import lombok.NoArgsConstructor;
2931
import lombok.experimental.SuperBuilder;
32+
import lombok.experimental.Tolerate;
33+
34+
import java.util.Map;
3035

3136
/** The common spec. */
3237
@Experimental
@@ -58,4 +63,16 @@ public abstract class AbstractFlinkSpec implements Diffable<AbstractFlinkSpec> {
5863
})
5964
@JsonDeserialize(using = ConfigObjectNodeDeserializer.class)
6065
private ConfigObjectNode flinkConfiguration = new ConfigObjectNode();
66+
67+
@Tolerate
68+
@JsonIgnore
69+
public void setConfiguration(Map<String, String> config) {
70+
flinkConfiguration.setAllFrom(config);
71+
}
72+
73+
@Tolerate
74+
@JsonIgnore
75+
public void setConfiguration(Configuration config) {
76+
setConfiguration(config.toMap());
77+
}
6178
}

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/ConfigObjectNode.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@
1717

1818
package org.apache.flink.kubernetes.operator.api.spec;
1919

20+
import org.apache.flink.configuration.Configuration;
21+
2022
import com.fasterxml.jackson.databind.JsonNode;
2123
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
2224
import com.fasterxml.jackson.databind.node.ObjectNode;
2325

26+
import java.util.Arrays;
2427
import java.util.HashMap;
2528
import java.util.Iterator;
2629
import java.util.Map;
@@ -40,17 +43,15 @@ public ConfigObjectNode(JsonNodeFactory nc) {
4043
super(nc);
4144
}
4245

43-
public void removeAll(String... names) {
44-
for (String name : names) {
45-
remove(name);
46-
}
46+
public void remove(String... names) {
47+
remove(Arrays.asList(names));
4748
}
4849

4950
public void putAllFrom(Map<String, String> value) {
5051
value.forEach(this::put);
5152
}
5253

53-
public void set(Map<String, String> value) {
54+
public void setAllFrom(Map<String, String> value) {
5455
removeAll();
5556
putAllFrom(value);
5657
}
@@ -61,6 +62,10 @@ public Map<String, String> asFlatMap() {
6162
return flatMap;
6263
}
6364

65+
public Configuration asConfiguration() {
66+
return Configuration.fromMap(asFlatMap());
67+
}
68+
6469
private static void flattenHelper(
6570
JsonNode node, String parentKey, Map<String, String> flatMap) {
6671
if (node.isObject()) {

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.apache.flink.autoscaler.realizer.ScalingRealizer;
2121
import org.apache.flink.autoscaler.tuning.ConfigChanges;
2222
import org.apache.flink.autoscaler.tuning.MemoryTuning;
23-
import org.apache.flink.configuration.Configuration;
2423
import org.apache.flink.configuration.ConfigurationUtils;
2524
import org.apache.flink.configuration.MemorySize;
2625
import org.apache.flink.configuration.PipelineOptions;
@@ -69,8 +68,7 @@ public void realizeConfigOverrides(
6968
// Update total memory in spec
7069
var totalMemoryOverride =
7170
MemoryTuning.getTotalMemory(
72-
Configuration.fromMap(
73-
flinkDeployment.getSpec().getFlinkConfiguration().asFlatMap()),
71+
flinkDeployment.getSpec().getFlinkConfiguration().asConfiguration(),
7472
context);
7573
if (totalMemoryOverride.compareTo(MemorySize.ZERO) <= 0) {
7674
LOG.warn("Total memory override {} is not valid", totalMemoryOverride);

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ private void applyConfigsFromCurrentSpec(
372372
AbstractFlinkSpec spec, Configuration conf, ConfigOption... configOptions) {
373373
addOperatorConfigsFromSpec(spec, conf);
374374
if (spec.getFlinkConfiguration() != null) {
375-
var deployConfig = Configuration.fromMap(spec.getFlinkConfiguration().asFlatMap());
375+
var deployConfig = spec.getFlinkConfiguration().asConfiguration();
376376
for (ConfigOption configOption : configOptions) {
377377
deployConfig.getOptional(configOption).ifPresent(v -> conf.set(configOption, v));
378378
}

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -751,7 +751,7 @@ public void testLastStateOnDeletedDeployment() throws Exception {
751751
verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
752752

753753
// Delete cluster and keep HA metadata
754-
var conf = Configuration.fromMap(deployment.getSpec().getFlinkConfiguration().asFlatMap());
754+
var conf = deployment.getSpec().getFlinkConfiguration().asConfiguration();
755755
flinkService.deleteClusterDeployment(
756756
deployment.getMetadata(), deployment.getStatus(), conf, false);
757757
flinkService.setHaDataAvailable(true);

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/SpecDiffTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public void testFlinkDeploymentSpecChanges() {
101101
assertEquals(8, diff.getNumDiffs());
102102

103103
right.getFlinkConfiguration()
104-
.removeAll(
104+
.remove(
105105
SCOPE_NAMING_KUBERNETES_OPERATOR.key(),
106106
AutoScalerOptions.METRICS_WINDOW.key());
107107

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -277,17 +277,17 @@ protected void updateVertexResources(
277277
var spec = flinkDep.getSpec();
278278
spec.setFlinkVersion(FlinkVersion.v1_18);
279279

280-
var appConfig = Configuration.fromMap(spec.getFlinkConfiguration().asFlatMap());
280+
var appConfig = spec.getFlinkConfiguration().asConfiguration();
281281
appConfig.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
282282

283-
spec.getFlinkConfiguration().set(appConfig.toMap());
283+
spec.setConfiguration(appConfig);
284284
var reconStatus = flinkDep.getStatus().getReconciliationStatus();
285285
reconStatus.serializeAndSetLastReconciledSpec(spec, flinkDep);
286286

287287
appConfig.set(
288288
PipelineOptions.PARALLELISM_OVERRIDES,
289289
Map.of(v1.toHexString(), "4", v2.toHexString(), "1"));
290-
spec.getFlinkConfiguration().set(appConfig.toMap());
290+
spec.setConfiguration(appConfig);
291291

292292
flinkDep.getStatus().getJobStatus().setState(JobStatus.RUNNING);
293293

@@ -321,7 +321,7 @@ protected void updateVertexResources(
321321

322322
// Baseline
323323
appConfig.set(PipelineOptions.PARALLELISM_OVERRIDES, Map.of(v1.toHexString(), "4"));
324-
spec.getFlinkConfiguration().set(appConfig.toMap());
324+
spec.setConfiguration(appConfig);
325325
testScaleConditionDep(flinkDep, service, d -> {}, true);
326326
testScaleConditionLastSpec(flinkDep, service, d -> {}, true);
327327

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java

Lines changed: 20 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ public void testValidationWithoutDefaultConfig() {
151151

152152
testError(
153153
dep -> {
154-
dep.getSpec().getFlinkConfiguration().set(new HashMap<>());
154+
dep.getSpec().getFlinkConfiguration().setAllFrom(new HashMap<>());
155155
dep.getSpec()
156156
.getJob()
157157
.setSavepointTriggerNonce(ThreadLocalRandom.current().nextLong());
@@ -163,8 +163,7 @@ public void testValidationWithoutDefaultConfig() {
163163
testError(
164164
dep ->
165165
dep.getSpec()
166-
.getFlinkConfiguration()
167-
.set(
166+
.setConfiguration(
168167
Map.of(
169168
KubernetesOperatorConfigOptions
170169
.PERIODIC_SAVEPOINT_INTERVAL
@@ -177,8 +176,7 @@ public void testValidationWithoutDefaultConfig() {
177176
testError(
178177
dep ->
179178
dep.getSpec()
180-
.getFlinkConfiguration()
181-
.set(
179+
.setConfiguration(
182180
Map.of(
183181
KubernetesOperatorConfigOptions
184182
.OPERATOR_JOB_UPGRADE_LAST_STATE_CHECKPOINT_MAX_AGE
@@ -192,13 +190,11 @@ public void testValidationWithoutDefaultConfig() {
192190
testSuccess(
193191
dep ->
194192
dep.getSpec()
195-
.getFlinkConfiguration()
196-
.set(Collections.singletonMap("random", "config")));
193+
.setConfiguration(Collections.singletonMap("random", "config")));
197194
testError(
198195
dep ->
199196
dep.getSpec()
200-
.getFlinkConfiguration()
201-
.set(
197+
.setConfiguration(
202198
Collections.singletonMap(
203199
KubernetesConfigOptions.NAMESPACE.key(), "myns")),
204200
"Forbidden Flink config key");
@@ -207,7 +203,7 @@ public void testValidationWithoutDefaultConfig() {
207203
dep ->
208204
dep.getSpec()
209205
.getFlinkConfiguration()
210-
.set(
206+
.setAllFrom(
211207
Collections.singletonMap(
212208
HighAvailabilityOptions.HA_CLUSTER_ID.key(),
213209
"my-cluster-id")),
@@ -216,8 +212,7 @@ public void testValidationWithoutDefaultConfig() {
216212
testError(
217213
dep ->
218214
dep.getSpec()
219-
.getFlinkConfiguration()
220-
.set(
215+
.setConfiguration(
221216
Map.of(
222217
KubernetesOperatorConfigOptions
223218
.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED
@@ -265,16 +260,15 @@ public void testValidationWithoutDefaultConfig() {
265260

266261
testError(
267262
dep -> {
268-
dep.getSpec().getFlinkConfiguration().set(new HashMap<>());
263+
dep.getSpec().setConfiguration(new HashMap<>());
269264
dep.getSpec().getJobManager().setReplicas(2);
270265
},
271266
"High availability should be enabled when starting standby JobManagers.");
272267

273268
testError(
274269
dep ->
275270
dep.getSpec()
276-
.getFlinkConfiguration()
277-
.set(
271+
.setConfiguration(
278272
Map.of(
279273
KubernetesOperatorConfigOptions
280274
.DEPLOYMENT_ROLLBACK_ENABLED
@@ -309,32 +303,25 @@ public void testValidationWithoutDefaultConfig() {
309303
testError(
310304
dep -> {
311305
dep.getSpec().getTaskManager().getResource().setMemory(null);
312-
dep.getSpec()
313-
.getFlinkConfiguration()
314-
.set(Map.of(TASK_HEAP_MEMORY.key(), "1024m"));
306+
dep.getSpec().setConfiguration(Map.of(TASK_HEAP_MEMORY.key(), "1024m"));
315307
},
316308
"TaskManager resource memory must be defined using `spec.taskManager.resource.memory`");
317309

318310
testSuccess(
319311
dep -> {
320312
dep.getSpec().getJobManager().getResource().setMemory(null);
321-
dep.getSpec()
322-
.getFlinkConfiguration()
323-
.set(Map.of(JVM_HEAP_MEMORY.key(), "2048m"));
313+
dep.getSpec().setConfiguration(Map.of(JVM_HEAP_MEMORY.key(), "2048m"));
324314
});
325315
testSuccess(
326316
dep -> {
327317
dep.getSpec().getTaskManager().getResource().setMemory(null);
328-
dep.getSpec()
329-
.getFlinkConfiguration()
330-
.set(Map.of(TOTAL_FLINK_MEMORY.key(), "2048m"));
318+
dep.getSpec().setConfiguration(Map.of(TOTAL_FLINK_MEMORY.key(), "2048m"));
331319
});
332320
testSuccess(
333321
dep -> {
334322
dep.getSpec().getTaskManager().getResource().setMemory(null);
335323
dep.getSpec()
336-
.getFlinkConfiguration()
337-
.set(
324+
.setConfiguration(
338325
Map.of(
339326
TASK_HEAP_MEMORY.key(),
340327
"1024m",
@@ -521,7 +508,7 @@ public void testValidationWithDefaultConfig() {
521508
new DefaultValidator(new FlinkConfigManager(defaultFlinkConf));
522509
testSuccess(
523510
dep -> {
524-
dep.getSpec().getFlinkConfiguration().set(new HashMap<>());
511+
dep.getSpec().setConfiguration(new HashMap<>());
525512
dep.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
526513
},
527514
validatorWithDefaultConfig);
@@ -707,8 +694,7 @@ public void testSessionJobWithSession() {
707694
sessionJob ->
708695
sessionJob
709696
.getSpec()
710-
.getFlinkConfiguration()
711-
.set(
697+
.setConfiguration(
712698
Map.of(
713699
KubernetesOperatorConfigOptions
714700
.JAR_ARTIFACT_HTTP_HEADER
@@ -721,8 +707,7 @@ public void testSessionJobWithSession() {
721707
sessionJob ->
722708
sessionJob
723709
.getSpec()
724-
.getFlinkConfiguration()
725-
.set(
710+
.setConfiguration(
726711
Map.of(
727712
KubernetesOperatorConfigOptions
728713
.PERIODIC_SAVEPOINT_INTERVAL
@@ -748,15 +733,15 @@ public void testSessionJobWithSession() {
748733
sessionJob
749734
.getSpec()
750735
.getFlinkConfiguration()
751-
.set(
736+
.setAllFrom(
752737
Map.of(
753738
CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
754739
"test-savepoint-dir",
755740
CheckpointingOptions.CHECKPOINTS_DIRECTORY.key(),
756741
"test-checkpoint-dir"));
757742
},
758743
flinkDeployment -> {
759-
flinkDeployment.getSpec().getFlinkConfiguration().set(Map.of());
744+
flinkDeployment.getSpec().setConfiguration(Map.of());
760745
},
761746
null);
762747
}
@@ -1109,7 +1094,7 @@ private Optional<String> testSessionJobAutoScalerConfiguration(
11091094
var sessionJob = TestUtils.buildSessionJob();
11101095
var flinkConfiguration = getDefaultTestAutoScalerFlinkConfigurationMap();
11111096
flinkConfigurationModifier.accept(flinkConfiguration);
1112-
sessionCluster.getSpec().getFlinkConfiguration().set(flinkConfiguration);
1097+
sessionCluster.getSpec().setConfiguration(flinkConfiguration);
11131098
return validator.validateSessionJob(sessionJob, Optional.of(sessionCluster));
11141099
}
11151100

@@ -1118,7 +1103,7 @@ public Optional<String> testAutoScalerConfiguration(
11181103
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
11191104
var flinkConfiguration = getDefaultTestAutoScalerFlinkConfigurationMap();
11201105
flinkConfigurationModifier.accept(flinkConfiguration);
1121-
deployment.getSpec().getFlinkConfiguration().set(flinkConfiguration);
1106+
deployment.getSpec().setConfiguration(flinkConfiguration);
11221107
return validator.validateDeployment(deployment);
11231108
}
11241109

0 commit comments

Comments
 (0)