Skip to content

Commit 415805c

Browse files
committed
Migrate to EndpointSlices
1 parent 19eb4ec commit 415805c

File tree

4 files changed

+82
-103
lines changed

4 files changed

+82
-103
lines changed

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ kind: Service
2121
metadata:
2222
name: kubernetes-vip
2323
namespace: default
24+
annotations:
25+
endpoint-copier/enabled: "true"
26+
endpoint-copier/default-service-name: "kubernetes"
27+
endpoint-copier/default-service-namespace: "default"
2428
spec:
2529
internalTrafficPolicy: Cluster
2630
ipFamilies:

cmd/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ func main() {
122122
os.Exit(1)
123123
}
124124

125-
if err = (&controller.EndpointsReconciler{
125+
if err = (&controller.EndpointSliceReconciler{
126126
Client: mgr.GetClient(),
127127
Scheme: mgr.GetScheme(),
128128
DefaultEndpointName: defaultEndpointName,

helm/endpoint-copier-operator/templates/rbac/role.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ metadata:
77
name: {{ include "endpoint-copier-operator.fullname" . }}
88
rules:
99
- apiGroups:
10-
- ""
10+
- "discovery.k8s.io"
1111
resources:
12-
- endpoints
12+
- endpointslices
1313
verbs:
1414
- create
1515
- delete

internal/controller/endpoints_controller.go

Lines changed: 75 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,15 @@
1-
/*
2-
Copyright 2023.
3-
4-
Licensed under the Apache License, Version 2.0 (the "License");
5-
you may not use this file except in compliance with the License.
6-
You may obtain a copy of the License at
7-
8-
http://www.apache.org/licenses/LICENSE-2.0
9-
10-
Unless required by applicable law or agreed to in writing, software
11-
distributed under the License is distributed on an "AS IS" BASIS,
12-
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
See the License for the specific language governing permissions and
14-
limitations under the License.
15-
*/
16-
171
package controller
182

193
import (
204
"context"
215

226
"github.com/go-logr/logr"
237
corev1 "k8s.io/api/core/v1"
8+
discoveryv1 "k8s.io/api/discovery/v1"
249
apierrors "k8s.io/apimachinery/pkg/api/errors"
10+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2511
"k8s.io/apimachinery/pkg/runtime"
12+
"k8s.io/apimachinery/pkg/runtime/schema"
2613
"k8s.io/apimachinery/pkg/util/intstr"
2714
ctrl "sigs.k8s.io/controller-runtime"
2815
"sigs.k8s.io/controller-runtime/pkg/builder"
@@ -33,17 +20,13 @@ import (
3320
)
3421

3522
var (
36-
// Name is the name of the operator
37-
Name = "endpoint-copier-operator"
38-
39-
// Annotation used on services to enable endpoints syncing
23+
Name = "endpoint-copier-operator"
4024
ServiceAnnotationEnabled = "endpoint-copier/enabled"
4125
AnnotationDefaultServiceName = "endpoint-copier/default-service-name"
4226
AnnotationDefaultServiceNamespace = "endpoint-copier/default-service-namespace"
4327
)
4428

45-
// EndpointsReconciler reconciles a Endpoints object
46-
type EndpointsReconciler struct {
29+
type EndpointSliceReconciler struct {
4730
client.Client
4831
Scheme *runtime.Scheme
4932
DefaultEndpointName string
@@ -54,23 +37,35 @@ type EndpointsReconciler struct {
5437
ApiserverProtocol string
5538
}
5639

57-
// Reconcile is part of the main kubernetes reconciliation loop which aims to
58-
// move the current state of the cluster closer to the desired state.
59-
// TODO(user): Modify the Reconcile function to compare the state specified by
60-
// the Endpoints object against the actual cluster state, and then
61-
// perform operations to make the cluster state reflect the state specified by
62-
// the user.
63-
//
64-
// For more details, check Reconcile and its Result here:
65-
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.15.0/pkg/reconcile
66-
func (r *EndpointsReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
67-
logger := ctrl.Log.WithName("endpoints")
68-
69-
// Fetch the Service that triggered the reconcile
40+
func (r *EndpointSliceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
41+
logger := ctrl.Log.WithName("endpointslice")
42+
7043
svc := &corev1.Service{}
7144
if err := r.Get(ctx, req.NamespacedName, svc); err != nil {
7245
if apierrors.IsNotFound(err) {
73-
logger.Info("Service not found", "name", req.Name, "namespace", req.Namespace)
46+
logger.Info("Service not found, cleaning up EndpointSlices", "name", req.Name, "namespace", req.Namespace)
47+
48+
var slices discoveryv1.EndpointSliceList
49+
err := r.List(ctx, &slices, client.InNamespace(req.Namespace), client.MatchingLabels{
50+
"kubernetes.io/service-name": req.Name,
51+
})
52+
if err != nil {
53+
logger.Error(err, "Failed to list EndpointSlices for cleanup")
54+
return ctrl.Result{}, err
55+
}
56+
57+
for _, slice := range slices.Items {
58+
// Optional: only delete slices managed by your controller
59+
if slice.Labels["endpoint-copier/source"] != "" {
60+
if err := r.Delete(ctx, &slice); err != nil && !apierrors.IsNotFound(err) {
61+
logger.Error(err, "Failed to delete EndpointSlice", "name", slice.Name)
62+
// Don't return yet; continue trying to clean up others
63+
} else {
64+
logger.Info("Deleted EndpointSlice", "name", slice.Name)
65+
}
66+
}
67+
}
68+
7469
return ctrl.Result{}, nil
7570
}
7671
logger.Error(err, "Failed to get Service")
@@ -84,70 +79,52 @@ func (r *EndpointsReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
8479
var defaultServiceName, defaultServiceNamespace string
8580

8681
if enabled {
87-
// If annotation enabled: The managed service is the current Service itself
8882
managedServiceName = svc.Name
8983
managedServiceNamespace = svc.Namespace
90-
91-
// The default service is read from the annotations on the managed service
9284
defaultServiceName = annotations[AnnotationDefaultServiceName]
9385
defaultServiceNamespace = annotations[AnnotationDefaultServiceNamespace]
94-
95-
logger.Info("Annotation enabled: using dynamic managed and default services",
96-
"managedServiceName", managedServiceName, "managedServiceNamespace", managedServiceNamespace,
97-
"defaultServiceName", defaultServiceName, "defaultServiceNamespace", defaultServiceNamespace)
9886
} else {
99-
// Legacy mode fallback - use configured fixed names and namespaces
10087
managedServiceName = r.ManagedEndpointName
10188
managedServiceNamespace = r.ManagedEndpointNamespace
102-
10389
defaultServiceName = r.DefaultEndpointName
10490
defaultServiceNamespace = r.DefaultEndpointNamespace
105-
106-
logger.Info("Annotation not enabled, using legacy static configuration — this behavior is DEPRECATED",
107-
"managedServiceName", managedServiceName, "managedServiceNamespace", managedServiceNamespace,
108-
"defaultServiceName", defaultServiceName, "defaultServiceNamespace", defaultServiceNamespace)
10991
}
11092

111-
// Get the managed Service object
11293
managedService := &corev1.Service{}
11394
if err := r.Get(ctx, client.ObjectKey{Namespace: managedServiceNamespace, Name: managedServiceName}, managedService); err != nil {
11495
if apierrors.IsNotFound(err) {
115-
logger.Info("Managed Service not found", "name", managedServiceName, "namespace", managedServiceNamespace)
96+
logger.Info("Managed Service not found")
11697
return ctrl.Result{}, nil
11798
}
118-
logger.Error(err, "Error getting managed service")
11999
return ctrl.Result{}, err
120100
}
121101

122-
// Get the default Endpoints object
123-
endpoints := &corev1.Endpoints{}
124-
if err := r.Get(ctx, client.ObjectKey{Namespace: defaultServiceNamespace, Name: defaultServiceName}, endpoints); err != nil {
102+
var slices discoveryv1.EndpointSliceList
103+
err := r.List(ctx, &slices, client.MatchingLabels{"kubernetes.io/service-name": defaultServiceName}, client.InNamespace(defaultServiceNamespace))
104+
if err != nil {
125105
return ctrl.Result{}, err
126106
}
127107

128-
// Sync endpoints from default to managed service
129-
if err := r.syncEndpoints(ctx, logger, endpoints, managedService); err != nil {
130-
logger.Error(err, "error syncing endpoints")
108+
if err := r.syncEndpointSlices(ctx, logger, slices.Items, managedService); err != nil {
109+
logger.Error(err, "Error syncing endpoint slices")
131110
return ctrl.Result{}, err
132111
}
133112

134-
logger.Info("Successfully updated endpoint", "name", managedServiceName, "namespace", managedServiceNamespace)
135-
113+
logger.Info("Successfully updated endpoint slices", "name", managedServiceName, "namespace", managedServiceNamespace)
136114
return ctrl.Result{}, nil
137115
}
138116

139-
// SetupWithManager sets up the controller with the Manager.
140-
func (r *EndpointsReconciler) SetupWithManager(mgr ctrl.Manager) error {
117+
func (r *EndpointSliceReconciler) SetupWithManager(mgr ctrl.Manager) error {
141118
return ctrl.NewControllerManagedBy(mgr).
142-
For(&corev1.Endpoints{}, builder.WithPredicates(predicate.Funcs{
119+
For(&discoveryv1.EndpointSlice{}, builder.WithPredicates(predicate.Funcs{
143120
CreateFunc: func(e event.CreateEvent) bool {
144-
return e.Object.GetNamespace() == r.DefaultEndpointNamespace && e.Object.GetName() == r.DefaultEndpointName
121+
return e.Object.GetLabels()["kubernetes.io/service-name"] == r.DefaultEndpointName && e.Object.GetNamespace() == r.DefaultEndpointNamespace
145122
},
146123
UpdateFunc: func(e event.UpdateEvent) bool {
147-
return e.ObjectOld.GetNamespace() == r.DefaultEndpointNamespace && e.ObjectOld.GetName() == r.DefaultEndpointName
124+
return e.ObjectOld.GetLabels()["kubernetes.io/service-name"] == r.DefaultEndpointName && e.ObjectOld.GetNamespace() == r.DefaultEndpointNamespace
148125
},
149126
DeleteFunc: func(e event.DeleteEvent) bool {
150-
return e.Object.GetNamespace() == r.DefaultEndpointNamespace && e.Object.GetName() == r.DefaultEndpointName
127+
return e.Object.GetLabels()["kubernetes.io/service-name"] == r.DefaultEndpointName && e.Object.GetNamespace() == r.DefaultEndpointNamespace
151128
},
152129
})).
153130
Watches(&corev1.Service{}, &handler.EnqueueRequestForObject{}, builder.WithPredicates(predicate.Funcs{
@@ -163,52 +140,50 @@ func (r *EndpointsReconciler) SetupWithManager(mgr ctrl.Manager) error {
163140
})).Complete(r)
164141
}
165142

166-
// syncEndpoint updates the Endpoint resource with the current node IPs.
167-
func (r *EndpointsReconciler) syncEndpoints(ctx context.Context, logger logr.Logger, defaultEndpoints *corev1.Endpoints, managedService *corev1.Service) error {
168-
managedEndpoints := &corev1.Endpoints{}
169-
managedEndpoints.ObjectMeta.Name = managedService.Name
170-
managedEndpoints.ObjectMeta.Namespace = managedService.Namespace
171-
managedEndpoints.ObjectMeta.Labels = map[string]string{"endpointslice.kubernetes.io/managed-by": Name}
172-
173-
managedEndpoints.Subsets = []corev1.EndpointSubset{}
174-
for _, subset := range defaultEndpoints.Subsets {
175-
var copiedPorts []corev1.EndpointPort
143+
func (r *EndpointSliceReconciler) syncEndpointSlices(ctx context.Context, logger logr.Logger, sourceSlices []discoveryv1.EndpointSlice, managedService *corev1.Service) error {
144+
for _, src := range sourceSlices {
145+
copiedPorts := []discoveryv1.EndpointPort{}
176146
for _, port := range managedService.Spec.Ports {
177-
var portNumber int32
147+
portNum := port.Port
178148
if port.TargetPort.Type == intstr.Int {
179-
portNumber = port.TargetPort.IntVal
180-
} else {
181-
portNumber = port.Port
182-
}
183-
endpointPort := corev1.EndpointPort{
184-
Name: port.Name,
185-
Port: portNumber,
186-
Protocol: port.Protocol,
149+
portNum = port.TargetPort.IntVal
187150
}
188-
copiedPorts = append(copiedPorts, endpointPort)
151+
copiedPorts = append(copiedPorts, discoveryv1.EndpointPort{
152+
Name: &port.Name,
153+
Port: &portNum,
154+
Protocol: &port.Protocol,
155+
})
189156
}
190157

191-
// Copy the addresses
192-
copiedAddresses := make([]corev1.EndpointAddress, len(subset.Addresses))
193-
copy(copiedAddresses, subset.Addresses)
194-
195-
newSubset := corev1.EndpointSubset{
196-
Addresses: copiedAddresses,
197-
Ports: copiedPorts,
158+
newSlice := &discoveryv1.EndpointSlice{
159+
ObjectMeta: metav1.ObjectMeta{
160+
Name: managedService.Name,
161+
Namespace: managedService.Namespace,
162+
Labels: map[string]string{
163+
"kubernetes.io/service-name": managedService.Name,
164+
"endpoint-copier/source": src.Name,
165+
},
166+
},
167+
AddressType: src.AddressType,
168+
Endpoints: src.Endpoints,
169+
Ports: copiedPorts,
198170
}
199171

200-
managedEndpoints.Subsets = append(managedEndpoints.Subsets, newSubset)
201-
}
172+
newSlice.SetGroupVersionKind(schema.GroupVersionKind{
173+
Group: "discovery.k8s.io",
174+
Version: "v1",
175+
Kind: "EndpointSlice",
176+
})
202177

203-
// Update the custom Endpoints resource with the updated IP addresses.
204-
if err := r.Update(ctx, managedEndpoints); err != nil {
205-
return err
178+
// Upsert logic
179+
err := r.Patch(ctx, newSlice, client.Apply, client.ForceOwnership, client.FieldOwner(Name))
180+
if err != nil {
181+
logger.Error(err, "Failed to patch EndpointSlice", "name", newSlice.Name)
182+
}
206183
}
207-
208184
return nil
209185
}
210186

211-
// helper func to check annotation
212187
func hasEndpointCopierEnabledAnnotation(obj client.Object) bool {
213188
annotations := obj.GetAnnotations()
214189
if annotations == nil {

0 commit comments

Comments
 (0)