@@ -45,6 +45,7 @@ use stackable_operator::{
4545 DeepMerge ,
4646 } ,
4747 kube:: {
48+ core:: { error_boundary, DeserializeGuard } ,
4849 runtime:: { controller:: Action , reflector:: ObjectRef } ,
4950 Resource , ResourceExt ,
5051 } ,
@@ -289,6 +290,11 @@ pub enum Error {
289290 "failed to write to String (Vec<u8> to be precise) containing Airflow config"
290291 ) ) ]
291292 WriteToConfigFileString { source : std:: io:: Error } ,
293+
294+ #[ snafu( display( "AirflowCluster object is invalid" ) ) ]
295+ InvalidAirflowCluster {
296+ source : error_boundary:: InvalidObject ,
297+ } ,
292298}
293299
294300type Result < T , E = Error > = std:: result:: Result < T , E > ;
@@ -299,9 +305,18 @@ impl ReconcilerError for Error {
299305 }
300306}
301307
302- pub async fn reconcile_airflow ( airflow : Arc < AirflowCluster > , ctx : Arc < Ctx > ) -> Result < Action > {
308+ pub async fn reconcile_airflow (
309+ airflow : Arc < DeserializeGuard < AirflowCluster > > ,
310+ ctx : Arc < Ctx > ,
311+ ) -> Result < Action > {
303312 tracing:: info!( "Starting reconcile" ) ;
304313
314+ let airflow = airflow
315+ . 0
316+ . as_ref ( )
317+ . map_err ( error_boundary:: InvalidObject :: clone)
318+ . context ( InvalidAirflowClusterSnafu ) ?;
319+
305320 let client = & ctx. client ;
306321 let resolved_product_image: ResolvedProductImage = airflow
307322 . spec
@@ -338,7 +353,7 @@ pub async fn reconcile_airflow(airflow: Arc<AirflowCluster>, ctx: Arc<Ctx>) -> R
338353 }
339354 }
340355
341- let role_config = transform_all_roles_to_config :: < AirflowConfigFragment , _ > ( & airflow, roles) ;
356+ let role_config = transform_all_roles_to_config :: < AirflowConfigFragment , _ > ( airflow, roles) ;
342357 let validated_role_config = validate_all_roles_and_groups_config (
343358 & resolved_product_image. product_version ,
344359 & role_config. context ( ProductConfigTransformSnafu ) ?,
@@ -350,7 +365,7 @@ pub async fn reconcile_airflow(airflow: Arc<AirflowCluster>, ctx: Arc<Ctx>) -> R
350365
351366 let vector_aggregator_address = resolve_vector_aggregator_address (
352367 client,
353- airflow. as_ref ( ) ,
368+ airflow,
354369 airflow
355370 . spec
356371 . cluster_config
@@ -374,8 +389,7 @@ pub async fn reconcile_airflow(airflow: Arc<AirflowCluster>, ctx: Arc<Ctx>) -> R
374389 . context ( BuildLabelSnafu ) ?;
375390
376391 let ( rbac_sa, rbac_rolebinding) =
377- build_rbac_resources ( airflow. as_ref ( ) , APP_NAME , required_labels)
378- . context ( BuildRBACObjectsSnafu ) ?;
392+ build_rbac_resources ( airflow, APP_NAME , required_labels) . context ( BuildRBACObjectsSnafu ) ?;
379393
380394 let rbac_sa = cluster_resources
381395 . add ( client, rbac_sa)
@@ -397,7 +411,7 @@ pub async fn reconcile_airflow(airflow: Arc<AirflowCluster>, ctx: Arc<Ctx>) -> R
397411 } = & airflow_executor
398412 {
399413 build_executor_template (
400- & airflow,
414+ airflow,
401415 common_configuration,
402416 & resolved_product_image,
403417 & authentication_config,
@@ -418,7 +432,7 @@ pub async fn reconcile_airflow(airflow: Arc<AirflowCluster>, ctx: Arc<Ctx>) -> R
418432 // some roles will only run "internally" and do not need to be created as services
419433 if let Some ( resolved_port) = role_port ( role_name) {
420434 let role_service =
421- build_role_service ( & airflow, & resolved_product_image, role_name, resolved_port) ?;
435+ build_role_service ( airflow, & resolved_product_image, role_name, resolved_port) ?;
422436 cluster_resources
423437 . add ( client, role_service)
424438 . await
@@ -427,7 +441,7 @@ pub async fn reconcile_airflow(airflow: Arc<AirflowCluster>, ctx: Arc<Ctx>) -> R
427441
428442 for ( rolegroup_name, rolegroup_config) in role_config. iter ( ) {
429443 let rolegroup = RoleGroupRef {
430- cluster : ObjectRef :: from_obj ( & * airflow) ,
444+ cluster : ObjectRef :: from_obj ( airflow) ,
431445 role : role_name. into ( ) ,
432446 role_group : rolegroup_name. into ( ) ,
433447 } ;
@@ -436,16 +450,15 @@ pub async fn reconcile_airflow(airflow: Arc<AirflowCluster>, ctx: Arc<Ctx>) -> R
436450 . merged_config ( & airflow_role, & rolegroup)
437451 . context ( FailedToResolveConfigSnafu ) ?;
438452
439- let rg_service =
440- build_rolegroup_service ( & airflow, & resolved_product_image, & rolegroup) ?;
453+ let rg_service = build_rolegroup_service ( airflow, & resolved_product_image, & rolegroup) ?;
441454 cluster_resources. add ( client, rg_service) . await . context (
442455 ApplyRoleGroupServiceSnafu {
443456 rolegroup : rolegroup. clone ( ) ,
444457 } ,
445458 ) ?;
446459
447460 let rg_statefulset = build_server_rolegroup_statefulset (
448- & airflow,
461+ airflow,
449462 & resolved_product_image,
450463 & airflow_role,
451464 & rolegroup,
@@ -466,7 +479,7 @@ pub async fn reconcile_airflow(airflow: Arc<AirflowCluster>, ctx: Arc<Ctx>) -> R
466479 ) ;
467480
468481 let rg_configmap = build_rolegroup_config_map (
469- & airflow,
482+ airflow,
470483 & resolved_product_image,
471484 & rolegroup,
472485 rolegroup_config,
@@ -488,7 +501,7 @@ pub async fn reconcile_airflow(airflow: Arc<AirflowCluster>, ctx: Arc<Ctx>) -> R
488501 pod_disruption_budget : pdb,
489502 } ) = role_config
490503 {
491- add_pdbs ( pdb, & airflow, & airflow_role, client, & mut cluster_resources)
504+ add_pdbs ( pdb, airflow, & airflow_role, client, & mut cluster_resources)
492505 . await
493506 . context ( FailedToCreatePdbSnafu ) ?;
494507 }
@@ -501,13 +514,13 @@ pub async fn reconcile_airflow(airflow: Arc<AirflowCluster>, ctx: Arc<Ctx>) -> R
501514
502515 let status = AirflowClusterStatus {
503516 conditions : compute_conditions (
504- airflow. as_ref ( ) ,
517+ airflow,
505518 & [ & ss_cond_builder, & cluster_operation_cond_builder] ,
506519 ) ,
507520 } ;
508521
509522 client
510- . apply_patch_status ( OPERATOR_NAME , & * airflow, & status)
523+ . apply_patch_status ( OPERATOR_NAME , airflow, & status)
511524 . await
512525 . context ( ApplyStatusSnafu ) ?;
513526
@@ -516,7 +529,7 @@ pub async fn reconcile_airflow(airflow: Arc<AirflowCluster>, ctx: Arc<Ctx>) -> R
516529
517530#[ allow( clippy:: too_many_arguments) ]
518531async fn build_executor_template (
519- airflow : & Arc < AirflowCluster > ,
532+ airflow : & AirflowCluster ,
520533 common_config : & CommonConfiguration < ExecutorConfigFragment > ,
521534 resolved_product_image : & ResolvedProductImage ,
522535 authentication_config : & Vec < AirflowAuthenticationConfigResolved > ,
@@ -529,7 +542,7 @@ async fn build_executor_template(
529542 . merged_executor_config ( & common_config. config )
530543 . context ( FailedToResolveConfigSnafu ) ?;
531544 let rolegroup = RoleGroupRef {
532- cluster : ObjectRef :: from_obj ( & * * airflow) ,
545+ cluster : ObjectRef :: from_obj ( airflow) ,
533546 role : "executor" . into ( ) ,
534547 role_group : "kubernetes" . into ( ) ,
535548 } ;
@@ -1223,8 +1236,17 @@ fn build_gitsync_container(
12231236 Ok ( gitsync_container)
12241237}
12251238
1226- pub fn error_policy ( _obj : Arc < AirflowCluster > , _error : & Error , _ctx : Arc < Ctx > ) -> Action {
1227- Action :: requeue ( * Duration :: from_secs ( 5 ) )
1239+ pub fn error_policy (
1240+ _obj : Arc < DeserializeGuard < AirflowCluster > > ,
1241+ error : & Error ,
1242+ _ctx : Arc < Ctx > ,
1243+ ) -> Action {
1244+ match error {
1245+ // root object is invalid, will be requeued when modified anyway
1246+ Error :: InvalidAirflowCluster { .. } => Action :: await_change ( ) ,
1247+
1248+ _ => Action :: requeue ( * Duration :: from_secs ( 10 ) ) ,
1249+ }
12281250}
12291251
12301252fn add_authentication_volumes_and_volume_mounts (
0 commit comments