Skip to content

Commit 1d39e30

Browse files
e2e test
1 parent 48359a3 commit 1d39e30

File tree

5 files changed

+186
-3
lines changed

5 files changed

+186
-3
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ jobs:
161161
- test_autoscaler.sh
162162
- test_flink_operator_ha.sh
163163
- test_snapshot.sh
164+
- test_batch_job.sh
164165
exclude:
165166
- flink-version: v1_16
166167
test: test_autoscaler.sh

e2e-tests/data/batch-job-cr.yaml

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
################################################################################
18+
19+
apiVersion: flink.apache.org/v1beta1
20+
kind: FlinkDeployment
21+
metadata:
22+
namespace: default
23+
name: flink-example-wordcount-batch
24+
spec:
25+
image: flink:1.20
26+
flinkVersion: v1_20
27+
ingress:
28+
template: "/{{namespace}}/{{name}}(/|$)(.*)"
29+
className: "nginx"
30+
annotations:
31+
nginx.ingress.kubernetes.io/rewrite-target: "/$2"
32+
flinkConfiguration:
33+
taskmanager.numberOfTaskSlots: "2"
34+
high-availability.type: kubernetes
35+
high-availability.storageDir: file:///opt/flink/volume/flink-ha
36+
state.checkpoints.dir: file:///opt/flink/volume/flink-cp
37+
state.savepoints.dir: file:///opt/flink/volume/flink-sp
38+
kubernetes.operator.snapshot.resource.enabled: "false"
39+
serviceAccount: flink
40+
podTemplate:
41+
spec:
42+
initContainers:
43+
- name: artifacts-fetcher
44+
image: busybox:1.35.0
45+
imagePullPolicy: IfNotPresent
46+
# Use wget or other tools to get user jars from remote storage
47+
command: [ 'wget', 'STREAMING_EXAMPLES_JAR_URL', '-O', '/flink-artifact/myjob.jar' ]
48+
volumeMounts:
49+
- mountPath: /flink-artifact
50+
name: flink-artifact
51+
containers:
52+
# Do not change the main container name
53+
- name: flink-main-container
54+
resources:
55+
requests:
56+
ephemeral-storage: 2048Mi
57+
limits:
58+
ephemeral-storage: 2048Mi
59+
volumeMounts:
60+
- mountPath: /opt/flink/usrlib
61+
name: flink-artifact
62+
- mountPath: /opt/flink/volume
63+
name: flink-volume
64+
volumes:
65+
- name: flink-artifact
66+
emptyDir: { }
67+
- name: flink-volume
68+
persistentVolumeClaim:
69+
claimName: flink-example-wordcount-batch
70+
jobManager:
71+
resource:
72+
memory: "1024m"
73+
cpu: 0.5
74+
taskManager:
75+
resource:
76+
memory: "1Gi"
77+
cpu: 0.5
78+
job:
79+
jarURI: local:///opt/flink/usrlib/myjob.jar
80+
entryClass: org.apache.flink.streaming.examples.wordcount.WordCount
81+
parallelism: 2
82+
upgradeMode: stateless
83+
mode: native
84+
85+
---
86+
apiVersion: v1
87+
kind: PersistentVolumeClaim
88+
metadata:
89+
name: flink-example-wordcount-batch
90+
spec:
91+
accessModes:
92+
- ReadWriteOnce
93+
volumeMode: Filesystem
94+
resources:
95+
requests:
96+
storage: 1Gi
97+
98+
---
99+
apiVersion: networking.k8s.io/v1
100+
kind: IngressClass
101+
metadata:
102+
annotations:
103+
ingressclass.kubernetes.io/is-default-class: "true"
104+
labels:
105+
app.kubernetes.io/component: controller
106+
name: nginx
107+
spec:
108+
controller: k8s.io/ingress-nginx

