@@ -9,7 +9,8 @@ use futures::{
99} ;
1010use snafu:: { OptionExt , ResultExt , Snafu } ;
1111use stackable_operator:: {
12- builder:: meta:: OwnerReferenceBuilder ,
12+ builder:: meta:: ObjectMetaBuilder ,
13+ cluster_resources:: { ClusterResourceApplyStrategy , ClusterResources } ,
1314 commons:: listener:: {
1415 AddressType , Listener , ListenerClass , ListenerIngress , ListenerPort , ListenerSpec ,
1516 ListenerStatus , ServiceType ,
@@ -21,19 +22,23 @@ use stackable_operator::{
2122 kube:: {
2223 api:: { DynamicObject , ObjectMeta } ,
2324 runtime:: { controller, reflector:: ObjectRef , watcher} ,
24- ResourceExt ,
25+ Resource , ResourceExt ,
2526 } ,
27+ kvp:: Labels ,
2628 logging:: controller:: { report_controller_reconciled, ReconcilerError } ,
2729 time:: Duration ,
2830} ;
2931use strum:: IntoStaticStr ;
3032
31- use crate :: { csi_server:: node:: NODE_TOPOLOGY_LABEL_HOSTNAME , utils:: node_primary_address} ;
33+ use crate :: {
34+ csi_server:: node:: NODE_TOPOLOGY_LABEL_HOSTNAME , utils:: node_primary_address, APP_NAME ,
35+ OPERATOR_KEY ,
36+ } ;
3237
3338#[ cfg( doc) ]
3439use stackable_operator:: k8s_openapi:: api:: core:: v1:: Pod ;
3540
36- const FIELD_MANAGER_SCOPE : & str = "listener" ;
41+ const CONTROLLER_NAME : & str = "listener" ;
3742
3843pub async fn run ( client : stackable_operator:: client:: Client ) {
3944 let controller =
@@ -112,6 +117,11 @@ pub enum Error {
112117 #[ snafu( display( "object has no name" ) ) ]
113118 NoName ,
114119
120+ #[ snafu( display( "failed to create cluster resources" ) ) ]
121+ CreateClusterResources {
122+ source : stackable_operator:: cluster_resources:: Error ,
123+ } ,
124+
115125 #[ snafu( display( "object has no ListenerClass (.spec.class_name)" ) ) ]
116126 NoListenerClass ,
117127
@@ -130,6 +140,16 @@ pub enum Error {
130140 source : stackable_operator:: client:: Error ,
131141 } ,
132142
143+ #[ snafu( display( "failed to validate labels passed through from Listener" ) ) ]
144+ ValidateListenerLabels {
145+ source : stackable_operator:: kvp:: LabelError ,
146+ } ,
147+
148+ #[ snafu( display( "failed to build cluster resource labels" ) ) ]
149+ BuildClusterResourcesLabels {
150+ source : stackable_operator:: kvp:: LabelError ,
151+ } ,
152+
133153 #[ snafu( display( "failed to get {obj}" ) ) ]
134154 GetObject {
135155 source : stackable_operator:: client:: Error ,
@@ -143,10 +163,15 @@ pub enum Error {
143163
144164 #[ snafu( display( "failed to apply {svc}" ) ) ]
145165 ApplyService {
146- source : stackable_operator:: client :: Error ,
166+ source : stackable_operator:: cluster_resources :: Error ,
147167 svc : ObjectRef < Service > ,
148168 } ,
149169
170+ #[ snafu( display( "failed to delete orphaned resources" ) ) ]
171+ DeleteOrphans {
172+ source : stackable_operator:: cluster_resources:: Error ,
173+ } ,
174+
150175 #[ snafu( display( "failed to apply status for Listener" ) ) ]
151176 ApplyStatus {
152177 source : stackable_operator:: client:: Error ,
@@ -162,19 +187,33 @@ impl ReconcilerError for Error {
162187 match self {
163188 Self :: NoNs => None ,
164189 Self :: NoName => None ,
190+ Self :: CreateClusterResources { source : _ } => None ,
165191 Self :: NoListenerClass => None ,
166192 Self :: ListenerPvSelector { source : _ } => None ,
167193 Self :: ListenerPodSelector { source : _ } => None ,
168194 Self :: GetListenerPvs { source : _ } => None ,
195+ Self :: ValidateListenerLabels { source : _ } => None ,
196+ Self :: BuildClusterResourcesLabels { source : _ } => None ,
169197 Self :: GetObject { source : _, obj } => Some ( obj. clone ( ) ) ,
170198 Self :: BuildListenerOwnerRef { .. } => None ,
171199 Self :: ApplyService { source : _, svc } => Some ( svc. clone ( ) . erase ( ) ) ,
200+ Self :: DeleteOrphans { source : _ } => None ,
172201 Self :: ApplyStatus { source : _ } => None ,
173202 }
174203 }
175204}
176205
177206pub async fn reconcile ( listener : Arc < Listener > , ctx : Arc < Ctx > ) -> Result < controller:: Action > {
207+ let mut cluster_resources = ClusterResources :: new (
208+ APP_NAME ,
209+ OPERATOR_KEY ,
210+ CONTROLLER_NAME ,
211+ & listener. object_ref ( & ( ) ) ,
212+ // Listeners don't currently support pausing
213+ ClusterResourceApplyStrategy :: Default ,
214+ )
215+ . context ( CreateClusterResourcesSnafu ) ?;
216+
178217 let ns = listener. metadata . namespace . as_deref ( ) . context ( NoNsSnafu ) ?;
179218 let listener_class_name = listener
180219 . spec
@@ -228,17 +267,29 @@ pub async fn reconcile(listener: Arc<Listener>, ctx: Arc<Ctx>) -> Result<control
228267 } ;
229268
230269 let svc = Service {
231- metadata : ObjectMeta {
232- namespace : Some ( ns. to_string ( ) ) ,
233- name : Some ( svc_name. clone ( ) ) ,
234- owner_references : Some ( vec ! [ OwnerReferenceBuilder :: new( )
235- . initialize_from_resource( & * listener)
236- . build( )
237- . context( BuildListenerOwnerRefSnafu ) ?] ) ,
238- // Propagate the labels from the Listener object to the Service object, so it can be found easier
239- labels : listener. metadata . labels . clone ( ) ,
240- ..Default :: default ( )
241- } ,
270+ metadata : ObjectMetaBuilder :: new ( )
271+ . namespace ( ns)
272+ . name ( & svc_name)
273+ . ownerreference_from_resource ( & * listener, Some ( true ) , Some ( true ) )
274+ . context ( BuildListenerOwnerRefSnafu ) ?
275+ . with_labels (
276+ Labels :: try_from (
277+ listener
278+ . metadata
279+ . labels
280+ . as_ref ( )
281+ . unwrap_or ( & BTreeMap :: new ( ) ) ,
282+ )
283+ . context ( ValidateListenerLabelsSnafu ) ?,
284+ )
285+ . with_labels (
286+ cluster_resources
287+ // Not using Labels::recommended, since it carries a bunch of extra information that is
288+ // only relevant for stacklets (such as rolegroups and product versions).
289+ . get_required_labels ( )
290+ . context ( BuildClusterResourcesLabelsSnafu ) ?,
291+ )
292+ . build ( ) ,
242293 spec : Some ( ServiceSpec {
243294 // We explicitly match here and do not implement `ToString` as there might be more (non vanilla k8s Service
244295 // types) in the future.
@@ -260,13 +311,11 @@ pub async fn reconcile(listener: Arc<Listener>, ctx: Arc<Ctx>) -> Result<control
260311 } ) ,
261312 ..Default :: default ( )
262313 } ;
263- let svc = ctx
264- . client
265- . apply_patch ( FIELD_MANAGER_SCOPE , & svc , & svc)
314+ let svc_ref = ObjectRef :: from_obj ( & svc ) ;
315+ let svc = cluster_resources
316+ . add ( & ctx . client , svc)
266317 . await
267- . with_context ( |_| ApplyServiceSnafu {
268- svc : ObjectRef :: from_obj ( & svc) ,
269- } ) ?;
318+ . context ( ApplyServiceSnafu { svc : svc_ref } ) ?;
270319
271320 let nodes: Vec < Node > ;
272321 let addresses: Vec < ( & str , AddressType ) > ;
@@ -363,8 +412,14 @@ pub async fn reconcile(listener: Arc<Listener>, ctx: Arc<Ctx>) -> Result<control
363412 ) ,
364413 node_ports : ( listener_class. spec . service_type == ServiceType :: NodePort ) . then_some ( ports) ,
365414 } ;
415+
416+ cluster_resources
417+ . delete_orphaned_resources ( & ctx. client )
418+ . await
419+ . context ( DeleteOrphansSnafu ) ?;
420+
366421 ctx. client
367- . apply_patch_status ( FIELD_MANAGER_SCOPE , & listener_status_meta, & listener_status)
422+ . apply_patch_status ( CONTROLLER_NAME , & listener_status_meta, & listener_status)
368423 . await
369424 . context ( ApplyStatusSnafu ) ?;
370425
0 commit comments