diff --git a/charts/newrelic-infrastructure/templates/clusterrole.yaml b/charts/newrelic-infrastructure/templates/clusterrole.yaml index 391dc1e1fd..177a338d5c 100644 --- a/charts/newrelic-infrastructure/templates/clusterrole.yaml +++ b/charts/newrelic-infrastructure/templates/clusterrole.yaml @@ -20,6 +20,14 @@ rules: - "namespaces" - "pods" verbs: [ "get", "list", "watch" ] + - apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - list + - watch - nonResourceURLs: ["/metrics"] verbs: ["get"] {{- if .Values.rbac.pspEnabled }} diff --git a/internal/discovery/endpoint_slices_discoverer.go b/internal/discovery/endpoint_slices_discoverer.go new file mode 100644 index 0000000000..30c18e68ed --- /dev/null +++ b/internal/discovery/endpoint_slices_discoverer.go @@ -0,0 +1,149 @@ +package discovery + +import ( + "errors" + "fmt" + "net" + "sort" + "strconv" + "time" + + apidiscoveryv1 "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + discoverylistersv1 "k8s.io/client-go/listers/discovery/v1" +) + +type EndpointSlicesDiscoveryConfig struct { + // LabelSelector is the selector used to filter Endpoints. + LabelSelector string + // Namespace can be used to restric the search to a particular namespace. + Namespace string + // If set, Port will discard all endpoints discovered that do not use this specified port + Port int + + // Client is the Kubernetes client.Interface used to build informers. + Client kubernetes.Interface +} + +type EndpointSlicesDiscoverer interface { + Discover() ([]string, error) +} + +type endpointSlicesDiscoverer struct { + lister discoverylistersv1.EndpointSliceLister + port int + fixedEndpointSorted []string +} + +func NewEndpointSlicesDiscoverer(config EndpointSlicesDiscoveryConfig) (EndpointSlicesDiscoverer, error) { + if config.Client == nil { + return nil, fmt.Errorf("client must be configured") + } + + // Arbitrary value, same used in Prometheus. + resyncDuration := 10 * time.Minute + stopCh := make(chan struct{}) + + var _ = apidiscoveryv1.EndpointSlice{} + + el := func(options ...informers.SharedInformerOption) discoverylistersv1.EndpointSliceLister { + factory := informers.NewSharedInformerFactoryWithOptions(config.Client, resyncDuration, options...) + + lister := factory.Discovery().V1().EndpointSlices().Lister() + + factory.Start(stopCh) + factory.WaitForCacheSync(stopCh) + + return lister + } + + return &endpointSlicesDiscoverer{ + lister: el( + informers.WithNamespace(config.Namespace), + informers.WithTweakListOptions(func(options *metav1.ListOptions) { + options.LabelSelector = config.LabelSelector + }), + ), + port: config.Port, + }, nil +} + +func (d *endpointSlicesDiscoverer) Discover() ([]string, error) { + if len(d.fixedEndpointSorted) != 0 { + return d.fixedEndpointSorted, nil + } + + endpointSlices, err := d.lister.List(labels.Everything()) + if err != nil { + return nil, fmt.Errorf("listing endpoints: %w", err) + } + + var hosts []string + + for _, endpointSlice := range endpointSlices { + for _, endpoint := range endpointSlice.Endpoints { + for _, address := range endpoint.Addresses { + for _, port := range endpointSlice.Ports { + if port.Port == nil { + continue + } + if d.port != 0 && d.port != int(*port.Port) { + continue + } + + //@todo: validate if these checks are needed + if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready { + continue + } + if endpoint.Conditions.Serving != nil && !*endpoint.Conditions.Serving { + continue + } + if endpoint.Conditions.Terminating != nil && *endpoint.Conditions.Terminating { + continue + } + + hosts = append(hosts, net.JoinHostPort(address, strconv.Itoa(int(*port.Port)))) + } + } + } + } + + // Sorting the array is needed to be sure we are hitting each time the endpoints in the same order + sort.Strings(hosts) + + return hosts, nil +} + +// ErrEnpointSlicesDiscoveryTimeout is returned by EndpointsDiscovererWithTimeout when discovery times out +var ErrEnpointSlicesDiscoveryTimeout = errors.New("timeout discovering endpoint slices") + +// EndpointsDiscovererWithTimeout implements EndpointsDiscoverer with a retry mechanism if no endpoints are found. +type EndpointSlicesDiscovererWithTimeout struct { + EndpointsDiscoverer + BackoffDelay time.Duration + Timeout time.Duration +} + +// Discover will call poll the inner EndpointsDiscoverer every BackoffDelay seconds up to a max of Retries times until it +// returns an error, or a non-empty list of endpoints. +// If the max number of Retries is exceeded, it will return ErrDiscoveryTimeout. +func (edt *EndpointSlicesDiscovererWithTimeout) Discover() ([]string, error) { + start := time.Now() + for time.Since(start) < edt.Timeout { + endpoints, err := edt.EndpointsDiscoverer.Discover() + if err != nil { + return nil, err + } + + if len(endpoints) > 0 { + return endpoints, nil + } + + time.Sleep(edt.BackoffDelay) + } + + return nil, ErrEnpointSlicesDiscoveryTimeout +} diff --git a/src/ksm/scraper.go b/src/ksm/scraper.go index 325aee3c33..fcccc3361c 100644 --- a/src/ksm/scraper.go +++ b/src/ksm/scraper.go @@ -33,13 +33,14 @@ type Providers struct { // Scraper takes care of getting metrics from an autodiscovered KSM instance. type Scraper struct { Providers - logger *log.Logger - config *config.Config - k8sVersion *version.Info - endpointsDiscoverer discovery.EndpointsDiscoverer - servicesLister listersv1.ServiceLister - informerClosers []chan<- struct{} - Filterer discovery.NamespaceFilterer + logger *log.Logger + config *config.Config + k8sVersion *version.Info + endpointsDiscoverer discovery.EndpointsDiscoverer + endpointSlicesDiscoverer discovery.EndpointSlicesDiscoverer + servicesLister listersv1.ServiceLister + informerClosers []chan<- struct{} + Filterer discovery.NamespaceFilterer } // ScraperOpt are options that can be used to configure the Scraper @@ -98,6 +99,13 @@ func NewScraper(config *config.Config, providers Providers, options ...ScraperOp s.endpointsDiscoverer = endpointsDiscoverer + s.logger.Debugf("Building KSM endpoint slices discoverer") + endpointSlicesDiscoverer, err := s.buildEndpointSlicesDiscoverer() + if err != nil { + return nil, fmt.Errorf("building endpoint slices disoverer: %w", err) + } + s.endpointSlicesDiscoverer = endpointSlicesDiscoverer + servicesLister, servicesCloser := discovery.NewServicesLister(providers.K8s) s.servicesLister = servicesLister s.informerClosers = append(s.informerClosers, servicesCloser) @@ -204,17 +212,63 @@ func (s *Scraper) buildDiscoverer() (discovery.EndpointsDiscoverer, error) { }, nil } +// buildEndpointSlicesDiscoverer returns a discovery.EndpointSlicesDiscoverer, configured to discover KSM endpoints in the cluster, +// or to return the static endpoint defined by the user in the config. +func (s *Scraper) buildEndpointSlicesDiscoverer() (discovery.EndpointSlicesDiscoverer, error) { + dc := discovery.EndpointSlicesDiscoveryConfig{ + LabelSelector: defaultLabelSelector, + Client: s.K8s, + } + + if s.config.KSM.Namespace != "" { + s.logger.Debugf("Restricting KSM discovery to namespace %q", s.config.KSM.Namespace) + dc.Namespace = s.config.KSM.Namespace + } + + if s.config.KSM.Selector != "" { + s.logger.Debugf("Overriding default KSM labelSelector (%q) to %q", defaultLabelSelector, s.config.KSM.Selector) + dc.LabelSelector = s.config.KSM.Selector + } + + if s.config.KSM.Port != 0 { + s.logger.Debugf("Overriding default KSM port to %d", s.config.KSM.Port) + dc.Port = s.config.KSM.Port + } + + discoverer, err := discovery.NewEndpointSlicesDiscoverer(dc) + if err != nil { + return nil, err + } + + return &discovery.EndpointSlicesDiscovererWithTimeout{ + EndpointsDiscoverer: discoverer, + + BackoffDelay: s.config.KSM.Discovery.BackoffDelay, + Timeout: s.config.KSM.Discovery.Timeout, + }, nil +} + func (s *Scraper) ksmURLs() ([]string, error) { if u := s.config.KSM.StaticURL; u != "" { s.logger.Debugf("Using overridden endpoint for ksm %q", u) return []string{u}, nil } - endpoints, err := s.endpointsDiscoverer.Discover() + var endpoints []string + var err error + endpoints, err = s.endpointSlicesDiscoverer.Discover() if err != nil { return nil, fmt.Errorf("discovering KSM endpoints: %w", err) } + //@todo: decide if we need this + if false { + endpoints, err = s.endpointsDiscoverer.Discover() + if err != nil { + return nil, fmt.Errorf("discovering KSM endpoints: %w", err) + } + } + scheme := s.config.KSM.Scheme if scheme == "" { scheme = defaultScheme