@@ -29,6 +29,7 @@ use stackable_operator::{
2929 } ,
3030 apimachinery:: pkg:: { apis:: meta:: v1:: LabelSelector , util:: intstr:: IntOrString } ,
3131 } ,
32+ kube:: core:: { error_boundary, DeserializeGuard } ,
3233 kube:: { runtime:: controller:: Action , Resource } ,
3334 kvp:: { Label , LabelError , Labels , ObjectLabels } ,
3435 logging:: controller:: ReconcilerError ,
@@ -280,6 +281,11 @@ pub enum Error {
280281
281282 #[ snafu( display( "authorization is only supported from HBase 2.6 onwards" ) ) ]
282283 AuthorizationNotSupported ,
284+
285+ #[ snafu( display( "HBaseCluster object is invalid" ) ) ]
286+ InvalidHBaseCluster {
287+ source : error_boundary:: InvalidObject ,
288+ } ,
283289}
284290
285291type Result < T , E = Error > = std:: result:: Result < T , E > ;
@@ -290,31 +296,39 @@ impl ReconcilerError for Error {
290296 }
291297}
292298
293- pub async fn reconcile_hbase ( hbase : Arc < HbaseCluster > , ctx : Arc < Ctx > ) -> Result < Action > {
299+ pub async fn reconcile_hbase (
300+ hbase : Arc < DeserializeGuard < HbaseCluster > > ,
301+ ctx : Arc < Ctx > ,
302+ ) -> Result < Action > {
294303 tracing:: info!( "Starting reconcile" ) ;
295304
305+ let hbase = hbase
306+ . 0
307+ . as_ref ( )
308+ . map_err ( error_boundary:: InvalidObject :: clone)
309+ . context ( InvalidHBaseClusterSnafu ) ?;
310+
296311 let client = & ctx. client ;
297312
298- validate_cr ( & hbase) ?;
313+ validate_cr ( hbase) ?;
299314
300315 let resolved_product_image = hbase
301316 . spec
302317 . image
303318 . resolve ( DOCKER_IMAGE_BASE_NAME , crate :: built_info:: PKG_VERSION ) ;
304- let zookeeper_connection_information = ZookeeperConnectionInformation :: retrieve ( & hbase, client)
319+ let zookeeper_connection_information = ZookeeperConnectionInformation :: retrieve ( hbase, client)
305320 . await
306321 . context ( RetrieveZookeeperConnectionInformationSnafu ) ?;
307322
308- let vector_aggregator_address = resolve_vector_aggregator_address ( & hbase, client)
323+ let vector_aggregator_address = resolve_vector_aggregator_address ( hbase, client)
309324 . await
310325 . context ( ResolveVectorAggregatorAddressSnafu ) ?;
311326
312327 let roles = hbase. build_role_properties ( ) . context ( RolePropertiesSnafu ) ?;
313328
314329 let validated_config = validate_all_roles_and_groups_config (
315330 & resolved_product_image. app_version_label ,
316- & transform_all_roles_to_config ( hbase. as_ref ( ) , roles)
317- . context ( GenerateProductConfigSnafu ) ?,
331+ & transform_all_roles_to_config ( hbase, roles) . context ( GenerateProductConfigSnafu ) ?,
318332 & ctx. product_config ,
319333 false ,
320334 false ,
@@ -323,7 +337,7 @@ pub async fn reconcile_hbase(hbase: Arc<HbaseCluster>, ctx: Arc<Ctx>) -> Result<
323337
324338 let hbase_opa_config = match & hbase. spec . cluster_config . authorization {
325339 Some ( opa_config) => Some (
326- HbaseOpaConfig :: from_opa_config ( client, & hbase, opa_config)
340+ HbaseOpaConfig :: from_opa_config ( client, hbase, opa_config)
327341 . await
328342 . context ( InvalidOpaConfigSnafu ) ?,
329343 ) ,
@@ -340,15 +354,15 @@ pub async fn reconcile_hbase(hbase: Arc<HbaseCluster>, ctx: Arc<Ctx>) -> Result<
340354 . context ( CreateClusterResourcesSnafu ) ?;
341355
342356 let region_server_role_service =
343- build_region_server_role_service ( & hbase, & resolved_product_image) ?;
357+ build_region_server_role_service ( hbase, & resolved_product_image) ?;
344358 cluster_resources
345359 . add ( client, region_server_role_service)
346360 . await
347361 . context ( ApplyRoleServiceSnafu ) ?;
348362
349363 // discovery config map
350364 let discovery_cm = build_discovery_configmap (
351- & hbase,
365+ hbase,
352366 & zookeeper_connection_information,
353367 & resolved_product_image,
354368 )
@@ -359,7 +373,7 @@ pub async fn reconcile_hbase(hbase: Arc<HbaseCluster>, ctx: Arc<Ctx>) -> Result<
359373 . context ( ApplyDiscoveryConfigMapSnafu ) ?;
360374
361375 let ( rbac_sa, rbac_rolebinding) = build_rbac_resources (
362- hbase. as_ref ( ) ,
376+ hbase,
363377 APP_NAME ,
364378 cluster_resources
365379 . get_required_labels ( )
@@ -393,9 +407,9 @@ pub async fn reconcile_hbase(hbase: Arc<HbaseCluster>, ctx: Arc<Ctx>) -> Result<
393407 . context ( FailedToResolveConfigSnafu ) ?;
394408
395409 let rg_service =
396- build_rolegroup_service ( & hbase, & hbase_role, & rolegroup, & resolved_product_image) ?;
410+ build_rolegroup_service ( hbase, & hbase_role, & rolegroup, & resolved_product_image) ?;
397411 let rg_configmap = build_rolegroup_config_map (
398- & hbase,
412+ hbase,
399413 & rolegroup,
400414 rolegroup_config,
401415 & zookeeper_connection_information,
@@ -405,7 +419,7 @@ pub async fn reconcile_hbase(hbase: Arc<HbaseCluster>, ctx: Arc<Ctx>) -> Result<
405419 vector_aggregator_address. as_deref ( ) ,
406420 ) ?;
407421 let rg_statefulset = build_rolegroup_statefulset (
408- & hbase,
422+ hbase,
409423 & hbase_role,
410424 & rolegroup,
411425 rolegroup_config,
@@ -439,7 +453,7 @@ pub async fn reconcile_hbase(hbase: Arc<HbaseCluster>, ctx: Arc<Ctx>) -> Result<
439453 pod_disruption_budget : pdb,
440454 } ) = role_config
441455 {
442- add_pdbs ( pdb, & hbase, & hbase_role, client, & mut cluster_resources)
456+ add_pdbs ( pdb, hbase, & hbase_role, client, & mut cluster_resources)
443457 . await
444458 . context ( FailedToCreatePdbSnafu ) ?;
445459 }
@@ -449,18 +463,15 @@ pub async fn reconcile_hbase(hbase: Arc<HbaseCluster>, ctx: Arc<Ctx>) -> Result<
449463 ClusterOperationsConditionBuilder :: new ( & hbase. spec . cluster_operation ) ;
450464
451465 let status = HbaseClusterStatus {
452- conditions : compute_conditions (
453- hbase. as_ref ( ) ,
454- & [ & ss_cond_builder, & cluster_operation_cond_builder] ,
455- ) ,
466+ conditions : compute_conditions ( hbase, & [ & ss_cond_builder, & cluster_operation_cond_builder] ) ,
456467 } ;
457468
458469 cluster_resources
459470 . delete_orphaned_resources ( client)
460471 . await
461472 . context ( DeleteOrphanedResourcesSnafu ) ?;
462473 client
463- . apply_patch_status ( OPERATOR_NAME , hbase. as_ref ( ) , & status)
474+ . apply_patch_status ( OPERATOR_NAME , hbase, & status)
464475 . await
465476 . context ( ApplyStatusSnafu ) ?;
466477
@@ -998,8 +1009,16 @@ where
9981009 } )
9991010}
10001011
1001- pub fn error_policy ( _obj : Arc < HbaseCluster > , _error : & Error , _ctx : Arc < Ctx > ) -> Action {
1002- Action :: requeue ( * Duration :: from_secs ( 5 ) )
1012+ pub fn error_policy (
1013+ _obj : Arc < DeserializeGuard < HbaseCluster > > ,
1014+ error : & Error ,
1015+ _ctx : Arc < Ctx > ,
1016+ ) -> Action {
1017+ match error {
1018+ // root object is invalid, will be requed when modified
1019+ Error :: InvalidHBaseCluster { .. } => Action :: await_change ( ) ,
1020+ _ => Action :: requeue ( * Duration :: from_secs ( 5 ) ) ,
1021+ }
10031022}
10041023
10051024pub fn build_recommended_labels < ' a > (
0 commit comments