Skip to content

Commit 4c3024d

Browse files
Merge pull request #410 from jottofar/ota-244-add-context
Use context to add timeout to cincinnati HTTP request
2 parents 7141c36 + 1d1de3b commit 4c3024d

File tree

7 files changed

+41
-32
lines changed

7 files changed

+41
-32
lines changed

pkg/cincinnati/cincinnati.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package cincinnati
22

33
import (
4+
"context"
45
"crypto/tls"
56
"encoding/json"
67
"fmt"
78
"io/ioutil"
89
"net/http"
910
"net/url"
11+
"time"
1012

1113
"github.com/blang/semver/v4"
1214
"github.com/google/uuid"
@@ -16,6 +18,9 @@ const (
1618
// GraphMediaType is the media-type specified in the HTTP Accept header
1719
// of requests sent to the Cincinnati-v1 Graph API.
1820
GraphMediaType = "application/json"
21+
22+
// Timeout when calling upstream Cincinnati stack.
23+
getUpdatesTimeout = time.Minute * 60
1924
)
2025

2126
// Client is a Cincinnati client which can be used to fetch update graphs from
@@ -58,7 +63,7 @@ func (err *Error) Error() string {
5863
// finding all of the children. These children are the available updates for
5964
// the current version and their payloads indicate from where the actual update
6065
// image can be downloaded.
61-
func (c Client) GetUpdates(uri *url.URL, arch string, channel string, version semver.Version) ([]Update, error) {
66+
func (c Client) GetUpdates(ctx context.Context, uri *url.URL, arch string, channel string, version semver.Version) ([]Update, error) {
6267
transport := http.Transport{}
6368
// Prepare parametrized cincinnati query.
6469
queryParams := uri.Query()
@@ -83,7 +88,9 @@ func (c Client) GetUpdates(uri *url.URL, arch string, channel string, version se
8388
}
8489

8590
client := http.Client{Transport: &transport}
86-
resp, err := client.Do(req)
91+
timeoutCtx, cancel := context.WithTimeout(ctx, getUpdatesTimeout)
92+
defer cancel()
93+
resp, err := client.Do(req.WithContext(timeoutCtx))
8794
if err != nil {
8895
return nil, &Error{Reason: "RemoteFailed", Message: err.Error(), cause: err}
8996
}

pkg/cincinnati/cincinnati_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package cincinnati
22

33
import (
4+
"context"
45
"crypto/tls"
56
"encoding/json"
67
"fmt"
@@ -132,7 +133,7 @@ func TestGetUpdates(t *testing.T) {
132133
t.Fatal(err)
133134
}
134135

135-
updates, err := c.GetUpdates(uri, arch, channelName, semver.MustParse(test.version))
136+
updates, err := c.GetUpdates(context.Background(), uri, arch, channelName, semver.MustParse(test.version))
136137
if test.err == "" {
137138
if err != nil {
138139
t.Fatalf("expected nil error, got: %v", err)

pkg/cvo/availableupdates.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package cvo
22

33
import (
4+
"context"
45
"crypto/tls"
56
"fmt"
67
"net/url"
@@ -23,7 +24,7 @@ const noChannel string = "NoChannel"
2324
// syncAvailableUpdates attempts to retrieve the latest updates and update the status of the ClusterVersion
2425
// object. It will set the RetrievedUpdates condition. Updates are only checked if it has been more than
2526
// the minimumUpdateCheckInterval since the last check.
26-
func (optr *Operator) syncAvailableUpdates(config *configv1.ClusterVersion) error {
27+
func (optr *Operator) syncAvailableUpdates(ctx context.Context, config *configv1.ClusterVersion) error {
2728
usedDefaultUpstream := false
2829
upstream := string(config.Spec.Upstream)
2930
if len(upstream) == 0 {
@@ -45,7 +46,7 @@ func (optr *Operator) syncAvailableUpdates(config *configv1.ClusterVersion) erro
4546
return err
4647
}
4748

48-
updates, condition := calculateAvailableUpdatesStatus(string(config.Spec.ClusterID), proxyURL, tlsConfig, upstream, arch, channel, optr.releaseVersion)
49+
updates, condition := calculateAvailableUpdatesStatus(ctx, string(config.Spec.ClusterID), proxyURL, tlsConfig, upstream, arch, channel, optr.releaseVersion)
4950

5051
if usedDefaultUpstream {
5152
upstream = ""
@@ -139,7 +140,7 @@ func (optr *Operator) getAvailableUpdates() *availableUpdates {
139140
return optr.availableUpdates
140141
}
141142

142-
func calculateAvailableUpdatesStatus(clusterID string, proxyURL *url.URL, tlsConfig *tls.Config, upstream, arch, channel, version string) ([]configv1.Update, configv1.ClusterOperatorStatusCondition) {
143+
func calculateAvailableUpdatesStatus(ctx context.Context, clusterID string, proxyURL *url.URL, tlsConfig *tls.Config, upstream, arch, channel, version string) ([]configv1.Update, configv1.ClusterOperatorStatusCondition) {
143144
if len(upstream) == 0 {
144145
return nil, configv1.ClusterOperatorStatusCondition{
145146
Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse, Reason: "NoUpstream",
@@ -193,7 +194,7 @@ func calculateAvailableUpdatesStatus(clusterID string, proxyURL *url.URL, tlsCon
193194
}
194195
}
195196

196-
updates, err := cincinnati.NewClient(uuid, proxyURL, tlsConfig).GetUpdates(upstreamURI, arch, channel, currentVersion)
197+
updates, err := cincinnati.NewClient(uuid, proxyURL, tlsConfig).GetUpdates(ctx, upstreamURI, arch, channel, currentVersion)
197198
if err != nil {
198199
klog.V(2).Infof("Upstream server %s could not return available updates: %v", upstream, err)
199200
if updateError, ok := err.(*cincinnati.Error); ok {

pkg/cvo/cvo.go

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -326,19 +326,19 @@ func (optr *Operator) Run(ctx context.Context, workers int) {
326326
optr.queue.Add(optr.queueKey())
327327

328328
// start the config sync loop, and have it notify the queue when new status is detected
329-
go runThrottledStatusNotifier(stopCh, optr.statusInterval, 2, optr.configSync.StatusCh(), func() { optr.queue.Add(optr.queueKey()) })
329+
go runThrottledStatusNotifier(ctx, optr.statusInterval, 2, optr.configSync.StatusCh(), func() { optr.queue.Add(optr.queueKey()) })
330330
go optr.configSync.Start(ctx, 16, optr.name, optr.cvLister)
331-
go wait.Until(func() { optr.worker(ctx, optr.availableUpdatesQueue, optr.availableUpdatesSync) }, time.Second, stopCh)
332-
go wait.Until(func() { optr.worker(ctx, optr.upgradeableQueue, optr.upgradeableSync) }, time.Second, stopCh)
333-
go wait.Until(func() {
331+
go wait.UntilWithContext(ctx, func(ctx context.Context) { optr.worker(ctx, optr.availableUpdatesQueue, optr.availableUpdatesSync) }, time.Second)
332+
go wait.UntilWithContext(ctx, func(ctx context.Context) { optr.worker(ctx, optr.upgradeableQueue, optr.upgradeableSync) }, time.Second)
333+
go wait.UntilWithContext(ctx, func(ctx context.Context) {
334334
defer close(workerStopCh)
335335

336336
// run the worker, then when the queue is closed sync one final time to flush any pending status
337-
optr.worker(ctx, optr.queue, func(key string) error { return optr.sync(ctx, key) })
337+
optr.worker(ctx, optr.queue, func(ctx context.Context, key string) error { return optr.sync(ctx, key) })
338338
if err := optr.sync(ctx, optr.queueKey()); err != nil {
339339
utilruntime.HandleError(fmt.Errorf("unable to perform final sync: %v", err))
340340
}
341-
}, time.Second, stopCh)
341+
}, time.Second)
342342
if optr.signatureStore != nil {
343343
go optr.signatureStore.Run(ctx, optr.minimumUpdateCheckInterval*2)
344344
}
@@ -375,21 +375,21 @@ func (optr *Operator) eventHandler() cache.ResourceEventHandler {
375375
}
376376
}
377377

378-
func (optr *Operator) worker(ctx context.Context, queue workqueue.RateLimitingInterface, syncHandler func(string) error) {
378+
func (optr *Operator) worker(ctx context.Context, queue workqueue.RateLimitingInterface, syncHandler func(context.Context, string) error) {
379379
for processNextWorkItem(ctx, queue, syncHandler, optr.syncFailingStatus) {
380380
}
381381
}
382382

383383
type syncFailingStatusFunc func(ctx context.Context, config *configv1.ClusterVersion, err error) error
384384

385-
func processNextWorkItem(ctx context.Context, queue workqueue.RateLimitingInterface, syncHandler func(string) error, syncFailingStatus syncFailingStatusFunc) bool {
385+
func processNextWorkItem(ctx context.Context, queue workqueue.RateLimitingInterface, syncHandler func(context.Context, string) error, syncFailingStatus syncFailingStatusFunc) bool {
386386
key, quit := queue.Get()
387387
if quit {
388388
return false
389389
}
390390
defer queue.Done(key)
391391

392-
err := syncHandler(key.(string))
392+
err := syncHandler(ctx, key.(string))
393393
handleErr(ctx, queue, err, key, syncFailingStatus)
394394
return true
395395
}
@@ -486,7 +486,7 @@ func (optr *Operator) sync(ctx context.Context, key string) error {
486486

487487
// availableUpdatesSync is triggered on cluster version change (and periodic requeues) to
488488
// sync available updates. It only modifies cluster version.
489-
func (optr *Operator) availableUpdatesSync(key string) error {
489+
func (optr *Operator) availableUpdatesSync(ctx context.Context, key string) error {
490490
startTime := time.Now()
491491
klog.V(4).Infof("Started syncing available updates %q (%v)", key, startTime)
492492
defer func() {
@@ -503,13 +503,12 @@ func (optr *Operator) availableUpdatesSync(key string) error {
503503
if errs := validation.ValidateClusterVersion(config); len(errs) > 0 {
504504
return nil
505505
}
506-
507-
return optr.syncAvailableUpdates(config)
506+
return optr.syncAvailableUpdates(ctx, config)
508507
}
509508

510509
// upgradeableSync is triggered on cluster version change (and periodic requeues) to
511510
// sync upgradeableCondition. It only modifies cluster version.
512-
func (optr *Operator) upgradeableSync(key string) error {
511+
func (optr *Operator) upgradeableSync(ctx context.Context, key string) error {
513512
startTime := time.Now()
514513
klog.V(4).Infof("Started syncing upgradeable %q (%v)", key, startTime)
515514
defer func() {

pkg/cvo/cvo_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,6 @@ func (c *fakeApiExtClient) Patch(ctx context.Context, name string, pt types.Patc
256256
}
257257

258258
func TestOperator_sync(t *testing.T) {
259-
ctx := context.Background()
260259
id := uuid.Must(uuid.NewRandom()).String()
261260

262261
tests := []struct {
@@ -2271,6 +2270,7 @@ func TestOperator_sync(t *testing.T) {
22712270
}
22722271
optr.eventRecorder = record.NewFakeRecorder(100)
22732272

2273+
ctx := context.Background()
22742274
err := optr.sync(ctx, optr.queueKey())
22752275
if err != nil && tt.wantErr == nil {
22762276
t.Fatalf("Operator.sync() unexpected error: %v", err)
@@ -2651,7 +2651,8 @@ func TestOperator_availableUpdatesSync(t *testing.T) {
26512651
}
26522652
old := optr.availableUpdates
26532653

2654-
err := optr.availableUpdatesSync(optr.queueKey())
2654+
ctx := context.Background()
2655+
err := optr.availableUpdatesSync(ctx, optr.queueKey())
26552656
if err != nil && tt.wantErr == nil {
26562657
t.Fatalf("Operator.sync() unexpected error: %v", err)
26572658
}
@@ -3143,7 +3144,8 @@ func TestOperator_upgradeableSync(t *testing.T) {
31433144
optr.upgradeableChecks = optr.defaultUpgradeableChecks()
31443145
optr.eventRecorder = record.NewFakeRecorder(100)
31453146

3146-
err := optr.upgradeableSync(optr.queueKey())
3147+
ctx := context.Background()
3148+
err := optr.upgradeableSync(ctx, optr.queueKey())
31473149
if err != nil && tt.wantErr == nil {
31483150
t.Fatalf("Operator.sync() unexpected error: %v", err)
31493151
}

pkg/cvo/sync_worker.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -988,21 +988,20 @@ func ownerRefModifier(config *configv1.ClusterVersion) resourcebuilder.MetaV1Obj
988988

989989
// runThrottledStatusNotifier invokes fn every time ch is updated, but no more often than once
990990
// every interval. If bucket is non-zero then the channel is throttled like a rate limiter bucket.
991-
func runThrottledStatusNotifier(stopCh <-chan struct{}, interval time.Duration, bucket int, ch <-chan SyncWorkerStatus, fn func()) {
991+
func runThrottledStatusNotifier(ctx context.Context, interval time.Duration, bucket int, ch <-chan SyncWorkerStatus, fn func()) {
992992
// notify the status change function fairly infrequently to avoid updating
993993
// the caller status more frequently than is needed
994994
throttle := rate.NewLimiter(rate.Every(interval), bucket)
995-
wait.Until(func() {
996-
ctx := context.Background()
995+
wait.UntilWithContext(ctx, func(ctx context.Context) {
997996
var last SyncWorkerStatus
998997
for {
999998
select {
1000-
case <-stopCh:
999+
case <-ctx.Done():
10011000
return
10021001
case next := <-ch:
10031002
// only throttle if we aren't on an edge
10041003
if next.Generation == last.Generation && next.Actual == last.Actual && next.Reconciling == last.Reconciling && (next.Failure != nil) == (last.Failure != nil) {
1005-
if err := throttle.Wait(ctx); err != nil {
1004+
if err := throttle.Wait(ctx); err != nil && err != context.Canceled && err != context.DeadlineExceeded {
10061005
utilruntime.HandleError(fmt.Errorf("unable to throttle status notification: %v", err))
10071006
}
10081007
}
@@ -1011,5 +1010,5 @@ func runThrottledStatusNotifier(stopCh <-chan struct{}, interval time.Duration,
10111010
fn()
10121011
}
10131012
}
1014-
}, 1*time.Second, stopCh)
1013+
}, 1*time.Second)
10151014
}

pkg/cvo/sync_worker_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package cvo
22

33
import (
4+
"context"
45
"fmt"
56
"testing"
67
"time"
@@ -146,12 +147,11 @@ func Test_statusWrapper_ReportGeneration(t *testing.T) {
146147
}
147148
}
148149
func Test_runThrottledStatusNotifier(t *testing.T) {
149-
stopCh := make(chan struct{})
150-
defer close(stopCh)
151150
in := make(chan SyncWorkerStatus)
152151
out := make(chan struct{}, 100)
153152

154-
go runThrottledStatusNotifier(stopCh, 30*time.Second, 1, in, func() { out <- struct{}{} })
153+
ctx := context.Background()
154+
go runThrottledStatusNotifier(ctx, 30*time.Second, 1, in, func() { out <- struct{}{} })
155155

156156
in <- SyncWorkerStatus{Actual: configv1.Update{Image: "test"}}
157157
select {

0 commit comments

Comments
 (0)