
[FLINK-37515] Basic support for Blue/Green deployments #954


Closed. Wants to merge 23 commits.

Commits
9b8c0da
[release] Update version to 1.12-SNAPSHOT
gyfora Feb 21, 2025
e636a00
[release] Docs, ci and config updates after release cut
gyfora Feb 21, 2025
042c27e
[FLINK-37372] Fix infinite loop bug in savepoint error handling
gyfora Feb 24, 2025
9eb3c38
[FLINK-37370] [Observer] Finished batch jobs throw ReconciliationExce…
luca-p-castelli Feb 25, 2025
48359a3
[FLINK-33525] Move ImpulseSource to new Source API (#950)
Poorvankbhatia Mar 10, 2025
a1c510a
First code batch for FLIP-503
schongloo Mar 11, 2025
0cf910c
Merge branch 'apache:main' into main
schongloo Mar 11, 2025
2cfb97b
First code batch for FLIP-503
schongloo Mar 11, 2025
7a8043e
First code batch for FLIP-503
schongloo Mar 12, 2025
c4d460b
[FLINK-37370] [Observer] Fix exception caught when handling checkpoin…
luca-p-castelli Mar 16, 2025
679b033
[FLINK-37405] Validate config prefixes for Flink 2.0
gyfora Feb 27, 2025
6859683
[ci] PRs should not cancel each other's or target branches' CI runs
gyfora Mar 27, 2025
7a65e02
[FLINK-37430] Operator hides the actual error on deployment issues
rodmeneses Mar 27, 2025
93e68f2
[FLINK-37571] Fix JobGraph removal for 2.0 last-state upgrades
gyfora Mar 27, 2025
86ab948
[FLINK-37562] Do not check all JM replicas in observer
gyfora Mar 14, 2025
b18bdb4
[hotfix] Do not leak autoscaler configs to jobs
gyfora Mar 7, 2025
cda493e
[FLINK-37455] Create error Event when job goes into FAILED state
gyfora Mar 28, 2025
1578a29
[FLINK-37470] Improve JobManager Deployment / Pod error handling
gyfora Mar 28, 2025
0e65a5a
[FLINK-37530] Record upgrade savepoint correctly in savepointInfo as …
gyfora Mar 21, 2025
1eb5919
Merge branch 'release-1.11' into release-1.11-bluegreen
schongloo Apr 2, 2025
90bf2b8
Merge branch 'apache:main' into release-1.11-bluegreen
schongloo Apr 2, 2025
c81ed91
Added logic to SUSPEND the FlinkDeployment accordingly if it fails to…
schongloo Apr 2, 2025
1f3da3e
Correctly "resuming" a previously SUSPENDED deployment.
schongloo Apr 7, 2025
Files changed
1 change: 1 addition & 0 deletions .asf.yaml
@@ -21,6 +21,7 @@ github:
release-1.8: {}
release-1.9: {}
release-1.10: {}
+ release-1.11: {}

notifications:
commits: [email protected]
9 changes: 8 additions & 1 deletion .github/workflows/ci.yml
@@ -28,7 +28,7 @@ on:
- release-*
pull_request:
concurrency:
- group: ${{ github.workflow }}-${{ github.event.workflow_run.head_branch }}
+ group: ${{ github.workflow }}-${{ github.ref_name }}
cancel-in-progress: true

jobs:
@@ -161,6 +161,7 @@ jobs:
- test_autoscaler.sh
- test_flink_operator_ha.sh
- test_snapshot.sh
- test_batch_job.sh
exclude:
- flink-version: v1_16
test: test_autoscaler.sh
@@ -172,18 +173,24 @@
test: test_flink_operator_ha.sh
- flink-version: v1_16
test: test_snapshot.sh
- flink-version: v1_16
test: test_batch_job.sh
- flink-version: v1_17
test: test_dynamic_config.sh
- flink-version: v1_17
test: test_flink_operator_ha.sh
- flink-version: v1_17
test: test_snapshot.sh
- flink-version: v1_17
test: test_batch_job.sh
- flink-version: v1_18
test: test_dynamic_config.sh
- flink-version: v1_18
test: test_flink_operator_ha.sh
- flink-version: v1_18
test: test_snapshot.sh
- flink-version: v1_18
test: test_batch_job.sh
- flink-version: v1_19
test: test_snapshot.sh
uses: ./.github/workflows/e2e.yaml
6 changes: 3 additions & 3 deletions .github/workflows/docs.yaml
@@ -28,8 +28,8 @@ jobs:
matrix:
branch:
- main
- release-1.9
- release-1.10
- release-1.11
steps:
- uses: actions/checkout@v3
with:
@@ -41,8 +41,8 @@
echo "flink_branch=${currentBranch}"
echo "flink_branch=${currentBranch}" >> ${GITHUB_ENV}
if [ "${currentBranch}" = "main" ]; then
- echo "flink_alias=release-1.11" >> ${GITHUB_ENV}
- elif [ "${currentBranch}" = "release-1.10" ]; then
+ echo "flink_alias=release-1.12" >> ${GITHUB_ENV}
+ elif [ "${currentBranch}" = "release-1.11" ]; then
echo "flink_alias=stable" >> ${GITHUB_ENV}
else
echo "flink_alias=${currentBranch}" >> ${GITHUB_ENV}
2 changes: 2 additions & 0 deletions .github/workflows/e2e.yaml
@@ -88,6 +88,8 @@ jobs:
EXAMPLES_JAR="https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.4/flink-examples-streaming_2.12-1.14.4.jar"
if [[ ${{ inputs.flink-version }} == v2* ]]; then
EXAMPLES_JAR="https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming/2.0-preview1/flink-examples-streaming-2.0-preview1.jar"
+ elif [[ "${{ inputs.test }}" == "test_batch_job.sh" ]]; then
+ EXAMPLES_JAR="https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming/1.20.1/flink-examples-streaming-1.20.1.jar"
fi
ESCAPED_EXAMPLES_JAR=$(printf '%s\n' "$EXAMPLES_JAR" | sed -e 's/[\/&]/\\&/g')

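The e2e workflow change above selects an examples JAR per Flink version and test, then escapes it so it can be substituted into the CR template with `sed`. A standalone sketch of that logic (the variable names `FLINK_VERSION` and `TEST_NAME` stand in for the workflow's `inputs.flink-version` and `inputs.test` and are assumptions of this sketch):

```shell
#!/usr/bin/env bash
# Sketch of the jar-selection and sed-escaping logic from the workflow above.
FLINK_VERSION="v1_20"
TEST_NAME="test_batch_job.sh"

EXAMPLES_JAR="https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.4/flink-examples-streaming_2.12-1.14.4.jar"
if [[ "$FLINK_VERSION" == v2* ]]; then
  EXAMPLES_JAR="https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming/2.0-preview1/flink-examples-streaming-2.0-preview1.jar"
elif [[ "$TEST_NAME" == "test_batch_job.sh" ]]; then
  EXAMPLES_JAR="https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming/1.20.1/flink-examples-streaming-1.20.1.jar"
fi

# Escape '/' and '&' so the URL is safe to use as a sed replacement string,
# e.g. sed "s/STREAMING_EXAMPLES_JAR_URL/$ESCAPED_EXAMPLES_JAR/" on the CR yaml.
ESCAPED_EXAMPLES_JAR=$(printf '%s\n' "$EXAMPLES_JAR" | sed -e 's/[\/&]/\\&/g')
echo "$ESCAPED_EXAMPLES_JAR"
```

The escaping step matters because the URL itself contains `/`, which would otherwise terminate the later `sed` substitution expression, and `&` has special meaning in a sed replacement.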
2 changes: 1 addition & 1 deletion Dockerfile
@@ -38,7 +38,7 @@ RUN cd /app/tools/license; mkdir jars; cd jars; \
FROM eclipse-temurin:${JAVA_VERSION}-jre-jammy
ENV FLINK_HOME=/opt/flink
ENV FLINK_PLUGINS_DIR=$FLINK_HOME/plugins
- ENV OPERATOR_VERSION=1.11-SNAPSHOT
+ ENV OPERATOR_VERSION=1.12-SNAPSHOT
ENV OPERATOR_JAR=flink-kubernetes-operator-$OPERATOR_VERSION-shaded.jar
ENV WEBHOOK_JAR=flink-kubernetes-webhook-$OPERATOR_VERSION-shaded.jar
ENV KUBERNETES_STANDALONE_JAR=flink-kubernetes-standalone-$OPERATOR_VERSION.jar
87 changes: 87 additions & 0 deletions e2e-tests/data/flinkdep-batch-cr.yaml
@@ -0,0 +1,87 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
namespace: default
name: flink-example-wordcount-batch
spec:
image: flink:1.20
flinkVersion: v1_20
ingress:
template: "/{{namespace}}/{{name}}(/|$)(.*)"
className: "nginx"
annotations:
nginx.ingress.kubernetes.io/rewrite-target: "/$2"
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
kubernetes.operator.snapshot.resource.enabled: "false"
serviceAccount: flink
podTemplate:
spec:
initContainers:
- name: artifacts-fetcher
image: busybox:1.35.0
imagePullPolicy: IfNotPresent
# Use wget or other tools to get user jars from remote storage
command: [ 'wget', 'STREAMING_EXAMPLES_JAR_URL', '-O', '/flink-artifact/myjob.jar' ]
volumeMounts:
- mountPath: /flink-artifact
name: flink-artifact
containers:
# Do not change the main container name
- name: flink-main-container
resources:
requests:
ephemeral-storage: 2048Mi
limits:
ephemeral-storage: 2048Mi
volumeMounts:
- mountPath: /opt/flink/usrlib
name: flink-artifact
volumes:
- name: flink-artifact
emptyDir: { }
jobManager:
resource:
memory: "1024m"
cpu: 0.5
taskManager:
resource:
memory: "1Gi"
cpu: 0.5
job:
jarURI: local:///opt/flink/usrlib/myjob.jar
entryClass: org.apache.flink.streaming.examples.wordcount.WordCount
args: ["--execution-mode", "BATCH"]
parallelism: 2
upgradeMode: stateless
mode: native

---
apiVersion: networking.k8s.io/v1
kind: IngressClass
metadata:
annotations:
ingressclass.kubernetes.io/is-default-class: "true"
labels:
app.kubernetes.io/component: controller
name: nginx
spec:
controller: k8s.io/ingress-nginx
62 changes: 62 additions & 0 deletions e2e-tests/test_batch_job.sh
@@ -0,0 +1,62 @@
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# This script tests basic Flink batch job operations on Kubernetes:
# 1. Deploys a FlinkDeployment for a batch job.
# 2. Waits for the JobManager to become ready.
# 3. Verifies that the job reaches the FINISHED state.
# 4. Applies a no-op spec change and verifies the job remains in the FINISHED state.
# 5. Checks the operator logs for the expected job state transition message.
# 6. Checks the JobManager logs for successful application completion.
# 7. Applies a spec change and verifies the job re-runs successfully.
SCRIPT_DIR=$(dirname "$(readlink -f "$0")")
source "${SCRIPT_DIR}/utils.sh"

CLUSTER_ID="flink-example-wordcount-batch"
APPLICATION_YAML="${SCRIPT_DIR}/data/flinkdep-batch-cr.yaml"
APPLICATION_IDENTIFIER="flinkdep/$CLUSTER_ID"
TIMEOUT=300

on_exit cleanup_and_exit "$APPLICATION_YAML" $TIMEOUT $CLUSTER_ID

retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1

wait_for_jobmanager_running $CLUSTER_ID $TIMEOUT

# Wait for the job to reach the FINISHED state.
wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' FINISHED $TIMEOUT || exit 1

# Apply a no-op spec change; verify the job remains in the FINISHED state.
kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"flinkConfiguration": {"kubernetes.operator.deployment.readiness.timeout": "6h" } } }'
wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' FINISHED $TIMEOUT || exit 1

# Verify the job status change to FINISHED shows up in the operator logs.
operator_pod_name=$(get_operator_pod_name)
wait_for_operator_logs "$operator_pod_name" "Job status changed from .* to FINISHED" ${TIMEOUT} || exit 1

# Verify the job completed successfully in the job manager logs.
jm_pod_name=$(get_jm_pod_name $CLUSTER_ID)
wait_for_logs "$jm_pod_name" "Application completed SUCCESSFULLY" ${TIMEOUT} || exit 1

# Apply a spec change; verify the job re-runs and reaches the FINISHED state.
kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"job": {"parallelism": 1 } } }'
wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' RECONCILING $TIMEOUT || exit 1
wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' FINISHED $TIMEOUT || exit 1

echo "Successfully ran the batch job test"
9 changes: 8 additions & 1 deletion examples/autoscaling/pom.xml
@@ -23,7 +23,7 @@ under the License.
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-kubernetes-operator-parent</artifactId>
- <version>1.11-SNAPSHOT</version>
+ <version>1.12-SNAPSHOT</version>
<relativePath>../..</relativePath>
</parent>

@@ -45,6 +45,13 @@ under the License.
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-datagen</artifactId>
+ <version>${flink.version}</version>
+ </dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
examples/autoscaling LoadSimulationPipeline.java
@@ -18,12 +18,16 @@

package autoscaling;

+ import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
+ import org.apache.flink.api.common.typeinfo.Types;
+ import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
+ import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+ import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
- import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import org.slf4j.Logger;
@@ -60,6 +64,11 @@ public class LoadSimulationPipeline {

private static final Logger LOG = LoggerFactory.getLogger(LoadSimulationPipeline.class);

+ // Number of impulses (records) emitted per sampling interval.
+ // This value determines how many records should be generated within each `samplingIntervalMs`
+ // period.
+ private static final int IMPULSES_PER_SAMPLING_INTERVAL = 10;

public static void main(String[] args) throws Exception {
var env = StreamExecutionEnvironment.getExecutionEnvironment();
env.disableOperatorChaining();
@@ -74,8 +83,39 @@ public static void main(String[] args) throws Exception {
for (String branch : maxLoadPerTask.split("\n")) {
String[] taskLoads = branch.split(";");

+ /*
+ * Creates an unbounded stream that continuously emits the constant value 42L.
+ * Flink's DataGeneratorSource with RateLimiterStrategy is used to control the emission rate.
+ *
+ * Emission Rate Logic:
+ * - The goal is to generate a fixed number of impulses per sampling interval.
+ * - `samplingIntervalMs` defines the duration of one sampling interval in milliseconds.
+ * - We define `IMPULSES_PER_SAMPLING_INTERVAL = 10`, meaning that for every sampling interval,
+ *   exactly 10 impulses should be generated.
+ *
+ * To calculate the total number of records emitted per second:
+ *   1. Determine how many sampling intervals fit within one second:
+ *      samplingIntervalsPerSecond = 1000 / samplingIntervalMs;
+ *   2. Multiply this by the number of impulses per interval to get the total rate:
+ *      impulsesPerSecond = IMPULSES_PER_SAMPLING_INTERVAL * samplingIntervalsPerSecond;
+ *
+ * Example:
+ * - If `samplingIntervalMs = 500 ms` and `IMPULSES_PER_SAMPLING_INTERVAL = 10`:
+ *   impulsesPerSecond = (1000 / 500) * 10 = 2 * 10 = 20 records per second.
+ */
DataStream<Long> stream =
- env.addSource(new ImpulseSource(samplingIntervalMs)).name("ImpulseSource");
+ env.fromSource(
+ new DataGeneratorSource<>(
+ (GeneratorFunction<Long, Long>)
+ (index) -> 42L, // Emits constant value 42
+ Long.MAX_VALUE, // Unbounded stream
+ RateLimiterStrategy.perSecond(
+ (1000.0 / samplingIntervalMs)
+ * IMPULSES_PER_SAMPLING_INTERVAL), // Controls
+ // rate
+ Types.LONG),
+ WatermarkStrategy.noWatermarks(),
+ "ImpulseSource");

for (String load : taskLoads) {
double maxLoad = Double.parseDouble(load);
@@ -97,31 +137,6 @@
+ ")");
}

- private static class ImpulseSource implements SourceFunction<Long> {
- private final int maxSleepTimeMs;
- volatile boolean canceled;
-
- public ImpulseSource(int samplingInterval) {
- this.maxSleepTimeMs = samplingInterval / 10;
- }
-
- @Override
- public void run(SourceContext<Long> sourceContext) throws Exception {
- while (!canceled) {
- synchronized (sourceContext.getCheckpointLock()) {
- sourceContext.collect(42L);
- }
- // Provide an impulse to keep the load simulation active
- Thread.sleep(maxSleepTimeMs);
- }
- }
-
- @Override
- public void cancel() {
- canceled = true;
- }
- }

private static class LoadSimulationFn extends RichFlatMapFunction<Long, Long> {

private final double maxLoad;
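The comment added to LoadSimulationPipeline derives the emission rate as `(1000 / samplingIntervalMs) * IMPULSES_PER_SAMPLING_INTERVAL`. A quick shell sanity check of that arithmetic (a sketch only; it uses integer math, so it assumes `samplingIntervalMs` divides 1000, whereas the Java code uses `1000.0` and passes a double to `RateLimiterStrategy.perSecond`):

```shell
# Sanity check of the emission-rate formula:
# impulsesPerSecond = (1000 / samplingIntervalMs) * IMPULSES_PER_SAMPLING_INTERVAL
sampling_interval_ms=500
impulses_per_interval=10
rate=$(( (1000 / sampling_interval_ms) * impulses_per_interval ))
echo "${rate} records per second"   # 20 records per second
```

With a 500 ms sampling interval, two intervals fit in each second, so ten impulses per interval yields twenty records per second, matching the worked example in the code comment.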
2 changes: 1 addition & 1 deletion examples/flink-beam-example/pom.xml
@@ -23,7 +23,7 @@ under the License.
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-kubernetes-operator-parent</artifactId>
- <version>1.11-SNAPSHOT</version>
+ <version>1.12-SNAPSHOT</version>
<relativePath>../..</relativePath>
</parent>

2 changes: 1 addition & 1 deletion examples/flink-sql-runner-example/pom.xml
@@ -23,7 +23,7 @@ under the License.
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-kubernetes-operator-parent</artifactId>
- <version>1.11-SNAPSHOT</version>
+ <version>1.12-SNAPSHOT</version>
<relativePath>../..</relativePath>
</parent>
