diff --git a/CHANGELOG.md b/CHANGELOG.md index 31a91754..ece4cd98 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ All notable changes to this project will be documented in this file. - BREAKING: The fields `connection` and `host` on `S3Connection` as well as `bucketName` on `S3Bucket`are now mandatory ([#472]). - Fix `envOverrides` for SparkApplication and SparkHistoryServer ([#451]). - Ensure SparkApplications can only create a single submit Job. Fix for #457 ([#460]). +- Invalid `SparkApplication`/`SparkHistoryServer` objects don't cause the operator to stop functioning ([#482]). ### Removed diff --git a/rust/operator-binary/src/history/history_controller.rs b/rust/operator-binary/src/history/history_controller.rs index d024590f..12b0a8d7 100644 --- a/rust/operator-binary/src/history/history_controller.rs +++ b/rust/operator-binary/src/history/history_controller.rs @@ -2,6 +2,7 @@ use crate::history::operations::pdb::add_pdbs; use crate::product_logging::{self, resolve_vector_aggregator_address}; use crate::Ctx; use product_config::{types::PropertyNameKind, writer::to_java_properties_string}; +use stackable_operator::kube::core::{error_boundary, DeserializeGuard}; use stackable_operator::{ builder::{ self, @@ -196,6 +197,11 @@ pub enum Error { AddVolumeMount { source: builder::pod::container::Error, }, + + #[snafu(display("SparkHistoryServer object is invalid"))] + InvalidSparkHistoryServer { + source: error_boundary::InvalidObject, + }, } type Result = std::result::Result; @@ -206,9 +212,18 @@ impl ReconcilerError for Error { } } /// Updates the status of the SparkApplication that started the pod. 
-pub async fn reconcile(shs: Arc, ctx: Arc) -> Result { +pub async fn reconcile( + shs: Arc>, + ctx: Arc, +) -> Result { tracing::info!("Starting reconcile history server"); + let shs = shs + .0 + .as_ref() + .map_err(error_boundary::InvalidObject::clone) + .context(InvalidSparkHistoryServerSnafu)?; + let client = &ctx.client; let mut cluster_resources = ClusterResources::new( @@ -244,7 +259,7 @@ pub async fn reconcile(shs: Arc, ctx: Arc) -> Result, ctx: Arc) -> Result, ctx: Arc) -> Result, ctx: Arc) -> Result, ctx: Arc) -> Result, ctx: Arc) -> Result, ctx: Arc) -> Result, ctx: Arc) -> Result, _error: &Error, _ctx: Arc) -> Action { - Action::requeue(*Duration::from_secs(5)) +pub fn error_policy( + _obj: Arc>, + error: &Error, + _ctx: Arc, +) -> Action { + match error { + Error::InvalidSparkHistoryServer { .. } => Action::await_change(), + _ => Action::requeue(*Duration::from_secs(5)), + } } #[allow(clippy::result_large_err)] diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs index c8d61a7f..fb9abb42 100644 --- a/rust/operator-binary/src/main.rs +++ b/rust/operator-binary/src/main.rs @@ -13,6 +13,7 @@ use stackable_operator::cli::{Command, ProductOperatorRun}; use stackable_operator::k8s_openapi::api::apps::v1::StatefulSet; use stackable_operator::k8s_openapi::api::core::v1::Pod; use stackable_operator::k8s_openapi::api::core::v1::{ConfigMap, Service}; +use stackable_operator::kube::core::DeserializeGuard; use stackable_operator::kube::runtime::{controller::Controller, watcher}; use stackable_operator::logging::controller::report_controller_reconciled; use stackable_operator::CustomResourceExt; @@ -79,11 +80,11 @@ async fn main() -> anyhow::Result<()> { product_config: product_config.load(&PRODUCT_CONFIG_PATHS)?, }; let app_controller = Controller::new( - watch_namespace.get_api::(&client), + watch_namespace.get_api::>(&client), watcher::Config::default(), ) .owns( - watch_namespace.get_api::(&client), + 
watch_namespace.get_api::>(&client), watcher::Config::default(), ) .shutdown_on_signal() @@ -102,12 +103,12 @@ async fn main() -> anyhow::Result<()> { .instrument(info_span!("app_controller")); let pod_driver_controller = Controller::new( - watch_namespace.get_api::(&client), + watch_namespace.get_api::>(&client), watcher::Config::default() .labels(&format!("app.kubernetes.io/managed-by={OPERATOR_NAME}_{CONTROLLER_NAME},spark-role=driver")), ) .owns( - watch_namespace.get_api::(&client), + watch_namespace.get_api::>(&client), watcher::Config::default(), ) .shutdown_on_signal() @@ -125,23 +126,23 @@ async fn main() -> anyhow::Result<()> { product_config: product_config.load(&PRODUCT_CONFIG_PATHS)?, }; let history_controller = Controller::new( - watch_namespace.get_api::(&client), + watch_namespace.get_api::>(&client), watcher::Config::default(), ) .owns( - watch_namespace.get_api::(&client), + watch_namespace.get_api::>(&client), watcher::Config::default(), ) .owns( - watch_namespace.get_api::(&client), + watch_namespace.get_api::>(&client), watcher::Config::default(), ) .owns( - watch_namespace.get_api::(&client), + watch_namespace.get_api::>(&client), watcher::Config::default(), ) .owns( - watch_namespace.get_api::(&client), + watch_namespace.get_api::>(&client), watcher::Config::default(), ) .shutdown_on_signal() diff --git a/rust/operator-binary/src/pod_driver_controller.rs b/rust/operator-binary/src/pod_driver_controller.rs index 435f079d..f75d5225 100644 --- a/rust/operator-binary/src/pod_driver_controller.rs +++ b/rust/operator-binary/src/pod_driver_controller.rs @@ -1,5 +1,8 @@ use stackable_operator::{ - client::Client, k8s_openapi::api::core::v1::Pod, kube::runtime::controller::Action, + client::Client, + k8s_openapi::api::core::v1::Pod, + kube::core::{error_boundary, DeserializeGuard}, + kube::runtime::controller::Action, time::Duration, }; use stackable_spark_k8s_crd::{ @@ -35,6 +38,10 @@ pub enum Error { source: stackable_operator::client::Error, name: 
String, }, + #[snafu(display("Pod object is invalid"))] + InvalidPod { + source: error_boundary::InvalidObject, + }, } type Result = std::result::Result; @@ -45,9 +52,15 @@ impl ReconcilerError for Error { } } /// Updates the status of the SparkApplication that started the pod. -pub async fn reconcile(pod: Arc, client: Arc) -> Result { +pub async fn reconcile(pod: Arc>, client: Arc) -> Result { tracing::info!("Starting reconcile driver pod"); + let pod = pod + .0 + .as_ref() + .map_err(error_boundary::InvalidObject::clone) + .context(InvalidPodSnafu)?; + let pod_name = pod.metadata.name.as_ref().context(PodNameNotFoundSnafu)?; let app_name = pod .metadata @@ -94,6 +107,9 @@ pub async fn reconcile(pod: Arc, client: Arc) -> Result { Ok(Action::await_change()) } -pub fn error_policy(_obj: Arc, _error: &Error, _ctx: Arc) -> Action { - Action::requeue(*Duration::from_secs(5)) +pub fn error_policy(_obj: Arc>, error: &Error, _ctx: Arc) -> Action { + match error { + Error::InvalidPod { .. } => Action::await_change(), + _ => Action::requeue(*Duration::from_secs(5)), + } } diff --git a/rust/operator-binary/src/spark_k8s_controller.rs b/rust/operator-binary/src/spark_k8s_controller.rs index 1866ca47..72dff776 100644 --- a/rust/operator-binary/src/spark_k8s_controller.rs +++ b/rust/operator-binary/src/spark_k8s_controller.rs @@ -43,6 +43,7 @@ use stackable_operator::{ Resource, }, kube::{ + core::{error_boundary, DeserializeGuard}, runtime::{controller::Action, reflector::ObjectRef}, ResourceExt, }, @@ -194,6 +195,11 @@ pub enum Error { AddVolumeMount { source: builder::pod::container::Error, }, + + #[snafu(display("SparkApplication object is invalid"))] + InvalidSparkApplication { + source: error_boundary::InvalidObject, + }, } type Result = std::result::Result; @@ -204,9 +210,18 @@ impl ReconcilerError for Error { } } -pub async fn reconcile(spark_application: Arc, ctx: Arc) -> Result { +pub async fn reconcile( + spark_application: Arc>, + ctx: Arc, +) -> Result { 
tracing::info!("Starting reconcile"); + let spark_application = spark_application + .0 + .as_ref() + .map_err(error_boundary::InvalidObject::clone) + .context(InvalidSparkApplicationSnafu)?; + let client = &ctx.client; if spark_application.k8s_job_has_been_created() { @@ -269,7 +284,7 @@ pub async fn reconcile(spark_application: Arc, ctx: Arc) .context(InvalidProductConfigSnafu)?; let (serviceaccount, rolebinding) = - build_spark_role_serviceaccount(&spark_application, &resolved_product_image)?; + build_spark_role_serviceaccount(spark_application, &resolved_product_image)?; client .apply_patch(CONTROLLER_NAME, &serviceaccount, &serviceaccount) .await @@ -305,7 +320,7 @@ pub async fn reconcile(spark_application: Arc, ctx: Arc) .and_then(|r| r.get(&"default".to_string())); let driver_pod_template_config_map = pod_template_config_map( - &spark_application, + spark_application, SparkApplicationRole::Driver, &driver_config, driver_product_config, @@ -334,7 +349,7 @@ pub async fn reconcile(spark_application: Arc, ctx: Arc) .and_then(|r| r.get(&"default".to_string())); let executor_pod_template_config_map = pod_template_config_map( - &spark_application, + spark_application, SparkApplicationRole::Executor, &executor_config, executor_product_config, @@ -372,7 +387,7 @@ pub async fn reconcile(spark_application: Arc, ctx: Arc) .and_then(|r| r.get(&"default".to_string())); let submit_job_config_map = submit_job_config_map( - &spark_application, + spark_application, submit_product_config, &resolved_product_image, )?; @@ -386,7 +401,7 @@ pub async fn reconcile(spark_application: Arc, ctx: Arc) .context(ApplyApplicationSnafu)?; let job = spark_job( - &spark_application, + spark_application, &resolved_product_image, &serviceaccount, &env_vars, @@ -406,7 +421,7 @@ pub async fn reconcile(spark_application: Arc, ctx: Arc) client .apply_patch_status( CONTROLLER_NAME, - spark_application.as_ref(), + spark_application, &SparkApplicationStatus { phase: "Unknown".to_string(), }, @@ -984,6 
+999,13 @@ fn security_context() -> PodSecurityContext { } } -pub fn error_policy(_obj: Arc, _error: &Error, _ctx: Arc) -> Action { - Action::requeue(*Duration::from_secs(5)) +pub fn error_policy( + _obj: Arc>, + error: &Error, + _ctx: Arc, +) -> Action { + match error { + Error::InvalidSparkApplication { .. } => Action::await_change(), + _ => Action::requeue(*Duration::from_secs(5)), + } }