Skip to content

Commit a49fef5

Browse files
Merge pull request #411 from deads2k/emit-events-on-update
emit events for each new payload
2 parents b658b42 + 475e71f commit a49fef5

File tree

9 files changed

+49
-20
lines changed

9 files changed

+49
-20
lines changed

pkg/cvo/cvo.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@ func (optr *Operator) InitializeFromPayload(restConfig *rest.Config, burstRestCo
268268
Steps: 3,
269269
},
270270
optr.exclude,
271+
optr.eventRecorder,
271272
)
272273

273274
return nil

pkg/cvo/cvo_scenarios_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,16 @@ import (
1111
"github.com/davecgh/go-spew/spew"
1212
"github.com/google/uuid"
1313

14-
"k8s.io/apimachinery/pkg/util/diff"
15-
"k8s.io/apimachinery/pkg/util/wait"
16-
1714
"k8s.io/apimachinery/pkg/api/errors"
1815
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1916
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2017
"k8s.io/apimachinery/pkg/runtime"
2118
"k8s.io/apimachinery/pkg/runtime/schema"
19+
"k8s.io/apimachinery/pkg/util/diff"
20+
"k8s.io/apimachinery/pkg/util/wait"
2221
dynamicfake "k8s.io/client-go/dynamic/fake"
2322
clientgotesting "k8s.io/client-go/testing"
23+
"k8s.io/client-go/tools/record"
2424
"k8s.io/client-go/util/workqueue"
2525

2626
configv1 "github.com/openshift/api/config/v1"
@@ -77,6 +77,7 @@ func setupCVOTest(payloadDir string) (*Operator, map[string]runtime.Object, *fak
7777
client: client,
7878
cvLister: &clientCVLister{client: client},
7979
exclude: "exclude-test",
80+
eventRecorder: record.NewFakeRecorder(100),
8081
}
8182

8283
dynamicScheme := runtime.NewScheme()
@@ -92,6 +93,7 @@ func setupCVOTest(payloadDir string) (*Operator, map[string]runtime.Object, *fak
9293
Steps: 1,
9394
},
9495
"exclude-test",
96+
record.NewFakeRecorder(100),
9597
)
9698
o.configSync = worker
9799

pkg/cvo/cvo_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
kfake "k8s.io/client-go/kubernetes/fake"
3131
"k8s.io/client-go/rest"
3232
ktesting "k8s.io/client-go/testing"
33+
"k8s.io/client-go/tools/record"
3334
"k8s.io/client-go/util/workqueue"
3435
"k8s.io/klog"
3536

@@ -2259,6 +2260,7 @@ func TestOperator_sync(t *testing.T) {
22592260
}
22602261
optr.configSync = &fakeSyncRecorder{Returns: expectStatus}
22612262
}
2263+
optr.eventRecorder = record.NewFakeRecorder(100)
22622264

