Skip to content

Commit 26e8695

Browse files
committed
Migrate to DeserializeGuard
Some cases have been migrated to PartialObjectMeta instead where only the metadata is used anyway. Part of stackabletech/issues#211. Fixes #237.
1 parent 858e3fa commit 26e8695

File tree

3 files changed

+80
-29
lines changed

3 files changed

+80
-29
lines changed

rust/operator-binary/src/pod_enrichment_controller.rs

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@ use snafu::{ResultExt, Snafu};
55
use stackable_operator::{
66
k8s_openapi::api::core::v1::{Node, Pod},
77
kube::{
8-
core::ObjectMeta,
8+
core::{error_boundary, DeserializeGuard, ObjectMeta},
99
runtime::{controller, reflector::ObjectRef, watcher, Controller},
10+
Resource,
1011
},
1112
logging::controller::{report_controller_reconciled, ReconcilerError},
1213
namespace::WatchNamespace,
@@ -23,11 +24,17 @@ struct Ctx {
2324
#[derive(Snafu, Debug, EnumDiscriminants)]
2425
#[strum_discriminants(derive(IntoStaticStr))]
2526
pub enum Error {
27+
#[snafu(display("Pod object is invalid"))]
28+
InvalidPod {
29+
source: error_boundary::InvalidObject,
30+
},
31+
2632
#[snafu(display("failed to get {node} for Pod"))]
2733
GetNode {
2834
source: stackable_operator::client::Error,
2935
node: ObjectRef<Node>,
3036
},
37+
3138
#[snafu(display("failed to update Pod"))]
3239
UpdatePod {
3340
source: stackable_operator::client::Error,
@@ -41,6 +48,7 @@ impl ReconcilerError for Error {
4148

4249
fn secondary_object(&self) -> Option<ObjectRef<stackable_operator::kube::core::DynamicObject>> {
4350
match self {
51+
Error::InvalidPod { source: _ } => None,
4452
Error::GetNode { node, .. } => Some(node.clone().erase()),
4553
Error::UpdatePod { source: _ } => None,
4654
}
@@ -49,20 +57,23 @@ impl ReconcilerError for Error {
4957

5058
pub async fn start(client: &stackable_operator::client::Client, watch_namespace: &WatchNamespace) {
5159
let controller = Controller::new(
52-
watch_namespace.get_api::<Pod>(client),
60+
watch_namespace.get_api::<DeserializeGuard<Pod>>(client),
5361
watcher::Config::default().labels("enrichment.stackable.tech/enabled=true"),
5462
);
5563
let pods = controller.store();
5664
controller
5765
.watches(
58-
client.get_all_api::<Node>(),
66+
client.get_all_api::<DeserializeGuard<Node>>(),
5967
watcher::Config::default(),
6068
move |node| {
6169
pods.state()
6270
.into_iter()
6371
.filter(move |pod| {
72+
let Ok(pod) = &pod.0 else {
73+
return false;
74+
};
6475
pod.spec.as_ref().and_then(|s| s.node_name.as_deref())
65-
== node.metadata.name.as_deref()
76+
== node.meta().name.as_deref()
6677
})
6778
.map(|pod| ObjectRef::from_obj(&*pod))
6879
},
@@ -86,7 +97,17 @@ pub enum NodeAddressType {
8697
InternalIP,
8798
}
8899

89-
async fn reconcile(pod: Arc<Pod>, ctx: Arc<Ctx>) -> Result<controller::Action, Error> {
100+
async fn reconcile(
101+
pod: Arc<DeserializeGuard<Pod>>,
102+
ctx: Arc<Ctx>,
103+
) -> Result<controller::Action, Error> {
104+
tracing::info!("Starting reconcile");
105+
let pod = pod
106+
.0
107+
.as_ref()
108+
.map_err(error_boundary::InvalidObject::clone)
109+
.context(InvalidPodSnafu)?;
110+
90111
let node_name = pod.spec.as_ref().and_then(|s| s.node_name.as_deref());
91112
let node = if let Some(node_name) = node_name {
92113
ctx.client
@@ -133,6 +154,15 @@ async fn reconcile(pod: Arc<Pod>, ctx: Arc<Ctx>) -> Result<controller::Action, E
133154
Ok(controller::Action::await_change())
134155
}
135156

136-
fn error_policy(_obj: Arc<Pod>, _error: &Error, _ctx: Arc<Ctx>) -> controller::Action {
137-
controller::Action::requeue(Duration::from_secs(5))
157+
fn error_policy(
158+
_obj: Arc<DeserializeGuard<Pod>>,
159+
error: &Error,
160+
_ctx: Arc<Ctx>,
161+
) -> controller::Action {
162+
match error {
163+
// root object is invalid, will be requeued when modified anyway
164+
Error::InvalidPod { .. } => controller::Action::await_change(),
165+
166+
_ => controller::Action::requeue(Duration::from_secs(5)),
167+
}
138168
}

rust/operator-binary/src/restart_controller/pod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use stackable_operator::{
1010
},
1111
kube::{
1212
self,
13-
api::EvictParams,
13+
api::{EvictParams, PartialObjectMeta},
1414
core::DynamicObject,
1515
runtime::{controller::Action, reflector::ObjectRef, watcher, Controller},
1616
},
@@ -63,7 +63,7 @@ impl ReconcilerError for Error {
6363

6464
pub async fn start(client: &Client, watch_namespace: &WatchNamespace) {
6565
let controller = Controller::new(
66-
watch_namespace.get_api::<Pod>(client),
66+
watch_namespace.get_api::<PartialObjectMeta<Pod>>(client),
6767
watcher::Config::default(),
6868
);
6969
controller
@@ -80,7 +80,7 @@ pub async fn start(client: &Client, watch_namespace: &WatchNamespace) {
8080
.await;
8181
}
8282

83-
async fn reconcile(pod: Arc<Pod>, ctx: Arc<Ctx>) -> Result<Action, Error> {
83+
async fn reconcile(pod: Arc<PartialObjectMeta<Pod>>, ctx: Arc<Ctx>) -> Result<Action, Error> {
8484
tracing::info!("Starting reconciliation ..");
8585
if pod.metadata.deletion_timestamp.is_some() {
8686
// Object is already being deleted, no point trying again
@@ -163,6 +163,6 @@ async fn reconcile(pod: Arc<Pod>, ctx: Arc<Ctx>) -> Result<Action, Error> {
163163
}
164164
}
165165

166-
fn error_policy(_obj: Arc<Pod>, _error: &Error, _ctx: Arc<Ctx>) -> Action {
166+
fn error_policy(_obj: Arc<PartialObjectMeta<Pod>>, _error: &Error, _ctx: Arc<Ctx>) -> Action {
167167
Action::requeue(Duration::from_secs(5))
168168
}

rust/operator-binary/src/restart_controller/statefulset.rs

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ use stackable_operator::k8s_openapi::api::core::v1::{
1212
ConfigMap, EnvFromSource, EnvVar, PodSpec, Secret, Volume,
1313
};
1414
use stackable_operator::kube;
15-
use stackable_operator::kube::api::{Patch, PatchParams};
16-
use stackable_operator::kube::core::DynamicObject;
15+
use stackable_operator::kube::api::{PartialObjectMeta, Patch, PatchParams};
16+
use stackable_operator::kube::core::{error_boundary, DeserializeGuard, DynamicObject};
1717
use stackable_operator::kube::runtime::controller::{
1818
trigger_self, trigger_with, Action, ReconcileRequest,
1919
};
@@ -26,15 +26,20 @@ use strum::{EnumDiscriminants, IntoStaticStr};
2626

2727
struct Ctx {
2828
kube: kube::Client,
29-
cms: Store<ConfigMap>,
29+
cms: Store<PartialObjectMeta<ConfigMap>>,
3030
cms_inited: Arc<AtomicBool>,
31-
secrets: Store<Secret>,
31+
secrets: Store<PartialObjectMeta<Secret>>,
3232
secrets_inited: Arc<AtomicBool>,
3333
}
3434

3535
#[derive(Snafu, Debug, EnumDiscriminants)]
3636
#[strum_discriminants(derive(IntoStaticStr))]
3737
enum Error {
38+
#[snafu(display("StatefulSet object is invalid"))]
39+
InvalidStatefulSet {
40+
source: error_boundary::InvalidObject,
41+
},
42+
3843
#[snafu(display("failed to patch object {obj_ref}"))]
3944
PatchFailed {
4045
source: kube::Error,
@@ -55,6 +60,7 @@ impl ReconcilerError for Error {
5560

5661
fn secondary_object(&self) -> Option<ObjectRef<DynamicObject>> {
5762
match self {
63+
Error::InvalidStatefulSet { source: _ } => None,
5864
Error::PatchFailed { obj_ref, .. } => Some(*obj_ref.clone()),
5965
Error::ConfigMapsUninitialized => None,
6066
Error::SecretsUninitialized => None,
@@ -63,10 +69,10 @@ impl ReconcilerError for Error {
6369
}
6470

6571
pub async fn start(client: &Client, watch_namespace: &WatchNamespace) {
66-
let stses = watch_namespace.get_api::<StatefulSet>(client);
67-
let cms = watch_namespace.get_api::<ConfigMap>(client);
68-
let secrets = watch_namespace.get_api::<Secret>(client);
69-
let sts_store = reflector::store::Writer::new(());
72+
let stses = watch_namespace.get_api::<DeserializeGuard<StatefulSet>>(client);
73+
let cms = watch_namespace.get_api::<PartialObjectMeta<ConfigMap>>(client);
74+
let secrets = watch_namespace.get_api::<PartialObjectMeta<Secret>>(client);
75+
let sts_store = reflector::store::Writer::<DeserializeGuard<StatefulSet>>::new(());
7076
let cm_store = reflector::store::Writer::new(());
7177
let secret_store = reflector::store::Writer::new(());
7278
let cms_inited = Arc::new(AtomicBool::from(false));
@@ -161,7 +167,17 @@ fn find_pod_refs<'a, K: Resource + 'a>(
161167
.chain(container_env_from_refs)
162168
}
163169

164-
async fn reconcile(sts: Arc<StatefulSet>, ctx: Arc<Ctx>) -> Result<Action, Error> {
170+
async fn reconcile(
171+
sts: Arc<DeserializeGuard<StatefulSet>>,
172+
ctx: Arc<Ctx>,
173+
) -> Result<Action, Error> {
174+
tracing::info!("Starting reconcile");
175+
let sts = sts
176+
.0
177+
.as_ref()
178+
.map_err(error_boundary::InvalidObject::clone)
179+
.context(InvalidStatefulSetSnafu)?;
180+
165181
if !ctx.cms_inited.load(std::sync::atomic::Ordering::SeqCst) {
166182
return ConfigMapsUninitializedSnafu.fail();
167183
}
@@ -181,12 +197,12 @@ async fn reconcile(sts: Arc<StatefulSet>, ctx: Arc<Ctx>) -> Result<Action, Error
181197
find_pod_refs(
182198
pod_spec,
183199
|volume| {
184-
Some(ObjectRef::<ConfigMap>::new(
200+
Some(ObjectRef::<PartialObjectMeta<ConfigMap>>::new(
185201
&volume.config_map.as_ref()?.name,
186202
))
187203
},
188204
|env_var| {
189-
Some(ObjectRef::<ConfigMap>::new(
205+
Some(ObjectRef::<PartialObjectMeta<ConfigMap>>::new(
190206
&env_var
191207
.value_from
192208
.as_ref()?
@@ -196,7 +212,7 @@ async fn reconcile(sts: Arc<StatefulSet>, ctx: Arc<Ctx>) -> Result<Action, Error
196212
))
197213
},
198214
|env_from| {
199-
Some(ObjectRef::<ConfigMap>::new(
215+
Some(ObjectRef::<PartialObjectMeta<ConfigMap>>::new(
200216
&env_from.config_map_ref.as_ref()?.name,
201217
))
202218
},
@@ -225,17 +241,17 @@ async fn reconcile(sts: Arc<StatefulSet>, ctx: Arc<Ctx>) -> Result<Action, Error
225241
find_pod_refs(
226242
pod_spec,
227243
|volume| {
228-
Some(ObjectRef::<Secret>::new(
244+
Some(ObjectRef::<PartialObjectMeta<Secret>>::new(
229245
volume.secret.as_ref()?.secret_name.as_deref()?,
230246
))
231247
},
232248
|env_var| {
233-
Some(ObjectRef::<Secret>::new(
249+
Some(ObjectRef::<PartialObjectMeta<Secret>>::new(
234250
&env_var.value_from.as_ref()?.secret_key_ref.as_ref()?.name,
235251
))
236252
},
237253
|env_from| {
238-
Some(ObjectRef::<Secret>::new(
254+
Some(ObjectRef::<PartialObjectMeta<Secret>>::new(
239255
&env_from.secret_ref.as_ref()?.name,
240256
))
241257
},
@@ -290,11 +306,16 @@ async fn reconcile(sts: Arc<StatefulSet>, ctx: Arc<Ctx>) -> Result<Action, Error
290306
)
291307
.await
292308
.context(PatchFailedSnafu {
293-
obj_ref: ObjectRef::from_obj(sts.as_ref()).erase(),
309+
obj_ref: ObjectRef::from_obj(sts).erase(),
294310
})?;
295311
Ok(Action::await_change())
296312
}
297313

298-
fn error_policy(_obj: Arc<StatefulSet>, _error: &Error, _ctx: Arc<Ctx>) -> Action {
299-
Action::requeue(Duration::from_secs(5))
314+
fn error_policy(_obj: Arc<DeserializeGuard<StatefulSet>>, error: &Error, _ctx: Arc<Ctx>) -> Action {
315+
match error {
316+
// root object is invalid, will be requeued when modified anyway
317+
Error::InvalidStatefulSet { .. } => Action::await_change(),
318+
319+
_ => Action::requeue(Duration::from_secs(5)),
320+
}
300321
}

0 commit comments

Comments
 (0)