Skip to content

Commit 37e3c84

Browse files
authored
[FLINK-37406] Add support for structured YAML config in FlinkDeployment CRD
1 parent b72ad7f commit 37e3c84

File tree

39 files changed

+11487
-120
lines changed

39 files changed

+11487
-120
lines changed

docs/content/docs/custom-resource/reference.md

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,22 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
5757
| Parameter | Type | Docs |
5858
| ----------| ---- | ---- |
5959

60+
### ConfigObjectNode
61+
**Class**: org.apache.flink.kubernetes.operator.api.spec.ConfigObjectNode
62+
63+
**Description**: Allows parsing configurations as YAML, and adds related utility methods.
64+
65+
| Parameter | Type | Docs |
66+
| ----------| ---- | ---- |
67+
68+
### ConfigObjectNodeDeserializer
69+
**Class**: org.apache.flink.kubernetes.operator.api.spec.ConfigObjectNodeDeserializer
70+
71+
**Description**: Allows to deserialize to ConfigObjectNode.
72+
73+
| Parameter | Type | Docs |
74+
| ----------| ---- | ---- |
75+
6076
### FlinkDeploymentSpec
6177
**Class**: org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec
6278

@@ -66,7 +82,7 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
6682
| ----------| ---- | ---- |
6783
| job | org.apache.flink.kubernetes.operator.api.spec.JobSpec | Job specification for application deployments/session job. Null for session clusters. |
6884
| restartNonce | java.lang.Long | Nonce used to manually trigger restart for the cluster/session job. In order to trigger restart, change the number to a different non-null value. |
69-
| flinkConfiguration | java.util.Map<java.lang.String,java.lang.String> | Flink configuration overrides for the Flink deployment or Flink session job. |
85+
| flinkConfiguration | org.apache.flink.kubernetes.operator.api.spec.ConfigObjectNode | Flink configuration overrides for the Flink deployment or Flink session job. |
7086
| image | java.lang.String | Flink docker image used to start the Job and TaskManager pods. |
7187
| imagePullPolicy | java.lang.String | Image pull policy of the Flink docker image. |
7288
| serviceAccount | java.lang.String | Kubernetes service used by the Flink deployment. |
@@ -87,7 +103,7 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
87103
| ----------| ---- | ---- |
88104
| job | org.apache.flink.kubernetes.operator.api.spec.JobSpec | Job specification for application deployments/session job. Null for session clusters. |
89105
| restartNonce | java.lang.Long | Nonce used to manually trigger restart for the cluster/session job. In order to trigger restart, change the number to a different non-null value. |
90-
| flinkConfiguration | java.util.Map<java.lang.String,java.lang.String> | Flink configuration overrides for the Flink deployment or Flink session job. |
106+
| flinkConfiguration | org.apache.flink.kubernetes.operator.api.spec.ConfigObjectNode | Flink configuration overrides for the Flink deployment or Flink session job. |
91107
| deploymentName | java.lang.String | The name of the target session cluster deployment. |
92108

93109
### FlinkStateSnapshotSpec

e2e-tests/data/autoscaler.yaml

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,19 @@ spec:
3030
annotations:
3131
nginx.ingress.kubernetes.io/rewrite-target: "/$2"
3232
flinkConfiguration:
33-
taskmanager.numberOfTaskSlots: "2"
34-
high-availability.type: kubernetes
35-
high-availability.storageDir: file:///opt/flink/volume/flink-ha
36-
state.checkpoints.dir: file:///opt/flink/volume/flink-cp
37-
state.savepoints.dir: file:///opt/flink/volume/flink-sp
38-
39-
job.autoscaler.enabled: "true"
40-
job.autoscaler.scaling.enabled: "true"
41-
job.autoscaler.stabilization.interval: "5s"
42-
job.autoscaler.metrics.window: "1m"
43-
job.autoscaler.scale-down.interval: "0m"
33+
taskmanager.numberOfTaskSlots: 2
34+
high-availability:
35+
type: kubernetes
36+
storageDir: file:///opt/flink/volume/flink-ha
37+
state:
38+
checkpoints.dir: file:///opt/flink/volume/flink-cp
39+
savepoints.dir: file:///opt/flink/volume/flink-sp
40+
job.autoscaler:
41+
enabled: "true"
42+
scaling.enabled: "true"
43+
stabilization.interval: "5s"
44+
metrics.window: "1m"
45+
scale-down.interval: "0m"
4446

4547
# Invalid Validations for testing autoscaler configurations
4648
# kubernetes.operator.job.autoscaler.scale-down.max-factor: "-0.6"

e2e-tests/data/flinkdep-batch-cr.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ spec:
3030
annotations:
3131
nginx.ingress.kubernetes.io/rewrite-target: "/$2"
3232
flinkConfiguration:
33-
taskmanager.numberOfTaskSlots: "2"
34-
kubernetes.operator.snapshot.resource.enabled: "false"
33+
taskmanager.numberOfTaskSlots: 2
34+
kubernetes.operator.snapshot.resource.enabled: false
3535
serviceAccount: flink
3636
podTemplate:
3737
spec:

