Skip to content

Commit 09e6c01

Browse files
flinkdep-batch-cr
1 parent 4ea2aed commit 09e6c01

File tree

3 files changed

+171
-40
lines changed

3 files changed

+171
-40
lines changed
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
################################################################################
18+
19+
apiVersion: flink.apache.org/v1beta1
20+
kind: FlinkDeployment
21+
metadata:
22+
namespace: default
23+
name: flink-example-wordcount
24+
spec:
25+
image: flink:1.20
26+
flinkVersion: v1_20
27+
ingress:
28+
template: "/{{namespace}}/{{name}}(/|$)(.*)"
29+
className: "nginx"
30+
annotations:
31+
nginx.ingress.kubernetes.io/rewrite-target: "/$2"
32+
flinkConfiguration:
33+
taskmanager.numberOfTaskSlots: "2"
34+
high-availability.type: kubernetes
35+
high-availability.storageDir: file:///opt/flink/volume/flink-ha
36+
state.checkpoints.dir: file:///opt/flink/volume/flink-cp
37+
state.savepoints.dir: file:///opt/flink/volume/flink-sp
38+
kubernetes.operator.snapshot.resource.enabled: "false"
39+
serviceAccount: flink
40+
podTemplate:
41+
spec:
42+
initContainers:
43+
- name: artifacts-fetcher
44+
image: busybox:1.35.0
45+
imagePullPolicy: IfNotPresent
46+
# Use wget or other tools to get user jars from remote storage
47+
command: [ 'wget', 'STREAMING_EXAMPLES_JAR_URL', '-O', '/flink-artifact/myjob.jar' ]
48+
volumeMounts:
49+
- mountPath: /flink-artifact
50+
name: flink-artifact
51+
containers:
52+
# Do not change the main container name
53+
- name: flink-main-container
54+
resources:
55+
requests:
56+
ephemeral-storage: 2048Mi
57+
limits:
58+
ephemeral-storage: 2048Mi
59+
volumeMounts:
60+
- mountPath: /opt/flink/usrlib
61+
name: flink-artifact
62+
- mountPath: /opt/flink/volume
63+
name: flink-volume
64+
volumes:
65+
- name: flink-artifact
66+
emptyDir: { }
67+
- name: flink-volume
68+
persistentVolumeClaim:
69+
claimName: flink-example-wordcount
70+
jobManager:
71+
resource:
72+
memory: "1024m"
73+
cpu: 0.5
74+
taskManager:
75+
resource:
76+
memory: "1Gi"
77+
cpu: 0.5
78+
job:
79+
jarURI: local:///opt/flink/usrlib/myjob.jar
80+
entryClass: org.apache.flink.streaming.examples.wordcount.WordCount
81+
parallelism: 2
82+
upgradeMode: last-state
83+
mode: native
84+
85+
---
86+
apiVersion: v1
87+
kind: PersistentVolumeClaim
88+
metadata:
89+
name: flink-example-wordcount
90+
spec:
91+
accessModes:
92+
- ReadWriteOnce
93+
volumeMode: Filesystem
94+
resources:
95+
requests:
96+
storage: 1Gi
97+
98+
---
99+
apiVersion: networking.k8s.io/v1
100+
kind: IngressClass
101+
metadata:
102+
annotations:
103+
ingressclass.kubernetes.io/is-default-class: "true"
104+
labels:
105+
app.kubernetes.io/component: controller
106+
name: nginx
107+
spec:
108+
controller: k8s.io/ingress-nginx

e2e-tests/test_batch_operations.sh

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
#!/usr/bin/env bash
2+
################################################################################
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing, software
14+
# distributed under the License is distributed on an "AS IS" BASIS,
15+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
# See the License for the specific language governing permissions and
17+
# limitations under the License.
18+
################################################################################
19+
20+
# This script tests the batch job operations:
21+
# 1. Deploy a batch job
22+
# 2. Verify job completion
23+
# 3. Verify job results
24+
25+
SCRIPT_DIR=$(dirname "$(readlink -f "$0")")
26+
source "${SCRIPT_DIR}/utils.sh"
27+
28+
CLUSTER_ID="flink-batch-wordcount"
29+
APPLICATION_YAML="${SCRIPT_DIR}/data/flinkdep-batch-cr.yaml"
30+
APPLICATION_IDENTIFIER="flinkdep/$CLUSTER_ID"
31+
TIMEOUT=300
32+
33+
on_exit cleanup_and_exit "$APPLICATION_YAML" $TIMEOUT $CLUSTER_ID
34+
35+
retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1
36+
37+
wait_for_jobmanager_running $CLUSTER_ID $TIMEOUT
38+
jm_pod_name=$(get_jm_pod_name $CLUSTER_ID)
39+
40+
# Wait for the job to be submitted
41+
wait_for_logs $jm_pod_name "Job has been submitted with JobID" ${TIMEOUT} || exit 1
42+
wait_for_status $APPLICATION_IDENTIFIER '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1
43+
44+
# Extract the job ID
45+
job_id=$(kubectl logs $jm_pod_name -c flink-main-container | grep -E -o 'Job [a-z0-9]+ is submitted' | awk '{print $2}')
46+
echo "Batch job ID: $job_id"
47+
48+
# Wait for the job to complete
49+
wait_for_logs $jm_pod_name "Job $job_id reached globally terminal state FINISHED" ${TIMEOUT} || exit 1
50+
wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' FINISHED ${TIMEOUT} || exit 1
51+
52+
# Verify the job result exists in the container
53+
echo "Verifying job results..."
54+
kubectl exec $jm_pod_name -c flink-main-container -- cat /tmp/wordcount-result.txt > /dev/null || exit 1
55+
56+
# Verify some expected content in the result file
57+
result_content=$(kubectl exec $jm_pod_name -c flink-main-container -- cat /tmp/wordcount-result.txt)
58+
if ! echo "$result_content" | grep -q "the"; then
59+
echo "Expected word 'the' not found in the result"
60+
exit 1
61+
fi
62+
63+
echo "Successfully completed the batch job test"

examples/basic-batch.yaml

Lines changed: 0 additions & 40 deletions
This file was deleted.

0 commit comments

Comments
 (0)