diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9618a2b3b8..a2dd76da1b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -161,6 +161,7 @@ jobs: - test_autoscaler.sh - test_flink_operator_ha.sh - test_snapshot.sh + - test_batch_job.sh exclude: - flink-version: v1_16 test: test_autoscaler.sh @@ -172,18 +173,24 @@ jobs: test: test_flink_operator_ha.sh - flink-version: v1_16 test: test_snapshot.sh + - flink-version: v1_16 + test: test_batch_job.sh - flink-version: v1_17 test: test_dynamic_config.sh - flink-version: v1_17 test: test_flink_operator_ha.sh - flink-version: v1_17 test: test_snapshot.sh + - flink-version: v1_17 + test: test_batch_job.sh - flink-version: v1_18 test: test_dynamic_config.sh - flink-version: v1_18 test: test_flink_operator_ha.sh - flink-version: v1_18 test: test_snapshot.sh + - flink-version: v1_18 + test: test_batch_job.sh - flink-version: v1_19 test: test_snapshot.sh uses: ./.github/workflows/e2e.yaml diff --git a/.github/workflows/e2e.yaml b/.github/workflows/e2e.yaml index 26f90a8895..ecfcd07fef 100644 --- a/.github/workflows/e2e.yaml +++ b/.github/workflows/e2e.yaml @@ -88,6 +88,8 @@ jobs: EXAMPLES_JAR="https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.4/flink-examples-streaming_2.12-1.14.4.jar" if [[ ${{ inputs.flink-version }} == v2* ]]; then EXAMPLES_JAR="https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming/2.0-preview1/flink-examples-streaming-2.0-preview1.jar" + elif [[ "${{ inputs.test }}" == "test_batch_job.sh" ]]; then + EXAMPLES_JAR="https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming/1.20.1/flink-examples-streaming-1.20.1.jar" fi ESCAPED_EXAMPLES_JAR=$(printf '%s\n' "$EXAMPLES_JAR" | sed -e 's/[\/&]/\\&/g') diff --git a/e2e-tests/data/flinkdep-batch-cr.yaml b/e2e-tests/data/flinkdep-batch-cr.yaml new file mode 100644 index 0000000000..159199ce4f --- /dev/null +++ b/e2e-tests/data/flinkdep-batch-cr.yaml @@ -0,0 +1,87 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +apiVersion: flink.apache.org/v1beta1 +kind: FlinkDeployment +metadata: + namespace: default + name: flink-example-wordcount-batch +spec: + image: flink:1.20 + flinkVersion: v1_20 + ingress: + template: "/{{namespace}}/{{name}}(/|$)(.*)" + className: "nginx" + annotations: + nginx.ingress.kubernetes.io/rewrite-target: "/$2" + flinkConfiguration: + taskmanager.numberOfTaskSlots: "2" + kubernetes.operator.snapshot.resource.enabled: "false" + serviceAccount: flink + podTemplate: + spec: + initContainers: + - name: artifacts-fetcher + image: busybox:1.35.0 + imagePullPolicy: IfNotPresent + # Use wget or other tools to get user jars from remote storage + command: [ 'wget', 'STREAMING_EXAMPLES_JAR_URL', '-O', '/flink-artifact/myjob.jar' ] + volumeMounts: + - mountPath: /flink-artifact + name: flink-artifact + containers: + # Do not change the main container name + - name: flink-main-container + resources: + requests: + ephemeral-storage: 2048Mi + limits: + ephemeral-storage: 2048Mi + volumeMounts: + - mountPath: /opt/flink/usrlib + name: flink-artifact + volumes: + - name: flink-artifact + emptyDir: { } + jobManager: + resource: + memory: "1024m" + cpu: 0.5 + taskManager: + resource: + memory: "1Gi" + cpu: 0.5 + job: + jarURI: local:///opt/flink/usrlib/myjob.jar + entryClass: org.apache.flink.streaming.examples.wordcount.WordCount + args: ["--execution-mode", "BATCH"] + parallelism: 2 + upgradeMode: stateless + mode: native + +--- +apiVersion: networking.k8s.io/v1 +kind: IngressClass +metadata: + annotations: + ingressclass.kubernetes.io/is-default-class: "true" + labels: + app.kubernetes.io/component: controller + name: nginx +spec: + controller: k8s.io/ingress-nginx diff --git a/e2e-tests/test_batch_job.sh b/e2e-tests/test_batch_job.sh new file mode 100755 index 0000000000..2cbf6a5d4d --- /dev/null +++ b/e2e-tests/test_batch_job.sh @@ -0,0 +1,62 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# This script tests basic Flink batch job operations on Kubernetes: +# 1. Deploys a FlinkDeployment for a batch job. +# 2. Waits for the JobManager to become ready. +# 3. Verifies that the job reaches the FINISHED state. +# 4. Applies a no-op spec change and verifies the job remains in the FINISHED state. +# 5. Checks the operator logs for the expected job state transition message. +# 6. Checks the JobManager logs for successful application completion. +# 7. Applies a spec change and verifies the job re-runs successfully. +SCRIPT_DIR=$(dirname "$(readlink -f "$0")") +source "${SCRIPT_DIR}/utils.sh" + +CLUSTER_ID="flink-example-wordcount-batch" +APPLICATION_YAML="${SCRIPT_DIR}/data/flinkdep-batch-cr.yaml" +APPLICATION_IDENTIFIER="flinkdep/$CLUSTER_ID" +TIMEOUT=300 + +on_exit cleanup_and_exit "$APPLICATION_YAML" $TIMEOUT $CLUSTER_ID + +retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1 + +wait_for_jobmanager_running $CLUSTER_ID $TIMEOUT + +# Wait for the job to reach the FINISHED state. +wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' FINISHED $TIMEOUT || exit 1 + +# Apply a no-op spec change; verify the job remains in the FINISHED state. +kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"flinkConfiguration": {"kubernetes.operator.deployment.readiness.timeout": "6h" } } }' +wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' FINISHED $TIMEOUT || exit 1 + +# Verify the job status change to FINISHED shows up in the operator logs. +operator_pod_name=$(get_operator_pod_name) +wait_for_operator_logs "$operator_pod_name" "Job status changed from .* to FINISHED" ${TIMEOUT} || exit 1 + +# Verify the job completed successfully in the job manager logs. +jm_pod_name=$(get_jm_pod_name $CLUSTER_ID) +wait_for_logs "$jm_pod_name" "Application completed SUCCESSFULLY" ${TIMEOUT} || exit 1 + +# Apply a spec change; verify the job re-runs and reaches the FINISHED state. +kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"job": {"parallelism": 1 } } }' +wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' RECONCILING $TIMEOUT || exit 1 +wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' FINISHED $TIMEOUT || exit 1 + +echo "Successfully ran the batch job test" \ No newline at end of file diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java index 8728bc2ca4..d88aca65be 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java @@ -142,6 +142,7 @@ import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -546,7 +547,7 @@ public Optional getLastCheckpoint(JobID jobId, Configuration conf) { try { latestCheckpointOpt = getCheckpointInfo(jobId, conf).f0; } catch (Exception e) { - if (e instanceof RestClientException + if (e instanceof ExecutionException && e.getMessage() != null && e.getMessage().contains("Checkpointing has not been enabled")) { LOG.warn("Checkpointing not enabled for job {}", jobId, e); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java index fec1dfa64e..c18e8f34c5 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java @@ -98,6 +98,7 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -595,8 +596,9 @@ public Optional getLastCheckpoint(JobID jobId, Configuration conf) { Optional> getCheckpointInfo(JobID jobId, Configuration conf) throws Exception { if (throwCheckpointingDisabledError) { - throw new RestClientException( - "Checkpointing has not been enabled", HttpResponseStatus.BAD_REQUEST); + throw new ExecutionException( + new RestClientException( + "Checkpointing has not been enabled", HttpResponseStatus.BAD_REQUEST)); } if (checkpointInfo != null) {