@@ -21,11 +21,13 @@ import (
2121
2222 "github.com/go-logr/logr"
2323 corev1 "k8s.io/api/core/v1"
24+ apierrors "k8s.io/apimachinery/pkg/api/errors"
2425 "k8s.io/apimachinery/pkg/runtime"
2526 ctrl "sigs.k8s.io/controller-runtime"
2627 "sigs.k8s.io/controller-runtime/pkg/builder"
2728 "sigs.k8s.io/controller-runtime/pkg/client"
2829 "sigs.k8s.io/controller-runtime/pkg/event"
30+ "sigs.k8s.io/controller-runtime/pkg/handler"
2931 "sigs.k8s.io/controller-runtime/pkg/predicate"
3032 "sigs.k8s.io/controller-runtime/pkg/reconcile"
3133)
@@ -58,15 +60,28 @@ type EndpointsReconciler struct {
5860// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.15.0/pkg/reconcile
5961func (r * EndpointsReconciler ) Reconcile (ctx context.Context , req ctrl.Request ) (ctrl.Result , error ) {
6062 logger := ctrl .Log .WithName ("endpoints" )
63+ // Check if managed service is created
64+ managedService := & corev1.Service {}
65+ err := r .Get (ctx , client.ObjectKey {Namespace : r .ManagedEndpointNamespace , Name : r .ManagedEndpointName }, managedService )
66+ if err != nil {
67+ if apierrors .IsNotFound (err ) {
68+ logger .Info ("Service is missing" , "name" , r .ManagedEndpointName , "namespace" , r .ManagedEndpointNamespace )
69+ } else {
70+ logger .Error (err , "Could not get service" , "name" , r .ManagedEndpointName , "namespace" , r .ManagedEndpointNamespace )
71+ return reconcile.Result {}, err
72+ }
73+
74+ return reconcile.Result {}, nil
75+ }
6176
62- // Fetch the Endpoints object
77+ // Fetch default Endpoints object
6378 endpoints := & corev1.Endpoints {}
64- if err := r .Get (ctx , req . NamespacedName , endpoints ); err != nil {
65- return reconcile.Result {}, client . IgnoreNotFound ( err )
79+ if err := r .Get (ctx , client. ObjectKey { Namespace : r . DefaultEndpointNamespace , Name : r . DefaultEndpointName } , endpoints ); err != nil {
80+ return reconcile.Result {}, err
6681 }
6782
6883 // update the endpoints
69- if err := r .syncEndpoints (ctx , logger , endpoints ); err != nil {
84+ if err := r .syncEndpoints (ctx , logger , endpoints , managedService ); err != nil {
7085 logger .Error (err , "error syncing endpoint" )
7186 return reconcile.Result {}, err
7287 }
@@ -90,21 +105,48 @@ func (r *EndpointsReconciler) SetupWithManager(mgr ctrl.Manager) error {
90105 return e .Object .GetNamespace () == r .DefaultEndpointNamespace && e .Object .GetName () == r .DefaultEndpointName
91106 },
92107 })).
93- Complete (r )
108+ Watches (& corev1.Service {}, & handler.EnqueueRequestForObject {}, builder .WithPredicates (predicate.Funcs {
109+ CreateFunc : func (e event.CreateEvent ) bool {
110+ return e .Object .GetNamespace () == r .ManagedEndpointNamespace && e .Object .GetName () == r .ManagedEndpointName
111+ },
112+ UpdateFunc : func (e event.UpdateEvent ) bool {
113+ return e .ObjectOld .GetNamespace () == r .ManagedEndpointNamespace && e .ObjectOld .GetName () == r .ManagedEndpointName
114+ },
115+ DeleteFunc : func (e event.DeleteEvent ) bool {
116+ return e .Object .GetNamespace () == r .ManagedEndpointNamespace && e .Object .GetName () == r .ManagedEndpointName
117+ },
118+ })).Complete (r )
94119}
95120
96121// syncEndpoint updates the Endpoint resource with the current node IPs.
97- func (r * EndpointsReconciler ) syncEndpoints (ctx context.Context , logger logr.Logger , defaultEndpoints * corev1.Endpoints ) error {
122+ func (r * EndpointsReconciler ) syncEndpoints (ctx context.Context , logger logr.Logger , defaultEndpoints * corev1.Endpoints , managedService * corev1. Service ) error {
98123 managedEndpoints := & corev1.Endpoints {}
99124 managedEndpoints .ObjectMeta .Name = r .ManagedEndpointName
100125 managedEndpoints .ObjectMeta .Namespace = r .ManagedEndpointNamespace
101126 managedEndpoints .ObjectMeta .Labels = map [string ]string {"endpointslice.kubernetes.io/managed-by" : Name }
102127
103- // Copy only subset addresses without the ports
104128 managedEndpoints .Subsets = []corev1.EndpointSubset {}
105129 for _ , subset := range defaultEndpoints .Subsets {
106- subset .Ports = []corev1.EndpointPort {{Port : int32 (r .ApiserverPort ), Protocol : corev1 .Protocol (r .ApiserverProtocol )}}
107- managedEndpoints .Subsets = append (managedEndpoints .Subsets , subset )
130+ var copiedPorts []corev1.EndpointPort
131+ for _ , port := range managedService .Spec .Ports {
132+ endpointPort := corev1.EndpointPort {
133+ Name : port .Name ,
134+ Port : port .Port ,
135+ Protocol : port .Protocol ,
136+ }
137+ copiedPorts = append (copiedPorts , endpointPort )
138+ }
139+
140+ // Copy the addresses
141+ copiedAddresses := make ([]corev1.EndpointAddress , len (subset .Addresses ))
142+ copy (copiedAddresses , subset .Addresses )
143+
144+ newSubset := corev1.EndpointSubset {
145+ Addresses : copiedAddresses ,
146+ Ports : copiedPorts ,
147+ }
148+
149+ managedEndpoints .Subsets = append (managedEndpoints .Subsets , newSubset )
108150 }
109151
110152 // Update the custom Endpoints resource with the updated IP addresses.
0 commit comments