@@ -32,6 +32,7 @@ use stackable_operator::{
3232 } ,
3333 kube:: {
3434 api:: ObjectMeta ,
35+ core:: { error_boundary, DeserializeGuard } ,
3536 runtime:: { controller:: Action , reflector:: ObjectRef } ,
3637 Resource , ResourceExt ,
3738 } ,
@@ -241,6 +242,11 @@ pub enum Error {
241242
242243 #[ snafu( display( "invalid OPA configuration" ) ) ]
243244 InvalidOpaConfig { source : security:: opa:: Error } ,
245+
246+ #[ snafu( display( "HdfsCluster object is invalid" ) ) ]
247+ InvalidHdfsCluster {
248+ source : error_boundary:: InvalidObject ,
249+ } ,
244250}
245251
246252impl ReconcilerError for Error {
@@ -256,23 +262,32 @@ pub struct Ctx {
256262 pub product_config : ProductConfigManager ,
257263}
258264
259- pub async fn reconcile_hdfs ( hdfs : Arc < HdfsCluster > , ctx : Arc < Ctx > ) -> HdfsOperatorResult < Action > {
265+ pub async fn reconcile_hdfs (
266+ hdfs : Arc < DeserializeGuard < HdfsCluster > > ,
267+ ctx : Arc < Ctx > ,
268+ ) -> HdfsOperatorResult < Action > {
260269 tracing:: info!( "Starting reconcile" ) ;
270+
271+ let hdfs = hdfs
272+ . 0
273+ . as_ref ( )
274+ . map_err ( error_boundary:: InvalidObject :: clone)
275+ . context ( InvalidHdfsClusterSnafu ) ?;
261276 let client = & ctx. client ;
262277
263278 let resolved_product_image = hdfs
264279 . spec
265280 . image
266281 . resolve ( DOCKER_IMAGE_BASE_NAME , crate :: built_info:: PKG_VERSION ) ;
267282
268- let vector_aggregator_address = resolve_vector_aggregator_address ( & hdfs, client)
283+ let vector_aggregator_address = resolve_vector_aggregator_address ( hdfs, client)
269284 . await
270285 . context ( ResolveVectorAggregatorAddressSnafu ) ?;
271286
272287 let validated_config = validate_all_roles_and_groups_config (
273288 & resolved_product_image. product_version ,
274289 & transform_all_roles_to_config (
275- hdfs. as_ref ( ) ,
290+ hdfs,
276291 hdfs. build_role_properties ( )
277292 . context ( BuildRolePropertiesSnafu ) ?,
278293 )
@@ -302,7 +317,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
302317
303318 // The service account and rolebinding will be created per cluster
304319 let ( rbac_sa, rbac_rolebinding) = build_rbac_resources (
305- hdfs. as_ref ( ) ,
320+ hdfs,
306321 APP_NAME ,
307322 cluster_resources
308323 . get_required_labels ( )
@@ -321,7 +336,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
321336
322337 let hdfs_opa_config = match & hdfs. spec . cluster_config . authorization {
323338 Some ( opa_config) => Some (
324- HdfsOpaConfig :: from_opa_config ( client, & hdfs, opa_config)
339+ HdfsOpaConfig :: from_opa_config ( client, hdfs, opa_config)
325340 . await
326341 . context ( InvalidOpaConfigSnafu ) ?,
327342 ) ,
@@ -354,9 +369,9 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
354369 continue ;
355370 } ;
356371
357- if let Some ( content) = build_invalid_replica_message ( & hdfs, & role, dfs_replication) {
372+ if let Some ( content) = build_invalid_replica_message ( hdfs, & role, dfs_replication) {
358373 publish_event (
359- & hdfs,
374+ hdfs,
360375 client,
361376 "Reconcile" ,
362377 "Invalid replicas" ,
@@ -368,7 +383,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
368383
369384 for ( rolegroup_name, rolegroup_config) in group_config. iter ( ) {
370385 let merged_config = role
371- . merged_config ( & hdfs, rolegroup_name)
386+ . merged_config ( hdfs, rolegroup_name)
372387 . context ( ConfigMergeSnafu ) ?;
373388
374389 let env_overrides = rolegroup_config. get ( & PropertyNameKind :: Env ) ;
@@ -379,25 +394,25 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
379394 // to avoid the compiler error "E0716 (temporary value dropped while borrowed)".
380395 let mut metadata = ObjectMetaBuilder :: new ( ) ;
381396 let metadata = metadata
382- . name_and_namespace ( hdfs. as_ref ( ) )
397+ . name_and_namespace ( hdfs)
383398 . name ( rolegroup_ref. object_name ( ) )
384- . ownerreference_from_resource ( hdfs. as_ref ( ) , None , Some ( true ) )
399+ . ownerreference_from_resource ( hdfs, None , Some ( true ) )
385400 . with_context ( |_| ObjectMissingMetadataForOwnerRefSnafu {
386- obj_ref : ObjectRef :: from_obj ( hdfs. as_ref ( ) ) ,
401+ obj_ref : ObjectRef :: from_obj ( hdfs) ,
387402 } ) ?
388403 . with_recommended_labels ( build_recommended_labels (
389- hdfs. as_ref ( ) ,
404+ hdfs,
390405 RESOURCE_MANAGER_HDFS_CONTROLLER ,
391406 & resolved_product_image. app_version_label ,
392407 & rolegroup_ref. role ,
393408 & rolegroup_ref. role_group ,
394409 ) )
395410 . context ( ObjectMetaSnafu ) ?;
396411
397- let rg_service = rolegroup_service ( & hdfs, metadata, & role, & rolegroup_ref) ?;
412+ let rg_service = rolegroup_service ( hdfs, metadata, & role, & rolegroup_ref) ?;
398413
399414 let rg_configmap = rolegroup_config_map (
400- & hdfs,
415+ hdfs,
401416 & client. kubernetes_cluster_info ,
402417 metadata,
403418 & rolegroup_ref,
@@ -410,7 +425,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
410425 ) ?;
411426
412427 let rg_statefulset = rolegroup_statefulset (
413- & hdfs,
428+ hdfs,
414429 & client. kubernetes_cluster_info ,
415430 metadata,
416431 & role,
@@ -463,7 +478,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
463478 pod_disruption_budget : pdb,
464479 } ) = role_config
465480 {
466- add_pdbs ( pdb, & hdfs, & role, client, & mut cluster_resources)
481+ add_pdbs ( pdb, hdfs, & role, client, & mut cluster_resources)
467482 . await
468483 . context ( FailedToCreatePdbSnafu ) ?;
469484 }
@@ -472,7 +487,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
472487 // Discovery CM will fail to build until the rest of the cluster has been deployed, so do it last
473488 // so that failure won't inhibit the rest of the cluster from booting up.
474489 let discovery_cm = build_discovery_configmap (
475- & hdfs,
490+ hdfs,
476491 & client. kubernetes_cluster_info ,
477492 HDFS_CONTROLLER ,
478493 & hdfs
@@ -496,10 +511,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
496511 ClusterOperationsConditionBuilder :: new ( & hdfs. spec . cluster_operation ) ;
497512
498513 let status = HdfsClusterStatus {
499- conditions : compute_conditions (
500- hdfs. as_ref ( ) ,
501- & [ & ss_cond_builder, & cluster_operation_cond_builder] ,
502- ) ,
514+ conditions : compute_conditions ( hdfs, & [ & ss_cond_builder, & cluster_operation_cond_builder] ) ,
503515 // FIXME: We can't currently leave upgrade mode automatically, since we don't know when an upgrade is finalized
504516 deployed_product_version : Some (
505517 hdfs. status
@@ -539,7 +551,7 @@ pub async fn reconcile_hdfs(hdfs: Arc<HdfsCluster>, ctx: Arc<Ctx>) -> HdfsOperat
539551 . context ( DeleteOrphanedResourcesSnafu ) ?;
540552 }
541553 client
542- . apply_patch_status ( OPERATOR_NAME , & * hdfs, & status)
554+ . apply_patch_status ( OPERATOR_NAME , hdfs, & status)
543555 . await
544556 . context ( ApplyStatusSnafu ) ?;
545557
@@ -893,8 +905,15 @@ fn rolegroup_statefulset(
893905 } )
894906}
895907
896- pub fn error_policy ( _obj : Arc < HdfsCluster > , _error : & Error , _ctx : Arc < Ctx > ) -> Action {
897- Action :: requeue ( * Duration :: from_secs ( 5 ) )
908+ pub fn error_policy (
909+ _obj : Arc < DeserializeGuard < HdfsCluster > > ,
910+ error : & Error ,
911+ _ctx : Arc < Ctx > ,
912+ ) -> Action {
913+ match error {
914+ Error :: InvalidHdfsCluster { .. } => Action :: await_change ( ) ,
915+ _ => Action :: requeue ( * Duration :: from_secs ( 5 ) ) ,
916+ }
898917}
899918
900919#[ cfg( test) ]
0 commit comments