@@ -54,6 +54,7 @@ use stackable_operator::{
5454 } ,
5555 DeepMerge ,
5656 } ,
57+ kube:: core:: { error_boundary, DeserializeGuard } ,
5758 kube:: { runtime:: controller:: Action , Resource , ResourceExt } ,
5859 kvp:: { Label , Labels , ObjectLabels } ,
5960 logging:: controller:: ReconcilerError ,
@@ -323,6 +324,11 @@ pub enum Error {
323324 AddVolumeMount {
324325 source : builder:: pod:: container:: Error ,
325326 } ,
327+
328+ #[ snafu( display( "HiveCluster object is invalid" ) ) ]
329+ InvalidHiveCluster {
330+ source : error_boundary:: InvalidObject ,
331+ } ,
326332}
327333type Result < T , E = Error > = std:: result:: Result < T , E > ;
328334
@@ -332,8 +338,16 @@ impl ReconcilerError for Error {
332338 }
333339}
334340
335- pub async fn reconcile_hive ( hive : Arc < HiveCluster > , ctx : Arc < Ctx > ) -> Result < Action > {
341+ pub async fn reconcile_hive (
342+ hive : Arc < DeserializeGuard < HiveCluster > > ,
343+ ctx : Arc < Ctx > ,
344+ ) -> Result < Action > {
336345 tracing:: info!( "Starting reconcile" ) ;
346+ let hive = hive
347+ . 0
348+ . as_ref ( )
349+ . map_err ( error_boundary:: InvalidObject :: clone)
350+ . context ( InvalidHiveClusterSnafu ) ?;
337351 let client = & ctx. client ;
338352 let hive_namespace = hive. namespace ( ) . context ( ObjectHasNoNamespaceSnafu ) ?;
339353
@@ -361,7 +375,7 @@ pub async fn reconcile_hive(hive: Arc<HiveCluster>, ctx: Arc<Ctx>) -> Result<Act
361375 let validated_config = validate_all_roles_and_groups_config (
362376 & resolved_product_image. product_version ,
363377 & transform_all_roles_to_config (
364- hive. as_ref ( ) ,
378+ hive,
365379 [ (
366380 HiveRole :: MetaStore . to_string ( ) ,
367381 (
@@ -399,7 +413,7 @@ pub async fn reconcile_hive(hive: Arc<HiveCluster>, ctx: Arc<Ctx>) -> Result<Act
399413 . context ( CreateClusterResourcesSnafu ) ?;
400414
401415 let ( rbac_sa, rbac_rolebinding) = build_rbac_resources (
402- hive. as_ref ( ) ,
416+ hive,
403417 APP_NAME ,
404418 cluster_resources
405419 . get_required_labels ( )
@@ -416,15 +430,15 @@ pub async fn reconcile_hive(hive: Arc<HiveCluster>, ctx: Arc<Ctx>) -> Result<Act
416430 . await
417431 . context ( ApplyRoleBindingSnafu ) ?;
418432
419- let metastore_role_service = build_metastore_role_service ( & hive, & resolved_product_image) ?;
433+ let metastore_role_service = build_metastore_role_service ( hive, & resolved_product_image) ?;
420434
421435 // we have to get the assigned ports
422436 let metastore_role_service = cluster_resources
423437 . add ( client, metastore_role_service)
424438 . await
425439 . context ( ApplyRoleServiceSnafu ) ?;
426440
427- let vector_aggregator_address = resolve_vector_aggregator_address ( & hive, client)
441+ let vector_aggregator_address = resolve_vector_aggregator_address ( hive, client)
428442 . await
429443 . context ( ResolveVectorAggregatorAddressSnafu ) ?;
430444
@@ -437,9 +451,9 @@ pub async fn reconcile_hive(hive: Arc<HiveCluster>, ctx: Arc<Ctx>) -> Result<Act
437451 . merged_config ( & HiveRole :: MetaStore , & rolegroup)
438452 . context ( FailedToResolveResourceConfigSnafu ) ?;
439453
440- let rg_service = build_rolegroup_service ( & hive, & resolved_product_image, & rolegroup) ?;
454+ let rg_service = build_rolegroup_service ( hive, & resolved_product_image, & rolegroup) ?;
441455 let rg_configmap = build_metastore_rolegroup_config_map (
442- & hive,
456+ hive,
443457 & hive_namespace,
444458 & resolved_product_image,
445459 & rolegroup,
@@ -449,7 +463,7 @@ pub async fn reconcile_hive(hive: Arc<HiveCluster>, ctx: Arc<Ctx>) -> Result<Act
449463 vector_aggregator_address. as_deref ( ) ,
450464 ) ?;
451465 let rg_statefulset = build_metastore_rolegroup_statefulset (
452- & hive,
466+ hive,
453467 & hive_role,
454468 & resolved_product_image,
455469 & rolegroup,
@@ -488,7 +502,7 @@ pub async fn reconcile_hive(hive: Arc<HiveCluster>, ctx: Arc<Ctx>) -> Result<Act
488502 pod_disruption_budget : pdb,
489503 } ) = role_config
490504 {
491- add_pdbs ( pdb, & hive, & hive_role, client, & mut cluster_resources)
505+ add_pdbs ( pdb, hive, & hive_role, client, & mut cluster_resources)
492506 . await
493507 . context ( FailedToCreatePdbSnafu ) ?;
494508 }
@@ -498,8 +512,8 @@ pub async fn reconcile_hive(hive: Arc<HiveCluster>, ctx: Arc<Ctx>) -> Result<Act
498512 let mut discovery_hash = FnvHasher :: with_key ( 0 ) ;
499513 for discovery_cm in discovery:: build_discovery_configmaps (
500514 client,
501- & * hive,
502- & hive,
515+ hive,
516+ hive,
503517 & resolved_product_image,
504518 & metastore_role_service,
505519 None ,
@@ -523,14 +537,11 @@ pub async fn reconcile_hive(hive: Arc<HiveCluster>, ctx: Arc<Ctx>) -> Result<Act
523537 // Serialize as a string to discourage users from trying to parse the value,
524538 // and to keep things flexible if we end up changing the hasher at some point.
525539 discovery_hash : Some ( discovery_hash. finish ( ) . to_string ( ) ) ,
526- conditions : compute_conditions (
527- hive. as_ref ( ) ,
528- & [ & ss_cond_builder, & cluster_operation_cond_builder] ,
529- ) ,
540+ conditions : compute_conditions ( hive, & [ & ss_cond_builder, & cluster_operation_cond_builder] ) ,
530541 } ;
531542
532543 client
533- . apply_patch_status ( OPERATOR_NAME , & * hive, & status)
544+ . apply_patch_status ( OPERATOR_NAME , hive, & status)
534545 . await
535546 . context ( ApplyStatusSnafu ) ?;
536547
@@ -1117,8 +1128,16 @@ fn env_var_from_secret(var_name: &str, secret: &str, secret_key: &str) -> EnvVar
11171128 }
11181129}
11191130
1120- pub fn error_policy ( _obj : Arc < HiveCluster > , _error : & Error , _ctx : Arc < Ctx > ) -> Action {
1121- Action :: requeue ( * Duration :: from_secs ( 5 ) )
1131+ pub fn error_policy (
1132+ _obj : Arc < DeserializeGuard < HiveCluster > > ,
1133+ error : & Error ,
1134+ _ctx : Arc < Ctx > ,
1135+ ) -> Action {
1136+ match error {
1137+ // An invalid HBaseCluster was deserialized. Await for it to change.
1138+ Error :: InvalidHiveCluster { .. } => Action :: await_change ( ) ,
1139+ _ => Action :: requeue ( * Duration :: from_secs ( 5 ) ) ,
1140+ }
11221141}
11231142
11241143pub fn service_ports ( ) -> Vec < ServicePort > {
0 commit comments