Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions charts/newrelic-infrastructure/templates/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ rules:
- "namespaces"
- "pods"
verbs: [ "get", "list", "watch" ]
- apiGroups:
- discovery.k8s.io
resources:
- endpointslices
verbs:
- get
- list
- watch
- nonResourceURLs: ["/metrics"]
verbs: ["get"]
{{- if .Values.rbac.pspEnabled }}
Expand Down
149 changes: 149 additions & 0 deletions internal/discovery/endpoint_slices_discoverer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package discovery

import (
"errors"
"fmt"
"net"
"sort"
"strconv"
"time"

apidiscoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
discoverylistersv1 "k8s.io/client-go/listers/discovery/v1"
)

type EndpointSlicesDiscoveryConfig struct {
// LabelSelector is the selector used to filter Endpoints.
LabelSelector string
// Namespace can be used to restric the search to a particular namespace.
Namespace string
// If set, Port will discard all endpoints discovered that do not use this specified port
Port int

// Client is the Kubernetes client.Interface used to build informers.
Client kubernetes.Interface
}

type EndpointSlicesDiscoverer interface {
Discover() ([]string, error)
}

type endpointSlicesDiscoverer struct {
lister discoverylistersv1.EndpointSliceLister
port int
fixedEndpointSorted []string
}

func NewEndpointSlicesDiscoverer(config EndpointSlicesDiscoveryConfig) (EndpointSlicesDiscoverer, error) {

Check failure on line 41 in internal/discovery/endpoint_slices_discoverer.go

View workflow job for this annotation

GitHub Actions / Static analysis and linting

NewEndpointSlicesDiscoverer returns interface (github.com/newrelic/nri-kubernetes/v3/internal/discovery.EndpointSlicesDiscoverer) (ireturn)
if config.Client == nil {
return nil, fmt.Errorf("client must be configured")

Check failure on line 43 in internal/discovery/endpoint_slices_discoverer.go

View workflow job for this annotation

GitHub Actions / Static analysis and linting

do not define dynamic errors, use wrapped static errors instead: "fmt.Errorf(\"client must be configured\")" (err113)
}

// Arbitrary value, same used in Prometheus.
resyncDuration := 10 * time.Minute
stopCh := make(chan struct{})

var _ = apidiscoveryv1.EndpointSlice{}

Check failure on line 50 in internal/discovery/endpoint_slices_discoverer.go

View workflow job for this annotation

GitHub Actions / Static analysis and linting

File is not properly formatted (gofumpt)

el := func(options ...informers.SharedInformerOption) discoverylistersv1.EndpointSliceLister {
factory := informers.NewSharedInformerFactoryWithOptions(config.Client, resyncDuration, options...)

lister := factory.Discovery().V1().EndpointSlices().Lister()

factory.Start(stopCh)
factory.WaitForCacheSync(stopCh)

return lister
}

return &endpointSlicesDiscoverer{
lister: el(
informers.WithNamespace(config.Namespace),
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = config.LabelSelector
}),
),
port: config.Port,
}, nil
}

func (d *endpointSlicesDiscoverer) Discover() ([]string, error) {

Check failure on line 74 in internal/discovery/endpoint_slices_discoverer.go

View workflow job for this annotation

GitHub Actions / Static analysis and linting

cognitive complexity 41 of func `(*endpointSlicesDiscoverer).Discover` is high (> 30) (gocognit)
if len(d.fixedEndpointSorted) != 0 {
return d.fixedEndpointSorted, nil
}

endpointSlices, err := d.lister.List(labels.Everything())
if err != nil {
return nil, fmt.Errorf("listing endpoints: %w", err)
}

var hosts []string

for _, endpointSlice := range endpointSlices {
for _, endpoint := range endpointSlice.Endpoints {
for _, address := range endpoint.Addresses {
for _, port := range endpointSlice.Ports {
if port.Port == nil {
continue
}
if d.port != 0 && d.port != int(*port.Port) {
continue
}

//@todo: validate if these checks are needed

Check failure on line 97 in internal/discovery/endpoint_slices_discoverer.go

View workflow job for this annotation

GitHub Actions / Static analysis and linting

commentFormatting: put a space between `//` and comment text (gocritic)
if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready {
continue
}
if endpoint.Conditions.Serving != nil && !*endpoint.Conditions.Serving {
continue
}
if endpoint.Conditions.Terminating != nil && *endpoint.Conditions.Terminating {
continue
}

hosts = append(hosts, net.JoinHostPort(address, strconv.Itoa(int(*port.Port))))
}
}
}
}

// Sorting the array is needed to be sure we are hitting each time the endpoints in the same order
sort.Strings(hosts)

return hosts, nil
}

// ErrEnpointSlicesDiscoveryTimeout is returned by EndpointsDiscovererWithTimeout when discovery times out

Check failure on line 120 in internal/discovery/endpoint_slices_discoverer.go

View workflow job for this annotation

GitHub Actions / Static analysis and linting

Comment should end in a period (godot)
var ErrEnpointSlicesDiscoveryTimeout = errors.New("timeout discovering endpoint slices")

// EndpointsDiscovererWithTimeout implements EndpointsDiscoverer with a retry mechanism if no endpoints are found.
type EndpointSlicesDiscovererWithTimeout struct {
EndpointsDiscoverer
BackoffDelay time.Duration
Timeout time.Duration
}

