From ce2e31494af6af2ab85d705c7f54e8f3b55a4dff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Fri, 4 Oct 2024 15:51:32 +0200 Subject: [PATCH 01/10] Silence unactionable Clippy warnings --- rust/csi-grpc/src/lib.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/rust/csi-grpc/src/lib.rs b/rust/csi-grpc/src/lib.rs index d6adb506..b46312cf 100644 --- a/rust/csi-grpc/src/lib.rs +++ b/rust/csi-grpc/src/lib.rs @@ -1,5 +1,8 @@ //! Include gRPC definition files that have been generated by `build.rs` +// CSI docs don't quite align with Rustdoc conventions +#![allow(clippy::doc_lazy_continuation)] + pub static FILE_DESCRIPTOR_SET_BYTES: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/file_descriptor_set.bin")); From df97d7a2c4436c9261ace663bfa0102a0fd833a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Fri, 4 Oct 2024 15:52:31 +0200 Subject: [PATCH 02/10] Generate NodePort address listings from PersistentVolume reservations Fixes #220, fixes #229. --- rust/operator-binary/src/csi_server/node.rs | 46 +++++++- .../src/listener_controller.rs | 108 ++++++++++++------ 2 files changed, 119 insertions(+), 35 deletions(-) diff --git a/rust/operator-binary/src/csi_server/node.rs b/rust/operator-binary/src/csi_server/node.rs index 4986bca2..5167ab27 100644 --- a/rust/operator-binary/src/csi_server/node.rs +++ b/rust/operator-binary/src/csi_server/node.rs @@ -17,7 +17,10 @@ use std::{fmt::Debug, path::PathBuf}; use tonic::{Request, Response, Status}; use crate::{ - listener_controller::{listener_mounted_pod_label, ListenerMountedPodLabelError}, + listener_controller::{ + listener_mounted_pod_label, listener_persistent_volume_label, ListenerMountedPodLabelError, + ListenerPersistentVolumeLabelError, + }, utils::{error_full_message, node_primary_address}, }; @@ -25,6 +28,8 @@ use super::{tonic_unimplemented, ListenerSelector, ListenerVolumeContext}; const FIELD_MANAGER_SCOPE: &str = "volume"; +pub const NODE_TOPOLOGY_LABEL_HOSTNAME: &str = "listeners.stackable.tech/hostname"; + pub struct ListenerOperatorNode { pub client: stackable_operator::client::Client, pub node_name: String, @@ -55,6 +60,12 @@ enum PublishVolumeError { #[snafu(display("PersistentVolume has no corresponding PersistentVolumeClaim"))] UnclaimedPv, + #[snafu(display("failed to generate {listener}'s PersistentVolume selector"))] + ListenerPvReference { + source: ListenerPersistentVolumeLabelError, + listener: ObjectRef, + }, + #[snafu(display("failed to generate {listener}'s pod selector"))] ListenerPodSelector { source: ListenerMountedPodLabelError, @@ -75,6 +86,12 @@ enum PublishVolumeError { listener: ObjectRef, }, + #[snafu(display("failed to add listener label to {pv}"))] + AddListenerLabelToPv { + source: stackable_operator::client::Error, + pv: ObjectRef, + }, + #[snafu(display("failed to add listener label to {pod}"))] AddListenerLabelToPod { source: stackable_operator::client::Error, @@ -112,9 +129,11 @@ impl From for Status { PublishVolumeError::GetObject { .. } => Status::unavailable(full_msg), PublishVolumeError::UnclaimedPv => Status::unavailable(full_msg), PublishVolumeError::PodHasNoNode { .. } => Status::unavailable(full_msg), + PublishVolumeError::ListenerPvReference { .. } => Status::failed_precondition(full_msg), PublishVolumeError::ListenerPodSelector { .. } => Status::failed_precondition(full_msg), PublishVolumeError::BuildListenerOwnerRef { .. } => Status::unavailable(full_msg), PublishVolumeError::ApplyListener { .. } => Status::unavailable(full_msg), + PublishVolumeError::AddListenerLabelToPv { .. } => Status::unavailable(full_msg), PublishVolumeError::AddListenerLabelToPod { .. } => Status::unavailable(full_msg), PublishVolumeError::NoAddresses { .. } => Status::unavailable(full_msg), PublishVolumeError::PreparePodDir { .. } => Status::internal(full_msg), @@ -155,7 +174,7 @@ impl csi::v1::node_server::Node for ListenerOperatorNode { max_volumes_per_node: i64::MAX, accessible_topology: Some(Topology { segments: [( - "listeners.stackable.tech/hostname".to_string(), + NODE_TOPOLOGY_LABEL_HOSTNAME.to_string(), self.node_name.clone(), )] .into(), @@ -276,6 +295,29 @@ impl csi::v1::node_server::Node for ListenerOperatorNode { } }; + // Add listener label to PV, allowing traffic to be directed based on reservations, rather than which replicas are *currently* active. + // See https://github.com/stackabletech/listener-operator/issues/220 + self.client + .apply_patch( + FIELD_MANAGER_SCOPE, + &pv, + &PersistentVolume { + metadata: ObjectMeta { + labels: Some(listener_persistent_volume_label(&listener).context( + ListenerPvReferenceSnafu { + listener: ObjectRef::from_obj(&listener), + }, + )?), + ..Default::default() + }, + ..Default::default() + }, + ) + .await + .with_context(|_| AddListenerLabelToPvSnafu { + pv: ObjectRef::from_obj(&pv), + })?; + // Add listener label to pod so that traffic can be directed to it self.client // IMPORTANT diff --git a/rust/operator-binary/src/listener_controller.rs b/rust/operator-binary/src/listener_controller.rs index 27084dfb..f64f71a3 100644 --- a/rust/operator-binary/src/listener_controller.rs +++ b/rust/operator-binary/src/listener_controller.rs @@ -1,4 +1,7 @@ -use std::{collections::BTreeMap, sync::Arc}; +use std::{ + collections::{BTreeMap, BTreeSet}, + sync::Arc, +}; use futures::{future::try_join_all, StreamExt}; use snafu::{OptionExt, ResultExt, Snafu}; @@ -8,17 +11,21 @@ use stackable_operator::{ AddressType, Listener, ListenerClass, ListenerIngress, ListenerPort, ListenerSpec, ListenerStatus, ServiceType, }, - k8s_openapi::api::core::v1::{Endpoints, Node, Service, ServicePort, ServiceSpec}, + k8s_openapi::{ + api::core::v1::{Node, PersistentVolume, Service, ServicePort, ServiceSpec}, + apimachinery::pkg::apis::meta::v1::LabelSelector, + }, kube::{ api::{DynamicObject, ObjectMeta}, runtime::{controller, reflector::ObjectRef, watcher}, + ResourceExt, }, logging::controller::{report_controller_reconciled, ReconcilerError}, time::Duration, }; use strum::IntoStaticStr; -use crate::utils::node_primary_address; +use crate::{csi_server::node::NODE_TOPOLOGY_LABEL_HOSTNAME, utils::node_primary_address}; #[cfg(doc)] use stackable_operator::k8s_openapi::api::core::v1::Pod; @@ -26,26 +33,17 @@ use stackable_operator::k8s_openapi::api::core::v1::Pod; const FIELD_MANAGER_SCOPE: &str = "listener"; pub async fn run(client: stackable_operator::client::Client) { - let controller = - controller::Controller::new(client.get_all_api::(), watcher::Config::default()); - let listener_store = controller.store(); - controller + controller::Controller::new(client.get_all_api::(), watcher::Config::default()) .owns(client.get_all_api::(), watcher::Config::default()) .watches( - client.get_all_api::(), + client.get_all_api::(), watcher::Config::default(), - move |endpoints| { - listener_store - .state() - .into_iter() - .filter(move |listener| { - listener - .status - .as_ref() - .and_then(|s| s.service_name.as_deref()) - == endpoints.metadata.name.as_deref() - }) - .map(|l| ObjectRef::from_obj(&*l)) + |pv| { + let labels = pv.labels(); + labels + .get(PV_LABEL_LISTENER_NAMESPACE) + .zip(labels.get(PV_LABEL_LISTENER_NAME)) + .map(|(ns, name)| ObjectRef::::new(name).within(ns)) }, ) .shutdown_on_signal() @@ -75,7 +73,11 @@ pub enum Error { NoName, #[snafu(display("object has no ListenerClass (.spec.class_name)"))] NoListenerClass, - #[snafu(display("failed to generate listener's pod selector"))] + #[snafu(display("failed to generate Listener's PersistentVolume selector"))] + ListenerPvSelector { + source: ListenerPersistentVolumeLabelError, + }, + #[snafu(display("failed to generate Listener's Pod selector"))] ListenerPodSelector { source: ListenerMountedPodLabelError, }, @@ -109,6 +111,7 @@ impl ReconcilerError for Error { Self::NoNs => None, Self::NoName => None, Self::NoListenerClass => None, + Self::ListenerPvSelector { source: _ } => None, Self::ListenerPodSelector { source: _ } => None, Self::GetObject { source: _, obj } => Some(obj.clone()), Self::BuildListenerOwnerRef { .. } => None, @@ -217,23 +220,27 @@ pub async fn reconcile(listener: Arc, ctx: Arc) -> Result; match listener_class.spec.service_type { ServiceType::NodePort => { - let endpoints = ctx + let pvs = ctx .client - .get_opt::(&svc_name, ns) + .list_with_label_selector::( + &(), + &LabelSelector { + match_labels: Some(listener_persistent_volume_label(&listener).unwrap()), + ..Default::default() + }, + ) .await - .with_context(|_| GetObjectSnafu { - obj: ObjectRef::::new(&svc_name).within(ns).erase(), - })? - // Endpoints object may not yet be created by its respective controller - .unwrap_or_default(); - let node_names = endpoints - .subsets + .unwrap(); + let node_names = pvs .into_iter() + .filter_map(|pv| pv.spec?.node_affinity?.required) + .flat_map(|affinity| affinity.node_selector_terms) + .filter_map(|terms| terms.match_expressions) .flatten() - .flat_map(|subset| subset.addresses) + .filter(|expr| expr.key == NODE_TOPOLOGY_LABEL_HOSTNAME && expr.operator == "In") + .filter_map(|expr| expr.values) .flatten() - .flat_map(|addr| addr.node_name) - .collect::>(); + .collect::>(); nodes = try_join_all(node_names.iter().map(|node_name| async { ctx.client .get::(node_name, &()) @@ -356,8 +363,43 @@ pub fn listener_mounted_pod_label( // 60. // We prefer uid over name because uids have a consistent length. Ok(( + // This should probably have been listeners.stackable.tech/ instead, but too late to change now format!("listener.stackable.tech/mnt.{}", uid.replace('-', "")), // Arbitrary, but (hopefully) helps indicate to users which listener it applies to listener.metadata.name.clone().context(NoNameSnafu)?, )) } + +#[derive(Snafu, Debug)] +#[snafu(module)] +pub enum ListenerPersistentVolumeLabelError { + #[snafu(display("object has no name"))] + NoName, + #[snafu(display("object has no namespace"))] + NoNamespace, +} + +const PV_LABEL_LISTENER_NAMESPACE: &str = "listeners.stackable.tech/listener-namespace"; +const PV_LABEL_LISTENER_NAME: &str = "listeners.stackable.tech/listener-name"; + +/// A label that identifies which [`Listener`] corresponds to a given [`PersistentVolume`]. +pub fn listener_persistent_volume_label( + listener: &Listener, +) -> Result, ListenerPersistentVolumeLabelError> { + use listener_persistent_volume_label_error::*; + Ok([ + ( + PV_LABEL_LISTENER_NAMESPACE.to_string(), + listener + .metadata + .namespace + .clone() + .context(NoNamespaceSnafu)?, + ), + ( + PV_LABEL_LISTENER_NAME.to_string(), + listener.metadata.name.clone().context(NoNameSnafu)?, + ), + ] + .into()) +} From fac61dc32789a4cc8252c264cbffef81ba17fa8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Fri, 4 Oct 2024 16:07:29 +0200 Subject: [PATCH 03/10] Changelog --- CHANGELOG.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9011d876..5f7fb6df 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,16 @@ All notable changes to this project will be documented in this file. ## [Unreleased] +### Changed + +- Listener.status.addresses for NodePort listeners now includes replicas that are currently unavailable ([#231]). + +### Fixed + +- Listener.status.addresses is now de-duplicated ([#231]). + +[#231]: https://github.com/stackabletech/listener-operator/pull/231 + ## [24.7.0] - 2024-07-24 ### Added From 2fb9ee2a96bd584757bb127ae8e65a21f5a703aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Fri, 4 Oct 2024 16:14:49 +0200 Subject: [PATCH 04/10] Update Cargo.nix --- Cargo.nix | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.nix b/Cargo.nix index 48ca33dd..42eebabe 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -8333,9 +8333,9 @@ rec { }; "tonic" = rec { crateName = "tonic"; - version = "0.12.2"; + version = "0.12.3"; edition = "2021"; - sha256 = "1bc8m8r7ysgkb7mhs3b3mvivd43nwaix6qnqhfp5hb2bkscbmxn6"; + sha256 = "0ljd1lfjpw0vrm5wbv15x6nq2i38llsanls5rkzmdn2n0wrmnz47"; authors = [ "Lucio Franco " ]; @@ -8462,7 +8462,7 @@ rec { } ]; features = { - "channel" = [ "dep:hyper" "hyper?/client" "dep:hyper-util" "hyper-util?/client-legacy" "dep:tower" "tower?/balance" "tower?/buffer" "tower?/discover" "tower?/limit" "dep:tokio" "tokio?/time" "dep:hyper-timeout" ]; + "channel" = [ "dep:hyper" "hyper?/client" "dep:hyper-util" "hyper-util?/client-legacy" "dep:tower" "tower?/balance" "tower?/buffer" "tower?/discover" "tower?/limit" "tower?/util" "dep:tokio" "tokio?/time" "dep:hyper-timeout" ]; "codegen" = [ "dep:async-trait" ]; "default" = [ "transport" "codegen" "prost" ]; "gzip" = [ "dep:flate2" ]; From a4a8cfab236f28b59ca0622ad50301509c75b28c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Fri, 4 Oct 2024 17:06:05 +0200 Subject: [PATCH 05/10] Reintroduce Endpoints-based fallback --- Cargo.lock | 1 + Cargo.nix | 4 + rust/operator-binary/Cargo.toml | 1 + .../src/listener_controller.rs | 101 +++++++++++++++--- 4 files changed, 92 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bdc9aa29..500fead6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2326,6 +2326,7 @@ dependencies = [ "tokio-stream", "tonic", "tonic-reflection", + "tracing", ] [[package]] diff --git a/Cargo.nix b/Cargo.nix index 42eebabe..21493783 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -7364,6 +7364,10 @@ rec { name = "tonic-reflection"; packageId = "tonic-reflection"; } + { + name = "tracing"; + packageId = "tracing"; + } ]; buildDependencies = [ { diff --git a/rust/operator-binary/Cargo.toml b/rust/operator-binary/Cargo.toml index 701a54b4..5922a807 100644 --- a/rust/operator-binary/Cargo.toml +++ b/rust/operator-binary/Cargo.toml @@ -27,6 +27,7 @@ serde.workspace = true snafu.workspace = true strum.workspace = true h2.workspace = true +tracing = "0.1.40" [build-dependencies] built.workspace = true diff --git a/rust/operator-binary/src/listener_controller.rs b/rust/operator-binary/src/listener_controller.rs index f64f71a3..add59d09 100644 --- a/rust/operator-binary/src/listener_controller.rs +++ b/rust/operator-binary/src/listener_controller.rs @@ -3,7 +3,10 @@ use std::{ sync::Arc, }; -use futures::{future::try_join_all, StreamExt}; +use futures::{ + future::{try_join, try_join_all}, + StreamExt, +}; use snafu::{OptionExt, ResultExt, Snafu}; use stackable_operator::{ builder::meta::OwnerReferenceBuilder, @@ -12,7 +15,7 @@ use stackable_operator::{ ListenerStatus, ServiceType, }, k8s_openapi::{ - api::core::v1::{Node, PersistentVolume, Service, ServicePort, ServiceSpec}, + api::core::v1::{Endpoints, Node, PersistentVolume, Service, ServicePort, ServiceSpec}, apimachinery::pkg::apis::meta::v1::LabelSelector, }, kube::{ @@ -33,8 +36,28 @@ use stackable_operator::k8s_openapi::api::core::v1::Pod; const FIELD_MANAGER_SCOPE: &str = "listener"; pub async fn run(client: stackable_operator::client::Client) { - controller::Controller::new(client.get_all_api::(), watcher::Config::default()) + let controller = + controller::Controller::new(client.get_all_api::(), watcher::Config::default()); + let listener_store = controller.store(); + controller .owns(client.get_all_api::(), watcher::Config::default()) + .watches( + client.get_all_api::(), + watcher::Config::default(), + move |endpoints| { + listener_store + .state() + .into_iter() + .filter(move |listener| { + listener + .status + .as_ref() + .and_then(|s| s.service_name.as_deref()) + == endpoints.metadata.name.as_deref() + }) + .map(|l| ObjectRef::from_obj(&*l)) + }, + ) .watches( client.get_all_api::(), watcher::Config::default(), @@ -81,6 +104,10 @@ pub enum Error { ListenerPodSelector { source: ListenerMountedPodLabelError, }, + #[snafu(display("failed to get PersistentVolumes for Listener"))] + GetListenerPvs { + source: stackable_operator::client::Error, + }, #[snafu(display("failed to get {obj}"))] GetObject { source: stackable_operator::client::Error, @@ -113,6 +140,7 @@ impl ReconcilerError for Error { Self::NoListenerClass => None, Self::ListenerPvSelector { source: _ } => None, Self::ListenerPodSelector { source: _ } => None, + Self::GetListenerPvs { source: _ } => None, Self::GetObject { source: _, obj } => Some(obj.clone()), Self::BuildListenerOwnerRef { .. } => None, Self::ApplyService { source: _, svc } => Some(svc.clone().erase()), @@ -220,18 +248,34 @@ pub async fn reconcile(listener: Arc, ctx: Arc) -> Result; match listener_class.spec.service_type { ServiceType::NodePort => { - let pvs = ctx - .client - .list_with_label_selector::( - &(), - &LabelSelector { - match_labels: Some(listener_persistent_volume_label(&listener).unwrap()), - ..Default::default() - }, - ) - .await - .unwrap(); - let node_names = pvs + let (pvs, endpoints) = try_join( + async { + ctx.client + .list_with_label_selector::( + &(), + &LabelSelector { + match_labels: Some( + listener_persistent_volume_label(&listener).unwrap(), + ), + ..Default::default() + }, + ) + .await + .context(GetListenerPvsSnafu) + }, + async { + ctx.client + // Endpoints object may not yet be created by its respective controller + .get_opt::(&svc_name, ns) + .await + .with_context(|_| GetObjectSnafu { + obj: ObjectRef::::new(&svc_name).within(ns).erase(), + }) + }, + ) + .await?; + + let pv_node_names = pvs .into_iter() .filter_map(|pv| pv.spec?.node_affinity?.required) .flat_map(|affinity| affinity.node_selector_terms) @@ -241,6 +285,33 @@ pub async fn reconcile(listener: Arc, ctx: Arc) -> Result>(); + + // Old objects that haven't been mounted before the PV lookup mechanism was added will + // not have the correct labels, so we also look up using Endpoints. + let endpoints_node_names = endpoints + .into_iter() + .filter_map(|endpoints| endpoints.subsets) + .flatten() + .flat_map(|subset| subset.addresses) + .flatten() + .flat_map(|addr| addr.node_name) + .collect::>(); + + let node_names_missing_from_pv = endpoints_node_names + .difference(&pv_node_names) + .collect::>(); + if !node_names_missing_from_pv.is_empty() { + tracing::warn!( + ?node_names_missing_from_pv, + "some backing Nodes could only be found via legacy Endpoints discovery method, {} {}", + "this may cause discovery config to be unstable", + "(hint: try restarting the Pods backing this Listener)" + ); + } + + let mut node_names = pv_node_names; + node_names.extend(endpoints_node_names); + nodes = try_join_all(node_names.iter().map(|node_name| async { ctx.client .get::(node_name, &()) From 31817b1b89cb9476e276fbc3950fac4b0ef1fa45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Fri, 4 Oct 2024 17:09:58 +0200 Subject: [PATCH 06/10] Add watch on ListenerClass to the Listener controller --- rust/operator-binary/src/listener_controller.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/rust/operator-binary/src/listener_controller.rs b/rust/operator-binary/src/listener_controller.rs index add59d09..296c4249 100644 --- a/rust/operator-binary/src/listener_controller.rs +++ b/rust/operator-binary/src/listener_controller.rs @@ -41,6 +41,22 @@ pub async fn run(client: stackable_operator::client::Client) { let listener_store = controller.store(); controller .owns(client.get_all_api::(), watcher::Config::default()) + .watches( + client.get_all_api::(), + watcher::Config::default(), + { + let listener_store = listener_store.clone(); + move |listenerclass| { + listener_store + .state() + .into_iter() + .filter(move |listener| { + listener.spec.class_name == listenerclass.metadata.name + }) + .map(|l| ObjectRef::from_obj(&*l)) + } + }, + ) .watches( client.get_all_api::(), watcher::Config::default(), From 43c6ad9eeeb379d7d6a160be9ddd2b05b6a64f38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Fri, 4 Oct 2024 17:11:07 +0200 Subject: [PATCH 07/10] Changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f7fb6df..df9415cf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ All notable changes to this project will be documented in this file. ### Fixed - Listener.status.addresses is now de-duplicated ([#231]). +- Listener controller now listens for ListenerClass updates ([#231]). [#231]: https://github.com/stackabletech/listener-operator/pull/231 From 09859508d36a93a3060e899507144d286d871c51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Fri, 4 Oct 2024 17:18:56 +0200 Subject: [PATCH 08/10] Spit the hairy node lookup logic into a separate function --- .../src/listener_controller.rs | 141 ++++++++++-------- 1 file changed, 77 insertions(+), 64 deletions(-) diff --git a/rust/operator-binary/src/listener_controller.rs b/rust/operator-binary/src/listener_controller.rs index 296c4249..fa52c0da 100644 --- a/rust/operator-binary/src/listener_controller.rs +++ b/rust/operator-binary/src/listener_controller.rs @@ -264,70 +264,8 @@ pub async fn reconcile(listener: Arc, ctx: Arc) -> Result; match listener_class.spec.service_type { ServiceType::NodePort => { - let (pvs, endpoints) = try_join( - async { - ctx.client - .list_with_label_selector::( - &(), - &LabelSelector { - match_labels: Some( - listener_persistent_volume_label(&listener).unwrap(), - ), - ..Default::default() - }, - ) - .await - .context(GetListenerPvsSnafu) - }, - async { - ctx.client - // Endpoints object may not yet be created by its respective controller - .get_opt::(&svc_name, ns) - .await - .with_context(|_| GetObjectSnafu { - obj: ObjectRef::::new(&svc_name).within(ns).erase(), - }) - }, - ) - .await?; - - let pv_node_names = pvs - .into_iter() - .filter_map(|pv| pv.spec?.node_affinity?.required) - .flat_map(|affinity| affinity.node_selector_terms) - .filter_map(|terms| terms.match_expressions) - .flatten() - .filter(|expr| expr.key == NODE_TOPOLOGY_LABEL_HOSTNAME && expr.operator == "In") - .filter_map(|expr| expr.values) - .flatten() - .collect::>(); - - // Old objects that haven't been mounted before the PV lookup mechanism was added will - // not have the correct labels, so we also look up using Endpoints. - let endpoints_node_names = endpoints - .into_iter() - .filter_map(|endpoints| endpoints.subsets) - .flatten() - .flat_map(|subset| subset.addresses) - .flatten() - .flat_map(|addr| addr.node_name) - .collect::>(); - - let node_names_missing_from_pv = endpoints_node_names - .difference(&pv_node_names) - .collect::>(); - if !node_names_missing_from_pv.is_empty() { - tracing::warn!( - ?node_names_missing_from_pv, - "some backing Nodes could only be found via legacy Endpoints discovery method, {} {}", - "this may cause discovery config to be unstable", - "(hint: try restarting the Pods backing this Listener)" - ); - } - - let mut node_names = pv_node_names; - node_names.extend(endpoints_node_names); - + let node_names = + node_names_for_nodeport_listener(&ctx.client, &listener, ns, &svc_name).await?; nodes = try_join_all(node_names.iter().map(|node_name| async { ctx.client .get::(node_name, &()) @@ -428,6 +366,81 @@ pub fn error_policy(_obj: Arc, _error: &Error, _ctx: Arc) -> controll controller::Action::requeue(*Duration::from_secs(5)) } +/// Lists the names of the [`Node`]s backing this [`Listener`]. +/// +/// Should only be used for [`NodePort`](`ServiceType::NodePort`) [`Listener`]s. +async fn node_names_for_nodeport_listener( + client: &stackable_operator::client::Client, + listener: &Listener, + namespace: &str, + service_name: &str, +) -> Result> { + let (pvs, endpoints) = try_join( + async { + client + .list_with_label_selector::( + &(), + &LabelSelector { + match_labels: Some(listener_persistent_volume_label(listener).unwrap()), + ..Default::default() + }, + ) + .await + .context(GetListenerPvsSnafu) + }, + async { + client + // Endpoints object may not yet be created by its respective controller + .get_opt::(service_name, namespace) + .await + .with_context(|_| GetObjectSnafu { + obj: ObjectRef::::new(service_name) + .within(namespace) + .erase(), + }) + }, + ) + .await?; + + let pv_node_names = pvs + .into_iter() + .filter_map(|pv| pv.spec?.node_affinity?.required) + .flat_map(|affinity| affinity.node_selector_terms) + .filter_map(|terms| terms.match_expressions) + .flatten() + .filter(|expr| expr.key == NODE_TOPOLOGY_LABEL_HOSTNAME && expr.operator == "In") + .filter_map(|expr| expr.values) + .flatten() + .collect::>(); + + // Old objects that haven't been mounted before the PV lookup mechanism was added will + // not have the correct labels, so we also look up using Endpoints. + let endpoints_node_names = endpoints + .into_iter() + .filter_map(|endpoints| endpoints.subsets) + .flatten() + .flat_map(|subset| subset.addresses) + .flatten() + .flat_map(|addr| addr.node_name) + .collect::>(); + + let node_names_missing_from_pv = endpoints_node_names + .difference(&pv_node_names) + .collect::>(); + if !node_names_missing_from_pv.is_empty() { + tracing::warn!( + ?node_names_missing_from_pv, + "some backing Nodes could only be found via legacy Endpoints discovery method, {} {}", + "this may cause discovery config to be unstable", + "(hint: try restarting the Pods backing this Listener)" + ); + } + + let mut node_names = pv_node_names; + node_names.extend(endpoints_node_names); + Ok(node_names) +} + #[derive(Snafu, Debug)] #[snafu(module)] pub enum ListenerMountedPodLabelError { From 91336aaef6c62cf6efbcecc2ffe18f136d9bb2f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Mon, 7 Oct 2024 15:04:05 +0200 Subject: [PATCH 09/10] Formatting --- Cargo.toml | 1 + rust/operator-binary/Cargo.toml | 2 +- rust/operator-binary/src/listener_controller.rs | 16 +++++++++++++--- 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b3561eb0..041f4e2c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ tokio-stream = { version = "0.1", features = ["net"] } tonic = "0.12" tonic-build = "0.12" tonic-reflection = "0.12" +tracing = "0.1.40" # [patch."https://github.com/stackabletech/operator-rs.git"] # stackable-operator = { git = "https://github.com/stackabletech//operator-rs.git", branch = "main" } diff --git a/rust/operator-binary/Cargo.toml b/rust/operator-binary/Cargo.toml index 5922a807..cf3e273a 100644 --- a/rust/operator-binary/Cargo.toml +++ b/rust/operator-binary/Cargo.toml @@ -27,7 +27,7 @@ serde.workspace = true snafu.workspace = true strum.workspace = true h2.workspace = true -tracing = "0.1.40" +tracing.workspace = true [build-dependencies] built.workspace = true diff --git a/rust/operator-binary/src/listener_controller.rs b/rust/operator-binary/src/listener_controller.rs index fa52c0da..43b36398 100644 --- a/rust/operator-binary/src/listener_controller.rs +++ b/rust/operator-binary/src/listener_controller.rs @@ -108,36 +108,45 @@ pub struct Ctx { pub enum Error { #[snafu(display("object has no namespace"))] NoNs, + #[snafu(display("object has no name"))] NoName, + #[snafu(display("object has no ListenerClass (.spec.class_name)"))] NoListenerClass, + #[snafu(display("failed to generate Listener's PersistentVolume selector"))] ListenerPvSelector { source: ListenerPersistentVolumeLabelError, }, + #[snafu(display("failed to generate Listener's Pod selector"))] ListenerPodSelector { source: ListenerMountedPodLabelError, }, + #[snafu(display("failed to get PersistentVolumes for Listener"))] GetListenerPvs { source: stackable_operator::client::Error, }, + #[snafu(display("failed to get {obj}"))] GetObject { source: stackable_operator::client::Error, obj: ObjectRef, }, + #[snafu(display("failed to build owner reference to Listener"))] BuildListenerOwnerRef { source: stackable_operator::builder::meta::Error, }, + #[snafu(display("failed to apply {svc}"))] ApplyService { source: stackable_operator::client::Error, svc: ObjectRef, }, + #[snafu(display("failed to apply status for Listener"))] ApplyStatus { source: stackable_operator::client::Error, @@ -430,9 +439,9 @@ async fn node_names_for_nodeport_listener( if !node_names_missing_from_pv.is_empty() { tracing::warn!( ?node_names_missing_from_pv, - "some backing Nodes could only be found via legacy Endpoints discovery method, {} {}", - "this may cause discovery config to be unstable", - "(hint: try restarting the Pods backing this Listener)" + "some backing Nodes could only be found via legacy Endpoints discovery method, \ + this may cause discovery config to be unstable \ + (hint: try restarting the Pods backing this Listener)", ); } @@ -475,6 +484,7 @@ pub fn listener_mounted_pod_label( pub enum ListenerPersistentVolumeLabelError { #[snafu(display("object has no name"))] NoName, + #[snafu(display("object has no namespace"))] NoNamespace, } From ace06202a98be1d444f8506a168349743408a007 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Mon, 7 Oct 2024 15:45:50 +0200 Subject: [PATCH 10/10] Update futures, disable unused compat --- Cargo.lock | 51 +++++++++++++++++--------------------- Cargo.nix | 72 ++++++++++++++++++++---------------------------------- Cargo.toml | 2 +- 3 files changed, 49 insertions(+), 76 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 500fead6..f5fd108d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -698,15 +698,9 @@ dependencies = [ [[package]] name = "futures" -version = "0.1.31" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a471a38ef8ed83cd6e40aa59c1ffe17db6855c18e3604d9c4ed8c08ebc28678" - -[[package]] -name = "futures" -version = "0.3.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" dependencies = [ "futures-channel", "futures-core", @@ -719,9 +713,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", "futures-sink", @@ -729,15 +723,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" [[package]] name = "futures-executor" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" dependencies = [ "futures-core", "futures-task", @@ -746,15 +740,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" [[package]] name = "futures-macro" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", @@ -763,23 +757,22 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" [[package]] name = "futures-task" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" [[package]] name = "futures-util" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ - "futures 0.1.31", "futures-channel", "futures-core", "futures-io", @@ -1256,7 +1249,7 @@ dependencies = [ "bytes", "chrono", "either", - "futures 0.3.30", + "futures", "home", "http", "http-body", @@ -1327,7 +1320,7 @@ dependencies = [ "async-trait", "backoff", "derivative", - "futures 0.3.30", + "futures", "hashbrown 0.14.5", "json-patch", "jsonptr", @@ -2312,7 +2305,7 @@ dependencies = [ "built", "clap", "csi-grpc", - "futures 0.3.30", + "futures", "h2", "libc", "pin-project", @@ -2341,7 +2334,7 @@ dependencies = [ "derivative", "dockerfile-parser", "either", - "futures 0.3.30", + "futures", "json-patch", "k8s-openapi", "kube", diff --git a/Cargo.nix b/Cargo.nix index 21493783..17344144 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -2022,24 +2022,11 @@ rec { }; resolvedDefaultFeatures = [ "alloc" "default" "std" ]; }; - "futures 0.1.31" = rec { + "futures" = rec { crateName = "futures"; - version = "0.1.31"; - edition = "2015"; - sha256 = "0y46qbmhi37dqkch8dlfq5aninqpzqgrr98awkb3rn4fxww1lirs"; - authors = [ - "Alex Crichton " - ]; - features = { - "default" = [ "use_std" "with-deprecated" ]; - }; - resolvedDefaultFeatures = [ "default" "use_std" "with-deprecated" ]; - }; - "futures 0.3.30" = rec { - crateName = "futures"; - version = "0.3.30"; + version = "0.3.31"; edition = "2018"; - sha256 = "1c04g14bccmprwsvx2j9m2blhwrynq7vhl151lsvcv4gi0b6jp34"; + sha256 = "0xh8ddbkm9jy8kc5gbvjp9a4b6rqqxvc8471yb2qaz5wm2qhgg35"; dependencies = [ { name = "futures-channel"; @@ -2094,13 +2081,13 @@ rec { "unstable" = [ "futures-core/unstable" "futures-task/unstable" "futures-channel/unstable" "futures-io/unstable" "futures-util/unstable" ]; "write-all-vectored" = [ "futures-util/write-all-vectored" ]; }; - resolvedDefaultFeatures = [ "alloc" "async-await" "compat" "default" "executor" "futures-executor" "std" ]; + resolvedDefaultFeatures = [ "alloc" "async-await" "default" "executor" "futures-executor" "std" ]; }; "futures-channel" = rec { crateName = "futures-channel"; - version = "0.3.30"; + version = "0.3.31"; edition = "2018"; - sha256 = "0y6b7xxqdjm9hlcjpakcg41qfl7lihf6gavk8fyqijsxhvbzgj7a"; + sha256 = "040vpqpqlbk099razq8lyn74m0f161zd0rp36hciqrwcg2zibzrd"; libName = "futures_channel"; dependencies = [ { @@ -2126,9 +2113,9 @@ rec { }; "futures-core" = rec { crateName = "futures-core"; - version = "0.3.30"; + version = "0.3.31"; edition = "2018"; - sha256 = "07aslayrn3lbggj54kci0ishmd1pr367fp7iks7adia1p05miinz"; + sha256 = "0gk6yrxgi5ihfanm2y431jadrll00n5ifhnpx090c2f2q1cr1wh5"; libName = "futures_core"; features = { "default" = [ "std" ]; @@ -2139,9 +2126,9 @@ rec { }; "futures-executor" = rec { crateName = "futures-executor"; - version = "0.3.30"; + version = "0.3.31"; edition = "2018"; - sha256 = "07dh08gs9vfll2h36kq32q9xd86xm6lyl9xikmmwlkqnmrrgqxm5"; + sha256 = "17vcci6mdfzx4gbk0wx64chr2f13wwwpvyf3xd5fb1gmjzcx2a0y"; libName = "futures_executor"; dependencies = [ { @@ -2170,9 +2157,9 @@ rec { }; "futures-io" = rec { crateName = "futures-io"; - version = "0.3.30"; + version = "0.3.31"; edition = "2018"; - sha256 = "1hgh25isvsr4ybibywhr4dpys8mjnscw4wfxxwca70cn1gi26im4"; + sha256 = "1ikmw1yfbgvsychmsihdkwa8a1knank2d9a8dk01mbjar9w1np4y"; libName = "futures_io"; features = { "default" = [ "std" ]; @@ -2181,9 +2168,9 @@ rec { }; "futures-macro" = rec { crateName = "futures-macro"; - version = "0.3.30"; + version = "0.3.31"; edition = "2018"; - sha256 = "1b49qh9d402y8nka4q6wvvj0c88qq91wbr192mdn5h54nzs0qxc7"; + sha256 = "0l1n7kqzwwmgiznn0ywdc5i24z72zvh9q1dwps54mimppi7f6bhn"; procMacro = true; libName = "futures_macro"; dependencies = [ @@ -2205,9 +2192,9 @@ rec { }; "futures-sink" = rec { crateName = "futures-sink"; - version = "0.3.30"; + version = "0.3.31"; edition = "2018"; - sha256 = "1dag8xyyaya8n8mh8smx7x6w2dpmafg2din145v973a3hw7f1f4z"; + sha256 = "1xyly6naq6aqm52d5rh236snm08kw8zadydwqz8bip70s6vzlxg5"; libName = "futures_sink"; features = { "default" = [ "std" ]; @@ -2217,9 +2204,9 @@ rec { }; "futures-task" = rec { crateName = "futures-task"; - version = "0.3.30"; + version = "0.3.31"; edition = "2018"; - sha256 = "013h1724454hj8qczp8vvs10qfiqrxr937qsrv6rhii68ahlzn1q"; + sha256 = "124rv4n90f5xwfsm9qw6y99755y021cmi5dhzh253s920z77s3zr"; libName = "futures_task"; features = { "default" = [ "std" ]; @@ -2229,17 +2216,11 @@ rec { }; "futures-util" = rec { crateName = "futures-util"; - version = "0.3.30"; + version = "0.3.31"; edition = "2018"; - sha256 = "0j0xqhcir1zf2dcbpd421kgw6wvsk0rpxflylcysn1rlp3g02r1x"; + sha256 = "10aa1ar8bgkgbr4wzxlidkqkcxf77gffyj8j7768h831pcaq784z"; libName = "futures_util"; dependencies = [ - { - name = "futures"; - packageId = "futures 0.1.31"; - rename = "futures_01"; - optional = true; - } { name = "futures-channel"; packageId = "futures-channel"; @@ -2317,7 +2298,7 @@ rec { "unstable" = [ "futures-core/unstable" "futures-task/unstable" ]; "write-all-vectored" = [ "io" ]; }; - resolvedDefaultFeatures = [ "alloc" "async-await" "async-await-macro" "channel" "compat" "futures-channel" "futures-io" "futures-macro" "futures-sink" "futures_01" "io" "memchr" "sink" "slab" "std" ]; + resolvedDefaultFeatures = [ "alloc" "async-await" "async-await-macro" "channel" "futures-channel" "futures-io" "futures-macro" "futures-sink" "io" "memchr" "sink" "slab" "std" ]; }; "generic-array" = rec { crateName = "generic-array"; @@ -3820,7 +3801,7 @@ rec { } { name = "futures"; - packageId = "futures 0.3.30"; + packageId = "futures"; optional = true; usesDefaultFeatures = false; features = [ "std" ]; @@ -3961,7 +3942,7 @@ rec { devDependencies = [ { name = "futures"; - packageId = "futures 0.3.30"; + packageId = "futures"; usesDefaultFeatures = false; features = [ "async-await" ]; } @@ -4179,7 +4160,7 @@ rec { } { name = "futures"; - packageId = "futures 0.3.30"; + packageId = "futures"; usesDefaultFeatures = false; features = [ "async-await" ]; } @@ -7305,8 +7286,7 @@ rec { } { name = "futures"; - packageId = "futures 0.3.30"; - features = [ "compat" ]; + packageId = "futures"; } { name = "h2"; @@ -7425,7 +7405,7 @@ rec { } { name = "futures"; - packageId = "futures 0.3.30"; + packageId = "futures"; } { name = "json-patch"; diff --git a/Cargo.toml b/Cargo.toml index 041f4e2c..29bc2bf6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ repository = "https://github.com/stackabletech/listener-operator" anyhow = "1.0" built = { version = "0.7", features = ["chrono", "git2"] } clap = "4.5" -futures = { version = "0.3", features = ["compat"] } +futures = { version = "0.3" } h2 = "0.4" libc = "0.2" pin-project = "1.1"