Skip to content

Commit 3dfa820

Browse files
committed
[FLINK-37236] Update e2e jar handling logic and improve debugging
1 parent 721e06c commit 3dfa820

File tree

7 files changed

+34
-14
lines changed

7 files changed

+34
-14
lines changed

.github/workflows/e2e.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,19 @@ jobs:
8484
if [[ "${{ inputs.append-java-version }}" == "true" ]]; then
8585
FLINK_IMAGE=${FLINK_IMAGE}-java${{ inputs.java-version }}
8686
fi
87+
88+
EXAMPLES_JAR="https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.4/flink-examples-streaming_2.12-1.14.4.jar"
89+
if [[ ${{ inputs.flink-version }} == v2* ]]; then
90+
EXAMPLES_JAR="https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming/2.0-preview1/flink-examples-streaming-2.0-preview1.jar"
91+
fi
92+
ESCAPED_EXAMPLES_JAR=$(printf '%s\n' "$EXAMPLES_JAR" | sed -e 's/[\/&]/\\&/g')
93+
8794
echo FLINK_IMAGE=${FLINK_IMAGE}
95+
echo EXAMPLES_JAR=${EXAMPLES_JAR}
8896
sed -i "s/image: flink:.*/image: ${FLINK_IMAGE}/" e2e-tests/data/*.yaml
8997
sed -i "s/flinkVersion: .*/flinkVersion: ${{ inputs.flink-version }}/" e2e-tests/data/*.yaml
9098
sed -i "s/mode: .*/mode: ${{ inputs.mode }}/" e2e-tests/data/*.yaml
99+
sed -i "s/STREAMING_EXAMPLES_JAR_URL/${ESCAPED_EXAMPLES_JAR}/" e2e-tests/data/*.yaml
91100
git diff HEAD
92101
echo "Running e2e-tests/$test"
93102
bash e2e-tests/${{ inputs.test }} || exit 1

e2e-tests/data/flinkdep-cr.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ spec:
3131
nginx.ingress.kubernetes.io/rewrite-target: "/$2"
3232
flinkConfiguration:
3333
taskmanager.numberOfTaskSlots: "2"
34-
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
34+
high-availability.type: kubernetes
3535
high-availability.storageDir: file:///opt/flink/volume/flink-ha
3636
state.checkpoints.dir: file:///opt/flink/volume/flink-cp
3737
state.savepoints.dir: file:///opt/flink/volume/flink-sp
@@ -44,7 +44,7 @@ spec:
4444
image: busybox:1.35.0
4545
imagePullPolicy: IfNotPresent
4646
# Use wget or other tools to get user jars from remote storage
47-
command: [ 'wget', 'https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.4/flink-examples-streaming_2.12-1.14.4.jar', '-O', '/flink-artifact/myjob.jar' ]
47+
command: [ 'wget', 'STREAMING_EXAMPLES_JAR_URL', '-O', '/flink-artifact/myjob.jar' ]
4848
volumeMounts:
4949
- mountPath: /flink-artifact
5050
name: flink-artifact

e2e-tests/data/multi-sessionjob.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ spec:
3131
nginx.ingress.kubernetes.io/rewrite-target: "/$2"
3232
flinkConfiguration:
3333
taskmanager.numberOfTaskSlots: "2"
34-
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
34+
high-availability.type: kubernetes
3535
high-availability.storageDir: file:///opt/flink/volume/flink-ha
3636
state.checkpoints.dir: file:///opt/flink/volume/flink-cp
3737
state.savepoints.dir: file:///opt/flink/volume/flink-sp
@@ -79,7 +79,7 @@ spec:
7979
nginx.ingress.kubernetes.io/rewrite-target: "/$2"
8080
flinkConfiguration:
8181
taskmanager.numberOfTaskSlots: "2"
82-
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
82+
high-availability.type: kubernetes
8383
high-availability.storageDir: file:///opt/flink/volume/flink-ha
8484
state.checkpoints.dir: file:///opt/flink/volume/flink-cp
8585
state.savepoints.dir: file:///opt/flink/volume/flink-sp
@@ -120,7 +120,7 @@ metadata:
120120
spec:
121121
deploymentName: session-cluster-1
122122
job:
123-
jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.4/flink-examples-streaming_2.12-1.14.4.jar
123+
jarURI: STREAMING_EXAMPLES_JAR_URL
124124
parallelism: 2
125125
upgradeMode: savepoint
126126
entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample
@@ -134,7 +134,7 @@ metadata:
134134
spec:
135135
deploymentName: session-cluster-1
136136
job:
137-
jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.4/flink-examples-streaming_2.12-1.14.4.jar
137+
jarURI: STREAMING_EXAMPLES_JAR_URL
138138
parallelism: 2
139139
upgradeMode: savepoint
140140
entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample

e2e-tests/data/sessionjob-cr.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ spec:
3131
nginx.ingress.kubernetes.io/rewrite-target: "/$2"
3232
flinkConfiguration:
3333
taskmanager.numberOfTaskSlots: "2"
34-
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
34+
high-availability.type: kubernetes
3535
high-availability.storageDir: file:///opt/flink/volume/flink-ha
3636
state.checkpoints.dir: file:///opt/flink/volume/flink-cp
3737
state.savepoints.dir: file:///opt/flink/volume/flink-sp
@@ -73,7 +73,7 @@ metadata:
7373
spec:
7474
deploymentName: session-cluster-1
7575
job:
76-
jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.4/flink-examples-streaming_2.12-1.14.4.jar
76+
jarURI: STREAMING_EXAMPLES_JAR_URL
7777
parallelism: 2
7878
upgradeMode: savepoint
7979
entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample

e2e-tests/utils.sh

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,8 +231,14 @@ function delete_operator_pod_with_leadership() {
231231
function debug_and_show_logs {
232232
echo "Debugging failed e2e test:"
233233
echo "Currently existing Kubernetes resources"
234+
kubectl get flinkdeployments
235+
kubectl get flinksessionjobs
234236
kubectl get all
237+
kubectl get configmaps
238+
kubectl describe flinkdeployments
239+
kubectl describe flinksessionjobs
235240
kubectl describe all
241+
kubectl describe configmaps
236242

237243
operator_pod_namespace=$(get_operator_pod_namespace)
238244
operator_pod_names=$(get_operator_pod_name)

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -285,11 +285,13 @@ protected JobUpgrade getJobUpgrade(FlinkResourceContext<CR> ctx, Configuration d
285285

286286
boolean cancellable = allowLastStateCancel(ctx);
287287
if (running) {
288-
return getUpgradeModeBasedOnStateAge(ctx, deployConfig, cancellable);
288+
var mode = getUpgradeModeBasedOnStateAge(ctx, deployConfig, cancellable);
289+
LOG.info("Job is running, using {} for last-state upgrade", mode);
290+
return mode;
289291
}
290292

291293
if (cancellable) {
292-
LOG.info("Using cancel to perform last-state upgrade");
294+
LOG.info("Job is not running, using cancel to perform last-state upgrade");
293295
return JobUpgrade.lastStateUsingCancel();
294296
}
295297
}
@@ -356,8 +358,11 @@ private boolean allowLastStateCancel(FlinkResourceContext<CR> ctx) {
356358
}
357359

358360
var conf = ctx.getObserveConfig();
359-
return conf.get(KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_CANCEL_JOB)
360-
|| !ctx.getFlinkService().isHaMetadataAvailable(conf);
361+
if (!ctx.getFlinkService().isHaMetadataAvailable(conf)) {
362+
LOG.info("HA metadata not available, cancel will be used instead of last-state");
363+
return true;
364+
}
365+
return conf.get(KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_CANCEL_JOB);
361366
}
362367

363368
protected void restoreJob(

helm/flink-kubernetes-operator/conf/log4j-operator.properties

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,11 @@ appender.console.layout.pattern = %style{%d}{yellow} %style{%-30c{1.}}{cyan} %hi
2727

2828
# Do not log config loading
2929
logger.conf.name = org.apache.flink.configuration.GlobalConfiguration
30-
logger.conf.level = WARN
30+
logger.conf.level = ERROR
3131

3232
# Avoid logging fallback key INFO messages
3333
logger.conf.name = org.apache.flink.configuration.Configuration
34-
logger.conf.level = WARN
34+
logger.conf.level = ERROR
3535

3636
# The monitor interval in seconds to enable log4j automatic reconfiguration
3737
# monitorInterval = 30

0 commit comments

Comments
 (0)