1010use std:: {
1111 collections:: BTreeSet ,
1212 sync:: { Arc , Mutex } ,
13+ time:: Duration ,
1314} ;
1415
1516use anyhow:: Context as _;
@@ -27,19 +28,23 @@ use tracing::{debug, trace};
2728use uuid:: Uuid ;
2829
2930use crate :: {
30- DefaultCertificateSpecs , Error , controller:: materialize:: environmentd:: V161 ,
31- k8s:: make_reflector, matching_image_from_environmentd_image_ref, metrics:: Metrics ,
31+ DefaultCertificateSpecs , Error ,
32+ controller:: materialize:: environmentd:: V161 ,
33+ k8s:: { apply_resource, delete_resource, make_reflector} ,
34+ matching_image_from_environmentd_image_ref,
35+ metrics:: Metrics ,
3236} ;
3337use mz_cloud_provider:: CloudProvider ;
34- use mz_cloud_resources:: crd:: materialize:: v1alpha1:: {
35- Materialize , MaterializeRolloutStrategy , MaterializeStatus ,
38+ use mz_cloud_resources:: crd:: {
39+ ManagedResource ,
40+ balancer:: v1alpha1:: { Balancer , BalancerSpec } ,
41+ materialize:: v1alpha1:: { Materialize , MaterializeRolloutStrategy , MaterializeStatus } ,
3642} ;
3743use mz_license_keys:: validate;
3844use mz_orchestrator_kubernetes:: KubernetesImagePullPolicy ;
3945use mz_orchestrator_tracing:: TracingCliArgs ;
4046use mz_ore:: { cast:: CastFrom , cli:: KeyValueArg , instrument} ;
4147
42- pub mod balancer;
4348pub mod console;
4449pub mod environmentd;
4550
@@ -79,10 +84,6 @@ pub struct Config {
7984 pub clusterd_node_selector : Vec < KeyValueArg < String , String > > ,
8085 pub clusterd_affinity : Option < Affinity > ,
8186 pub clusterd_tolerations : Option < Vec < Toleration > > ,
82- pub balancerd_node_selector : Vec < KeyValueArg < String , String > > ,
83- pub balancerd_affinity : Option < Affinity > ,
84- pub balancerd_tolerations : Option < Vec < Toleration > > ,
85- pub balancerd_default_resources : Option < ResourceRequirements > ,
8687 pub console_node_selector : Vec < KeyValueArg < String , String > > ,
8788 pub console_affinity : Option < Affinity > ,
8889 pub console_tolerations : Option < Vec < Toleration > > ,
@@ -115,9 +116,7 @@ pub struct Config {
115116 pub environmentd_internal_http_port : u16 ,
116117 pub environmentd_internal_persist_pubsub_port : u16 ,
117118
118- pub balancerd_sql_port : u16 ,
119119 pub balancerd_http_port : u16 ,
120- pub balancerd_internal_http_port : u16 ,
121120
122121 pub console_http_port : u16 ,
123122
@@ -277,6 +276,7 @@ impl k8s_controller::Context for Context {
277276 mz : & Self :: Resource ,
278277 ) -> Result < Option < Action > , Self :: Error > {
279278 let mz_api: Api < Materialize > = Api :: namespaced ( client. clone ( ) , & mz. namespace ( ) ) ;
279+ let balancer_api: Api < Balancer > = Api :: namespaced ( client. clone ( ) , & mz. namespace ( ) ) ;
280280 let secret_api: Api < Secret > = Api :: namespaced ( client. clone ( ) , & mz. namespace ( ) ) ;
281281
282282 let status = mz. status ( ) ;
@@ -426,7 +426,7 @@ impl k8s_controller::Context for Context {
426426 // we fail later on, we want to ensure that the
427427 // rollout gets retried.
428428 last_completed_rollout_request : status. last_completed_rollout_request ,
429- resource_id : status. resource_id ,
429+ resource_id : status. resource_id . clone ( ) ,
430430 resources_hash : String :: new ( ) ,
431431 conditions : vec ! [ Condition {
432432 type_: "UpToDate" . into( ) ,
@@ -557,7 +557,7 @@ impl k8s_controller::Context for Context {
557557 MaterializeStatus {
558558 active_generation,
559559 last_completed_rollout_request : mz. requested_reconciliation_id ( ) ,
560- resource_id : status. resource_id ,
560+ resource_id : status. resource_id . clone ( ) ,
561561 resources_hash : status. resources_hash ,
562562 conditions : vec ! [ Condition {
563563 type_: "UpToDate" . into( ) ,
@@ -597,7 +597,7 @@ impl k8s_controller::Context for Context {
597597 MaterializeStatus {
598598 active_generation,
599599 last_completed_rollout_request : mz. requested_reconciliation_id ( ) ,
600- resource_id : status. resource_id ,
600+ resource_id : status. resource_id . clone ( ) ,
601601 resources_hash : status. resources_hash ,
602602 conditions : vec ! [ Condition {
603603 type_: "UpToDate" . into( ) ,
@@ -627,20 +627,44 @@ impl k8s_controller::Context for Context {
627627 // enforced by the environmentd rollout process being able to call
628628 // into the promotion endpoint
629629
630- let balancer = balancer:: Resources :: new ( & self . config , mz) ;
631630 if self . config . create_balancers {
632- result = balancer. apply ( & client, & mz. namespace ( ) ) . await ?;
631+ let balancer = Balancer {
632+ metadata : mz. managed_resource_meta ( mz. name_unchecked ( ) ) ,
633+ spec : BalancerSpec {
634+ balancerd_image_ref : matching_image_from_environmentd_image_ref (
635+ & mz. spec . environmentd_image_ref ,
636+ "balancerd" ,
637+ None ,
638+ ) ,
639+ resource_requirements : mz. spec . balancerd_resource_requirements . clone ( ) ,
640+ replicas : Some ( mz. balancerd_replicas ( ) ) ,
641+ external_certificate_spec : mz. spec . balancerd_external_certificate_spec . clone ( ) ,
642+ internal_certificate_spec : mz. spec . internal_certificate_spec . clone ( ) ,
643+ pod_annotations : mz. spec . pod_annotations . clone ( ) ,
644+ pod_labels : mz. spec . pod_labels . clone ( ) ,
645+ static_routing : Some (
646+ mz_cloud_resources:: crd:: balancer:: v1alpha1:: StaticRoutingConfig {
647+ environmentd_namespace : mz. namespace ( ) ,
648+ environmentd_service_name : mz. environmentd_service_name ( ) ,
649+ } ,
650+ ) ,
651+ frontegg_routing : None ,
652+ resource_id : Some ( status. resource_id ) ,
653+ } ,
654+ status : None ,
655+ } ;
656+ let balancer = apply_resource ( & balancer_api, & balancer) . await ?;
657+ result = wait_for_balancer ( & balancer) ?;
633658 } else {
634- result = balancer . cleanup ( & client , & mz. namespace ( ) ) . await ?;
659+ delete_resource ( & balancer_api , & mz. name_prefixed ( "balancer" ) ) . await ?;
635660 }
636661
637662 if let Some ( action) = result {
638663 return Ok ( Some ( action) ) ;
639664 }
640665
641666 // and the console relies on the balancer service existing, which is
642- // enforced by balancer::Resources::apply having a check for its pods
643- // being up, and not returning successfully until they are
667+ // enforced by wait_for_balancer
644668
645669 let Some ( ( _, environmentd_image_tag) ) = mz. spec . environmentd_image_ref . rsplit_once ( ':' )
646670 else {
@@ -685,3 +709,20 @@ impl k8s_controller::Context for Context {
685709 Ok ( None )
686710 }
687711}
712+
713+ fn wait_for_balancer ( balancer : & Balancer ) -> Result < Option < Action > , Error > {
714+ if let Some ( conditions) = balancer
715+ . status
716+ . as_ref ( )
717+ . map ( |status| status. conditions . as_slice ( ) )
718+ {
719+ if conditions
720+ . iter ( )
721+ . any ( |condition| condition. type_ == "Ready" && condition. status == "True" )
722+ {
723+ return Ok ( None ) ;
724+ }
725+ }
726+
727+ Ok ( Some ( Action :: requeue ( Duration :: from_secs ( 1 ) ) ) )
728+ }
0 commit comments