// Discover will call poll the inner EndpointsDiscoverer every BackoffDelay seconds up to a max of Retries times until it
// returns an error, or a non-empty list of endpoints.
// If the max number of Retries is exceeded, it will return ErrDiscoveryTimeout.
func (edt *EndpointSlicesDiscovererWithTimeout) Discover() ([]string, error) {
start := time.Now()
for time.Since(start) < edt.Timeout {
endpoints, err := edt.EndpointsDiscoverer.Discover()
if err != nil {
return nil, err
}

if len(endpoints) > 0 {
return endpoints, nil
}

time.Sleep(edt.BackoffDelay)
}

return nil, ErrEnpointSlicesDiscoveryTimeout
}
70 changes: 62 additions & 8 deletions src/ksm/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@
// Scraper takes care of getting metrics from an autodiscovered KSM instance.
type Scraper struct {
Providers
logger *log.Logger
config *config.Config
k8sVersion *version.Info
endpointsDiscoverer discovery.EndpointsDiscoverer
servicesLister listersv1.ServiceLister
informerClosers []chan<- struct{}
Filterer discovery.NamespaceFilterer
logger *log.Logger
config *config.Config
k8sVersion *version.Info
endpointsDiscoverer discovery.EndpointsDiscoverer
endpointSlicesDiscoverer discovery.EndpointSlicesDiscoverer
servicesLister listersv1.ServiceLister
informerClosers []chan<- struct{}
Filterer discovery.NamespaceFilterer
}

// ScraperOpt are options that can be used to configure the Scraper
Expand Down Expand Up @@ -98,6 +99,13 @@

s.endpointsDiscoverer = endpointsDiscoverer

s.logger.Debugf("Building KSM endpoint slices discoverer")
endpointSlicesDiscoverer, err := s.buildEndpointSlicesDiscoverer()
if err != nil {
return nil, fmt.Errorf("building endpoint slices disoverer: %w", err)
}
s.endpointSlicesDiscoverer = endpointSlicesDiscoverer

servicesLister, servicesCloser := discovery.NewServicesLister(providers.K8s)
s.servicesLister = servicesLister
s.informerClosers = append(s.informerClosers, servicesCloser)
Expand Down Expand Up @@ -204,17 +212,63 @@
}, nil
}

// buildEndpointSlicesDiscoverer returns a discovery.EndpointSlicesDiscoverer, configured to discover KSM endpoints in the cluster,
// or to return the static endpoint defined by the user in the config.
func (s *Scraper) buildEndpointSlicesDiscoverer() (discovery.EndpointSlicesDiscoverer, error) {
dc := discovery.EndpointSlicesDiscoveryConfig{
LabelSelector: defaultLabelSelector,
Client: s.K8s,
}

if s.config.KSM.Namespace != "" {

Check failure on line 223 in src/ksm/scraper.go

View workflow job for this annotation

GitHub Actions / Static analysis and linting

QF1008: could remove embedded field "KSM" from selector (staticcheck)
s.logger.Debugf("Restricting KSM discovery to namespace %q", s.config.KSM.Namespace)

Check failure on line 224 in src/ksm/scraper.go

View workflow job for this annotation

GitHub Actions / Static analysis and linting

QF1008: could remove embedded field "KSM" from selector (staticcheck)
dc.Namespace = s.config.KSM.Namespace

Check failure on line 225 in src/ksm/scraper.go

View workflow job for this annotation

GitHub Actions / Static analysis and linting

QF1008: could remove embedded field "KSM" from selector (staticcheck)
}

if s.config.KSM.Selector != "" {
s.logger.Debugf("Overriding default KSM labelSelector (%q) to %q", defaultLabelSelector, s.config.KSM.Selector)
dc.LabelSelector = s.config.KSM.Selector
}

if s.config.KSM.Port != 0 {
s.logger.Debugf("Overriding default KSM port to %d", s.config.KSM.Port)
dc.Port = s.config.KSM.Port
}

discoverer, err := discovery.NewEndpointSlicesDiscoverer(dc)
if err != nil {
return nil, err
}

return &discovery.EndpointSlicesDiscovererWithTimeout{
EndpointsDiscoverer: discoverer,

BackoffDelay: s.config.KSM.Discovery.BackoffDelay,
Timeout: s.config.KSM.Discovery.Timeout,
}, nil
}

func (s *Scraper) ksmURLs() ([]string, error) {
if u := s.config.KSM.StaticURL; u != "" {
s.logger.Debugf("Using overridden endpoint for ksm %q", u)
return []string{u}, nil
}

endpoints, err := s.endpointsDiscoverer.Discover()
var endpoints []string
var err error
endpoints, err = s.endpointSlicesDiscoverer.Discover()
if err != nil {
return nil, fmt.Errorf("discovering KSM endpoints: %w", err)
}

//@todo: decide if we need this

Check failure on line 264 in src/ksm/scraper.go

View workflow job for this annotation

GitHub Actions / Static analysis and linting

commentFormatting: put a space between `//` and comment text (gocritic)
if false {
endpoints, err = s.endpointsDiscoverer.Discover()
if err != nil {
return nil, fmt.Errorf("discovering KSM endpoints: %w", err)
}
}

scheme := s.config.KSM.Scheme
if scheme == "" {
scheme = defaultScheme
Expand Down
Loading