|
25 | 25 | import org.apache.flink.kubernetes.operator.TestingFlinkService; |
26 | 26 | import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment; |
27 | 27 | import org.apache.flink.kubernetes.operator.api.FlinkDeployment; |
| 28 | +import org.apache.flink.kubernetes.operator.api.spec.ConfigObjectNode; |
28 | 29 | import org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentSpec; |
29 | 30 | import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; |
30 | 31 | import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentTemplateSpec; |
@@ -270,12 +271,8 @@ public void verifyFailureDuringTransition(FlinkVersion flinkVersion) throws Exce |
270 | 271 | rs.reconciledStatus.getBlueGreenState()); |
271 | 272 | assertEquals( |
272 | 273 | customValue, |
273 | | - rs.deployment |
274 | | - .getSpec() |
275 | | - .getTemplate() |
276 | | - .getSpec() |
277 | | - .getFlinkConfiguration() |
278 | | - .get(CUSTOM_CONFIG_FIELD)); |
| 274 | + getFlinkConfigurationValue( |
| 275 | + rs.deployment.getSpec().getTemplate().getSpec(), CUSTOM_CONFIG_FIELD)); |
279 | 276 |
|
280 | 277 | // Simulating the Blue deployment doesn't start correctly (status will remain the same) |
281 | 278 | Long reschedDelayMs = 0L; |
@@ -328,6 +325,11 @@ public void verifyFailureDuringTransition(FlinkVersion flinkVersion) throws Exce |
328 | 325 | testTransitionToGreen(rs, customValue, null); |
329 | 326 | } |
330 | 327 |
|
| 328 | + private static String getFlinkConfigurationValue( |
| 329 | + FlinkDeploymentSpec flinkDeploymentSpec, String propertyName) { |
| 330 | + return flinkDeploymentSpec.getFlinkConfiguration().get(propertyName).asText(); |
| 331 | + } |
| 332 | + |
331 | 333 | @ParameterizedTest |
332 | 334 | @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions") |
333 | 335 | public void verifySpecChangeDuringTransition(FlinkVersion flinkVersion) throws Exception { |
@@ -702,11 +704,9 @@ public void verifySpecificBehavior( |
702 | 704 | assertEquals(1, deployments.size()); |
703 | 705 | assertEquals( |
704 | 706 | "100 SECONDS", |
705 | | - deployments |
706 | | - .get(0) |
707 | | - .getSpec() |
708 | | - .getFlinkConfiguration() |
709 | | - .get("kubernetes.operator.reconcile.interval")); |
| 707 | + getFlinkConfigurationValue( |
| 708 | + deployments.get(0).getSpec(), |
| 709 | + "kubernetes.operator.reconcile.interval")); |
710 | 710 | } |
711 | 711 | } |
712 | 712 |
|
@@ -763,11 +763,9 @@ public void verifySpecificBehavior( |
763 | 763 | // Child spec change should be applied to FlinkDeployment |
764 | 764 | assertEquals( |
765 | 765 | "100 SECONDS", |
766 | | - deployments |
767 | | - .get(0) |
768 | | - .getSpec() |
769 | | - .getFlinkConfiguration() |
770 | | - .get("kubernetes.operator.reconcile.interval")); |
| 766 | + getFlinkConfigurationValue( |
| 767 | + deployments.get(0).getSpec(), |
| 768 | + "kubernetes.operator.reconcile.interval")); |
771 | 769 |
|
772 | 770 | // Top-level changes should be preserved in reconciled spec |
773 | 771 | assertNotNull(result.rs.reconciledStatus.getLastReconciledSpec()); |
@@ -1043,12 +1041,8 @@ private void testTransitionToGreen( |
1043 | 1041 | assertEquals(0, instantStrToMillis(rs.reconciledStatus.getDeploymentReadyTimestamp())); |
1044 | 1042 | assertEquals( |
1045 | 1043 | customValue, |
1046 | | - rs.deployment |
1047 | | - .getSpec() |
1048 | | - .getTemplate() |
1049 | | - .getSpec() |
1050 | | - .getFlinkConfiguration() |
1051 | | - .get(CUSTOM_CONFIG_FIELD)); |
| 1044 | + getFlinkConfigurationValue( |
| 1045 | + rs.deployment.getSpec().getTemplate().getSpec(), CUSTOM_CONFIG_FIELD)); |
1052 | 1046 |
|
1053 | 1047 | // Initiate and mark the Green deployment ready |
1054 | 1048 | simulateSuccessfulJobStart(getFlinkDeployments().get(1)); |
@@ -1213,12 +1207,14 @@ private static FlinkBlueGreenDeploymentSpec getTestFlinkDeploymentSpec(FlinkVers |
1213 | 1207 | .imagePullPolicy(IMAGE_POLICY) |
1214 | 1208 | .serviceAccount(SERVICE_ACCOUNT) |
1215 | 1209 | .flinkVersion(version) |
1216 | | - .flinkConfiguration(conf) |
| 1210 | + .flinkConfiguration(new ConfigObjectNode()) |
1217 | 1211 | .jobManager(new JobManagerSpec(new Resource(1.0, "2048m", "2G"), 1, null)) |
1218 | 1212 | .taskManager( |
1219 | 1213 | new TaskManagerSpec(new Resource(1.0, "2048m", "2G"), null, null)) |
1220 | 1214 | .build(); |
1221 | 1215 |
|
| 1216 | + flinkDeploymentSpec.setFlinkConfiguration(conf); |
| 1217 | + |
1222 | 1218 | Map<String, String> configuration = new HashMap<>(); |
1223 | 1219 | configuration.put(ABORT_GRACE_PERIOD.key(), "1"); |
1224 | 1220 | configuration.put(RECONCILIATION_RESCHEDULING_INTERVAL.key(), "500"); |
|
0 commit comments