11//! Ensures that `Pod`s are configured and running for each [`DruidCluster`]
22use std:: {
33 collections:: { BTreeMap , HashMap } ,
4- ops:: Deref ,
54 str:: FromStr ,
65 sync:: Arc ,
76} ;
@@ -49,6 +48,7 @@ use stackable_operator::{
4948 DeepMerge ,
5049 } ,
5150 kube:: {
51+ core:: { error_boundary, DeserializeGuard } ,
5252 runtime:: { controller:: Action , reflector:: ObjectRef } ,
5353 Resource ,
5454 } ,
@@ -358,6 +358,11 @@ pub enum Error {
358358 AddVolumeMount {
359359 source : builder:: pod:: container:: Error ,
360360 } ,
361+
362+ #[ snafu( display( "DruidCluster object is invalid" ) ) ]
363+ InvalidDruidCluster {
364+ source : error_boundary:: InvalidObject ,
365+ } ,
361366}
362367
363368type Result < T , E = Error > = std:: result:: Result < T , E > ;
@@ -368,8 +373,17 @@ impl ReconcilerError for Error {
368373 }
369374}
370375
371- pub async fn reconcile_druid ( druid : Arc < DruidCluster > , ctx : Arc < Ctx > ) -> Result < Action > {
376+ pub async fn reconcile_druid (
377+ druid : Arc < DeserializeGuard < DruidCluster > > ,
378+ ctx : Arc < Ctx > ,
379+ ) -> Result < Action > {
372380 tracing:: info!( "Starting reconcile" ) ;
381+ let druid = druid
382+ . 0
383+ . as_ref ( )
384+ . map_err ( error_boundary:: InvalidObject :: clone)
385+ . context ( InvalidDruidClusterSnafu ) ?;
386+
373387 let client = & ctx. client ;
374388 let namespace = & druid
375389 . metadata
@@ -394,7 +408,7 @@ pub async fn reconcile_druid(druid: Arc<DruidCluster>, ctx: Arc<Ctx>) -> Result<
394408 cm_name : zk_confmap. clone ( ) ,
395409 } ) ?;
396410
397- let vector_aggregator_address = resolve_vector_aggregator_address ( & druid, client)
411+ let vector_aggregator_address = resolve_vector_aggregator_address ( druid, client)
398412 . await
399413 . context ( ResolveVectorAggregatorAddressSnafu ) ?;
400414
@@ -404,12 +418,7 @@ pub async fn reconcile_druid(druid: Arc<DruidCluster>, ctx: Arc<Ctx>) -> Result<
404418 {
405419 Some (
406420 opa_config
407- . full_document_url_from_config_map (
408- client,
409- druid. deref ( ) ,
410- Some ( "allow" ) ,
411- OpaApiVersion :: V1 ,
412- )
421+ . full_document_url_from_config_map ( client, druid, Some ( "allow" ) , OpaApiVersion :: V1 )
413422 . await
414423 . context ( GetOpaConnStringSnafu {
415424 cm_name : opa_config. config_map_name . clone ( ) ,
@@ -444,12 +453,12 @@ pub async fn reconcile_druid(druid: Arc<DruidCluster>, ctx: Arc<Ctx>) -> Result<
444453 . context ( AuthenticationClassRetrievalSnafu ) ?;
445454
446455 let druid_tls_security =
447- DruidTlsSecurity :: new_from_druid_cluster ( & druid, & resolved_auth_classes) ;
456+ DruidTlsSecurity :: new_from_druid_cluster ( druid, & resolved_auth_classes) ;
448457
449458 let druid_auth_config = DruidAuthenticationConfig :: try_from ( resolved_auth_classes)
450459 . context ( InvalidDruidAuthenticationConfigSnafu ) ?;
451460
452- let role_config = transform_all_roles_to_config ( druid. as_ref ( ) , druid. build_role_properties ( ) ) ;
461+ let role_config = transform_all_roles_to_config ( druid, druid. build_role_properties ( ) ) ;
453462 let validated_role_config = validate_all_roles_and_groups_config (
454463 & resolved_product_image. product_version ,
455464 & role_config. context ( ProductConfigTransformSnafu ) ?,
@@ -471,7 +480,7 @@ pub async fn reconcile_druid(druid: Arc<DruidCluster>, ctx: Arc<Ctx>) -> Result<
471480 let merged_config = druid. merged_config ( ) . context ( FailedToResolveConfigSnafu ) ?;
472481
473482 let ( rbac_sa, rbac_rolebinding) = build_rbac_resources (
474- druid. as_ref ( ) ,
483+ druid,
475484 APP_NAME ,
476485 cluster_resources
477486 . get_required_labels ( )
@@ -495,7 +504,7 @@ pub async fn reconcile_druid(druid: Arc<DruidCluster>, ctx: Arc<Ctx>) -> Result<
495504 } ) ?;
496505
497506 let role_service = build_role_service (
498- & druid,
507+ druid,
499508 & resolved_product_image,
500509 & druid_role,
501510 & druid_tls_security,
@@ -505,13 +514,13 @@ pub async fn reconcile_druid(druid: Arc<DruidCluster>, ctx: Arc<Ctx>) -> Result<
505514 . await
506515 . context ( ApplyRoleServiceSnafu ) ?;
507516
508- create_shared_internal_secret ( & druid, client, DRUID_CONTROLLER_NAME )
517+ create_shared_internal_secret ( druid, client, DRUID_CONTROLLER_NAME )
509518 . await
510519 . context ( FailedInternalSecretCreationSnafu ) ?;
511520
512521 for ( rolegroup_name, rolegroup_config) in role_config. iter ( ) {
513522 let rolegroup = RoleGroupRef {
514- cluster : ObjectRef :: from_obj ( & * druid) ,
523+ cluster : ObjectRef :: from_obj ( druid) ,
515524 role : role_name. into ( ) ,
516525 role_group : rolegroup_name. into ( ) ,
517526 } ;
@@ -521,13 +530,13 @@ pub async fn reconcile_druid(druid: Arc<DruidCluster>, ctx: Arc<Ctx>) -> Result<
521530 . context ( FailedToResolveConfigSnafu ) ?;
522531
523532 let rg_service = build_rolegroup_services (
524- & druid,
533+ druid,
525534 & resolved_product_image,
526535 & rolegroup,
527536 & druid_tls_security,
528537 ) ?;
529538 let rg_configmap = build_rolegroup_config_map (
530- & druid,
539+ druid,
531540 & resolved_product_image,
532541 & rolegroup,
533542 rolegroup_config,
@@ -541,7 +550,7 @@ pub async fn reconcile_druid(druid: Arc<DruidCluster>, ctx: Arc<Ctx>) -> Result<
541550 & druid_auth_config,
542551 ) ?;
543552 let rg_statefulset = build_rolegroup_statefulset (
544- & druid,
553+ druid,
545554 & resolved_product_image,
546555 & druid_role,
547556 & rolegroup,
@@ -577,7 +586,7 @@ pub async fn reconcile_druid(druid: Arc<DruidCluster>, ctx: Arc<Ctx>) -> Result<
577586
578587 add_pdbs (
579588 & role_config. pod_disruption_budget ,
580- & druid,
589+ druid,
581590 & druid_role,
582591 client,
583592 & mut cluster_resources,
@@ -587,14 +596,10 @@ pub async fn reconcile_druid(druid: Arc<DruidCluster>, ctx: Arc<Ctx>) -> Result<
587596 }
588597
589598 // discovery
590- for discovery_cm in build_discovery_configmaps (
591- & druid,
592- & * druid,
593- & resolved_product_image,
594- & druid_tls_security,
595- )
596- . await
597- . context ( BuildDiscoveryConfigSnafu ) ?
599+ for discovery_cm in
600+ build_discovery_configmaps ( druid, druid, & resolved_product_image, & druid_tls_security)
601+ . await
602+ . context ( BuildDiscoveryConfigSnafu ) ?
598603 {
599604 cluster_resources
600605 . add ( client, discovery_cm)
@@ -606,18 +611,15 @@ pub async fn reconcile_druid(druid: Arc<DruidCluster>, ctx: Arc<Ctx>) -> Result<
606611 ClusterOperationsConditionBuilder :: new ( & druid. spec . cluster_operation ) ;
607612
608613 let status = DruidClusterStatus {
609- conditions : compute_conditions (
610- druid. as_ref ( ) ,
611- & [ & ss_cond_builder, & cluster_operation_cond_builder] ,
612- ) ,
614+ conditions : compute_conditions ( druid, & [ & ss_cond_builder, & cluster_operation_cond_builder] ) ,
613615 } ;
614616
615617 cluster_resources
616618 . delete_orphaned_resources ( client)
617619 . await
618620 . context ( DeleteOrphanedResourcesSnafu ) ?;
619621 client
620- . apply_patch_status ( OPERATOR_NAME , & * druid, & status)
622+ . apply_patch_status ( OPERATOR_NAME , druid, & status)
621623 . await
622624 . context ( ApplyStatusSnafu ) ?;
623625
@@ -1297,8 +1299,15 @@ fn add_log_volume_and_volume_mounts(
12971299 Ok ( ( ) )
12981300}
12991301
1300- pub fn error_policy ( _obj : Arc < DruidCluster > , _error : & Error , _ctx : Arc < Ctx > ) -> Action {
1301- Action :: requeue ( * Duration :: from_secs ( 5 ) )
1302+ pub fn error_policy (
1303+ _obj : Arc < DeserializeGuard < DruidCluster > > ,
1304+ error : & Error ,
1305+ _ctx : Arc < Ctx > ,
1306+ ) -> Action {
1307+ match error {
1308+ Error :: InvalidDruidCluster { .. } => Action :: await_change ( ) ,
1309+ _ => Action :: requeue ( * Duration :: from_secs ( 5 ) ) ,
1310+ }
13021311}
13031312
13041313#[ cfg( test) ]
0 commit comments