Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ jobs:
- test_autoscaler.sh
- test_flink_operator_ha.sh
- test_snapshot.sh
- test_batch_job.sh
exclude:
- flink-version: v1_16
test: test_autoscaler.sh
Expand All @@ -172,18 +173,24 @@ jobs:
test: test_flink_operator_ha.sh
- flink-version: v1_16
test: test_snapshot.sh
- flink-version: v1_16
test: test_batch_job.sh
- flink-version: v1_17
test: test_dynamic_config.sh
- flink-version: v1_17
test: test_flink_operator_ha.sh
- flink-version: v1_17
test: test_snapshot.sh
- flink-version: v1_17
test: test_batch_job.sh
- flink-version: v1_18
test: test_dynamic_config.sh
- flink-version: v1_18
test: test_flink_operator_ha.sh
- flink-version: v1_18
test: test_snapshot.sh
- flink-version: v1_18
test: test_batch_job.sh
- flink-version: v1_19
test: test_snapshot.sh
uses: ./.github/workflows/e2e.yaml
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ jobs:
EXAMPLES_JAR="https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.4/flink-examples-streaming_2.12-1.14.4.jar"
if [[ ${{ inputs.flink-version }} == v2* ]]; then
EXAMPLES_JAR="https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming/2.0-preview1/flink-examples-streaming-2.0-preview1.jar"
elif [[ "${{ inputs.test }}" == "test_batch_job.sh" ]]; then
EXAMPLES_JAR="https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming/1.20.1/flink-examples-streaming-1.20.1.jar"
fi
ESCAPED_EXAMPLES_JAR=$(printf '%s\n' "$EXAMPLES_JAR" | sed -e 's/[\/&]/\\&/g')

Expand Down
87 changes: 87 additions & 0 deletions e2e-tests/data/flinkdep-batch-cr.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
namespace: default
name: flink-example-wordcount-batch
spec:
image: flink:1.20
flinkVersion: v1_20
ingress:
template: "/{{namespace}}/{{name}}(/|$)(.*)"
className: "nginx"
annotations:
nginx.ingress.kubernetes.io/rewrite-target: "/$2"
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
kubernetes.operator.snapshot.resource.enabled: "false"
serviceAccount: flink
podTemplate:
spec:
initContainers:
- name: artifacts-fetcher
image: busybox:1.35.0
imagePullPolicy: IfNotPresent
# Use wget or other tools to get user jars from remote storage
command: [ 'wget', 'STREAMING_EXAMPLES_JAR_URL', '-O', '/flink-artifact/myjob.jar' ]
volumeMounts:
- mountPath: /flink-artifact
name: flink-artifact
containers:
# Do not change the main container name
- name: flink-main-container
resources:
requests:
ephemeral-storage: 2048Mi
limits:
ephemeral-storage: 2048Mi
volumeMounts:
- mountPath: /opt/flink/usrlib
name: flink-artifact
volumes:
- name: flink-artifact
emptyDir: { }
jobManager:
resource:
memory: "1024m"
cpu: 0.5
taskManager:
resource:
memory: "1Gi"
cpu: 0.5
job:
jarURI: local:///opt/flink/usrlib/myjob.jar
entryClass: org.apache.flink.streaming.examples.wordcount.WordCount
args: ["--execution-mode", "BATCH"]
parallelism: 2
upgradeMode: stateless
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we set this to last-state?
I feel that with stateless we won't be able to verify that the savepoint observe logic is successful

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could. Although the operator logic still reaches getLastCheckpoint even in stateless mode. Does last-state even make sense for batch jobs given they don't support checkpointing?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are right, we could leave this in stateless. To avoid sleeping and make sure that it is reconciled and stays in finished, we can simply add a no-op spec change (such as changing some operator config that doesn't result in a new job deployment) or change the upgrade mode.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gyfora Makes sense. Updated.

mode: native

---
# IngressClass named "nginx", referenced by the FlinkDeployment's
# `ingress.className` in this same file. The annotation marks it as the
# default class.
apiVersion: networking.k8s.io/v1
kind: IngressClass
metadata:
  annotations:
    ingressclass.kubernetes.io/is-default-class: "true"
  labels:
    app.kubernetes.io/component: controller
  name: nginx
spec:
  controller: k8s.io/ingress-nginx
