Skip to content

Commit ef6e0e5

Browse files
authored
feat(source): use EndpointSlices instead of Endpoints for Service (kubernetes-sigs#5493)
* feat(source): use EndpointSlice for Service source * feat(source): use indexer for EndpointSlice listing
1 parent cab4e85 commit ef6e0e5

File tree

7 files changed

+293
-174
lines changed

7 files changed

+293
-174
lines changed

charts/external-dns/templates/clusterrole.yaml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@ rules:
1818
{{- end }}
1919
{{- if or (has "service" .Values.sources) (has "contour-httpproxy" .Values.sources) (has "gloo-proxy" .Values.sources) (has "istio-gateway" .Values.sources) (has "istio-virtualservice" .Values.sources) (has "openshift-route" .Values.sources) (has "skipper-routegroup" .Values.sources) }}
2020
- apiGroups: [""]
21-
resources: ["services","endpoints"]
21+
resources: ["services"]
22+
verbs: ["get","watch","list"]
23+
- apiGroups: ["discovery.k8s.io"]
24+
resources: ["endpointslices"]
2225
verbs: ["get","watch","list"]
2326
{{- end }}
2427
{{- if or (has "ingress" .Values.sources) (has "istio-gateway" .Values.sources) (has "istio-virtualservice" .Values.sources) (has "contour-httpproxy" .Values.sources) (has "openshift-route" .Values.sources) (has "skipper-routegroup" .Values.sources) }}

charts/external-dns/tests/rbac_test.yaml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,10 @@ tests:
4343
resources: ["pods"]
4444
verbs: ["get", "watch", "list"]
4545
- apiGroups: [""]
46-
resources: ["services","endpoints"]
46+
resources: ["services"]
47+
verbs: ["get","watch","list"]
48+
- apiGroups: ["discovery.k8s.io"]
49+
resources: ["endpointslices"]
4750
verbs: ["get","watch","list"]
4851
- apiGroups: ["extensions","networking.k8s.io"]
4952
resources: ["ingresses"]

docs/flags.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
| `--kubeconfig=""` | Retrieve target cluster configuration from a Kubernetes configuration file (default: auto-detect) |
1212
| `--request-timeout=30s` | Request timeout when calling Kubernetes APIs. 0s means no timeout |
1313
| `--[no-]resolve-service-load-balancer-hostname` | Resolve the hostname of LoadBalancer-type Service object to IP addresses in order to create DNS A/AAAA records instead of CNAMEs |
14-
| `--[no-]listen-endpoint-events` | Trigger a reconcile on changes to Endpoints, for Service source (default: false) |
14+
| `--[no-]listen-endpoint-events` | Trigger a reconcile on changes to EndpointSlices, for Service source (default: false) |
1515
| `--cf-api-endpoint=""` | The fully-qualified domain name of the cloud foundry instance you are targeting |
1616
| `--cf-username=""` | The username to log into the cloud foundry API |
1717
| `--cf-password=""` | The password to log into the cloud foundry API |

pkg/apis/externaldns/types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,7 @@ func App(cfg *Config) *kingpin.Application {
438438
app.Flag("kubeconfig", "Retrieve target cluster configuration from a Kubernetes configuration file (default: auto-detect)").Default(defaultConfig.KubeConfig).StringVar(&cfg.KubeConfig)
439439
app.Flag("request-timeout", "Request timeout when calling Kubernetes APIs. 0s means no timeout").Default(defaultConfig.RequestTimeout.String()).DurationVar(&cfg.RequestTimeout)
440440
app.Flag("resolve-service-load-balancer-hostname", "Resolve the hostname of LoadBalancer-type Service object to IP addresses in order to create DNS A/AAAA records instead of CNAMEs").BoolVar(&cfg.ResolveServiceLoadBalancerHostname)
441-
app.Flag("listen-endpoint-events", "Trigger a reconcile on changes to Endpoints, for Service source (default: false)").BoolVar(&cfg.ListenEndpointEvents)
441+
app.Flag("listen-endpoint-events", "Trigger a reconcile on changes to EndpointSlices, for Service source (default: false)").BoolVar(&cfg.ListenEndpointEvents)
442442

443443
// Flags related to cloud foundry
444444
app.Flag("cf-api-endpoint", "The fully-qualified domain name of the cloud foundry instance you are targeting").Default(defaultConfig.CFAPIEndpoint).StringVar(&cfg.CFAPIEndpoint)

source/service.go

Lines changed: 79 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,13 @@ import (
2828

2929
log "github.com/sirupsen/logrus"
3030
v1 "k8s.io/api/core/v1"
31+
discoveryv1 "k8s.io/api/discovery/v1"
3132
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3233
"k8s.io/apimachinery/pkg/labels"
34+
"k8s.io/apimachinery/pkg/types"
3335
kubeinformers "k8s.io/client-go/informers"
3436
coreinformers "k8s.io/client-go/informers/core/v1"
37+
discoveryinformers "k8s.io/client-go/informers/discovery/v1"
3538
"k8s.io/client-go/kubernetes"
3639
"k8s.io/client-go/tools/cache"
3740

@@ -50,6 +53,7 @@ var (
5053
v1.ServiceTypeLoadBalancer: {}, // Exposes the service externally using a cloud provider's load balancer.
5154
v1.ServiceTypeExternalName: {}, // Maps the service to an external DNS name.
5255
}
56+
serviceNameIndexKey = "serviceName"
5357
)
5458

5559
// serviceSource is an implementation of Source for Kubernetes service objects.
@@ -72,7 +76,7 @@ type serviceSource struct {
7276
resolveLoadBalancerHostname bool
7377
listenEndpointEvents bool
7478
serviceInformer coreinformers.ServiceInformer
75-
endpointsInformer coreinformers.EndpointsInformer
79+
endpointSlicesInformer discoveryinformers.EndpointSliceInformer
7680
podInformer coreinformers.PodInformer
7781
nodeInformer coreinformers.NodeInformer
7882
serviceTypeFilter *serviceTypes
@@ -93,7 +97,7 @@ func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, name
9397
// Set the resync period to 0 to prevent processing when nothing has changed
9498
informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(namespace))
9599
serviceInformer := informerFactory.Core().V1().Services()
96-
endpointsInformer := informerFactory.Core().V1().Endpoints()
100+
endpointSlicesInformer := informerFactory.Discovery().V1().EndpointSlices()
97101
podInformer := informerFactory.Core().V1().Pods()
98102
nodeInformer := informerFactory.Core().V1().Nodes()
99103

@@ -104,7 +108,7 @@ func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, name
104108
},
105109
},
106110
)
107-
endpointsInformer.Informer().AddEventHandler(
111+
endpointSlicesInformer.Informer().AddEventHandler(
108112
cache.ResourceEventHandlerFuncs{
109113
AddFunc: func(obj interface{}) {
110114
},
@@ -123,6 +127,26 @@ func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, name
123127
},
124128
)
125129

130+
// Add an indexer to the EndpointSlice informer to index by the service name label
131+
err = endpointSlicesInformer.Informer().AddIndexers(cache.Indexers{
132+
serviceNameIndexKey: func(obj any) ([]string, error) {
133+
endpointSlice, ok := obj.(*discoveryv1.EndpointSlice)
134+
if !ok {
135+
// This should never happen because the Informer should only contain EndpointSlice objects
136+
return nil, fmt.Errorf("expected %T but got %T instead", endpointSlice, obj)
137+
}
138+
serviceName := endpointSlice.Labels[discoveryv1.LabelServiceName]
139+
if serviceName == "" {
140+
return nil, nil
141+
}
142+
key := types.NamespacedName{Namespace: endpointSlice.Namespace, Name: serviceName}.String()
143+
return []string{key}, nil
144+
},
145+
})
146+
if err != nil {
147+
return nil, err
148+
}
149+
126150
informerFactory.Start(ctx.Done())
127151

128152
// wait for the local cache to be populated.
@@ -148,7 +172,7 @@ func NewServiceSource(ctx context.Context, kubeClient kubernetes.Interface, name
148172
publishHostIP: publishHostIP,
149173
alwaysPublishNotReadyAddresses: alwaysPublishNotReadyAddresses,
150174
serviceInformer: serviceInformer,
151-
endpointsInformer: endpointsInformer,
175+
endpointSlicesInformer: endpointSlicesInformer,
152176
podInformer: podInformer,
153177
nodeInformer: nodeInformer,
154178
serviceTypeFilter: sTypesFilter,
@@ -278,42 +302,63 @@ func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname stri
278302
return nil
279303
}
280304

281-
endpointsObject, err := sc.endpointsInformer.Lister().Endpoints(svc.Namespace).Get(svc.GetName())
305+
serviceKey := cache.ObjectName{Namespace: svc.Namespace, Name: svc.Name}.String()
306+
rawEndpointSlices, err := sc.endpointSlicesInformer.Informer().GetIndexer().ByIndex(serviceNameIndexKey, serviceKey)
282307
if err != nil {
283-
log.Errorf("Get endpoints of service[%s] error:%v", svc.GetName(), err)
284-
return endpoints
308+
// Should never happen as long as the index exists
309+
log.Errorf("Get EndpointSlices of service[%s] error:%v", svc.GetName(), err)
310+
return nil
311+
}
312+
313+
endpointSlices := make([]*discoveryv1.EndpointSlice, 0, len(rawEndpointSlices))
314+
for _, obj := range rawEndpointSlices {
315+
endpointSlice, ok := obj.(*discoveryv1.EndpointSlice)
316+
if !ok {
317+
// Should never happen as the indexer can only contain EndpointSlice objects
318+
log.Errorf("Expected %T but got %T instead, skipping", endpointSlice, obj)
319+
continue
320+
}
321+
endpointSlices = append(endpointSlices, endpointSlice)
285322
}
286323

287324
pods, err := sc.podInformer.Lister().Pods(svc.Namespace).List(selector)
288325
if err != nil {
289-
log.Errorf("List pods of service[%s] error: %v", svc.GetName(), err)
326+
log.Errorf("List Pods of service[%s] error:%v", svc.GetName(), err)
290327
return endpoints
291328
}
292329

293330
endpointsType := getEndpointsTypeFromAnnotations(svc.Annotations)
331+
publishPodIPs := endpointsType != EndpointsTypeNodeExternalIP && endpointsType != EndpointsTypeHostIP && !sc.publishHostIP
332+
publishNotReadyAddresses := svc.Spec.PublishNotReadyAddresses || sc.alwaysPublishNotReadyAddresses
294333

295334
targetsByHeadlessDomainAndType := make(map[endpoint.EndpointKey]endpoint.Targets)
296-
for _, subset := range endpointsObject.Subsets {
297-
addresses := subset.Addresses
298-
if svc.Spec.PublishNotReadyAddresses || sc.alwaysPublishNotReadyAddresses {
299-
addresses = append(addresses, subset.NotReadyAddresses...)
300-
}
335+
for _, endpointSlice := range endpointSlices {
336+
for _, ep := range endpointSlice.Endpoints {
337+
if !conditionToBool(ep.Conditions.Ready) && !publishNotReadyAddresses {
338+
continue
339+
}
340+
341+
if publishPodIPs &&
342+
endpointSlice.AddressType != discoveryv1.AddressTypeIPv4 &&
343+
endpointSlice.AddressType != discoveryv1.AddressTypeIPv6 {
344+
log.Debugf("Skipping EndpointSlice %s/%s because its address type is unsupported: %s", endpointSlice.Namespace, endpointSlice.Name, endpointSlice.AddressType)
345+
continue
346+
}
301347

302-
for _, address := range addresses {
303348
// find pod for this address
304-
if address.TargetRef == nil || address.TargetRef.APIVersion != "" || address.TargetRef.Kind != "Pod" {
305-
log.Debugf("Skipping address because its target is not a pod: %v", address)
349+
if ep.TargetRef == nil || ep.TargetRef.APIVersion != "" || ep.TargetRef.Kind != "Pod" {
350+
log.Debugf("Skipping address because its target is not a pod: %v", ep)
306351
continue
307352
}
308353
var pod *v1.Pod
309354
for _, v := range pods {
310-
if v.Name == address.TargetRef.Name {
355+
if v.Name == ep.TargetRef.Name {
311356
pod = v
312357
break
313358
}
314359
}
315360
if pod == nil {
316-
log.Errorf("Pod %s not found for address %v", address.TargetRef.Name, address)
361+
log.Errorf("Pod %s not found for address %v", ep.TargetRef.Name, ep)
317362
continue
318363
}
319364

@@ -341,8 +386,13 @@ func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname stri
341386
targets = endpoint.Targets{pod.Status.HostIP}
342387
log.Debugf("Generating matching endpoint %s with HostIP %s", headlessDomain, pod.Status.HostIP)
343388
} else {
344-
targets = endpoint.Targets{address.IP}
345-
log.Debugf("Generating matching endpoint %s with EndpointAddress IP %s", headlessDomain, address.IP)
389+
if len(ep.Addresses) == 0 {
390+
log.Warnf("EndpointSlice %s/%s has no addresses for endpoint %v", endpointSlice.Namespace, endpointSlice.Name, ep)
391+
continue
392+
}
393+
address := ep.Addresses[0] // Only use the first address, as additional addresses have no semantic defined
394+
targets = endpoint.Targets{address}
395+
log.Debugf("Generating matching endpoint %s with EndpointSliceAddress IP %s", headlessDomain, address)
346396
}
347397
}
348398
for _, target := range targets {
@@ -758,7 +808,7 @@ func (sc *serviceSource) AddEventHandler(_ context.Context, handler func()) {
758808
// https://github.com/kubernetes/kubernetes/issues/79610
759809
sc.serviceInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
760810
if sc.listenEndpointEvents {
761-
sc.endpointsInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
811+
sc.endpointSlicesInformer.Informer().AddEventHandler(eventHandlerFunc(handler))
762812
}
763813
}
764814

@@ -789,3 +839,11 @@ func newServiceTypesFilter(filter []string) (*serviceTypes, error) {
789839
types: types,
790840
}, nil
791841
}
842+
843+
// conditionToBool converts an EndpointConditions condition to a bool value.
844+
func conditionToBool(v *bool) bool {
845+
if v == nil {
846+
return true // nil should be interpreted as "true" as per EndpointConditions spec
847+
}
848+
return *v
849+
}

0 commit comments

Comments
 (0)