e2e-tests/test_batch_job.sh

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
#!/usr/bin/env bash
2+
################################################################################
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing, software
14+
# distributed under the License is distributed on an "AS IS" BASIS,
15+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
# See the License for the specific language governing permissions and
17+
# limitations under the License.
18+
################################################################################
19+
20+
# This script tests the batch job operations:
21+
# 1. Deploy a batch job (WordCount)
22+
# 2. Verify job completion
23+
# 3. Verify job results
24+
25+
SCRIPT_DIR=$(dirname "$(readlink -f "$0")")
26+
source "${SCRIPT_DIR}/utils.sh"
27+
28+
CLUSTER_ID="flink-batch-wordcount"
29+
BATCH_JOB_YAML="${SCRIPT_DIR}/data/batch-job-cr.yaml"
30+
BATCH_JOB_IDENTIFIER="flinkdep/$CLUSTER_ID"
31+
TIMEOUT=300
32+
33+
on_exit cleanup_and_exit "$BATCH_JOB_YAML" $TIMEOUT $CLUSTER_ID
34+
35+
echo "Deploying batch job..."
36+
retry_times 5 30 "kubectl apply -f $BATCH_JOB_YAML" || exit 1
37+
38+
echo "Waiting for JobManager to be running..."
39+
wait_for_jobmanager_running $CLUSTER_ID $TIMEOUT
40+
jm_pod_name=$(get_jm_pod_name $CLUSTER_ID)
41+
42+
echo "Waiting for job to start..."
43+
wait_for_logs $jm_pod_name "Job has been submitted with JobID" ${TIMEOUT} || exit 1
44+
wait_for_status $BATCH_JOB_IDENTIFIER '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1
45+
wait_for_status $BATCH_JOB_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1
46+
47+
echo "Waiting for job to complete..."
48+
# For batch jobs, we need to wait for the FINISHED state
49+
wait_for_status $BATCH_JOB_IDENTIFIER '.status.jobStatus.state' FINISHED ${TIMEOUT} || {
50+
echo "Job did not complete successfully within timeout"
51+
kubectl logs $jm_pod_name -c flink-main-container
52+
exit 1
53+
}
54+
55+
echo "Verifying job results..."
56+
# Check if the output file exists and contains results
57+
result=$(kubectl exec $jm_pod_name -c flink-main-container -- cat /tmp/wordcount-result.txt 2>/dev/null)
58+
if [ -z "$result" ]; then
59+
echo "Output file is empty or does not exist"
60+
exit 1
61+
fi
62+
63+
echo "Checking for expected output patterns..."
64+
# Check for some expected patterns in the output
65+
if ! echo "$result" | grep -q "flink"; then
66+
echo "Expected word 'flink' not found in results"
67+
echo "Results: $result"
68+
exit 1
69+
fi
70+
71+
echo "Successfully completed batch job test"

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@
142142
import java.util.Optional;
143143
import java.util.concurrent.Callable;
144144
import java.util.concurrent.CompletableFuture;
145+
import java.util.concurrent.ExecutionException;
145146
import java.util.concurrent.ExecutorService;
146147
import java.util.concurrent.TimeUnit;
147148
import java.util.concurrent.TimeoutException;
@@ -546,7 +547,7 @@ public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) {
546547
try {
547548
latestCheckpointOpt = getCheckpointInfo(jobId, conf).f0;
548549
} catch (Exception e) {
549-
if (e instanceof RestClientException
550+
if (e instanceof ExecutionException
550551
&& e.getMessage() != null
551552
&& e.getMessage().contains("Checkpointing has not been enabled")) {
552553
LOG.warn("Checkpointing not enabled for job {}", jobId, e);

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@
9898
import java.util.Random;
9999
import java.util.Set;
100100
import java.util.concurrent.CompletableFuture;
101+
import java.util.concurrent.ExecutionException;
101102
import java.util.concurrent.TimeoutException;
102103
import java.util.function.Consumer;
103104
import java.util.stream.Collectors;
@@ -595,8 +596,9 @@ public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) {
595596
Optional<CheckpointHistoryWrapper.PendingCheckpointInfo>>
596597
getCheckpointInfo(JobID jobId, Configuration conf) throws Exception {
597598
if (throwCheckpointingDisabledError) {
598-
throw new RestClientException(
599-
"Checkpointing has not been enabled", HttpResponseStatus.BAD_REQUEST);
599+
throw new ExecutionException(
600+
new RestClientException(
601+
"Checkpointing has not been enabled", HttpResponseStatus.BAD_REQUEST));
600602
}
601603

602604
if (checkpointInfo != null) {

0 commit comments

Comments
 (0)