@@ -55,6 +55,7 @@ use stackable_operator::{
5555 } ,
5656 kube:: {
5757 api:: DynamicObject ,
58+ core:: { error_boundary, DeserializeGuard } ,
5859 runtime:: { controller:: Action , reflector:: ObjectRef } ,
5960 Resource , ResourceExt ,
6061 } ,
@@ -333,6 +334,11 @@ pub enum Error {
333334 AddVolumeMount {
334335 source : builder:: pod:: container:: Error ,
335336 } ,
337+
338+ #[ snafu( display( "KafkaCluster object is invalid" ) ) ]
339+ InvalidKafkaCluster {
340+ source : error_boundary:: InvalidObject ,
341+ } ,
336342}
337343type Result < T , E = Error > = std:: result:: Result < T , E > ;
338344
@@ -394,12 +400,23 @@ impl ReconcilerError for Error {
394400 Error :: ConfigureLogging { .. } => None ,
395401 Error :: AddVolume { .. } => None ,
396402 Error :: AddVolumeMount { .. } => None ,
403+ Error :: InvalidKafkaCluster { .. } => None ,
397404 }
398405 }
399406}
400407
401- pub async fn reconcile_kafka ( kafka : Arc < KafkaCluster > , ctx : Arc < Ctx > ) -> Result < Action > {
408+ pub async fn reconcile_kafka (
409+ kafka : Arc < DeserializeGuard < KafkaCluster > > ,
410+ ctx : Arc < Ctx > ,
411+ ) -> Result < Action > {
402412 tracing:: info!( "Starting reconcile" ) ;
413+
414+ let kafka = kafka
415+ . 0
416+ . as_ref ( )
417+ . map_err ( error_boundary:: InvalidObject :: clone)
418+ . context ( InvalidKafkaClusterSnafu ) ?;
419+
403420 let client = & ctx. client ;
404421 let kafka_role = KafkaRole :: Broker ;
405422
@@ -420,7 +437,7 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
420437 let validated_config = validate_all_roles_and_groups_config (
421438 & resolved_product_image. product_version ,
422439 & transform_all_roles_to_config (
423- kafka. as_ref ( ) ,
440+ kafka,
424441 [ (
425442 KafkaRole :: Broker . to_string ( ) ,
426443 (
@@ -445,7 +462,7 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
445462 . map ( Cow :: Borrowed )
446463 . unwrap_or_default ( ) ;
447464
448- let kafka_security = KafkaTlsSecurity :: new_from_kafka_cluster ( client, & kafka)
465+ let kafka_security = KafkaTlsSecurity :: new_from_kafka_cluster ( client, kafka)
449466 . await
450467 . context ( FailedToInitializeSecurityContextSnafu ) ?;
451468
@@ -454,27 +471,22 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
454471 let opa_connect = if let Some ( opa_spec) = & kafka. spec . cluster_config . authorization . opa {
455472 Some (
456473 opa_spec
457- . full_document_url_from_config_map (
458- client,
459- & * kafka,
460- Some ( "allow" ) ,
461- OpaApiVersion :: V1 ,
462- )
474+ . full_document_url_from_config_map ( client, kafka, Some ( "allow" ) , OpaApiVersion :: V1 )
463475 . await
464476 . context ( InvalidOpaConfigSnafu ) ?,
465477 )
466478 } else {
467479 None
468480 } ;
469481
470- let vector_aggregator_address = resolve_vector_aggregator_address ( & kafka, client)
482+ let vector_aggregator_address = resolve_vector_aggregator_address ( kafka, client)
471483 . await
472484 . context ( ResolveVectorAggregatorAddressSnafu ) ?;
473485
474486 let mut ss_cond_builder = StatefulSetConditionBuilder :: default ( ) ;
475487
476488 let ( rbac_sa, rbac_rolebinding) = build_rbac_resources (
477- kafka. as_ref ( ) ,
489+ kafka,
478490 APP_NAME ,
479491 cluster_resources
480492 . get_required_labels ( )
@@ -501,9 +513,9 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
501513 . context ( FailedToResolveConfigSnafu ) ?;
502514
503515 let rg_service =
504- build_broker_rolegroup_service ( & kafka, & resolved_product_image, & rolegroup_ref) ?;
516+ build_broker_rolegroup_service ( kafka, & resolved_product_image, & rolegroup_ref) ?;
505517 let rg_configmap = build_broker_rolegroup_config_map (
506- & kafka,
518+ kafka,
507519 & resolved_product_image,
508520 & kafka_security,
509521 & rolegroup_ref,
@@ -512,7 +524,7 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
512524 vector_aggregator_address. as_deref ( ) ,
513525 ) ?;
514526 let rg_statefulset = build_broker_rolegroup_statefulset (
515- & kafka,
527+ kafka,
516528 & kafka_role,
517529 & resolved_product_image,
518530 & rolegroup_ref,
@@ -523,7 +535,7 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
523535 & rbac_sa. name_any ( ) ,
524536 ) ?;
525537 let rg_bootstrap_listener = build_broker_rolegroup_bootstrap_listener (
526- & kafka,
538+ kafka,
527539 & resolved_product_image,
528540 & kafka_security,
529541 & rolegroup_ref,
@@ -564,14 +576,14 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
564576 pod_disruption_budget : pdb,
565577 } ) = role_config
566578 {
567- add_pdbs ( pdb, & kafka, & kafka_role, client, & mut cluster_resources)
579+ add_pdbs ( pdb, kafka, & kafka_role, client, & mut cluster_resources)
568580 . await
569581 . context ( FailedToCreatePdbSnafu ) ?;
570582 }
571583
572584 for discovery_cm in build_discovery_configmaps (
573- & kafka,
574- & * kafka,
585+ kafka,
586+ kafka,
575587 & resolved_product_image,
576588 & kafka_security,
577589 & bootstrap_listeners,
@@ -589,10 +601,7 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
589601 ClusterOperationsConditionBuilder :: new ( & kafka. spec . cluster_operation ) ;
590602
591603 let status = KafkaClusterStatus {
592- conditions : compute_conditions (
593- kafka. as_ref ( ) ,
594- & [ & ss_cond_builder, & cluster_operation_cond_builder] ,
595- ) ,
604+ conditions : compute_conditions ( kafka, & [ & ss_cond_builder, & cluster_operation_cond_builder] ) ,
596605 } ;
597606
598607 cluster_resources
@@ -601,7 +610,7 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<
601610 . context ( DeleteOrphansSnafu ) ?;
602611
603612 client
604- . apply_patch_status ( OPERATOR_NAME , & * kafka, & status)
613+ . apply_patch_status ( OPERATOR_NAME , kafka, & status)
605614 . await
606615 . context ( ApplyStatusSnafu ) ?;
607616
@@ -1099,8 +1108,15 @@ fn build_broker_rolegroup_statefulset(
10991108 } )
11001109}
11011110
1102- pub fn error_policy ( _obj : Arc < KafkaCluster > , _error : & Error , _ctx : Arc < Ctx > ) -> Action {
1103- Action :: requeue ( * Duration :: from_secs ( 5 ) )
1111+ pub fn error_policy (
1112+ _obj : Arc < DeserializeGuard < KafkaCluster > > ,
1113+ error : & Error ,
1114+ _ctx : Arc < Ctx > ,
1115+ ) -> Action {
1116+ match error {
1117+ Error :: InvalidKafkaCluster { .. } => Action :: await_change ( ) ,
1118+ _ => Action :: requeue ( * Duration :: from_secs ( 5 ) ) ,
1119+ }
11041120}
11051121
11061122/// We only expose client HTTP / HTTPS and Metrics ports.
0 commit comments