Skip to content

Commit b28a0ba

Browse files
authored
fix: invalid objects don't stop the reconciliation (#482)
1 parent e138c37 commit b28a0ba

File tree

5 files changed

+94
-32
lines changed

5 files changed

+94
-32
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ All notable changes to this project will be documented in this file.
2525
- BREAKING: The fields `connection` and `host` on `S3Connection` as well as `bucketName` on `S3Bucket` are now mandatory ([#472]).
2626
- Fix `envOverrides` for SparkApplication and SparkHistoryServer ([#451]).
2727
- Ensure SparkApplications can only create a single submit Job. Fix for #457 ([#460]).
28+
- Invalid `SparkApplication`/`SparkHistoryServer` objects don't cause the operator to stop functioning ([#482]).
2829

2930
### Removed
3031

rust/operator-binary/src/history/history_controller.rs

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::history::operations::pdb::add_pdbs;
22
use crate::product_logging::{self, resolve_vector_aggregator_address};
33
use crate::Ctx;
44
use product_config::{types::PropertyNameKind, writer::to_java_properties_string};
5+
use stackable_operator::kube::core::{error_boundary, DeserializeGuard};
56
use stackable_operator::{
67
builder::{
78
self,
@@ -196,6 +197,11 @@ pub enum Error {
196197
AddVolumeMount {
197198
source: builder::pod::container::Error,
198199
},
200+
201+
#[snafu(display("SparkHistoryServer object is invalid"))]
202+
InvalidSparkHistoryServer {
203+
source: error_boundary::InvalidObject,
204+
},
199205
}
200206

201207
type Result<T, E = Error> = std::result::Result<T, E>;
@@ -206,9 +212,18 @@ impl ReconcilerError for Error {
206212
}
207213
}
208214
/// Reconciles the `SparkHistoryServer` object into its Kubernetes resources.
209-
pub async fn reconcile(shs: Arc<SparkHistoryServer>, ctx: Arc<Ctx>) -> Result<Action> {
215+
pub async fn reconcile(
216+
shs: Arc<DeserializeGuard<SparkHistoryServer>>,
217+
ctx: Arc<Ctx>,
218+
) -> Result<Action> {
210219
tracing::info!("Starting reconcile history server");
211220

221+
let shs = shs
222+
.0
223+
.as_ref()
224+
.map_err(error_boundary::InvalidObject::clone)
225+
.context(InvalidSparkHistoryServerSnafu)?;
226+
212227
let client = &ctx.client;
213228

214229
let mut cluster_resources = ClusterResources::new(
@@ -244,7 +259,7 @@ pub async fn reconcile(shs: Arc<SparkHistoryServer>, ctx: Arc<Ctx>) -> Result<Ac
244259

245260
// Use a dedicated service account for history server pods.
246261
let (serviceaccount, rolebinding) =
247-
build_history_role_serviceaccount(&shs, &resolved_product_image.app_version_label)?;
262+
build_history_role_serviceaccount(shs, &resolved_product_image.app_version_label)?;
248263
let serviceaccount = cluster_resources
249264
.add(client, serviceaccount)
250265
.await
@@ -261,7 +276,7 @@ pub async fn reconcile(shs: Arc<SparkHistoryServer>, ctx: Arc<Ctx>) -> Result<Ac
261276
.iter()
262277
{
263278
let service = build_service(
264-
&shs,
279+
shs,
265280
&resolved_product_image.app_version_label,
266281
role_name,
267282
None,
@@ -273,7 +288,7 @@ pub async fn reconcile(shs: Arc<SparkHistoryServer>, ctx: Arc<Ctx>) -> Result<Ac
273288

274289
for (rolegroup_name, rolegroup_config) in role_config.iter() {
275290
let rgr = RoleGroupRef {
276-
cluster: ObjectRef::from_obj(&*shs),
291+
cluster: ObjectRef::from_obj(shs),
277292
role: role_name.into(),
278293
role_group: rolegroup_name.into(),
279294
};
@@ -283,7 +298,7 @@ pub async fn reconcile(shs: Arc<SparkHistoryServer>, ctx: Arc<Ctx>) -> Result<Ac
283298
.context(FailedToResolveConfigSnafu)?;
284299

285300
let service = build_service(
286-
&shs,
301+
shs,
287302
&resolved_product_image.app_version_label,
288303
role_name,
289304
Some(&rgr),
@@ -294,7 +309,7 @@ pub async fn reconcile(shs: Arc<SparkHistoryServer>, ctx: Arc<Ctx>) -> Result<Ac
294309
.context(ApplyServiceSnafu)?;
295310

296311
let config_map = build_config_map(
297-
&shs,
312+
shs,
298313
rolegroup_config,
299314
&merged_config,
300315
&resolved_product_image.app_version_label,
@@ -308,7 +323,7 @@ pub async fn reconcile(shs: Arc<SparkHistoryServer>, ctx: Arc<Ctx>) -> Result<Ac
308323
.context(ApplyConfigMapSnafu)?;
309324

310325
let sts = build_stateful_set(
311-
&shs,
326+
shs,
312327
&resolved_product_image,
313328
&rgr,
314329
&log_dir,
@@ -324,7 +339,7 @@ pub async fn reconcile(shs: Arc<SparkHistoryServer>, ctx: Arc<Ctx>) -> Result<Ac
324339
let role_config = &shs.spec.nodes.role_config;
325340
add_pdbs(
326341
&role_config.pod_disruption_budget,
327-
&shs,
342+
shs,
328343
client,
329344
&mut cluster_resources,
330345
)
@@ -340,8 +355,15 @@ pub async fn reconcile(shs: Arc<SparkHistoryServer>, ctx: Arc<Ctx>) -> Result<Ac
340355
Ok(Action::await_change())
341356
}
342357

343-
pub fn error_policy(_obj: Arc<SparkHistoryServer>, _error: &Error, _ctx: Arc<Ctx>) -> Action {
344-
Action::requeue(*Duration::from_secs(5))
358+
pub fn error_policy(
359+
_obj: Arc<DeserializeGuard<SparkHistoryServer>>,
360+
error: &Error,
361+
_ctx: Arc<Ctx>,
362+
) -> Action {
363+
match error {
364+
Error::InvalidSparkHistoryServer { .. } => Action::await_change(),
365+
_ => Action::requeue(*Duration::from_secs(5)),
366+
}
345367
}
346368

347369
#[allow(clippy::result_large_err)]

rust/operator-binary/src/main.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use stackable_operator::cli::{Command, ProductOperatorRun};
1313
use stackable_operator::k8s_openapi::api::apps::v1::StatefulSet;
1414
use stackable_operator::k8s_openapi::api::core::v1::Pod;
1515
use stackable_operator::k8s_openapi::api::core::v1::{ConfigMap, Service};
16+
use stackable_operator::kube::core::DeserializeGuard;
1617
use stackable_operator::kube::runtime::{controller::Controller, watcher};
1718
use stackable_operator::logging::controller::report_controller_reconciled;
1819
use stackable_operator::CustomResourceExt;
@@ -83,11 +84,11 @@ async fn main() -> anyhow::Result<()> {
8384
product_config: product_config.load(&PRODUCT_CONFIG_PATHS)?,
8485
};
8586
let app_controller = Controller::new(
86-
watch_namespace.get_api::<SparkApplication>(&client),
87+
watch_namespace.get_api::<DeserializeGuard<SparkApplication>>(&client),
8788
watcher::Config::default(),
8889
)
8990
.owns(
90-
watch_namespace.get_api::<ConfigMap>(&client),
91+
watch_namespace.get_api::<DeserializeGuard<ConfigMap>>(&client),
9192
watcher::Config::default(),
9293
)
9394
.shutdown_on_signal()
@@ -106,12 +107,12 @@ async fn main() -> anyhow::Result<()> {
106107
.instrument(info_span!("app_controller"));
107108

108109
let pod_driver_controller = Controller::new(
109-
watch_namespace.get_api::<Pod>(&client),
110+
watch_namespace.get_api::<DeserializeGuard<Pod>>(&client),
110111
watcher::Config::default()
111112
.labels(&format!("app.kubernetes.io/managed-by={OPERATOR_NAME}_{CONTROLLER_NAME},spark-role=driver")),
112113
)
113114
.owns(
114-
watch_namespace.get_api::<Pod>(&client),
115+
watch_namespace.get_api::<DeserializeGuard<Pod>>(&client),
115116
watcher::Config::default(),
116117
)
117118
.shutdown_on_signal()
@@ -129,23 +130,23 @@ async fn main() -> anyhow::Result<()> {
129130
product_config: product_config.load(&PRODUCT_CONFIG_PATHS)?,
130131
};
131132
let history_controller = Controller::new(
132-
watch_namespace.get_api::<SparkHistoryServer>(&client),
133+
watch_namespace.get_api::<DeserializeGuard<SparkHistoryServer>>(&client),
133134
watcher::Config::default(),
134135
)
135136
.owns(
136-
watch_namespace.get_api::<SparkHistoryServer>(&client),
137+
watch_namespace.get_api::<DeserializeGuard<SparkHistoryServer>>(&client),
137138
watcher::Config::default(),
138139
)
139140
.owns(
140-
watch_namespace.get_api::<StatefulSet>(&client),
141+
watch_namespace.get_api::<DeserializeGuard<StatefulSet>>(&client),
141142
watcher::Config::default(),
142143
)
143144
.owns(
144-
watch_namespace.get_api::<Service>(&client),
145+
watch_namespace.get_api::<DeserializeGuard<Service>>(&client),
145146
watcher::Config::default(),
146147
)
147148
.owns(
148-
watch_namespace.get_api::<ConfigMap>(&client),
149+
watch_namespace.get_api::<DeserializeGuard<ConfigMap>>(&client),
149150
watcher::Config::default(),
150151
)
151152
.shutdown_on_signal()

rust/operator-binary/src/pod_driver_controller.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
use stackable_operator::{
2-
client::Client, k8s_openapi::api::core::v1::Pod, kube::runtime::controller::Action,
2+
client::Client,
3+
k8s_openapi::api::core::v1::Pod,
4+
kube::core::{error_boundary, DeserializeGuard},
5+
kube::runtime::controller::Action,
36
time::Duration,
47
};
58
use stackable_spark_k8s_crd::{
@@ -35,6 +38,10 @@ pub enum Error {
3538
source: stackable_operator::client::Error,
3639
name: String,
3740
},
41+
#[snafu(display("Pod object is invalid"))]
42+
InvalidPod {
43+
source: error_boundary::InvalidObject,
44+
},
3845
}
3946

4047
type Result<T, E = Error> = std::result::Result<T, E>;
@@ -45,9 +52,15 @@ impl ReconcilerError for Error {
4552
}
4653
}
4754
/// Updates the status of the SparkApplication that started the pod.
48-
pub async fn reconcile(pod: Arc<Pod>, client: Arc<Client>) -> Result<Action> {
55+
pub async fn reconcile(pod: Arc<DeserializeGuard<Pod>>, client: Arc<Client>) -> Result<Action> {
4956
tracing::info!("Starting reconcile driver pod");
5057

58+
let pod = pod
59+
.0
60+
.as_ref()
61+
.map_err(error_boundary::InvalidObject::clone)
62+
.context(InvalidPodSnafu)?;
63+
5164
let pod_name = pod.metadata.name.as_ref().context(PodNameNotFoundSnafu)?;
5265
let app_name = pod
5366
.metadata
@@ -94,6 +107,9 @@ pub async fn reconcile(pod: Arc<Pod>, client: Arc<Client>) -> Result<Action> {
94107
Ok(Action::await_change())
95108
}
96109

97-
pub fn error_policy(_obj: Arc<Pod>, _error: &Error, _ctx: Arc<Client>) -> Action {
98-
Action::requeue(*Duration::from_secs(5))
110+
pub fn error_policy(_obj: Arc<DeserializeGuard<Pod>>, error: &Error, _ctx: Arc<Client>) -> Action {
111+
match error {
112+
Error::InvalidPod { .. } => Action::await_change(),
113+
_ => Action::requeue(*Duration::from_secs(5)),
114+
}
99115
}

rust/operator-binary/src/spark_k8s_controller.rs

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ use stackable_operator::{
4343
Resource,
4444
},
4545
kube::{
46+
core::{error_boundary, DeserializeGuard},
4647
runtime::{controller::Action, reflector::ObjectRef},
4748
ResourceExt,
4849
},
@@ -194,6 +195,11 @@ pub enum Error {
194195
AddVolumeMount {
195196
source: builder::pod::container::Error,
196197
},
198+
199+
#[snafu(display("SparkApplication object is invalid"))]
200+
InvalidSparkApplication {
201+
source: error_boundary::InvalidObject,
202+
},
197203
}
198204

199205
type Result<T, E = Error> = std::result::Result<T, E>;
@@ -204,9 +210,18 @@ impl ReconcilerError for Error {
204210
}
205211
}
206212

207-
pub async fn reconcile(spark_application: Arc<SparkApplication>, ctx: Arc<Ctx>) -> Result<Action> {
213+
pub async fn reconcile(
214+
spark_application: Arc<DeserializeGuard<SparkApplication>>,
215+
ctx: Arc<Ctx>,
216+
) -> Result<Action> {
208217
tracing::info!("Starting reconcile");
209218

219+
let spark_application = spark_application
220+
.0
221+
.as_ref()
222+
.map_err(error_boundary::InvalidObject::clone)
223+
.context(InvalidSparkApplicationSnafu)?;
224+
210225
let client = &ctx.client;
211226

212227
if spark_application.k8s_job_has_been_created() {
@@ -269,7 +284,7 @@ pub async fn reconcile(spark_application: Arc<SparkApplication>, ctx: Arc<Ctx>)
269284
.context(InvalidProductConfigSnafu)?;
270285

271286
let (serviceaccount, rolebinding) =
272-
build_spark_role_serviceaccount(&spark_application, &resolved_product_image)?;
287+
build_spark_role_serviceaccount(spark_application, &resolved_product_image)?;
273288
client
274289
.apply_patch(CONTROLLER_NAME, &serviceaccount, &serviceaccount)
275290
.await
@@ -305,7 +320,7 @@ pub async fn reconcile(spark_application: Arc<SparkApplication>, ctx: Arc<Ctx>)
305320
.and_then(|r| r.get(&"default".to_string()));
306321

307322
let driver_pod_template_config_map = pod_template_config_map(
308-
&spark_application,
323+
spark_application,
309324
SparkApplicationRole::Driver,
310325
&driver_config,
311326
driver_product_config,
@@ -334,7 +349,7 @@ pub async fn reconcile(spark_application: Arc<SparkApplication>, ctx: Arc<Ctx>)
334349
.and_then(|r| r.get(&"default".to_string()));
335350

336351
let executor_pod_template_config_map = pod_template_config_map(
337-
&spark_application,
352+
spark_application,
338353
SparkApplicationRole::Executor,
339354
&executor_config,
340355
executor_product_config,
@@ -372,7 +387,7 @@ pub async fn reconcile(spark_application: Arc<SparkApplication>, ctx: Arc<Ctx>)
372387
.and_then(|r| r.get(&"default".to_string()));
373388

374389
let submit_job_config_map = submit_job_config_map(
375-
&spark_application,
390+
spark_application,
376391
submit_product_config,
377392
&resolved_product_image,
378393
)?;
@@ -386,7 +401,7 @@ pub async fn reconcile(spark_application: Arc<SparkApplication>, ctx: Arc<Ctx>)
386401
.context(ApplyApplicationSnafu)?;
387402

388403
let job = spark_job(
389-
&spark_application,
404+
spark_application,
390405
&resolved_product_image,
391406
&serviceaccount,
392407
&env_vars,
@@ -406,7 +421,7 @@ pub async fn reconcile(spark_application: Arc<SparkApplication>, ctx: Arc<Ctx>)
406421
client
407422
.apply_patch_status(
408423
CONTROLLER_NAME,
409-
spark_application.as_ref(),
424+
spark_application,
410425
&SparkApplicationStatus {
411426
phase: "Unknown".to_string(),
412427
},
@@ -984,6 +999,13 @@ fn security_context() -> PodSecurityContext {
984999
}
9851000
}
9861001

987-
pub fn error_policy(_obj: Arc<SparkApplication>, _error: &Error, _ctx: Arc<Ctx>) -> Action {
988-
Action::requeue(*Duration::from_secs(5))
1002+
pub fn error_policy(
1003+
_obj: Arc<DeserializeGuard<SparkApplication>>,
1004+
error: &Error,
1005+
_ctx: Arc<Ctx>,
1006+
) -> Action {
1007+
match error {
1008+
Error::InvalidSparkApplication { .. } => Action::await_change(),
1009+
_ => Action::requeue(*Duration::from_secs(5)),
1010+
}
9891011
}

0 commit comments

Comments
 (0)