62 changes: 62 additions & 0 deletions e2e-tests/test_batch_job.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# This script tests basic Flink batch job operations on Kubernetes:
#   1. Deploys a FlinkDeployment for a batch job.
#   2. Waits for the JobManager to become ready.
#   3. Verifies that the job reaches the FINISHED state.
#   4. Applies a no-op spec change and verifies the job remains in the FINISHED state.
#   5. Checks the operator logs for the expected job state transition message.
#   6. Checks the JobManager logs for successful application completion.
#   7. Applies a spec change and verifies the job re-runs successfully.
#
# Every step that can fail aborts the script with a non-zero exit code so the
# CI job is marked as failed. Helper functions come from utils.sh.
SCRIPT_DIR=$(dirname "$(readlink -f "$0")")
source "${SCRIPT_DIR}/utils.sh"

CLUSTER_ID="flink-example-wordcount-batch"
APPLICATION_YAML="${SCRIPT_DIR}/data/flinkdep-batch-cr.yaml"
APPLICATION_IDENTIFIER="flinkdep/$CLUSTER_ID"
TIMEOUT=300

# Register cleanup first so resources are removed on any exit path below.
on_exit cleanup_and_exit "$APPLICATION_YAML" "$TIMEOUT" "$CLUSTER_ID"

retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1

# NOTE(review): no `|| exit 1` here, as in the original; presumably the helper
# aborts on failure itself -- confirm against utils.sh.
wait_for_jobmanager_running "$CLUSTER_ID" "$TIMEOUT"

# Wait for the job to reach the FINISHED state.
wait_for_status "$APPLICATION_IDENTIFIER" '.status.jobStatus.state' FINISHED "$TIMEOUT" || exit 1

# Apply a no-op spec change; verify the job remains in the FINISHED state.
# The patch itself must be checked: the job is already FINISHED at this point,
# so the wait below would succeed even if the patch had been rejected.
kubectl patch flinkdep "${CLUSTER_ID}" --type merge --patch '{"spec":{"flinkConfiguration": {"kubernetes.operator.deployment.readiness.timeout": "6h" } } }' || exit 1
wait_for_status "$APPLICATION_IDENTIFIER" '.status.jobStatus.state' FINISHED "$TIMEOUT" || exit 1

# Verify the job status change to FINISHED shows up in the operator logs.
operator_pod_name=$(get_operator_pod_name)
wait_for_operator_logs "$operator_pod_name" "Job status changed from .* to FINISHED" "${TIMEOUT}" || exit 1

# Verify the job completed successfully in the job manager logs.
jm_pod_name=$(get_jm_pod_name "$CLUSTER_ID")
wait_for_logs "$jm_pod_name" "Application completed SUCCESSFULLY" "${TIMEOUT}" || exit 1

# Apply a spec change; verify the job re-runs and reaches the FINISHED state.
# Fail fast if the patch is rejected instead of waiting out the full timeout
# for a RECONCILING state that would never appear.
kubectl patch flinkdep "${CLUSTER_ID}" --type merge --patch '{"spec":{"job": {"parallelism": 1 } } }' || exit 1
wait_for_status "$APPLICATION_IDENTIFIER" '.status.jobStatus.state' RECONCILING "$TIMEOUT" || exit 1
wait_for_status "$APPLICATION_IDENTIFIER" '.status.jobStatus.state' FINISHED "$TIMEOUT" || exit 1

echo "Successfully ran the batch job test"
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -546,7 +547,7 @@ public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) {
try {
latestCheckpointOpt = getCheckpointInfo(jobId, conf).f0;
} catch (Exception e) {
if (e instanceof RestClientException
if (e instanceof ExecutionException
&& e.getMessage() != null
&& e.getMessage().contains("Checkpointing has not been enabled")) {
LOG.warn("Checkpointing not enabled for job {}", jobId, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -595,8 +596,9 @@ public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) {
Optional<CheckpointHistoryWrapper.PendingCheckpointInfo>>
getCheckpointInfo(JobID jobId, Configuration conf) throws Exception {
if (throwCheckpointingDisabledError) {
throw new RestClientException(
"Checkpointing has not been enabled", HttpResponseStatus.BAD_REQUEST);
throw new ExecutionException(
new RestClientException(
"Checkpointing has not been enabled", HttpResponseStatus.BAD_REQUEST));
}

if (checkpointInfo != null) {
Expand Down