22632265
err := optr.sync(optr.queueKey())
22642266
if err != nil && tt.wantErr == nil {
@@ -2626,6 +2628,7 @@ func TestOperator_availableUpdatesSync(t *testing.T) {
26262628
optr.proxyLister = &clientProxyLister{client: optr.client}
26272629
optr.coLister = &clientCOLister{client: optr.client}
26282630
optr.cvLister = &clientCVLister{client: optr.client}
2631+
optr.eventRecorder = record.NewFakeRecorder(100)
26292632

26302633
if tt.handler != nil {
26312634
s := httptest.NewServer(http.HandlerFunc(tt.handler))
@@ -3129,6 +3132,7 @@ func TestOperator_upgradeableSync(t *testing.T) {
31293132
optr.coLister = &clientCOLister{client: optr.client}
31303133
optr.cvLister = &clientCVLister{client: optr.client}
31313134
optr.upgradeableChecks = optr.defaultUpgradeableChecks()
3135+
optr.eventRecorder = record.NewFakeRecorder(100)
31323136

31333137
err := optr.upgradeableSync(optr.queueKey())
31343138
if err != nil && tt.wantErr == nil {

pkg/cvo/metrics_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
dto "github.com/prometheus/client_model/go"
1313
corev1 "k8s.io/api/core/v1"
1414
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15+
"k8s.io/client-go/tools/record"
1516

1617
configv1 "github.com/openshift/api/config/v1"
1718
)
@@ -512,6 +513,7 @@ func Test_operatorMetrics_Collect(t *testing.T) {
512513
}
513514
for _, tt := range tests {
514515
t.Run(tt.name, func(t *testing.T) {
516+
tt.optr.eventRecorder = record.NewFakeRecorder(100)
515517
if tt.optr.cvLister == nil {
516518
tt.optr.cvLister = &cvLister{}
517519
}
@@ -588,7 +590,8 @@ func Test_operatorMetrics_CollectTransitions(t *testing.T) {
588590
},
589591
},
590592
optr: &Operator{
591-
coLister: &coLister{},
593+
coLister: &coLister{},
594+
eventRecorder: record.NewFakeRecorder(100),
592595
},
593596
wants: func(t *testing.T, metrics []prometheus.Metric) {
594597
if len(metrics) != 5 {

pkg/cvo/status_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
99
"k8s.io/apimachinery/pkg/util/diff"
10+
"k8s.io/client-go/tools/record"
1011

1112
configv1 "github.com/openshift/api/config/v1"
1213
"github.com/openshift/client-go/config/clientset/versioned/fake"
@@ -184,6 +185,7 @@ func TestOperator_syncFailingStatus(t *testing.T) {
184185
},
185186
},
186187
),
188+
eventRecorder: record.NewFakeRecorder(100),
187189
},
188190
wantErr: func(t *testing.T, err error) {
189191
if err == nil || err.Error() != "bad" {

pkg/cvo/sync_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99
"sync"
1010
"testing"
1111

12+
"k8s.io/client-go/tools/record"
13+
1214
"github.com/davecgh/go-spew/spew"
1315

1416
"k8s.io/apimachinery/pkg/api/meta"
@@ -125,7 +127,7 @@ func Test_SyncWorker_apply(t *testing.T) {
125127
testMapper.RegisterGVK(schema.GroupVersionKind{"test.cvo.io", "v1", "TestB"}, newTestBuilder(r, test.reactors))
126128
testMapper.AddToMap(resourcebuilder.Mapper)
127129

128-
worker := &SyncWorker{}
130+
worker := &SyncWorker{eventRecorder: record.NewFakeRecorder(100)}
129131
worker.builder = NewResourceBuilder(nil, nil, nil)
130132

131133
ctx, cancel := context.WithCancel(context.Background())
@@ -311,7 +313,7 @@ func Test_SyncWorker_apply_generic(t *testing.T) {
311313
dynamicClient := dynamicfake.NewSimpleDynamicClient(dynamicScheme)
312314

313315
up := &payload.Update{ReleaseImage: "test", ReleaseVersion: "v0.0.0", Manifests: manifests}
314-
worker := &SyncWorker{}
316+
worker := &SyncWorker{eventRecorder: record.NewFakeRecorder(100)}
315317
worker.backoff.Steps = 1
316318
worker.builder = &testResourceBuilder{
317319
client: dynamicClient,

pkg/cvo/sync_worker.go

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,13 @@ import (
1313
configlistersv1 "github.com/openshift/client-go/config/listers/config/v1"
1414
"github.com/prometheus/client_golang/prometheus"
1515
"golang.org/x/time/rate"
16-
"k8s.io/klog"
17-
16+
corev1 "k8s.io/api/core/v1"
1817
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1918
"k8s.io/apimachinery/pkg/util/errors"
2019
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2120
"k8s.io/apimachinery/pkg/util/wait"
21+
"k8s.io/client-go/tools/record"
22+
"k8s.io/klog"
2223

2324
configv1 "github.com/openshift/api/config/v1"
2425

@@ -131,6 +132,7 @@ type SyncWorker struct {
131132
retriever PayloadRetriever
132133
builder payload.ResourceBuilder
133134
preconditions precondition.List
135+
eventRecorder record.EventRecorder
134136

135137
// minimumReconcileInterval is the minimum time between reconcile attempts, and is
136138
// used to define the maximum backoff interval when syncOnce() returns an error.
@@ -157,11 +159,12 @@ type SyncWorker struct {
157159

158160
// NewSyncWorker initializes a ConfigSyncWorker that will retrieve payloads to disk, apply them via builder
159161
// to a server, and obey limits about how often to reconcile or retry on errors.
160-
func NewSyncWorker(retriever PayloadRetriever, builder payload.ResourceBuilder, reconcileInterval time.Duration, backoff wait.Backoff, exclude string) *SyncWorker {
162+
func NewSyncWorker(retriever PayloadRetriever, builder payload.ResourceBuilder, reconcileInterval time.Duration, backoff wait.Backoff, exclude string, eventRecorder record.EventRecorder) *SyncWorker {
161163
return &SyncWorker{
162-
retriever: retriever,
163-
builder: builder,
164-
backoff: backoff,
164+
retriever: retriever,
165+
builder: builder,
166+
backoff: backoff,
167+
eventRecorder: eventRecorder,
165168

166169
minimumReconcileInterval: reconcileInterval,
167170

@@ -178,8 +181,8 @@ func NewSyncWorker(retriever PayloadRetriever, builder payload.ResourceBuilder,
178181
// NewSyncWorkerWithPreconditions initializes a ConfigSyncWorker that will retrieve payloads to disk, apply them via builder
179182
// to a server, and obey limits about how often to reconcile or retry on errors.
180183
// It allows providing preconditions for loading payload.
181-
func NewSyncWorkerWithPreconditions(retriever PayloadRetriever, builder payload.ResourceBuilder, preconditions precondition.List, reconcileInterval time.Duration, backoff wait.Backoff, exclude string) *SyncWorker {
182-
worker := NewSyncWorker(retriever, builder, reconcileInterval, backoff, exclude)
184+
func NewSyncWorkerWithPreconditions(retriever PayloadRetriever, builder payload.ResourceBuilder, preconditions precondition.List, reconcileInterval time.Duration, backoff wait.Backoff, exclude string, eventRecorder record.EventRecorder) *SyncWorker {
185+
worker := NewSyncWorker(retriever, builder, reconcileInterval, backoff, exclude, eventRecorder)
183186
worker.preconditions = preconditions
184187
return worker
185188
}
@@ -479,6 +482,8 @@ func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers in
479482
validPayload := w.payload
480483
if validPayload == nil || !equalUpdate(configv1.Update{Image: validPayload.ReleaseImage}, configv1.Update{Image: update.Image}) {
481484
klog.V(4).Infof("Loading payload")
485+
cvoObjectRef := &corev1.ObjectReference{APIVersion: "config.openshift.io/v1", Kind: "ClusterVersion", Name: "version", Namespace: "openshift-cluster-version"}
486+
w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeNormal, "RetrievePayload", "retrieving payload version=%q image=%q", update.Version, update.Image)
482487
reporter.Report(SyncWorkerStatus{
483488
Generation: work.Generation,
484489
Step: "RetrievePayload",
@@ -488,6 +493,7 @@ func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers in
488493
})
489494
info, err := w.retriever.RetrievePayload(ctx, update)
490495
if err != nil {
496+
w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "RetrievePayloadFailed", "retrieving payload failed version=%q image=%q failure=%v", update.Version, update.Image, err)
491497
reporter.Report(SyncWorkerStatus{
492498
Generation: work.Generation,
493499
Failure: err,
@@ -499,8 +505,10 @@ func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers in
499505
return err
500506
}
501507

508+
w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeNormal, "VerifyPayload", "verifying payload version=%q image=%q", update.Version, update.Image)
502509
payloadUpdate, err := payload.LoadUpdate(info.Directory, update.Image, w.exclude)
503510
if err != nil {
511+
w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "VerifyPayloadFailed", "verifying payload failed version=%q image=%q failure=%v", update.Version, update.Image, err)
504512
reporter.Report(SyncWorkerStatus{
505513
Generation: work.Generation,
506514
Failure: err,
@@ -532,7 +540,9 @@ func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers in
532540
if err := precondition.Summarize(w.preconditions.RunAll(ctx, precondition.ReleaseContext{DesiredVersion: payloadUpdate.ReleaseVersion}, clusterVersion)); err != nil {
533541
if update.Force {
534542
klog.V(4).Infof("Forcing past precondition failures: %s", err)
543+
w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "PreconditionsForced", "preconditions forced for payload loaded version=%q image=%q failures=%v", update.Version, update.Image, err)
535544
} else {
545+
w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "PreconditionsFailed", "preconditions failed for payload loaded version=%q image=%q failures=%v", update.Version, update.Image, err)
536546
reporter.Report(SyncWorkerStatus{
537547
Generation: work.Generation,
538548
Failure: err,
@@ -545,9 +555,11 @@ func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers in
545555
return err
546556
}
547557
}
558+
w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeNormal, "PreconditionsPassed", "preconditions passed for payload loaded version=%q image=%q", update.Version, update.Image)
548559
}
549560

550561
w.payload = payloadUpdate
562+
w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeNormal, "PayloadLoaded", "payload loaded version=%q image=%q", update.Version, update.Image)
551563
klog.V(4).Infof("Payload loaded from %s with hash %s", payloadUpdate.ReleaseImage, payloadUpdate.ManifestHash)
552564
}
553565

pkg/cvo/sync_worker_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"testing"
66
"time"
77

8+
"k8s.io/client-go/tools/record"
9+
810
configv1 "github.com/openshift/api/config/v1"
911
)
1012

@@ -77,7 +79,7 @@ func Test_statusWrapper_ReportProgress(t *testing.T) {
7779
w := &statusWrapper{
7880
previousStatus: &tt.previous,
7981
}
80-
w.w = &SyncWorker{report: make(chan SyncWorkerStatus, 1)}
82+
w.w = &SyncWorker{report: make(chan SyncWorkerStatus, 1), eventRecorder: record.NewFakeRecorder(100)}
8183
w.Report(tt.next)
8284
close(w.w.report)
8385
if tt.want {
@@ -130,7 +132,7 @@ func Test_statusWrapper_ReportGeneration(t *testing.T) {
130132
w := &statusWrapper{
131133
previousStatus: &tt.previous,
132134
}
133-
w.w = &SyncWorker{report: make(chan SyncWorkerStatus, 1)}
135+
w.w = &SyncWorker{report: make(chan SyncWorkerStatus, 1), eventRecorder: record.NewFakeRecorder(100)}
134136
w.Report(tt.next)
135137
close(w.w.report)
136138

pkg/start/start_integration_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
randutil "k8s.io/apimachinery/pkg/util/rand"
2929
"k8s.io/apimachinery/pkg/util/wait"
3030
"k8s.io/client-go/kubernetes"
31+
"k8s.io/client-go/tools/record"
3132
"k8s.io/klog"
3233

3334
configv1 "github.com/openshift/api/config/v1"
@@ -240,7 +241,7 @@ func TestIntegrationCVO_initializeAndUpgrade(t *testing.T) {
240241
options.PayloadOverride = filepath.Join(dir, "ignored")
241242
controllers := options.NewControllerContext(cb)
242243

243-
worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "")
244+
worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "", record.NewFakeRecorder(100))
244245
controllers.CVO.SetSyncWorkerForTesting(worker)
245246

246247
ctx, cancel := context.WithCancel(context.Background())
@@ -392,7 +393,7 @@ func TestIntegrationCVO_initializeAndHandleError(t *testing.T) {
392393
options.ResyncInterval = 3 * time.Second
393394
controllers := options.NewControllerContext(cb)
394395

395-
worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Duration: time.Second, Factor: 1.2}, "")
396+
worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Duration: time.Second, Factor: 1.2}, "", record.NewFakeRecorder(100))
396397
controllers.CVO.SetSyncWorkerForTesting(worker)
397398

398399
ctx, cancel := context.WithCancel(context.Background())
@@ -497,7 +498,7 @@ func TestIntegrationCVO_gracefulStepDown(t *testing.T) {
497498
options.NodeName = "test-node"
498499
controllers := options.NewControllerContext(cb)
499500

500-
worker := cvo.NewSyncWorker(&mapPayloadRetriever{}, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "")
501+
worker := cvo.NewSyncWorker(&mapPayloadRetriever{}, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "", record.NewFakeRecorder(100))
501502
controllers.CVO.SetSyncWorkerForTesting(worker)
502503

503504
lock, err := createResourceLock(cb, ns, ns)
@@ -669,7 +670,7 @@ metadata:
669670
t.Fatal(err)
670671
}
671672

672-
worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "")
673+
worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "", record.NewFakeRecorder(100))
673674
controllers.CVO.SetSyncWorkerForTesting(worker)
674675

675676
ctx, cancel := context.WithCancel(context.Background())

0 commit comments

Comments
 (0)