diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index ab62db95f8..9618a2b3b8 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -91,6 +91,7 @@ jobs:
http-client: [ "okhttp" ]
java-version: [ "11", "17"]
flink-version:
+ - "v2_0"
- "v1_20"
- "v1_19"
- "v1_18"
@@ -107,11 +108,11 @@ jobs:
strategy:
matrix:
flink-version:
+ - "v2_0"
- "v1_20"
- "v1_19"
- "v1_18"
- "v1_17"
- - "v1_16"
mode:
- "native"
- "standalone"
@@ -143,11 +144,11 @@ jobs:
strategy:
matrix:
flink-version:
+ - "v2_0"
- "v1_20"
- "v1_18"
- "v1_19"
- "v1_17"
- - "v1_16"
mode:
- "native"
- "standalone"
diff --git a/.github/workflows/e2e.yaml b/.github/workflows/e2e.yaml
index 9a1f6ce429..26f90a8895 100644
--- a/.github/workflows/e2e.yaml
+++ b/.github/workflows/e2e.yaml
@@ -84,10 +84,19 @@ jobs:
if [[ "${{ inputs.append-java-version }}" == "true" ]]; then
FLINK_IMAGE=${FLINK_IMAGE}-java${{ inputs.java-version }}
fi
+
+ EXAMPLES_JAR="https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.4/flink-examples-streaming_2.12-1.14.4.jar"
+          if [[ "${{ inputs.flink-version }}" == v2* ]]; then
+ EXAMPLES_JAR="https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming/2.0-preview1/flink-examples-streaming-2.0-preview1.jar"
+ fi
+
echo FLINK_IMAGE=${FLINK_IMAGE}
+          echo EXAMPLES_JAR=${EXAMPLES_JAR}
sed -i "s/image: flink:.*/image: ${FLINK_IMAGE}/" e2e-tests/data/*.yaml
sed -i "s/flinkVersion: .*/flinkVersion: ${{ inputs.flink-version }}/" e2e-tests/data/*.yaml
sed -i "s/mode: .*/mode: ${{ inputs.mode }}/" e2e-tests/data/*.yaml
+          # Use '|' as the sed delimiter so the URL's slashes need no escaping.
+          sed -i "s|STREAMING_EXAMPLES_JAR_URL|${EXAMPLES_JAR}|" e2e-tests/data/*.yaml
git diff HEAD
echo "Running e2e-tests/$test"
bash e2e-tests/${{ inputs.test }} || exit 1
diff --git a/docs/content.zh/docs/custom-resource/autoscaler.md b/docs/content.zh/docs/custom-resource/autoscaler.md
index 3b5f128c77..648ca406d4 100644
--- a/docs/content.zh/docs/custom-resource/autoscaler.md
+++ b/docs/content.zh/docs/custom-resource/autoscaler.md
@@ -177,7 +177,7 @@ We suggest setting this based on your actual objective, such us 10,30,60 minutes
### Basic configuration example
```yaml
...
-flinkVersion: v1_17
+flinkVersion: v1_20
flinkConfiguration:
job.autoscaler.enabled: "true"
job.autoscaler.stabilization.interval: 1m
diff --git a/docs/content.zh/docs/custom-resource/job-management.md b/docs/content.zh/docs/custom-resource/job-management.md
index 74d3b026da..70cf930808 100644
--- a/docs/content.zh/docs/custom-resource/job-management.md
+++ b/docs/content.zh/docs/custom-resource/job-management.md
@@ -111,8 +111,8 @@ kind: FlinkDeployment
metadata:
name: basic-checkpoint-ha-example
spec:
- image: flink:1.17
- flinkVersion: v1_17
+ image: flink:1.20
+ flinkVersion: v1_20
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
state.savepoints.dir: file:///flink-data/savepoints
diff --git a/docs/content.zh/docs/custom-resource/overview.md b/docs/content.zh/docs/custom-resource/overview.md
index 3c7cc94354..fecc4c93f2 100644
--- a/docs/content.zh/docs/custom-resource/overview.md
+++ b/docs/content.zh/docs/custom-resource/overview.md
@@ -114,8 +114,8 @@ metadata:
namespace: default
name: basic-example
spec:
- image: flink:1.17
- flinkVersion: v1_17
+ image: flink:1.20
+ flinkVersion: v1_20
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
serviceAccount: flink
diff --git a/docs/content.zh/docs/custom-resource/pod-template.md b/docs/content.zh/docs/custom-resource/pod-template.md
index a10efc79d7..af4eaffe84 100644
--- a/docs/content.zh/docs/custom-resource/pod-template.md
+++ b/docs/content.zh/docs/custom-resource/pod-template.md
@@ -46,8 +46,8 @@ metadata:
namespace: default
name: pod-template-example
spec:
- image: flink:1.17
- flinkVersion: v1_17
+ image: flink:1.20
+ flinkVersion: v1_20
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
serviceAccount: flink
diff --git a/docs/content.zh/docs/operations/ingress.md b/docs/content.zh/docs/operations/ingress.md
index 0947f2d288..7cd19e9857 100644
--- a/docs/content.zh/docs/operations/ingress.md
+++ b/docs/content.zh/docs/operations/ingress.md
@@ -35,8 +35,8 @@ metadata:
namespace: default
name: advanced-ingress
spec:
- image: flink:1.17
- flinkVersion: v1_17
+ image: flink:1.20
+ flinkVersion: v1_20
ingress:
template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
className: "nginx"
diff --git a/docs/content/docs/custom-resource/autoscaler.md b/docs/content/docs/custom-resource/autoscaler.md
index cb5dfe27ae..e55f56e87c 100644
--- a/docs/content/docs/custom-resource/autoscaler.md
+++ b/docs/content/docs/custom-resource/autoscaler.md
@@ -177,7 +177,7 @@ We suggest setting this based on your actual objective, such us 10,30,60 minutes
### Basic configuration example
```yaml
...
-flinkVersion: v1_17
+flinkVersion: v1_20
flinkConfiguration:
job.autoscaler.enabled: "true"
job.autoscaler.stabilization.interval: 1m
diff --git a/docs/content/docs/custom-resource/job-management.md b/docs/content/docs/custom-resource/job-management.md
index cfae67b0d1..078fcf8d9c 100644
--- a/docs/content/docs/custom-resource/job-management.md
+++ b/docs/content/docs/custom-resource/job-management.md
@@ -111,8 +111,8 @@ kind: FlinkDeployment
metadata:
name: basic-checkpoint-ha-example
spec:
- image: flink:1.17
- flinkVersion: v1_17
+ image: flink:1.20
+ flinkVersion: v1_20
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
state.savepoints.dir: file:///flink-data/savepoints
diff --git a/docs/content/docs/custom-resource/overview.md b/docs/content/docs/custom-resource/overview.md
index 61beb10af1..868558147c 100644
--- a/docs/content/docs/custom-resource/overview.md
+++ b/docs/content/docs/custom-resource/overview.md
@@ -117,8 +117,8 @@ metadata:
namespace: default
name: basic-example
spec:
- image: flink:1.17
- flinkVersion: v1_17
+ image: flink:1.20
+ flinkVersion: v1_20
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
serviceAccount: flink
diff --git a/docs/content/docs/custom-resource/pod-template.md b/docs/content/docs/custom-resource/pod-template.md
index 3695864ce9..e8ef676ea0 100644
--- a/docs/content/docs/custom-resource/pod-template.md
+++ b/docs/content/docs/custom-resource/pod-template.md
@@ -49,8 +49,8 @@ metadata:
namespace: default
name: pod-template-example
spec:
- image: flink:1.17
- flinkVersion: v1_17
+ image: flink:1.20
+ flinkVersion: v1_20
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
serviceAccount: flink
diff --git a/docs/content/docs/custom-resource/reference.md b/docs/content/docs/custom-resource/reference.md
index 41bd17efc5..909343c103 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -112,11 +112,12 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
| v1_13 | No longer supported since 1.7 operator release. |
| v1_14 | No longer supported since 1.7 operator release. |
| v1_15 | Deprecated since 1.10 operator release. |
-| v1_16 | |
+| v1_16 | Deprecated since 1.11 operator release. |
| v1_17 | |
| v1_18 | |
| v1_19 | |
| v1_20 | |
+| v2_0 | |
| majorVersion | int | The major integer from the Flink semver. For example for Flink 1.18.1 this would be 1. |
| minorVersion | int | The minor integer from the Flink semver. For example for Flink 1.18.1 this would be 18. |
diff --git a/docs/content/docs/operations/ingress.md b/docs/content/docs/operations/ingress.md
index 26c300bacb..2ca9cf74c9 100644
--- a/docs/content/docs/operations/ingress.md
+++ b/docs/content/docs/operations/ingress.md
@@ -35,8 +35,8 @@ metadata:
namespace: default
name: advanced-ingress
spec:
- image: flink:1.17
- flinkVersion: v1_17
+ image: flink:1.20
+ flinkVersion: v1_20
ingress:
template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
className: "nginx"
diff --git a/e2e-tests/data/autoscaler.yaml b/e2e-tests/data/autoscaler.yaml
index 5e0b30432f..ca106ce1cd 100644
--- a/e2e-tests/data/autoscaler.yaml
+++ b/e2e-tests/data/autoscaler.yaml
@@ -22,8 +22,8 @@ metadata:
namespace: default
name: flink-autoscaler-e2e
spec:
- image: flink:1.18
- flinkVersion: v1_18
+ image: flink:1.20
+ flinkVersion: v1_20
ingress:
template: "/{{namespace}}/{{name}}(/|$)(.*)"
className: "nginx"
diff --git a/e2e-tests/data/flinkdep-cr.yaml b/e2e-tests/data/flinkdep-cr.yaml
index 85dd86ab35..f4847678e0 100644
--- a/e2e-tests/data/flinkdep-cr.yaml
+++ b/e2e-tests/data/flinkdep-cr.yaml
@@ -22,8 +22,8 @@ metadata:
namespace: default
name: flink-example-statemachine
spec:
- image: flink:1.17
- flinkVersion: v1_17
+ image: flink:1.20
+ flinkVersion: v1_20
ingress:
template: "/{{namespace}}/{{name}}(/|$)(.*)"
className: "nginx"
@@ -31,7 +31,7 @@ spec:
nginx.ingress.kubernetes.io/rewrite-target: "/$2"
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
- high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+ high-availability.type: kubernetes
high-availability.storageDir: file:///opt/flink/volume/flink-ha
state.checkpoints.dir: file:///opt/flink/volume/flink-cp
state.savepoints.dir: file:///opt/flink/volume/flink-sp
@@ -44,7 +44,7 @@ spec:
image: busybox:1.35.0
imagePullPolicy: IfNotPresent
# Use wget or other tools to get user jars from remote storage
- command: [ 'wget', 'https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.4/flink-examples-streaming_2.12-1.14.4.jar', '-O', '/flink-artifact/myjob.jar' ]
+ command: [ 'wget', 'STREAMING_EXAMPLES_JAR_URL', '-O', '/flink-artifact/myjob.jar' ]
volumeMounts:
- mountPath: /flink-artifact
name: flink-artifact
diff --git a/e2e-tests/data/multi-sessionjob.yaml b/e2e-tests/data/multi-sessionjob.yaml
index d10b6c113e..e8f84ce1a8 100644
--- a/e2e-tests/data/multi-sessionjob.yaml
+++ b/e2e-tests/data/multi-sessionjob.yaml
@@ -22,8 +22,8 @@ metadata:
namespace: default
name: session-cluster-1
spec:
- image: flink:1.17
- flinkVersion: v1_17
+ image: flink:1.20
+ flinkVersion: v1_20
ingress:
template: "/{{namespace}}/{{name}}(/|$)(.*)"
className: "nginx"
@@ -31,7 +31,7 @@ spec:
nginx.ingress.kubernetes.io/rewrite-target: "/$2"
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
- high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+ high-availability.type: kubernetes
high-availability.storageDir: file:///opt/flink/volume/flink-ha
state.checkpoints.dir: file:///opt/flink/volume/flink-cp
state.savepoints.dir: file:///opt/flink/volume/flink-sp
@@ -70,8 +70,8 @@ metadata:
namespace: flink
name: session-cluster-1
spec:
- image: flink:1.17
- flinkVersion: v1_17
+ image: flink:1.20
+ flinkVersion: v1_20
ingress:
template: "/{{namespace}}/{{name}}(/|$)(.*)"
className: "nginx"
@@ -79,7 +79,7 @@ spec:
nginx.ingress.kubernetes.io/rewrite-target: "/$2"
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
- high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+ high-availability.type: kubernetes
high-availability.storageDir: file:///opt/flink/volume/flink-ha
state.checkpoints.dir: file:///opt/flink/volume/flink-cp
state.savepoints.dir: file:///opt/flink/volume/flink-sp
@@ -120,7 +120,7 @@ metadata:
spec:
deploymentName: session-cluster-1
job:
- jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.4/flink-examples-streaming_2.12-1.14.4.jar
+ jarURI: STREAMING_EXAMPLES_JAR_URL
parallelism: 2
upgradeMode: savepoint
entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample
@@ -134,7 +134,7 @@ metadata:
spec:
deploymentName: session-cluster-1
job:
- jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.4/flink-examples-streaming_2.12-1.14.4.jar
+ jarURI: STREAMING_EXAMPLES_JAR_URL
parallelism: 2
upgradeMode: savepoint
entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample
diff --git a/e2e-tests/data/sessionjob-cr.yaml b/e2e-tests/data/sessionjob-cr.yaml
index f7759739a6..1d96988f73 100644
--- a/e2e-tests/data/sessionjob-cr.yaml
+++ b/e2e-tests/data/sessionjob-cr.yaml
@@ -22,8 +22,8 @@ metadata:
namespace: default
name: session-cluster-1
spec:
- image: flink:1.17
- flinkVersion: v1_17
+ image: flink:1.20
+ flinkVersion: v1_20
ingress:
template: "/{{namespace}}/{{name}}(/|$)(.*)"
className: "nginx"
@@ -31,7 +31,7 @@ spec:
nginx.ingress.kubernetes.io/rewrite-target: "/$2"
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
- high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+ high-availability.type: kubernetes
high-availability.storageDir: file:///opt/flink/volume/flink-ha
state.checkpoints.dir: file:///opt/flink/volume/flink-cp
state.savepoints.dir: file:///opt/flink/volume/flink-sp
@@ -73,7 +73,7 @@ metadata:
spec:
deploymentName: session-cluster-1
job:
- jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.4/flink-examples-streaming_2.12-1.14.4.jar
+ jarURI: STREAMING_EXAMPLES_JAR_URL
parallelism: 2
upgradeMode: savepoint
entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample
diff --git a/e2e-tests/utils.sh b/e2e-tests/utils.sh
index 5356a40fac..bf2f0e64d3 100755
--- a/e2e-tests/utils.sh
+++ b/e2e-tests/utils.sh
@@ -231,8 +231,14 @@ function delete_operator_pod_with_leadership() {
function debug_and_show_logs {
echo "Debugging failed e2e test:"
echo "Currently existing Kubernetes resources"
+ kubectl get flinkdeployments
+ kubectl get flinksessionjobs
kubectl get all
+ kubectl get configmaps
+ kubectl describe flinkdeployments
+ kubectl describe flinksessionjobs
kubectl describe all
+ kubectl describe configmaps
operator_pod_namespace=$(get_operator_pod_namespace)
operator_pod_names=$(get_operator_pod_name)
diff --git a/examples/advanced-ingress.yaml b/examples/advanced-ingress.yaml
index c67e2f366f..5b3eb13d84 100644
--- a/examples/advanced-ingress.yaml
+++ b/examples/advanced-ingress.yaml
@@ -21,8 +21,8 @@ kind: FlinkDeployment
metadata:
name: advanced-ingress
spec:
- image: flink:1.17
- flinkVersion: v1_17
+ image: flink:1.20
+ flinkVersion: v1_20
ingress:
template: "/{{namespace}}/{{name}}(/|$)(.*)"
className: "nginx"
diff --git a/examples/autoscaling/autoscaling-dynamic.yaml b/examples/autoscaling/autoscaling-dynamic.yaml
index 7aade64471..85f5eb0d9d 100644
--- a/examples/autoscaling/autoscaling-dynamic.yaml
+++ b/examples/autoscaling/autoscaling-dynamic.yaml
@@ -22,7 +22,7 @@ metadata:
name: autoscaling-dynamic
spec:
image: autoscaling-example
- flinkVersion: v1_18
+ flinkVersion: v1_20
flinkConfiguration:
job.autoscaler.enabled: "true"
job.autoscaler.stabilization.interval: "1m"
diff --git a/examples/autoscaling/autoscaling.yaml b/examples/autoscaling/autoscaling.yaml
index a973ccd38e..afa30daee3 100644
--- a/examples/autoscaling/autoscaling.yaml
+++ b/examples/autoscaling/autoscaling.yaml
@@ -22,7 +22,7 @@ metadata:
name: autoscaling-example
spec:
image: autoscaling-example
- flinkVersion: v1_18
+ flinkVersion: v1_20
flinkConfiguration:
job.autoscaler.enabled: "true"
job.autoscaler.stabilization.interval: "1m"
diff --git a/examples/basic-checkpoint-ha.yaml b/examples/basic-checkpoint-ha.yaml
index 48cd6c709e..4cdce8e5b2 100644
--- a/examples/basic-checkpoint-ha.yaml
+++ b/examples/basic-checkpoint-ha.yaml
@@ -21,8 +21,8 @@ kind: FlinkDeployment
metadata:
name: basic-checkpoint-ha-example
spec:
- image: flink:1.17
- flinkVersion: v1_17
+ image: flink:1.20
+ flinkVersion: v1_20
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
state.savepoints.dir: file:///flink-data/savepoints
diff --git a/examples/basic-ingress.yaml b/examples/basic-ingress.yaml
index b81c4f8f6d..db88b3de28 100644
--- a/examples/basic-ingress.yaml
+++ b/examples/basic-ingress.yaml
@@ -21,8 +21,8 @@ kind: FlinkDeployment
metadata:
name: basic-ingress
spec:
- image: flink:1.17
- flinkVersion: v1_17
+ image: flink:1.20
+ flinkVersion: v1_20
ingress:
template: "{{name}}.{{namespace}}.flink.k8s.io"
flinkConfiguration:
diff --git a/examples/basic-reactive.yaml b/examples/basic-reactive.yaml
index 3a6ad24028..9fcd8d332c 100644
--- a/examples/basic-reactive.yaml
+++ b/examples/basic-reactive.yaml
@@ -21,8 +21,8 @@ kind: FlinkDeployment
metadata:
name: basic-reactive-example
spec:
- image: flink:1.17
- flinkVersion: v1_17
+ image: flink:1.20
+ flinkVersion: v1_20
flinkConfiguration:
scheduler-mode: REACTIVE
taskmanager.numberOfTaskSlots: "2"
diff --git a/examples/basic-session-deployment-and-job.yaml b/examples/basic-session-deployment-and-job.yaml
index ea1d992387..5c7ff95475 100644
--- a/examples/basic-session-deployment-and-job.yaml
+++ b/examples/basic-session-deployment-and-job.yaml
@@ -21,8 +21,8 @@ kind: FlinkDeployment
metadata:
name: basic-session-deployment-example
spec:
- image: flink:1.17
- flinkVersion: v1_17
+ image: flink:1.20
+ flinkVersion: v1_20
jobManager:
resource:
memory: "2048m"
diff --git a/examples/basic-session-deployment-only.yaml b/examples/basic-session-deployment-only.yaml
index 044554e019..786eb414b0 100644
--- a/examples/basic-session-deployment-only.yaml
+++ b/examples/basic-session-deployment-only.yaml
@@ -21,8 +21,8 @@ kind: FlinkDeployment
metadata:
name: basic-session-deployment-only-example
spec:
- image: flink:1.17
- flinkVersion: v1_17
+ image: flink:1.20
+ flinkVersion: v1_20
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
serviceAccount: flink
diff --git a/examples/basic.yaml b/examples/basic.yaml
index 66153d155b..b0761936d8 100644
--- a/examples/basic.yaml
+++ b/examples/basic.yaml
@@ -21,8 +21,8 @@ kind: FlinkDeployment
metadata:
name: basic-example
spec:
- image: flink:1.17
- flinkVersion: v1_17
+ image: flink:1.20
+ flinkVersion: v1_20
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
serviceAccount: flink
diff --git a/examples/custom-logging.yaml b/examples/custom-logging.yaml
index e3dc70c36b..76a2e691c4 100644
--- a/examples/custom-logging.yaml
+++ b/examples/custom-logging.yaml
@@ -21,8 +21,8 @@ kind: FlinkDeployment
metadata:
name: custom-logging-example
spec:
- image: flink:1.17
- flinkVersion: v1_17
+ image: flink:1.20
+ flinkVersion: v1_20
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
serviceAccount: flink
diff --git a/examples/flink-beam-example/beam-example.yaml b/examples/flink-beam-example/beam-example.yaml
index 94d72e45a5..ca311b33c0 100644
--- a/examples/flink-beam-example/beam-example.yaml
+++ b/examples/flink-beam-example/beam-example.yaml
@@ -22,7 +22,7 @@ metadata:
name: beam-example
spec:
image: flink-beam-example:latest
- flinkVersion: v1_16
+ flinkVersion: v1_19
flinkConfiguration:
taskmanager.numberOfTaskSlots: "1"
serviceAccount: flink
diff --git a/examples/flink-beam-example/pom.xml b/examples/flink-beam-example/pom.xml
index 55473f23c5..e1379a2274 100644
--- a/examples/flink-beam-example/pom.xml
+++ b/examples/flink-beam-example/pom.xml
@@ -33,7 +33,7 @@ under the License.
true
- 2.47.0
+ 2.62.0
@@ -71,7 +71,7 @@ under the License.
org.apache.beam
- beam-runners-flink-1.16
+ beam-runners-flink-1.19
${beam.version}
diff --git a/examples/flink-python-example/python-example.yaml b/examples/flink-python-example/python-example.yaml
index 958f81b6c3..8898f26987 100644
--- a/examples/flink-python-example/python-example.yaml
+++ b/examples/flink-python-example/python-example.yaml
@@ -22,7 +22,7 @@ metadata:
name: python-example
spec:
image: flink-python-example:latest
- flinkVersion: v1_16
+ flinkVersion: v1_20
flinkConfiguration:
taskmanager.numberOfTaskSlots: "1"
serviceAccount: flink
diff --git a/examples/flink-sql-runner-example/sql-example.yaml b/examples/flink-sql-runner-example/sql-example.yaml
index 0e29bab131..98a4b236ca 100644
--- a/examples/flink-sql-runner-example/sql-example.yaml
+++ b/examples/flink-sql-runner-example/sql-example.yaml
@@ -22,7 +22,7 @@ metadata:
name: sql-example
spec:
image: flink-sql-runner-example:latest
- flinkVersion: v1_16
+ flinkVersion: v1_20
flinkConfiguration:
taskmanager.numberOfTaskSlots: "1"
serviceAccount: flink
diff --git a/examples/flink-tls-example/basic-secure-deployment-only.yaml b/examples/flink-tls-example/basic-secure-deployment-only.yaml
index 253dfe477d..dd7844d7b8 100644
--- a/examples/flink-tls-example/basic-secure-deployment-only.yaml
+++ b/examples/flink-tls-example/basic-secure-deployment-only.yaml
@@ -21,8 +21,8 @@ kind: FlinkDeployment
metadata:
name: basic-secure-deployment-only
spec:
- image: flink:1.17
- flinkVersion: v1_17
+ image: flink:1.20
+ flinkVersion: v1_20
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
security.ssl.internal.enabled: 'true'
diff --git a/examples/flink-tls-example/basic-secure.yaml b/examples/flink-tls-example/basic-secure.yaml
index a5245ca326..5c425a5e01 100644
--- a/examples/flink-tls-example/basic-secure.yaml
+++ b/examples/flink-tls-example/basic-secure.yaml
@@ -21,8 +21,8 @@ kind: FlinkDeployment
metadata:
name: basic-secure
spec:
- image: flink:1.17
- flinkVersion: v1_17
+ image: flink:1.20
+ flinkVersion: v1_20
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
security.ssl.enabled: 'true'
diff --git a/examples/pod-template.yaml b/examples/pod-template.yaml
index bab35d4a5c..f44f8a781e 100644
--- a/examples/pod-template.yaml
+++ b/examples/pod-template.yaml
@@ -21,8 +21,8 @@ kind: FlinkDeployment
metadata:
name: pod-template-example
spec:
- image: flink:1.17
- flinkVersion: v1_17
+ image: flink:1.20
+ flinkVersion: v1_20
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
serviceAccount: flink
diff --git a/examples/snapshot/job-from-savepoint.yaml b/examples/snapshot/job-from-savepoint.yaml
index cc6627a0bc..8f4eaeafcb 100644
--- a/examples/snapshot/job-from-savepoint.yaml
+++ b/examples/snapshot/job-from-savepoint.yaml
@@ -34,8 +34,8 @@ kind: FlinkDeployment
metadata:
name: example-deployment
spec:
- image: flink:1.19
- flinkVersion: v1_19
+ image: flink:1.20
+ flinkVersion: v1_20
flinkConfiguration:
state.checkpoints.dir: file:///flink-data/checkpoints
state.savepoints.dir: file:///flink-data/savepoints
diff --git a/flink-autoscaler-standalone/src/main/resources/META-INF/NOTICE b/flink-autoscaler-standalone/src/main/resources/META-INF/NOTICE
index e4cb5bdb9d..a46452d489 100644
--- a/flink-autoscaler-standalone/src/main/resources/META-INF/NOTICE
+++ b/flink-autoscaler-standalone/src/main/resources/META-INF/NOTICE
@@ -6,21 +6,32 @@ The Apache Software Foundation (http://www.apache.org/).
This project bundles the following dependencies under the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
-- org.objenesis:objenesis:2.1
-- commons-collections:commons-collections:3.2.2
-- org.apache.commons:commons-math3:3.6.1
-- com.twitter:chill-java:0.7.6
-- commons-io:commons-io:2.17.0
-- org.apache.commons:commons-lang3:3.12.0
-- commons-cli:commons-cli:1.5.0
-- org.javassist:javassist:3.24.0-GA
-- com.google.code.findbugs:jsr305:1.3.9
-- org.slf4j:slf4j-api:1.7.36
-- org.apache.logging.log4j:log4j-slf4j-impl:2.17.1
-- org.apache.logging.log4j:log4j-api:2.17.1
-- org.apache.logging.log4j:log4j-core:2.17.1
-- org.apache.logging.log4j:log4j-1.2-api:2.17.1
-- com.zaxxer:HikariCP:5.1.0
+- com.squareup.okhttp3:okhttp:jar:4.12.0
+- com.squareup.okio:okio-jvm:jar:3.6.0
+- com.squareup.okio:okio:jar:3.6.0
+- com.twitter:chill-java:jar:0.7.6
+- com.zaxxer:HikariCP:jar:5.1.0
+- commons-cli:commons-cli:jar:1.5.0
+- commons-codec:commons-codec:jar:1.17.0
+- commons-collections:commons-collections:jar:3.2.2
+- commons-io:commons-io:jar:2.15.1
+- org.apache.commons:commons-lang3:jar:3.12.0
+- org.apache.commons:commons-math3:jar:3.6.1
+- org.apache.logging.log4j:log4j-1.2-api:jar:2.23.1
+- org.apache.logging.log4j:log4j-api:jar:2.23.1
+- org.apache.logging.log4j:log4j-core:jar:2.23.1
+- org.apache.logging.log4j:log4j-slf4j-impl:jar:2.23.1
+- org.javassist:javassist:jar:3.24.0-GA
+- org.jetbrains.kotlin:kotlin-stdlib-common:jar:1.9.10
+- org.jetbrains.kotlin:kotlin-stdlib-jdk7:jar:1.8.21
+- org.jetbrains.kotlin:kotlin-stdlib-jdk8:jar:1.8.21
+- org.jetbrains.kotlin:kotlin-stdlib:jar:1.8.21
+- org.jetbrains:annotations:jar:13.0
+- org.objenesis:objenesis:jar:2.1
+- org.quartz-scheduler:quartz:jar:2.4.0
+- org.slf4j:slf4j-api:jar:1.7.36
+- org.snakeyaml:snakeyaml-engine:jar:2.6
+- tools.profiler:async-profiler:jar:2.9
This project bundles the following dependencies under the BSD License.
See bundled license files for details.
diff --git a/flink-kubernetes-operator-api/pom.xml b/flink-kubernetes-operator-api/pom.xml
index 5baa6ee910..3dc3a27d5e 100644
--- a/flink-kubernetes-operator-api/pom.xml
+++ b/flink-kubernetes-operator-api/pom.xml
@@ -55,6 +55,18 @@ under the License.
+
+ org.apache.flink
+ flink-core-api
+ ${flink.version}
+
+
+ *
+ *
+
+
+
+
io.fabric8
diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java
index 1734e95b81..b636ae53ef 100644
--- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersion.java
@@ -32,11 +32,14 @@ public enum FlinkVersion {
/** Deprecated since 1.10 operator release. */
@Deprecated
v1_15(1, 15),
+ /** Deprecated since 1.11 operator release. */
+ @Deprecated
v1_16(1, 16),
v1_17(1, 17),
v1_18(1, 18),
v1_19(1, 19),
- v1_20(1, 20);
+ v1_20(1, 20),
+ v2_0(2, 0);
/** The major integer from the Flink semver. For example for Flink 1.18.1 this would be 1. */
private final int majorVersion;
@@ -59,15 +62,6 @@ public boolean isEqualOrNewer(FlinkVersion otherVersion) {
return false;
}
- /**
- * Returns the current version.
- *
- * @return The current version.
- */
- public static FlinkVersion current() {
- return values()[values().length - 1];
- }
-
public static boolean isSupported(FlinkVersion version) {
return version != null && version.isEqualOrNewer(FlinkVersion.v1_15);
}
diff --git a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersionTest.java b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersionTest.java
index d830d7de7a..2cbe49b01f 100644
--- a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersionTest.java
+++ b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/spec/FlinkVersionTest.java
@@ -33,11 +33,6 @@ void isEqualOrNewer() {
assertTrue(FlinkVersion.v1_18.isEqualOrNewer(FlinkVersion.v1_17));
}
- @Test
- void current() {
- assertEquals(FlinkVersion.v1_20, FlinkVersion.current());
- }
-
@Test
void isSupported() {
assertTrue(FlinkVersion.isSupported(FlinkVersion.v1_20));
diff --git a/flink-kubernetes-operator-api/src/test/resources/test-deployment-with-unknown-fields.yaml b/flink-kubernetes-operator-api/src/test/resources/test-deployment-with-unknown-fields.yaml
index d82ef37118..2e5f2212a5 100644
--- a/flink-kubernetes-operator-api/src/test/resources/test-deployment-with-unknown-fields.yaml
+++ b/flink-kubernetes-operator-api/src/test/resources/test-deployment-with-unknown-fields.yaml
@@ -21,8 +21,8 @@ kind: FlinkDeployment
metadata:
name: basic-example
spec:
- image: flink:1.17
- flinkVersion: v1_17
+ image: flink:1.20
+ flinkVersion: v1_20
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
serviceAccount: flink
diff --git a/flink-kubernetes-operator-api/src/test/resources/test-deployment.yaml b/flink-kubernetes-operator-api/src/test/resources/test-deployment.yaml
index 66153d155b..b0761936d8 100644
--- a/flink-kubernetes-operator-api/src/test/resources/test-deployment.yaml
+++ b/flink-kubernetes-operator-api/src/test/resources/test-deployment.yaml
@@ -21,8 +21,8 @@ kind: FlinkDeployment
metadata:
name: basic-example
spec:
- image: flink:1.17
- flinkVersion: v1_17
+ image: flink:1.20
+ flinkVersion: v1_20
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
serviceAccount: flink
diff --git a/flink-kubernetes-operator/pom.xml b/flink-kubernetes-operator/pom.xml
index b9ec48cded..07d6be88ae 100644
--- a/flink-kubernetes-operator/pom.xml
+++ b/flink-kubernetes-operator/pom.xml
@@ -102,6 +102,12 @@ under the License.
${flink.version}
+
+ org.apache.flink
+ flink-metrics-core
+ ${flink.version}
+
+
org.apache.flink
flink-kubernetes-standalone
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
new file mode 100644
index 0000000000..8683996793
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.decorators;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ConfigMap;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Container;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ContainerBuilder;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.HasMetadata;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.KeyToPath;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.KeyToPathBuilder;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Pod;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.PodBuilder;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Volume;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.VolumeBuilder;
+import org.apache.flink.kubernetes.utils.Constants;
+
+import org.apache.flink.shaded.guava31.com.google.common.io.Files;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION;
+import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME;
+import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOGBACK_NAME;
+import static org.apache.flink.kubernetes.utils.Constants.CONFIG_MAP_PREFIX;
+import static org.apache.flink.kubernetes.utils.Constants.FLINK_CONF_VOLUME;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Copied from Flink core and modified to handle all Flink versions. */
+public class FlinkConfMountDecorator extends AbstractKubernetesStepDecorator {
+
+ private final AbstractKubernetesParameters kubernetesComponentConf;
+
+ public FlinkConfMountDecorator(AbstractKubernetesParameters kubernetesComponentConf) {
+ this.kubernetesComponentConf = checkNotNull(kubernetesComponentConf);
+ }
+
+ @Override
+ public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
+ final Pod mountedPod = decoratePod(flinkPod.getPodWithoutMainContainer());
+
+ final Container mountedMainContainer =
+ new ContainerBuilder(flinkPod.getMainContainer())
+ .addNewVolumeMount()
+ .withName(FLINK_CONF_VOLUME)
+ .withMountPath(kubernetesComponentConf.getFlinkConfDirInPod())
+ .endVolumeMount()
+ .build();
+
+ return new FlinkPod.Builder(flinkPod)
+ .withPod(mountedPod)
+ .withMainContainer(mountedMainContainer)
+ .build();
+ }
+
+ private Pod decoratePod(Pod pod) {
+ final List<KeyToPath> keyToPaths =
+ getLocalLogConfFiles().stream()
+ .map(
+ file ->
+ new KeyToPathBuilder()
+ .withKey(file.getName())
+ .withPath(file.getName())
+ .build())
+ .collect(Collectors.toList());
+ keyToPaths.add(
+ new KeyToPathBuilder()
+ .withKey(getFlinkConfFilename())
+ .withPath(getFlinkConfFilename())
+ .build());
+
+ final Volume flinkConfVolume =
+ new VolumeBuilder()
+ .withName(FLINK_CONF_VOLUME)
+ .withNewConfigMap()
+ .withName(getFlinkConfConfigMapName(kubernetesComponentConf.getClusterId()))
+ .withItems(keyToPaths)
+ .endConfigMap()
+ .build();
+
+ return new PodBuilder(pod)
+ .editSpec()
+ .addNewVolumeLike(flinkConfVolume)
+ .endVolume()
+ .endSpec()
+ .build();
+ }
+
+ @Override
+ public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOException {
+ final String clusterId = kubernetesComponentConf.getClusterId();
+
+ final Map<String, String> data = new HashMap<>();
+
+ final List<File> localLogFiles = getLocalLogConfFiles();
+ for (File file : localLogFiles) {
+ data.put(file.getName(), Files.toString(file, StandardCharsets.UTF_8));
+ }
+
+ final List<String> confData =
+ getClusterSideConfData(kubernetesComponentConf.getFlinkConfiguration());
+ data.put(getFlinkConfFilename(), getFlinkConfData(confData));
+
+ final ConfigMap flinkConfConfigMap =
+ new ConfigMapBuilder()
+ .withApiVersion(Constants.API_VERSION)
+ .withNewMetadata()
+ .withName(getFlinkConfConfigMapName(clusterId))
+ .withLabels(kubernetesComponentConf.getCommonLabels())
+ .endMetadata()
+ .addToData(data)
+ .build();
+
+ return Collections.singletonList(flinkConfConfigMap);
+ }
+
+ /** Get properties map for the cluster-side after removal of some keys. */
+ private List<String> getClusterSideConfData(Configuration flinkConfig) {
+ // For Flink versions that use the standard config we have to set the standardYaml flag in
+ // the Configuration object manually instead of simply cloning, otherwise it would simply
+ // inherit it from the base config (which would always be false currently).
+ Configuration clusterSideConfig = new Configuration(useStandardYamlConfig());
+ clusterSideConfig.addAll(flinkConfig);
+ // Remove some configuration options that should not be taken to cluster side.
+ clusterSideConfig.removeConfig(KubernetesConfigOptions.KUBE_CONFIG_FILE);
+ clusterSideConfig.removeConfig(DeploymentOptionsInternal.CONF_DIR);
+ clusterSideConfig.removeConfig(RestOptions.BIND_ADDRESS);
+ clusterSideConfig.removeConfig(JobManagerOptions.BIND_HOST);
+ clusterSideConfig.removeConfig(TaskManagerOptions.BIND_HOST);
+ clusterSideConfig.removeConfig(TaskManagerOptions.HOST);
+
+ return ConfigurationUtils.convertConfigToWritableLines(clusterSideConfig, false);
+ }
+
+ @VisibleForTesting
+ String getFlinkConfData(List<String> confData) throws IOException {
+ try (StringWriter sw = new StringWriter();
+ PrintWriter out = new PrintWriter(sw)) {
+ confData.forEach(out::println);
+
+ return sw.toString();
+ }
+ }
+
+ private List<File> getLocalLogConfFiles() {
+ final String confDir = kubernetesComponentConf.getConfigDirectory();
+ final File logbackFile = new File(confDir, CONFIG_FILE_LOGBACK_NAME);
+ final File log4jFile = new File(confDir, CONFIG_FILE_LOG4J_NAME);
+
+ List<File> localLogConfFiles = new ArrayList<>();
+ if (logbackFile.exists()) {
+ localLogConfFiles.add(logbackFile);
+ }
+ if (log4jFile.exists()) {
+ localLogConfFiles.add(log4jFile);
+ }
+
+ return localLogConfFiles;
+ }
+
+ @VisibleForTesting
+ public static String getFlinkConfConfigMapName(String clusterId) {
+ return CONFIG_MAP_PREFIX + clusterId;
+ }
+
+ /**
+ * We have to override the GlobalConfiguration#getFlinkConfFilename() logic to make sure we can
+ * separate operator level (Global) and Flink deployment specific config handling.
+ *
+ * @return conf file name
+ */
+ public String getFlinkConfFilename() {
+ return useStandardYamlConfig() ? "config.yaml" : "flink-conf.yaml";
+ }
+
+ /**
+ * Determine based on the Flink Version if we should use the new standard config.yaml vs the old
+ * flink-conf.yaml. While technically 1.19+ could use this we don't want to change the behaviour
+ * for already released Flink versions, so only switch to new yaml from Flink 2.0 onwards.
+ *
+ * @return True for Flink version 2.0 and above
+ */
+ boolean useStandardYamlConfig() {
+ return kubernetesComponentConf
+ .getFlinkConfiguration()
+ .get(FLINK_VERSION)
+ .isEqualOrNewer(FlinkVersion.v2_0);
+ }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/ConfigMapStore.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/ConfigMapStore.java
index b840a292e5..2abb14203e 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/ConfigMapStore.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/ConfigMapStore.java
@@ -39,7 +39,7 @@ public class ConfigMapStore {
private static final Logger LOG = LoggerFactory.getLogger(KubernetesAutoScalerStateStore.class);
- private static final String LABEL_COMPONENT_AUTOSCALER = "autoscaler";
+ public static final String LABEL_COMPONENT_AUTOSCALER = "autoscaler";
private final KubernetesClient kubernetesClient;
@@ -110,15 +110,19 @@ private ObjectMeta createCmObjectMeta(ResourceID uid) {
var objectMeta = new ObjectMeta();
objectMeta.setName("autoscaler-" + uid.getName());
uid.getNamespace().ifPresent(objectMeta::setNamespace);
- objectMeta.setLabels(
- Map.of(
- Constants.LABEL_COMPONENT_KEY,
- LABEL_COMPONENT_AUTOSCALER,
- Constants.LABEL_APP_KEY,
- uid.getName()));
+ objectMeta.setLabels(getAutoscalerCmLabels(uid));
return objectMeta;
}
+ @VisibleForTesting
+ public static Map<String, String> getAutoscalerCmLabels(ResourceID uid) {
+ return Map.of(
+ Constants.LABEL_COMPONENT_KEY,
+ LABEL_COMPONENT_AUTOSCALER,
+ Constants.LABEL_APP_KEY,
+ uid.getName());
+ }
+
private ConfigMap buildConfigMap(HasMetadata cr, ObjectMeta meta) {
var cm = new ConfigMap();
cm.setMetadata(meta);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
index 3c3ad889c5..ba16068e74 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
@@ -301,7 +301,7 @@ protected FlinkConfigBuilder applyJobOrSessionSpec() throws URISyntaxException {
effectiveConfig.set(
DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());
- if (jobSpec.getJarURI() != null) {
+ if (deploymentMode == KubernetesDeploymentMode.NATIVE && jobSpec.getJarURI() != null) {
effectiveConfig.set(
PipelineOptions.JARS,
Collections.singletonList(new URI(jobSpec.getJarURI()).toString()));
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index a6db17b989..71ec3dee5e 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -285,11 +285,13 @@ protected JobUpgrade getJobUpgrade(FlinkResourceContext ctx, Configuration d
boolean cancellable = allowLastStateCancel(ctx);
if (running) {
- return getUpgradeModeBasedOnStateAge(ctx, deployConfig, cancellable);
+ var mode = getUpgradeModeBasedOnStateAge(ctx, deployConfig, cancellable);
+ LOG.info("Job is running, using {} for last-state upgrade", mode);
+ return mode;
}
if (cancellable) {
- LOG.info("Using cancel to perform last-state upgrade");
+ LOG.info("Job is not running, using cancel to perform last-state upgrade");
return JobUpgrade.lastStateUsingCancel();
}
}
@@ -356,8 +358,11 @@ private boolean allowLastStateCancel(FlinkResourceContext ctx) {
}
var conf = ctx.getObserveConfig();
- return conf.get(KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_CANCEL_JOB)
- || !ctx.getFlinkService().isHaMetadataAvailable(conf);
+ if (!ctx.getFlinkService().isHaMetadataAvailable(conf)) {
+ LOG.info("HA metadata not available, cancel will be used instead of last-state");
+ return true;
+ }
+ return conf.get(KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_CANCEL_JOB);
}
protected void restoreJob(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index 68d7fe897f..7011152954 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -27,6 +27,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.core.execution.RestoreMode;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
@@ -53,7 +54,6 @@
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
-import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.FlinkJobTerminatedWithoutCancellationException;
@@ -810,6 +810,7 @@ protected void runJar(
JarRunHeaders headers = JarRunHeaders.getInstance();
JarRunMessageParameters parameters = headers.getUnresolvedMessageParameters();
parameters.jarIdPathParameter.resolve(jarId);
+ var flinkVersion = conf.get(FLINK_VERSION);
JarRunRequestBody runRequestBody =
new JarRunRequestBody(
job.getEntryClass(),
@@ -819,7 +820,12 @@ protected void runJar(
jobID,
job.getAllowNonRestoredState(),
savepoint,
- RestoreMode.DEFAULT,
+ flinkVersion.isEqualOrNewer(FlinkVersion.v1_20)
+ ? null
+ : RestoreMode.DEFAULT,
+ flinkVersion.isEqualOrNewer(FlinkVersion.v1_20)
+ ? RestoreMode.DEFAULT
+ : null,
conf.get(FLINK_VERSION).isEqualOrNewer(FlinkVersion.v1_17)
? conf.toMap()
: null);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
index c91341f213..7c483f7f42 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
@@ -99,8 +99,8 @@ protected FlinkStandaloneKubeClient createNamespacedKubeClient(Configuration con
final int poolSize =
configuration.get(KubernetesConfigOptions.KUBERNETES_CLIENT_IO_EXECUTOR_POOL_SIZE);
- ExecutorService executorService =
- Executors.newFixedThreadPool(
+ var executorService =
+ Executors.newScheduledThreadPool(
poolSize,
new ExecutorThreadFactory("flink-kubeclient-io-for-standalone-service"));
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index 3d81e50952..4c41aedf8f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -25,6 +25,7 @@
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
+import org.apache.flink.kubernetes.operator.autoscaler.state.ConfigMapStore;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
@@ -47,6 +48,8 @@
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
+import io.fabric8.kubernetes.client.dsl.Resource;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,8 +63,6 @@
import java.util.Optional;
import java.util.function.Predicate;
-import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
-
/** Flink Utility methods used by the operator. */
public class FlinkUtils {
@@ -214,13 +215,20 @@ public static void deleteZookeeperHAMetadata(Configuration conf) {
public static void deleteKubernetesHAMetadata(
String clusterId, String namespace, KubernetesClient kubernetesClient) {
- kubernetesClient
+ getFlinkKubernetesHaConfigmaps(clusterId, namespace, kubernetesClient).delete();
+ }
+
+ private static FilterWatchListDeletable<ConfigMap, ConfigMapList, Resource<ConfigMap>>
+ getFlinkKubernetesHaConfigmaps(
+ String clusterId, String namespace, KubernetesClient kubernetesClient) {
+ return kubernetesClient
.configMaps()
.inNamespace(namespace)
- .withLabels(
- KubernetesUtils.getConfigMapLabels(
- clusterId, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
- .delete();
+ .withNewFilter()
+ .withLabels(KubernetesUtils.getCommonLabels(clusterId))
+ .withoutLabel(
+ Constants.LABEL_COMPONENT_KEY, ConfigMapStore.LABEL_COMPONENT_AUTOSCALER)
+ .endFilter();
}
public static void deleteJobGraphInZookeeperHA(Configuration conf) throws Exception {
@@ -233,16 +241,8 @@ public static void deleteJobGraphInZookeeperHA(Configuration conf) throws Except
public static void deleteJobGraphInKubernetesHA(
String clusterId, String namespace, KubernetesClient kubernetesClient) {
- // The HA ConfigMap names have been changed from 1.15, so we use the labels to filter out
- // them and delete job graph key
- final Map haConfigMapLabels =
- KubernetesUtils.getConfigMapLabels(
- clusterId, Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY);
- final ConfigMapList configMaps =
- kubernetesClient
- .configMaps()
- .inNamespace(namespace)
- .withLabels(haConfigMapLabels)
+ var configMaps =
+ FlinkUtils.getFlinkKubernetesHaConfigmaps(clusterId, namespace, kubernetesClient)
.list();
boolean shouldUpdate = false;
@@ -303,18 +303,11 @@ private static boolean isKubernetesHaMetadataAvailable(
KubernetesClient kubernetesClient,
Predicate<ConfigMap> cmPredicate) {
- String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID);
- String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);
-
- var haConfigMapLabels =
- KubernetesUtils.getConfigMapLabels(
- clusterId, Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY);
-
var configMaps =
- kubernetesClient
- .configMaps()
- .inNamespace(namespace)
- .withLabels(haConfigMapLabels)
+ FlinkUtils.getFlinkKubernetesHaConfigmaps(
+ conf.get(KubernetesConfigOptions.CLUSTER_ID),
+ conf.get(KubernetesConfigOptions.NAMESPACE),
+ kubernetesClient)
.list()
.getItems();
diff --git a/flink-kubernetes-operator/src/main/resources/META-INF/NOTICE b/flink-kubernetes-operator/src/main/resources/META-INF/NOTICE
index 5fea17da57..fd1f7a07e2 100644
--- a/flink-kubernetes-operator/src/main/resources/META-INF/NOTICE
+++ b/flink-kubernetes-operator/src/main/resources/META-INF/NOTICE
@@ -6,21 +6,24 @@ The Apache Software Foundation (http://www.apache.org/).
This project bundles the following dependencies under the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
-- com.fasterxml.jackson.core:jackson-annotations:2.15.0
-- com.fasterxml.jackson.core:jackson-core:2.15.0
-- com.fasterxml.jackson.core:jackson-databind:2.15.0
-- com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.15.0
+- com.esotericsoftware.kryo:kryo:jar:2.24.0
+- com.esotericsoftware.minlog:minlog:jar:1.2
+- com.fasterxml.jackson.core:jackson-annotations:jar:2.15.0
+- com.fasterxml.jackson.core:jackson-core:jar:2.15.0
+- com.fasterxml.jackson.core:jackson-databind:jar:2.15.0
+- com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:jar:2.15.0
- com.fasterxml.jackson.datatype:jackson-datatype-jsr310:jar:2.15.0
-- com.google.code.findbugs:jsr305:1.3.9
-- com.squareup.okhttp3:logging-interceptor:4.12.0
-- com.squareup.okhttp3:okhttp:4.12.0
-- com.squareup.okio:okio-jvm:3.6.0
+- com.google.code.findbugs:jsr305:jar:1.3.9
+- com.squareup.okhttp3:logging-interceptor:jar:4.12.0
+- com.squareup.okhttp3:okhttp:jar:4.12.0
+- com.squareup.okio:okio-jvm:jar:3.6.0
- com.squareup.okio:okio:jar:3.6.0
-- com.squareup:javapoet:1.13.0
-- com.twitter:chill-java:0.7.6
-- commons-cli:commons-cli:1.5.0
-- commons-collections:commons-collections:3.2.2
-- commons-io:commons-io:2.17.0
+- com.squareup:javapoet:jar:1.13.0
+- com.twitter:chill-java:jar:0.7.6
+- com.zaxxer:HikariCP:jar:5.1.0
+- commons-cli:commons-cli:jar:1.5.0
+- commons-collections:commons-collections:jar:3.2.2
+- commons-io:commons-io:jar:2.17.0
- io.fabric8:kubernetes-client-api:jar:6.13.2
- io.fabric8:kubernetes-client:jar:6.13.2
- io.fabric8:kubernetes-httpclient-okhttp:jar:6.13.2
@@ -49,26 +52,28 @@ This project bundles the following dependencies under the Apache Software Licens
- io.fabric8:zjsonpatch:jar:0.3.0
- io.javaoperatorsdk:operator-framework-core:jar:4.9.4
- io.javaoperatorsdk:operator-framework:jar:4.9.4
-- org.apache.commons:commons-compress:1.21
-- org.apache.commons:commons-lang3:3.14.0
-- org.apache.commons:commons-math3:3.6.1
+- org.apache.commons:commons-compress:jar:1.26.0
+- org.apache.commons:commons-lang3:jar:3.16.0
+- org.apache.commons:commons-math3:jar:3.6.1
- org.apache.commons:commons-text:jar:1.10.0
-- org.apache.logging.log4j:log4j-1.2-api:2.23.1
-- org.apache.logging.log4j:log4j-api:2.23.1
-- org.apache.logging.log4j:log4j-core:2.23.1
-- org.apache.logging.log4j:log4j-slf4j-impl:2.23.1
-- org.javassist:javassist:3.24.0-GA
-- org.jetbrains.kotlin:kotlin-stdlib-common:1.9.10
-- org.jetbrains.kotlin:kotlin-stdlib-jdk7:1.8.21
-- org.jetbrains.kotlin:kotlin-stdlib-jdk8:1.8.21
-- org.jetbrains.kotlin:kotlin-stdlib:1.8.21
-- org.jetbrains:annotations:13.0
-- org.lz4:lz4-java:1.8.0
-- org.objenesis:objenesis:2.1
-- org.slf4j:slf4j-api:1.7.36
+- org.apache.logging.log4j:log4j-1.2-api:jar:2.23.1
+- org.apache.logging.log4j:log4j-api:jar:2.23.1
+- org.apache.logging.log4j:log4j-core:jar:2.23.1
+- org.apache.logging.log4j:log4j-slf4j-impl:jar:2.23.1
+- org.javassist:javassist:jar:3.24.0-GA
+- org.jetbrains.kotlin:kotlin-stdlib-common:jar:1.8.21
+- org.jetbrains.kotlin:kotlin-stdlib-jdk7:jar:1.8.21
+- org.jetbrains.kotlin:kotlin-stdlib-jdk8:jar:1.8.21
+- org.jetbrains.kotlin:kotlin-stdlib:jar:1.8.21
+- org.jetbrains:annotations:jar:13.0
+- org.lz4:lz4-java:jar:1.8.0
+- org.objenesis:objenesis:jar:2.1
+- org.quartz-scheduler:quartz:jar:2.4.0
+- org.slf4j:slf4j-api:jar:1.7.36
- org.snakeyaml:snakeyaml-engine:jar:2.6
-- org.xerial.snappy:snappy-java:1.1.7
-- org.yaml:snakeyaml:2.0
+- org.xerial.snappy:snappy-java:jar:1.1.10.4
+- org.yaml:snakeyaml:jar:2.0
+- tools.profiler:async-profiler:jar:2.9
This project bundles the following dependencies under the BSD License.
See bundled license files for details.
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecoratorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecoratorTest.java
new file mode 100644
index 0000000000..3669176f3c
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecoratorTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.decorators;
+
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.YamlParserUtils;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ConfigMap;
+import org.apache.flink.kubernetes.utils.Constants;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator.getFlinkConfConfigMapName;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** General tests for the {@link FlinkConfMountDecorator}. */
+class FlinkConfMountDecoratorTest {
+
+ protected static final String CLUSTER_ID = "my-flink-cluster1";
+ private static final String FLINK_CONF_DIR_IN_POD = "/opt/flink/flink-conf-";
+
+ private FlinkConfMountDecorator flinkConfMountDecorator;
+ private Configuration flinkConfig = new Configuration();
+
+ @BeforeEach
+ protected void onSetup() throws Exception {
+ this.flinkConfig.set(KubernetesConfigOptions.FLINK_CONF_DIR, FLINK_CONF_DIR_IN_POD);
+ this.flinkConfig.set(KubernetesConfigOptions.CLUSTER_ID, CLUSTER_ID);
+
+ var clusterSpecification =
+ new ClusterSpecification.ClusterSpecificationBuilder()
+ .setMasterMemoryMB(100)
+ .setTaskManagerMemoryMB(1024)
+ .setSlotsPerTaskManager(3)
+ .createClusterSpecification();
+
+ var kubernetesJobManagerParameters =
+ new KubernetesJobManagerParameters(flinkConfig, clusterSpecification);
+ this.flinkConfMountDecorator = new FlinkConfMountDecorator(kubernetesJobManagerParameters);
+ }
+
+ @ParameterizedTest
+ @MethodSource("testArgs")
+ void testConfigMap(FlinkVersion version, String expectedConfName, boolean standardYaml)
+ throws IOException {
+ flinkConfig.set(FlinkConfigBuilder.FLINK_VERSION, version);
+ var additionalResources = flinkConfMountDecorator.buildAccompanyingKubernetesResources();
+ assertThat(additionalResources).hasSize(1);
+
+ var resultConfigMap = (ConfigMap) additionalResources.get(0);
+
+ assertThat(resultConfigMap.getApiVersion()).isEqualTo(Constants.API_VERSION);
+
+ assertThat(resultConfigMap.getMetadata().getName())
+ .isEqualTo(getFlinkConfConfigMapName(CLUSTER_ID));
+
+ Map<String, String> resultDatas = resultConfigMap.getData();
+ var conf = YamlParserUtils.convertToObject(resultDatas.get(expectedConfName), Map.class);
+ if (standardYaml) {
+ assertTrue(conf.containsKey("kubernetes"));
+ assertFalse(conf.containsKey("kubernetes.cluster-id"));
+ } else {
+ assertFalse(conf.containsKey("kubernetes"));
+ assertTrue(conf.containsKey("kubernetes.cluster-id"));
+ }
+ }
+
+ private static Stream<Arguments> testArgs() {
+ return Stream.of(
+ Arguments.arguments(FlinkVersion.v1_19, "flink-conf.yaml", false),
+ Arguments.arguments(FlinkVersion.v1_20, "flink-conf.yaml", false),
+ Arguments.arguments(FlinkVersion.v2_0, "config.yaml", true));
+ }
+
+ private Map<String, String> getCommonLabels() {
+ Map<String, String> labels = new HashMap<>();
+ labels.put(Constants.LABEL_TYPE_KEY, Constants.LABEL_TYPE_NATIVE_TYPE);
+ labels.put(Constants.LABEL_APP_KEY, CLUSTER_ID);
+ return labels;
+ }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
index 34fdc11bbe..68ee3e3d95 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
@@ -27,12 +27,12 @@
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.autoscaler.state.ConfigMapStore;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.EmptyDirVolumeSource;
@@ -49,6 +49,7 @@
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
import org.junit.jupiter.api.Test;
import java.net.HttpURLConnection;
@@ -403,21 +404,39 @@ public void testMergePodUsingArrayName() {
assertEquals(List.of(v1merged, volume2, volume3), mergedPod.getSpec().getVolumes());
}
+ @Test
+ public void testKubernetesHaMetaDeletion() {
+ var clusterId = "cluster-id";
+ var ns = kubernetesClient.getNamespace();
+ createHAConfigMapWithData("test", ns, clusterId, Map.of("k", "v"));
+
+ // Create autoscaler configmap to test that it's not deleted
+ var autoscalerData =
+ new ConfigMapBuilder()
+ .withNewMetadata()
+ .withName("autoscaler-cm")
+ .withNamespace(ns)
+ .withLabels(ConfigMapStore.getAutoscalerCmLabels(new ResourceID(clusterId)))
+ .endMetadata()
+ .withData(Map.of("a", "s"))
+ .build();
+ kubernetesClient.resource(autoscalerData).create();
+ FlinkUtils.deleteKubernetesHAMetadata(clusterId, ns, kubernetesClient);
+ assertNotNull(kubernetesClient.resource(autoscalerData).get());
+ }
+
private void createHAConfigMapWithData(
String configMapName, String namespace, String clusterId, Map data) {
- final ConfigMap kubernetesConfigMap =
+ var cm =
new ConfigMapBuilder()
.withNewMetadata()
.withName(configMapName)
.withNamespace(namespace)
- .withLabels(
- KubernetesUtils.getConfigMapLabels(
- clusterId,
- Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
+ .withLabels(KubernetesUtils.getCommonLabels(clusterId))
.endMetadata()
.withData(data)
.build();
- kubernetesClient.configMaps().resource(kubernetesConfigMap).createOrReplace();
+ kubernetesClient.configMaps().resource(cm).createOrReplace();
}
}
diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClient.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClient.java
index 677f4c35c2..3976d4f562 100644
--- a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClient.java
+++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClient.java
@@ -27,7 +27,7 @@
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.DefaultKubernetesClient;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.NamespacedKubernetesClient;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -41,7 +41,7 @@ public class Fabric8FlinkStandaloneKubeClient extends Fabric8FlinkKubeClient
public Fabric8FlinkStandaloneKubeClient(
Configuration flinkConfig,
NamespacedKubernetesClient client,
- ExecutorService executorService) {
+ ScheduledExecutorService executorService) {
super(flinkConfig, client, executorService);
internalClient = checkNotNull(client);
}
@@ -69,7 +69,7 @@ public void stopAndCleanupCluster(String clusterId) {
}
public static Fabric8FlinkStandaloneKubeClient create(
- Configuration conf, ExecutorService executorService) {
+ Configuration conf, ScheduledExecutorService executorService) {
var client =
new DefaultKubernetesClient()
.inNamespace(conf.get(KubernetesConfigOptions.NAMESPACE));
diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptor.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptor.java
index 31a5f4cf4f..28a8adb9e9 100644
--- a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptor.java
+++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptor.java
@@ -32,6 +32,7 @@
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.KubernetesClusterDescriptor;
+import org.apache.flink.kubernetes.artifact.DefaultKubernetesArtifactUploader;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.Endpoint;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory;
@@ -74,7 +75,10 @@ public class KubernetesStandaloneClusterDescriptor extends KubernetesClusterDesc
public KubernetesStandaloneClusterDescriptor(
Configuration flinkConfig, FlinkStandaloneKubeClient client) {
- super(flinkConfig, FlinkKubeClientFactory.getInstance());
+ super(
+ flinkConfig,
+ FlinkKubeClientFactory.getInstance(),
+ new DefaultKubernetesArtifactUploader());
this.flinkConfig = checkNotNull(flinkConfig);
this.standaloneKubeClient = checkNotNull(client);
this.clusterId =
diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClientTest.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClientTest.java
index 2ac8eb3719..fed398ab0a 100644
--- a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClientTest.java
+++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClientTest.java
@@ -31,7 +31,6 @@
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.ConfigBuilder;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.DefaultKubernetesClient;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.NamespacedKubernetesClient;
-import org.apache.flink.util.concurrent.Executors;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
@@ -39,6 +38,7 @@
import org.junit.jupiter.api.Test;
import java.util.List;
+import java.util.concurrent.Executors;
import static org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils.TEST_NAMESPACE;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -61,7 +61,7 @@ public final void setup() {
flinkKubeClient =
new Fabric8FlinkStandaloneKubeClient(
- flinkConfig, getClient(), Executors.newDirectExecutorService());
+ flinkConfig, getClient(), Executors.newSingleThreadScheduledExecutor());
clusterSpecification = TestUtils.createClusterSpecification();
taskManagerParameters =
diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java
index b53c51a8b2..5936d301e4 100644
--- a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java
+++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java
@@ -35,7 +35,6 @@
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.DefaultKubernetesClient;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import org.apache.flink.kubernetes.utils.Constants;
-import org.apache.flink.util.concurrent.Executors;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
@@ -43,6 +42,7 @@
import org.junit.jupiter.api.Test;
import java.util.List;
+import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import static org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils.JM_ENV_VALUE;
@@ -69,7 +69,7 @@ public void setup() {
flinkConfig = TestUtils.createTestFlinkConfig();
flinkKubeClient =
new Fabric8FlinkStandaloneKubeClient(
- flinkConfig, getClient(), Executors.newDirectExecutorService());
+ flinkConfig, getClient(), Executors.newSingleThreadScheduledExecutor());
clusterDescriptor = new KubernetesStandaloneClusterDescriptor(flinkConfig, flinkKubeClient);
}
diff --git a/helm/flink-kubernetes-operator/conf/log4j-operator.properties b/helm/flink-kubernetes-operator/conf/log4j-operator.properties
index e6d0318f07..7c4285fbff 100644
--- a/helm/flink-kubernetes-operator/conf/log4j-operator.properties
+++ b/helm/flink-kubernetes-operator/conf/log4j-operator.properties
@@ -27,11 +27,11 @@ appender.console.layout.pattern = %style{%d}{yellow} %style{%-30c{1.}}{cyan} %hi
# Do not log config loading
logger.conf.name = org.apache.flink.configuration.GlobalConfiguration
-logger.conf.level = WARN
+logger.conf.level = ERROR
# Avoid logging fallback key INFO messages
logger.conf.name = org.apache.flink.configuration.Configuration
-logger.conf.level = WARN
+logger.conf.level = ERROR
# The monitor interval in seconds to enable log4j automatic reconfiguration
# monitorInterval = 30
\ No newline at end of file
diff --git a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index 2f851e5ce2..f498ae2431 100644
--- a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++ b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -45,6 +45,7 @@ spec:
- v1_18
- v1_19
- v1_20
+ - v2_0
type: string
image:
type: string
diff --git a/pom.xml b/pom.xml
index 53dcc069b5..0ac8393a85 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,7 +83,7 @@ under the License.
1.18.30
3.12.0
2.17.0
- 1.19.1
+ 1.20.0
1.7.36
2.23.1