Skip to content

Commit cce0527

Browse files
Merge branch 'apache:main' into FLINK-37126
2 parents e9eec8f + 099cdbd commit cce0527

File tree

33 files changed

+1493
-32
lines changed

33 files changed

+1493
-32
lines changed

.github/workflows/ci.yml

Lines changed: 63 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,84 @@
1616
# limitations under the License.
1717
################################################################################
1818

19-
2019
# We need to specify repo related information here since Apache INFRA doesn't differentiate
2120
# between several workflows with the same names while preparing a report for GHA usage
2221
# https://infra-reports.apache.org/#ghactions
2322
name: Flink Kubernetes Operator CI
23+
2424
on:
2525
push:
2626
branches:
2727
- main
2828
- release-*
2929
pull_request:
30+
3031
concurrency:
3132
group: ${{ github.workflow }}-${{ github.ref_name }}
3233
cancel-in-progress: true
3334

35+
env:
36+
HELM_CHART_DIR: helm
37+
FLINK_OPERATOR_CHART: flink-kubernetes-operator
38+
3439
jobs:
40+
helm_lint_test:
41+
runs-on: ubuntu-latest
42+
43+
name: Helm Lint Test
44+
45+
steps:
46+
- name: Determine branch name
47+
id: get_branch
48+
run: |
49+
BRANCH=""
50+
if [ "${{ github.event_name }}" == "push" ]; then
51+
BRANCH=${{ github.ref_name }}
52+
elif [ "${{ github.event_name }}" == "pull_request" ]; then
53+
BRANCH=${{ github.base_ref }}
54+
fi
55+
echo "BRANCH=$BRANCH" >> "$GITHUB_OUTPUT"
56+
57+
- name: Checkout source code
58+
uses: actions/checkout@v4
59+
with:
60+
fetch-depth: 0
61+
62+
- name: Set up Helm
63+
uses: azure/setup-helm@1a275c3b69536ee54be43f2070a358922e12c8d4
64+
with:
65+
version: v3.17.3
66+
67+
- name: Install Helm unittest plugin
68+
run: helm plugin install https://github.com/helm-unittest/helm-unittest.git --version 0.8.1
69+
70+
- name: Run Helm unittest
71+
run: helm unittest ${{ env.HELM_CHART_DIR }}/${{ env.FLINK_OPERATOR_CHART }} --file "tests/**/*_test.yaml" --strict --debug
72+
73+
- name: Set up chart-testing
74+
uses: helm/chart-testing-action@0d28d3144d3a25ea2cc349d6e59901c4ff469b3b
75+
76+
- name: Run chart-testing (list-changed)
77+
id: list-changed
78+
env:
79+
BRANCH: ${{ steps.get_branch.outputs.BRANCH }}
80+
run: |
81+
changed=$(ct list-changed --target-branch $BRANCH --chart-dirs ${{ env.HELM_CHART_DIR }})
82+
if [[ -n "$changed" ]]; then
83+
echo "changed=true" >> "$GITHUB_OUTPUT"
84+
fi
85+
86+
- name: Run Helm lint
87+
if: steps.list-changed.outputs.changed == 'true'
88+
run: |
89+
helm lint ${{ env.HELM_CHART_DIR }}/${{ env.FLINK_OPERATOR_CHART }} --strict --debug
90+
91+
- name: Run chart-testing (lint)
92+
if: steps.list-changed.outputs.changed == 'true'
93+
env:
94+
BRANCH: ${{ steps.get_branch.outputs.BRANCH }}
95+
run: ct lint --target-branch $BRANCH --chart-dirs ${{ env.HELM_CHART_DIR }} --check-version-increment=false --validate-maintainers=false
96+
3597
test_ci:
3698
runs-on: ubuntu-latest
3799
name: maven build
@@ -53,9 +115,6 @@ jobs:
53115
echo "Please generate the java doc via 'mvn clean install -DskipTests -Pgenerate-docs' again"
54116
exit 1
55117
fi
56-
- name: Validate helm chart linting
57-
run: |
58-
helm lint helm/flink-kubernetes-operator
59118
- name: Tests in flink-kubernetes-operator
60119
run: |
61120
cd flink-kubernetes-operator

docs/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ should copy into their pom.xml file. It will render out to:
102102

