6161import java .util .stream .Collectors ;
6262import java .util .stream .Stream ;
6363
64+ import static io .javaoperatorsdk .operator .api .reconciler .UpdateControl .noUpdate ;
65+
6466/** Controller that runs the main reconcile loop for Flink Blue/Green deployments. */
6567@ ControllerConfiguration
6668public class FlinkBlueGreenDeploymentController
6769 implements Reconciler <FlinkBlueGreenDeployment >,
6870 EventSourceInitializer <FlinkBlueGreenDeployment > {
6971
7072 private static final Logger LOG = LoggerFactory .getLogger (FlinkDeploymentController .class );
71- private static final int DEFAULT_MAX_NUM_RETRIES = 5 ;
73+ private static final int DEFAULT_MAX_NUM_RETRIES = 10 ;
7274 private static final int DEFAULT_RECONCILIATION_RESCHEDULING_INTERVAL_MS = 15000 ;
7375
7476 private final FlinkResourceContextFactory ctxFactory ;
@@ -144,7 +146,7 @@ public UpdateControl<FlinkBlueGreenDeployment> reconcile(
144146 DeploymentType .BLUE ,
145147 josdkContext );
146148 default :
147- return UpdateControl . noUpdate ();
149+ return noUpdate ();
148150 }
149151 }
150152 }
@@ -191,39 +193,82 @@ private UpdateControl<FlinkBlueGreenDeployment> monitorTransition(
191193 + currentDeploymentType );
192194
193195 if (isDeploymentReady (nextDeployment , josdkContext , deploymentStatus )) {
194- return deleteAndFinalize (
196+ return canDelete (
195197 bgDeployment , deploymentStatus , josdkContext , currentDeployment , nextState );
196198 } else {
197- int maxNumRetries = bgDeployment . getSpec (). getTemplate (). getMaxNumRetries ();
198- if ( maxNumRetries <= 0 ) {
199- maxNumRetries = DEFAULT_MAX_NUM_RETRIES ;
200- }
199+ return retryOrAbort (
200+ bgDeployment , deploymentStatus , josdkContext , nextDeployment , nextState );
201+ }
202+ }
201203
202- if (deploymentStatus .getNumRetries () >= maxNumRetries ) {
203- // ABORT
204- // Suspend the nextDeployment (FlinkDeployment)
205- nextDeployment .getStatus ().getJobStatus ().setState (JobStatus .SUSPENDED );
206- josdkContext .getClient ().resource (nextDeployment ).update ();
204+ private UpdateControl <FlinkBlueGreenDeployment > canDelete (
205+ FlinkBlueGreenDeployment bgDeployment ,
206+ FlinkBlueGreenDeploymentStatus deploymentStatus ,
207+ Context <FlinkBlueGreenDeployment > josdkContext ,
208+ FlinkDeployment currentDeployment ,
209+ FlinkBlueGreenDeploymentState nextState ) {
210+ int deploymentDeletionDelayMs =
211+ Math .max (bgDeployment .getSpec ().getTemplate ().getDeploymentDeletionDelayMs (), 0 );
212+
213+ if (deploymentStatus .getDeploymentReadyTimestamp () == 0 ) {
214+ LOG .info (
215+ "Deployment marked ready on "
216+ + System .currentTimeMillis ()
217+ + ", rescheduling reconciliation in "
218+ + deploymentDeletionDelayMs
219+ + " ms." );
220+ deploymentStatus .setDeploymentReadyTimestamp (System .currentTimeMillis ());
221+ return patchStatusUpdateControl (bgDeployment , deploymentStatus , null , null )
222+ .rescheduleAfter (deploymentDeletionDelayMs );
223+ }
207224
208- // We indicate this Blue/Green deployment is no longer Transitioning
209- // and rollback the state value
210- deploymentStatus .setBlueGreenState (
211- nextState == FlinkBlueGreenDeploymentState .ACTIVE_BLUE
212- ? FlinkBlueGreenDeploymentState .ACTIVE_GREEN
213- : FlinkBlueGreenDeploymentState .ACTIVE_BLUE );
225+ var deletionTs = deploymentStatus .getDeploymentReadyTimestamp () + deploymentDeletionDelayMs ;
214226
215- // If the current running FlinkDeployment is not in RUNNING/STABLE,
216- // we flag this Blue/Green as FAILING
217- return patchStatusUpdateControl (
218- bgDeployment , deploymentStatus , null , JobStatus .FAILING , false );
219- } else {
220- // RETRY
221- deploymentStatus .setNumRetries (deploymentStatus .getNumRetries () + 1 );
227+ if (deletionTs < System .currentTimeMillis ()) {
228+ return deleteAndFinalize (
229+ bgDeployment , deploymentStatus , josdkContext , currentDeployment , nextState );
230+ } else {
231+ long delay = deletionTs - System .currentTimeMillis ();
232+ LOG .info ("Rescheduling reconciliation (to delete) in " + delay + " ms." );
233+ return UpdateControl .<FlinkBlueGreenDeployment >noUpdate ().rescheduleAfter (delay );
234+ }
235+ }
222236
223- LOG .info ("Deployment " + nextDeployment .getMetadata ().getName () + " not ready yet" );
224- return patchStatusUpdateControl (bgDeployment , deploymentStatus , null , null , false )
225- .rescheduleAfter (getReconciliationReschedInterval (bgDeployment ));
226- }
237+ private UpdateControl <FlinkBlueGreenDeployment > retryOrAbort (
238+ FlinkBlueGreenDeployment bgDeployment ,
239+ FlinkBlueGreenDeploymentStatus deploymentStatus ,
240+ Context <FlinkBlueGreenDeployment > josdkContext ,
241+ FlinkDeployment nextDeployment ,
242+ FlinkBlueGreenDeploymentState nextState ) {
243+ int maxNumRetries = bgDeployment .getSpec ().getTemplate ().getMaxNumRetries ();
244+ if (maxNumRetries <= 0 ) {
245+ maxNumRetries = DEFAULT_MAX_NUM_RETRIES ;
246+ }
247+
248+ if (deploymentStatus .getNumRetries () >= maxNumRetries ) {
249+ // ABORT
250+ // Suspend the nextDeployment (FlinkDeployment)
251+ nextDeployment .getStatus ().getJobStatus ().setState (JobStatus .SUSPENDED );
252+ josdkContext .getClient ().resource (nextDeployment ).replace ();
253+
254+ // We indicate this Blue/Green deployment is no longer Transitioning
255+ // and rollback the state value
256+ deploymentStatus .setBlueGreenState (
257+ nextState == FlinkBlueGreenDeploymentState .ACTIVE_BLUE
258+ ? FlinkBlueGreenDeploymentState .ACTIVE_GREEN
259+ : FlinkBlueGreenDeploymentState .ACTIVE_BLUE );
260+
261+ // If the current running FlinkDeployment is not in RUNNING/STABLE,
262+ // we flag this Blue/Green as FAILING
263+ return patchStatusUpdateControl (
264+ bgDeployment , deploymentStatus , null , JobStatus .FAILING );
265+ } else {
266+ // RETRY
267+ deploymentStatus .setNumRetries (deploymentStatus .getNumRetries () + 1 );
268+
269+ LOG .info ("Deployment " + nextDeployment .getMetadata ().getName () + " not ready yet" );
270+ return patchStatusUpdateControl (bgDeployment , deploymentStatus , null , null )
271+ .rescheduleAfter (getReconciliationReschedInterval (bgDeployment ));
227272 }
228273 }
229274
@@ -245,10 +290,11 @@ private UpdateControl<FlinkBlueGreenDeployment> deleteAndFinalize(
245290
246291 if (currentDeployment != null ) {
247292 deleteDeployment (currentDeployment , josdkContext );
248- return UpdateControl . noUpdate ();
293+ return noUpdate ();
249294 } else {
295+ deploymentStatus .setDeploymentReadyTimestamp (0 );
250296 return patchStatusUpdateControl (
251- bgDeployment , deploymentStatus , nextState , JobStatus .RUNNING , false );
297+ bgDeployment , deploymentStatus , nextState , JobStatus .RUNNING );
252298 }
253299 }
254300
@@ -301,16 +347,12 @@ private UpdateControl<FlinkBlueGreenDeployment> checkAndInitiateDeployment(
301347 // we flag this Blue/Green as FAILING
302348 if (deploymentStatus .getJobStatus ().getState () != JobStatus .FAILING ) {
303349 return patchStatusUpdateControl (
304- flinkBlueGreenDeployment ,
305- deploymentStatus ,
306- null ,
307- JobStatus .FAILING ,
308- false );
350+ flinkBlueGreenDeployment , deploymentStatus , null , JobStatus .FAILING );
309351 }
310352 }
311353 }
312354
313- return UpdateControl . noUpdate ();
355+ return noUpdate ();
314356 }
315357
316358 private static void setLastReconciledSpec (
@@ -418,12 +460,7 @@ private UpdateControl<FlinkBlueGreenDeployment> initiateDeployment(
418460 josdkContext ,
419461 isFirstDeployment );
420462
421- return patchStatusUpdateControl (
422- flinkBlueGreenDeployment ,
423- deploymentStatus ,
424- nextState ,
425- null ,
426- isFirstDeployment )
463+ return patchStatusUpdateControl (flinkBlueGreenDeployment , deploymentStatus , nextState , null )
427464 .rescheduleAfter (getReconciliationReschedInterval (flinkBlueGreenDeployment ));
428465 }
429466
@@ -476,16 +513,15 @@ private boolean hasSpecChanged(
476513 DeploymentType deploymentType ) {
477514
478515 String lastReconciledSpec = deploymentStatus .getLastReconciledSpec ();
479-
480- return !lastReconciledSpec .equals (SpecUtils . serializeObject ( newSpec , "spec" ) );
516+ String newSpecSerialized = SpecUtils . serializeObject ( newSpec , "spec" );
517+ return !lastReconciledSpec .equals (newSpecSerialized );
481518 }
482519
483520 private UpdateControl <FlinkBlueGreenDeployment > patchStatusUpdateControl (
484521 FlinkBlueGreenDeployment flinkBlueGreenDeployment ,
485522 FlinkBlueGreenDeploymentStatus deploymentStatus ,
486523 FlinkBlueGreenDeploymentState deploymentState ,
487- JobStatus jobState ,
488- boolean isFirstDeployment ) {
524+ JobStatus jobState ) {
489525 if (deploymentState != null ) {
490526 deploymentStatus .setBlueGreenState (deploymentState );
491527 }
@@ -494,6 +530,7 @@ private UpdateControl<FlinkBlueGreenDeployment> patchStatusUpdateControl(
494530 deploymentStatus .getJobStatus ().setState (jobState );
495531 }
496532
533+ deploymentStatus .setLastReconciledTimestamp (System .currentTimeMillis ());
497534 flinkBlueGreenDeployment .setStatus (deploymentStatus );
498535 return UpdateControl .patchStatus (flinkBlueGreenDeployment );
499536 }
0 commit comments