Skip to content

Commit 2fa93da

Browse files
feat: use endpointslices provider and listener
1 parent 8032ef1 commit 2fa93da

File tree

9 files changed

+152
-115
lines changed

9 files changed

+152
-115
lines changed

comp/core/autodiscovery/common/utils/prometheus_apiserver.go

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/providers/names"
1414
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver"
1515
"github.com/DataDog/datadog-agent/pkg/util/log"
16+
discv1 "k8s.io/api/discovery/v1"
1617

1718
v1 "k8s.io/api/core/v1"
1819
)
@@ -89,16 +90,31 @@ func ResolveEndpointConfigAuto(conf *integration.Config, addr v1.EndpointAddress
8990
if targetRef := addr.TargetRef; targetRef != nil && targetRef.Kind == kubePodKind {
9091
// The endpoint is backed by a pod.
9192
// We add the pod uid as AD identifiers so the check can get the pod tags.
92-
podUID := string(targetRef.UID)
93-
conf.ADIdentifiers = append(conf.ADIdentifiers, getPodEntity(podUID))
94-
if nodeName := addr.NodeName; nodeName != nil {
95-
// Set the node name to schedule the endpoint check on the correct node.
96-
// This field needs to be set only when the endpoint is backed by a pod.
97-
conf.NodeName = *nodeName
93+
var nodeName string
94+
if addr.NodeName != nil {
95+
nodeName = *addr.NodeName
9896
}
97+
resolveTargetRefToConfig(conf, targetRef, nodeName)
9998
}
10099
}
101100

101+
func ResolveEndpointSliceConfigAuto(conf *integration.Config, endpoint discv1.Endpoint) {
102+
log.Debugf("using 'auto' resolve for config: %s, entity: %s", conf.Name, conf.ServiceID)
103+
if endpoint.TargetRef != nil && endpoint.TargetRef.Kind == kubePodKind {
104+
var nodeName string
105+
if endpoint.NodeName != nil {
106+
nodeName = *endpoint.NodeName
107+
}
108+
resolveTargetRefToConfig(conf, endpoint.TargetRef, nodeName)
109+
}
110+
}
111+
112+
func resolveTargetRefToConfig(conf *integration.Config, targetRef *v1.ObjectReference, nodeName string) {
113+
podUID := string(targetRef.UID)
114+
conf.ADIdentifiers = append(conf.ADIdentifiers, getPodEntity(podUID))
115+
conf.NodeName = nodeName
116+
}
117+
102118
// getPodEntity returns pod entity
103119
func getPodEntity(podUID string) string {
104120
return KubePodPrefix + podUID
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2016-present Datadog, Inc.
5+
6+
//go:build !(clusterchecks && kubeapiserver)
7+
8+
package listeners
9+
10+
// NewKubeEndpointSlicesListener returns the kube endpointslices implementation of the ServiceListener interface
11+
var NewKubeEndpointSlicesListener ServiceListenerFactory

comp/core/autodiscovery/listeners/listeners.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,14 @@ const (
1111
containerListenerName = "container"
1212
environmentListenerName = "environment"
1313
kubeEndpointsListenerName = "kube_endpoints"
14-
kubeServicesListenerName = "kube_services"
15-
kubeletListenerName = "kubelet"
16-
processListenerName = "process"
17-
snmpListenerName = "snmp"
18-
staticConfigListenerName = "static config"
19-
dbmAuroraListenerName = "database-monitoring-aurora"
20-
dbmRdsListenerName = "database-monitoring-rds"
14+
//kubeEndpointslicesListenerName = "kube_endpointslices"
15+
kubeServicesListenerName = "kube_services"
16+
kubeletListenerName = "kubelet"
17+
processListenerName = "process"
18+
snmpListenerName = "snmp"
19+
staticConfigListenerName = "static config"
20+
dbmAuroraListenerName = "database-monitoring-aurora"
21+
dbmRdsListenerName = "database-monitoring-rds"
2122
)
2223

2324
// RegisterListeners registers the available autodiscovery listeners.
@@ -26,7 +27,8 @@ func RegisterListeners(serviceListenerFactories map[string]ServiceListenerFactor
2627
Register(cloudFoundryBBSListenerName, NewCloudFoundryListener, serviceListenerFactories)
2728
Register(containerListenerName, NewContainerListener, serviceListenerFactories)
2829
Register(environmentListenerName, NewEnvironmentListener, serviceListenerFactories)
29-
Register(kubeEndpointsListenerName, NewKubeEndpointsListener, serviceListenerFactories)
30+
//Register(kubeEndpointsListenerName, NewKubeEndpointsListener, serviceListenerFactories)
31+
Register(kubeEndpointsListenerName, NewKubeEndpointSlicesListener, serviceListenerFactories)
3032
Register(kubeServicesListenerName, NewKubeServiceListener, serviceListenerFactories)
3133
Register(kubeletListenerName, NewKubeletListener, serviceListenerFactories)
3234
Register(processListenerName, NewProcessListener, serviceListenerFactories)

comp/core/autodiscovery/providers/kube_endpointslices.go

Lines changed: 55 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ import (
1313
"strings"
1414
"sync"
1515

16-
pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
1716
v1 "k8s.io/api/core/v1"
1817
discv1 "k8s.io/api/discovery/v1"
18+
"k8s.io/apimachinery/pkg/api/equality"
1919
"k8s.io/apimachinery/pkg/labels"
2020
listersv1 "k8s.io/client-go/listers/core/v1"
2121
disclisters "k8s.io/client-go/listers/discovery/v1"
@@ -26,13 +26,12 @@ import (
2626
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/providers/names"
2727
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/providers/types"
2828
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/telemetry"
29-
"github.com/DataDog/datadog-agent/pkg/config/model"
29+
pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
3030
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver"
3131
"github.com/DataDog/datadog-agent/pkg/util/log"
3232
)
3333

3434
const (
35-
kubeEndpointSliceID = "endpointslices"
3635
kubeEndpointSliceAnnotationPrefix = "ad.datadoghq.com/endpoints."
3736
kubeEndpointSliceAnnotationPrefixLegacy = "service-discovery.datadoghq.com/endpoints."
3837
kubeEndpointSliceResolvePath = "resolve"
@@ -60,8 +59,8 @@ type configInfoSlices struct {
6059

6160
// NewKubeEndpointSlicesConfigProvider returns a new ConfigProvider connected to apiserver using EndpointSlices.
6261
// Connectivity is not checked at this stage to allow for retries, Collect will do it.
63-
func NewKubeEndpointSlicesConfigProvider(telemetryStore *telemetry.Store) (types.ConfigProvider, error) {
64-
// Using GetAPIClient (no wait) as Client should already be initialized by Cluster Agent main entrypoint before
62+
// Using GetAPIClient (no wait) as Client should already be initialized by Cluster Agent main entrypoint before
63+
func NewKubeEndpointSlicesConfigProvider(_ *pkgconfigsetup.ConfigurationProviders, telemetryStore *telemetry.Store) (types.ConfigProvider, error) {
6564
ac, err := apiserver.GetAPIClient()
6665
if err != nil {
6766
return nil, fmt.Errorf("cannot connect to apiserver: %s", err)
@@ -100,12 +99,19 @@ func NewKubeEndpointSlicesConfigProvider(telemetryStore *telemetry.Store) (types
10099
return nil, fmt.Errorf("cannot add event handler to endpointslice informer: %s", err)
101100
}
102101

102+
if pkgconfigsetup.Datadog().GetBool("cluster_checks.support_hybrid_ignore_ad_tags") {
103+
log.Warnf("The `cluster_checks.support_hybrid_ignore_ad_tags` flag is" +
104+
" deprecated and will be removed in a future version. Please replace " +
105+
"`ad.datadoghq.com/endpoints.ignore_autodiscovery_tags` in your service annotations" +
106+
"using adv2 for check specification and adv1 for `ignore_autodiscovery_tags`.")
107+
}
108+
103109
return p, nil
104110
}
105111

106112
// String returns a string representation of the kubeEndpointSlicesConfigProvider
107113
func (k *kubeEndpointSlicesConfigProvider) String() string {
108-
return names.KubeEndpoints
114+
return names.KubeEndpointSlices
109115
}
110116

111117
// Collect retrieves services from the apiserver, builds Config objects and returns them
@@ -117,7 +123,7 @@ func (k *kubeEndpointSlicesConfigProvider) Collect(context.Context) ([]integrati
117123
k.setUpToDate(true)
118124

119125
var generatedConfigs []integration.Config
120-
parsedConfigsInfo := k.parseServiceAnnotationsForEndpointSlices(services, pkgconfigsetup.Datadog())
126+
parsedConfigsInfo := k.parseServiceAnnotationsForEndpointSlices(services)
121127
for _, conf := range parsedConfigsInfo {
122128
// Fetch all EndpointSlices for this service
123129
slices, err := k.endpointSliceLister.EndpointSlices(conf.namespace).List(
@@ -127,7 +133,10 @@ func (k *kubeEndpointSlicesConfigProvider) Collect(context.Context) ([]integrati
127133
log.Errorf("Cannot get Kubernetes endpointslices: %s", err)
128134
continue
129135
}
130-
generatedConfigs = append(generatedConfigs, generateConfigsFromSlices(conf.tpl, conf.resolveMode, slices, conf.namespace, conf.serviceName)...)
136+
for _, slice := range slices {
137+
generatedConfigs = append(generatedConfigs, generateConfigFromSlice(conf.tpl, conf.resolveMode, slice, conf.namespace, conf.serviceName)...)
138+
}
139+
131140
serviceKey := fmt.Sprintf("%s/%s", conf.namespace, conf.serviceName)
132141
k.Lock()
133142
k.monitoredServices[serviceKey] = true
@@ -192,7 +201,6 @@ func (k *kubeEndpointSlicesConfigProvider) invalidateOnServiceUpdate(old, obj in
192201
k.setUpToDate(false)
193202
return
194203
}
195-
// Quick exit if resource version did not change
196204
if svc.ResourceVersion == oldSvc.ResourceVersion {
197205
return
198206
}
@@ -240,7 +248,6 @@ func (k *kubeEndpointSlicesConfigProvider) invalidateOnEndpointSliceUpdate(old,
240248
return
241249
}
242250

243-
// Get service name from label
244251
serviceName := slice.Labels[kubernetesServiceNameLabelProvider]
245252
if serviceName == "" {
246253
return
@@ -251,42 +258,18 @@ func (k *kubeEndpointSlicesConfigProvider) invalidateOnEndpointSliceUpdate(old,
251258
k.Lock()
252259
defer k.Unlock()
253260
if found := k.monitoredServices[serviceKey]; found {
254-
// Invalidate only when endpoints change
255-
if endpointSliceEndpointsDifferProvider(slice, oldSlice) {
256-
k.upToDate = false
257-
}
261+
k.upToDate = equality.Semantic.DeepEqual(slice.Endpoints, oldSlice.Endpoints)
258262
}
259263
}
260264

261-
// endpointSliceEndpointsDifferProvider compares the endpoints in two endpointslices
262-
func endpointSliceEndpointsDifferProvider(first, second *discv1.EndpointSlice) bool {
263-
if len(first.Endpoints) != len(second.Endpoints) {
264-
return true
265-
}
266-
267-
// Simple comparison - could be optimized with a more sophisticated diff
268-
for i := range first.Endpoints {
269-
if len(first.Endpoints[i].Addresses) != len(second.Endpoints[i].Addresses) {
270-
return true
271-
}
272-
for j := range first.Endpoints[i].Addresses {
273-
if first.Endpoints[i].Addresses[j] != second.Endpoints[i].Addresses[j] {
274-
return true
275-
}
276-
}
277-
}
278-
279-
return false
280-
}
281-
282265
// setUpToDate is a thread-safe method to update the upToDate value
283266
func (k *kubeEndpointSlicesConfigProvider) setUpToDate(v bool) {
284267
k.Lock()
285268
defer k.Unlock()
286269
k.upToDate = v
287270
}
288271

289-
func (k *kubeEndpointSlicesConfigProvider) parseServiceAnnotationsForEndpointSlices(services []*v1.Service, cfg model.Config) []configInfoSlices {
272+
func (k *kubeEndpointSlicesConfigProvider) parseServiceAnnotationsForEndpointSlices(services []*v1.Service) []configInfoSlices {
290273
var configsInfo []configInfoSlices
291274

292275
setServiceKeys := map[string]struct{}{}
@@ -300,7 +283,6 @@ func (k *kubeEndpointSlicesConfigProvider) parseServiceAnnotationsForEndpointSli
300283
serviceKey := fmt.Sprintf("%s/%s", svc.Namespace, svc.Name)
301284
setServiceKeys[serviceKey] = struct{}{}
302285

303-
// Use same annotation prefix as Endpoints for compatibility
304286
endptConf, errors := utils.ExtractTemplatesFromAnnotations(serviceKey, svc.GetAnnotations(), kubeEndpointID)
305287
for _, err := range errors {
306288
log.Errorf("Cannot parse endpoint template for service %s: %s", serviceKey, err)
@@ -323,8 +305,9 @@ func (k *kubeEndpointSlicesConfigProvider) parseServiceAnnotationsForEndpointSli
323305

324306
ignoreAdForHybridScenariosTags := ignoreADTagsFromAnnotations(svc.GetAnnotations(), kubeEndpointSliceAnnotationPrefix)
325307
for i := range endptConf {
326-
endptConf[i].Source = "kube_endpointslices:" + serviceKey
327-
if cfg.GetBool("cluster_checks.support_hybrid_ignore_ad_tags") {
308+
// TODO: Kept same source for now, but we should consider using a different source.
309+
endptConf[i].Source = "kube_endpoints:" + apiserver.EntityForEndpoints(svc.Namespace, svc.Name, "")
310+
if pkgconfigsetup.Datadog().GetBool("cluster_checks.support_hybrid_ignore_ad_tags") {
328311
endptConf[i].IgnoreAutodiscoveryTags = endptConf[i].IgnoreAutodiscoveryTags || ignoreAdForHybridScenariosTags
329312
}
330313
configsInfo = append(configsInfo, configInfoSlices{
@@ -360,70 +343,59 @@ func hasEndpointSliceAnnotations(svc *v1.Service) bool {
360343
return false
361344
}
362345

363-
// generateConfigsFromSlices creates a config template for each endpoint IP across all slices
364-
func generateConfigsFromSlices(tpl integration.Config, resolveMode endpointResolveMode, slices []*discv1.EndpointSlice, namespace, serviceName string) []integration.Config {
365-
if len(slices) == 0 {
366-
log.Warnf("No EndpointSlices for service %s/%s, cannot generate config templates", namespace, serviceName)
346+
// generateConfigFromSlice creates a config template for each endpoint IP in the given slice
347+
func generateConfigFromSlice(tpl integration.Config, resolveMode endpointResolveMode, slice *discv1.EndpointSlice, namespace, serviceName string) []integration.Config {
348+
if slice == nil {
349+
log.Warnf("EndpointSlice for %s/%s is nil, cannot generate config templates", namespace, serviceName)
367350
return []integration.Config{tpl}
368351
}
369352
generatedConfigs := make([]integration.Config, 0)
370353

371354
// Check resolve annotation to know how we should process this endpoint
372-
resolveFunc := getEndpointResolveFuncForSlices(resolveMode, namespace, serviceName)
373-
374-
for _, slice := range slices {
375-
for _, endpoint := range slice.Endpoints {
376-
for _, ip := range endpoint.Addresses {
377-
// Set a new entity containing the endpoint's IP
378-
entity := apiserver.EntityForEndpoints(namespace, serviceName, ip)
379-
newConfig := integration.Config{
380-
ServiceID: entity,
381-
Name: tpl.Name,
382-
Instances: tpl.Instances,
383-
InitConfig: tpl.InitConfig,
384-
MetricConfig: tpl.MetricConfig,
385-
LogsConfig: tpl.LogsConfig,
386-
ADIdentifiers: []string{entity},
387-
ClusterCheck: true,
388-
Provider: tpl.Provider,
389-
Source: tpl.Source,
390-
IgnoreAutodiscoveryTags: tpl.IgnoreAutodiscoveryTags,
391-
}
392-
393-
if resolveFunc != nil {
394-
resolveFunc(&newConfig, endpoint, ip)
395-
}
396-
397-
generatedConfigs = append(generatedConfigs, newConfig)
355+
resolveFunc := getEndpointResolveFuncForSlice(resolveMode, namespace, serviceName)
356+
357+
for _, endpoint := range slice.Endpoints {
358+
for _, ip := range endpoint.Addresses {
359+
// Set a new entity containing the endpoint's IP
360+
entity := apiserver.EntityForEndpoints(namespace, serviceName, ip)
361+
newConfig := integration.Config{
362+
ServiceID: entity,
363+
Name: tpl.Name,
364+
Instances: tpl.Instances,
365+
InitConfig: tpl.InitConfig,
366+
MetricConfig: tpl.MetricConfig,
367+
LogsConfig: tpl.LogsConfig,
368+
ADIdentifiers: []string{entity},
369+
ClusterCheck: true,
370+
Provider: tpl.Provider,
371+
Source: tpl.Source,
372+
IgnoreAutodiscoveryTags: tpl.IgnoreAutodiscoveryTags,
373+
}
374+
375+
if resolveFunc != nil {
376+
resolveFunc(&newConfig, endpoint)
398377
}
378+
379+
generatedConfigs = append(generatedConfigs, newConfig)
399380
}
400381
}
401382
return generatedConfigs
402383
}
403384

404-
// getEndpointResolveFuncForSlices returns a function that resolves the endpoint address for EndpointSlices
405-
func getEndpointResolveFuncForSlices(resolveMode endpointResolveMode, namespace, name string) func(*integration.Config, discv1.Endpoint, string) {
406-
var resolveFunc func(*integration.Config, discv1.Endpoint, string)
385+
// getEndpointResolveFuncForSlice returns a function that resolves the endpoint address for EndpointSlices
386+
func getEndpointResolveFuncForSlice(resolveMode endpointResolveMode, namespace, name string) func(*integration.Config, discv1.Endpoint) {
387+
var resolveFunc func(*integration.Config, discv1.Endpoint)
407388

408389
switch resolveMode {
409390
case kubeEndpointResolveIP:
410391
// IP: we explicitly ignore what's behind this address (nothing to do)
411392

412393
case "", kubeEndpointResolveAuto:
413394
// Auto or empty (default to auto): we try to resolve the POD behind this address
414-
resolveFunc = func(config *integration.Config, endpoint discv1.Endpoint, _ string) {
415-
if endpoint.TargetRef != nil && endpoint.TargetRef.Kind == "Pod" {
416-
config.NodeName = endpoint.TargetRef.Name
417-
}
418-
}
419-
395+
resolveFunc = utils.ResolveEndpointSliceConfigAuto
420396
default:
421397
log.Warnf("Unknown resolve mode %s for service %s/%s, defaulting to auto", resolveMode, namespace, name)
422-
resolveFunc = func(config *integration.Config, endpoint discv1.Endpoint, _ string) {
423-
if endpoint.TargetRef != nil && endpoint.TargetRef.Kind == "Pod" {
424-
config.NodeName = endpoint.TargetRef.Name
425-
}
426-
}
398+
resolveFunc = utils.ResolveEndpointSliceConfigAuto
427399
}
428400

429401
return resolveFunc
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2016-present Datadog, Inc.
5+
6+
//go:build !(clusterchecks && kubeapiserver)
7+
8+
package providers
9+
10+
import (
11+
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/providers/types"
12+
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/telemetry"
13+
pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
14+
)
15+
16+
// NewKubeEndpointSlicesConfigProvider returns a new ConfigProvider connected to apiserver.
17+
// Connectivity is not checked at this stage to allow for retries, Collect will do it.
18+
var NewKubeEndpointSlicesConfigProvider func(providerConfig *pkgconfigsetup.ConfigurationProviders, telemetryStore *telemetry.Store) (types.ConfigProvider, error)

0 commit comments

Comments
 (0)