Skip to content

Commit 0985950

Browse files
committed
Spit the hairy node lookup logic into a separate function
1 parent 43c6ad9 commit 0985950

File tree

1 file changed

+77
-64
lines changed

1 file changed

+77
-64
lines changed

rust/operator-binary/src/listener_controller.rs

Lines changed: 77 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -264,70 +264,8 @@ pub async fn reconcile(listener: Arc<Listener>, ctx: Arc<Ctx>) -> Result<control
264264
let ports: BTreeMap<String, i32>;
265265
match listener_class.spec.service_type {
266266
ServiceType::NodePort => {
267-
let (pvs, endpoints) = try_join(
268-
async {
269-
ctx.client
270-
.list_with_label_selector::<PersistentVolume>(
271-
&(),
272-
&LabelSelector {
273-
match_labels: Some(
274-
listener_persistent_volume_label(&listener).unwrap(),
275-
),
276-
..Default::default()
277-
},
278-
)
279-
.await
280-
.context(GetListenerPvsSnafu)
281-
},
282-
async {
283-
ctx.client
284-
// Endpoints object may not yet be created by its respective controller
285-
.get_opt::<Endpoints>(&svc_name, ns)
286-
.await
287-
.with_context(|_| GetObjectSnafu {
288-
obj: ObjectRef::<Endpoints>::new(&svc_name).within(ns).erase(),
289-
})
290-
},
291-
)
292-
.await?;
293-
294-
let pv_node_names = pvs
295-
.into_iter()
296-
.filter_map(|pv| pv.spec?.node_affinity?.required)
297-
.flat_map(|affinity| affinity.node_selector_terms)
298-
.filter_map(|terms| terms.match_expressions)
299-
.flatten()
300-
.filter(|expr| expr.key == NODE_TOPOLOGY_LABEL_HOSTNAME && expr.operator == "In")
301-
.filter_map(|expr| expr.values)
302-
.flatten()
303-
.collect::<BTreeSet<_>>();
304-
305-
// Old objects that haven't been mounted before the PV lookup mechanism was added will
306-
// not have the correct labels, so we also look up using Endpoints.
307-
let endpoints_node_names = endpoints
308-
.into_iter()
309-
.filter_map(|endpoints| endpoints.subsets)
310-
.flatten()
311-
.flat_map(|subset| subset.addresses)
312-
.flatten()
313-
.flat_map(|addr| addr.node_name)
314-
.collect::<BTreeSet<_>>();
315-
316-
let node_names_missing_from_pv = endpoints_node_names
317-
.difference(&pv_node_names)
318-
.collect::<Vec<_>>();
319-
if !node_names_missing_from_pv.is_empty() {
320-
tracing::warn!(
321-
?node_names_missing_from_pv,
322-
"some backing Nodes could only be found via legacy Endpoints discovery method, {} {}",
323-
"this may cause discovery config to be unstable",
324-
"(hint: try restarting the Pods backing this Listener)"
325-
);
326-
}
327-
328-
let mut node_names = pv_node_names;
329-
node_names.extend(endpoints_node_names);
330-
267+
let node_names =
268+
node_names_for_nodeport_listener(&ctx.client, &listener, ns, &svc_name).await?;
331269
nodes = try_join_all(node_names.iter().map(|node_name| async {
332270
ctx.client
333271
.get::<Node>(node_name, &())
@@ -428,6 +366,81 @@ pub fn error_policy<T>(_obj: Arc<T>, _error: &Error, _ctx: Arc<Ctx>) -> controll
428366
controller::Action::requeue(*Duration::from_secs(5))
429367
}
430368

369+
/// Lists the names of the [`Node`]s backing this [`Listener`].
370+
///
371+
/// Should only be used for [`NodePort`](`ServiceType::NodePort`) [`Listener`]s.
372+
async fn node_names_for_nodeport_listener(
373+
client: &stackable_operator::client::Client,
374+
listener: &Listener,
375+
namespace: &str,
376+
service_name: &str,
377+
) -> Result<BTreeSet<String>> {
378+
let (pvs, endpoints) = try_join(
379+
async {
380+
client
381+
.list_with_label_selector::<PersistentVolume>(
382+
&(),
383+
&LabelSelector {
384+
match_labels: Some(listener_persistent_volume_label(listener).unwrap()),
385+
..Default::default()
386+
},
387+
)
388+
.await
389+
.context(GetListenerPvsSnafu)
390+
},
391+
async {
392+
client
393+
// Endpoints object may not yet be created by its respective controller
394+
.get_opt::<Endpoints>(service_name, namespace)
395+
.await
396+
.with_context(|_| GetObjectSnafu {
397+
obj: ObjectRef::<Endpoints>::new(service_name)
398+
.within(namespace)
399+
.erase(),
400+
})
401+
},
402+
)
403+
.await?;
404+
405+
let pv_node_names = pvs
406+
.into_iter()
407+
.filter_map(|pv| pv.spec?.node_affinity?.required)
408+
.flat_map(|affinity| affinity.node_selector_terms)
409+
.filter_map(|terms| terms.match_expressions)
410+
.flatten()
411+
.filter(|expr| expr.key == NODE_TOPOLOGY_LABEL_HOSTNAME && expr.operator == "In")
412+
.filter_map(|expr| expr.values)
413+
.flatten()
414+
.collect::<BTreeSet<_>>();
415+
416+
// Old objects that haven't been mounted before the PV lookup mechanism was added will
417+
// not have the correct labels, so we also look up using Endpoints.
418+
let endpoints_node_names = endpoints
419+
.into_iter()
420+
.filter_map(|endpoints| endpoints.subsets)
421+
.flatten()
422+
.flat_map(|subset| subset.addresses)
423+
.flatten()
424+
.flat_map(|addr| addr.node_name)
425+
.collect::<BTreeSet<_>>();
426+
427+
let node_names_missing_from_pv = endpoints_node_names
428+
.difference(&pv_node_names)
429+
.collect::<Vec<_>>();
430+
if !node_names_missing_from_pv.is_empty() {
431+
tracing::warn!(
432+
?node_names_missing_from_pv,
433+
"some backing Nodes could only be found via legacy Endpoints discovery method, {} {}",
434+
"this may cause discovery config to be unstable",
435+
"(hint: try restarting the Pods backing this Listener)"
436+
);
437+
}
438+
439+
let mut node_names = pv_node_names;
440+
node_names.extend(endpoints_node_names);
441+
Ok(node_names)
442+
}
443+
431444
#[derive(Snafu, Debug)]
432445
#[snafu(module)]
433446
pub enum ListenerMountedPodLabelError {

0 commit comments

Comments
 (0)