103103
```xml
104104
<dependency>
105-
<groupdId>org.apache.flink</groupId>
105+
<groupId>org.apache.flink</groupId>
106106
<artifactId>flink-streaming-java</artifactId>
107107
<version><!-- current flink version --></version>
108108
</dependency>

docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,12 @@
170170
<td>Integer</td>
171171
<td>The port the health probe will use to expose the status.</td>
172172
</tr>
173+
<tr>
174+
<td><h5>kubernetes.operator.ingress.manage</h5></td>
175+
<td style="word-wrap: break-word;">true</td>
176+
<td>Boolean</td>
177+
<td>Feature flag if operator will manage the Ingress resource. If false, no InformerEventSource will be registered for Ingress, and Ingress won't be created.</td>
178+
</tr>
173179
<tr>
174180
<td><h5>kubernetes.operator.jm-deployment-recovery.enabled</h5></td>
175181
<td style="word-wrap: break-word;">true</td>

docs/layouts/shortcodes/generated/system_advanced_section.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,12 @@
6868
<td>Integer</td>
6969
<td>The port the health probe will use to expose the status.</td>
7070
</tr>
71+
<tr>
72+
<td><h5>kubernetes.operator.ingress.manage</h5></td>
73+
<td style="word-wrap: break-word;">true</td>
74+
<td>Boolean</td>
75+
<td>Feature flag if operator will manage the Ingress resource. If false, no InformerEventSource will be registered for Ingress, and Ingress won't be created.</td>
76+
</tr>
7177
<tr>
7278
<td><h5>kubernetes.operator.label.selector</h5></td>
7379
<td style="word-wrap: break-word;">(none)</td>

examples/autoscaling/README.md

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
<!--
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing,
13+
software distributed under the License is distributed on an
14+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
KIND, either express or implied. See the License for the
16+
specific language governing permissions and limitations
17+
under the License.
18+
-->
19+
20+
# Flink Kubernetes Autoscaling Example
21+
22+
## Overview
23+
24+
This example contains two Flink applications showcasing the Flink Autoscaler capabilities:
25+
26+
- `AutoscalingExample.java` with its accompanying `autoscaling.yaml` containing the `FlinkDeployment` definition
27+
- `LoadSimulationPipeline.java` with its accompanying `autoscaling-dynamic.yaml` containing the `FlinkDeployment` definition
28+
29+
Both applications are packaged into a single fat jar, which is then included in a Docker image
30+
built from the provided `Dockerfile`.
31+
32+
### AutoscalingExample
33+
34+
This application contains a source that emits long values, a map function with an emulated
35+
processing load, and a discard sink. The processing load per record can be configured by
36+
adjusting the job argument in `autoscaling.yaml`:
37+
38+
```
39+
job:
40+
args: ["10"]
41+
```
42+
43+
The argument value specifies how many synthetic iterations are performed for each record.
44+
45+
### LoadSimulationPipeline
46+
47+
This application simulates fluctuating load that can be configured via the job arguments in
49+
`autoscaling-dynamic.yaml`:
49+
50+
```
51+
job:
52+
args:
53+
- --maxLoadPerTask "1;2;4;8;16;\n16;8;4;2;1\n8;4;16;1;2" --repeatsAfterMinutes "60"
54+
```
55+
56+
Refer to `LoadSimulationPipeline.java`'s JavaDoc and comments for the details concerning the argument
57+
notation and simulated load pattern.
58+
59+
## Usage
60+
61+
The following steps assume that you have the Flink Kubernetes Operator installed and running in
62+
your environment. If not, please follow the Flink Kubernetes Operator [quickstart](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/try-flink-kubernetes-operator/quick-start/) to start with.
63+
64+
**Step 1**: Build Autoscaling example maven project
65+
```bash
66+
cd examples/autoscaling
67+
mvn clean package
68+
```
69+
70+
**Step 2**: Build docker image
71+
```bash
72+
# Uncomment when building for local minikube env:
73+
# eval $(minikube docker-env)
74+
75+
DOCKER_BUILDKIT=1 docker build . -t autoscaling-example:latest
76+
```
77+
78+
This step creates an image, based on an official Flink base image, that includes the autoscaling application jar.
79+
80+
**Step 3**: Only for AutoscalingExample: Mount volume to keep savepoints and checkpoints
81+
82+
```bash
83+
# Assuming minikube is used for local testing or alternatively ensure any other k8s cluster setup with access to a persistent volume
84+
mkdir /tmp/flink # or any other local directory
85+
minikube mount --uid=9999 --gid=9999 /tmp/flink:/tmp/flink
86+
```
87+
88+
**Step 4**: Submit FlinkDeployment Yaml
89+
90+
For *AutoscalingExample*:
91+
92+
```bash
93+
kubectl apply -f autoscaling.yaml
94+
```
95+
96+
or for *LoadSimulationPipeline*:
97+
98+
```bash
99+
kubectl apply -f autoscaling-dynamic.yaml
100+
```

examples/kubernetes-client-examples/README.MD

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ This is an end-to-end example of running Flink Job via code using the Flink Kube
2525

2626
*What's in this example?*
2727

28-
1. Sample code for submitting an application similar to examples/basic.yaml programatically.
28+
1. Sample code for submitting an application similar to examples/basic.yaml programmatically.
2929

3030
## How does it work?
3131

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/ConfigObjectNode.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,15 @@
2020
import org.apache.flink.configuration.Configuration;
2121

2222
import com.fasterxml.jackson.databind.JsonNode;
23+
import com.fasterxml.jackson.databind.node.ArrayNode;
2324
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
2425
import com.fasterxml.jackson.databind.node.ObjectNode;
2526

27+
import java.util.ArrayList;
2628
import java.util.Arrays;
2729
import java.util.HashMap;
2830
import java.util.Iterator;
31+
import java.util.List;
2932
import java.util.Map;
3033

3134
/** Allows parsing configurations as YAML, and adds related utility methods. */
@@ -78,12 +81,26 @@ private static void flattenHelper(
7881
}
7982
} else if (node.isArray()) {
8083
for (int i = 0; i < node.size(); i++) {
81-
String newKey = parentKey + "[" + i + "]";
82-
flattenHelper(node.get(i), newKey, flatMap);
84+
if (node instanceof ArrayNode) {
85+
flatMap.put(parentKey, arrayNodeToSemicolonSepratedString((ArrayNode) node));
86+
} else {
87+
String newKey = parentKey + "[" + i + "]";
88+
flattenHelper(node.get(i), newKey, flatMap);
89+
}
8390
}
8491
} else {
85-
// Store values as strings
8692
flatMap.put(parentKey, node.asText());
8793
}
8894
}
95+
96+
private static String arrayNodeToSemicolonSepratedString(ArrayNode arrayNode) {
97+
if (arrayNode == null || arrayNode.isEmpty()) {
98+
return "";
99+
}
100+
List<String> stringValues = new ArrayList<>();
101+
for (JsonNode node : arrayNode) {
102+
stringValues.add(node.asText());
103+
}
104+
return String.join(";", stringValues);
105+
}
89106
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.kubernetes.operator.api.spec;
19+
20+
import com.fasterxml.jackson.core.JsonProcessingException;
21+
import com.fasterxml.jackson.databind.ObjectMapper;
22+
import com.fasterxml.jackson.databind.module.SimpleModule;
23+
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
24+
import org.junit.jupiter.api.BeforeEach;
25+
import org.junit.jupiter.api.Test;
26+
27+
import java.util.Map;
28+
29+
import static org.assertj.core.api.Assertions.assertThat;
30+
31+
class ConfigObjectNodeTest {
32+
33+
private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory());
34+
35+
@BeforeEach
36+
void setup() {
37+
SimpleModule module = new SimpleModule();
38+
module.addDeserializer(ConfigObjectNode.class, new ConfigObjectNodeDeserializer());
39+
objectMapper.registerModule(module);
40+
}
41+
42+
@Test
43+
void deserializeArray() throws JsonProcessingException {
44+
var value = "a: 1\n" + "b:\n" + " c: [1,2]";
45+
46+
var flatMap = objectMapper.readValue(value, ConfigObjectNode.class).asFlatMap();
47+
48+
assertThat(flatMap).containsExactlyInAnyOrderEntriesOf(Map.of("a", "1", "b.c", "1;2"));
49+
}
50+
51+
@Test
52+
void deserializeStringArray() throws JsonProcessingException {
53+
var value = "a: 1\n" + "b:\n" + " c: [\"1\",\"2\"]";
54+
55+
var flatMap = objectMapper.readValue(value, ConfigObjectNode.class).asFlatMap();
56+
57+
assertThat(flatMap).containsExactlyInAnyOrderEntriesOf(Map.of("a", "1", "b.c", "1;2"));
58+
}
59+
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,8 @@ void registerDeploymentController() {
199199
observerFactory,
200200
statusRecorder,
201201
eventRecorder,
202-
canaryResourceManager);
202+
canaryResourceManager,
203+
configManager);
203204
registeredControllers.add(operator.register(controller, this::overrideControllerConfigs));
204205
}
205206

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ public class FlinkOperatorConfiguration {
7979
Duration slowRequestThreshold;
8080
int reportedExceptionEventsMaxCount;
8181
int reportedExceptionEventsMaxStackTraceLength;
82+
boolean manageIngress;
8283

8384
public static FlinkOperatorConfiguration fromConfiguration(Configuration operatorConfig) {
8485
Duration reconcileInterval =
@@ -203,6 +204,9 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato
203204
operatorConfig.get(
204205
KubernetesOperatorConfigOptions.OPERATOR_EVENT_EXCEPTION_STACKTRACE_LINES);
205206

207+
boolean manageIngress =
208+
operatorConfig.get(KubernetesOperatorConfigOptions.OPERATOR_MANAGE_INGRESS);
209+
206210
return new FlinkOperatorConfiguration(
207211
reconcileInterval,
208212
reconcilerMaxParallelism,
@@ -234,7 +238,8 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato
234238
snapshotResourcesEnabled,
235239
slowRequestThreshold,
236240
reportedExceptionEventsMaxCount,
237-
reportedExceptionEventsMaxStackTraceLength);
241+
reportedExceptionEventsMaxStackTraceLength,
242+
manageIngress);
238243
}
239244

240245
private static GenericRetry getRetryConfig(Configuration conf) {

0 commit comments

Comments
 (0)