@@ -3,7 +3,10 @@ use std::{
33 sync:: Arc ,
44} ;
55
6- use futures:: { future:: try_join_all, StreamExt } ;
6+ use futures:: {
7+ future:: { try_join, try_join_all} ,
8+ StreamExt ,
9+ } ;
710use snafu:: { OptionExt , ResultExt , Snafu } ;
811use stackable_operator:: {
912 builder:: meta:: OwnerReferenceBuilder ,
@@ -12,7 +15,7 @@ use stackable_operator::{
1215 ListenerStatus , ServiceType ,
1316 } ,
1417 k8s_openapi:: {
15- api:: core:: v1:: { Node , PersistentVolume , Service , ServicePort , ServiceSpec } ,
18+ api:: core:: v1:: { Endpoints , Node , PersistentVolume , Service , ServicePort , ServiceSpec } ,
1619 apimachinery:: pkg:: apis:: meta:: v1:: LabelSelector ,
1720 } ,
1821 kube:: {
@@ -33,8 +36,28 @@ use stackable_operator::k8s_openapi::api::core::v1::Pod;
3336const FIELD_MANAGER_SCOPE : & str = "listener" ;
3437
3538pub async fn run ( client : stackable_operator:: client:: Client ) {
36- controller:: Controller :: new ( client. get_all_api :: < Listener > ( ) , watcher:: Config :: default ( ) )
39+ let controller =
40+ controller:: Controller :: new ( client. get_all_api :: < Listener > ( ) , watcher:: Config :: default ( ) ) ;
41+ let listener_store = controller. store ( ) ;
42+ controller
3743 . owns ( client. get_all_api :: < Service > ( ) , watcher:: Config :: default ( ) )
44+ . watches (
45+ client. get_all_api :: < Endpoints > ( ) ,
46+ watcher:: Config :: default ( ) ,
47+ move |endpoints| {
48+ listener_store
49+ . state ( )
50+ . into_iter ( )
51+ . filter ( move |listener| {
52+ listener
53+ . status
54+ . as_ref ( )
55+ . and_then ( |s| s. service_name . as_deref ( ) )
56+ == endpoints. metadata . name . as_deref ( )
57+ } )
58+ . map ( |l| ObjectRef :: from_obj ( & * l) )
59+ } ,
60+ )
3861 . watches (
3962 client. get_all_api :: < PersistentVolume > ( ) ,
4063 watcher:: Config :: default ( ) ,
@@ -81,6 +104,10 @@ pub enum Error {
81104 ListenerPodSelector {
82105 source : ListenerMountedPodLabelError ,
83106 } ,
107+ #[ snafu( display( "failed to get PersistentVolumes for Listener" ) ) ]
108+ GetListenerPvs {
109+ source : stackable_operator:: client:: Error ,
110+ } ,
84111 #[ snafu( display( "failed to get {obj}" ) ) ]
85112 GetObject {
86113 source : stackable_operator:: client:: Error ,
@@ -113,6 +140,7 @@ impl ReconcilerError for Error {
113140 Self :: NoListenerClass => None ,
114141 Self :: ListenerPvSelector { source : _ } => None ,
115142 Self :: ListenerPodSelector { source : _ } => None ,
143+ Self :: GetListenerPvs { source : _ } => None ,
116144 Self :: GetObject { source : _, obj } => Some ( obj. clone ( ) ) ,
117145 Self :: BuildListenerOwnerRef { .. } => None ,
118146 Self :: ApplyService { source : _, svc } => Some ( svc. clone ( ) . erase ( ) ) ,
@@ -220,18 +248,34 @@ pub async fn reconcile(listener: Arc<Listener>, ctx: Arc<Ctx>) -> Result<control
220248 let ports: BTreeMap < String , i32 > ;
221249 match listener_class. spec . service_type {
222250 ServiceType :: NodePort => {
223- let pvs = ctx
224- . client
225- . list_with_label_selector :: < PersistentVolume > (
226- & ( ) ,
227- & LabelSelector {
228- match_labels : Some ( listener_persistent_volume_label ( & listener) . unwrap ( ) ) ,
229- ..Default :: default ( )
230- } ,
231- )
232- . await
233- . unwrap ( ) ;
234- let node_names = pvs
251+ let ( pvs, endpoints) = try_join (
252+ async {
253+ ctx. client
254+ . list_with_label_selector :: < PersistentVolume > (
255+ & ( ) ,
256+ & LabelSelector {
257+ match_labels : Some (
258+ listener_persistent_volume_label ( & listener) . unwrap ( ) ,
259+ ) ,
260+ ..Default :: default ( )
261+ } ,
262+ )
263+ . await
264+ . context ( GetListenerPvsSnafu )
265+ } ,
266+ async {
267+ ctx. client
268+ // Endpoints object may not yet be created by its respective controller
269+ . get_opt :: < Endpoints > ( & svc_name, ns)
270+ . await
271+ . with_context ( |_| GetObjectSnafu {
272+ obj : ObjectRef :: < Endpoints > :: new ( & svc_name) . within ( ns) . erase ( ) ,
273+ } )
274+ } ,
275+ )
276+ . await ?;
277+
278+ let pv_node_names = pvs
235279 . into_iter ( )
236280 . filter_map ( |pv| pv. spec ?. node_affinity ?. required )
237281 . flat_map ( |affinity| affinity. node_selector_terms )
@@ -241,6 +285,33 @@ pub async fn reconcile(listener: Arc<Listener>, ctx: Arc<Ctx>) -> Result<control
241285 . filter_map ( |expr| expr. values )
242286 . flatten ( )
243287 . collect :: < BTreeSet < _ > > ( ) ;
288+
289+ // Old objects that haven't been mounted before the PV lookup mechanism was added will
290+ // not have the correct labels, so we also look up using Endpoints.
291+ let endpoints_node_names = endpoints
292+ . into_iter ( )
293+ . filter_map ( |endpoints| endpoints. subsets )
294+ . flatten ( )
295+ . flat_map ( |subset| subset. addresses )
296+ . flatten ( )
297+ . flat_map ( |addr| addr. node_name )
298+ . collect :: < BTreeSet < _ > > ( ) ;
299+
300+ let node_names_missing_from_pv = endpoints_node_names
301+ . difference ( & pv_node_names)
302+ . collect :: < Vec < _ > > ( ) ;
303+ if !node_names_missing_from_pv. is_empty ( ) {
304+ tracing:: warn!(
305+ ?node_names_missing_from_pv,
306+ "some backing Nodes could only be found via legacy Endpoints discovery method, {} {}" ,
307+ "this may cause discovery config to be unstable" ,
308+ "(hint: try restarting the Pods backing this Listener)"
309+ ) ;
310+ }
311+
312+ let mut node_names = pv_node_names;
313+ node_names. extend ( endpoints_node_names) ;
314+
244315 nodes = try_join_all ( node_names. iter ( ) . map ( |node_name| async {
245316 ctx. client
246317 . get :: < Node > ( node_name, & ( ) )
0 commit comments