Skip to content

Commit 80ade1c

Browse files
committed
add support for dynamic token and certificates reloading
1 parent 5af150a commit 80ade1c

File tree

3 files changed

+71
-90
lines changed

3 files changed

+71
-90
lines changed

app/vlagent/kubernetescollector/client.go

Lines changed: 24 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@ package kubernetescollector
22

33
import (
44
"context"
5-
"crypto/tls"
6-
"crypto/x509"
75
"encoding/json"
86
"fmt"
97
"io"
@@ -13,14 +11,12 @@ import (
1311

1412
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
1513
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
14+
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
1615
)
1716

1817
type kubeAPIConfig struct {
19-
Server string
20-
BearerToken string
21-
ClientCert []byte
22-
ClientCertKey []byte
23-
GetCACert func() (*x509.CertPool, error)
18+
server string
19+
ac *promauth.Config
2420
}
2521

2622
type kubeAPIClient struct {
@@ -31,34 +27,17 @@ type kubeAPIClient struct {
3127
}
3228

3329
func newKubeAPIClient(cfg *kubeAPIConfig) (*kubeAPIClient, error) {
34-
certPool, err := cfg.GetCACert()
35-
if err != nil {
36-
return nil, fmt.Errorf("cannot get CA certificate: %w", err)
37-
}
38-
39-
var clientCerts []tls.Certificate
40-
if len(cfg.ClientCert) != 0 {
41-
cc, err := tls.X509KeyPair(cfg.ClientCert, cfg.ClientCertKey)
42-
if err != nil {
43-
return nil, fmt.Errorf("cannot load client certificate: %w", err)
44-
}
45-
clientCerts = append(clientCerts, cc)
46-
}
47-
4830
tr := httputil.NewTransport(false, "vlagent_kubernetescollector")
4931
tr.IdleConnTimeout = time.Minute
5032
tr.TLSHandshakeTimeout = time.Second * 15
51-
tr.TLSClientConfig.RootCAs = certPool
52-
tr.TLSClientConfig.Certificates = clientCerts
5333

54-
// todo: ca cert can be updated, so we need to reload it periodically
5534
c := &http.Client{
56-
Transport: tr,
35+
Transport: cfg.ac.NewRoundTripper(tr),
5736
}
5837

59-
apiURL, err := url.Parse(cfg.Server)
38+
apiURL, err := url.Parse(cfg.server)
6039
if err != nil {
61-
return nil, fmt.Errorf("cannot parse server URL %q: %w", cfg.Server, err)
40+
return nil, fmt.Errorf("cannot parse server URL %q: %w", cfg.server, err)
6241
}
6342

6443
return &kubeAPIClient{
@@ -98,7 +77,7 @@ func (c *kubeAPIClient) watchNodePods(ctx context.Context, nodeName, resourceVer
9877
}
9978

10079
req := c.mustCreateRequest(ctx, http.MethodGet, "/api/v1/pods", args)
101-
resp, err := c.c.Do(req)
80+
resp, err := c.sendRequest(req)
10281
if err != nil {
10382
return podWatchStream{}, fmt.Errorf("cannot do %q GET request: %w", req.URL.String(), err)
10483
}
@@ -223,7 +202,7 @@ func (c *kubeAPIClient) getNodePods(ctx context.Context, nodeName string) (podLi
223202
}
224203

225204
req := c.mustCreateRequest(ctx, http.MethodGet, "/api/v1/pods", args)
226-
resp, err := c.c.Do(req)
205+
resp, err := c.sendRequest(req)
227206
if err != nil {
228207
return podList{}, fmt.Errorf("cannot do %q GET request: %w", req.URL.String(), err)
229208
}
@@ -250,7 +229,7 @@ func (c *kubeAPIClient) getNodePods(ctx context.Context, nodeName string) (podLi
250229
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.26/#read-pod-v1-core
251230
func (c *kubeAPIClient) getPod(ctx context.Context, namespace, podName string) (pod, error) {
252231
req := c.mustCreateRequest(ctx, http.MethodGet, "/api/v1/namespaces/"+namespace+"/pods/"+podName, nil)
253-
resp, err := c.c.Do(req)
232+
resp, err := c.sendRequest(req)
254233
if err != nil {
255234
return pod{}, fmt.Errorf("cannot do /pods/<podName> request: %w", err)
256235
}
@@ -290,7 +269,7 @@ type nodeList struct {
290269
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.26/#list-node-v1-core
291270
func (c *kubeAPIClient) getNodes(ctx context.Context) ([]string, error) {
292271
req := c.mustCreateRequest(ctx, http.MethodGet, "/api/v1/nodes", nil)
293-
resp, err := c.c.Do(req)
272+
resp, err := c.sendRequest(req)
294273
if err != nil {
295274
return nil, fmt.Errorf("cannot do %q GET request: %w", req.URL.String(), err)
296275
}
@@ -321,7 +300,7 @@ func (c *kubeAPIClient) getNodes(ctx context.Context) ([]string, error) {
321300
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.26/#read-node-v1-core
322301
func (c *kubeAPIClient) getNodeByName(ctx context.Context, nodeName string) (node, error) {
323302
req := c.mustCreateRequest(ctx, http.MethodGet, "/api/v1/nodes/"+nodeName, nil)
324-
resp, err := c.c.Do(req)
303+
resp, err := c.sendRequest(req)
325304
if err != nil {
326305
return node{}, fmt.Errorf("cannot do %q GET request: %w", req.URL.String(), err)
327306
}
@@ -346,14 +325,23 @@ func (c *kubeAPIClient) getNodeByName(ctx context.Context, nodeName string) (nod
346325
func (c *kubeAPIClient) mustCreateRequest(ctx context.Context, method, urlPath string, args url.Values) *http.Request {
347326
req, err := http.NewRequestWithContext(ctx, method, "/", nil)
348327
if err != nil {
349-
logger.Panicf("BUG: cannot create request: %w", err)
328+
logger.Panicf("BUG: cannot create request: %s", err)
350329
}
351330
u := *c.apiURL
352331
req.URL = &u
353332
req.URL.Path = urlPath
354333
req.URL.RawQuery = args.Encode()
355-
if c.config.BearerToken != "" {
356-
req.Header.Set("Authorization", "Bearer "+c.config.BearerToken)
357-
}
358334
return req
359335
}
336+
337+
func (c *kubeAPIClient) sendRequest(req *http.Request) (*http.Response, error) {
338+
if err := c.config.ac.SetHeaders(req, true); err != nil {
339+
return nil, err
340+
}
341+
342+
resp, err := c.c.Do(req)
343+
if err != nil {
344+
return nil, err
345+
}
346+
return resp, nil
347+
}

app/vlagent/kubernetescollector/client_config.go

Lines changed: 45 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
package kubernetescollector
22

33
import (
4-
"crypto/x509"
54
"encoding/base64"
65
"fmt"
76
"net"
87
"os"
98
"path/filepath"
109

10+
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
1111
"gopkg.in/yaml.v2"
1212
)
1313

@@ -36,26 +36,27 @@ func loadInClusterConfig() (*kubeAPIConfig, error) {
3636
return nil, fmt.Errorf("KUBERNETES_SERVICE_HOST/KUBERNETES_SERVICE_PORT environment variables are not set")
3737
}
3838

39-
token, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token")
39+
const bearerTokenFile = "/var/run/secrets/kubernetes.io/serviceaccount/token"
40+
// Verify that vlagent is running in a Kubernetes cluster.
41+
if _, err := os.Stat(bearerTokenFile); err != nil {
42+
return nil, err
43+
}
44+
45+
opts := &promauth.Options{
46+
BearerTokenFile: bearerTokenFile,
47+
TLSConfig: &promauth.TLSConfig{
48+
CAFile: "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt",
49+
},
50+
}
51+
ac, err := opts.NewConfig()
4052
if err != nil {
41-
return nil, fmt.Errorf("cannot read in-cluster token: %w", err)
53+
return nil, fmt.Errorf("cannot initialize in-cluster auth config: %w", err)
4254
}
4355

56+
server := "https://" + net.JoinHostPort(host, port)
4457
return &kubeAPIConfig{
45-
Server: "https://" + net.JoinHostPort(host, port),
46-
BearerToken: string(token),
47-
GetCACert: func() (*x509.CertPool, error) {
48-
certs, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/ca.crt")
49-
if err != nil {
50-
return nil, fmt.Errorf("cannot read root CA: %w", err)
51-
}
52-
53-
roots := x509.NewCertPool()
54-
if !roots.AppendCertsFromPEM(certs) {
55-
return nil, fmt.Errorf("cannot parse PEM encoded certificates")
56-
}
57-
return roots, nil
58-
},
58+
server: server,
59+
ac: ac,
5960
}, nil
6061
}
6162

@@ -136,7 +137,7 @@ func loadLocalConfig() (*kubeAPIConfig, error) {
136137

137138
rawConfig, err := os.ReadFile(configPath)
138139
if err != nil {
139-
return nil, fmt.Errorf("cannot read %q: %w", configPath, err)
140+
return nil, err
140141
}
141142

142143
var cfg kubeConfig
@@ -154,64 +155,54 @@ func loadLocalConfig() (*kubeAPIConfig, error) {
154155
return nil, fmt.Errorf("cannot find cluster %q in %q", cctx.Context.Cluster, configPath)
155156
}
156157

157-
var ca []byte
158+
tlsCfg := promauth.TLSConfig{}
159+
158160
if cl.Cluster.CertificateAuthority != "" {
159-
ca, err = os.ReadFile(cl.Cluster.CertificateAuthority)
160-
if err != nil {
161-
return nil, fmt.Errorf("cannot read cluster certificate authority: %w", err)
162-
}
161+
tlsCfg.CAFile = cl.Cluster.CertificateAuthority
163162
} else if cl.Cluster.CertificateAuthorityData != "" {
164-
ca, err = base64.StdEncoding.AppendDecode(nil, []byte(cl.Cluster.CertificateAuthorityData))
163+
ca, err := base64.StdEncoding.DecodeString(cl.Cluster.CertificateAuthorityData)
165164
if err != nil {
166-
return nil, fmt.Errorf("cannot decode base64 encoded CA certificate data: %w", err)
165+
return nil, fmt.Errorf("cannot decode base64 encoded CA certificate data from file %q: %w", configPath, err)
167166
}
167+
tlsCfg.CA = string(ca)
168168
}
169169

170170
u, ok := cfg.findUser(cctx.Context.User)
171171
if !ok {
172-
return nil, fmt.Errorf("cannot find user %q in %q", cctx.Context.User, configPath)
172+
return nil, fmt.Errorf("cannot find current user %q in %q", cctx.Context.User, configPath)
173173
}
174174

175-
var clientCert []byte
176175
if u.User.ClientCertificate != "" {
177-
clientCert, err = os.ReadFile(u.User.ClientCertificate)
178-
if err != nil {
179-
return nil, fmt.Errorf("cannot read client certificate from %q: %w", u.User.ClientCertificate, err)
180-
}
176+
tlsCfg.CertFile = u.User.ClientCertificate
181177
} else if u.User.ClientCertificateData != "" {
182-
clientCert, err = base64.StdEncoding.AppendDecode(nil, []byte(u.User.ClientCertificateData))
178+
clientCert, err := base64.StdEncoding.DecodeString(u.User.ClientCertificateData)
183179
if err != nil {
184-
return nil, fmt.Errorf("cannot decode base64 encoded client certificate data: %w", err)
180+
return nil, fmt.Errorf("cannot decode base64 encoded client certificate data from file %q: %w", configPath, err)
185181
}
182+
tlsCfg.Cert = string(clientCert)
186183
}
187184

188-
var clientCertKey []byte
189185
if u.User.ClientKey != "" {
190-
clientCertKey, err = os.ReadFile(u.User.ClientKey)
191-
if err != nil {
192-
return nil, fmt.Errorf("cannot read client key from %q: %w", u.User.ClientKey, err)
193-
}
186+
tlsCfg.KeyFile = u.User.ClientKey
194187
} else if u.User.ClientKeyData != "" {
195-
clientCertKey, err = base64.StdEncoding.AppendDecode(nil, []byte(u.User.ClientKeyData))
188+
clientCertKey, err := base64.StdEncoding.DecodeString(u.User.ClientKeyData)
196189
if err != nil {
197-
return nil, fmt.Errorf("cannot decode base64 encoded client certificate key data: %w", err)
190+
return nil, fmt.Errorf("cannot decode base64 encoded client certificate key data from file %q: %w", configPath, err)
198191
}
192+
tlsCfg.Key = string(clientCertKey)
193+
}
194+
195+
opts := &promauth.Options{
196+
BearerToken: u.User.Token,
197+
TLSConfig: &tlsCfg,
198+
}
199+
ac, err := opts.NewConfig()
200+
if err != nil {
201+
return nil, fmt.Errorf("cannot initialize local auth config from file %q: %w", configPath, err)
199202
}
200203

201204
return &kubeAPIConfig{
202-
Server: cl.Cluster.Server,
203-
BearerToken: u.User.Token,
204-
ClientCert: clientCert,
205-
ClientCertKey: clientCertKey,
206-
GetCACert: func() (*x509.CertPool, error) {
207-
if len(ca) == 0 {
208-
return nil, nil
209-
}
210-
roots := x509.NewCertPool()
211-
if !roots.AppendCertsFromPEM(ca) {
212-
return nil, fmt.Errorf("cannot parse root CA for %q cluster from %q for user %q; no certs fetched", cl, configPath, cctx.Context.User)
213-
}
214-
return roots, nil
215-
},
205+
server: cl.Cluster.Server,
206+
ac: ac,
216207
}, nil
217208
}

docs/victorialogs/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ according to the following docs:
2121

2222
## tip
2323

24+
* FEATURE: [Kubernetes Collector](https://docs.victoriametrics.com/victorialogs/vlagent/#collect-kubernetes-pod-logs): add support for dynamic token and certificates reloading. Previously, vlagent only read credentials at startup, which led to authentication errors after token rotation. See [#995](https://github.com/VictoriaMetrics/VictoriaLogs/issues/995).
25+
2426
## [v1.43.1](https://github.com/VictoriaMetrics/VictoriaLogs/releases/tag/v1.43.1)
2527

2628
Released at 2025-12-26

0 commit comments

Comments
 (0)