e2e-tests/data/flinkdep-cr.yaml

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,14 @@ spec:
3030
annotations:
3131
nginx.ingress.kubernetes.io/rewrite-target: "/$2"
3232
flinkConfiguration:
33-
taskmanager.numberOfTaskSlots: "2"
34-
high-availability.type: kubernetes
35-
high-availability.storageDir: file:///opt/flink/volume/flink-ha
36-
state.checkpoints.dir: file:///opt/flink/volume/flink-cp
37-
state.savepoints.dir: file:///opt/flink/volume/flink-sp
38-
kubernetes.operator.snapshot.resource.enabled: "false"
33+
taskmanager.numberOfTaskSlots: 2
34+
high-availability:
35+
type: kubernetes
36+
storageDir: file:///opt/flink/volume/flink-ha
37+
state:
38+
checkpoints.dir: file:///opt/flink/volume/flink-cp
39+
savepoints.dir: file:///opt/flink/volume/flink-sp
40+
kubernetes.operator.snapshot.resource.enabled: false
3941
serviceAccount: flink
4042
podTemplate:
4143
spec:

e2e-tests/data/multi-sessionjob.yaml

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,13 @@ spec:
3030
annotations:
3131
nginx.ingress.kubernetes.io/rewrite-target: "/$2"
3232
flinkConfiguration:
33-
taskmanager.numberOfTaskSlots: "2"
34-
high-availability.type: kubernetes
35-
high-availability.storageDir: file:///opt/flink/volume/flink-ha
36-
state.checkpoints.dir: file:///opt/flink/volume/flink-cp
37-
state.savepoints.dir: file:///opt/flink/volume/flink-sp
33+
taskmanager.numberOfTaskSlots: 2
34+
high-availability:
35+
type: kubernetes
36+
storageDir: file:///opt/flink/volume/flink-ha
37+
state:
38+
checkpoints.dir: file:///opt/flink/volume/flink-cp
39+
savepoints.dir: file:///opt/flink/volume/flink-sp
3840
serviceAccount: flink
3941
podTemplate:
4042
spec:

e2e-tests/data/sessionjob-cr.yaml

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,14 @@ spec:
3030
annotations:
3131
nginx.ingress.kubernetes.io/rewrite-target: "/$2"
3232
flinkConfiguration:
33-
taskmanager.numberOfTaskSlots: "2"
34-
high-availability.type: kubernetes
35-
high-availability.storageDir: file:///opt/flink/volume/flink-ha
36-
state.checkpoints.dir: file:///opt/flink/volume/flink-cp
37-
state.savepoints.dir: file:///opt/flink/volume/flink-sp
38-
kubernetes.operator.snapshot.resource.enabled: "false"
33+
taskmanager.numberOfTaskSlots: 2
34+
high-availability:
35+
type: kubernetes
36+
storageDir: file:///opt/flink/volume/flink-ha
37+
state:
38+
checkpoints.dir: file:///opt/flink/volume/flink-cp
39+
savepoints.dir: file:///opt/flink/volume/flink-sp
40+
kubernetes.operator.snapshot.resource.enabled: false
3941
serviceAccount: flink
4042
podTemplate:
4143
spec:

flink-kubernetes-operator-api/pom.xml

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ under the License.
125125
<version>${flink.version}</version>
126126
<scope>test</scope>
127127
</dependency>
128-
128+
129129
<dependency>
130130
<groupId>org.apache.flink</groupId>
131131
<artifactId>flink-runtime</artifactId>
@@ -138,6 +138,33 @@ under the License.
138138
<artifactId>junit-jupiter-params</artifactId>
139139
<scope>test</scope>
140140
</dependency>
141+
142+
<dependency>
143+
<groupId>org.assertj</groupId>
144+
<artifactId>assertj-core</artifactId>
145+
<version>${assertj.version}</version>
146+
<scope>test</scope>
147+
</dependency>
148+
149+
<dependency>
150+
<groupId>io.fabric8</groupId>
151+
<artifactId>kube-api-test-client-inject</artifactId>
152+
<version>${fabric8.version}</version>
153+
<scope>test</scope>
154+
</dependency>
155+
156+
<dependency>
157+
<groupId>io.fabric8</groupId>
158+
<artifactId>kubernetes-httpclient-${fabric8.httpclient.impl}</artifactId>
159+
<version>${fabric8.version}</version>
160+
<exclusions>
161+
<exclusion>
162+
<groupId>com.squareup.okhttp3</groupId>
163+
<artifactId>okhttp</artifactId>
164+
</exclusion>
165+
</exclusions>
166+
<scope>test</scope>
167+
</dependency>
141168
</dependencies>
142169

143170
<profiles>

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/AbstractFlinkSpec.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,18 @@
1818
package org.apache.flink.kubernetes.operator.api.spec;
1919

