diff --git a/.asf.yaml b/.asf.yaml
index 82c2ae19b0..42fbb1f707 100644
--- a/.asf.yaml
+++ b/.asf.yaml
@@ -21,6 +21,7 @@ github:
release-1.8: {}
release-1.9: {}
release-1.10: {}
+ release-1.11: {}
notifications:
commits: commits@flink.apache.org
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 9618a2b3b8..724d23d526 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -28,7 +28,7 @@ on:
- release-*
pull_request:
concurrency:
- group: ${{ github.workflow }}-${{ github.event.workflow_run.head_branch }}
+ group: ${{ github.workflow }}-${{ github.ref_name }}
cancel-in-progress: true
jobs:
@@ -161,6 +161,7 @@ jobs:
- test_autoscaler.sh
- test_flink_operator_ha.sh
- test_snapshot.sh
+ - test_batch_job.sh
exclude:
- flink-version: v1_16
test: test_autoscaler.sh
@@ -172,18 +173,24 @@ jobs:
test: test_flink_operator_ha.sh
- flink-version: v1_16
test: test_snapshot.sh
+ - flink-version: v1_16
+ test: test_batch_job.sh
- flink-version: v1_17
test: test_dynamic_config.sh
- flink-version: v1_17
test: test_flink_operator_ha.sh
- flink-version: v1_17
test: test_snapshot.sh
+ - flink-version: v1_17
+ test: test_batch_job.sh
- flink-version: v1_18
test: test_dynamic_config.sh
- flink-version: v1_18
test: test_flink_operator_ha.sh
- flink-version: v1_18
test: test_snapshot.sh
+ - flink-version: v1_18
+ test: test_batch_job.sh
- flink-version: v1_19
test: test_snapshot.sh
uses: ./.github/workflows/e2e.yaml
diff --git a/.github/workflows/docs.yaml b/.github/workflows/docs.yaml
index 936e31e8f7..1f776c97ee 100644
--- a/.github/workflows/docs.yaml
+++ b/.github/workflows/docs.yaml
@@ -28,8 +28,8 @@ jobs:
matrix:
branch:
- main
- - release-1.9
- release-1.10
+ - release-1.11
steps:
- uses: actions/checkout@v3
with:
@@ -41,8 +41,8 @@ jobs:
echo "flink_branch=${currentBranch}"
echo "flink_branch=${currentBranch}" >> ${GITHUB_ENV}
if [ "${currentBranch}" = "main" ]; then
- echo "flink_alias=release-1.11" >> ${GITHUB_ENV}
- elif [ "${currentBranch}" = "release-1.10" ]; then
+ echo "flink_alias=release-1.12" >> ${GITHUB_ENV}
+ elif [ "${currentBranch}" = "release-1.11" ]; then
echo "flink_alias=stable" >> ${GITHUB_ENV}
else
echo "flink_alias=${currentBranch}" >> ${GITHUB_ENV}
diff --git a/.github/workflows/e2e.yaml b/.github/workflows/e2e.yaml
index 26f90a8895..ecfcd07fef 100644
--- a/.github/workflows/e2e.yaml
+++ b/.github/workflows/e2e.yaml
@@ -88,6 +88,8 @@ jobs:
EXAMPLES_JAR="https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.4/flink-examples-streaming_2.12-1.14.4.jar"
if [[ ${{ inputs.flink-version }} == v2* ]]; then
EXAMPLES_JAR="https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming/2.0-preview1/flink-examples-streaming-2.0-preview1.jar"
+ elif [[ "${{ inputs.test }}" == "test_batch_job.sh" ]]; then
+ EXAMPLES_JAR="https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming/1.20.1/flink-examples-streaming-1.20.1.jar"
fi
ESCAPED_EXAMPLES_JAR=$(printf '%s\n' "$EXAMPLES_JAR" | sed -e 's/[\/&]/\\&/g')
diff --git a/Dockerfile b/Dockerfile
index 6c4891f116..b9528af4f3 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -38,7 +38,7 @@ RUN cd /app/tools/license; mkdir jars; cd jars; \
FROM eclipse-temurin:${JAVA_VERSION}-jre-jammy
ENV FLINK_HOME=/opt/flink
ENV FLINK_PLUGINS_DIR=$FLINK_HOME/plugins
-ENV OPERATOR_VERSION=1.11-SNAPSHOT
+ENV OPERATOR_VERSION=1.12-SNAPSHOT
ENV OPERATOR_JAR=flink-kubernetes-operator-$OPERATOR_VERSION-shaded.jar
ENV WEBHOOK_JAR=flink-kubernetes-webhook-$OPERATOR_VERSION-shaded.jar
ENV KUBERNETES_STANDALONE_JAR=flink-kubernetes-standalone-$OPERATOR_VERSION.jar
diff --git a/e2e-tests/data/flinkdep-batch-cr.yaml b/e2e-tests/data/flinkdep-batch-cr.yaml
new file mode 100644
index 0000000000..159199ce4f
--- /dev/null
+++ b/e2e-tests/data/flinkdep-batch-cr.yaml
@@ -0,0 +1,87 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkDeployment
+metadata:
+ namespace: default
+ name: flink-example-wordcount-batch
+spec:
+ image: flink:1.20
+ flinkVersion: v1_20
+ ingress:
+ template: "/{{namespace}}/{{name}}(/|$)(.*)"
+ className: "nginx"
+ annotations:
+ nginx.ingress.kubernetes.io/rewrite-target: "/$2"
+ flinkConfiguration:
+ taskmanager.numberOfTaskSlots: "2"
+ kubernetes.operator.snapshot.resource.enabled: "false"
+ serviceAccount: flink
+ podTemplate:
+ spec:
+ initContainers:
+ - name: artifacts-fetcher
+ image: busybox:1.35.0
+ imagePullPolicy: IfNotPresent
+ # Use wget or other tools to get user jars from remote storage
+ command: [ 'wget', 'STREAMING_EXAMPLES_JAR_URL', '-O', '/flink-artifact/myjob.jar' ]
+ volumeMounts:
+ - mountPath: /flink-artifact
+ name: flink-artifact
+ containers:
+ # Do not change the main container name
+ - name: flink-main-container
+ resources:
+ requests:
+ ephemeral-storage: 2048Mi
+ limits:
+ ephemeral-storage: 2048Mi
+ volumeMounts:
+ - mountPath: /opt/flink/usrlib
+ name: flink-artifact
+ volumes:
+ - name: flink-artifact
+ emptyDir: { }
+ jobManager:
+ resource:
+ memory: "1024m"
+ cpu: 0.5
+ taskManager:
+ resource:
+ memory: "1Gi"
+ cpu: 0.5
+ job:
+ jarURI: local:///opt/flink/usrlib/myjob.jar
+ entryClass: org.apache.flink.streaming.examples.wordcount.WordCount
+ args: ["--execution-mode", "BATCH"]
+ parallelism: 2
+ upgradeMode: stateless
+ mode: native
+
+---
+apiVersion: networking.k8s.io/v1
+kind: IngressClass
+metadata:
+ annotations:
+ ingressclass.kubernetes.io/is-default-class: "true"
+ labels:
+ app.kubernetes.io/component: controller
+ name: nginx
+spec:
+ controller: k8s.io/ingress-nginx
diff --git a/e2e-tests/test_batch_job.sh b/e2e-tests/test_batch_job.sh
new file mode 100755
index 0000000000..2cbf6a5d4d
--- /dev/null
+++ b/e2e-tests/test_batch_job.sh
@@ -0,0 +1,62 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# This script tests basic Flink batch job operations on Kubernetes:
+# 1. Deploys a FlinkDeployment for a batch job.
+# 2. Waits for the JobManager to become ready.
+# 3. Verifies that the job reaches the FINISHED state.
+# 4. Applies a no-op spec change and verifies the job remains in the FINISHED state.
+# 5. Checks the operator logs for the expected job state transition message.
+# 6. Checks the JobManager logs for successful application completion.
+# 7. Applies a spec change and verifies the job re-runs successfully.
+SCRIPT_DIR=$(dirname "$(readlink -f "$0")")
+source "${SCRIPT_DIR}/utils.sh"
+
+CLUSTER_ID="flink-example-wordcount-batch"
+APPLICATION_YAML="${SCRIPT_DIR}/data/flinkdep-batch-cr.yaml"
+APPLICATION_IDENTIFIER="flinkdep/$CLUSTER_ID"
+TIMEOUT=300
+
+on_exit cleanup_and_exit "$APPLICATION_YAML" $TIMEOUT $CLUSTER_ID
+
+retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1
+
+wait_for_jobmanager_running $CLUSTER_ID $TIMEOUT
+
+# Wait for the job to reach the FINISHED state.
+wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' FINISHED $TIMEOUT || exit 1
+
+# Apply a no-op spec change; verify the job remains in the FINISHED state.
+kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"flinkConfiguration": {"kubernetes.operator.deployment.readiness.timeout": "6h" } } }'
+wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' FINISHED $TIMEOUT || exit 1
+
+# Verify the job status change to FINISHED shows up in the operator logs.
+operator_pod_name=$(get_operator_pod_name)
+wait_for_operator_logs "$operator_pod_name" "Job status changed from .* to FINISHED" ${TIMEOUT} || exit 1
+
+# Verify the job completed successfully in the job manager logs.
+jm_pod_name=$(get_jm_pod_name $CLUSTER_ID)
+wait_for_logs "$jm_pod_name" "Application completed SUCCESSFULLY" ${TIMEOUT} || exit 1
+
+# Apply a spec change; verify the job re-runs and reaches the FINISHED state.
+kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"job": {"parallelism": 1 } } }'
+wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' RECONCILING $TIMEOUT || exit 1
+wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' FINISHED $TIMEOUT || exit 1
+
+echo "Successfully ran the batch job test"
\ No newline at end of file
diff --git a/examples/autoscaling/pom.xml b/examples/autoscaling/pom.xml
index b14206ee7c..993b22dd45 100644
--- a/examples/autoscaling/pom.xml
+++ b/examples/autoscaling/pom.xml
@@ -23,7 +23,7 @@ under the License.
org.apache.flink
flink-kubernetes-operator-parent
- 1.11-SNAPSHOT
+ 1.12-SNAPSHOT
../..
@@ -45,6 +45,13 @@ under the License.
${flink.version}
provided
+
+
+ org.apache.flink
+ flink-connector-datagen
+ ${flink.version}
+
+
org.apache.flink
flink-clients
diff --git a/examples/autoscaling/src/main/java/autoscaling/LoadSimulationPipeline.java b/examples/autoscaling/src/main/java/autoscaling/LoadSimulationPipeline.java
index 0ddb23023b..14865e1b91 100644
--- a/examples/autoscaling/src/main/java/autoscaling/LoadSimulationPipeline.java
+++ b/examples/autoscaling/src/main/java/autoscaling/LoadSimulationPipeline.java
@@ -18,12 +18,16 @@
package autoscaling;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
@@ -60,6 +64,11 @@ public class LoadSimulationPipeline {
private static final Logger LOG = LoggerFactory.getLogger(LoadSimulationPipeline.class);
+ // Number of impulses (records) emitted per sampling interval.
+ // This value determines how many records should be generated within each `samplingIntervalMs`
+ // period.
+ private static final int IMPULSES_PER_SAMPLING_INTERVAL = 10;
+
public static void main(String[] args) throws Exception {
var env = StreamExecutionEnvironment.getExecutionEnvironment();
env.disableOperatorChaining();
@@ -74,8 +83,39 @@ public static void main(String[] args) throws Exception {
for (String branch : maxLoadPerTask.split("\n")) {
String[] taskLoads = branch.split(";");
+ /*
+ * Creates an unbounded stream that continuously emits the constant value 42L.
+ * Flink's DataGeneratorSource with RateLimiterStrategy is used to control the emission rate.
+ *
+ * Emission Rate Logic:
+ * - The goal is to generate a fixed number of impulses per sampling interval.
+ * - `samplingIntervalMs` defines the duration of one sampling interval in milliseconds.
+ * - We define `IMPULSES_PER_SAMPLING_INTERVAL = 10`, meaning that for every sampling interval,
+ * exactly 10 impulses should be generated.
+ *
+ * To calculate the total number of records emitted per second:
+ * 1. Determine how many sampling intervals fit within one second:
+ * samplingIntervalsPerSecond = 1000 / samplingIntervalMs;
+ * 2. Multiply this by the number of impulses per interval to get the total rate:
+ * impulsesPerSecond = IMPULSES_PER_SAMPLING_INTERVAL * samplingIntervalsPerSecond;
+ *
+ * Example:
+ * - If `samplingIntervalMs = 500 ms` and `IMPULSES_PER_SAMPLING_INTERVAL = 10`:
+ * impulsesPerSecond = (1000 / 500) * 10 = 2 * 10 = 20 records per second.
+ */
DataStream stream =
- env.addSource(new ImpulseSource(samplingIntervalMs)).name("ImpulseSource");
+ env.fromSource(
+ new DataGeneratorSource<>(
+ (GeneratorFunction)
+ (index) -> 42L, // Emits constant value 42
+ Long.MAX_VALUE, // Unbounded stream
+ // Emission rate, in records per second
+ RateLimiterStrategy.perSecond(
+ (1000.0 / samplingIntervalMs)
+ * IMPULSES_PER_SAMPLING_INTERVAL),
+ Types.LONG),
+ WatermarkStrategy.noWatermarks(),
+ "ImpulseSource");
for (String load : taskLoads) {
double maxLoad = Double.parseDouble(load);
@@ -97,31 +137,6 @@ public static void main(String[] args) throws Exception {
+ ")");
}
- private static class ImpulseSource implements SourceFunction {
- private final int maxSleepTimeMs;
- volatile boolean canceled;
-
- public ImpulseSource(int samplingInterval) {
- this.maxSleepTimeMs = samplingInterval / 10;
- }
-
- @Override
- public void run(SourceContext sourceContext) throws Exception {
- while (!canceled) {
- synchronized (sourceContext.getCheckpointLock()) {
- sourceContext.collect(42L);
- }
- // Provide an impulse to keep the load simulation active
- Thread.sleep(maxSleepTimeMs);
- }
- }
-
- @Override
- public void cancel() {
- canceled = true;
- }
- }
-
private static class LoadSimulationFn extends RichFlatMapFunction {
private final double maxLoad;
diff --git a/examples/flink-beam-example/pom.xml b/examples/flink-beam-example/pom.xml
index e1379a2274..f3704c2409 100644
--- a/examples/flink-beam-example/pom.xml
+++ b/examples/flink-beam-example/pom.xml
@@ -23,7 +23,7 @@ under the License.
org.apache.flink
flink-kubernetes-operator-parent
- 1.11-SNAPSHOT
+ 1.12-SNAPSHOT
../..
diff --git a/examples/flink-sql-runner-example/pom.xml b/examples/flink-sql-runner-example/pom.xml
index d2e34d463a..eb8431e960 100644
--- a/examples/flink-sql-runner-example/pom.xml
+++ b/examples/flink-sql-runner-example/pom.xml
@@ -23,7 +23,7 @@ under the License.
org.apache.flink
flink-kubernetes-operator-parent
- 1.11-SNAPSHOT
+ 1.12-SNAPSHOT
../..
diff --git a/examples/kubernetes-client-examples/pom.xml b/examples/kubernetes-client-examples/pom.xml
index 5e99a1ee5a..c1ba5b2cf2 100644
--- a/examples/kubernetes-client-examples/pom.xml
+++ b/examples/kubernetes-client-examples/pom.xml
@@ -24,7 +24,7 @@ under the License.
org.apache.flink
flink-kubernetes-operator-parent
- 1.11-SNAPSHOT
+ 1.12-SNAPSHOT
../..
@@ -40,7 +40,7 @@ under the License.
org.apache.flink
flink-kubernetes-operator
- 1.11-SNAPSHOT
+ 1.12-SNAPSHOT
compile
diff --git a/flink-autoscaler-plugin-jdbc/pom.xml b/flink-autoscaler-plugin-jdbc/pom.xml
index 7d7d95b79d..748e233092 100644
--- a/flink-autoscaler-plugin-jdbc/pom.xml
+++ b/flink-autoscaler-plugin-jdbc/pom.xml
@@ -23,7 +23,7 @@ under the License.
flink-kubernetes-operator-parent
org.apache.flink
- 1.11-SNAPSHOT
+ 1.12-SNAPSHOT
..
diff --git a/flink-autoscaler-standalone/pom.xml b/flink-autoscaler-standalone/pom.xml
index 17fa7425a0..dc3337e3cd 100644
--- a/flink-autoscaler-standalone/pom.xml
+++ b/flink-autoscaler-standalone/pom.xml
@@ -23,7 +23,7 @@ under the License.
org.apache.flink
flink-kubernetes-operator-parent
- 1.11-SNAPSHOT
+ 1.12-SNAPSHOT
..
diff --git a/flink-autoscaler/pom.xml b/flink-autoscaler/pom.xml
index e733a64959..a7463d573b 100644
--- a/flink-autoscaler/pom.xml
+++ b/flink-autoscaler/pom.xml
@@ -23,7 +23,7 @@ under the License.
org.apache.flink
flink-kubernetes-operator-parent
- 1.11-SNAPSHOT
+ 1.12-SNAPSHOT
..
diff --git a/flink-kubernetes-docs/pom.xml b/flink-kubernetes-docs/pom.xml
index 46d37cfd64..6e44541b54 100644
--- a/flink-kubernetes-docs/pom.xml
+++ b/flink-kubernetes-docs/pom.xml
@@ -23,7 +23,7 @@ under the License.
org.apache.flink
flink-kubernetes-operator-parent
- 1.11-SNAPSHOT
+ 1.12-SNAPSHOT
..
diff --git a/flink-kubernetes-operator-api/pom.xml b/flink-kubernetes-operator-api/pom.xml
index 3dc3a27d5e..193f154bd2 100644
--- a/flink-kubernetes-operator-api/pom.xml
+++ b/flink-kubernetes-operator-api/pom.xml
@@ -23,7 +23,7 @@ under the License.
org.apache.flink
flink-kubernetes-operator-parent
- 1.11-SNAPSHOT
+ 1.12-SNAPSHOT
..
diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/FlinkBlueGreenDeployment.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/FlinkBlueGreenDeployment.java
new file mode 100644
index 0000000000..61451243f0
--- /dev/null
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/FlinkBlueGreenDeployment.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.api;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentSpec;
+import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentStatus;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import io.fabric8.kubernetes.api.model.Namespaced;
+import io.fabric8.kubernetes.client.CustomResource;
+import io.fabric8.kubernetes.model.annotation.Group;
+import io.fabric8.kubernetes.model.annotation.ShortNames;
+import io.fabric8.kubernetes.model.annotation.Version;
+
+/** Custom resource definition that represents a deployment with Blue/Green rollout capability. */
+@Experimental
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@JsonDeserialize()
+@Group(CrdConstants.API_GROUP)
+@Version(CrdConstants.API_VERSION)
+@ShortNames({"flinkbgdep"})
+public class FlinkBlueGreenDeployment
+ extends CustomResource
+ implements Namespaced {}
diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/DeploymentType.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/DeploymentType.java
new file mode 100644
index 0000000000..87b7408a0e
--- /dev/null
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/DeploymentType.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.api.bluegreen;
+
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+
+import java.util.Map;
+
+/** Enumeration of the two possible Flink Blue/Green deployment types. */
+public enum DeploymentType {
+    BLUE,
+    GREEN;
+
+    /**
+     * Resolves the Blue/Green type of a deployment from its metadata label whose key is the
+     * simple name of this enum ({@code DeploymentType}).
+     *
+     * @param flinkDeployment deployment whose labels are inspected
+     * @return the constant whose name equals the label value
+     * @throws IllegalArgumentException if the label is absent, or its value is not the name of
+     *     one of the constants
+     */
+    public static DeploymentType fromDeployment(FlinkDeployment flinkDeployment) {
+        String labelKey = DeploymentType.class.getSimpleName();
+        Map<String, String> labels = flinkDeployment.getMetadata().getLabels();
+        String typeLabel = labels == null ? null : labels.get(labelKey);
+        if (typeLabel == null) {
+            // Enum.valueOf(null) would only raise an uninformative NullPointerException.
+            throw new IllegalArgumentException(
+                    String.format(
+                            "FlinkDeployment '%s' has no '%s' label, cannot determine its type",
+                            flinkDeployment.getMetadata().getName(), labelKey));
+        }
+        return DeploymentType.valueOf(typeLabel);
+    }
+}
diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentSpec.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentSpec.java
new file mode 100644
index 0000000000..bfaf99b40e
--- /dev/null
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentSpec.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.api.spec;
+
+import org.apache.flink.annotation.Experimental;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/** Spec that describes a Flink application with blue/green deployment capabilities. */
+@Experimental
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class FlinkBlueGreenDeploymentSpec {
+
+ private FlinkDeploymentTemplateSpec template;
+}
diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkDeploymentTemplateSpec.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkDeploymentTemplateSpec.java
new file mode 100644
index 0000000000..4a84955bcb
--- /dev/null
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkDeploymentTemplateSpec.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.api.spec;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.experimental.SuperBuilder;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/** Template Spec that describes a Flink application managed by the blue/green controller. */
+@AllArgsConstructor
+@NoArgsConstructor
+@Data
+@SuperBuilder
+public class FlinkDeploymentTemplateSpec {
+
+    @JsonProperty("metadata")
+    private ObjectMeta metadata;
+
+    @JsonProperty("deploymentDeletionDelaySec")
+    private int deploymentDeletionDelaySec;
+
+    @JsonProperty("maxNumRetries")
+    private int maxNumRetries;
+
+    @JsonProperty("reconciliationReschedulingIntervalMs")
+    private int reconciliationReschedulingIntervalMs;
+
+    @JsonProperty("spec")
+    private FlinkDeploymentSpec spec;
+
+    // Excluded from JSON. @Builder.Default is required because @SuperBuilder otherwise ignores
+    // the field initializer and builder-created instances would carry a null map.
+    @JsonIgnore
+    @Builder.Default
+    private Map<String, Object> additionalProperties = new LinkedHashMap<>();
+}
diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkBlueGreenDeploymentState.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkBlueGreenDeploymentState.java
new file mode 100644
index 0000000000..5681ee7697
--- /dev/null
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkBlueGreenDeploymentState.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.api.status;
+
+/** Enumeration of the possible states of the blue/green transition. */
+public enum FlinkBlueGreenDeploymentState {
+ ACTIVE_BLUE,
+ ACTIVE_GREEN,
+ TRANSITIONING_TO_BLUE,
+ TRANSITIONING_TO_GREEN,
+}
diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkBlueGreenDeploymentStatus.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkBlueGreenDeploymentStatus.java
new file mode 100644
index 0000000000..0820803056
--- /dev/null
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkBlueGreenDeploymentStatus.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.api.status;
+
+import org.apache.flink.annotation.Experimental;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+
+/** Last observed status of the Flink Blue/Green deployment. */
+@Experimental
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+@ToString(callSuper = true)
+@SuperBuilder
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class FlinkBlueGreenDeploymentStatus {
+
+    // Starts out as an empty JobStatus. @Builder.Default is required because @SuperBuilder
+    // otherwise ignores the field initializer and builder-created instances would carry null.
+    @Builder.Default
+    private JobStatus jobStatus = new JobStatus();
+
+    /** The state of the blue/green transition. */
+    private FlinkBlueGreenDeploymentState blueGreenState;
+
+    /** Last reconciled (serialized) deployment spec. */
+    private String lastReconciledSpec;
+
+    /** Timestamp of last reconciliation. */
+    private Long lastReconciledTimestamp;
+
+    /** Current number of retries. */
+    private int numRetries;
+
+    /** Information about the TaskManagers for the scale subresource. */
+    private TaskManagerInfo taskManager;
+}
diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/SpecUtils.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/SpecUtils.java
index 458dd69173..4488854509 100644
--- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/SpecUtils.java
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/SpecUtils.java
@@ -99,6 +99,47 @@ public static String writeSpecWithMeta(
         }
     }
+    /**
+     * Serializes an object to a JSON string, nested under a single top-level field.
+     *
+     * @param object value to serialize; passed through {@code checkNotNull} first
+     * @param wrapperKey name of the top-level field that holds the serialized value
+     * @return JSON object string with {@code wrapperKey} as its only field
+     */
+    public static String serializeObject(Object object, String wrapperKey) {
+        ObjectNode wrapper = objectMapper.createObjectNode();
+        wrapper.set(wrapperKey, objectMapper.valueToTree(checkNotNull(object)));
+
+        try {
+            return objectMapper.writeValueAsString(wrapper);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(
+                    "Could not serialize " + wrapperKey + ", this indicates a bug...", e);
+        }
+    }
+
+    /**
+     * Reads the value stored under {@code wrapperKey} in a JSON object string and binds it to
+     * {@code valueType}; the inverse of {@link #serializeObject(Object, String)}.
+     *
+     * @param serialized JSON object string to parse
+     * @param wrapperKey name of the top-level field holding the value
+     * @param valueType class to bind the value to
+     * @return the deserialized value
+     * @throws JsonProcessingException never thrown in practice: failures are rethrown as
+     *     {@link RuntimeException}; the clause is kept so existing callers keep compiling
+     */
+    public static <T> T deserializeObject(String serialized, String wrapperKey, Class<T> valueType)
+            throws JsonProcessingException {
+        try {
+            ObjectNode wrapper = (ObjectNode) objectMapper.readTree(serialized);
+            return objectMapper.treeToValue(wrapper.get(wrapperKey), valueType);
+        } catch (JsonProcessingException e) {
+            // Name the wrapped key, mirroring the message produced by serializeObject.
+            throw new RuntimeException(
+                    "Could not deserialize " + wrapperKey + ", this indicates a bug...", e);
+        }
+    }
+
// We do not have access to Flink's Preconditions from here
private static T checkNotNull(T object) {
if (object == null) {
diff --git a/flink-kubernetes-operator/pom.xml b/flink-kubernetes-operator/pom.xml
index 07d6be88ae..cff4a43f9e 100644
--- a/flink-kubernetes-operator/pom.xml
+++ b/flink-kubernetes-operator/pom.xml
@@ -23,7 +23,7 @@ under the License.
org.apache.flink
flink-kubernetes-operator-parent
- 1.11-SNAPSHOT
+ 1.12-SNAPSHOT
..
@@ -184,6 +184,13 @@ under the License.
+
+ io.fabric8
+ kube-api-test-client-inject
+ ${fabric8.version}
+ test
+
+
com.squareup.okhttp3
mockwebserver
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
index 8683996793..4615423ba8 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
@@ -22,6 +22,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.DeploymentOptionsInternal;
+import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
@@ -166,9 +167,32 @@ private List getClusterSideConfData(Configuration flinkConfig) {
clusterSideConfig.removeConfig(TaskManagerOptions.BIND_HOST);
clusterSideConfig.removeConfig(TaskManagerOptions.HOST);
+ validateConfigKeysForV2(clusterSideConfig);
+
return ConfigurationUtils.convertConfigToWritableLines(clusterSideConfig, false);
}
+    /**
+     * Rejects configurations in which one key is a dotted prefix of another key (for example
+     * {@code a.b} together with {@code a.b.c}). The check is skipped unless {@code
+     * useStandardYamlConfig()} returns true.
+     *
+     * @param clusterSideConfig The cluster-side configuration whose keys are checked.
+     * @throws IllegalConfigurationException if two overlapping keys are found.
+     */
+    private void validateConfigKeysForV2(Configuration clusterSideConfig) {
+
+        // Only validate Flink 2.0 yaml configs
+        if (useStandardYamlConfig()) {
+            var configKeys = clusterSideConfig.keySet();
+
+            for (var parentCandidate : configKeys) {
+                var childPrefix = parentCandidate + ".";
+                for (var candidate : configKeys) {
+                    if (!candidate.startsWith(childPrefix)) {
+                        continue;
+                    }
+                    throw new IllegalConfigurationException(
+                            String.format(
+                                    "Overlapping key prefixes detected (%s -> %s), please replace with Flink v2 compatible, non-deprecated keys.",
+                                    parentCandidate, candidate));
+                }
+            }
+        }
+    }
+
@VisibleForTesting
String getFlinkConfData(List confData) throws IOException {
try (StringWriter sw = new StringWriter();
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 4bd2836f7a..3d293e89dc 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -30,6 +30,7 @@
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.controller.FlinkBlueGreenDeploymentController;
import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
import org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController;
import org.apache.flink.kubernetes.operator.controller.FlinkStateSnapshotController;
@@ -242,6 +243,12 @@ void registerSnapshotController() {
registeredControllers.add(operator.register(controller, this::overrideControllerConfigs));
}
+    /**
+     * Creates the Blue/Green deployment controller, registers it with the operator using the
+     * shared controller config overrides and records the registration.
+     */
+    @VisibleForTesting
+    void registerBlueGreenController() {
+        registeredControllers.add(
+                operator.register(
+                        new FlinkBlueGreenDeploymentController(ctxFactory),
+                        this::overrideControllerConfigs));
+    }
+
private void overrideControllerConfigs(ControllerConfigurationOverrider> overrider) {
var operatorConf = configManager.getOperatorConfiguration();
var watchNamespaces = operatorConf.getWatchedNamespaces();
@@ -262,6 +269,7 @@ public void run() {
registerDeploymentController();
registerSessionJobController();
registerSnapshotController();
+ registerBlueGreenController();
operator.installShutdownHook(
baseConfig.get(KubernetesOperatorConfigOptions.OPERATOR_TERMINATION_TIMEOUT));
operator.start();
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
index 730e16847c..8a2fd2651d 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
@@ -238,6 +238,7 @@ public FlinkOperatorConfiguration getOperatorConfiguration(
*
* @param baseConfMap The configuration map that should be searched for relevant Flink version
* prefixes.
+ * @param flinkVersion The FlinkVersion to be used
* @return A list of relevant Flink version prefixes in order of ascending Flink version.
*/
protected static List getRelevantVersionPrefixes(
@@ -381,6 +382,7 @@ private void applyConfigsFromCurrentSpec(
* Get configuration for interacting with session jobs. Similar to the observe configuration for
* FlinkDeployments.
*
+ * @param name The name of the job
* @param deployment FlinkDeployment for the session cluster
* @param sessionJobSpec Session job spec
* @return Session job config
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java
new file mode 100644
index 0000000000..5717a75cca
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java
@@ -0,0 +1,609 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.bluegreen.DeploymentType;
+import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentSpec;
+import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
+import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentStatus;
+import org.apache.flink.kubernetes.operator.api.status.Savepoint;
+import org.apache.flink.kubernetes.operator.api.utils.SpecUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableSet;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import io.fabric8.kubernetes.api.model.Event;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.OwnerReference;
+import io.fabric8.kubernetes.api.model.StatusDetails;
+import io.fabric8.kubernetes.client.dsl.PodResource;
+import io.fabric8.kubernetes.client.dsl.Resource;
+import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
+import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
+import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import io.javaoperatorsdk.operator.processing.event.source.EventSource;
+import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
+import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** Controller that runs the main reconcile loop for Flink Blue/Green deployments. */
+@ControllerConfiguration
+public class FlinkBlueGreenDeploymentController
+ implements Reconciler,
+ EventSourceInitializer {
+
+    // The logger must be named after this class; it previously used FlinkDeploymentController,
+    // which attributed all Blue/Green log lines to the wrong controller.
+    private static final Logger LOG =
+            LoggerFactory.getLogger(FlinkBlueGreenDeploymentController.class);
+
+    /** Fallback used when the spec template's maxNumRetries is not a positive number. */
+    private static final int DEFAULT_MAX_NUM_RETRIES = 5;
+
+    /** Fallback used when the spec template's rescheduling interval is not a positive number. */
+    private static final int DEFAULT_RECONCILIATION_RESCHEDULING_INTERVAL_MS = 15000;
+
+    private final FlinkResourceContextFactory ctxFactory;
+
+    /**
+     * Creates the controller.
+     *
+     * @param ctxFactory Factory used to obtain resource contexts for the child FlinkDeployments.
+     */
+    public FlinkBlueGreenDeploymentController(FlinkResourceContextFactory ctxFactory) {
+        this.ctxFactory = ctxFactory;
+    }
+
+    /**
+     * Registers an informer on {@link FlinkDeployment} resources so that changes to child
+     * deployments (mapped back through their owner reference) trigger reconciliation.
+     *
+     * @param eventSourceContext Context supplied by the operator SDK.
+     * @return The named event sources for this controller.
+     */
+    @Override
+    public Map prepareEventSources(
+            EventSourceContext eventSourceContext) {
+        var informerConfig =
+                InformerConfiguration.from(FlinkDeployment.class, eventSourceContext)
+                        .withSecondaryToPrimaryMapper(Mappers.fromOwnerReference())
+                        .withNamespacesInheritedFromController(eventSourceContext)
+                        .followNamespaceChanges(true)
+                        .build();
+
+        var flinkDeploymentEventSource =
+                new InformerEventSource<>(informerConfig, eventSourceContext);
+
+        return EventSourceInitializer.nameEventSources(flinkDeploymentEventSource);
+    }
+
+    /**
+     * Main reconcile entry point. With no status yet, the first (BLUE) deployment is initiated;
+     * otherwise the action is chosen from the stored Blue/Green state: ACTIVE states check for a
+     * spec change, TRANSITIONING states monitor the ongoing transition.
+     *
+     * @param flinkBlueGreenDeployment The primary resource being reconciled.
+     * @param josdkContext Operator SDK context, used to look up the child deployments.
+     * @return The update control describing the status patch and/or rescheduling.
+     * @throws Exception if any of the delegated steps fails.
+     */
+    @Override
+    public UpdateControl reconcile(
+            FlinkBlueGreenDeployment flinkBlueGreenDeployment,
+            Context josdkContext)
+            throws Exception {
+
+        FlinkBlueGreenDeploymentStatus deploymentStatus = flinkBlueGreenDeployment.getStatus();
+
+        // No status recorded yet: this is the first deployment, which always targets BLUE.
+        if (deploymentStatus == null) {
+            FlinkBlueGreenDeploymentStatus initialStatus = new FlinkBlueGreenDeploymentStatus();
+            initialStatus.setLastReconciledSpec(
+                    SpecUtils.serializeObject(flinkBlueGreenDeployment.getSpec(), "spec"));
+            return initiateDeployment(
+                    flinkBlueGreenDeployment,
+                    initialStatus,
+                    DeploymentType.BLUE,
+                    FlinkBlueGreenDeploymentState.TRANSITIONING_TO_BLUE,
+                    null,
+                    josdkContext,
+                    true);
+        }
+
+        FlinkBlueGreenDeployments deployments =
+                FlinkBlueGreenDeployments.fromSecondaryResources(josdkContext);
+
+        // TODO: if a new deployment request comes while in the middle of a transition it's
+        //  currently ignored, but the new spec remains changed, should we roll it back?
+        // TODO: if we choose to leave a previously failed deployment 'running' for debug
+        //  purposes, we should flag it somehow as 'ROLLED_BACK' to signal that it can be
+        //  overridden by a new deployment attempt.
+        FlinkBlueGreenDeploymentState currentState = deploymentStatus.getBlueGreenState();
+        switch (currentState) {
+            case ACTIVE_BLUE:
+            case ACTIVE_GREEN:
+                return checkAndInitiateDeployment(
+                        flinkBlueGreenDeployment,
+                        deployments,
+                        deploymentStatus,
+                        currentState == FlinkBlueGreenDeploymentState.ACTIVE_BLUE
+                                ? DeploymentType.BLUE
+                                : DeploymentType.GREEN,
+                        josdkContext);
+            case TRANSITIONING_TO_BLUE:
+            case TRANSITIONING_TO_GREEN:
+                // While transitioning, the "current" side is the opposite of the target side.
+                return monitorTransition(
+                        flinkBlueGreenDeployment,
+                        deployments,
+                        deploymentStatus,
+                        currentState == FlinkBlueGreenDeploymentState.TRANSITIONING_TO_BLUE
+                                ? DeploymentType.GREEN
+                                : DeploymentType.BLUE,
+                        josdkContext);
+            default:
+                return UpdateControl.noUpdate();
+        }
+    }
+
+ /**
+ * Monitors an in-flight Blue/Green transition.
+ *
+ * <p>If the target ("next") deployment is ready, the transition is finalized via
+ * deleteAndFinalize. Otherwise the retry counter in the status is incremented and the
+ * reconciliation is rescheduled, until the maximum number of retries is reached, at which
+ * point the transition is aborted and the Blue/Green state is rolled back.
+ *
+ * @param bgDeployment The Blue/Green resource being reconciled.
+ * @param deployments The Blue and Green child deployments currently present.
+ * @param deploymentStatus The status object to update.
+ * @param currentDeploymentType The side being transitioned away from.
+ * @param josdkContext Operator SDK context.
+ * @return The update control for this reconciliation step.
+ */
+ private UpdateControl monitorTransition(
+ FlinkBlueGreenDeployment bgDeployment,
+ FlinkBlueGreenDeployments deployments,
+ FlinkBlueGreenDeploymentStatus deploymentStatus,
+ DeploymentType currentDeploymentType,
+ Context josdkContext) {
+
+ // Defaults describe a GREEN -> BLUE transition; overwritten below when BLUE is current.
+ var nextState = FlinkBlueGreenDeploymentState.ACTIVE_BLUE;
+ FlinkDeployment currentDeployment;
+ FlinkDeployment nextDeployment;
+
+ if (DeploymentType.BLUE == currentDeploymentType) {
+ nextState = FlinkBlueGreenDeploymentState.ACTIVE_GREEN;
+ currentDeployment = deployments.getFlinkDeploymentBlue();
+ nextDeployment = deployments.getFlinkDeploymentGreen();
+ } else {
+ currentDeployment = deployments.getFlinkDeploymentGreen();
+ nextDeployment = deployments.getFlinkDeploymentBlue();
+ }
+
+ // Only the target deployment is required; currentDeployment may be null here and is
+ // handled by deleteAndFinalize.
+ Preconditions.checkNotNull(
+ nextDeployment,
+ "Target Dependent Deployment resource not found. Blue/Green deployment name: "
+ + bgDeployment.getMetadata().getName()
+ + ", current deployment type: "
+ + currentDeploymentType);
+
+ if (isDeploymentReady(nextDeployment, josdkContext, deploymentStatus)) {
+ return deleteAndFinalize(
+ bgDeployment,
+ deploymentStatus,
+ currentDeploymentType,
+ josdkContext,
+ currentDeployment,
+ nextState);
+ } else {
+ // This phase requires rescheduling the reconciliation because the pod initialization
+ // could get stuck (e.g. waiting for resources)
+ // TODO: figure out the course of action for error/failure cases
+
+ // Non-positive values in the spec fall back to the default.
+ int maxNumRetries = bgDeployment.getSpec().getTemplate().getMaxNumRetries();
+ if (maxNumRetries <= 0) {
+ maxNumRetries = DEFAULT_MAX_NUM_RETRIES;
+ }
+
+ // NOTE(review): the retry counter is never reset in this method - confirm it is reset
+ // elsewhere before the next transition starts.
+ if (deploymentStatus.getNumRetries() >= maxNumRetries) {
+ // ABORT
+ // Suspend the nextDeployment (FlinkDeployment)
+ // NOTE(review): this mutates the in-memory status and then calls update() on the
+ // resource - confirm this actually persists the SUSPENDED state.
+ nextDeployment.getStatus().getJobStatus().setState(JobStatus.SUSPENDED);
+ josdkContext.getClient().resource(nextDeployment).update();
+
+ // We indicate this Blue/Green deployment is no longer Transitioning
+ // and rollback the state value
+ deploymentStatus.setBlueGreenState(
+ nextState == FlinkBlueGreenDeploymentState.ACTIVE_BLUE
+ ? FlinkBlueGreenDeploymentState.ACTIVE_GREEN
+ : FlinkBlueGreenDeploymentState.ACTIVE_BLUE);
+
+ // The target deployment did not become ready within the allowed retries,
+ // so this Blue/Green deployment is flagged as FAILING
+ return patchStatusUpdateControl(
+ bgDeployment, deploymentStatus, null, JobStatus.FAILING, false);
+ } else {
+ // RETRY
+ deploymentStatus.setNumRetries(deploymentStatus.getNumRetries() + 1);
+
+ LOG.info("Deployment " + nextDeployment.getMetadata().getName() + " not ready yet");
+ return patchStatusUpdateControl(bgDeployment, deploymentStatus, null, null, false)
+ .rescheduleAfter(getReconciliationReschedInterval(bgDeployment));
+ }
+ }
+ }
+
+    /**
+     * Returns the rescheduling interval (in milliseconds) configured in the spec template, or
+     * the default when the configured value is zero or negative.
+     */
+    private static int getReconciliationReschedInterval(FlinkBlueGreenDeployment bgDeployment) {
+        int configuredIntervalMs =
+                bgDeployment.getSpec().getTemplate().getReconciliationReschedulingIntervalMs();
+        return configuredIntervalMs > 0
+                ? configuredIntervalMs
+                : DEFAULT_RECONCILIATION_RESCHEDULING_INTERVAL_MS;
+    }
+
+    /**
+     * Final step of a transition. While the previous deployment still exists it is deleted and
+     * no status update is issued; once it is gone, the new spec is recorded as reconciled and
+     * the status is patched to {@code nextState} with job state RUNNING.
+     *
+     * @param currentDeployment The deployment being replaced, or null if it no longer exists.
+     * @param nextState The ACTIVE state to record once the previous deployment is gone.
+     */
+    private UpdateControl deleteAndFinalize(
+            FlinkBlueGreenDeployment bgDeployment,
+            FlinkBlueGreenDeploymentStatus deploymentStatus,
+            DeploymentType currentDeploymentType,
+            Context josdkContext,
+            FlinkDeployment currentDeployment,
+            FlinkBlueGreenDeploymentState nextState) {
+
+        if (currentDeployment == null) {
+            deploymentStatus.setLastReconciledSpec(
+                    SpecUtils.serializeObject(bgDeployment.getSpec(), "spec"));
+
+            // TODO: Set the new child job STATUS to RUNNING too
+
+            return patchStatusUpdateControl(
+                    bgDeployment, deploymentStatus, nextState, JobStatus.RUNNING, false);
+        }
+
+        // The old deployment is still around: request its deletion and wait for a later pass.
+        deleteDeployment(currentDeployment, josdkContext);
+        return UpdateControl.noUpdate();
+    }
+
+    /**
+     * Starts a transition to the opposite side when the spec differs from the last reconciled
+     * one.
+     *
+     * <p>If the currently active deployment is ready, the last checkpoint is looked up and the
+     * opposite deployment is created from it. If it is not ready, the Blue/Green status is
+     * flagged as FAILING (unless it already is) and no transition is started.
+     *
+     * @param flinkBlueGreenDeployment The Blue/Green resource being reconciled.
+     * @param deployments The Blue and Green child deployments currently present.
+     * @param deploymentStatus The status object to update.
+     * @param currentDeploymentType The side that is currently active.
+     * @param josdkContext Operator SDK context.
+     * @return The update control for this reconciliation step.
+     * @throws NullPointerException if the active child deployment is missing.
+     * @throws Exception if the savepoint lookup or the deployment fails.
+     */
+    private UpdateControl checkAndInitiateDeployment(
+            FlinkBlueGreenDeployment flinkBlueGreenDeployment,
+            FlinkBlueGreenDeployments deployments,
+            FlinkBlueGreenDeploymentStatus deploymentStatus,
+            DeploymentType currentDeploymentType,
+            Context josdkContext)
+            throws Exception {
+
+        if (!hasSpecChanged(
+                flinkBlueGreenDeployment.getSpec(), deploymentStatus, currentDeploymentType)) {
+            return UpdateControl.noUpdate();
+        }
+
+        boolean blueIsActive = DeploymentType.BLUE == currentDeploymentType;
+        FlinkDeployment currentFlinkDeployment =
+                blueIsActive
+                        ? deployments.getFlinkDeploymentBlue()
+                        : deployments.getFlinkDeploymentGreen();
+
+        // Fail with a descriptive message (as monitorTransition does) instead of a bare NPE
+        // inside isDeploymentReady when the active child deployment is missing.
+        Preconditions.checkNotNull(
+                currentFlinkDeployment,
+                "Active Dependent Deployment resource not found. Blue/Green deployment name: "
+                        + flinkBlueGreenDeployment.getMetadata().getName()
+                        + ", current deployment type: "
+                        + currentDeploymentType);
+
+        if (!isDeploymentReady(currentFlinkDeployment, josdkContext, deploymentStatus)) {
+            // The active FlinkDeployment is not RUNNING/STABLE, so no transition is started;
+            // flag this Blue/Green deployment as FAILING unless it is already flagged.
+            if (deploymentStatus.getJobStatus().getState() != JobStatus.FAILING) {
+                return patchStatusUpdateControl(
+                        flinkBlueGreenDeployment,
+                        deploymentStatus,
+                        null,
+                        JobStatus.FAILING,
+                        false);
+            }
+            return UpdateControl.noUpdate();
+        }
+
+        // The new deployment is restored from the active deployment's last checkpoint.
+        Savepoint lastCheckpoint =
+                configureSavepoint(
+                        ctxFactory.getResourceContext(currentFlinkDeployment, josdkContext));
+
+        return initiateDeployment(
+                flinkBlueGreenDeployment,
+                deploymentStatus,
+                blueIsActive ? DeploymentType.GREEN : DeploymentType.BLUE,
+                blueIsActive
+                        ? FlinkBlueGreenDeploymentState.TRANSITIONING_TO_GREEN
+                        : FlinkBlueGreenDeploymentState.TRANSITIONING_TO_BLUE,
+                lastCheckpoint,
+                josdkContext,
+                false);
+    }
+
+    /**
+     * Logs warnings about Pods in an unexpected phase and about recent non-normal Kubernetes
+     * events that mention the given deployment. This method only reads and logs.
+     *
+     * <p>Pods and events with missing fields (pod status/phase, involved object, reason, last
+     * timestamp) are tolerated so that this diagnostic helper does not fail the surrounding
+     * reconciliation with a {@link NullPointerException}. Events without a last timestamp are
+     * skipped because their recency cannot be determined here.
+     *
+     * @param flinkDeployment The child deployment to inspect.
+     * @param josdkContext Operator SDK context providing the Kubernetes client.
+     * @param lastReconciliationTimestamp Epoch milliseconds; only events with a newer last
+     *     timestamp are reported.
+     */
+    public void logPotentialWarnings(
+            FlinkDeployment flinkDeployment,
+            Context josdkContext,
+            long lastReconciliationTimestamp) {
+        // Event reason constants
+        // https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/events/event.go
+        Set<String> badEventPatterns =
+                ImmutableSet.of(
+                        "FAIL", "EXCEPTION", "BACKOFF", "ERROR", "EVICTION", "KILL", "EXCEED");
+        Set<String> goodPodPhases = ImmutableSet.of("PENDING", "RUNNING");
+
+        // Locale.ROOT keeps the upper-casing independent of the JVM default locale.
+        Set<String> podPhases =
+                getDeploymentPods(josdkContext, flinkDeployment)
+                        .map(podResource -> podResource.get())
+                        .filter(
+                                pod ->
+                                        pod != null
+                                                && pod.getStatus() != null
+                                                && pod.getStatus().getPhase() != null)
+                        .map(pod -> pod.getStatus().getPhase().toUpperCase(Locale.ROOT))
+                        .filter(phase -> !goodPodPhases.contains(phase))
+                        .collect(Collectors.toSet());
+
+        if (!podPhases.isEmpty()) {
+            LOG.warn("Deployment not healthy, some Pods have the following status: " + podPhases);
+        }
+
+        String deploymentName = flinkDeployment.getMetadata().getName();
+
+        List<Event> badEvents =
+                josdkContext
+                        .getClient()
+                        .v1()
+                        .events()
+                        .inNamespace(flinkDeployment.getMetadata().getNamespace())
+                        .resources()
+                        .map(Resource::item)
+                        .filter(e -> !"NORMAL".equalsIgnoreCase(e.getType()))
+                        .filter(
+                                e ->
+                                        e.getInvolvedObject() != null
+                                                && e.getInvolvedObject().getName() != null
+                                                && e.getInvolvedObject()
+                                                        .getName()
+                                                        .contains(deploymentName))
+                        .filter(
+                                e ->
+                                        e.getLastTimestamp() != null
+                                                && Instant.parse(e.getLastTimestamp())
+                                                                .toEpochMilli()
+                                                        > lastReconciliationTimestamp)
+                        .filter(
+                                e -> {
+                                    if (e.getReason() == null) {
+                                        return false;
+                                    }
+                                    String reason = e.getReason().toUpperCase(Locale.ROOT);
+                                    return badEventPatterns.stream().anyMatch(reason::contains);
+                                })
+                        .collect(Collectors.toList());
+
+        if (!badEvents.isEmpty()) {
+            LOG.warn("Bad events detected: " + badEvents);
+        }
+    }
+
+    /**
+     * Looks up the last checkpoint of the job behind the given resource context.
+     *
+     * @param resourceContext Context of the currently active FlinkDeployment.
+     * @return The last checkpoint reported by the Flink service.
+     * @throws IllegalStateException if no checkpoint is available.
+     * @throws Exception if the lookup itself fails.
+     */
+    private static Savepoint configureSavepoint(
+            FlinkResourceContext resourceContext) throws Exception {
+        // TODO: if the user specified an initialSavepointPath, use it and skip this
+        var flinkService = resourceContext.getFlinkService();
+        var jobId =
+                JobID.fromHexString(
+                        resourceContext.getResource().getStatus().getJobStatus().getJobId());
+        var observeConfig = resourceContext.getObserveConfig();
+
+        Optional<Savepoint> lastCheckpoint = flinkService.getLastCheckpoint(jobId, observeConfig);
+
+        // TODO 1: check the last CP age with the logic from
+        // AbstractJobReconciler.changeLastStateIfCheckpointTooOld
+        // TODO 2: if no checkpoint is available, take a savepoint? throw error?
+        return lastCheckpoint.orElseThrow(
+                () ->
+                        new IllegalStateException(
+                                "Last Checkpoint for Job "
+                                        + resourceContext.getResource().getMetadata().getName()
+                                        + " not found!"));
+    }
+
+    /**
+     * Creates the child deployment for {@code nextDeploymentType}, then patches the Blue/Green
+     * status to {@code nextState} and reschedules the reconciliation.
+     *
+     * @param lastCheckpoint Checkpoint to start the new deployment from, or null for none.
+     * @param isFirstDeployment Whether this is the initial deployment of the resource.
+     * @throws JsonProcessingException declared by the spec (de)serialization used in deploy.
+     */
+    private UpdateControl initiateDeployment(
+            FlinkBlueGreenDeployment flinkBlueGreenDeployment,
+            FlinkBlueGreenDeploymentStatus deploymentStatus,
+            DeploymentType nextDeploymentType,
+            FlinkBlueGreenDeploymentState nextState,
+            Savepoint lastCheckpoint,
+            Context josdkContext,
+            boolean isFirstDeployment)
+            throws JsonProcessingException {
+
+        deploy(
+                flinkBlueGreenDeployment,
+                nextDeploymentType,
+                lastCheckpoint,
+                josdkContext,
+                isFirstDeployment);
+
+        // TODO: set child job status to JobStatus.INITIALIZING
+
+        var statusPatch =
+                patchStatusUpdateControl(
+                        flinkBlueGreenDeployment,
+                        deploymentStatus,
+                        nextState,
+                        null,
+                        isFirstDeployment);
+
+        return statusPatch.rescheduleAfter(
+                getReconciliationReschedInterval(flinkBlueGreenDeployment));
+    }
+
+    /**
+     * A deployment counts as ready when its lifecycle state is STABLE, its job state is RUNNING
+     * and none of its Pods is in a phase other than "Running". When the deployment is not
+     * STABLE/RUNNING, potential problems are logged before returning false.
+     *
+     * @param deployment The child deployment to check.
+     * @param josdkContext Operator SDK context providing the Kubernetes client.
+     * @param deploymentStatus Blue/Green status; its last reconciled timestamp bounds the
+     *     events considered when logging warnings.
+     * @return true if the deployment is ready.
+     */
+    private boolean isDeploymentReady(
+            FlinkDeployment deployment,
+            Context josdkContext,
+            FlinkBlueGreenDeploymentStatus deploymentStatus) {
+        var status = deployment.getStatus();
+        boolean stableAndRunning =
+                ResourceLifecycleState.STABLE == status.getLifecycleState()
+                        && JobStatus.RUNNING == status.getJobStatus().getState();
+
+        if (!stableAndRunning) {
+            logPotentialWarnings(
+                    deployment, josdkContext, deploymentStatus.getLastReconciledTimestamp());
+            return false;
+        }
+
+        // TODO: verify, e.g. will pods be "pending" after the FlinkDeployment is RUNNING and
+        // STABLE?
+        int notRunningPods =
+                (int)
+                        getDeploymentPods(josdkContext, deployment)
+                                .filter(
+                                        pod ->
+                                                !pod.get()
+                                                        .getStatus()
+                                                        .getPhase()
+                                                        .equalsIgnoreCase("RUNNING"))
+                                .count();
+
+        if (notRunningPods > 0) {
+            LOG.warn("Waiting for " + notRunningPods + " Pods to transition to RUNNING status");
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Streams the Pod resources in the deployment's namespace whose {@code app} label equals
+     * the deployment name.
+     */
+    private static Stream getDeploymentPods(
+            Context josdkContext, FlinkDeployment deployment) {
+        var metadata = deployment.getMetadata();
+
+        return josdkContext
+                .getClient()
+                .pods()
+                .inNamespace(metadata.getNamespace())
+                .withLabel("app", metadata.getName())
+                .resources();
+    }
+
+    /**
+     * Compares the serialized form of the given spec with the last reconciled spec stored in
+     * the status.
+     *
+     * @param newSpec The spec currently present on the resource.
+     * @param deploymentStatus Status holding the last reconciled spec.
+     * @param deploymentType Currently unused.
+     * @return true if the serialized specs differ, or if no reconciled spec has been recorded.
+     */
+    private boolean hasSpecChanged(
+            FlinkBlueGreenDeploymentSpec newSpec,
+            FlinkBlueGreenDeploymentStatus deploymentStatus,
+            DeploymentType deploymentType) {
+
+        String currentSpec = SpecUtils.serializeObject(newSpec, "spec");
+
+        // Compare from the freshly serialized side so a missing (null) last reconciled spec
+        // counts as "changed" instead of throwing a NullPointerException.
+        return !currentSpec.equals(deploymentStatus.getLastReconciledSpec());
+    }
+
+    /**
+     * Applies the given changes to the status, stamps it with the current time, attaches it to
+     * the resource and returns a status-patch update control.
+     *
+     * @param deploymentState New Blue/Green state, or null to keep the current one.
+     * @param jobState New job state, or null to keep the current one.
+     * @param isFirstDeployment Currently unused.
+     */
+    private UpdateControl patchStatusUpdateControl(
+            FlinkBlueGreenDeployment flinkBlueGreenDeployment,
+            FlinkBlueGreenDeploymentStatus deploymentStatus,
+            FlinkBlueGreenDeploymentState deploymentState,
+            JobStatus jobState,
+            boolean isFirstDeployment) {
+        // Only overwrite the fields for which a new value was supplied.
+        if (deploymentState != null) {
+            deploymentStatus.setBlueGreenState(deploymentState);
+        }
+        if (jobState != null) {
+            var jobStatus = deploymentStatus.getJobStatus();
+            jobStatus.setState(jobState);
+        }
+
+        deploymentStatus.setLastReconciledTimestamp(System.currentTimeMillis());
+        flinkBlueGreenDeployment.setStatus(deploymentStatus);
+
+        return UpdateControl.patchStatus(flinkBlueGreenDeployment);
+    }
+
+ /**
+ * Builds the child FlinkDeployment for the given deployment type from the Blue/Green spec
+ * template and creates or replaces it in the cluster.
+ *
+ * <p>The child is named {@code <blue/green deployment name>-<lower-cased deployment type>};
+ * occurrences of the parent name inside the serialized spec are rewritten to the child name
+ * by adjustNameReferences.
+ *
+ * @param bgDeployment The parent Blue/Green resource.
+ * @param deploymentType The side (BLUE or GREEN) to deploy.
+ * @param lastCheckpoint Checkpoint to use as the initial savepoint path, or null for none.
+ * @param josdkContext Operator SDK context providing the Kubernetes client.
+ * @param isFirstDeployment Not referenced in this method.
+ * @throws JsonProcessingException declared by the spec (de)serialization helpers.
+ */
+ private void deploy(
+ FlinkBlueGreenDeployment bgDeployment,
+ DeploymentType deploymentType,
+ Savepoint lastCheckpoint,
+ Context josdkContext,
+ boolean isFirstDeployment)
+ throws JsonProcessingException {
+ ObjectMeta bgMeta = bgDeployment.getMetadata();
+
+ // Deployment
+ FlinkDeployment flinkDeployment = new FlinkDeployment();
+ flinkDeployment.setApiVersion("flink.apache.org/v1beta1");
+ flinkDeployment.setKind("FlinkDeployment");
+ FlinkBlueGreenDeploymentSpec spec = bgDeployment.getSpec();
+
+ String childDeploymentName =
+ bgMeta.getName() + "-" + deploymentType.toString().toLowerCase();
+
+ // Works on a copy produced by a serialize/deserialize round trip; the spec on
+ // bgDeployment itself is not modified by the steps below.
+ FlinkBlueGreenDeploymentSpec adjustedSpec =
+ adjustNameReferences(
+ spec,
+ bgMeta.getName(),
+ childDeploymentName,
+ "spec",
+ FlinkBlueGreenDeploymentSpec.class);
+
+ if (lastCheckpoint != null) {
+ // Removes every occurrence of "file:" from the location, not only a leading scheme.
+ String location = lastCheckpoint.getLocation().replace("file:", "");
+ LOG.info("Using checkpoint: " + location);
+ adjustedSpec.getTemplate().getSpec().getJob().setInitialSavepointPath(location);
+ }
+
+ // Must happen after the savepoint path has been set on the adjusted spec.
+ flinkDeployment.setSpec(adjustedSpec.getTemplate().getSpec());
+
+ // Deployment metadata
+ ObjectMeta flinkDeploymentMeta = getDependentObjectMeta(bgDeployment);
+ flinkDeploymentMeta.setName(childDeploymentName);
+ // Single label: key is the simple class name of the deployment type, value its name.
+ flinkDeploymentMeta.setLabels(
+ Map.of(deploymentType.getClass().getSimpleName(), deploymentType.toString()));
+ flinkDeployment.setMetadata(flinkDeploymentMeta);
+
+ // Deploy
+ josdkContext.getClient().resource(flinkDeployment).createOrReplace();
+ }
+
+    /**
+     * Requests deletion of the given FlinkDeployment and logs whether the client reported
+     * exactly one deleted resource of kind "FlinkDeployment".
+     */
+    private static void deleteDeployment(
+            FlinkDeployment currentDeployment, Context josdkContext) {
+        // TODO: This gets called multiple times, check to see if it's already in a TERMINATING
+        // state (or only execute if RUNNING)
+        var metadata = currentDeployment.getMetadata();
+
+        List<StatusDetails> deletionResult =
+                josdkContext
+                        .getClient()
+                        .resources(FlinkDeployment.class)
+                        .inNamespace(metadata.getNamespace())
+                        .withName(metadata.getName())
+                        .delete();
+
+        boolean deleted =
+                deletionResult.size() == 1
+                        && deletionResult.get(0).getKind().equals("FlinkDeployment");
+
+        LOG.info(deleted ? "Deployment deleted!" : "Deployment not deleted, will retry");
+    }
+
+    /**
+     * Creates the metadata for a child resource: same namespace as the Blue/Green resource and
+     * a single owner reference pointing back to it. The name is left for the caller to set.
+     */
+    private ObjectMeta getDependentObjectMeta(FlinkBlueGreenDeployment bgDeployment) {
+        ObjectMeta parentMeta = bgDeployment.getMetadata();
+
+        // NOTE(review): the two positional booleans are presumably blockOwnerDeletion=true and
+        // controller=false - confirm against the OwnerReference constructor.
+        OwnerReference parentReference =
+                new OwnerReference(
+                        bgDeployment.getApiVersion(),
+                        true,
+                        false,
+                        bgDeployment.getKind(),
+                        parentMeta.getName(),
+                        parentMeta.getUid());
+
+        ObjectMeta dependentMeta = new ObjectMeta();
+        dependentMeta.setNamespace(parentMeta.getNamespace());
+        dependentMeta.setOwnerReferences(List.of(parentReference));
+        return dependentMeta;
+    }
+
+    /**
+     * Returns a copy of {@code spec} in which every textual occurrence of {@code
+     * deploymentName} in its serialized JSON form is replaced by {@code childDeploymentName}.
+     *
+     * <p>The replacement is a plain string substitution on the whole serialized document, so it
+     * also affects values that merely contain the deployment name.
+     */
+    private static T adjustNameReferences(
+            T spec,
+            String deploymentName,
+            String childDeploymentName,
+            String wrapperKey,
+            Class valueType)
+            throws JsonProcessingException {
+        return SpecUtils.deserializeObject(
+                SpecUtils.serializeObject(spec, wrapperKey)
+                        .replace(deploymentName, childDeploymentName),
+                wrapperKey,
+                valueType);
+    }
+
+    /**
+     * Logs the message at ERROR level and then throws a {@link RuntimeException} carrying it.
+     *
+     * @param message The error message to log and throw.
+     */
+    public static void logAndThrow(String message) {
+        LOG.error(message);
+        final RuntimeException failure = new RuntimeException(message);
+        throw failure;
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeployments.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeployments.java
new file mode 100644
index 0000000000..69e9f084b7
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeployments.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.controller;
+
+import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.bluegreen.DeploymentType;
+
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Set;
+
+/** Utility to handle A/B deployments. */
+@Data
+@NoArgsConstructor
+class FlinkBlueGreenDeployments {
+ private FlinkDeployment flinkDeploymentBlue;
+ private FlinkDeployment flinkDeploymentGreen;
+
+ static FlinkBlueGreenDeployments fromSecondaryResources(
+ Context context) {
+ Set secondaryResources =
+ context.getSecondaryResources(FlinkDeployment.class);
+
+ if (secondaryResources.isEmpty() || secondaryResources.size() > 2) {
+ FlinkBlueGreenDeploymentController.logAndThrow(
+ "Unexpected number of dependent deployments: " + secondaryResources.size());
+ }
+
+ FlinkBlueGreenDeployments flinkBlueGreenDeployments = new FlinkBlueGreenDeployments();
+
+ for (FlinkDeployment dependentDeployment : secondaryResources) {
+ var flinkBlueGreenDeploymentType = DeploymentType.fromDeployment(dependentDeployment);
+
+ if (flinkBlueGreenDeploymentType == DeploymentType.BLUE) {
+ if (flinkBlueGreenDeployments.getFlinkDeploymentBlue() != null) {
+ FlinkBlueGreenDeploymentController.logAndThrow(
+ "Detected multiple Dependent Deployments of type BLUE");
+ }
+ flinkBlueGreenDeployments.setFlinkDeploymentBlue(dependentDeployment);
+ } else {
+ if (flinkBlueGreenDeployments.getFlinkDeploymentGreen() != null) {
+ FlinkBlueGreenDeploymentController.logAndThrow(
+ "Detected multiple Dependent Deployments of type GREEN");
+ }
+ flinkBlueGreenDeployments.setFlinkDeploymentGreen(dependentDeployment);
+ }
+ }
+
+ return flinkBlueGreenDeployments;
+ }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index ac0f7356e1..51235b8ebe 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -33,6 +33,7 @@
import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.EventSourceUtils;
+import org.apache.flink.kubernetes.operator.utils.ExceptionUtils;
import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
@@ -154,17 +155,15 @@ public UpdateControl reconcile(FlinkDeployment flinkApp, Contex
statusRecorder.patchAndCacheStatus(flinkApp, ctx.getKubernetesClient());
reconcilerFactory.getOrCreate(flinkApp).reconcile(ctx);
} catch (UpgradeFailureException ufe) {
- handleUpgradeFailure(ctx, ufe);
+ ReconciliationUtils.updateForReconciliationError(ctx, ufe);
+ triggerErrorEvent(ctx, ufe, ufe.getReason());
} catch (DeploymentFailedException dfe) {
- handleDeploymentFailed(ctx, dfe);
+ flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR);
+ flinkApp.getStatus().getJobStatus().setState(JobStatus.RECONCILING);
+ ReconciliationUtils.updateForReconciliationError(ctx, dfe);
+ triggerErrorEvent(ctx, dfe, dfe.getReason());
} catch (Exception e) {
- eventRecorder.triggerEvent(
- flinkApp,
- EventRecorder.Type.Warning,
- "ClusterDeploymentException",
- e.getMessage(),
- EventRecorder.Component.JobManagerDeployment,
- josdkContext.getClient());
+ triggerErrorEvent(ctx, e, EventRecorder.Reason.Error.name());
throw new ReconciliationException(e);
}
@@ -174,32 +173,13 @@ public UpdateControl reconcile(FlinkDeployment flinkApp, Contex
ctx.getOperatorConfig(), flinkApp, previousDeployment, true);
}
- private void handleDeploymentFailed(
- FlinkResourceContext ctx, DeploymentFailedException dfe) {
- var flinkApp = ctx.getResource();
- LOG.error("Flink Deployment failed", dfe);
- flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR);
- flinkApp.getStatus().getJobStatus().setState(JobStatus.RECONCILING);
- ReconciliationUtils.updateForReconciliationError(ctx, dfe);
+ private void triggerErrorEvent(
+ FlinkResourceContext ctx, Exception e, String reason) {
eventRecorder.triggerEvent(
- flinkApp,
- EventRecorder.Type.Warning,
- dfe.getReason(),
- dfe.getMessage(),
- EventRecorder.Component.JobManagerDeployment,
- ctx.getKubernetesClient());
- }
-
- private void handleUpgradeFailure(
- FlinkResourceContext ctx, UpgradeFailureException ufe) {
- LOG.error("Error while upgrading Flink Deployment", ufe);
- var flinkApp = ctx.getResource();
- ReconciliationUtils.updateForReconciliationError(ctx, ufe);
- eventRecorder.triggerEvent(
- flinkApp,
+ ctx.getResource(),
EventRecorder.Type.Warning,
- ufe.getReason(),
- ufe.getMessage(),
+ reason,
+ ExceptionUtils.getExceptionMessage(e),
EventRecorder.Component.JobManagerDeployment,
ctx.getKubernetesClient());
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
index 1e818d6659..7454864fe7 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
@@ -30,6 +30,7 @@
import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.EventSourceUtils;
+import org.apache.flink.kubernetes.operator.utils.ExceptionUtils;
import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
@@ -120,13 +121,7 @@ public UpdateControl reconcile(
statusRecorder.patchAndCacheStatus(flinkSessionJob, ctx.getKubernetesClient());
reconciler.reconcile(ctx);
} catch (Exception e) {
- eventRecorder.triggerEvent(
- flinkSessionJob,
- EventRecorder.Type.Warning,
- "SessionJobException",
- e.getMessage(),
- EventRecorder.Component.Job,
- josdkContext.getClient());
+ triggerErrorEvent(ctx, e);
throw new ReconciliationException(e);
}
statusRecorder.patchAndCacheStatus(flinkSessionJob, ctx.getKubernetesClient());
@@ -166,6 +161,24 @@ public DeleteControl cleanup(FlinkSessionJob sessionJob, Context josdkContext) {
return deleteControl;
}
+ /**
+ * Records a Warning event with reason {@code Error} on the session job held by the context.
+ *
+ * @param ctx resource context supplying the session job and the Kubernetes client
+ * @param e exception whose message, as produced by ExceptionUtils, becomes the event message
+ */
+ private void triggerErrorEvent(FlinkResourceContext> ctx, Exception e) {
+ var sessionJob = ctx.getResource();
+ var errorMessage = ExceptionUtils.getExceptionMessage(e);
+ eventRecorder.triggerEvent(
+ sessionJob,
+ EventRecorder.Type.Warning,
+ EventRecorder.Reason.Error.name(),
+ errorMessage,
+ EventRecorder.Component.Job,
+ ctx.getKubernetesClient());
+ }
+
@Override
public ErrorStatusUpdateControl updateErrorStatus(
FlinkSessionJob sessionJob, Context context, Exception e) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java
index a8f9abf0b3..9fc6143f06 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java
@@ -17,15 +17,25 @@
package org.apache.flink.kubernetes.operator.exception;
-import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableSet;
+
+import io.fabric8.kubernetes.api.model.ContainerStatus;
import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
+import java.util.Optional;
+import java.util.Set;
+
/** Exception to signal terminal deployment failure. */
public class DeploymentFailedException extends RuntimeException {
- public static final String REASON_CRASH_LOOP_BACKOFF = "CrashLoopBackOff";
- public static final String REASON_IMAGE_PULL_BACKOFF = "ImagePullBackOff";
- public static final String REASON_ERR_IMAGE_PULL = "ErrImagePull";
+ public static final Set CONTAINER_ERROR_REASONS = // container waiting reasons treated as deployment failures
+ ImmutableSet.of(
+ "CrashLoopBackOff",
+ "ImagePullBackOff",
+ "ErrImagePull",
+ "RunContainerError",
+ "CreateContainerConfigError",
+ "OOMKilled"); // NOTE(review): Kubernetes usually reports OOMKilled as a terminated reason, not a waiting reason - confirm this entry can ever match
private static final long serialVersionUID = -1070179896083579221L;
@@ -36,11 +46,6 @@ public DeploymentFailedException(DeploymentCondition deployCondition) {
this.reason = deployCondition.getReason();
}
- public DeploymentFailedException(ContainerStateWaiting stateWaiting) {
- super(stateWaiting.getMessage());
- this.reason = stateWaiting.getReason();
- }
-
public DeploymentFailedException(String message, String reason) {
super(message);
this.reason = reason;
@@ -49,4 +54,38 @@ public DeploymentFailedException(String message, String reason) {
public String getReason() {
return reason;
}
+
+ /**
+ * Creates the exception for a container that is stuck in a waiting state.
+ *
+ * <p>For {@code CrashLoopBackOff} the message of the last terminated state, when available, is
+ * used with a {@code "CrashLoop - "} prefix. Otherwise the waiting message is used, falling back
+ * to the waiting reason so that a missing message is not rendered as the literal "null".
+ *
+ * @param status container status whose current state is waiting
+ * @return exception with message {@code "[containerName] message"} and the waiting reason
+ */
+ public static DeploymentFailedException forContainerStatus(ContainerStatus status) {
+ var waiting = status.getState().getWaiting();
+ var lastState = status.getLastState();
+ String message = null;
+ if ("CrashLoopBackOff".equals(waiting.getReason())
+ && lastState != null
+ && lastState.getTerminated() != null) {
+ message =
+ Optional.ofNullable(lastState.getTerminated().getMessage())
+ .map(err -> "CrashLoop - " + err)
+ .orElse(null);
+ }
+
+ if (message == null) {
+ message = waiting.getMessage();
+ }
+ if (message == null) {
+ // The waiting message is optional; fall back to the reason instead of printing "null"
+ message = waiting.getReason();
+ }
+ return new DeploymentFailedException(
+ String.format("[%s] %s", status.getName(), message), waiting.getReason());
+ }
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java
index 5350e3cd76..bb14cd986c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java
@@ -17,6 +17,7 @@
package org.apache.flink.kubernetes.operator.metrics;
+import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
@@ -211,6 +212,8 @@ private KubernetesResourceNamespaceMetricGroup getResourceNsMg(
resourceClass = FlinkSessionJob.class;
} else if (resourceGvk.getKind().equals(FlinkStateSnapshot.class.getSimpleName())) {
resourceClass = FlinkStateSnapshot.class;
+ } else if (resourceGvk.getKind().equals(FlinkBlueGreenDeployment.class.getSimpleName())) {
+ resourceClass = FlinkBlueGreenDeployment.class;
} else {
return Optional.empty();
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/mutator/FlinkResourceMutator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/mutator/FlinkResourceMutator.java
index 606d243b51..47e185c1ce 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/mutator/FlinkResourceMutator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/mutator/FlinkResourceMutator.java
@@ -31,6 +31,7 @@ public interface FlinkResourceMutator extends Plugin {
* Mutate deployment and return the mutated Object.
*
* @param deployment A Flink application or session cluster deployment.
+ * @return the mutated Flink application or session cluster deployment.
*/
FlinkDeployment mutateDeployment(FlinkDeployment deployment);
@@ -39,6 +40,7 @@ public interface FlinkResourceMutator extends Plugin {
*
* @param sessionJob the session job to be mutated.
* @param session the target session cluster of the session job to be Mutated.
+ * @return the mutated session job.
*/
FlinkSessionJob mutateSessionJob(FlinkSessionJob sessionJob, Optional session);
@@ -46,6 +48,7 @@ public interface FlinkResourceMutator extends Plugin {
* Mutate snapshot and return the mutated Object.
*
* @param stateSnapshot the snapshot to be mutated.
+ * @return the mutated snapshot.
*/
FlinkStateSnapshot mutateStateSnapshot(FlinkStateSnapshot stateSnapshot);
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/AbstractFlinkResourceObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/AbstractFlinkResourceObserver.java
index 9fd9f59687..1baf68f60a 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/AbstractFlinkResourceObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/AbstractFlinkResourceObserver.java
@@ -107,6 +107,7 @@ protected boolean isResourceReadyToBeObserved(FlinkResourceContext ctx) {
* DEPLOYED state.
*
* @param ctx Context for resource.
+ * @return true if the resource was already upgraded, false otherwise.
*/
protected abstract boolean checkIfAlreadyUpgraded(FlinkResourceContext ctx);
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
index c94c1231b3..444d0a0ad5 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
@@ -27,6 +27,7 @@
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.ExceptionUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.slf4j.Logger;
@@ -182,7 +183,7 @@ private void updateJobStatus(FlinkResourceContext ctx, JobStatusMessage clust
markSuspended(resource);
}
- setErrorIfPresent(ctx, clusterJobStatus);
+ recordJobErrorIfPresent(ctx, clusterJobStatus);
eventRecorder.triggerEvent(
resource,
EventRecorder.Type.Normal,
@@ -203,7 +204,8 @@ private static void markSuspended(AbstractFlinkResource, ?> resource) {
});
}
- private void setErrorIfPresent(FlinkResourceContext ctx, JobStatusMessage clusterJobStatus) {
+ private void recordJobErrorIfPresent(
+ FlinkResourceContext ctx, JobStatusMessage clusterJobStatus) {
if (clusterJobStatus.getJobState() == JobStatus.FAILED) {
try {
var result =
@@ -215,10 +217,14 @@ private void setErrorIfPresent(FlinkResourceContext ctx, JobStatusMessage clu
t -> {
updateFlinkResourceException(
t, ctx.getResource(), ctx.getOperatorConfig());
- LOG.error(
- "Job {} failed with error: {}",
- clusterJobStatus.getJobId(),
- t.getFullStringifiedStackTrace());
+
+ eventRecorder.triggerEvent(
+ ctx.getResource(),
+ EventRecorder.Type.Warning,
+ EventRecorder.Reason.Error,
+ EventRecorder.Component.Job,
+ ExceptionUtils.getExceptionMessage(t),
+ ctx.getKubernetesClient());
});
} catch (Exception e) {
LOG.warn("Failed to request the job result", e);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
index 713e132ad2..54e1573bfb 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
@@ -30,15 +30,14 @@
import org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.EventUtils;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
-import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
import io.fabric8.kubernetes.api.model.ContainerStatus;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
-import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +45,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
+import java.util.stream.Stream;
/** Base observer for session and application clusters. */
public abstract class AbstractFlinkDeploymentObserver
@@ -117,11 +116,11 @@ protected void observeJmDeployment(FlinkResourceContext ctx) {
ctx.getJosdkContext().getSecondaryResource(Deployment.class);
if (deployment.isPresent()) {
DeploymentStatus status = deployment.get().getStatus();
- DeploymentSpec spec = deployment.get().getSpec();
if (status != null
&& status.getAvailableReplicas() != null
- && spec.getReplicas().intValue() == status.getReplicas()
- && spec.getReplicas().intValue() == status.getAvailableReplicas()
+ // One available JM is enough to run the job correctly
+ && status.getReplicas() > 0
+ && status.getAvailableReplicas() > 0
&& ctx.getFlinkService().isJobManagerPortReady(ctx.getObserveConfig())) {
// typically it takes a few seconds for the REST server to be ready
@@ -135,7 +134,7 @@ protected void observeJmDeployment(FlinkResourceContext ctx) {
try {
checkFailedCreate(status);
// checking the pod is expensive; only do it when the deployment isn't ready
- checkContainerBackoff(ctx);
+ checkContainerErrors(ctx);
} catch (DeploymentFailedException dfe) {
// throw only when not already in error status to allow for spec update
deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING);
@@ -172,21 +171,28 @@ private void checkFailedCreate(DeploymentStatus status) {
}
}
- private void checkContainerBackoff(FlinkResourceContext ctx) {
+ private void checkContainerErrors(FlinkResourceContext ctx) {
PodList jmPods =
ctx.getFlinkService().getJmPodList(ctx.getResource(), ctx.getObserveConfig());
for (Pod pod : jmPods.getItems()) {
- for (ContainerStatus cs : pod.getStatus().getContainerStatuses()) {
- ContainerStateWaiting csw = cs.getState().getWaiting();
- if (csw != null
- && Set.of(
- DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF,
- DeploymentFailedException.REASON_IMAGE_PULL_BACKOFF,
- DeploymentFailedException.REASON_ERR_IMAGE_PULL)
- .contains(csw.getReason())) {
- throw new DeploymentFailedException(csw);
- }
- }
- }
+ var podStatus = pod.getStatus();
+ Stream.concat( // regular and init containers are inspected alike
+ podStatus.getContainerStatuses().stream(),
+ podStatus.getInitContainerStatuses().stream())
+ .forEach(AbstractFlinkDeploymentObserver::checkContainerError);
+
+ // No container error was thrown above (checkContainerError throws on a match); check volume mounts next
+ EventUtils.checkForVolumeMountErrors(ctx.getKubernetesClient(), pod);
+ }
+ }
+
+ private static void checkContainerError(ContainerStatus cs) {
+ // Fails the deployment only for containers waiting with one of the known error reasons
+ var waiting = cs.getState() == null ? null : cs.getState().getWaiting();
+ if (waiting != null
+ && DeploymentFailedException.CONTAINER_ERROR_REASONS.contains(
+ waiting.getReason())) {
+ throw DeploymentFailedException.forContainerStatus(cs);
}
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SnapshotTriggerTimestampStore.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SnapshotTriggerTimestampStore.java
index 21cad5e62b..b206840372 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SnapshotTriggerTimestampStore.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SnapshotTriggerTimestampStore.java
@@ -53,6 +53,7 @@ public class SnapshotTriggerTimestampStore {
* updated with this value.
*
* @param resource Flink resource
+ * @param snapshotType the snapshot type
* @param snapshotsSupplier supplies related snapshot resources
* @return instant of last trigger
*/
@@ -103,6 +104,7 @@ public Instant getLastPeriodicTriggerInstant(
* Updates the time a periodic snapshot was last triggered for this resource.
*
* @param resource Kubernetes resource
+ * @param snapshotType the snapshot type
* @param instant new timestamp
*/
public void updateLastPeriodicTriggerTimestamp(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index 71ec3dee5e..023396b52a 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -31,6 +31,7 @@
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType;
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
@@ -398,21 +399,35 @@ protected void restoreJob(
*
* @param ctx context
* @param savepointLocation location of savepoint taken
+ * @param cancelTs Timestamp when upgrade/cancel was triggered
*/
- protected void setUpgradeSavepointPath(FlinkResourceContext> ctx, String savepointLocation) {
+ protected void setUpgradeSavepointPath(
+ FlinkResourceContext> ctx, String savepointLocation, Instant cancelTs) {
var conf = ctx.getObserveConfig();
var savepointFormatType =
- ctx.getObserveConfig()
- .get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE);
+ SavepointFormatType.valueOf( // map the configured format onto the status enum by constant name
+ conf.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE)
+ .name());
FlinkStateSnapshotUtils.createUpgradeSnapshotResource(
conf,
ctx.getOperatorConfig(),
ctx.getKubernetesClient(),
ctx.getResource(),
- SavepointFormatType.valueOf(savepointFormatType.name()),
+ savepointFormatType,
savepointLocation);
- ctx.getResource().getStatus().getJobStatus().setUpgradeSavepointPath(savepointLocation);
+ var jobStatus = ctx.getResource().getStatus().getJobStatus();
+ jobStatus.setUpgradeSavepointPath(savepointLocation);
+
+ // Register created savepoint in the now deprecated savepoint info and history
+ var savepoint =
+ new Savepoint(
+ cancelTs.toEpochMilli(), // savepoint timestamp is the caller-supplied cancel trigger time
+ savepointLocation,
+ SnapshotTriggerType.UPGRADE,
+ savepointFormatType,
+ null); // NOTE(review): presumably the trigger nonce, left unset here - confirm against Savepoint
+ jobStatus.getSavepointInfo().updateLastSavepoint(savepoint);
}
/**
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 71cf417958..b5486e0ec7 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -223,10 +223,12 @@ private void setJobIdIfNecessary(
@Override
protected boolean cancelJob(FlinkResourceContext ctx, SuspendMode suspendMode)
throws Exception {
+ var cancelTs = Instant.now(); // taken before the cancel call and handed to setUpgradeSavepointPath
var result =
ctx.getFlinkService()
.cancelJob(ctx.getResource(), suspendMode, ctx.getObserveConfig());
- result.getSavepointPath().ifPresent(location -> setUpgradeSavepointPath(ctx, location));
+ result.getSavepointPath()
+ .ifPresent(location -> setUpgradeSavepointPath(ctx, location, cancelTs));
return result.isPending();
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
index f1fa88859a..4932940a4f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
@@ -41,6 +41,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.Instant;
import java.util.Optional;
/** The reconciler for the {@link FlinkSessionJob}. */
@@ -100,10 +101,12 @@ public void deploy(
@Override
protected boolean cancelJob(FlinkResourceContext ctx, SuspendMode suspendMode)
throws Exception {
+ var cancelTs = Instant.now(); // taken before the cancel call and handed to setUpgradeSavepointPath
var result =
ctx.getFlinkService()
.cancelSessionJob(ctx.getResource(), suspendMode, ctx.getObserveConfig());
- result.getSavepointPath().ifPresent(location -> setUpgradeSavepointPath(ctx, location));
+ result.getSavepointPath()
+ .ifPresent(location -> setUpgradeSavepointPath(ctx, location, cancelTs));
return result.isPending();
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index 8728bc2ca4..733ebd3ae2 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -21,6 +21,7 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.utils.JobStatusUtils;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
@@ -142,6 +143,7 @@
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -546,7 +548,7 @@ public Optional getLastCheckpoint(JobID jobId, Configuration conf) {
try {
latestCheckpointOpt = getCheckpointInfo(jobId, conf).f0;
} catch (Exception e) {
- if (e instanceof RestClientException
+ if (e instanceof ExecutionException
&& e.getMessage() != null
&& e.getMessage().contains("Checkpointing has not been enabled")) {
LOG.warn("Checkpointing not enabled for job {}", jobId, e);
@@ -953,7 +955,15 @@ private void deleteJar(Configuration conf, String jarId) {
}
}
- /** Wait until Deployment is removed, return remaining timeout. */
+ /**
+ * Wait until Deployment is removed, return remaining timeout.
+ *
+ * @param name name of the deployment
+ * @param deployment The deployment resource
+ * @param propagation DeletePropagation
+ * @param timeout Timeout to wait
+ * @return remaining timeout after deletion
+ */
@VisibleForTesting
protected Duration deleteDeploymentBlocking(
String name,
@@ -975,7 +985,8 @@ protected static Configuration removeOperatorConfigs(Configuration config) {
config.toMap()
.forEach(
(k, v) -> {
- if (!k.startsWith(K8S_OP_CONF_PREFIX)) {
+ if (!k.startsWith(K8S_OP_CONF_PREFIX)
+ && !k.startsWith(AutoScalerOptions.AUTOSCALER_CONF_PREFIX)) {
newConfig.setString(k, v);
}
});
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
index 02f73fcc17..1989de0e14 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
@@ -122,10 +122,16 @@ public boolean triggerEvent(
}
/**
+ * @param resource The resource
+ * @param type The type
+ * @param reason the reason
+ * @param message the message
+ * @param component the component
+ * @param messageKey the message key
+ * @param client the client
* @param interval Interval for dedupe. Null mean no dedupe.
- * @return
*/
- public boolean triggerEventWithInterval(
+ public void triggerEventWithInterval(
AbstractFlinkResource, ?> resource,
Type type,
String reason,
@@ -134,7 +140,7 @@ public boolean triggerEventWithInterval(
String messageKey,
KubernetesClient client,
@Nullable Duration interval) {
- return EventUtils.createOrUpdateEventWithInterval(
+ EventUtils.createOrUpdateEventWithInterval(
client,
resource,
type,
@@ -166,12 +172,18 @@ public boolean triggerEventOnce(
}
/**
+ * @param resource The resource
+ * @param type The type
+ * @param reason the reason
+ * @param message the message
+ * @param component the component
+ * @param messageKey the message key
+ * @param client the client
* @param interval Interval for dedupe. Null mean no dedupe.
* @param dedupePredicate Predicate for dedupe algorithm..
* @param labels Labels to store in meta data for dedupe. Do nothing if null.
- * @return
*/
- public boolean triggerEventWithLabels(
+ public void triggerEventWithLabels(
AbstractFlinkResource, ?> resource,
Type type,
String reason,
@@ -182,7 +194,7 @@ public boolean triggerEventWithLabels(
@Nullable Duration interval,
@Nullable Predicate