@@ -32,6 +32,7 @@ use stackable_operator::{
3232 } ,
3333 kube:: {
3434 api:: ObjectMeta ,
35+ core:: { error_boundary, DeserializeGuard } ,
3536 runtime:: { controller:: Action , reflector:: ObjectRef } ,
3637 Resource , ResourceExt ,
3738 } ,
@@ -240,6 +241,11 @@ pub enum Error {
240241
241242 #[ snafu( display( "invalid OPA configuration" ) ) ]
242243 InvalidOpaConfig { source : security:: opa:: Error } ,
244+
245+ #[ snafu( display( "HdfsCluster object is invalid" ) ) ]
246+ InvalidHdfsCluster {
247+ source : error_boundary:: InvalidObject ,
248+ } ,
243249}
244250
245251impl ReconcilerError for Error {
@@ -255,23 +261,32 @@ pub struct Ctx {
255261 pub product_config : ProductConfigManager ,
256262}
257263
258- pub async fn reconcile_hdfs ( hdfs : Arc < HdfsCluster > , ctx : Arc < Ctx > ) -> HdfsOperatorResult < Action > {
264+ pub async fn reconcile_hdfs (
265+ hdfs : Arc < DeserializeGuard < HdfsCluster > > ,
266+ ctx : Arc < Ctx > ,
267+ ) -> HdfsOperatorResult < Action > {
259268 tracing:: info!( "Starting reconcile" ) ;
269+
270+ let hdfs = hdfs
271+ . 0
272+ . as_ref ( )
273+ . map_err ( error_boundary:: InvalidObject :: clone)
274+ . context ( InvalidHdfsClusterSnafu ) ?;
260275 let client = & ctx. client ;
261276
262277 let resolved_product_image = hdfs
263278 . spec
264279 . image
265280 . resolve ( DOCKER_IMAGE_BASE_NAME , crate :: built_info:: PKG_VERSION ) ;
266281
267- let vector_aggregator_address = resolve_vector_aggregator_address ( & hdfs, client)
282+ let vector_aggregator_address = resolve_vector_aggregator_address ( hdfs, client)
268283 . await
269284 . context ( ResolveVectorAggregatorAddressSnafu ) ?;
270285
271286 let validated_config = validate_all_roles_and_groups_config (
272287 & resolved_product_image. product_version ,
273288 & transform_all_roles_to_config (
274- hdfs. as_ref ( ) ,
289+ hdfs,
275290 hdfs. build_role_properties ( )
276291 . context ( BuildRolePropertiesSnafu ) ?,
277292 )
@@ -301,7 +316,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
301316
302317 // The service account and rolebinding will be created per cluster
303318 let ( rbac_sa, rbac_rolebinding) = build_rbac_resources (
304- hdfs. as_ref ( ) ,
319+ hdfs,
305320 APP_NAME ,
306321 cluster_resources
307322 . get_required_labels ( )
@@ -320,7 +335,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
320335
321336 let hdfs_opa_config = match & hdfs. spec . cluster_config . authorization {
322337 Some ( opa_config) => Some (
323- HdfsOpaConfig :: from_opa_config ( client, & hdfs, opa_config)
338+ HdfsOpaConfig :: from_opa_config ( client, hdfs, opa_config)
324339 . await
325340 . context ( InvalidOpaConfigSnafu ) ?,
326341 ) ,
@@ -353,9 +368,9 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
353368 continue ;
354369 } ;
355370
356- if let Some ( content) = build_invalid_replica_message ( & hdfs, & role, dfs_replication) {
371+ if let Some ( content) = build_invalid_replica_message ( hdfs, & role, dfs_replication) {
357372 publish_event (
358- & hdfs,
373+ hdfs,
359374 client,
360375 "Reconcile" ,
361376 "Invalid replicas" ,
@@ -367,7 +382,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
367382
368383 for ( rolegroup_name, rolegroup_config) in group_config. iter ( ) {
369384 let merged_config = role
370- . merged_config ( & hdfs, rolegroup_name)
385+ . merged_config ( hdfs, rolegroup_name)
371386 . context ( ConfigMergeSnafu ) ?;
372387
373388 let env_overrides = rolegroup_config. get ( & PropertyNameKind :: Env ) ;
@@ -378,25 +393,25 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
378393 // to avoid the compiler error "E0716 (temporary value dropped while borrowed)".
379394 let mut metadata = ObjectMetaBuilder :: new ( ) ;
380395 let metadata = metadata
381- . name_and_namespace ( hdfs. as_ref ( ) )
396+ . name_and_namespace ( hdfs)
382397 . name ( rolegroup_ref. object_name ( ) )
383- . ownerreference_from_resource ( hdfs. as_ref ( ) , None , Some ( true ) )
398+ . ownerreference_from_resource ( hdfs, None , Some ( true ) )
384399 . with_context ( |_| ObjectMissingMetadataForOwnerRefSnafu {
385- obj_ref : ObjectRef :: from_obj ( hdfs. as_ref ( ) ) ,
400+ obj_ref : ObjectRef :: from_obj ( hdfs) ,
386401 } ) ?
387402 . with_recommended_labels ( build_recommended_labels (
388- hdfs. as_ref ( ) ,
403+ hdfs,
389404 RESOURCE_MANAGER_HDFS_CONTROLLER ,
390405 & resolved_product_image. app_version_label ,
391406 & rolegroup_ref. role ,
392407 & rolegroup_ref. role_group ,
393408 ) )
394409 . context ( ObjectMetaSnafu ) ?;
395410
396- let rg_service = rolegroup_service ( & hdfs, metadata, & role, & rolegroup_ref) ?;
411+ let rg_service = rolegroup_service ( hdfs, metadata, & role, & rolegroup_ref) ?;
397412
398413 let rg_configmap = rolegroup_config_map (
399- & hdfs,
414+ hdfs,
400415 metadata,
401416 & rolegroup_ref,
402417 rolegroup_config,
@@ -408,7 +423,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
408423 ) ?;
409424
410425 let rg_statefulset = rolegroup_statefulset (
411- & hdfs,
426+ hdfs,
412427 metadata,
413428 & role,
414429 & rolegroup_ref,
@@ -460,7 +475,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
460475 pod_disruption_budget : pdb,
461476 } ) = role_config
462477 {
463- add_pdbs ( pdb, & hdfs, & role, client, & mut cluster_resources)
478+ add_pdbs ( pdb, hdfs, & role, client, & mut cluster_resources)
464479 . await
465480 . context ( FailedToCreatePdbSnafu ) ?;
466481 }
@@ -469,7 +484,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
469484 // Discovery CM will fail to build until the rest of the cluster has been deployed, so do it last
470485 // so that failure won't inhibit the rest of the cluster from booting up.
471486 let discovery_cm = build_discovery_configmap (
472- & hdfs,
487+ hdfs,
473488 HDFS_CONTROLLER ,
474489 & hdfs
475490 . namenode_listener_refs ( client)
@@ -492,10 +507,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
492507 ClusterOperationsConditionBuilder :: new ( & hdfs. spec . cluster_operation ) ;
493508
494509 let status = HdfsClusterStatus {
495- conditions : compute_conditions (
496- hdfs. as_ref ( ) ,
497- & [ & ss_cond_builder, & cluster_operation_cond_builder] ,
498- ) ,
510+ conditions : compute_conditions ( hdfs, & [ & ss_cond_builder, & cluster_operation_cond_builder] ) ,
499511 // FIXME: We can't currently leave upgrade mode automatically, since we don't know when an upgrade is finalized
500512 deployed_product_version : Some (
501513 hdfs. status
@@ -535,7 +547,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
535547 . context ( DeleteOrphanedResourcesSnafu ) ?;
536548 }
537549 client
538- . apply_patch_status ( OPERATOR_NAME , & * hdfs, & status)
550+ . apply_patch_status ( OPERATOR_NAME , hdfs, & status)
539551 . await
540552 . context ( ApplyStatusSnafu ) ?;
541553
@@ -886,8 +898,15 @@ fn rolegroup_statefulset(
886898 } )
887899}
888900
889- pub fn error_policy ( _obj : Arc < HdfsCluster > , _error : & Error , _ctx : Arc < Ctx > ) -> Action {
890- Action :: requeue ( * Duration :: from_secs ( 5 ) )
901+ pub fn error_policy (
902+ _obj : Arc < DeserializeGuard < HdfsCluster > > ,
903+ error : & Error ,
904+ _ctx : Arc < Ctx > ,
905+ ) -> Action {
906+ match error {
907+ Error :: InvalidHdfsCluster { .. } => Action :: await_change ( ) ,
908+ _ => Action :: requeue ( * Duration :: from_secs ( 5 ) ) ,
909+ }
891910}
892911
893912#[ cfg( test) ]
0 commit comments