Skip to content
3 changes: 3 additions & 0 deletions rust/csi-grpc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
//! Include gRPC definition files that have been generated by `build.rs`

// CSI docs don't quite align with Rustdoc conventions
#![allow(clippy::doc_lazy_continuation)]

/// Raw contents of `file_descriptor_set.bin`, embedded into the binary at compile time
/// from the build's `OUT_DIR`.
///
/// NOTE(review): presumably the encoded gRPC/protobuf file descriptor set emitted by
/// `build.rs` — confirm against the build script.
pub static FILE_DESCRIPTOR_SET_BYTES: &[u8] =
include_bytes!(concat!(env!("OUT_DIR"), "/file_descriptor_set.bin"));

Expand Down
46 changes: 44 additions & 2 deletions rust/operator-binary/src/csi_server/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@ use std::{fmt::Debug, path::PathBuf};
use tonic::{Request, Response, Status};

use crate::{
listener_controller::{listener_mounted_pod_label, ListenerMountedPodLabelError},
listener_controller::{
listener_mounted_pod_label, listener_persistent_volume_label, ListenerMountedPodLabelError,
ListenerPersistentVolumeLabelError,
},
utils::{error_full_message, node_primary_address},
};

use super::{tonic_unimplemented, ListenerSelector, ListenerVolumeContext};

/// Scope string handed to `client.apply_patch` for the patches issued from this module.
const FIELD_MANAGER_SCOPE: &str = "volume";

/// Key of the topology segment that this node service reports as its accessible topology;
/// the segment's value is the node's name.
///
/// Public so that other modules can match on the same key instead of repeating the literal.
pub const NODE_TOPOLOGY_LABEL_HOSTNAME: &str = "listeners.stackable.tech/hostname";

