1010use std:: {
1111 collections:: BTreeSet ,
1212 sync:: { Arc , Mutex } ,
13+ time:: Duration ,
1314} ;
1415
1516use anyhow:: Context as _;
@@ -27,19 +28,24 @@ use tracing::{debug, trace};
2728use uuid:: Uuid ;
2829
2930use crate :: {
30- Error , controller:: materialize:: environmentd:: V161 , k8s:: make_reflector,
31- matching_image_from_environmentd_image_ref, metrics:: Metrics , tls:: DefaultCertificateSpecs ,
31+ Error ,
32+ controller:: materialize:: environmentd:: V161 ,
33+ k8s:: { apply_resource, delete_resource, make_reflector} ,
34+ matching_image_from_environmentd_image_ref,
35+ metrics:: Metrics ,
36+ tls:: DefaultCertificateSpecs ,
3237} ;
3338use mz_cloud_provider:: CloudProvider ;
34- use mz_cloud_resources:: crd:: materialize:: v1alpha1:: {
35- Materialize , MaterializeRolloutStrategy , MaterializeStatus ,
39+ use mz_cloud_resources:: crd:: {
40+ ManagedResource ,
41+ balancer:: v1alpha1:: { Balancer , BalancerSpec } ,
42+ materialize:: v1alpha1:: { Materialize , MaterializeRolloutStrategy , MaterializeStatus } ,
3643} ;
3744use mz_license_keys:: validate;
3845use mz_orchestrator_kubernetes:: KubernetesImagePullPolicy ;
3946use mz_orchestrator_tracing:: TracingCliArgs ;
4047use mz_ore:: { cast:: CastFrom , cli:: KeyValueArg , instrument} ;
4148
42- pub mod balancer;
4349pub mod console;
4450pub mod environmentd;
4551
@@ -79,10 +85,6 @@ pub struct Config {
7985 pub clusterd_node_selector : Vec < KeyValueArg < String , String > > ,
8086 pub clusterd_affinity : Option < Affinity > ,
8187 pub clusterd_tolerations : Option < Vec < Toleration > > ,
82- pub balancerd_node_selector : Vec < KeyValueArg < String , String > > ,
83- pub balancerd_affinity : Option < Affinity > ,
84- pub balancerd_tolerations : Option < Vec < Toleration > > ,
85- pub balancerd_default_resources : Option < ResourceRequirements > ,
8688 pub console_node_selector : Vec < KeyValueArg < String , String > > ,
8789 pub console_affinity : Option < Affinity > ,
8890 pub console_tolerations : Option < Vec < Toleration > > ,
@@ -115,9 +117,7 @@ pub struct Config {
115117 pub environmentd_internal_http_port : u16 ,
116118 pub environmentd_internal_persist_pubsub_port : u16 ,
117119
118- pub balancerd_sql_port : u16 ,
119120 pub balancerd_http_port : u16 ,
120- pub balancerd_internal_http_port : u16 ,
121121
122122 pub console_http_port : u16 ,
123123
@@ -280,6 +280,7 @@ impl k8s_controller::Context for Context {
280280 mz : & Self :: Resource ,
281281 ) -> Result < Option < Action > , Self :: Error > {
282282 let mz_api: Api < Materialize > = Api :: namespaced ( client. clone ( ) , & mz. namespace ( ) ) ;
283+ let balancer_api: Api < Balancer > = Api :: namespaced ( client. clone ( ) , & mz. namespace ( ) ) ;
283284 let secret_api: Api < Secret > = Api :: namespaced ( client. clone ( ) , & mz. namespace ( ) ) ;
284285
285286 let status = mz. status ( ) ;
@@ -431,7 +432,7 @@ impl k8s_controller::Context for Context {
431432 last_completed_rollout_request : status. last_completed_rollout_request ,
432433 last_completed_rollout_environmentd_image_ref : status
433434 . last_completed_rollout_environmentd_image_ref ,
434- resource_id : status. resource_id ,
435+ resource_id : status. resource_id . clone ( ) ,
435436 resources_hash : String :: new ( ) ,
436437 conditions : vec ! [ Condition {
437438 type_: "UpToDate" . into( ) ,
@@ -600,7 +601,7 @@ impl k8s_controller::Context for Context {
600601 last_completed_rollout_request : mz. requested_reconciliation_id ( ) ,
601602 last_completed_rollout_environmentd_image_ref : status
602603 . last_completed_rollout_environmentd_image_ref ,
603- resource_id : status. resource_id ,
604+ resource_id : status. resource_id . clone ( ) ,
604605 resources_hash : status. resources_hash ,
605606 conditions : vec ! [ Condition {
606607 type_: "UpToDate" . into( ) ,
@@ -642,7 +643,7 @@ impl k8s_controller::Context for Context {
642643 last_completed_rollout_request : mz. requested_reconciliation_id ( ) ,
643644 last_completed_rollout_environmentd_image_ref : status
644645 . last_completed_rollout_environmentd_image_ref ,
645- resource_id : status. resource_id ,
646+ resource_id : status. resource_id . clone ( ) ,
646647 resources_hash : status. resources_hash ,
647648 conditions : vec ! [ Condition {
648649 type_: "UpToDate" . into( ) ,
@@ -672,20 +673,44 @@ impl k8s_controller::Context for Context {
672673 // enforced by the environmentd rollout process being able to call
673674 // into the promotion endpoint
674675
675- let balancer = balancer:: Resources :: new ( & self . config , mz) ;
676676 if self . config . create_balancers {
677- result = balancer. apply ( & client, & mz. namespace ( ) ) . await ?;
677+ let balancer = Balancer {
678+ metadata : mz. managed_resource_meta ( mz. name_unchecked ( ) ) ,
679+ spec : BalancerSpec {
680+ balancerd_image_ref : matching_image_from_environmentd_image_ref (
681+ & mz. spec . environmentd_image_ref ,
682+ "balancerd" ,
683+ None ,
684+ ) ,
685+ resource_requirements : mz. spec . balancerd_resource_requirements . clone ( ) ,
686+ replicas : Some ( mz. balancerd_replicas ( ) ) ,
687+ external_certificate_spec : mz. spec . balancerd_external_certificate_spec . clone ( ) ,
688+ internal_certificate_spec : mz. spec . internal_certificate_spec . clone ( ) ,
689+ pod_annotations : mz. spec . pod_annotations . clone ( ) ,
690+ pod_labels : mz. spec . pod_labels . clone ( ) ,
691+ static_routing : Some (
692+ mz_cloud_resources:: crd:: balancer:: v1alpha1:: StaticRoutingConfig {
693+ environmentd_namespace : mz. namespace ( ) ,
694+ environmentd_service_name : mz. environmentd_service_name ( ) ,
695+ } ,
696+ ) ,
697+ frontegg_routing : None ,
698+ resource_id : Some ( status. resource_id ) ,
699+ } ,
700+ status : None ,
701+ } ;
702+ let balancer = apply_resource ( & balancer_api, & balancer) . await ?;
703+ result = wait_for_balancer ( & balancer) ?;
678704 } else {
679- result = balancer . cleanup ( & client , & mz. namespace ( ) ) . await ?;
705+ delete_resource ( & balancer_api , & mz. name_prefixed ( "balancer" ) ) . await ?;
680706 }
681707
682708 if let Some ( action) = result {
683709 return Ok ( Some ( action) ) ;
684710 }
685711
686712 // and the console relies on the balancer service existing, which is
687- // enforced by balancer::Resources::apply having a check for its pods
688- // being up, and not returning successfully until they are
713+ // enforced by wait_for_balancer
689714
690715 let Some ( ( _, environmentd_image_tag) ) = mz. spec . environmentd_image_ref . rsplit_once ( ':' )
691716 else {
@@ -730,3 +755,20 @@ impl k8s_controller::Context for Context {
730755 Ok ( None )
731756 }
732757}
758+
759+ fn wait_for_balancer ( balancer : & Balancer ) -> Result < Option < Action > , Error > {
760+ if let Some ( conditions) = balancer
761+ . status
762+ . as_ref ( )
763+ . map ( |status| status. conditions . as_slice ( ) )
764+ {
765+ if conditions
766+ . iter ( )
767+ . any ( |condition| condition. type_ == "Ready" && condition. status == "True" )
768+ {
769+ return Ok ( None ) ;
770+ }
771+ }
772+
773+ Ok ( Some ( Action :: requeue ( Duration :: from_secs ( 1 ) ) ) )
774+ }
0 commit comments