Skip to content

Commit 124fe29

Browse files
committed
Pass the context to the Venafi clients
Signed-off-by: Richard Wall <[email protected]>
1 parent eb3c30a commit 124fe29

File tree

7 files changed

+49
-51
lines changed

7 files changed

+49
-51
lines changed

hack/e2e/values.venafi-kubernetes-agent.yaml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,7 @@ authentication:
99
enabled: true
1010

1111
extraArgs:
12-
- --logging-format=json
13-
- --log-level=2
12+
- --logging-format=text
13+
# Show trace logs for the venafi-connection-lib client
14+
# See https://github.com/jetstack/venafi-connection-lib/blob/13c2342fe0140ff084d2aabfd29ae3d10721691b/internal/http_client/metrics_transport.go#L93-L115
15+
- --vmodule=metrics_transport=6

pkg/agent/run.go

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -231,8 +231,6 @@ func Run(cmd *cobra.Command, args []string) (returnErr error) {
231231
// If any of the go routines exit (with nil or error) the main context will
232232
// be cancelled, which will cause this blocking loop to exit
233233
// instead of waiting for the time period.
234-
// TODO(wallrj): Pass a context to gatherAndOutputData, so that we don't
235-
// have to wait for it to finish before exiting the process.
236234
for {
237235
if err := gatherAndOutputData(klog.NewContext(ctx, log), eventf, config, preflightClient, dataGatherers); err != nil {
238236
return err
@@ -397,9 +395,7 @@ func postData(ctx context.Context, config CombinedConfig, preflightClient client
397395

398396
if config.AuthMode == VenafiCloudKeypair || config.AuthMode == VenafiCloudVenafiConnection {
399397
// orgID and clusterID are not required for Venafi Cloud auth
400-
// TODO(wallrj): Pass the context to PostDataReadingsWithOptions, so
401-
// that its network operations can be cancelled.
402-
err := preflightClient.PostDataReadingsWithOptions(readings, client.Options{
398+
err := preflightClient.PostDataReadingsWithOptions(ctx, readings, client.Options{
403399
ClusterName: config.ClusterID,
404400
ClusterDescription: config.ClusterDescription,
405401
})
@@ -427,10 +423,7 @@ func postData(ctx context.Context, config CombinedConfig, preflightClient client
427423
if path == "" {
428424
path = "/api/v1/datareadings"
429425
}
430-
// TODO(wallrj): Pass the context to Post, so that its network
431-
// operations can be cancelled.
432-
res, err := preflightClient.Post(path, bytes.NewBuffer(data))
433-
426+
res, err := preflightClient.Post(ctx, path, bytes.NewBuffer(data))
434427
if err != nil {
435428
return fmt.Errorf("failed to post data: %+v", err)
436429
}
@@ -453,9 +446,7 @@ func postData(ctx context.Context, config CombinedConfig, preflightClient client
453446
return fmt.Errorf("post to server failed: missing clusterID from agent configuration")
454447
}
455448

456-
// TODO(wallrj): Pass the context to PostDataReadings, so
457-
// that its network operations can be cancelled.
458-
err := preflightClient.PostDataReadings(config.OrganizationID, config.ClusterID, readings)
449+
err := preflightClient.PostDataReadings(ctx, config.OrganizationID, config.ClusterID, readings)
459450
if err != nil {
460451
return fmt.Errorf("post to server failed: %+v", err)
461452
}

pkg/client/client.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package client
22

33
import (
4+
"context"
45
"fmt"
56
"io"
67
"net/http"
@@ -29,9 +30,9 @@ type (
2930

3031
// The Client interface describes types that perform requests against the Jetstack Secure backend.
3132
Client interface {
32-
PostDataReadings(orgID, clusterID string, readings []*api.DataReading) error
33-
PostDataReadingsWithOptions(readings []*api.DataReading, options Options) error
34-
Post(path string, body io.Reader) (*http.Response, error)
33+
PostDataReadings(ctx context.Context, orgID, clusterID string, readings []*api.DataReading) error
34+
PostDataReadingsWithOptions(ctx context.Context, readings []*api.DataReading, options Options) error
35+
Post(ctx context.Context, path string, body io.Reader) (*http.Response, error)
3536
}
3637

3738
// The Credentials interface describes methods for credential types to implement for verification.

pkg/client/client_api_token.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package client
22

33
import (
44
"bytes"
5+
"context"
56
"encoding/json"
67
"fmt"
78
"io"
@@ -40,13 +41,13 @@ func NewAPITokenClient(agentMetadata *api.AgentMetadata, apiToken, baseURL strin
4041

4142
// PostDataReadingsWithOptions uploads the slice of api.DataReading to the Jetstack Secure backend to be processed for later
4243
// viewing in the user-interface.
43-
func (c *APITokenClient) PostDataReadingsWithOptions(readings []*api.DataReading, opts Options) error {
44-
return c.PostDataReadings(opts.OrgID, opts.ClusterID, readings)
44+
func (c *APITokenClient) PostDataReadingsWithOptions(ctx context.Context, readings []*api.DataReading, opts Options) error {
45+
return c.PostDataReadings(ctx, opts.OrgID, opts.ClusterID, readings)
4546
}
4647

4748
// PostDataReadings uploads the slice of api.DataReading to the Jetstack Secure backend to be processed for later
4849
// viewing in the user-interface.
49-
func (c *APITokenClient) PostDataReadings(orgID, clusterID string, readings []*api.DataReading) error {
50+
func (c *APITokenClient) PostDataReadings(ctx context.Context, orgID, clusterID string, readings []*api.DataReading) error {
5051
payload := api.DataReadingsPost{
5152
AgentMetadata: c.agentMetadata,
5253
DataGatherTime: time.Now().UTC(),
@@ -57,7 +58,7 @@ func (c *APITokenClient) PostDataReadings(orgID, clusterID string, readings []*a
5758
return err
5859
}
5960

60-
res, err := c.Post(filepath.Join("/api/v1/org", orgID, "datareadings", clusterID), bytes.NewBuffer(data))
61+
res, err := c.Post(ctx, filepath.Join("/api/v1/org", orgID, "datareadings", clusterID), bytes.NewBuffer(data))
6162
if err != nil {
6263
return err
6364
}
@@ -77,8 +78,8 @@ func (c *APITokenClient) PostDataReadings(orgID, clusterID string, readings []*a
7778
}
7879

7980
// Post performs an HTTP POST request.
80-
func (c *APITokenClient) Post(path string, body io.Reader) (*http.Response, error) {
81-
req, err := http.NewRequest(http.MethodPost, fullURL(c.baseURL, path), body)
81+
func (c *APITokenClient) Post(ctx context.Context, path string, body io.Reader) (*http.Response, error) {
82+
req, err := http.NewRequestWithContext(ctx, http.MethodPost, fullURL(c.baseURL, path), body)
8283
if err != nil {
8384
return nil, err
8485
}

pkg/client/client_oauth.go

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package client
22

33
import (
44
"bytes"
5+
"context"
56
"encoding/json"
67
"fmt"
78
"io"
@@ -97,13 +98,13 @@ func NewOAuthClient(agentMetadata *api.AgentMetadata, credentials *OAuthCredenti
9798
}, nil
9899
}
99100

100-
func (c *OAuthClient) PostDataReadingsWithOptions(readings []*api.DataReading, opts Options) error {
101-
return c.PostDataReadings(opts.OrgID, opts.ClusterID, readings)
101+
func (c *OAuthClient) PostDataReadingsWithOptions(ctx context.Context, readings []*api.DataReading, opts Options) error {
102+
return c.PostDataReadings(ctx, opts.OrgID, opts.ClusterID, readings)
102103
}
103104

104105
// PostDataReadings uploads the slice of api.DataReading to the Jetstack Secure backend to be processed for later
105106
// viewing in the user-interface.
106-
func (c *OAuthClient) PostDataReadings(orgID, clusterID string, readings []*api.DataReading) error {
107+
func (c *OAuthClient) PostDataReadings(ctx context.Context, orgID, clusterID string, readings []*api.DataReading) error {
107108
payload := api.DataReadingsPost{
108109
AgentMetadata: c.agentMetadata,
109110
DataGatherTime: time.Now().UTC(),
@@ -114,7 +115,7 @@ func (c *OAuthClient) PostDataReadings(orgID, clusterID string, readings []*api.
114115
return err
115116
}
116117

117-
res, err := c.Post(filepath.Join("/api/v1/org", orgID, "datareadings", clusterID), bytes.NewBuffer(data))
118+
res, err := c.Post(ctx, filepath.Join("/api/v1/org", orgID, "datareadings", clusterID), bytes.NewBuffer(data))
118119
if err != nil {
119120
return err
120121
}
@@ -134,13 +135,13 @@ func (c *OAuthClient) PostDataReadings(orgID, clusterID string, readings []*api.
134135
}
135136

136137
// Post performs an HTTP POST request.
137-
func (c *OAuthClient) Post(path string, body io.Reader) (*http.Response, error) {
138-
token, err := c.getValidAccessToken()
138+
func (c *OAuthClient) Post(ctx context.Context, path string, body io.Reader) (*http.Response, error) {
139+
token, err := c.getValidAccessToken(ctx)
139140
if err != nil {
140141
return nil, err
141142
}
142143

143-
req, err := http.NewRequest(http.MethodPost, fullURL(c.baseURL, path), body)
144+
req, err := http.NewRequestWithContext(ctx, http.MethodPost, fullURL(c.baseURL, path), body)
144145
if err != nil {
145146
return nil, err
146147
}
@@ -157,9 +158,9 @@ func (c *OAuthClient) Post(path string, body io.Reader) (*http.Response, error)
157158
// getValidAccessToken returns a valid access token. It will fetch a new access
158159
// token from the auth server in case the current access token does not exist
159160
// or it is expired.
160-
func (c *OAuthClient) getValidAccessToken() (*accessToken, error) {
161+
func (c *OAuthClient) getValidAccessToken(ctx context.Context) (*accessToken, error) {
161162
if c.accessToken.needsRenew() {
162-
err := c.renewAccessToken()
163+
err := c.renewAccessToken(ctx)
163164
if err != nil {
164165
return nil, err
165166
}
@@ -168,7 +169,7 @@ func (c *OAuthClient) getValidAccessToken() (*accessToken, error) {
168169
return c.accessToken, nil
169170
}
170171

171-
func (c *OAuthClient) renewAccessToken() error {
172+
func (c *OAuthClient) renewAccessToken(ctx context.Context) error {
172173
tokenURL := fmt.Sprintf("https://%s/oauth/token", c.credentials.AuthServerDomain)
173174
audience := "https://preflight.jetstack.io/api/v1"
174175
payload := url.Values{}
@@ -178,7 +179,7 @@ func (c *OAuthClient) renewAccessToken() error {
178179
payload.Set("audience", audience)
179180
payload.Set("username", c.credentials.UserID)
180181
payload.Set("password", c.credentials.UserSecret)
181-
req, err := http.NewRequest("POST", tokenURL, strings.NewReader(payload.Encode()))
182+
req, err := http.NewRequestWithContext(ctx, "POST", tokenURL, strings.NewReader(payload.Encode()))
182183
if err != nil {
183184
return errors.WithStack(err)
184185
}
@@ -188,7 +189,8 @@ func (c *OAuthClient) renewAccessToken() error {
188189
if err != nil {
189190
return errors.WithStack(err)
190191
}
191-
192+
// TODO(wallrj): This will block. Read the body incrementally and check for
193+
// context cancellation.
192194
body, err := io.ReadAll(res.Body)
193195
if err != nil {
194196
return errors.WithStack(err)

pkg/client/client_venafi_cloud.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package client
33
import (
44
"bytes"
55
"compress/gzip"
6+
"context"
67
"crypto"
78
"crypto/ecdsa"
89
"crypto/ed25519"
@@ -172,7 +173,7 @@ func (c *VenafiSvcAccountCredentials) IsClientSet() (ok bool, why string) {
172173

173174
// PostDataReadingsWithOptions uploads the slice of api.DataReading to the Venafi Cloud backend to be processed.
174175
// The Options are then passed as URL params in the request
175-
func (c *VenafiCloudClient) PostDataReadingsWithOptions(readings []*api.DataReading, opts Options) error {
176+
func (c *VenafiCloudClient) PostDataReadingsWithOptions(ctx context.Context, readings []*api.DataReading, opts Options) error {
176177
payload := api.DataReadingsPost{
177178
AgentMetadata: c.agentMetadata,
178179
DataGatherTime: time.Now().UTC(),
@@ -203,7 +204,7 @@ func (c *VenafiCloudClient) PostDataReadingsWithOptions(readings []*api.DataRead
203204
}
204205
venafiCloudUploadURL.RawQuery = query.Encode()
205206

206-
res, err := c.Post(venafiCloudUploadURL.String(), bytes.NewBuffer(data))
207+
res, err := c.Post(ctx, venafiCloudUploadURL.String(), bytes.NewBuffer(data))
207208
if err != nil {
208209
return err
209210
}
@@ -223,7 +224,7 @@ func (c *VenafiCloudClient) PostDataReadingsWithOptions(readings []*api.DataRead
223224

224225
// PostDataReadings uploads the slice of api.DataReading to the Venafi Cloud backend to be processed for later
225226
// viewing in the user-interface.
226-
func (c *VenafiCloudClient) PostDataReadings(_ string, _ string, readings []*api.DataReading) error {
227+
func (c *VenafiCloudClient) PostDataReadings(ctx context.Context, _ string, _ string, readings []*api.DataReading) error {
227228
// orgID and clusterID are ignored in Venafi Cloud auth
228229

229230
payload := api.DataReadingsPost{
@@ -239,7 +240,7 @@ func (c *VenafiCloudClient) PostDataReadings(_ string, _ string, readings []*api
239240
if !strings.HasSuffix(c.uploadPath, "/") {
240241
c.uploadPath = fmt.Sprintf("%s/", c.uploadPath)
241242
}
242-
res, err := c.Post(filepath.Join(c.uploadPath, c.uploaderID), bytes.NewBuffer(data))
243+
res, err := c.Post(ctx, filepath.Join(c.uploadPath, c.uploaderID), bytes.NewBuffer(data))
243244
if err != nil {
244245
return err
245246
}
@@ -258,8 +259,8 @@ func (c *VenafiCloudClient) PostDataReadings(_ string, _ string, readings []*api
258259
}
259260

260261
// Post performs an HTTP POST request.
261-
func (c *VenafiCloudClient) Post(path string, body io.Reader) (*http.Response, error) {
262-
token, err := c.getValidAccessToken()
262+
func (c *VenafiCloudClient) Post(ctx context.Context, path string, body io.Reader) (*http.Response, error) {
263+
token, err := c.getValidAccessToken(ctx)
263264
if err != nil {
264265
return nil, err
265266
}
@@ -280,7 +281,7 @@ func (c *VenafiCloudClient) Post(path string, body io.Reader) (*http.Response, e
280281
encodedBody = compressed
281282
}
282283

283-
req, err := http.NewRequest(http.MethodPost, fullURL(c.baseURL, path), encodedBody)
284+
req, err := http.NewRequestWithContext(ctx, http.MethodPost, fullURL(c.baseURL, path), encodedBody)
284285
if err != nil {
285286
return nil, err
286287
}
@@ -310,9 +311,9 @@ func (c *VenafiCloudClient) Post(path string, body io.Reader) (*http.Response, e
310311
// getValidAccessToken returns a valid access token. It will fetch a new access
311312
// token from the auth server in case the current access token does not exist
312313
// or it is expired.
313-
func (c *VenafiCloudClient) getValidAccessToken() (*venafiCloudAccessToken, error) {
314+
func (c *VenafiCloudClient) getValidAccessToken(ctx context.Context) (*venafiCloudAccessToken, error) {
314315
if c.accessToken == nil || time.Now().Add(time.Minute).After(c.accessToken.expirationTime) {
315-
err := c.updateAccessToken()
316+
err := c.updateAccessToken(ctx)
316317
if err != nil {
317318
return nil, err
318319
}
@@ -321,7 +322,7 @@ func (c *VenafiCloudClient) getValidAccessToken() (*venafiCloudAccessToken, erro
321322
return c.accessToken, nil
322323
}
323324

324-
func (c *VenafiCloudClient) updateAccessToken() error {
325+
func (c *VenafiCloudClient) updateAccessToken(ctx context.Context) error {
325326
jwtToken, err := c.generateAndSignJwtToken()
326327
if err != nil {
327328
return err
@@ -334,7 +335,7 @@ func (c *VenafiCloudClient) updateAccessToken() error {
334335
tokenURL := fullURL(c.baseURL, accessTokenEndpoint)
335336

336337
encoded := values.Encode()
337-
request, err := http.NewRequest(http.MethodPost, tokenURL, strings.NewReader(encoded))
338+
request, err := http.NewRequestWithContext(ctx, http.MethodPost, tokenURL, strings.NewReader(encoded))
338339
if err != nil {
339340
return err
340341
}

pkg/client/client_venconn.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -127,12 +127,12 @@ func (c *VenConnClient) Start(ctx context.Context) error {
127127

128128
// `opts.ClusterName` and `opts.ClusterDescription` are the only values used
129129
// from the Options struct. OrgID and ClusterID are not used in Venafi Cloud.
130-
func (c *VenConnClient) PostDataReadingsWithOptions(readings []*api.DataReading, opts Options) error {
130+
func (c *VenConnClient) PostDataReadingsWithOptions(ctx context.Context, readings []*api.DataReading, opts Options) error {
131131
if opts.ClusterName == "" {
132132
return fmt.Errorf("programmer mistake: the cluster name (aka `cluster_id` in the config file) cannot be left empty")
133133
}
134134

135-
_, token, err := c.connHandler.Get(context.Background(), c.installNS, auth.Scope{}, types.NamespacedName{Name: c.venConnName, Namespace: c.venConnNS})
135+
_, token, err := c.connHandler.Get(ctx, c.installNS, auth.Scope{}, types.NamespacedName{Name: c.venConnName, Namespace: c.venConnNS})
136136
if err != nil {
137137
return fmt.Errorf("while loading the VenafiConnection %s/%s: %w", c.venConnNS, c.venConnName, err)
138138
}
@@ -176,7 +176,7 @@ func (c *VenConnClient) PostDataReadingsWithOptions(readings []*api.DataReading,
176176
// The path parameter "no" is a dummy parameter to make the Venafi Cloud
177177
// backend happy. This parameter, named `uploaderID` in the backend, is not
178178
// actually used by the backend.
179-
req, err := http.NewRequest(http.MethodPost, fullURL(token.BaseURL, "/v1/tlspk/upload/clusterdata/no"), encodedBody)
179+
req, err := http.NewRequestWithContext(ctx, http.MethodPost, fullURL(token.BaseURL, "/v1/tlspk/upload/clusterdata/no"), encodedBody)
180180
if err != nil {
181181
return err
182182
}
@@ -233,13 +233,13 @@ func (c *VenConnClient) PostDataReadingsWithOptions(readings []*api.DataReading,
233233
// Cloud needs a `clusterName` and `clusterDescription`, but this function can
234234
// only pass `orgID` and `clusterID` which are both useless in Venafi Cloud. Use
235235
// PostDataReadingsWithOptions instead.
236-
func (c *VenConnClient) PostDataReadings(_orgID, _clusterID string, readings []*api.DataReading) error {
236+
func (c *VenConnClient) PostDataReadings(_ context.Context, _orgID, _clusterID string, readings []*api.DataReading) error {
237237
return fmt.Errorf("programmer mistake: PostDataReadings is not implemented for Venafi Cloud")
238238
}
239239

240240
// Post isn't implemented for Venafi Cloud because /v1/tlspk/upload/clusterdata
241241
// requires using the query parameters `name` and `description` which can't be
242242
// set using Post. Use PostDataReadingsWithOptions instead.
243-
func (c *VenConnClient) Post(path string, body io.Reader) (*http.Response, error) {
243+
func (c *VenConnClient) Post(_ context.Context, path string, body io.Reader) (*http.Response, error) {
244244
return nil, fmt.Errorf("programmer mistake: Post is not implemented for Venafi Cloud")
245245
}

0 commit comments

Comments
 (0)