Skip to content

Commit f46d396

Browse files
committed
Pass the context to the Venafi clients
Add the client-go debug round tripper to the venafi clients Signed-off-by: Richard Wall <[email protected]>
1 parent 24e02f3 commit f46d396

File tree

6 files changed

+62
-54
lines changed

6 files changed

+62
-54
lines changed

pkg/agent/run.go

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,6 @@ func Run(cmd *cobra.Command, args []string) (returnErr error) {
148148
if err != nil {
149149
return fmt.Errorf("failed to start a controller-runtime component: %v", err)
150150
}
151-
152151
// The agent must stop if the controller-runtime component stops.
153152
cancel()
154153
return nil
@@ -231,8 +230,6 @@ func Run(cmd *cobra.Command, args []string) (returnErr error) {
231230
// If any of the go routines exit (with nil or error) the main context will
232231
// be cancelled, which will cause this blocking loop to exit
233232
// instead of waiting for the time period.
234-
// TODO(wallrj): Pass a context to gatherAndOutputData, so that we don't
235-
// have to wait for it to finish before exiting the process.
236233
for {
237234
if err := gatherAndOutputData(klog.NewContext(ctx, log), eventf, config, preflightClient, dataGatherers); err != nil {
238235
return err
@@ -397,9 +394,7 @@ func postData(ctx context.Context, config CombinedConfig, preflightClient client
397394

398395
if config.AuthMode == VenafiCloudKeypair || config.AuthMode == VenafiCloudVenafiConnection {
399396
// orgID and clusterID are not required for Venafi Cloud auth
400-
// TODO(wallrj): Pass the context to PostDataReadingsWithOptions, so
401-
// that its network operations can be cancelled.
402-
err := preflightClient.PostDataReadingsWithOptions(readings, client.Options{
397+
err := preflightClient.PostDataReadingsWithOptions(ctx, readings, client.Options{
403398
ClusterName: config.ClusterID,
404399
ClusterDescription: config.ClusterDescription,
405400
})
@@ -427,10 +422,7 @@ func postData(ctx context.Context, config CombinedConfig, preflightClient client
427422
if path == "" {
428423
path = "/api/v1/datareadings"
429424
}
430-
// TODO(wallrj): Pass the context to Post, so that its network
431-
// operations can be cancelled.
432-
res, err := preflightClient.Post(path, bytes.NewBuffer(data))
433-
425+
res, err := preflightClient.Post(ctx, path, bytes.NewBuffer(data))
434426
if err != nil {
435427
return fmt.Errorf("failed to post data: %+v", err)
436428
}
@@ -453,9 +445,7 @@ func postData(ctx context.Context, config CombinedConfig, preflightClient client
453445
return fmt.Errorf("post to server failed: missing clusterID from agent configuration")
454446
}
455447

456-
// TODO(wallrj): Pass the context to PostDataReadings, so
457-
// that its network operations can be cancelled.
458-
err := preflightClient.PostDataReadings(config.OrganizationID, config.ClusterID, readings)
448+
err := preflightClient.PostDataReadings(ctx, config.OrganizationID, config.ClusterID, readings)
459449
if err != nil {
460450
return fmt.Errorf("post to server failed: %+v", err)
461451
}

pkg/client/client.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package client
22

33
import (
4+
"context"
45
"fmt"
56
"io"
67
"net/http"
@@ -29,9 +30,9 @@ type (
2930

3031
// The Client interface describes types that perform requests against the Jetstack Secure backend.
3132
Client interface {
32-
PostDataReadings(orgID, clusterID string, readings []*api.DataReading) error
33-
PostDataReadingsWithOptions(readings []*api.DataReading, options Options) error
34-
Post(path string, body io.Reader) (*http.Response, error)
33+
PostDataReadings(ctx context.Context, orgID, clusterID string, readings []*api.DataReading) error
34+
PostDataReadingsWithOptions(ctx context.Context, readings []*api.DataReading, options Options) error
35+
Post(ctx context.Context, path string, body io.Reader) (*http.Response, error)
3536
}
3637

3738
// The Credentials interface describes methods for credential types to implement for verification.

pkg/client/client_api_token.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package client
22

33
import (
44
"bytes"
5+
"context"
56
"encoding/json"
67
"fmt"
78
"io"
@@ -10,6 +11,7 @@ import (
1011
"time"
1112

1213
"github.com/jetstack/preflight/api"
14+
"k8s.io/client-go/transport"
1315
)
1416

1517
type (
@@ -34,19 +36,22 @@ func NewAPITokenClient(agentMetadata *api.AgentMetadata, apiToken, baseURL strin
3436
apiToken: apiToken,
3537
agentMetadata: agentMetadata,
3638
baseURL: baseURL,
37-
client: &http.Client{Timeout: time.Minute},
39+
client: &http.Client{
40+
Timeout: time.Minute,
41+
Transport: transport.DebugWrappers(http.DefaultTransport),
42+
},
3843
}, nil
3944
}
4045

4146
// PostDataReadingsWithOptions uploads the slice of api.DataReading to the Jetstack Secure backend to be processed for later
4247
// viewing in the user-interface.
43-
func (c *APITokenClient) PostDataReadingsWithOptions(readings []*api.DataReading, opts Options) error {
44-
return c.PostDataReadings(opts.OrgID, opts.ClusterID, readings)
48+
func (c *APITokenClient) PostDataReadingsWithOptions(ctx context.Context, readings []*api.DataReading, opts Options) error {
49+
return c.PostDataReadings(ctx, opts.OrgID, opts.ClusterID, readings)
4550
}
4651

4752
// PostDataReadings uploads the slice of api.DataReading to the Jetstack Secure backend to be processed for later
4853
// viewing in the user-interface.
49-
func (c *APITokenClient) PostDataReadings(orgID, clusterID string, readings []*api.DataReading) error {
54+
func (c *APITokenClient) PostDataReadings(ctx context.Context, orgID, clusterID string, readings []*api.DataReading) error {
5055
payload := api.DataReadingsPost{
5156
AgentMetadata: c.agentMetadata,
5257
DataGatherTime: time.Now().UTC(),
@@ -57,7 +62,7 @@ func (c *APITokenClient) PostDataReadings(orgID, clusterID string, readings []*a
5762
return err
5863
}
5964

60-
res, err := c.Post(filepath.Join("/api/v1/org", orgID, "datareadings", clusterID), bytes.NewBuffer(data))
65+
res, err := c.Post(ctx, filepath.Join("/api/v1/org", orgID, "datareadings", clusterID), bytes.NewBuffer(data))
6166
if err != nil {
6267
return err
6368
}
@@ -77,8 +82,8 @@ func (c *APITokenClient) PostDataReadings(orgID, clusterID string, readings []*a
7782
}
7883

7984
// Post performs an HTTP POST request.
80-
func (c *APITokenClient) Post(path string, body io.Reader) (*http.Response, error) {
81-
req, err := http.NewRequest(http.MethodPost, fullURL(c.baseURL, path), body)
85+
func (c *APITokenClient) Post(ctx context.Context, path string, body io.Reader) (*http.Response, error) {
86+
req, err := http.NewRequestWithContext(ctx, http.MethodPost, fullURL(c.baseURL, path), body)
8287
if err != nil {
8388
return nil, err
8489
}

pkg/client/client_oauth.go

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package client
22

33
import (
44
"bytes"
5+
"context"
56
"encoding/json"
67
"fmt"
78
"io"
@@ -13,6 +14,7 @@ import (
1314

1415
"github.com/hashicorp/go-multierror"
1516
"github.com/pkg/errors"
17+
"k8s.io/client-go/transport"
1618

1719
"github.com/jetstack/preflight/api"
1820
)
@@ -93,17 +95,20 @@ func NewOAuthClient(agentMetadata *api.AgentMetadata, credentials *OAuthCredenti
9395
credentials: credentials,
9496
baseURL: baseURL,
9597
accessToken: &accessToken{},
96-
client: &http.Client{Timeout: time.Minute},
98+
client: &http.Client{
99+
Timeout: time.Minute,
100+
Transport: transport.DebugWrappers(http.DefaultTransport),
101+
},
97102
}, nil
98103
}
99104

100-
func (c *OAuthClient) PostDataReadingsWithOptions(readings []*api.DataReading, opts Options) error {
101-
return c.PostDataReadings(opts.OrgID, opts.ClusterID, readings)
105+
func (c *OAuthClient) PostDataReadingsWithOptions(ctx context.Context, readings []*api.DataReading, opts Options) error {
106+
return c.PostDataReadings(ctx, opts.OrgID, opts.ClusterID, readings)
102107
}
103108

104109
// PostDataReadings uploads the slice of api.DataReading to the Jetstack Secure backend to be processed for later
105110
// viewing in the user-interface.
106-
func (c *OAuthClient) PostDataReadings(orgID, clusterID string, readings []*api.DataReading) error {
111+
func (c *OAuthClient) PostDataReadings(ctx context.Context, orgID, clusterID string, readings []*api.DataReading) error {
107112
payload := api.DataReadingsPost{
108113
AgentMetadata: c.agentMetadata,
109114
DataGatherTime: time.Now().UTC(),
@@ -114,7 +119,7 @@ func (c *OAuthClient) PostDataReadings(orgID, clusterID string, readings []*api.
114119
return err
115120
}
116121

117-
res, err := c.Post(filepath.Join("/api/v1/org", orgID, "datareadings", clusterID), bytes.NewBuffer(data))
122+
res, err := c.Post(ctx, filepath.Join("/api/v1/org", orgID, "datareadings", clusterID), bytes.NewBuffer(data))
118123
if err != nil {
119124
return err
120125
}
@@ -134,13 +139,13 @@ func (c *OAuthClient) PostDataReadings(orgID, clusterID string, readings []*api.
134139
}
135140

136141
// Post performs an HTTP POST request.
137-
func (c *OAuthClient) Post(path string, body io.Reader) (*http.Response, error) {
138-
token, err := c.getValidAccessToken()
142+
func (c *OAuthClient) Post(ctx context.Context, path string, body io.Reader) (*http.Response, error) {
143+
token, err := c.getValidAccessToken(ctx)
139144
if err != nil {
140145
return nil, err
141146
}
142147

143-
req, err := http.NewRequest(http.MethodPost, fullURL(c.baseURL, path), body)
148+
req, err := http.NewRequestWithContext(ctx, http.MethodPost, fullURL(c.baseURL, path), body)
144149
if err != nil {
145150
return nil, err
146151
}
@@ -157,9 +162,9 @@ func (c *OAuthClient) Post(path string, body io.Reader) (*http.Response, error)
157162
// getValidAccessToken returns a valid access token. It will fetch a new access
158163
// token from the auth server in case the current access token does not exist
159164
// or it is expired.
160-
func (c *OAuthClient) getValidAccessToken() (*accessToken, error) {
165+
func (c *OAuthClient) getValidAccessToken(ctx context.Context) (*accessToken, error) {
161166
if c.accessToken.needsRenew() {
162-
err := c.renewAccessToken()
167+
err := c.renewAccessToken(ctx)
163168
if err != nil {
164169
return nil, err
165170
}
@@ -168,7 +173,7 @@ func (c *OAuthClient) getValidAccessToken() (*accessToken, error) {
168173
return c.accessToken, nil
169174
}
170175

171-
func (c *OAuthClient) renewAccessToken() error {
176+
func (c *OAuthClient) renewAccessToken(ctx context.Context) error {
172177
tokenURL := fmt.Sprintf("https://%s/oauth/token", c.credentials.AuthServerDomain)
173178
audience := "https://preflight.jetstack.io/api/v1"
174179
payload := url.Values{}
@@ -178,7 +183,7 @@ func (c *OAuthClient) renewAccessToken() error {
178183
payload.Set("audience", audience)
179184
payload.Set("username", c.credentials.UserID)
180185
payload.Set("password", c.credentials.UserSecret)
181-
req, err := http.NewRequest("POST", tokenURL, strings.NewReader(payload.Encode()))
186+
req, err := http.NewRequestWithContext(ctx, "POST", tokenURL, strings.NewReader(payload.Encode()))
182187
if err != nil {
183188
return errors.WithStack(err)
184189
}
@@ -188,7 +193,8 @@ func (c *OAuthClient) renewAccessToken() error {
188193
if err != nil {
189194
return errors.WithStack(err)
190195
}
191-
196+
// TODO(wallrj): This will block. Read the body incrementally and check for
197+
// context cancellation.
192198
body, err := io.ReadAll(res.Body)
193199
if err != nil {
194200
return errors.WithStack(err)

pkg/client/client_venafi_cloud.go

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package client
22

33
import (
44
"bytes"
5+
"context"
56
"crypto"
67
"crypto/ecdsa"
78
"crypto/ed25519"
@@ -26,6 +27,7 @@ import (
2627
"github.com/google/uuid"
2728
"github.com/hashicorp/go-multierror"
2829
"github.com/microcosm-cc/bluemonday"
30+
"k8s.io/client-go/transport"
2931

3032
"github.com/jetstack/preflight/api"
3133
)
@@ -111,7 +113,10 @@ func NewVenafiCloudClient(agentMetadata *api.AgentMetadata, credentials *VenafiS
111113
credentials: credentials,
112114
baseURL: baseURL,
113115
accessToken: &venafiCloudAccessToken{},
114-
Client: &http.Client{Timeout: time.Minute},
116+
Client: &http.Client{
117+
Timeout: time.Minute,
118+
Transport: transport.DebugWrappers(http.DefaultTransport),
119+
},
115120
uploaderID: uploaderID,
116121
uploadPath: uploadPath,
117122
privateKey: privateKey,
@@ -168,7 +173,7 @@ func (c *VenafiSvcAccountCredentials) IsClientSet() (ok bool, why string) {
168173

169174
// PostDataReadingsWithOptions uploads the slice of api.DataReading to the Venafi Cloud backend to be processed.
170175
// The Options are then passed as URL params in the request
171-
func (c *VenafiCloudClient) PostDataReadingsWithOptions(readings []*api.DataReading, opts Options) error {
176+
func (c *VenafiCloudClient) PostDataReadingsWithOptions(ctx context.Context, readings []*api.DataReading, opts Options) error {
172177
payload := api.DataReadingsPost{
173178
AgentMetadata: c.agentMetadata,
174179
DataGatherTime: time.Now().UTC(),
@@ -199,7 +204,7 @@ func (c *VenafiCloudClient) PostDataReadingsWithOptions(readings []*api.DataRead
199204
}
200205
venafiCloudUploadURL.RawQuery = query.Encode()
201206

202-
res, err := c.Post(venafiCloudUploadURL.String(), bytes.NewBuffer(data))
207+
res, err := c.Post(ctx, venafiCloudUploadURL.String(), bytes.NewBuffer(data))
203208
if err != nil {
204209
return err
205210
}
@@ -219,7 +224,7 @@ func (c *VenafiCloudClient) PostDataReadingsWithOptions(readings []*api.DataRead
219224

220225
// PostDataReadings uploads the slice of api.DataReading to the Venafi Cloud backend to be processed for later
221226
// viewing in the user-interface.
222-
func (c *VenafiCloudClient) PostDataReadings(_ string, _ string, readings []*api.DataReading) error {
227+
func (c *VenafiCloudClient) PostDataReadings(ctx context.Context, _ string, _ string, readings []*api.DataReading) error {
223228
// orgID and clusterID are ignored in Venafi Cloud auth
224229

225230
payload := api.DataReadingsPost{
@@ -235,7 +240,7 @@ func (c *VenafiCloudClient) PostDataReadings(_ string, _ string, readings []*api
235240
if !strings.HasSuffix(c.uploadPath, "/") {
236241
c.uploadPath = fmt.Sprintf("%s/", c.uploadPath)
237242
}
238-
res, err := c.Post(filepath.Join(c.uploadPath, c.uploaderID), bytes.NewBuffer(data))
243+
res, err := c.Post(ctx, filepath.Join(c.uploadPath, c.uploaderID), bytes.NewBuffer(data))
239244
if err != nil {
240245
return err
241246
}
@@ -254,8 +259,8 @@ func (c *VenafiCloudClient) PostDataReadings(_ string, _ string, readings []*api
254259
}
255260

256261
// Post performs an HTTP POST request.
257-
func (c *VenafiCloudClient) Post(path string, body io.Reader) (*http.Response, error) {
258-
token, err := c.getValidAccessToken()
262+
func (c *VenafiCloudClient) Post(ctx context.Context, path string, body io.Reader) (*http.Response, error) {
263+
token, err := c.getValidAccessToken(ctx)
259264
if err != nil {
260265
return nil, err
261266
}
@@ -278,9 +283,9 @@ func (c *VenafiCloudClient) Post(path string, body io.Reader) (*http.Response, e
278283
// getValidAccessToken returns a valid access token. It will fetch a new access
279284
// token from the auth server in case the current access token does not exist
280285
// or it is expired.
281-
func (c *VenafiCloudClient) getValidAccessToken() (*venafiCloudAccessToken, error) {
286+
func (c *VenafiCloudClient) getValidAccessToken(ctx context.Context) (*venafiCloudAccessToken, error) {
282287
if c.accessToken == nil || time.Now().Add(time.Minute).After(c.accessToken.expirationTime) {
283-
err := c.updateAccessToken()
288+
err := c.updateAccessToken(ctx)
284289
if err != nil {
285290
return nil, err
286291
}
@@ -289,7 +294,7 @@ func (c *VenafiCloudClient) getValidAccessToken() (*venafiCloudAccessToken, erro
289294
return c.accessToken, nil
290295
}
291296

292-
func (c *VenafiCloudClient) updateAccessToken() error {
297+
func (c *VenafiCloudClient) updateAccessToken(ctx context.Context) error {
293298
jwtToken, err := c.generateAndSignJwtToken()
294299
if err != nil {
295300
return err
@@ -302,7 +307,7 @@ func (c *VenafiCloudClient) updateAccessToken() error {
302307
tokenURL := fullURL(c.baseURL, accessTokenEndpoint)
303308

304309
encoded := values.Encode()
305-
request, err := http.NewRequest(http.MethodPost, tokenURL, strings.NewReader(encoded))
310+
request, err := http.NewRequestWithContext(ctx, http.MethodPost, tokenURL, strings.NewReader(encoded))
306311
if err != nil {
307312
return err
308313
}

0 commit comments

Comments
 (0)