From f34482d722abe26fa7e06eaa25c574ea58422d49 Mon Sep 17 00:00:00 2001 From: Luca Castelli Date: Thu, 13 Mar 2025 21:39:43 -0400 Subject: [PATCH 1/3] [FLINK-37370] [Observer] Fix exception caught when handling checkpointing not enabled for batch jobs and add batch e2e test --- .github/workflows/ci.yml | 1 + e2e-tests/data/flinkdep-batch-cr.yaml | 109 ++++++++++++++++++ e2e-tests/test_batch_job.sh | 60 ++++++++++ .../service/AbstractFlinkService.java | 3 +- .../operator/TestingFlinkService.java | 6 +- 5 files changed, 176 insertions(+), 3 deletions(-) create mode 100644 e2e-tests/data/flinkdep-batch-cr.yaml create mode 100755 e2e-tests/test_batch_job.sh diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9618a2b3b8..7545b63f6b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -161,6 +161,7 @@ jobs: - test_autoscaler.sh - test_flink_operator_ha.sh - test_snapshot.sh + - test_batch_job.sh exclude: - flink-version: v1_16 test: test_autoscaler.sh diff --git a/e2e-tests/data/flinkdep-batch-cr.yaml b/e2e-tests/data/flinkdep-batch-cr.yaml new file mode 100644 index 0000000000..e14f018c4b --- /dev/null +++ b/e2e-tests/data/flinkdep-batch-cr.yaml @@ -0,0 +1,109 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +apiVersion: flink.apache.org/v1beta1 +kind: FlinkDeployment +metadata: + namespace: default + name: flink-example-wordcount-batch +spec: + image: flink:1.20 + flinkVersion: v1_20 + ingress: + template: "/{{namespace}}/{{name}}(/|$)(.*)" + className: "nginx" + annotations: + nginx.ingress.kubernetes.io/rewrite-target: "/$2" + flinkConfiguration: + taskmanager.numberOfTaskSlots: "2" + high-availability.type: kubernetes + high-availability.storageDir: file:///opt/flink/volume/flink-ha + state.checkpoints.dir: file:///opt/flink/volume/flink-cp + state.savepoints.dir: file:///opt/flink/volume/flink-sp + kubernetes.operator.snapshot.resource.enabled: "false" + serviceAccount: flink + podTemplate: + spec: + initContainers: + - name: artifacts-fetcher + image: busybox:1.35.0 + imagePullPolicy: IfNotPresent + # Use wget or other tools to get user jars from remote storage + command: [ 'wget', 'STREAMING_EXAMPLES_JAR_URL', '-O', '/flink-artifact/myjob.jar' ] + volumeMounts: + - mountPath: /flink-artifact + name: flink-artifact + containers: + # Do not change the main container name + - name: flink-main-container + resources: + requests: + ephemeral-storage: 2048Mi + limits: + ephemeral-storage: 2048Mi + volumeMounts: + - mountPath: /opt/flink/usrlib + name: flink-artifact + - mountPath: /opt/flink/volume + name: flink-volume + volumes: + - name: flink-artifact + emptyDir: { } + - name: flink-volume + persistentVolumeClaim: + claimName: flink-example-wordcount-batch + jobManager: + resource: + memory: "1024m" + cpu: 0.5 + taskManager: + resource: + memory: "1Gi" + cpu: 0.5 + job: + jarURI: local:///opt/flink/usrlib/myjob.jar + entryClass: org.apache.flink.streaming.examples.wordcount.WordCount + args: ["--execution-mode", "BATCH"] + parallelism: 2 + upgradeMode: stateless + mode: native + 
+--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: flink-example-wordcount-batch +spec: + accessModes: + - ReadWriteOnce + volumeMode: Filesystem + resources: + requests: + storage: 1Gi + +--- +apiVersion: networking.k8s.io/v1 +kind: IngressClass +metadata: + annotations: + ingressclass.kubernetes.io/is-default-class: "true" + labels: + app.kubernetes.io/component: controller + name: nginx +spec: + controller: k8s.io/ingress-nginx diff --git a/e2e-tests/test_batch_job.sh b/e2e-tests/test_batch_job.sh new file mode 100755 index 0000000000..eb3a665de1 --- /dev/null +++ b/e2e-tests/test_batch_job.sh @@ -0,0 +1,60 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# This script tests basic Flink batch job operations on Kubernetes: +# 1. Deploys a FlinkDeployment for a batch job. +# 2. Waits for the JobManager to become ready. +# 3. Verifies that the job reaches the FINISHED state (first check). +# 4. Sleeps for a configurable duration to test for state persistence. +# 5. Verifies that the job remains in the FINISHED state (second check). 
+# 6. Checks the operator logs for the expected job state transition message. +# 7. Checks the JobManager logs for successful application completion. +SCRIPT_DIR=$(dirname "$(readlink -f "$0")") +source "${SCRIPT_DIR}/utils.sh" + +CLUSTER_ID="flink-example-wordcount-batch" +APPLICATION_YAML="${SCRIPT_DIR}/data/flinkdep-batch-cr.yaml" +APPLICATION_IDENTIFIER="flinkdep/$CLUSTER_ID" +TIMEOUT=300 +SLEEP_DURATION=30 + +on_exit cleanup_and_exit "$APPLICATION_YAML" $TIMEOUT $CLUSTER_ID + +retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1 + +wait_for_jobmanager_running $CLUSTER_ID $TIMEOUT + +# Wait for the job to reach the FINISHED state (first check). +wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' FINISHED $TIMEOUT || exit 1 + +echo "Job reached FINISHED state. Sleeping for $SLEEP_DURATION seconds..." +sleep "$SLEEP_DURATION" + +# Verify the job is *still* in the FINISHED state (second check). +wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' FINISHED $TIMEOUT || exit 1 + +# Verify the job status change to FINISHED shows up in the operator logs. +operator_pod_name=$(get_operator_pod_name) +wait_for_operator_logs "$operator_pod_name" "Job status changed from .* to FINISHED" ${TIMEOUT} || exit 1 + +# Verify the job completed successfully in the job manager logs. 
+jm_pod_name=$(get_jm_pod_name $CLUSTER_ID) +wait_for_logs "$jm_pod_name" "Application completed SUCCESSFULLY" ${TIMEOUT} || exit 1 + +echo "Successfully ran the batch job test" \ No newline at end of file diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java index 8728bc2ca4..d88aca65be 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java @@ -142,6 +142,7 @@ import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -546,7 +547,7 @@ public Optional getLastCheckpoint(JobID jobId, Configuration conf) { try { latestCheckpointOpt = getCheckpointInfo(jobId, conf).f0; } catch (Exception e) { - if (e instanceof RestClientException + if (e instanceof ExecutionException && e.getMessage() != null && e.getMessage().contains("Checkpointing has not been enabled")) { LOG.warn("Checkpointing not enabled for job {}", jobId, e); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java index fec1dfa64e..c18e8f34c5 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java @@ -98,6 +98,7 @@ import java.util.Random; import java.util.Set; import 
java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -595,8 +596,9 @@ public Optional getLastCheckpoint(JobID jobId, Configuration conf) { Optional> getCheckpointInfo(JobID jobId, Configuration conf) throws Exception { if (throwCheckpointingDisabledError) { - throw new RestClientException( - "Checkpointing has not been enabled", HttpResponseStatus.BAD_REQUEST); + throw new ExecutionException( + new RestClientException( + "Checkpointing has not been enabled", HttpResponseStatus.BAD_REQUEST)); } if (checkpointInfo != null) { From 80763a6d45539cd28ae0d4e9320a6aef6d6b8884 Mon Sep 17 00:00:00 2001 From: Luca Castelli Date: Fri, 14 Mar 2025 08:57:21 -0400 Subject: [PATCH 2/3] Tweaks to e2e test --- .github/workflows/e2e.yaml | 2 ++ e2e-tests/data/flinkdep-batch-cr.yaml | 22 ---------------------- e2e-tests/test_batch_job.sh | 6 ++++++ 3 files changed, 8 insertions(+), 22 deletions(-) diff --git a/.github/workflows/e2e.yaml b/.github/workflows/e2e.yaml index 26f90a8895..ecfcd07fef 100644 --- a/.github/workflows/e2e.yaml +++ b/.github/workflows/e2e.yaml @@ -88,6 +88,8 @@ jobs: EXAMPLES_JAR="https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.4/flink-examples-streaming_2.12-1.14.4.jar" if [[ ${{ inputs.flink-version }} == v2* ]]; then EXAMPLES_JAR="https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming/2.0-preview1/flink-examples-streaming-2.0-preview1.jar" + elif [[ "${{ inputs.test }}" == "test_batch_job.sh" ]]; then + EXAMPLES_JAR="https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming/1.20.1/flink-examples-streaming-1.20.1.jar" fi ESCAPED_EXAMPLES_JAR=$(printf '%s\n' "$EXAMPLES_JAR" | sed -e 's/[\/&]/\\&/g') diff --git a/e2e-tests/data/flinkdep-batch-cr.yaml b/e2e-tests/data/flinkdep-batch-cr.yaml index e14f018c4b..159199ce4f 100644 --- 
a/e2e-tests/data/flinkdep-batch-cr.yaml +++ b/e2e-tests/data/flinkdep-batch-cr.yaml @@ -31,10 +31,6 @@ spec: nginx.ingress.kubernetes.io/rewrite-target: "/$2" flinkConfiguration: taskmanager.numberOfTaskSlots: "2" - high-availability.type: kubernetes - high-availability.storageDir: file:///opt/flink/volume/flink-ha - state.checkpoints.dir: file:///opt/flink/volume/flink-cp - state.savepoints.dir: file:///opt/flink/volume/flink-sp kubernetes.operator.snapshot.resource.enabled: "false" serviceAccount: flink podTemplate: @@ -59,14 +55,9 @@ spec: volumeMounts: - mountPath: /opt/flink/usrlib name: flink-artifact - - mountPath: /opt/flink/volume - name: flink-volume volumes: - name: flink-artifact emptyDir: { } - - name: flink-volume - persistentVolumeClaim: - claimName: flink-example-wordcount-batch jobManager: resource: memory: "1024m" @@ -83,19 +74,6 @@ spec: upgradeMode: stateless mode: native ---- -apiVersion: v1 -kind: PersistentVolumeClaim -metadata: - name: flink-example-wordcount-batch -spec: - accessModes: - - ReadWriteOnce - volumeMode: Filesystem - resources: - requests: - storage: 1Gi - --- apiVersion: networking.k8s.io/v1 kind: IngressClass diff --git a/e2e-tests/test_batch_job.sh b/e2e-tests/test_batch_job.sh index eb3a665de1..e3b838a70f 100755 --- a/e2e-tests/test_batch_job.sh +++ b/e2e-tests/test_batch_job.sh @@ -25,6 +25,7 @@ # 5. Verifies that the job remains in the FINISHED state (second check). # 6. Checks the operator logs for the expected job state transition message. # 7. Checks the JobManager logs for successful application completion. +# 8. Applies a spec change to the FlinkDeployment and verifies the job re-runs successfully. 
SCRIPT_DIR=$(dirname "$(readlink -f "$0")") source "${SCRIPT_DIR}/utils.sh" @@ -57,4 +58,9 @@ wait_for_operator_logs "$operator_pod_name" "Job status changed from .* to FINIS jm_pod_name=$(get_jm_pod_name $CLUSTER_ID) wait_for_logs "$jm_pod_name" "Application completed SUCCESSFULLY" ${TIMEOUT} || exit 1 +# Apply a spec change (exit right away if kubectl rejects it instead of waiting out the timeout below); verify the job re-runs and reaches the FINISHED state. +kubectl patch flinkdep "${CLUSTER_ID}" --type merge --patch '{"spec":{"job": {"parallelism": 1 } } }' || exit 1 +wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' RECONCILING $TIMEOUT || exit 1 +wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' FINISHED $TIMEOUT || exit 1 + echo "Successfully ran the batch job test" \ No newline at end of file From 8af910d8d4ce57c1bffd70e016bedca1b1669999 Mon Sep 17 00:00:00 2001 From: Luca Castelli Date: Fri, 14 Mar 2025 09:51:04 -0400 Subject: [PATCH 3/3] exclude e2e batch tests from flink version 1.18 and older and add no-op spec change --- .github/workflows/ci.yml | 6 ++++++ e2e-tests/test_batch_job.sh | 20 ++++++++------------ 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7545b63f6b..a2dd76da1b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -173,18 +173,24 @@ jobs: test: test_flink_operator_ha.sh - flink-version: v1_16 test: test_snapshot.sh + - flink-version: v1_16 + test: test_batch_job.sh - flink-version: v1_17 test: test_dynamic_config.sh - flink-version: v1_17 test: test_flink_operator_ha.sh - flink-version: v1_17 test: test_snapshot.sh + - flink-version: v1_17 + test: test_batch_job.sh - flink-version: v1_18 test: test_dynamic_config.sh - flink-version: v1_18 test: test_flink_operator_ha.sh - flink-version: v1_18 test: test_snapshot.sh + - flink-version: v1_18 + test: test_batch_job.sh - flink-version: v1_19 test: test_snapshot.sh uses: ./.github/workflows/e2e.yaml diff --git a/e2e-tests/test_batch_job.sh 
b/e2e-tests/test_batch_job.sh index e3b838a70f..2cbf6a5d4d 100755 --- a/e2e-tests/test_batch_job.sh +++ b/e2e-tests/test_batch_job.sh @@ -20,12 +20,11 @@ # This script tests basic Flink batch job operations on Kubernetes: # 1. Deploys a FlinkDeployment for a batch job. # 2. Waits for the JobManager to become ready. -# 3. Verifies that the job reaches the FINISHED state (first check). -# 4. Sleeps for a configurable duration to test for state persistence. -# 5. Verifies that the job remains in the FINISHED state (second check). -# 6. Checks the operator logs for the expected job state transition message. -# 7. Checks the JobManager logs for successful application completion. -# 8. Applies a spec change to the FlinkDeployment and verifies the job re-runs successfully. +# 3. Verifies that the job reaches the FINISHED state. +# 4. Applies a no-op spec change and verifies the job remains in the FINISHED state. +# 5. Checks the operator logs for the expected job state transition message. +# 6. Checks the JobManager logs for successful application completion. +# 7. Applies a spec change and verifies the job re-runs successfully. SCRIPT_DIR=$(dirname "$(readlink -f "$0")") source "${SCRIPT_DIR}/utils.sh" @@ -33,7 +32,6 @@ CLUSTER_ID="flink-example-wordcount-batch" APPLICATION_YAML="${SCRIPT_DIR}/data/flinkdep-batch-cr.yaml" APPLICATION_IDENTIFIER="flinkdep/$CLUSTER_ID" TIMEOUT=300 -SLEEP_DURATION=30 on_exit cleanup_and_exit "$APPLICATION_YAML" $TIMEOUT $CLUSTER_ID @@ -41,13 +39,11 @@ retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1 wait_for_jobmanager_running $CLUSTER_ID $TIMEOUT -# Wait for the job to reach the FINISHED state (first check). +# Wait for the job to reach the FINISHED state. wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' FINISHED $TIMEOUT || exit 1 -echo "Job reached FINISHED state. Sleeping for $SLEEP_DURATION seconds..." -sleep "$SLEEP_DURATION" - -# Verify the job is *still* in the FINISHED state (second check). 
+# Apply a no-op spec change (exit if kubectl rejects it, so the check below really follows a spec change); verify the job remains in the FINISHED state. +kubectl patch flinkdep "${CLUSTER_ID}" --type merge --patch '{"spec":{"flinkConfiguration": {"kubernetes.operator.deployment.readiness.timeout": "6h" } } }' || exit 1 wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' FINISHED $TIMEOUT || exit 1 # Verify the job status change to FINISHED shows up in the operator logs.