Skip to content

Commit ebd7203

Browse files
authored
[FLINK-35052] Reject unsupported versions in the webhook validator
1 parent 6eb1c03 commit ebd7203

File tree

2 files changed

+14
-1
lines changed

2 files changed

+14
-1
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
3131
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
3232
import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
33+
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
3334
import org.apache.flink.kubernetes.operator.api.spec.IngressSpec;
3435
import org.apache.flink.kubernetes.operator.api.spec.JobManagerSpec;
3536
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
@@ -133,10 +134,16 @@ private Optional<String> validateDeploymentName(String name) {
133134

134135
private Optional<String> validateFlinkVersion(FlinkDeployment deployment) {
135136
var spec = deployment.getSpec();
136-
if (spec.getFlinkVersion() == null) {
137+
var version = spec.getFlinkVersion();
138+
if (version == null) {
137139
return Optional.of("Flink Version must be defined.");
138140
}
139141

142+
if (!FlinkVersion.isSupported(version)) {
143+
return Optional.of(
144+
"Flink version " + version + " is not supported by this operator version");
145+
}
146+
140147
var lastReconciledSpec =
141148
deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
142149

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,12 @@ public void testValidationWithoutDefaultConfig() {
483483

484484
testError(dep -> dep.getSpec().setFlinkVersion(null), "Flink Version must be defined.");
485485

486+
testError(
487+
dep -> dep.getSpec().setFlinkVersion(FlinkVersion.v1_14),
488+
"Flink version "
489+
+ FlinkVersion.v1_14
490+
+ " is not supported by this operator version");
491+
486492
testSuccess(dep -> dep.getSpec().setFlinkVersion(FlinkVersion.v1_15));
487493

488494
testError(

0 commit comments

Comments
 (0)