Skip to content

Commit eb8b59f

Browse files
authored
Implement native k8s ExternalMetricClient instead of HTTP clients
Quite an extensive refactor thanks to using native k8s packages. Lots more tests and more coverage.
1 parent 7b05355 commit eb8b59f

File tree

4 files changed

+274
-232
lines changed

4 files changed

+274
-232
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ require (
2424
google.golang.org/grpc v1.76.0
2525
google.golang.org/protobuf v1.36.10
2626
gopkg.in/h2non/gock.v1 v1.1.2
27+
gopkg.in/inf.v0 v0.9.1
2728
k8s.io/api v0.34.2
2829
k8s.io/apimachinery v0.34.2
2930
k8s.io/client-go v0.34.2
@@ -97,7 +98,6 @@ require (
9798
google.golang.org/genproto/googleapis/api v0.0.0-20250804133106-a7a43d27e69b // indirect
9899
google.golang.org/genproto/googleapis/rpc v0.0.0-20251002232023-7c0ddcbb5797 // indirect
99100
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
100-
gopkg.in/inf.v0 v0.9.1 // indirect
101101
gopkg.in/yaml.v3 v3.0.1 // indirect
102102
k8s.io/gengo/v2 v2.0.0-20250604051438-85fd79dbfd9f // indirect
103103
k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b // indirect

pkg/metrics/providers/externalmetrics.go

Lines changed: 97 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -17,139 +17,146 @@ limitations under the License.
1717
package providers
1818

1919
import (
20-
"context"
21-
"crypto/tls"
22-
"encoding/json"
2320
"fmt"
24-
"io"
25-
"net"
26-
"net/http"
2721
"net/url"
28-
"os"
22+
"strings"
2923
"time"
3024

3125
flaggerv1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1"
32-
"k8s.io/metrics/pkg/apis/external_metrics"
33-
)
34-
35-
const (
36-
metricServiceEndpointPath = "/apis/external.metrics.k8s.io/v1beta1"
37-
namespacesPath = "/namespaces/"
38-
39-
authorizationHeaderKey = "Authorization"
40-
applicationBearerToken = "token"
26+
"k8s.io/apimachinery/pkg/labels"
27+
"k8s.io/client-go/rest"
28+
externalmetrics_client "k8s.io/metrics/pkg/client/external_metrics"
4129
)
4230

4331
// ExternalMetricsProvider fetches metrics from an ExternalMetricsProvider.
4432
type ExternalMetricsProvider struct {
45-
metricServiceEndpoint string
46-
bearerToken string
47-
4833
timeout time.Duration
49-
client *http.Client
34+
client externalmetrics_client.NamespacedMetricsGetter
5035
}
5136

5237
// NewExternalMetricsProvider takes a canary spec, a provider spec, and
53-
// returns a client ready to execute queries against the Service
54-
func NewExternalMetricsProvider(metricInterval string,
38+
// returns a client ready to execute queries against the Service.
39+
func NewExternalMetricsProvider(
5540
provider flaggerv1.MetricTemplateProvider,
5641
credentials map[string][]byte) (*ExternalMetricsProvider, error) {
42+
return newExternalMetricsProviderWithBuilder(
43+
provider, credentials, rest.InClusterConfig,
44+
)
45+
}
5746

58-
if provider.Address == "" {
59-
return nil, fmt.Errorf("the Url of the external metric service must be provided")
47+
// newExternalMetricsProviderWithBuilder is like NewExternalMetricsProvider but
48+
// accepts a rest.Config builder function. Used for testing as InClusterConfig is hard to mock
49+
func newExternalMetricsProviderWithBuilder(
50+
provider flaggerv1.MetricTemplateProvider,
51+
credentials map[string][]byte,
52+
configBuilder func() (*rest.Config, error),
53+
) (*ExternalMetricsProvider, error) {
54+
restConfig, err := configBuilder()
55+
if err != nil || restConfig == nil {
56+
return nil, fmt.Errorf("Not in a kubernetes cluster: %w", err)
6057
}
6158

62-
emp := ExternalMetricsProvider{
63-
metricServiceEndpoint: fmt.Sprintf("%s%s", provider.Address, metricServiceEndpointPath),
64-
timeout: 5 * time.Second,
65-
client: http.DefaultClient,
59+
// Handling overrides from MetricTemplateProvider
60+
if provider.Address != "" {
61+
restConfig.Host = provider.Address
6662
}
67-
68-
if provider.InsecureSkipVerify {
69-
t := http.DefaultTransport.(*http.Transport).Clone()
70-
t.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
71-
emp.client = &http.Client{Transport: t}
63+
restConfig.TLSClientConfig = rest.TLSClientConfig{
64+
Insecure: provider.InsecureSkipVerify,
7265
}
66+
if tokenBytes, ok := credentials["token"]; ok {
67+
restConfig.BearerToken = string(tokenBytes)
68+
}
69+
// TODO: handle user name/password auth if needed
7370

74-
if b, ok := credentials[applicationBearerToken]; ok {
75-
emp.bearerToken = string(b)
76-
} else {
77-
// In the absence of a provided token,
78-
// read service account token from volume mount
79-
token, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token")
80-
if err != nil {
81-
return nil, fmt.Errorf("error reading service account token: %w", err)
82-
}
83-
if len(token) == 0 {
84-
return nil, fmt.Errorf("pod's service account token is empty")
85-
}
86-
emp.bearerToken = string(token)
71+
client, err := externalmetrics_client.NewForConfig(restConfig)
72+
if err != nil {
73+
return nil, fmt.Errorf("error creating external metric client: %w", err)
8774
}
8875

89-
return &emp, nil
76+
return &ExternalMetricsProvider{
77+
timeout: 5 * time.Second,
78+
client: client,
79+
}, nil
9080
}
9181

92-
// RunQuery retrieves the ExternalMetricValue from the ExternalMetricsProvider.metricServiceUrl
93-
// and returns the first result as a float64
82+
// RunQuery retrieves the ExternalMetricValue from the External Metrics API
83+
// at the ExternalMetricsProvider's Address, using the provided query string,
84+
// and returns the *first* result as a float64.
9485
func (p *ExternalMetricsProvider) RunQuery(query string) (float64, error) {
95-
u := fmt.Sprintf("%s%s%s", p.metricServiceEndpoint, namespacesPath, query)
96-
97-
req, err := http.NewRequest("GET", u, nil)
98-
if err != nil {
99-
return 0, fmt.Errorf("error http.NewRequest: %w", err)
100-
}
101-
if p.bearerToken != "" {
102-
req.Header.Add(authorizationHeaderKey, fmt.Sprintf("Bearer %s", p.bearerToken))
103-
}
104-
105-
ctx, cancel := context.WithTimeout(req.Context(), p.timeout)
106-
defer cancel()
107-
r, err := p.client.Do(req.WithContext(ctx))
86+
// The Provider interface only allows a plain string query so decode it
87+
namespace, metricName, selector, err := parseExternalMetricsQuery(query)
10888
if err != nil {
109-
return 0, fmt.Errorf("request failed: %w", err)
89+
return 0, fmt.Errorf("error parsing metric query: %w", err)
11090
}
11191

112-
defer r.Body.Close()
113-
b, err := io.ReadAll(r.Body)
92+
nm := p.client.NamespacedMetrics(namespace)
93+
metricsList, err := nm.List(metricName, selector)
11494
if err != nil {
115-
return 0, fmt.Errorf("error reading body: %w", err)
116-
}
117-
118-
if r.StatusCode != http.StatusOK {
119-
return 0, fmt.Errorf("error response: %s: %w", string(b), err)
120-
}
121-
122-
var res external_metrics.ExternalMetricValueList
123-
if err := json.Unmarshal(b, &res); err != nil {
124-
return 0, fmt.Errorf("error unmarshaling result: %w, '%s'", err, string(b))
95+
return 0, fmt.Errorf("error querying external metrics API: %w", err)
12596
}
12697

127-
if len(res.Items) < 1 {
128-
return 0, fmt.Errorf("invalid response: %s: %w", string(b), ErrNoValuesFound)
98+
if len(metricsList.Items) < 1 {
99+
return 0, fmt.Errorf("no external metrics found: %w", ErrNoValuesFound)
129100
}
130101

131-
vs := res.Items[0].Value.AsApproximateFloat64()
102+
vs := metricsList.Items[0].Value.AsApproximateFloat64()
132103

133104
return vs, nil
134105
}
135106

136-
// IsOnline will only check the TCP endpoint reachability,
137-
// given that external metric servers don't have a standard health check endpoint defined
107+
// IsOnline tests that the External Metrics API is reachable by looking for dummy metrics.
108+
// If we don't get a network error, we assume the service is online.
138109
func (p *ExternalMetricsProvider) IsOnline() (bool, error) {
139-
var d net.Dialer
110+
nm := p.client.NamespacedMetrics("kube-system")
111+
_, err := nm.List("dummy-metric", labels.Everything())
140112

141-
ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
142-
defer cancel()
143-
144-
u, err := url.Parse(p.metricServiceEndpoint)
145113
if err != nil {
146-
return false, fmt.Errorf("error parsing metric service url: %w", err)
114+
return false, fmt.Errorf("external metrics service unavailable: %w", err)
147115
}
116+
return true, nil
117+
}
148118

149-
conn, err := d.DialContext(ctx, "tcp", u.Host)
150-
defer conn.Close()
119+
// parseExternalMetricsQuery parses a query string in the format:
120+
// <namespace>/<metricName>?labelSelector=<urlencoded label selectors>
121+
// where only the metricName is required.
122+
// and returns the namespace, metricName, and labelSelector separately.
123+
func parseExternalMetricsQuery(query string) (namespace string, metricName string, labelSelector labels.Selector, err error) {
124+
u, err := url.Parse("dummy:///" + query)
151125
if err != nil {
152-
return false, fmt.Errorf("connection failed: %w", err)
126+
return "", "", labels.Everything(), fmt.Errorf("malformed query string, expected <namespace>/<metricName>?labelSelector=<urlencoded label selectors>, got %s", query)
153127
}
154-
return true, err
128+
path := strings.TrimPrefix(u.Path, "/")
129+
parts := strings.Split(path, "/")
130+
if len(parts) > 2 {
131+
return "", "", labels.Everything(), fmt.Errorf("malformed query string, too many slashes, expected <namespace>/<metricName>?labelSelector=<urlencoded label selectors>, got %s", query)
132+
}
133+
134+
namespace = "default"
135+
switch len(parts) {
136+
case 1:
137+
// Format: "metric"
138+
metricName = parts[0]
139+
case 2:
140+
// Format: "namespace/metric" or "/metric"
141+
if parts[0] != "" {
142+
namespace = parts[0]
143+
}
144+
metricName = parts[1]
145+
}
146+
if metricName == "" {
147+
return "", "", labels.Everything(), fmt.Errorf("metric name cannot be empty")
148+
}
149+
150+
qp := u.Query()
151+
rawSelector := qp.Get("labelSelector")
152+
if rawSelector == "" {
153+
labelSelector = labels.Everything()
154+
} else {
155+
labelSelector, err = labels.Parse(rawSelector)
156+
if err != nil {
157+
return "", "", labels.Everything(), fmt.Errorf("error parsing label selector from string %s: %w", rawSelector, err)
158+
}
159+
}
160+
161+
return namespace, metricName, labelSelector, nil
155162
}

0 commit comments

Comments
 (0)