Skip to content

Commit 18da43c

Browse files
committed
Defensive code vs. non-BASIC TransitionMode (only for FLIP-503)
1 parent 24f65e8 commit 18da43c

File tree

2 files changed

+37
-1
lines changed

2 files changed

+37
-1
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
2323
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
2424
import org.apache.flink.kubernetes.operator.api.bluegreen.DeploymentType;
25+
import org.apache.flink.kubernetes.operator.api.bluegreen.TransitionMode;
2526
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
2627
import org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentSpec;
2728
import org.apache.flink.kubernetes.operator.api.spec.JobState;
@@ -54,6 +55,8 @@
5455
import org.slf4j.Logger;
5556
import org.slf4j.LoggerFactory;
5657

58+
import javax.naming.OperationNotSupportedException;
59+
5760
import java.time.Instant;
5861
import java.util.List;
5962
import java.util.Map;
@@ -98,6 +101,12 @@ public UpdateControl<FlinkBlueGreenDeployment> reconcile(
98101
FlinkBlueGreenDeployment bgDeployment, Context<FlinkBlueGreenDeployment> josdkContext)
99102
throws Exception {
100103

104+
// TODO: this verification is only for FLIP-503, remove later.
105+
if (bgDeployment.getSpec().getTemplate().getTransitionMode() != TransitionMode.BASIC) {
106+
throw new OperationNotSupportedException(
107+
"Only TransitionMode == BASIC is currently supported");
108+
}
109+
101110
FlinkBlueGreenDeploymentStatus deploymentStatus = bgDeployment.getStatus();
102111

103112
if (deploymentStatus == null) {
@@ -568,6 +577,9 @@ private boolean hasSpecChanged(
568577

569578
String lastReconciledSpec = deploymentStatus.getLastReconciledSpec();
570579
String newSpecSerialized = SpecUtils.serializeObject(newSpec, "spec");
580+
581+
// TODO: in FLIP-504 check here the TransitionMode has not been changed
582+
571583
return !lastReconciledSpec.equals(newSpecSerialized);
572584
}
573585

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.flink.kubernetes.operator.TestingFlinkService;
2525
import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
2626
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
27+
import org.apache.flink.kubernetes.operator.api.bluegreen.TransitionMode;
2728
import org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentSpec;
2829
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
2930
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentTemplateSpec;
@@ -53,6 +54,8 @@
5354
import org.junit.jupiter.params.ParameterizedTest;
5455
import org.junit.jupiter.params.provider.MethodSource;
5556

57+
import javax.naming.OperationNotSupportedException;
58+
5659
import java.time.Instant;
5760
import java.util.HashMap;
5861
import java.util.List;
@@ -66,6 +69,7 @@
6669
import static org.junit.jupiter.api.Assertions.assertFalse;
6770
import static org.junit.jupiter.api.Assertions.assertNotNull;
6871
import static org.junit.jupiter.api.Assertions.assertNull;
72+
import static org.junit.jupiter.api.Assertions.assertThrows;
6973
import static org.junit.jupiter.api.Assertions.assertTrue;
7074

7175
/** {@link FlinkBlueGreenDeploymentController} tests. */
@@ -280,7 +284,6 @@ public void verifyFailureBeforeFirstDeployment(FlinkVersion flinkVersion) throws
280284
buildSessionCluster(TEST_DEPLOYMENT_NAME, TEST_NAMESPACE, flinkVersion);
281285

282286
// Initiate the Blue deployment
283-
var originalSpec = blueGreenDeployment.getSpec();
284287
var rs = initialPhaseBasicDeployment(blueGreenDeployment, false);
285288

286289
// Simulating the job did not start correctly before the AbortGracePeriodMs
@@ -319,6 +322,20 @@ public void verifyFailureBeforeFirstDeployment(FlinkVersion flinkVersion) throws
319322
rs.reconciledStatus.getBlueGreenState());
320323
}
321324

325+
// TODO: this test is only for FLIP-503, remove later.
326+
@ParameterizedTest
327+
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
328+
public void verifyOnlyBasicTransitionIsAllowed(FlinkVersion flinkVersion) throws Exception {
329+
var blueGreenDeployment =
330+
buildSessionCluster(TEST_DEPLOYMENT_NAME, TEST_NAMESPACE, flinkVersion);
331+
332+
blueGreenDeployment.getSpec().getTemplate().setTransitionMode(null);
333+
assertThrows(OperationNotSupportedException.class, () -> reconcile(blueGreenDeployment));
334+
335+
blueGreenDeployment.getSpec().getTemplate().setTransitionMode(TransitionMode.ADVANCED);
336+
assertThrows(OperationNotSupportedException.class, () -> reconcile(blueGreenDeployment));
337+
}
338+
322339
private TestingFlinkBlueGreenDeploymentController.BlueGreenReconciliationResult
323340
executeBasicDeployment(
324341
FlinkVersion flinkVersion,
@@ -372,6 +389,12 @@ public void verifyFailureBeforeFirstDeployment(FlinkVersion flinkVersion) throws
372389
initialPhaseBasicDeployment(
373390
FlinkBlueGreenDeployment blueGreenDeployment, boolean execAssertions)
374391
throws Exception {
392+
if (blueGreenDeployment.getSpec().getTemplate().getTransitionMode()
393+
!= TransitionMode.BASIC) {
394+
throw new OperationNotSupportedException(
395+
"Only TransitionMode == BASIC is currently supported");
396+
}
397+
375398
Long minReconciliationTs = System.currentTimeMillis() - 1;
376399

377400
// 1a. Initializing deploymentStatus with this call
@@ -594,6 +617,7 @@ private static FlinkBlueGreenDeployment buildSessionCluster(
594617
.state(JobState.RUNNING)
595618
.build());
596619

620+
bgDeploymentSpec.getTemplate().setTransitionMode(TransitionMode.BASIC);
597621
deployment.setSpec(bgDeploymentSpec);
598622
return deployment;
599623
}

0 commit comments

Comments
 (0)