pub struct ListenerOperatorNode {
pub client: stackable_operator::client::Client,
pub node_name: String,
Expand Down Expand Up @@ -55,6 +60,12 @@ enum PublishVolumeError {
#[snafu(display("PersistentVolume has no corresponding PersistentVolumeClaim"))]
UnclaimedPv,

#[snafu(display("failed to generate {listener}'s PersistentVolume selector"))]
ListenerPvReference {
source: ListenerPersistentVolumeLabelError,
listener: ObjectRef<Listener>,
},

#[snafu(display("failed to generate {listener}'s pod selector"))]
ListenerPodSelector {
source: ListenerMountedPodLabelError,
Expand All @@ -75,6 +86,12 @@ enum PublishVolumeError {
listener: ObjectRef<Listener>,
},

#[snafu(display("failed to add listener label to {pv}"))]
AddListenerLabelToPv {
source: stackable_operator::client::Error,
pv: ObjectRef<PersistentVolume>,
},

#[snafu(display("failed to add listener label to {pod}"))]
AddListenerLabelToPod {
source: stackable_operator::client::Error,
Expand Down Expand Up @@ -112,9 +129,11 @@ impl From<PublishVolumeError> for Status {
PublishVolumeError::GetObject { .. } => Status::unavailable(full_msg),
PublishVolumeError::UnclaimedPv => Status::unavailable(full_msg),
PublishVolumeError::PodHasNoNode { .. } => Status::unavailable(full_msg),
PublishVolumeError::ListenerPvReference { .. } => Status::failed_precondition(full_msg),
PublishVolumeError::ListenerPodSelector { .. } => Status::failed_precondition(full_msg),
PublishVolumeError::BuildListenerOwnerRef { .. } => Status::unavailable(full_msg),
PublishVolumeError::ApplyListener { .. } => Status::unavailable(full_msg),
PublishVolumeError::AddListenerLabelToPv { .. } => Status::unavailable(full_msg),
PublishVolumeError::AddListenerLabelToPod { .. } => Status::unavailable(full_msg),
PublishVolumeError::NoAddresses { .. } => Status::unavailable(full_msg),
PublishVolumeError::PreparePodDir { .. } => Status::internal(full_msg),
Expand Down Expand Up @@ -155,7 +174,7 @@ impl csi::v1::node_server::Node for ListenerOperatorNode {
max_volumes_per_node: i64::MAX,
accessible_topology: Some(Topology {
segments: [(
"listeners.stackable.tech/hostname".to_string(),
NODE_TOPOLOGY_LABEL_HOSTNAME.to_string(),
self.node_name.clone(),
)]
.into(),
Expand Down Expand Up @@ -276,6 +295,29 @@ impl csi::v1::node_server::Node for ListenerOperatorNode {
}
};

// Add listener label to PV, allowing traffic to be directed based on reservations, rather than which replicas are *currently* active.
// See https://github.com/stackabletech/listener-operator/issues/220
self.client
.apply_patch(
FIELD_MANAGER_SCOPE,
&pv,
&PersistentVolume {
metadata: ObjectMeta {
labels: Some(listener_persistent_volume_label(&listener).context(
ListenerPvReferenceSnafu {
listener: ObjectRef::from_obj(&listener),
},
)?),
..Default::default()
},
..Default::default()
},
)
.await
.with_context(|_| AddListenerLabelToPvSnafu {
pv: ObjectRef::from_obj(&pv),
})?;

// Add listener label to pod so that traffic can be directed to it
self.client
// IMPORTANT
Expand Down
108 changes: 75 additions & 33 deletions rust/operator-binary/src/listener_controller.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::{collections::BTreeMap, sync::Arc};
use std::{
collections::{BTreeMap, BTreeSet},
sync::Arc,
};

use futures::{future::try_join_all, StreamExt};
use snafu::{OptionExt, ResultExt, Snafu};
Expand All @@ -8,44 +11,39 @@ use stackable_operator::{
AddressType, Listener, ListenerClass, ListenerIngress, ListenerPort, ListenerSpec,
ListenerStatus, ServiceType,
},
k8s_openapi::api::core::v1::{Endpoints, Node, Service, ServicePort, ServiceSpec},
k8s_openapi::{
api::core::v1::{Node, PersistentVolume, Service, ServicePort, ServiceSpec},
apimachinery::pkg::apis::meta::v1::LabelSelector,
},
kube::{
api::{DynamicObject, ObjectMeta},
runtime::{controller, reflector::ObjectRef, watcher},
ResourceExt,
},
logging::controller::{report_controller_reconciled, ReconcilerError},
time::Duration,
};
use strum::IntoStaticStr;

use crate::utils::node_primary_address;
use crate::{csi_server::node::NODE_TOPOLOGY_LABEL_HOSTNAME, utils::node_primary_address};

#[cfg(doc)]
use stackable_operator::k8s_openapi::api::core::v1::Pod;

const FIELD_MANAGER_SCOPE: &str = "listener";

pub async fn run(client: stackable_operator::client::Client) {
let controller =
controller::Controller::new(client.get_all_api::<Listener>(), watcher::Config::default());
let listener_store = controller.store();
controller
controller::Controller::new(client.get_all_api::<Listener>(), watcher::Config::default())
.owns(client.get_all_api::<Service>(), watcher::Config::default())
.watches(
client.get_all_api::<Endpoints>(),
client.get_all_api::<PersistentVolume>(),
watcher::Config::default(),
move |endpoints| {
listener_store
.state()
.into_iter()
.filter(move |listener| {
listener
.status
.as_ref()
.and_then(|s| s.service_name.as_deref())
== endpoints.metadata.name.as_deref()
})
.map(|l| ObjectRef::from_obj(&*l))
|pv| {
let labels = pv.labels();
labels
.get(PV_LABEL_LISTENER_NAMESPACE)
.zip(labels.get(PV_LABEL_LISTENER_NAME))
.map(|(ns, name)| ObjectRef::<Listener>::new(name).within(ns))
},
)
.shutdown_on_signal()
Expand Down Expand Up @@ -75,7 +73,11 @@ pub enum Error {
NoName,
#[snafu(display("object has no ListenerClass (.spec.class_name)"))]
NoListenerClass,
#[snafu(display("failed to generate listener's pod selector"))]
#[snafu(display("failed to generate Listener's PersistentVolume selector"))]
ListenerPvSelector {
source: ListenerPersistentVolumeLabelError,
},
#[snafu(display("failed to generate Listener's Pod selector"))]
ListenerPodSelector {
source: ListenerMountedPodLabelError,
},
Expand Down Expand Up @@ -109,6 +111,7 @@ impl ReconcilerError for Error {
Self::NoNs => None,
Self::NoName => None,
Self::NoListenerClass => None,
Self::ListenerPvSelector { source: _ } => None,
Self::ListenerPodSelector { source: _ } => None,
Self::GetObject { source: _, obj } => Some(obj.clone()),
Self::BuildListenerOwnerRef { .. } => None,
Expand Down Expand Up @@ -217,23 +220,27 @@ pub async fn reconcile(listener: Arc<Listener>, ctx: Arc<Ctx>) -> Result<control
let ports: BTreeMap<String, i32>;
match listener_class.spec.service_type {
ServiceType::NodePort => {
let endpoints = ctx
let pvs = ctx
.client
.get_opt::<Endpoints>(&svc_name, ns)
.list_with_label_selector::<PersistentVolume>(
&(),
&LabelSelector {
match_labels: Some(listener_persistent_volume_label(&listener).unwrap()),
..Default::default()
},
)
.await
.with_context(|_| GetObjectSnafu {
obj: ObjectRef::<Endpoints>::new(&svc_name).within(ns).erase(),
})?
// Endpoints object may not yet be created by its respective controller
.unwrap_or_default();
let node_names = endpoints
.subsets
.unwrap();
let node_names = pvs
.into_iter()
.filter_map(|pv| pv.spec?.node_affinity?.required)
.flat_map(|affinity| affinity.node_selector_terms)
.filter_map(|terms| terms.match_expressions)
.flatten()
.flat_map(|subset| subset.addresses)
.filter(|expr| expr.key == NODE_TOPOLOGY_LABEL_HOSTNAME && expr.operator == "In")
.filter_map(|expr| expr.values)
.flatten()
.flat_map(|addr| addr.node_name)
.collect::<Vec<_>>();
.collect::<BTreeSet<_>>();
nodes = try_join_all(node_names.iter().map(|node_name| async {
ctx.client
.get::<Node>(node_name, &())
Expand Down Expand Up @@ -356,8 +363,43 @@ pub fn listener_mounted_pod_label(
// 60.
// We prefer uid over name because uids have a consistent length.
Ok((
// This should probably have been listeners.stackable.tech/ instead, but too late to change now
format!("listener.stackable.tech/mnt.{}", uid.replace('-', "")),
// Arbitrary, but (hopefully) helps indicate to users which listener it applies to
listener.metadata.name.clone().context(NoNameSnafu)?,
))
}

/// Reasons why [`listener_persistent_volume_label`] can fail to build the label set
/// for a [`Listener`].
#[derive(Snafu, Debug)]
#[snafu(module)]
pub enum ListenerPersistentVolumeLabelError {
/// The Listener's `metadata.name` is unset.
#[snafu(display("object has no name"))]
NoName,
/// The Listener's `metadata.namespace` is unset.
#[snafu(display("object has no namespace"))]
NoNamespace,
}

/// Label key under which the owning [`Listener`]'s namespace is stored.
const PV_LABEL_LISTENER_NAMESPACE: &str = "listeners.stackable.tech/listener-namespace";
/// Label key under which the owning [`Listener`]'s name is stored.
const PV_LABEL_LISTENER_NAME: &str = "listeners.stackable.tech/listener-name";

/// Builds the label set that ties a [`PersistentVolume`] back to its [`Listener`].
///
/// The returned map has exactly two entries: the Listener's namespace under
/// `PV_LABEL_LISTENER_NAMESPACE` and its name under `PV_LABEL_LISTENER_NAME`.
///
/// # Errors
///
/// Fails with `NoNamespace` if the Listener's metadata has no namespace, or with
/// `NoName` if it has no name. The namespace is checked first.
pub fn listener_persistent_volume_label(
    listener: &Listener,
) -> Result<BTreeMap<String, String>, ListenerPersistentVolumeLabelError> {
    use listener_persistent_volume_label_error::*;

    let meta = &listener.metadata;
    // Resolve both values up front (namespace before name) so a missing field
    // short-circuits before any label is assembled.
    let listener_namespace = meta.namespace.clone().context(NoNamespaceSnafu)?;
    let listener_name = meta.name.clone().context(NoNameSnafu)?;

    let mut labels = BTreeMap::new();
    labels.insert(PV_LABEL_LISTENER_NAMESPACE.to_string(), listener_namespace);
    labels.insert(PV_LABEL_LISTENER_NAME.to_string(), listener_name);
    Ok(labels)
}