2121import org .apache .flink .configuration .CheckpointingOptions ;
2222import org .apache .flink .configuration .Configuration ;
2323import org .apache .flink .configuration .TaskManagerOptions ;
24+ import org .apache .flink .kubernetes .operator .TestUtils ;
2425import org .apache .flink .kubernetes .operator .TestingFlinkService ;
2526import org .apache .flink .kubernetes .operator .api .FlinkBlueGreenDeployment ;
2627import org .apache .flink .kubernetes .operator .api .FlinkDeployment ;
5152import org .jetbrains .annotations .NotNull ;
5253import org .junit .jupiter .api .BeforeEach ;
5354import org .junit .jupiter .params .ParameterizedTest ;
55+ import org .junit .jupiter .params .provider .Arguments ;
5456import org .junit .jupiter .params .provider .MethodSource ;
5557
5658import java .time .Instant ;
5759import java .util .HashMap ;
5860import java .util .List ;
5961import java .util .Map ;
6062import java .util .UUID ;
63+ import java .util .stream .Stream ;
6164
6265import static org .apache .flink .kubernetes .operator .api .spec .FlinkBlueGreenDeploymentConfigOptions .ABORT_GRACE_PERIOD ;
6366import static org .apache .flink .kubernetes .operator .api .spec .FlinkBlueGreenDeploymentConfigOptions .DEPLOYMENT_DELETION_DELAY ;
@@ -328,54 +331,193 @@ public void verifyFailureBeforeFirstDeployment(FlinkVersion flinkVersion) throws
328331 }
329332
330333 @ ParameterizedTest
331- @ MethodSource ("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions" )
332- public void verifyPatchChildScenario (FlinkVersion flinkVersion ) throws Exception {
334+ @ MethodSource ("patchScenarioProvider" )
335+ public void verifyPatchScenario (FlinkVersion flinkVersion , PatchTestCase testCase )
336+ throws Exception {
337+ var rs = setupActiveBlueDeployment (flinkVersion );
338+
339+ testCase .applyChanges (rs .deployment , kubernetesClient );
340+
341+ var result = reconcileAndVerifyPatchBehavior (rs );
342+
343+ testCase .verifySpecificBehavior (result , getFlinkDeployments ());
344+
345+ assertFinalized (
346+ result .minReconciliationTs , result .rs , FlinkBlueGreenDeploymentState .ACTIVE_BLUE );
347+ }
348+
349+ static Stream <Arguments > patchScenarioProvider () {
350+ // Extract FlinkVersions from TestUtils and combine with PatchTypes
351+ return TestUtils .flinkVersions ()
352+ .flatMap (
353+ args -> {
354+ FlinkVersion version = (FlinkVersion ) args .get ()[0 ];
355+ return Stream .of (
356+ Arguments .of (version , new PatchChildTestCase ()),
357+ Arguments .of (version , new PatchTopLevelTestCase ()),
358+ Arguments .of (version , new PatchBothTestCase ()));
359+ });
360+ }
361+
362+ // ==================== Test Case Interfaces and Implementations ====================
363+
364+ interface PatchTestCase {
365+ void applyChanges (FlinkBlueGreenDeployment deployment , KubernetesClient client );
366+
367+ void verifySpecificBehavior (ReconcileResult result , List <FlinkDeployment > deployments );
368+ }
369+
370+ static class PatchChildTestCase implements PatchTestCase {
371+ @ Override
372+ public void applyChanges (FlinkBlueGreenDeployment deployment , KubernetesClient client ) {
373+ FlinkDeploymentSpec spec = deployment .getSpec ().getTemplate ().getSpec ();
374+ spec .getJob ().setSavepointRedeployNonce (12345L );
375+ deployment .getSpec ().getTemplate ().setSpec (spec );
376+ client .resource (deployment ).createOrReplace ();
377+ }
378+
379+ @ Override
380+ public void verifySpecificBehavior (
381+ ReconcileResult result , List <FlinkDeployment > deployments ) {
382+ assertEquals (1 , deployments .size ());
383+ assertEquals (
384+ 12345L ,
385+ (long ) deployments .get (0 ).getSpec ().getJob ().getSavepointRedeployNonce ());
386+ }
387+ }
388+
389+ static class PatchTopLevelTestCase implements PatchTestCase {
390+ @ Override
391+ public void applyChanges (FlinkBlueGreenDeployment deployment , KubernetesClient client ) {
392+ FlinkDeploymentTemplateSpec template = deployment .getSpec ().getTemplate ();
393+ Map <String , String > configuration = new HashMap <>(template .getConfiguration ());
394+ configuration .put ("custom.top.level" , "custom-top-level-value" );
395+ template .setConfiguration (configuration );
396+ deployment .getSpec ().setTemplate (template );
397+ client .resource (deployment ).createOrReplace ();
398+ }
399+
400+ @ Override
401+ public void verifySpecificBehavior (
402+ ReconcileResult result , List <FlinkDeployment > deployments ) {
403+ assertEquals (1 , deployments .size ());
404+ var existingDeployment = result .existingFlinkDeployment ;
405+ var currentDeployment = deployments .get (0 );
406+
407+ // FlinkDeployment should remain unchanged for top-level only changes
408+ assertEquals (existingDeployment , currentDeployment );
409+ }
410+ }
411+
412+ static class PatchBothTestCase implements PatchTestCase {
413+ @ Override
414+ public void applyChanges (FlinkBlueGreenDeployment deployment , KubernetesClient client ) {
415+ FlinkDeploymentTemplateSpec template = deployment .getSpec ().getTemplate ();
416+
417+ // 1. Add top-level configuration change
418+ Map <String , String > configuration = new HashMap <>(template .getConfiguration ());
419+ configuration .put ("custom.both.level" , "custom-both-level-value" );
420+ template .setConfiguration (configuration );
421+
422+ // 2. Add nested spec change
423+ FlinkDeploymentSpec spec = template .getSpec ();
424+ spec .getJob ().setSavepointRedeployNonce (67890L );
425+ template .setSpec (spec );
426+
427+ deployment .getSpec ().setTemplate (template );
428+ client .resource (deployment ).createOrReplace ();
429+ }
430+
431+ @ Override
432+ public void verifySpecificBehavior (
433+ ReconcileResult result , List <FlinkDeployment > deployments ) {
434+ assertEquals (1 , deployments .size ());
435+ var updatedDeployment = deployments .get (0 );
436+
437+ // Child spec change should be applied to FlinkDeployment
438+ assertEquals (
439+ 67890L ,
440+ (long ) updatedDeployment .getSpec ().getJob ().getSavepointRedeployNonce ());
441+
442+ // Top-level changes should be preserved in reconciled spec
443+ assertNotNull (result .rs .reconciledStatus .getLastReconciledSpec ());
444+ assertEquals (
445+ SpecUtils .writeSpecAsJSON (result .rs .deployment .getSpec (), "spec" ),
446+ result .rs .reconciledStatus .getLastReconciledSpec ());
447+ }
448+ }
449+
450+ // ==================== Helper Classes ====================
451+
452+ static class ReconcileResult {
453+ final long minReconciliationTs ;
454+ final TestingFlinkBlueGreenDeploymentController .BlueGreenReconciliationResult rs ;
455+ final FlinkDeployment existingFlinkDeployment ;
456+
457+ ReconcileResult (
458+ long minReconciliationTs ,
459+ TestingFlinkBlueGreenDeploymentController .BlueGreenReconciliationResult rs ,
460+ FlinkDeployment existingFlinkDeployment ) {
461+ this .minReconciliationTs = minReconciliationTs ;
462+ this .rs = rs ;
463+ this .existingFlinkDeployment = existingFlinkDeployment ;
464+ }
465+ }
466+
467+ // ==================== Common Test Helper Methods ====================
468+
469+ private TestingFlinkBlueGreenDeploymentController .BlueGreenReconciliationResult
470+ setupActiveBlueDeployment (FlinkVersion flinkVersion ) throws Exception {
333471 var blueGreenDeployment =
334472 buildSessionCluster (TEST_DEPLOYMENT_NAME , TEST_NAMESPACE , flinkVersion );
473+ return executeBasicDeployment (flinkVersion , blueGreenDeployment , false );
474+ }
335475
336- // Execute basic deployment to get to ACTIVE_BLUE state
337- var rs = executeBasicDeployment (flinkVersion , blueGreenDeployment , false );
476+ private ReconcileResult reconcileAndVerifyPatchBehavior (
477+ TestingFlinkBlueGreenDeploymentController .BlueGreenReconciliationResult rs )
478+ throws Exception {
338479
339- // Modify template.spec to trigger PATCH_CHILD (non-scale/upgrade change)
340- simulatePatchChildChange (rs .deployment );
480+ var flinkDeployments = getFlinkDeployments ();
481+ assertEquals (1 , flinkDeployments .size ());
482+ var existingFlinkDeployment = flinkDeployments .get (0 );
341483
342- // Reconcile to trigger the PATCH_CHILD path
343484 var minReconciliationTs = System .currentTimeMillis () - 1 ;
344485 rs = reconcile (rs .deployment );
345486
346- // Verify the patch operation was triggered
487+ assertPatchOperationTriggered (rs , minReconciliationTs );
488+ assertTransitioningState (rs );
489+
490+ minReconciliationTs = System .currentTimeMillis () - 1 ;
491+ rs = reconcile (rs .deployment );
492+
493+ return new ReconcileResult (minReconciliationTs , rs , existingFlinkDeployment );
494+ }
495+
496+ private void assertPatchOperationTriggered (
497+ TestingFlinkBlueGreenDeploymentController .BlueGreenReconciliationResult rs ,
498+ long minReconciliationTs ) {
347499 assertTrue (rs .updateControl .isPatchStatus ());
348500 assertTrue (rs .updateControl .getScheduleDelay ().isPresent ());
349501 assertTrue (rs .updateControl .getScheduleDelay ().get () > 0 );
350502 assertTrue (
351503 minReconciliationTs
352504 < instantStrToMillis (rs .reconciledStatus .getLastReconciledTimestamp ()));
505+ }
353506
354- // Verify state transition to TRANSITIONING_TO_BLUE (patching the blue deployment)
507+ private void assertTransitioningState (
508+ TestingFlinkBlueGreenDeploymentController .BlueGreenReconciliationResult rs ) {
355509 assertEquals (
356510 FlinkBlueGreenDeploymentState .TRANSITIONING_TO_BLUE ,
357511 rs .reconciledStatus .getBlueGreenState ());
358512 assertEquals (JobStatus .RECONCILING , rs .reconciledStatus .getJobStatus ().getState ());
359-
360- minReconciliationTs = System .currentTimeMillis () - 1 ;
361- rs = reconcile (rs .deployment );
362-
363- assertTrue (rs .updateControl .isPatchStatus ());
364-
365- // Verify that the FlinkDeployment was updated with the new configuration
366- var flinkDeployments = getFlinkDeployments ();
367- assertEquals (1 , flinkDeployments .size ());
368- var updatedDeployment = flinkDeployments .get (0 );
369-
370- assertTrue (updatedDeployment .getSpec ().getJob ().getSavepointRedeployNonce () == 12345L );
371- assertFinalized (minReconciliationTs , rs , FlinkBlueGreenDeploymentState .ACTIVE_BLUE );
372513 }
373514
374515 private void assertFinalized (
375516 long minReconciliationTs ,
376517 TestingFlinkBlueGreenDeploymentController .BlueGreenReconciliationResult rs ,
377518 FlinkBlueGreenDeploymentState expectedBGDeploymentState )
378519 throws Exception {
520+ assertTrue (rs .updateControl .isPatchStatus ());
379521 assertTrue (
380522 minReconciliationTs
381523 < instantStrToMillis (rs .reconciledStatus .getLastReconciledTimestamp ()));
@@ -393,19 +535,6 @@ private void assertFinalized(
393535 assertTrue (rs .updateControl .isNoUpdate ());
394536 }
395537
396- private void simulatePatchChildChange (FlinkBlueGreenDeployment blueGreenDeployment ) {
397- // Modify a field in template.spec that would trigger PATCH_CHILD
398- // This is a change that doesn't trigger SCALE or UPGRADE according to
399- // FlinkBlueGreenDeploymentSpecDiff
400- FlinkDeploymentSpec spec = blueGreenDeployment .getSpec ().getTemplate ().getSpec ();
401-
402- // Change nested spec property that doesn't trigger SCALE/UPGRADE
403- spec .getJob ().setSavepointRedeployNonce (12345L );
404-
405- blueGreenDeployment .getSpec ().getTemplate ().setSpec (spec );
406- kubernetesClient .resource (blueGreenDeployment ).createOrReplace ();
407- }
408-
409538 private TestingFlinkBlueGreenDeploymentController .BlueGreenReconciliationResult
410539 executeBasicDeployment (
411540 FlinkVersion flinkVersion ,
0 commit comments