Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ jobs:
- test_autoscaler.sh
- test_flink_operator_ha.sh
- test_snapshot.sh
- test_batch_job.sh
exclude:
- flink-version: v1_16
test: test_autoscaler.sh
Expand Down
109 changes: 109 additions & 0 deletions e2e-tests/data/flinkdep-batch-cr.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
namespace: default
name: flink-example-wordcount-batch
spec:
image: flink:1.20
flinkVersion: v1_20
ingress:
template: "/{{namespace}}/{{name}}(/|$)(.*)"
className: "nginx"
annotations:
nginx.ingress.kubernetes.io/rewrite-target: "/$2"
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
high-availability.type: kubernetes
high-availability.storageDir: file:///opt/flink/volume/flink-ha
state.checkpoints.dir: file:///opt/flink/volume/flink-cp
state.savepoints.dir: file:///opt/flink/volume/flink-sp
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need HA/checkpoint configs? We could remove this and also simplify the pod template

kubernetes.operator.snapshot.resource.enabled: "false"
serviceAccount: flink
podTemplate:
spec:
initContainers:
- name: artifacts-fetcher
image: busybox:1.35.0
imagePullPolicy: IfNotPresent
# Use wget or other tools to get user jars from remote storage
command: [ 'wget', 'STREAMING_EXAMPLES_JAR_URL', '-O', '/flink-artifact/myjob.jar' ]
volumeMounts:
- mountPath: /flink-artifact
name: flink-artifact
containers:
# Do not change the main container name
- name: flink-main-container
resources:
requests:
ephemeral-storage: 2048Mi
limits:
ephemeral-storage: 2048Mi
volumeMounts:
- mountPath: /opt/flink/usrlib
name: flink-artifact
- mountPath: /opt/flink/volume
name: flink-volume
volumes:
- name: flink-artifact
emptyDir: { }
- name: flink-volume
persistentVolumeClaim:
claimName: flink-example-wordcount-batch
jobManager:
resource:
memory: "1024m"
cpu: 0.5
taskManager:
resource:
memory: "1Gi"
cpu: 0.5
job:
jarURI: local:///opt/flink/usrlib/myjob.jar
entryClass: org.apache.flink.streaming.examples.wordcount.WordCount
args: ["--execution-mode", "BATCH"]
parallelism: 2
upgradeMode: stateless
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we set this to last-state?
I feel that with stateless we won't be able to verify that the savepoint observe logic is successful

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could. Although the operator logic still reaches getLastCheckpoint even in stateless mode. Does last-state even make sense for batch jobs given they don't support checkpointing?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are right, we could leave this in stateless. To avoid sleeping and make sure that is reconciled and stays in finished we can simply add a no-op spec change (such as change some operator config that doesn't result in a new job depoyment) or changing of the upgrade mode

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gyfora Makes sense. Updated.

mode: native

---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: flink-example-wordcount-batch
spec:
accessModes:
- ReadWriteOnce
volumeMode: Filesystem
resources:
requests:
storage: 1Gi

---
apiVersion: networking.k8s.io/v1
kind: IngressClass
metadata:
annotations:
ingressclass.kubernetes.io/is-default-class: "true"
labels:
app.kubernetes.io/component: controller
name: nginx
spec:
controller: k8s.io/ingress-nginx
60 changes: 60 additions & 0 deletions e2e-tests/test_batch_job.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# This script tests basic Flink batch job operations on Kubernetes:
# 1. Deploys a FlinkDeployment for a batch job.
# 2. Waits for the JobManager to become ready.
# 3. Verifies that the job reaches the FINISHED state (first check).
# 4. Sleeps for a configurable duration to test for state persistence.
# 5. Verifies that the job remains in the FINISHED state (second check).
# 6. Checks the operator logs for the expected job state transition message.
# 7. Checks the JobManager logs for successful application completion.
SCRIPT_DIR=$(dirname "$(readlink -f "$0")")
source "${SCRIPT_DIR}/utils.sh"

CLUSTER_ID="flink-example-wordcount-batch"
APPLICATION_YAML="${SCRIPT_DIR}/data/flinkdep-batch-cr.yaml"
APPLICATION_IDENTIFIER="flinkdep/$CLUSTER_ID"
TIMEOUT=300
SLEEP_DURATION=30

on_exit cleanup_and_exit "$APPLICATION_YAML" $TIMEOUT $CLUSTER_ID

retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1

wait_for_jobmanager_running $CLUSTER_ID $TIMEOUT

# Wait for the job to reach the FINISHED state (first check).
wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' FINISHED $TIMEOUT || exit 1

echo "Job reached FINISHED state. Sleeping for $SLEEP_DURATION seconds..."
sleep "$SLEEP_DURATION"

# Verify the job is *still* in the FINISHED state (second check).
wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' FINISHED $TIMEOUT || exit 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of sleeping and checking again, would it make sense to maybe send in a spec change and make sure the operator re-triggers the batch job correctly? I think that would rule out the savepoint observe errors etc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea behind the sleeping and checking again came from the fact that in #944, I fixed a bug where a bounded streaming job would reach finished and then would be set back to reconciling.

I do like the idea of adding a spec change and verifying the batch job reruns and finishes though.


# Verify the job status change to FINISHED shows up in the operator logs.
operator_pod_name=$(get_operator_pod_name)
wait_for_operator_logs "$operator_pod_name" "Job status changed from .* to FINISHED" ${TIMEOUT} || exit 1

# Verify the job completed successfully in the job manager logs.
jm_pod_name=$(get_jm_pod_name $CLUSTER_ID)
wait_for_logs "$jm_pod_name" "Application completed SUCCESSFULLY" ${TIMEOUT} || exit 1

echo "Successfully ran the batch job test"
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -546,7 +547,7 @@ public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) {
try {
latestCheckpointOpt = getCheckpointInfo(jobId, conf).f0;
} catch (Exception e) {
if (e instanceof RestClientException
if (e instanceof ExecutionException
&& e.getMessage() != null
&& e.getMessage().contains("Checkpointing has not been enabled")) {
LOG.warn("Checkpointing not enabled for job {}", jobId, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -595,8 +596,9 @@ public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) {
Optional<CheckpointHistoryWrapper.PendingCheckpointInfo>>
getCheckpointInfo(JobID jobId, Configuration conf) throws Exception {
if (throwCheckpointingDisabledError) {
throw new RestClientException(
"Checkpointing has not been enabled", HttpResponseStatus.BAD_REQUEST);
throw new ExecutionException(
new RestClientException(
"Checkpointing has not been enabled", HttpResponseStatus.BAD_REQUEST));
}

if (checkpointInfo != null) {
Expand Down