77// the Business Source License, use of this software will be governed
88// by the Apache License, Version 2.0.
99
10+ use std:: sync:: Arc ;
11+
1012use anyhow:: bail;
1113use k8s_openapi:: {
1214 api:: {
@@ -25,18 +27,20 @@ use k8s_openapi::{
2527} ;
2628use kube:: {
2729 Api , Client , Resource , ResourceExt ,
28- api:: { ObjectMeta , PostParams } ,
30+ api:: { DeleteParams , ObjectMeta , PostParams } ,
2931 runtime:: {
32+ conditions:: is_deployment_completed,
3033 controller:: Action ,
3134 reflector:: { ObjectRef , Store } ,
35+ wait:: await_condition,
3236 } ,
3337} ;
3438use maplit:: btreemap;
35- use tracing:: trace;
39+ use tracing:: { trace, warn } ;
3640
3741use crate :: {
3842 Error ,
39- k8s:: { apply_resource, make_reflector} ,
43+ k8s:: { apply_resource, make_reflector, replace_resource } ,
4044 tls:: { DefaultCertificateSpecs , create_certificate, issuer_ref_defined} ,
4145} ;
4246use mz_cloud_resources:: crd:: {
@@ -85,7 +89,7 @@ impl Context {
8589 client : & Client ,
8690 balancer : & Balancer ,
8791 ) -> Result < ( ) , kube:: Error > {
88- let namespace = balancer. namespace ( ) . unwrap ( ) ;
92+ let namespace = balancer. namespace ( ) ;
8993 let balancer_api: Api < Balancer > = Api :: namespaced ( client. clone ( ) , & namespace) ;
9094
9195 let Some ( deployment) = self
@@ -448,6 +452,109 @@ impl Context {
448452 status : None ,
449453 }
450454 }
455+
456+ // TODO: remove this once everyone is upgraded to an orchestratord
457+ // version with the separate balancer operator
458+ async fn fix_deployment (
459+ & self ,
460+ deployment_api : & Api < Deployment > ,
461+ new_deployment : & Deployment ,
462+ ) -> Result < ( ) , Error > {
463+ let Some ( mut existing_deployment) = self
464+ . deployments
465+ . get (
466+ & ObjectRef :: new ( & new_deployment. name_unchecked ( ) )
467+ . within ( & new_deployment. namespace ( ) . unwrap ( ) ) ,
468+ )
469+ . map ( Arc :: unwrap_or_clone)
470+ else {
471+ return Ok ( ( ) ) ;
472+ } ;
473+
474+ if existing_deployment. spec . as_ref ( ) . unwrap ( ) . selector
475+ == new_deployment. spec . as_ref ( ) . unwrap ( ) . selector
476+ {
477+ return Ok ( ( ) ) ;
478+ }
479+
480+ warn ! ( "found existing deployment with old label selector, fixing" ) ;
481+
482+ // this is sufficient because the new labels are a superset of the
483+ // old labels, so the existing label selector should still be valid
484+ existing_deployment
485+ . spec
486+ . as_mut ( )
487+ . unwrap ( )
488+ . template
489+ . metadata
490+ . as_mut ( )
491+ . unwrap ( )
492+ . labels = new_deployment
493+ . spec
494+ . as_ref ( )
495+ . unwrap ( )
496+ . template
497+ . metadata
498+ . as_ref ( )
499+ . unwrap ( )
500+ . labels
501+ . clone ( ) ;
502+
503+ // using await_condition is not ideal in a controller loop, but this
504+ // is very temporary and will only ever happen once, so this feels
505+ // simpler than trying to introduce an entire state machine here
506+ replace_resource ( deployment_api, & existing_deployment) . await ?;
507+ await_condition (
508+ deployment_api. clone ( ) ,
509+ & existing_deployment. name_unchecked ( ) ,
510+ |deployment : Option < & Deployment > | {
511+ let observed_generation = deployment
512+ . and_then ( |deployment| deployment. status . as_ref ( ) )
513+ . and_then ( |status| status. observed_generation )
514+ . unwrap_or ( 0 ) ;
515+ let current_generation = deployment
516+ . and_then ( |deployment| deployment. meta ( ) . generation )
517+ . unwrap_or ( 0 ) ;
518+ let previous_generation = existing_deployment. meta ( ) . generation . unwrap_or ( 0 ) ;
519+ observed_generation == current_generation
520+ && current_generation > previous_generation
521+ } ,
522+ )
523+ . await
524+ . map_err ( |e| anyhow:: anyhow!( e) ) ?;
525+ await_condition (
526+ deployment_api. clone ( ) ,
527+ & existing_deployment. name_unchecked ( ) ,
528+ is_deployment_completed ( ) ,
529+ )
530+ . await
531+ . map_err ( |e| anyhow:: anyhow!( e) ) ?;
532+
533+ // delete the deployment but leave the pods around (via
534+ // DeleteParams::orphan)
535+ match kube:: runtime:: wait:: delete:: delete_and_finalize (
536+ deployment_api. clone ( ) ,
537+ & existing_deployment. name_unchecked ( ) ,
538+ & DeleteParams :: orphan ( ) ,
539+ )
540+ . await
541+ {
542+ Ok ( _) => { }
543+ Err ( kube:: runtime:: wait:: delete:: Error :: Delete ( kube:: Error :: Api ( e) ) )
544+ if e. code == 404 =>
545+ {
546+ // the resource already doesn't exist
547+ }
548+ Err ( e) => return Err ( anyhow:: anyhow!( e) . into ( ) ) ,
549+ }
550+
551+ // now, the normal apply of the new deployment (in the main loop)
552+ // will take over the existing pods from the old deployment we just
553+ // deleted, since we already updated the pod labels to be the same as
554+ // the new label selector
555+
556+ Ok ( ( ) )
557+ }
451558}
452559
453560#[ async_trait:: async_trait]
@@ -478,7 +585,7 @@ impl k8s_controller::Context for Context {
478585 return Ok ( None ) ;
479586 }
480587
481- let namespace = balancer. namespace ( ) . unwrap ( ) ;
588+ let namespace = balancer. namespace ( ) ;
482589 let certificate_api: Api < Certificate > = Api :: namespaced ( client. clone ( ) , & namespace) ;
483590 let deployment_api: Api < Deployment > = Api :: namespaced ( client. clone ( ) , & namespace) ;
484591 let service_api: Api < Service > = Api :: namespaced ( client. clone ( ) , & namespace) ;
@@ -489,6 +596,7 @@ impl k8s_controller::Context for Context {
489596 }
490597
491598 let deployment = self . create_deployment_object ( balancer) ?;
599+ self . fix_deployment ( & deployment_api, & deployment) . await ?;
492600 trace ! ( "creating new balancerd deployment" ) ;
493601 apply_resource ( & deployment_api, & deployment) . await ?;
494602
0 commit comments