diff --git a/CHANGELOG.md b/CHANGELOG.md index 912c327d..9b07e2b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,9 +14,14 @@ - `podOverrides` - `affinity` +### Fixed + +- Invalid `SupersetCluster`, `DruidConnection` or `AuthenticationClass` objects don't stop the operator from reconciling ([#551]). + [#528]: https://github.com/stackabletech/superset-operator/pull/528 [#530]: https://github.com/stackabletech/superset-operator/pull/530 [#549]: https://github.com/stackabletech/superset-operator/pull/549 +[#551]: https://github.com/stackabletech/superset-operator/pull/551 ## [24.7.0] - 2024-07-24 diff --git a/rust/operator-binary/src/druid_connection_controller.rs b/rust/operator-binary/src/druid_connection_controller.rs index 4286935b..d642cb01 100644 --- a/rust/operator-binary/src/druid_connection_controller.rs +++ b/rust/operator-binary/src/druid_connection_controller.rs @@ -15,6 +15,7 @@ use stackable_operator::{ }, kube::{ core::DynamicObject, + core::{error_boundary, DeserializeGuard}, runtime::{controller::Action, reflector::ObjectRef}, ResourceExt, }, @@ -84,6 +85,11 @@ pub enum Error { SupersetClusterRetrieval { source: stackable_operator::client::Error, }, + + #[snafu(display("DruidConnection object is invalid"))] + InvalidDruidConnection { + source: error_boundary::InvalidObject, + }, } type Result = std::result::Result; @@ -110,20 +116,25 @@ impl ReconcilerError for Error { Error::ApplyServiceAccount { .. } => None, Error::ApplyRoleBinding { .. } => None, Error::SupersetClusterRetrieval { .. } => None, + Error::InvalidDruidConnection { .. } => None, } } } pub async fn reconcile_druid_connection( - druid_connection: Arc, + druid_connection: Arc>, ctx: Arc, ) -> Result { tracing::info!("Starting reconciling DruidConnections"); + let druid_connection = druid_connection + .0 + .as_ref() + .map_err(error_boundary::InvalidObject::clone) + .context(InvalidDruidConnectionSnafu)?; let client = &ctx.client; - let (rbac_sa, rbac_rolebinding) = - rbac::build_rbac_resources(druid_connection.as_ref(), APP_NAME); + let (rbac_sa, rbac_rolebinding) = rbac::build_rbac_resources(druid_connection, APP_NAME); client .apply_patch(DRUID_CONNECTION_CONTROLLER_NAME, &rbac_sa, &rbac_sa) .await @@ -146,7 +157,7 @@ pub async fn reconcile_druid_connection( &druid_connection.druid_name(), &druid_connection.druid_namespace().context( DruidConnectionNoNamespaceSnafu { - druid_connection: ObjectRef::from_obj(&*druid_connection), + druid_connection: ObjectRef::from_obj(druid_connection), }, )?, ) @@ -159,7 +170,7 @@ pub async fn reconcile_druid_connection( &druid_connection.superset_name(), &druid_connection.superset_namespace().context( DruidConnectionNoNamespaceSnafu { - druid_connection: ObjectRef::from_obj(&*druid_connection), + druid_connection: ObjectRef::from_obj(druid_connection), }, )?, ) @@ -183,7 +194,7 @@ pub async fn reconcile_druid_connection( &druid_connection.druid_name(), &druid_connection.druid_namespace().context( DruidConnectionNoNamespaceSnafu { - druid_connection: ObjectRef::from_obj(&*druid_connection), + druid_connection: ObjectRef::from_obj(druid_connection), }, )?, client, @@ -195,7 +206,7 @@ pub async fn reconcile_druid_connection( .resolve(DOCKER_IMAGE_BASE_NAME, crate::built_info::PKG_VERSION); let job = build_import_job( &superset_cluster, - &druid_connection, + druid_connection, &resolved_product_image, &sqlalchemy_str, &rbac_sa.name_any(), @@ -209,7 +220,7 @@ pub async fn reconcile_druid_connection( client .apply_patch_status( DRUID_CONNECTION_CONTROLLER_NAME, - &*druid_connection, + druid_connection, &s.importing(), ) .await @@ -236,11 +247,7 @@ pub async fn reconcile_druid_connection( if let Some(ns) = new_status { client - .apply_patch_status( - DRUID_CONNECTION_CONTROLLER_NAME, - &*druid_connection, - &ns, - ) + .apply_patch_status(DRUID_CONNECTION_CONTROLLER_NAME, druid_connection, &ns) .await .context(ApplyStatusSnafu)?; } @@ -253,7 +260,7 @@ pub async fn reconcile_druid_connection( client .apply_patch_status( DRUID_CONNECTION_CONTROLLER_NAME, - &*druid_connection, + druid_connection, &DruidConnectionStatus::new(), ) .await @@ -360,6 +367,13 @@ async fn build_import_job( Ok(job) } -pub fn error_policy(_obj: Arc, _error: &Error, _ctx: Arc) -> Action { - Action::requeue(*Duration::from_secs(5)) +pub fn error_policy( + _obj: Arc>, + error: &Error, + _ctx: Arc, +) -> Action { + match error { + Error::InvalidDruidConnection { .. } => Action::await_change(), + _ => Action::requeue(*Duration::from_secs(5)), + } } diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs index 98dc9dd7..9a73b80f 100644 --- a/rust/operator-binary/src/main.rs +++ b/rust/operator-binary/src/main.rs @@ -11,6 +11,7 @@ use stackable_operator::{ core::v1::{ConfigMap, Service}, }, kube::{ + core::DeserializeGuard, runtime::{reflector::ObjectRef, watcher, Controller}, ResourceExt, }, @@ -86,28 +87,28 @@ async fn main() -> anyhow::Result<()> { .await?; let superset_controller_builder = Controller::new( - watch_namespace.get_api::(&client), + watch_namespace.get_api::>(&client), watcher::Config::default(), ); let superset_store_1 = superset_controller_builder.store(); let superset_controller = superset_controller_builder .owns( - watch_namespace.get_api::(&client), + watch_namespace.get_api::>(&client), watcher::Config::default(), ) .owns( - watch_namespace.get_api::(&client), + watch_namespace.get_api::>(&client), watcher::Config::default(), ) .shutdown_on_signal() .watches( - client.get_api::(&()), + client.get_api::>(&()), watcher::Config::default(), move |authentication_class| { superset_store_1 .state() .into_iter() - .filter(move |superset: &Arc| { + .filter(move |superset| { references_authentication_class(superset, &authentication_class) }) .map(|superset| ObjectRef::from_obj(&*superset)) @@ -130,7 +131,7 @@ async fn main() -> anyhow::Result<()> { }); let druid_connection_controller_builder = Controller::new( - watch_namespace.get_api::(&client), + watch_namespace.get_api::>(&client), watcher::Config::default(), ); let druid_connection_store_1 = druid_connection_controller_builder.store(); @@ -139,46 +140,38 @@ async fn main() -> anyhow::Result<()> { let druid_connection_controller = druid_connection_controller_builder .shutdown_on_signal() .watches( - watch_namespace.get_api::(&client), + watch_namespace.get_api::>(&client), watcher::Config::default(), move |superset_cluster| { druid_connection_store_1 .state() .into_iter() .filter(move |druid_connection| { - druid_connection.superset_name() == superset_cluster.name_any() - && druid_connection.superset_namespace().ok() - == superset_cluster.namespace() + valid_druid_connection(&superset_cluster, druid_connection) }) .map(|druid_connection| ObjectRef::from_obj(&*druid_connection)) }, ) .watches( - watch_namespace.get_api::(&client), + watch_namespace.get_api::>(&client), watcher::Config::default(), move |job| { druid_connection_store_2 .state() .into_iter() - .filter(move |druid_connection| { - druid_connection.metadata.namespace == job.metadata.namespace - && Some(druid_connection.job_name()) == job.metadata.name - }) + .filter(move |druid_connection| valid_druid_job(druid_connection, &job)) .map(|druid_connection| ObjectRef::from_obj(&*druid_connection)) }, ) .watches( - watch_namespace.get_api::(&client), + watch_namespace.get_api::>(&client), watcher::Config::default(), move |config_map| { druid_connection_store_3 .state() .into_iter() .filter(move |druid_connection| { - druid_connection.druid_namespace().ok() - == config_map.metadata.namespace - && Some(druid_connection.druid_name()) - == config_map.metadata.name + valid_druid_connection_namespace(druid_connection, &config_map) }) .map(|druid_connection| ObjectRef::from_obj(&*druid_connection)) }, @@ -208,9 +201,13 @@ async fn main() -> anyhow::Result<()> { } fn references_authentication_class( - superset: &SupersetCluster, - authentication_class: &AuthenticationClass, + superset: &DeserializeGuard, + authentication_class: &DeserializeGuard, ) -> bool { + let Ok(superset) = &superset.0 else { + return false; + }; + let authentication_class_name = authentication_class.name_any(); superset .spec @@ -219,3 +216,36 @@ fn references_authentication_class( .iter() .any(|c| c.common.authentication_class_name() == &authentication_class_name) } + +fn valid_druid_connection( + superset_cluster: &DeserializeGuard, + druid_connection: &DeserializeGuard, +) -> bool { + let Ok(druid_connection) = &druid_connection.0 else { + return false; + }; + druid_connection.superset_name() == superset_cluster.name_any() + && druid_connection.superset_namespace().ok() == superset_cluster.namespace() +} + +fn valid_druid_connection_namespace( + druid_connection: &DeserializeGuard, + config_map: &DeserializeGuard, +) -> bool { + let Ok(druid_connection) = &druid_connection.0 else { + return false; + }; + druid_connection.druid_namespace().ok() == config_map.meta().namespace + && Some(druid_connection.druid_name()) == config_map.meta().name +} + +fn valid_druid_job( + druid_connection: &DeserializeGuard, + job: &DeserializeGuard, +) -> bool { + let Ok(druid_connection) = &druid_connection.0 else { + return false; + }; + druid_connection.metadata.namespace == job.meta().namespace + && Some(druid_connection.job_name()) == job.meta().name +} diff --git a/rust/operator-binary/src/superset_controller.rs b/rust/operator-binary/src/superset_controller.rs index 09c8ca8c..f47a2a68 100644 --- a/rust/operator-binary/src/superset_controller.rs +++ b/rust/operator-binary/src/superset_controller.rs @@ -38,7 +38,11 @@ use stackable_operator::{ apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString}, DeepMerge, }, - kube::{runtime::controller::Action, Resource, ResourceExt}, + kube::{ + core::{error_boundary, DeserializeGuard}, + runtime::controller::Action, + Resource, ResourceExt, + }, kvp::{Label, Labels}, logging::controller::ReconcilerError, product_config_utils::{ @@ -274,6 +278,11 @@ pub enum Error { AddVolumeMount { source: builder::pod::container::Error, }, + + #[snafu(display("SupersetCluster object is invalid"))] + InvalidSupersetCluster { + source: error_boundary::InvalidObject, + }, } type Result = std::result::Result; @@ -284,9 +293,18 @@ impl ReconcilerError for Error { } } -pub async fn reconcile_superset(superset: Arc, ctx: Arc) -> Result { +pub async fn reconcile_superset( + superset: Arc>, + ctx: Arc, +) -> Result { tracing::info!("Starting reconcile"); + let superset = superset + .0 + .as_ref() + .map_err(error_boundary::InvalidObject::clone) + .context(InvalidSupersetClusterSnafu)?; + let client = &ctx.client; let resolved_product_image: ResolvedProductImage = superset .spec @@ -299,7 +317,7 @@ pub async fn reconcile_superset(superset: Arc, ctx: Arc) - let vector_aggregator_address = resolve_vector_aggregator_address( client, - superset.as_ref(), + superset, superset .spec .cluster_config @@ -319,7 +337,7 @@ pub async fn reconcile_superset(superset: Arc, ctx: Arc) - let validated_config = validate_all_roles_and_groups_config( &resolved_product_image.product_version, &transform_all_roles_to_config( - superset.as_ref(), + superset, [( superset_role.to_string(), ( @@ -354,7 +372,7 @@ pub async fn reconcile_superset(superset: Arc, ctx: Arc) - .context(CreateClusterResourcesSnafu)?; let (rbac_sa, rbac_rolebinding) = build_rbac_resources( - superset.as_ref(), + superset, APP_NAME, cluster_resources .get_required_labels() @@ -371,7 +389,7 @@ pub async fn reconcile_superset(superset: Arc, ctx: Arc) - .await .context(ApplyRoleBindingSnafu)?; - let node_role_service = build_node_role_service(&superset, &resolved_product_image)?; + let node_role_service = build_node_role_service(superset, &resolved_product_image)?; cluster_resources .add(client, node_role_service) .await @@ -387,9 +405,9 @@ pub async fn reconcile_superset(superset: Arc, ctx: Arc) - .context(FailedToResolveConfigSnafu)?; let rg_service = - build_node_rolegroup_service(&superset, &resolved_product_image, &rolegroup)?; + build_node_rolegroup_service(superset, &resolved_product_image, &rolegroup)?; let rg_configmap = build_rolegroup_config_map( - &superset, + superset, &resolved_product_image, &rolegroup, rolegroup_config, @@ -398,7 +416,7 @@ pub async fn reconcile_superset(superset: Arc, ctx: Arc) - vector_aggregator_address.as_deref(), )?; let rg_statefulset = build_server_rolegroup_statefulset( - &superset, + superset, &resolved_product_image, &superset_role, &rolegroup, @@ -436,7 +454,7 @@ pub async fn reconcile_superset(superset: Arc, ctx: Arc) - { add_pdbs( pdb, - &superset, + superset, &superset_role, client, &mut cluster_resources, @@ -452,12 +470,12 @@ pub async fn reconcile_superset(superset: Arc, ctx: Arc) - let status = SupersetClusterStatus { conditions: compute_conditions( - superset.as_ref(), + superset, &[&ss_cond_builder, &cluster_operation_cond_builder], ), }; client - .apply_patch_status(OPERATOR_NAME, &*superset, &status) + .apply_patch_status(OPERATOR_NAME, superset, &status) .await .context(ApplyStatusSnafu)?; @@ -1019,6 +1037,13 @@ fn authentication_start_commands( .join("\n") } -pub fn error_policy(_obj: Arc, _error: &Error, _ctx: Arc) -> Action { - Action::requeue(*Duration::from_secs(5)) +pub fn error_policy( + _obj: Arc>, + error: &Error, + _ctx: Arc, +) -> Action { + match error { + Error::InvalidSupersetCluster { .. } => Action::await_change(), + _ => Action::requeue(*Duration::from_secs(5)), + } }