2020
import org.apache.flink.annotation.Experimental;
21+
import org.apache.flink.configuration.Configuration;
2122
import org.apache.flink.kubernetes.operator.api.diff.DiffType;
2223
import org.apache.flink.kubernetes.operator.api.diff.Diffable;
2324
import org.apache.flink.kubernetes.operator.api.diff.SpecDiff;
2425

26+
import com.fasterxml.jackson.annotation.JsonIgnore;
27+
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
2528
import lombok.AllArgsConstructor;
2629
import lombok.Data;
2730
import lombok.NoArgsConstructor;
2831
import lombok.experimental.SuperBuilder;
32+
import lombok.experimental.Tolerate;
2933

3034
import java.util.Map;
3135

@@ -57,5 +61,18 @@ public abstract class AbstractFlinkSpec implements Diffable<AbstractFlinkSpec> {
5761
type = DiffType.SCALE,
5862
mode = KubernetesDeploymentMode.NATIVE)
5963
})
60-
private Map<String, String> flinkConfiguration;
64+
@JsonDeserialize(using = ConfigObjectNodeDeserializer.class)
65+
private ConfigObjectNode flinkConfiguration = new ConfigObjectNode();
66+
67+
@Tolerate
68+
@JsonIgnore
69+
public void setFlinkConfiguration(Map<String, String> config) {
70+
flinkConfiguration.setAllFrom(config);
71+
}
72+
73+
@Tolerate
74+
@JsonIgnore
75+
public void setFlinkConfiguration(Configuration config) {
76+
setFlinkConfiguration(config.toMap());
77+
}
6178
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.kubernetes.operator.api.spec;
19+
20+
import org.apache.flink.configuration.Configuration;
21+
22+
import com.fasterxml.jackson.databind.JsonNode;
23+
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
24+
import com.fasterxml.jackson.databind.node.ObjectNode;
25+
26+
import java.util.Arrays;
27+
import java.util.HashMap;
28+
import java.util.Iterator;
29+
import java.util.Map;
30+
31+
/** Allows parsing configurations as YAML, and adds related utility methods. */
32+
public class ConfigObjectNode extends ObjectNode {
33+
34+
public ConfigObjectNode() {
35+
this(JsonNodeFactory.instance);
36+
}
37+
38+
public ConfigObjectNode(JsonNodeFactory nc, Map<String, JsonNode> kids) {
39+
super(nc, kids);
40+
}
41+
42+
public ConfigObjectNode(JsonNodeFactory nc) {
43+
super(nc);
44+
}
45+
46+
public void remove(String... names) {
47+
remove(Arrays.asList(names));
48+
}
49+
50+
public void putAllFrom(Map<String, String> value) {
51+
value.forEach(this::put);
52+
}
53+
54+
public void setAllFrom(Map<String, String> value) {
55+
removeAll();
56+
putAllFrom(value);
57+
}
58+
59+
public Map<String, String> asFlatMap() {
60+
Map<String, String> flatMap = new HashMap<>();
61+
flattenHelper(this, "", flatMap);
62+
return flatMap;
63+
}
64+
65+
public Configuration asConfiguration() {
66+
return Configuration.fromMap(asFlatMap());
67+
}
68+
69+
private static void flattenHelper(
70+
JsonNode node, String parentKey, Map<String, String> flatMap) {
71+
if (node.isObject()) {
72+
Iterator<Map.Entry<String, JsonNode>> fields = node.fields();
73+
while (fields.hasNext()) {
74+
Map.Entry<String, JsonNode> field = fields.next();
75+
String newKey =
76+
parentKey.isEmpty() ? field.getKey() : parentKey + "." + field.getKey();
77+
flattenHelper(field.getValue(), newKey, flatMap);
78+
}
79+
} else if (node.isArray()) {
80+
for (int i = 0; i < node.size(); i++) {
81+
String newKey = parentKey + "[" + i + "]";
82+
flattenHelper(node.get(i), newKey, flatMap);
83+
}
84+
} else {
85+
// Store values as strings
86+
flatMap.put(parentKey, node.asText());
87+
}
88+
}
89+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.kubernetes.operator.api.spec;
19+
20+
import com.fasterxml.jackson.core.JsonParser;
21+
import com.fasterxml.jackson.databind.DeserializationContext;
22+
import com.fasterxml.jackson.databind.JsonDeserializer;
23+
import com.fasterxml.jackson.databind.node.ObjectNode;
24+
25+
import java.io.IOException;
26+
27+
/** Allows to deserialize to ConfigObjectNode. */
28+
public class ConfigObjectNodeDeserializer extends JsonDeserializer<ConfigObjectNode> {
29+
30+
@Override
31+
public ConfigObjectNode deserialize(
32+
JsonParser jsonParser, DeserializationContext deserializationContext)
33+
throws IOException {
34+
ObjectNode tree = jsonParser.readValueAsTree();
35+
var res = new ConfigObjectNode();
36+
tree.fields().forEachRemaining(entry -> res.set(entry.getKey(), entry.getValue()));
37+
return res;
38+
}
39+
}

0 commit comments

Comments
 (0)