1- use std:: { collections:: BTreeMap , sync:: Arc } ;
1+ use std:: {
2+ collections:: { BTreeMap , BTreeSet } ,
3+ sync:: Arc ,
4+ } ;
25
36use futures:: { future:: try_join_all, StreamExt } ;
47use snafu:: { OptionExt , ResultExt , Snafu } ;
@@ -8,44 +11,39 @@ use stackable_operator::{
811 AddressType , Listener , ListenerClass , ListenerIngress , ListenerPort , ListenerSpec ,
912 ListenerStatus , ServiceType ,
1013 } ,
11- k8s_openapi:: api:: core:: v1:: { Endpoints , Node , Service , ServicePort , ServiceSpec } ,
14+ k8s_openapi:: {
15+ api:: core:: v1:: { Node , PersistentVolume , Service , ServicePort , ServiceSpec } ,
16+ apimachinery:: pkg:: apis:: meta:: v1:: LabelSelector ,
17+ } ,
1218 kube:: {
1319 api:: { DynamicObject , ObjectMeta } ,
1420 runtime:: { controller, reflector:: ObjectRef , watcher} ,
21+ ResourceExt ,
1522 } ,
1623 logging:: controller:: { report_controller_reconciled, ReconcilerError } ,
1724 time:: Duration ,
1825} ;
1926use strum:: IntoStaticStr ;
2027
21- use crate :: utils:: node_primary_address;
28+ use crate :: { csi_server :: node :: NODE_TOPOLOGY_LABEL_HOSTNAME , utils:: node_primary_address} ;
2229
2330#[ cfg( doc) ]
2431use stackable_operator:: k8s_openapi:: api:: core:: v1:: Pod ;
2532
2633const FIELD_MANAGER_SCOPE : & str = "listener" ;
2734
2835pub async fn run ( client : stackable_operator:: client:: Client ) {
29- let controller =
30- controller:: Controller :: new ( client. get_all_api :: < Listener > ( ) , watcher:: Config :: default ( ) ) ;
31- let listener_store = controller. store ( ) ;
32- controller
36+ controller:: Controller :: new ( client. get_all_api :: < Listener > ( ) , watcher:: Config :: default ( ) )
3337 . owns ( client. get_all_api :: < Service > ( ) , watcher:: Config :: default ( ) )
3438 . watches (
35- client. get_all_api :: < Endpoints > ( ) ,
39+ client. get_all_api :: < PersistentVolume > ( ) ,
3640 watcher:: Config :: default ( ) ,
37- move |endpoints| {
38- listener_store
39- . state ( )
40- . into_iter ( )
41- . filter ( move |listener| {
42- listener
43- . status
44- . as_ref ( )
45- . and_then ( |s| s. service_name . as_deref ( ) )
46- == endpoints. metadata . name . as_deref ( )
47- } )
48- . map ( |l| ObjectRef :: from_obj ( & * l) )
41+ |pv| {
42+ let labels = pv. labels ( ) ;
43+ labels
44+ . get ( PV_LABEL_LISTENER_NAMESPACE )
45+ . zip ( labels. get ( PV_LABEL_LISTENER_NAME ) )
46+ . map ( |( ns, name) | ObjectRef :: < Listener > :: new ( name) . within ( ns) )
4947 } ,
5048 )
5149 . shutdown_on_signal ( )
@@ -75,7 +73,11 @@ pub enum Error {
7573 NoName ,
7674 #[ snafu( display( "object has no ListenerClass (.spec.class_name)" ) ) ]
7775 NoListenerClass ,
78- #[ snafu( display( "failed to generate listener's pod selector" ) ) ]
76+ #[ snafu( display( "failed to generate Listener's PersistentVolume selector" ) ) ]
77+ ListenerPvSelector {
78+ source : ListenerPersistentVolumeLabelError ,
79+ } ,
80+ #[ snafu( display( "failed to generate Listener's Pod selector" ) ) ]
7981 ListenerPodSelector {
8082 source : ListenerMountedPodLabelError ,
8183 } ,
@@ -109,6 +111,7 @@ impl ReconcilerError for Error {
109111 Self :: NoNs => None ,
110112 Self :: NoName => None ,
111113 Self :: NoListenerClass => None ,
114+ Self :: ListenerPvSelector { source : _ } => None ,
112115 Self :: ListenerPodSelector { source : _ } => None ,
113116 Self :: GetObject { source : _, obj } => Some ( obj. clone ( ) ) ,
114117 Self :: BuildListenerOwnerRef { .. } => None ,
@@ -217,23 +220,27 @@ pub async fn reconcile(listener: Arc<Listener>, ctx: Arc<Ctx>) -> Result<control
217220 let ports: BTreeMap < String , i32 > ;
218221 match listener_class. spec . service_type {
219222 ServiceType :: NodePort => {
220- let endpoints = ctx
223+ let pvs = ctx
221224 . client
222- . get_opt :: < Endpoints > ( & svc_name, ns)
225+ . list_with_label_selector :: < PersistentVolume > (
226+ & ( ) ,
227+ & LabelSelector {
228+ match_labels : Some ( listener_persistent_volume_label ( & listener) . unwrap ( ) ) ,
229+ ..Default :: default ( )
230+ } ,
231+ )
223232 . await
224- . with_context ( |_| GetObjectSnafu {
225- obj : ObjectRef :: < Endpoints > :: new ( & svc_name) . within ( ns) . erase ( ) ,
226- } ) ?
227- // Endpoints object may not yet be created by its respective controller
228- . unwrap_or_default ( ) ;
229- let node_names = endpoints
230- . subsets
233+ . unwrap ( ) ;
234+ let node_names = pvs
231235 . into_iter ( )
236+ . filter_map ( |pv| pv. spec ?. node_affinity ?. required )
237+ . flat_map ( |affinity| affinity. node_selector_terms )
238+ . filter_map ( |terms| terms. match_expressions )
232239 . flatten ( )
233- . flat_map ( |subset| subset. addresses )
240+ . filter ( |expr| expr. key == NODE_TOPOLOGY_LABEL_HOSTNAME && expr. operator == "In" )
241+ . filter_map ( |expr| expr. values )
234242 . flatten ( )
235- . flat_map ( |addr| addr. node_name )
236- . collect :: < Vec < _ > > ( ) ;
243+ . collect :: < BTreeSet < _ > > ( ) ;
237244 nodes = try_join_all ( node_names. iter ( ) . map ( |node_name| async {
238245 ctx. client
239246 . get :: < Node > ( node_name, & ( ) )
@@ -356,8 +363,43 @@ pub fn listener_mounted_pod_label(
356363 // 60.
357364 // We prefer uid over name because uids have a consistent length.
358365 Ok ( (
366+ // This should probably have been listeners.stackable.tech/ instead, but too late to change now
359367 format ! ( "listener.stackable.tech/mnt.{}" , uid. replace( '-' , "" ) ) ,
360368 // Arbitrary, but (hopefully) helps indicate to users which listener it applies to
361369 listener. metadata . name . clone ( ) . context ( NoNameSnafu ) ?,
362370 ) )
363371}
372+
373+ #[ derive( Snafu , Debug ) ]
374+ #[ snafu( module) ]
375+ pub enum ListenerPersistentVolumeLabelError {
376+ #[ snafu( display( "object has no name" ) ) ]
377+ NoName ,
378+ #[ snafu( display( "object has no namespace" ) ) ]
379+ NoNamespace ,
380+ }
381+
382+ const PV_LABEL_LISTENER_NAMESPACE : & str = "listeners.stackable.tech/listener-namespace" ;
383+ const PV_LABEL_LISTENER_NAME : & str = "listeners.stackable.tech/listener-name" ;
384+
385+ /// A label that identifies which [`Listener`] corresponds to a given [`PersistentVolume`].
386+ pub fn listener_persistent_volume_label (
387+ listener : & Listener ,
388+ ) -> Result < BTreeMap < String , String > , ListenerPersistentVolumeLabelError > {
389+ use listener_persistent_volume_label_error:: * ;
390+ Ok ( [
391+ (
392+ PV_LABEL_LISTENER_NAMESPACE . to_string ( ) ,
393+ listener
394+ . metadata
395+ . namespace
396+ . clone ( )
397+ . context ( NoNamespaceSnafu ) ?,
398+ ) ,
399+ (
400+ PV_LABEL_LISTENER_NAME . to_string ( ) ,
401+ listener. metadata . name . clone ( ) . context ( NoNameSnafu ) ?,
402+ ) ,
403+ ]
404+ . into ( ) )
405+ }
0 commit comments