@@ -39,6 +39,7 @@ use stackable_operator::{
3939 apimachinery:: pkg:: { apis:: meta:: v1:: LabelSelector , util:: intstr:: IntOrString } ,
4040 DeepMerge ,
4141 } ,
42+ kube:: core:: { error_boundary, DeserializeGuard } ,
4243 kube:: { runtime:: controller:: Action , Resource } ,
4344 kvp:: { Label , LabelError , Labels , ObjectLabels } ,
4445 logging:: controller:: ReconcilerError ,
@@ -305,6 +306,11 @@ pub enum Error {
305306 AddVolumeMount {
306307 source : builder:: pod:: container:: Error ,
307308 } ,
309+
310+ #[ snafu( display( "HBaseCluster object is invalid" ) ) ]
311+ InvalidHBaseCluster {
312+ source : error_boundary:: InvalidObject ,
313+ } ,
308314}
309315
310316type Result < T , E = Error > = std:: result:: Result < T , E > ;
@@ -315,31 +321,39 @@ impl ReconcilerError for Error {
315321 }
316322}
317323
318- pub async fn reconcile_hbase ( hbase : Arc < HbaseCluster > , ctx : Arc < Ctx > ) -> Result < Action > {
324+ pub async fn reconcile_hbase (
325+ hbase : Arc < DeserializeGuard < HbaseCluster > > ,
326+ ctx : Arc < Ctx > ,
327+ ) -> Result < Action > {
319328 tracing:: info!( "Starting reconcile" ) ;
320329
330+ let hbase = hbase
331+ . 0
332+ . as_ref ( )
333+ . map_err ( error_boundary:: InvalidObject :: clone)
334+ . context ( InvalidHBaseClusterSnafu ) ?;
335+
321336 let client = & ctx. client ;
322337
323- validate_cr ( & hbase) ?;
338+ validate_cr ( hbase) ?;
324339
325340 let resolved_product_image = hbase
326341 . spec
327342 . image
328343 . resolve ( DOCKER_IMAGE_BASE_NAME , crate :: built_info:: PKG_VERSION ) ;
329- let zookeeper_connection_information = ZookeeperConnectionInformation :: retrieve ( & hbase, client)
344+ let zookeeper_connection_information = ZookeeperConnectionInformation :: retrieve ( hbase, client)
330345 . await
331346 . context ( RetrieveZookeeperConnectionInformationSnafu ) ?;
332347
333- let vector_aggregator_address = resolve_vector_aggregator_address ( & hbase, client)
348+ let vector_aggregator_address = resolve_vector_aggregator_address ( hbase, client)
334349 . await
335350 . context ( ResolveVectorAggregatorAddressSnafu ) ?;
336351
337- let roles = build_roles ( & hbase) ?;
352+ let roles = build_roles ( hbase) ?;
338353
339354 let validated_config = validate_all_roles_and_groups_config (
340355 & resolved_product_image. app_version_label ,
341- & transform_all_roles_to_config ( hbase. as_ref ( ) , roles)
342- . context ( GenerateProductConfigSnafu ) ?,
356+ & transform_all_roles_to_config ( hbase, roles) . context ( GenerateProductConfigSnafu ) ?,
343357 & ctx. product_config ,
344358 false ,
345359 false ,
@@ -348,7 +362,7 @@ pub async fn reconcile_hbase(hbase: Arc<HbaseCluster>, ctx: Arc<Ctx>) -> Result<
348362
349363 let hbase_opa_config = match & hbase. spec . cluster_config . authorization {
350364 Some ( opa_config) => Some (
351- HbaseOpaConfig :: from_opa_config ( client, & hbase, opa_config)
365+ HbaseOpaConfig :: from_opa_config ( client, hbase, opa_config)
352366 . await
353367 . context ( InvalidOpaConfigSnafu ) ?,
354368 ) ,
@@ -365,15 +379,15 @@ pub async fn reconcile_hbase(hbase: Arc<HbaseCluster>, ctx: Arc<Ctx>) -> Result<
365379 . context ( CreateClusterResourcesSnafu ) ?;
366380
367381 let region_server_role_service =
368- build_region_server_role_service ( & hbase, & resolved_product_image) ?;
382+ build_region_server_role_service ( hbase, & resolved_product_image) ?;
369383 cluster_resources
370384 . add ( client, region_server_role_service)
371385 . await
372386 . context ( ApplyRoleServiceSnafu ) ?;
373387
374388 // discovery config map
375389 let discovery_cm = build_discovery_configmap (
376- & hbase,
390+ hbase,
377391 & client. kubernetes_cluster_info ,
378392 & zookeeper_connection_information,
379393 & resolved_product_image,
@@ -385,7 +399,7 @@ pub async fn reconcile_hbase(hbase: Arc<HbaseCluster>, ctx: Arc<Ctx>) -> Result<
385399 . context ( ApplyDiscoveryConfigMapSnafu ) ?;
386400
387401 let ( rbac_sa, rbac_rolebinding) = build_rbac_resources (
388- hbase. as_ref ( ) ,
402+ hbase,
389403 APP_NAME ,
390404 cluster_resources
391405 . get_required_labels ( )
@@ -419,9 +433,9 @@ pub async fn reconcile_hbase(hbase: Arc<HbaseCluster>, ctx: Arc<Ctx>) -> Result<
419433 . context ( FailedToResolveConfigSnafu ) ?;
420434
421435 let rg_service =
422- build_rolegroup_service ( & hbase, & hbase_role, & rolegroup, & resolved_product_image) ?;
436+ build_rolegroup_service ( hbase, & hbase_role, & rolegroup, & resolved_product_image) ?;
423437 let rg_configmap = build_rolegroup_config_map (
424- & hbase,
438+ hbase,
425439 & client. kubernetes_cluster_info ,
426440 & rolegroup,
427441 rolegroup_config,
@@ -432,7 +446,7 @@ pub async fn reconcile_hbase(hbase: Arc<HbaseCluster>, ctx: Arc<Ctx>) -> Result<
432446 vector_aggregator_address. as_deref ( ) ,
433447 ) ?;
434448 let rg_statefulset = build_rolegroup_statefulset (
435- & hbase,
449+ hbase,
436450 & hbase_role,
437451 & rolegroup,
438452 rolegroup_config,
@@ -466,7 +480,7 @@ pub async fn reconcile_hbase(hbase: Arc<HbaseCluster>, ctx: Arc<Ctx>) -> Result<
466480 pod_disruption_budget : pdb,
467481 } ) = role_config
468482 {
469- add_pdbs ( pdb, & hbase, & hbase_role, client, & mut cluster_resources)
483+ add_pdbs ( pdb, hbase, & hbase_role, client, & mut cluster_resources)
470484 . await
471485 . context ( FailedToCreatePdbSnafu ) ?;
472486 }
@@ -476,18 +490,15 @@ pub async fn reconcile_hbase(hbase: Arc<HbaseCluster>, ctx: Arc<Ctx>) -> Result<
476490 ClusterOperationsConditionBuilder :: new ( & hbase. spec . cluster_operation ) ;
477491
478492 let status = HbaseClusterStatus {
479- conditions : compute_conditions (
480- hbase. as_ref ( ) ,
481- & [ & ss_cond_builder, & cluster_operation_cond_builder] ,
482- ) ,
493+ conditions : compute_conditions ( hbase, & [ & ss_cond_builder, & cluster_operation_cond_builder] ) ,
483494 } ;
484495
485496 cluster_resources
486497 . delete_orphaned_resources ( client)
487498 . await
488499 . context ( DeleteOrphanedResourcesSnafu ) ?;
489500 client
490- . apply_patch_status ( OPERATOR_NAME , hbase. as_ref ( ) , & status)
501+ . apply_patch_status ( OPERATOR_NAME , hbase, & status)
491502 . await
492503 . context ( ApplyStatusSnafu ) ?;
493504
@@ -1104,8 +1115,16 @@ where
11041115 } )
11051116}
11061117
1107- pub fn error_policy ( _obj : Arc < HbaseCluster > , _error : & Error , _ctx : Arc < Ctx > ) -> Action {
1108- Action :: requeue ( * Duration :: from_secs ( 5 ) )
1118+ pub fn error_policy (
1119+ _obj : Arc < DeserializeGuard < HbaseCluster > > ,
1120+ error : & Error ,
1121+ _ctx : Arc < Ctx > ,
1122+ ) -> Action {
1123+ match error {
1124+ // root object is invalid, will be requed when modified
1125+ Error :: InvalidHBaseCluster { .. } => Action :: await_change ( ) ,
1126+ _ => Action :: requeue ( * Duration :: from_secs ( 5 ) ) ,
1127+ }
11091128}
11101129
11111130pub fn build_recommended_labels < ' a > (
0 commit comments