Skip to content

Commit 6f258c8

Browse files
committed
app/vlagent/kubernetescollector: add support for dynamic token and certificates reloading
1 parent c93d114 commit 6f258c8

File tree

3 files changed

+70
-90
lines changed

3 files changed

+70
-90
lines changed

app/vlagent/kubernetescollector/client.go

Lines changed: 24 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@ package kubernetescollector
22

33
import (
44
"context"
5-
"crypto/tls"
6-
"crypto/x509"
75
"encoding/json"
86
"fmt"
97
"io"
@@ -13,14 +11,12 @@ import (
1311

1412
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
1513
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
14+
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
1615
)
1716

1817
type kubeAPIConfig struct {
19-
Server string
20-
BearerToken string
21-
ClientCert []byte
22-
ClientCertKey []byte
23-
GetCACert func() (*x509.CertPool, error)
18+
server string
19+
ac *promauth.Config
2420
}
2521

2622
type kubeAPIClient struct {
@@ -31,34 +27,17 @@ type kubeAPIClient struct {
3127
}
3228

3329
func newKubeAPIClient(cfg *kubeAPIConfig) (*kubeAPIClient, error) {
34-
certPool, err := cfg.GetCACert()
35-
if err != nil {
36-
return nil, fmt.Errorf("cannot get CA certificate: %w", err)
37-
}
38-
39-
var clientCerts []tls.Certificate
40-
if len(cfg.ClientCert) != 0 {
41-
cc, err := tls.X509KeyPair(cfg.ClientCert, cfg.ClientCertKey)
42-
if err != nil {
43-
return nil, fmt.Errorf("cannot load client certificate: %w", err)
44-
}
45-
clientCerts = append(clientCerts, cc)
46-
}
47-
4830
tr := httputil.NewTransport(false, "vlagent_kubernetescollector")
4931
tr.IdleConnTimeout = time.Minute
5032
tr.TLSHandshakeTimeout = time.Second * 15
51-
tr.TLSClientConfig.RootCAs = certPool
52-
tr.TLSClientConfig.Certificates = clientCerts
5333

54-
// todo: ca cert can be updated, so we need to reload it periodically
5534
c := &http.Client{
56-
Transport: tr,
35+
Transport: cfg.ac.NewRoundTripper(tr),
5736
}
5837

59-
apiURL, err := url.Parse(cfg.Server)
38+
apiURL, err := url.Parse(cfg.server)
6039
if err != nil {
61-
return nil, fmt.Errorf("cannot parse server URL %q: %w", cfg.Server, err)
40+
return nil, fmt.Errorf("cannot parse server URL %q: %w", cfg.server, err)
6241
}
6342

6443
return &kubeAPIClient{
@@ -98,7 +77,7 @@ func (c *kubeAPIClient) watchNodePods(ctx context.Context, nodeName, resourceVer
9877
}
9978

10079
req := c.mustCreateRequest(ctx, http.MethodGet, "/api/v1/pods", args)
101-
resp, err := c.c.Do(req)
80+
resp, err := c.sendRequest(req)
10281
if err != nil {
10382
return podWatchStream{}, fmt.Errorf("cannot do %q GET request: %w", req.URL.String(), err)
10483
}
@@ -234,7 +213,7 @@ func (c *kubeAPIClient) getNodePods(ctx context.Context, nodeName string) (podLi
234213
}
235214

236215
req := c.mustCreateRequest(ctx, http.MethodGet, "/api/v1/pods", args)
237-
resp, err := c.c.Do(req)
216+
resp, err := c.sendRequest(req)
238217
if err != nil {
239218
return podList{}, fmt.Errorf("cannot do %q GET request: %w", req.URL.String(), err)
240219
}
@@ -261,7 +240,7 @@ func (c *kubeAPIClient) getNodePods(ctx context.Context, nodeName string) (podLi
261240
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.26/#read-pod-v1-core
262241
func (c *kubeAPIClient) getPod(ctx context.Context, namespace, podName string) (pod, error) {
263242
req := c.mustCreateRequest(ctx, http.MethodGet, "/api/v1/namespaces/"+namespace+"/pods/"+podName, nil)
264-
resp, err := c.c.Do(req)
243+
resp, err := c.sendRequest(req)
265244
if err != nil {
266245
return pod{}, fmt.Errorf("cannot do /pods/<podName> request: %w", err)
267246
}
@@ -299,7 +278,7 @@ type node struct {
299278
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.26/#list-node-v1-core
300279
func (c *kubeAPIClient) getNodes(ctx context.Context) ([]string, error) {
301280
req := c.mustCreateRequest(ctx, http.MethodGet, "/api/v1/nodes", nil)
302-
resp, err := c.c.Do(req)
281+
resp, err := c.sendRequest(req)
303282
if err != nil {
304283
return nil, fmt.Errorf("cannot do %q GET request: %w", req.URL.String(), err)
305284
}
@@ -330,7 +309,7 @@ func (c *kubeAPIClient) getNodes(ctx context.Context) ([]string, error) {
330309
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.26/#read-node-v1-core
331310
func (c *kubeAPIClient) getNodeByName(ctx context.Context, nodeName string) (node, error) {
332311
req := c.mustCreateRequest(ctx, http.MethodGet, "/api/v1/nodes/"+nodeName, nil)
333-
resp, err := c.c.Do(req)
312+
resp, err := c.sendRequest(req)
334313
if err != nil {
335314
return node{}, fmt.Errorf("cannot do %q GET request: %w", req.URL.String(), err)
336315
}
@@ -355,14 +334,23 @@ func (c *kubeAPIClient) getNodeByName(ctx context.Context, nodeName string) (nod
355334
func (c *kubeAPIClient) mustCreateRequest(ctx context.Context, method, urlPath string, args url.Values) *http.Request {
356335
req, err := http.NewRequestWithContext(ctx, method, "/", nil)
357336
if err != nil {
358-
logger.Panicf("BUG: cannot create request: %w", err)
337+
logger.Panicf("BUG: cannot create request: %s", err)
359338
}
360339
u := *c.apiURL
361340
req.URL = &u
362341
req.URL.Path = urlPath
363342
req.URL.RawQuery = args.Encode()
364-
if c.config.BearerToken != "" {
365-
req.Header.Set("Authorization", "Bearer "+c.config.BearerToken)
366-
}
367343
return req
368344
}
345+
346+
func (c *kubeAPIClient) sendRequest(req *http.Request) (*http.Response, error) {
347+
if err := c.config.ac.SetHeaders(req, true); err != nil {
348+
return nil, err
349+
}
350+
351+
resp, err := c.c.Do(req)
352+
if err != nil {
353+
return nil, err
354+
}
355+
return resp, nil
356+
}

app/vlagent/kubernetescollector/client_config.go

Lines changed: 45 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
package kubernetescollector
22

33
import (
4-
"crypto/x509"
54
"encoding/base64"
65
"fmt"
76
"net"
87
"os"
98
"path/filepath"
109

10+
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
1111
"gopkg.in/yaml.v2"
1212
)
1313

@@ -36,26 +36,27 @@ func loadInClusterConfig() (*kubeAPIConfig, error) {
3636
return nil, fmt.Errorf("KUBERNETES_SERVICE_HOST/KUBERNETES_SERVICE_PORT environment variables are not set")
3737
}
3838

39-
token, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token")
39+
const bearerTokenFile = "/var/run/secrets/kubernetes.io/serviceaccount/token"
40+
// Verify that vlagent is running in a Kubernetes cluster.
41+
if _, err := os.Stat(bearerTokenFile); err != nil {
42+
return nil, err
43+
}
44+
45+
opts := &promauth.Options{
46+
BearerTokenFile: bearerTokenFile,
47+
TLSConfig: &promauth.TLSConfig{
48+
CAFile: "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt",
49+
},
50+
}
51+
ac, err := opts.NewConfig()
4052
if err != nil {
41-
return nil, fmt.Errorf("cannot read in-cluster token: %w", err)
53+
return nil, fmt.Errorf("cannot initialize in-cluster auth config: %w", err)
4254
}
4355

56+
server := "https://" + net.JoinHostPort(host, port)
4457
return &kubeAPIConfig{
45-
Server: "https://" + net.JoinHostPort(host, port),
46-
BearerToken: string(token),
47-
GetCACert: func() (*x509.CertPool, error) {
48-
certs, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/ca.crt")
49-
if err != nil {
50-
return nil, fmt.Errorf("cannot read root CA: %w", err)
51-
}
52-
53-
roots := x509.NewCertPool()
54-
if !roots.AppendCertsFromPEM(certs) {
55-
return nil, fmt.Errorf("cannot parse PEM encoded certificates")
56-
}
57-
return roots, nil
58-
},
58+
server: server,
59+
ac: ac,
5960
}, nil
6061
}
6162

@@ -136,7 +137,7 @@ func loadLocalConfig() (*kubeAPIConfig, error) {
136137

137138
rawConfig, err := os.ReadFile(configPath)
138139
if err != nil {
139-
return nil, fmt.Errorf("cannot read %q: %w", configPath, err)
140+
return nil, err
140141
}
141142

142143
var cfg kubeConfig
@@ -154,64 +155,54 @@ func loadLocalConfig() (*kubeAPIConfig, error) {
154155
return nil, fmt.Errorf("cannot find cluster %q in %q", cctx.Context.Cluster, configPath)
155156
}
156157

157-
var ca []byte
158+
tlsCfg := promauth.TLSConfig{}
159+
158160
if cl.Cluster.CertificateAuthority != "" {
159-
ca, err = os.ReadFile(cl.Cluster.CertificateAuthority)
160-
if err != nil {
161-
return nil, fmt.Errorf("cannot read cluster certificate authority: %w", err)
162-
}
161+
tlsCfg.CAFile = cl.Cluster.CertificateAuthority
163162
} else if cl.Cluster.CertificateAuthorityData != "" {
164-
ca, err = base64.StdEncoding.AppendDecode(nil, []byte(cl.Cluster.CertificateAuthorityData))
163+
ca, err := base64.StdEncoding.DecodeString(cl.Cluster.CertificateAuthorityData)
165164
if err != nil {
166-
return nil, fmt.Errorf("cannot decode base64 encoded CA certificate data: %w", err)
165+
return nil, fmt.Errorf("cannot decode base64 encoded CA certificate data from file %q: %w", configPath, err)
167166
}
167+
tlsCfg.CA = string(ca)
168168
}
169169

170170
u, ok := cfg.findUser(cctx.Context.User)
171171
if !ok {
172-
return nil, fmt.Errorf("cannot find user %q in %q", cctx.Context.User, configPath)
172+
return nil, fmt.Errorf("cannot find current user %q in %q", cctx.Context.User, configPath)
173173
}
174174

175-
var clientCert []byte
176175
if u.User.ClientCertificate != "" {
177-
clientCert, err = os.ReadFile(u.User.ClientCertificate)
178-
if err != nil {
179-
return nil, fmt.Errorf("cannot read client certificate from %q: %w", u.User.ClientCertificate, err)
180-
}
176+
tlsCfg.CertFile = u.User.ClientCertificate
181177
} else if u.User.ClientCertificateData != "" {
182-
clientCert, err = base64.StdEncoding.AppendDecode(nil, []byte(u.User.ClientCertificateData))
178+
clientCert, err := base64.StdEncoding.DecodeString(u.User.ClientCertificateData)
183179
if err != nil {
184-
return nil, fmt.Errorf("cannot decode base64 encoded client certificate data: %w", err)
180+
return nil, fmt.Errorf("cannot decode base64 encoded client certificate data from file %q: %w", configPath, err)
185181
}
182+
tlsCfg.Cert = string(clientCert)
186183
}
187184

188-
var clientCertKey []byte
189185
if u.User.ClientKey != "" {
190-
clientCertKey, err = os.ReadFile(u.User.ClientKey)
191-
if err != nil {
192-
return nil, fmt.Errorf("cannot read client key from %q: %w", u.User.ClientKey, err)
193-
}
186+
tlsCfg.KeyFile = u.User.ClientKey
194187
} else if u.User.ClientKeyData != "" {
195-
clientCertKey, err = base64.StdEncoding.AppendDecode(nil, []byte(u.User.ClientKeyData))
188+
clientCertKey, err := base64.StdEncoding.DecodeString(u.User.ClientKeyData)
196189
if err != nil {
197-
return nil, fmt.Errorf("cannot decode base64 encoded client certificate key data: %w", err)
190+
return nil, fmt.Errorf("cannot decode base64 encoded client certificate key data from file %q: %w", configPath, err)
198191
}
192+
tlsCfg.Key = string(clientCertKey)
193+
}
194+
195+
opts := &promauth.Options{
196+
BearerToken: u.User.Token,
197+
TLSConfig: &tlsCfg,
198+
}
199+
ac, err := opts.NewConfig()
200+
if err != nil {
201+
return nil, fmt.Errorf("cannot initialize local auth config from file %q: %w", configPath, err)
199202
}
200203

201204
return &kubeAPIConfig{
202-
Server: cl.Cluster.Server,
203-
BearerToken: u.User.Token,
204-
ClientCert: clientCert,
205-
ClientCertKey: clientCertKey,
206-
GetCACert: func() (*x509.CertPool, error) {
207-
if len(ca) == 0 {
208-
return nil, nil
209-
}
210-
roots := x509.NewCertPool()
211-
if !roots.AppendCertsFromPEM(ca) {
212-
return nil, fmt.Errorf("cannot parse root CA for %q cluster from %q for user %q; no certs fetched", cl, configPath, cctx.Context.User)
213-
}
214-
return roots, nil
215-
},
205+
server: cl.Cluster.Server,
206+
ac: ac,
216207
}, nil
217208
}

docs/victorialogs/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ according to the following docs:
3232
* BUGFIX: [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/): use local time zone for the VictoriaLogs server when the [`day_range`](https://docs.victoriametrics.com/victorialogs/logsql/#day-range-filter) or [`week_range`](https://docs.victoriametrics.com/victorialogs/logsql/#week-range-filter) filter doesn't contain explicitly specified `offset ...` suffix. This aligns with the behaviour when the timezone information is missing in the [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter).
3333
* BUGFIX: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): fix bars width calculation and visual misalignment relative to time axis. See [#900](https://github.com/VictoriaMetrics/VictoriaLogs/issues/900).
3434
* BUGFIX: [metrics](https://docs.victoriametrics.com/victorialogs/metrics/): fix `vl_http_errors_total{path="..."}` metric name mismatch for `/internal/select/*` endpoints (it was exposed as `vl_http_request_errors_total{path="..."}`). See [#1005](https://github.com/VictoriaMetrics/VictoriaLogs/pull/1005).
35+
* BUGFIX: [Kubernetes Collector](https://docs.victoriametrics.com/victorialogs/vlagent/#collect-kubernetes-pod-logs): add support for dynamic reloading of tokens and certificates. Previously, vlagent read credentials only at startup, which led to authentication errors after token rotation. See [#995](https://github.com/VictoriaMetrics/VictoriaLogs/issues/995).
3536

3637
## [v1.43.1](https://github.com/VictoriaMetrics/VictoriaLogs/releases/tag/v1.43.1)
3738

0 commit comments

Comments
 (0)