@@ -30,7 +30,11 @@ use stackable_operator::{
3030 apimachinery:: pkg:: { apis:: meta:: v1:: LabelSelector , util:: intstr:: IntOrString } ,
3131 DeepMerge ,
3232 } ,
33- kube:: { runtime:: controller:: Action , Resource , ResourceExt } ,
33+ kube:: {
34+ core:: { error_boundary, DeserializeGuard } ,
35+ runtime:: controller:: Action ,
36+ Resource , ResourceExt ,
37+ } ,
3438 kvp:: { Labels , ObjectLabels } ,
3539 logging:: controller:: ReconcilerError ,
3640 memory:: { BinaryMultiple , MemoryQuantity } ,
@@ -238,6 +242,11 @@ pub enum Error {
238242 AddVolumeMount {
239243 source : builder:: pod:: container:: Error ,
240244 } ,
245+
246+ #[ snafu( display( "HelloCluster object is invalid" ) ) ]
247+ InvalidHelloCluster {
248+ source : error_boundary:: InvalidObject ,
249+ } ,
241250}
242251type Result < T , E = Error > = std:: result:: Result < T , E > ;
243252
@@ -247,8 +256,18 @@ impl ReconcilerError for Error {
247256 }
248257}
249258
250- pub async fn reconcile_hello ( hello : Arc < HelloCluster > , ctx : Arc < Ctx > ) -> Result < Action > {
259+ pub async fn reconcile_hello (
260+ hello : Arc < DeserializeGuard < HelloCluster > > ,
261+ ctx : Arc < Ctx > ,
262+ ) -> Result < Action > {
251263 tracing:: info!( "Starting reconcile" ) ;
264+
265+ let hello = hello
266+ . 0
267+ . as_ref ( )
268+ . map_err ( error_boundary:: InvalidObject :: clone)
269+ . context ( InvalidHelloClusterSnafu ) ?;
270+
252271 let client = & ctx. client ;
253272 let resolved_product_image: ResolvedProductImage = hello
254273 . spec
@@ -259,7 +278,7 @@ pub async fn reconcile_hello(hello: Arc<HelloCluster>, ctx: Arc<Ctx>) -> Result<
259278 let validated_config = validate_all_roles_and_groups_config (
260279 & resolved_product_image. product_version ,
261280 & transform_all_roles_to_config (
262- hello. as_ref ( ) ,
281+ hello,
263282 [ (
264283 HelloRole :: Server . to_string ( ) ,
265284 (
@@ -296,7 +315,7 @@ pub async fn reconcile_hello(hello: Arc<HelloCluster>, ctx: Arc<Ctx>) -> Result<
296315 . context ( CreateClusterResourcesSnafu ) ?;
297316
298317 let ( rbac_sa, rbac_rolebinding) = build_rbac_resources (
299- hello. as_ref ( ) ,
318+ hello,
300319 APP_NAME ,
301320 cluster_resources
302321 . get_required_labels ( )
@@ -313,15 +332,15 @@ pub async fn reconcile_hello(hello: Arc<HelloCluster>, ctx: Arc<Ctx>) -> Result<
313332 . await
314333 . context ( ApplyRoleBindingSnafu ) ?;
315334
316- let server_role_service = build_server_role_service ( & hello, & resolved_product_image) ?;
335+ let server_role_service = build_server_role_service ( hello, & resolved_product_image) ?;
317336
318337 // we have to get the assigned ports
319338 cluster_resources
320339 . add ( client, server_role_service)
321340 . await
322341 . context ( ApplyRoleServiceSnafu ) ?;
323342
324- let vector_aggregator_address = resolve_vector_aggregator_address ( & hello, client)
343+ let vector_aggregator_address = resolve_vector_aggregator_address ( hello, client)
325344 . await
326345 . context ( ResolveVectorAggregatorAddressSnafu ) ?;
327346
@@ -334,17 +353,17 @@ pub async fn reconcile_hello(hello: Arc<HelloCluster>, ctx: Arc<Ctx>) -> Result<
334353 . merged_config ( & HelloRole :: Server , & role_group_ref)
335354 . context ( FailedToResolveResourceConfigSnafu ) ?;
336355
337- let rg_service = build_rolegroup_service ( & hello, & resolved_product_image, & role_group_ref) ?;
356+ let rg_service = build_rolegroup_service ( hello, & resolved_product_image, & role_group_ref) ?;
338357 let rg_configmap = build_server_rolegroup_config_map (
339- & hello,
358+ hello,
340359 & resolved_product_image,
341360 & role_group_ref,
342361 rolegroup_config,
343362 & config,
344363 vector_aggregator_address. as_deref ( ) ,
345364 ) ?;
346365 let rg_statefulset = build_server_rolegroup_statefulset (
347- & hello,
366+ hello,
348367 & resolved_product_image,
349368 & hello_role,
350369 & role_group_ref,
@@ -382,7 +401,7 @@ pub async fn reconcile_hello(hello: Arc<HelloCluster>, ctx: Arc<Ctx>) -> Result<
382401 pod_disruption_budget : pdb,
383402 } ) = role_config
384403 {
385- add_pdbs ( pdb, & hello, & hello_role, client, & mut cluster_resources)
404+ add_pdbs ( pdb, hello, & hello_role, client, & mut cluster_resources)
386405 . await
387406 . context ( FailedToCreatePdbSnafu ) ?;
388407 }
@@ -391,14 +410,11 @@ pub async fn reconcile_hello(hello: Arc<HelloCluster>, ctx: Arc<Ctx>) -> Result<
391410 ClusterOperationsConditionBuilder :: new ( & hello. spec . cluster_operation ) ;
392411
393412 let status = HelloClusterStatus {
394- conditions : compute_conditions (
395- hello. as_ref ( ) ,
396- & [ & ss_cond_builder, & cluster_operation_cond_builder] ,
397- ) ,
413+ conditions : compute_conditions ( hello, & [ & ss_cond_builder, & cluster_operation_cond_builder] ) ,
398414 } ;
399415
400416 client
401- . apply_patch_status ( OPERATOR_NAME , & * hello, & status)
417+ . apply_patch_status ( OPERATOR_NAME , hello, & status)
402418 . await
403419 . context ( ApplyStatusSnafu ) ?;
404420
@@ -807,8 +823,15 @@ fn build_server_rolegroup_statefulset(
807823 } )
808824}
809825
810- pub fn error_policy ( _obj : Arc < HelloCluster > , _error : & Error , _ctx : Arc < Ctx > ) -> Action {
811- Action :: requeue ( Duration :: from_secs ( 5 ) )
826+ pub fn error_policy (
827+ _obj : Arc < DeserializeGuard < HelloCluster > > ,
828+ error : & Error ,
829+ _ctx : Arc < Ctx > ,
830+ ) -> Action {
831+ match error {
832+ Error :: InvalidHelloCluster { .. } => Action :: await_change ( ) ,
833+ _ => Action :: requeue ( Duration :: from_secs ( 5 ) ) ,
834+ }
812835}
813836
814837fn service_ports ( ) -> Vec < ServicePort > {
0 commit comments