Skip to content

Commit 80a3117

Browse files
committed
Pass the context to the Venafi clients
Signed-off-by: Richard Wall <[email protected]>
1 parent 24e02f3 commit 80a3117

File tree

7 files changed

+48
-50
lines changed

7 files changed

+48
-50
lines changed

hack/e2e/values.venafi-kubernetes-agent.yaml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,7 @@ authentication:
99
enabled: true
1010

1111
extraArgs:
12-
- --logging-format=json
13-
- --log-level=2
12+
- --logging-format=text
13+
# Show trace logs for the venafi-connection-lib client
14+
# See https://github.com/jetstack/venafi-connection-lib/blob/13c2342fe0140ff084d2aabfd29ae3d10721691b/internal/http_client/metrics_transport.go#L93-L115
15+
- --vmodule=metrics_transport=6

pkg/agent/run.go

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -231,8 +231,6 @@ func Run(cmd *cobra.Command, args []string) (returnErr error) {
231231
// If any of the go routines exit (with nil or error) the main context will
232232
// be cancelled, which will cause this blocking loop to exit
233233
// instead of waiting for the time period.
234-
// TODO(wallrj): Pass a context to gatherAndOutputData, so that we don't
235-
// have to wait for it to finish before exiting the process.
236234
for {
237235
if err := gatherAndOutputData(klog.NewContext(ctx, log), eventf, config, preflightClient, dataGatherers); err != nil {
238236
return err
@@ -397,9 +395,7 @@ func postData(ctx context.Context, config CombinedConfig, preflightClient client
397395

398396
if config.AuthMode == VenafiCloudKeypair || config.AuthMode == VenafiCloudVenafiConnection {
399397
// orgID and clusterID are not required for Venafi Cloud auth
400-
// TODO(wallrj): Pass the context to PostDataReadingsWithOptions, so
401-
// that its network operations can be cancelled.
402-
err := preflightClient.PostDataReadingsWithOptions(readings, client.Options{
398+
err := preflightClient.PostDataReadingsWithOptions(ctx, readings, client.Options{
403399
ClusterName: config.ClusterID,
404400
ClusterDescription: config.ClusterDescription,
405401
})
@@ -427,10 +423,7 @@ func postData(ctx context.Context, config CombinedConfig, preflightClient client
427423
if path == "" {
428424
path = "/api/v1/datareadings"
429425
}
430-
// TODO(wallrj): Pass the context to Post, so that its network
431-
// operations can be cancelled.
432-
res, err := preflightClient.Post(path, bytes.NewBuffer(data))
433-
426+
res, err := preflightClient.Post(ctx, path, bytes.NewBuffer(data))
434427
if err != nil {
435428
return fmt.Errorf("failed to post data: %+v", err)
436429
}
@@ -453,9 +446,7 @@ func postData(ctx context.Context, config CombinedConfig, preflightClient client
453446
return fmt.Errorf("post to server failed: missing clusterID from agent configuration")
454447
}
455448

456-
// TODO(wallrj): Pass the context to PostDataReadings, so
457-
// that its network operations can be cancelled.
458-
err := preflightClient.PostDataReadings(config.OrganizationID, config.ClusterID, readings)
449+
err := preflightClient.PostDataReadings(ctx, config.OrganizationID, config.ClusterID, readings)
459450
if err != nil {
460451
return fmt.Errorf("post to server failed: %+v", err)
461452
}

pkg/client/client.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package client
22

33
import (
4+
"context"
45
"fmt"
56
"io"
67
"net/http"
@@ -29,9 +30,9 @@ type (
2930

3031
// The Client interface describes types that perform requests against the Jetstack Secure backend.
3132
Client interface {
32-
PostDataReadings(orgID, clusterID string, readings []*api.DataReading) error
33-
PostDataReadingsWithOptions(readings []*api.DataReading, options Options) error
34-
Post(path string, body io.Reader) (*http.Response, error)
33+
PostDataReadings(ctx context.Context, orgID, clusterID string, readings []*api.DataReading) error
34+
PostDataReadingsWithOptions(ctx context.Context, readings []*api.DataReading, options Options) error
35+
Post(ctx context.Context, path string, body io.Reader) (*http.Response, error)
3536
}
3637

3738
// The Credentials interface describes methods for credential types to implement for verification.

pkg/client/client_api_token.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package client
22

33
import (
44
"bytes"
5+
"context"
56
"encoding/json"
67
"fmt"
78
"io"
@@ -40,13 +41,13 @@ func NewAPITokenClient(agentMetadata *api.AgentMetadata, apiToken, baseURL strin
4041

4142
// PostDataReadingsWithOptions uploads the slice of api.DataReading to the Jetstack Secure backend to be processed for later
4243
// viewing in the user-interface.
43-
func (c *APITokenClient) PostDataReadingsWithOptions(readings []*api.DataReading, opts Options) error {
44-
return c.PostDataReadings(opts.OrgID, opts.ClusterID, readings)
44+
func (c *APITokenClient) PostDataReadingsWithOptions(ctx context.Context, readings []*api.DataReading, opts Options) error {
45+
return c.PostDataReadings(ctx, opts.OrgID, opts.ClusterID, readings)
4546
}
4647

4748
// PostDataReadings uploads the slice of api.DataReading to the Jetstack Secure backend to be processed for later
4849
// viewing in the user-interface.
49-
func (c *APITokenClient) PostDataReadings(orgID, clusterID string, readings []*api.DataReading) error {
50+
func (c *APITokenClient) PostDataReadings(ctx context.Context, orgID, clusterID string, readings []*api.DataReading) error {
5051
payload := api.DataReadingsPost{
5152
AgentMetadata: c.agentMetadata,
5253
DataGatherTime: time.Now().UTC(),
@@ -57,7 +58,7 @@ func (c *APITokenClient) PostDataReadings(orgID, clusterID string, readings []*a
5758
return err
5859
}
5960

60-
res, err := c.Post(filepath.Join("/api/v1/org", orgID, "datareadings", clusterID), bytes.NewBuffer(data))
61+
res, err := c.Post(ctx, filepath.Join("/api/v1/org", orgID, "datareadings", clusterID), bytes.NewBuffer(data))
6162
if err != nil {
6263
return err
6364
}
@@ -77,8 +78,8 @@ func (c *APITokenClient) PostDataReadings(orgID, clusterID string, readings []*a
7778
}
7879

7980
// Post performs an HTTP POST request.
80-
func (c *APITokenClient) Post(path string, body io.Reader) (*http.Response, error) {
81-
req, err := http.NewRequest(http.MethodPost, fullURL(c.baseURL, path), body)
81+
func (c *APITokenClient) Post(ctx context.Context, path string, body io.Reader) (*http.Response, error) {
82+
req, err := http.NewRequestWithContext(ctx, http.MethodPost, fullURL(c.baseURL, path), body)
8283
if err != nil {
8384
return nil, err
8485
}

pkg/client/client_oauth.go

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package client
22

33
import (
44
"bytes"
5+
"context"
56
"encoding/json"
67
"fmt"
78
"io"
@@ -97,13 +98,13 @@ func NewOAuthClient(agentMetadata *api.AgentMetadata, credentials *OAuthCredenti
9798
}, nil
9899
}
99100

100-
func (c *OAuthClient) PostDataReadingsWithOptions(readings []*api.DataReading, opts Options) error {
101-
return c.PostDataReadings(opts.OrgID, opts.ClusterID, readings)
101+
func (c *OAuthClient) PostDataReadingsWithOptions(ctx context.Context, readings []*api.DataReading, opts Options) error {
102+
return c.PostDataReadings(ctx, opts.OrgID, opts.ClusterID, readings)
102103
}
103104

104105
// PostDataReadings uploads the slice of api.DataReading to the Jetstack Secure backend to be processed for later
105106
// viewing in the user-interface.
106-
func (c *OAuthClient) PostDataReadings(orgID, clusterID string, readings []*api.DataReading) error {
107+
func (c *OAuthClient) PostDataReadings(ctx context.Context, orgID, clusterID string, readings []*api.DataReading) error {
107108
payload := api.DataReadingsPost{
108109
AgentMetadata: c.agentMetadata,
109110
DataGatherTime: time.Now().UTC(),
@@ -114,7 +115,7 @@ func (c *OAuthClient) PostDataReadings(orgID, clusterID string, readings []*api.
114115
return err
115116
}
116117

117-
res, err := c.Post(filepath.Join("/api/v1/org", orgID, "datareadings", clusterID), bytes.NewBuffer(data))
118+
res, err := c.Post(ctx, filepath.Join("/api/v1/org", orgID, "datareadings", clusterID), bytes.NewBuffer(data))
118119
if err != nil {
119120
return err
120121
}
@@ -134,13 +135,13 @@ func (c *OAuthClient) PostDataReadings(orgID, clusterID string, readings []*api.
134135
}
135136

136137
// Post performs an HTTP POST request.
137-
func (c *OAuthClient) Post(path string, body io.Reader) (*http.Response, error) {
138-
token, err := c.getValidAccessToken()
138+
func (c *OAuthClient) Post(ctx context.Context, path string, body io.Reader) (*http.Response, error) {
139+
token, err := c.getValidAccessToken(ctx)
139140
if err != nil {
140141
return nil, err
141142
}
142143

143-
req, err := http.NewRequest(http.MethodPost, fullURL(c.baseURL, path), body)
144+
req, err := http.NewRequestWithContext(ctx, http.MethodPost, fullURL(c.baseURL, path), body)
144145
if err != nil {
145146
return nil, err
146147
}
@@ -157,9 +158,9 @@ func (c *OAuthClient) Post(path string, body io.Reader) (*http.Response, error)
157158
// getValidAccessToken returns a valid access token. It will fetch a new access
158159
// token from the auth server in case the current access token does not exist
159160
// or it is expired.
160-
func (c *OAuthClient) getValidAccessToken() (*accessToken, error) {
161+
func (c *OAuthClient) getValidAccessToken(ctx context.Context) (*accessToken, error) {
161162
if c.accessToken.needsRenew() {
162-
err := c.renewAccessToken()
163+
err := c.renewAccessToken(ctx)
163164
if err != nil {
164165
return nil, err
165166
}
@@ -168,7 +169,7 @@ func (c *OAuthClient) getValidAccessToken() (*accessToken, error) {
168169
return c.accessToken, nil
169170
}
170171

171-
func (c *OAuthClient) renewAccessToken() error {
172+
func (c *OAuthClient) renewAccessToken(ctx context.Context) error {
172173
tokenURL := fmt.Sprintf("https://%s/oauth/token", c.credentials.AuthServerDomain)
173174
audience := "https://preflight.jetstack.io/api/v1"
174175
payload := url.Values{}
@@ -178,7 +179,7 @@ func (c *OAuthClient) renewAccessToken() error {
178179
payload.Set("audience", audience)
179180
payload.Set("username", c.credentials.UserID)
180181
payload.Set("password", c.credentials.UserSecret)
181-
req, err := http.NewRequest("POST", tokenURL, strings.NewReader(payload.Encode()))
182+
req, err := http.NewRequestWithContext(ctx, "POST", tokenURL, strings.NewReader(payload.Encode()))
182183
if err != nil {
183184
return errors.WithStack(err)
184185
}
@@ -188,7 +189,8 @@ func (c *OAuthClient) renewAccessToken() error {
188189
if err != nil {
189190
return errors.WithStack(err)
190191
}
191-
192+
// TODO(wallrj): This will block. Read the body incrementally and check for
193+
// context cancellation.
192194
body, err := io.ReadAll(res.Body)
193195
if err != nil {
194196
return errors.WithStack(err)

pkg/client/client_venafi_cloud.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package client
22

33
import (
44
"bytes"
5+
"context"
56
"crypto"
67
"crypto/ecdsa"
78
"crypto/ed25519"
@@ -168,7 +169,7 @@ func (c *VenafiSvcAccountCredentials) IsClientSet() (ok bool, why string) {
168169

169170
// PostDataReadingsWithOptions uploads the slice of api.DataReading to the Venafi Cloud backend to be processed.
170171
// The Options are then passed as URL params in the request
171-
func (c *VenafiCloudClient) PostDataReadingsWithOptions(readings []*api.DataReading, opts Options) error {
172+
func (c *VenafiCloudClient) PostDataReadingsWithOptions(ctx context.Context, readings []*api.DataReading, opts Options) error {
172173
payload := api.DataReadingsPost{
173174
AgentMetadata: c.agentMetadata,
174175
DataGatherTime: time.Now().UTC(),
@@ -199,7 +200,7 @@ func (c *VenafiCloudClient) PostDataReadingsWithOptions(readings []*api.DataRead
199200
}
200201
venafiCloudUploadURL.RawQuery = query.Encode()
201202

202-
res, err := c.Post(venafiCloudUploadURL.String(), bytes.NewBuffer(data))
203+
res, err := c.Post(ctx, venafiCloudUploadURL.String(), bytes.NewBuffer(data))
203204
if err != nil {
204205
return err
205206
}
@@ -219,7 +220,7 @@ func (c *VenafiCloudClient) PostDataReadingsWithOptions(readings []*api.DataRead
219220

220221
// PostDataReadings uploads the slice of api.DataReading to the Venafi Cloud backend to be processed for later
221222
// viewing in the user-interface.
222-
func (c *VenafiCloudClient) PostDataReadings(_ string, _ string, readings []*api.DataReading) error {
223+
func (c *VenafiCloudClient) PostDataReadings(ctx context.Context, _ string, _ string, readings []*api.DataReading) error {
223224
// orgID and clusterID are ignored in Venafi Cloud auth
224225

225226
payload := api.DataReadingsPost{
@@ -235,7 +236,7 @@ func (c *VenafiCloudClient) PostDataReadings(_ string, _ string, readings []*api
235236
if !strings.HasSuffix(c.uploadPath, "/") {
236237
c.uploadPath = fmt.Sprintf("%s/", c.uploadPath)
237238
}
238-
res, err := c.Post(filepath.Join(c.uploadPath, c.uploaderID), bytes.NewBuffer(data))
239+
res, err := c.Post(ctx, filepath.Join(c.uploadPath, c.uploaderID), bytes.NewBuffer(data))
239240
if err != nil {
240241
return err
241242
}
@@ -254,8 +255,8 @@ func (c *VenafiCloudClient) PostDataReadings(_ string, _ string, readings []*api
254255
}
255256

256257
// Post performs an HTTP POST request.
257-
func (c *VenafiCloudClient) Post(path string, body io.Reader) (*http.Response, error) {
258-
token, err := c.getValidAccessToken()
258+
func (c *VenafiCloudClient) Post(ctx context.Context, path string, body io.Reader) (*http.Response, error) {
259+
token, err := c.getValidAccessToken(ctx)
259260
if err != nil {
260261
return nil, err
261262
}
@@ -278,9 +279,9 @@ func (c *VenafiCloudClient) Post(path string, body io.Reader) (*http.Response, e
278279
// getValidAccessToken returns a valid access token. It will fetch a new access
279280
// token from the auth server in case the current access token does not exist
280281
// or it is expired.
281-
func (c *VenafiCloudClient) getValidAccessToken() (*venafiCloudAccessToken, error) {
282+
func (c *VenafiCloudClient) getValidAccessToken(ctx context.Context) (*venafiCloudAccessToken, error) {
282283
if c.accessToken == nil || time.Now().Add(time.Minute).After(c.accessToken.expirationTime) {
283-
err := c.updateAccessToken()
284+
err := c.updateAccessToken(ctx)
284285
if err != nil {
285286
return nil, err
286287
}
@@ -289,7 +290,7 @@ func (c *VenafiCloudClient) getValidAccessToken() (*venafiCloudAccessToken, erro
289290
return c.accessToken, nil
290291
}
291292

292-
func (c *VenafiCloudClient) updateAccessToken() error {
293+
func (c *VenafiCloudClient) updateAccessToken(ctx context.Context) error {
293294
jwtToken, err := c.generateAndSignJwtToken()
294295
if err != nil {
295296
return err
@@ -302,7 +303,7 @@ func (c *VenafiCloudClient) updateAccessToken() error {
302303
tokenURL := fullURL(c.baseURL, accessTokenEndpoint)
303304

304305
encoded := values.Encode()
305-
request, err := http.NewRequest(http.MethodPost, tokenURL, strings.NewReader(encoded))
306+
request, err := http.NewRequestWithContext(ctx, http.MethodPost, tokenURL, strings.NewReader(encoded))
306307
if err != nil {
307308
return err
308309
}

pkg/client/client_venconn.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -123,12 +123,12 @@ func (c *VenConnClient) Start(ctx context.Context) error {
123123

124124
// `opts.ClusterName` and `opts.ClusterDescription` are the only values used
125125
// from the Options struct. OrgID and ClusterID are not used in Venafi Cloud.
126-
func (c *VenConnClient) PostDataReadingsWithOptions(readings []*api.DataReading, opts Options) error {
126+
func (c *VenConnClient) PostDataReadingsWithOptions(ctx context.Context, readings []*api.DataReading, opts Options) error {
127127
if opts.ClusterName == "" {
128128
return fmt.Errorf("programmer mistake: the cluster name (aka `cluster_id` in the config file) cannot be left empty")
129129
}
130130

131-
_, token, err := c.connHandler.Get(context.Background(), c.installNS, auth.Scope{}, types.NamespacedName{Name: c.venConnName, Namespace: c.venConnNS})
131+
_, token, err := c.connHandler.Get(ctx, c.installNS, auth.Scope{}, types.NamespacedName{Name: c.venConnName, Namespace: c.venConnNS})
132132
if err != nil {
133133
return fmt.Errorf("while loading the VenafiConnection %s/%s: %w", c.venConnNS, c.venConnName, err)
134134
}
@@ -161,7 +161,7 @@ func (c *VenConnClient) PostDataReadingsWithOptions(readings []*api.DataReading,
161161
// The path parameter "no" is a dummy parameter to make the Venafi Cloud
162162
// backend happy. This parameter, named `uploaderID` in the backend, is not
163163
// actually used by the backend.
164-
req, err := http.NewRequest(http.MethodPost, fullURL(token.BaseURL, "/v1/tlspk/upload/clusterdata/no"), encodedBody)
164+
req, err := http.NewRequestWithContext(ctx, http.MethodPost, fullURL(token.BaseURL, "/v1/tlspk/upload/clusterdata/no"), encodedBody)
165165
if err != nil {
166166
return err
167167
}
@@ -206,13 +206,13 @@ func (c *VenConnClient) PostDataReadingsWithOptions(readings []*api.DataReading,
206206
// Cloud needs a `clusterName` and `clusterDescription`, but this function can
207207
// only pass `orgID` and `clusterID` which are both useless in Venafi Cloud. Use
208208
// PostDataReadingsWithOptions instead.
209-
func (c *VenConnClient) PostDataReadings(_orgID, _clusterID string, readings []*api.DataReading) error {
209+
func (c *VenConnClient) PostDataReadings(_ context.Context, _orgID, _clusterID string, readings []*api.DataReading) error {
210210
return fmt.Errorf("programmer mistake: PostDataReadings is not implemented for Venafi Cloud")
211211
}
212212

213213
// Post isn't implemented for Venafi Cloud because /v1/tlspk/upload/clusterdata
214214
// requires using the query parameters `name` and `description` which can't be
215215
// set using Post. Use PostDataReadingsWithOptions instead.
216-
func (c *VenConnClient) Post(path string, body io.Reader) (*http.Response, error) {
216+
func (c *VenConnClient) Post(_ context.Context, path string, body io.Reader) (*http.Response, error) {
217217
return nil, fmt.Errorf("programmer mistake: Post is not implemented for Venafi Cloud")
218218
}

0 commit comments

Comments
 (0)