@@ -37,6 +37,8 @@ import (
37
37
"github.com/openshift/cluster-version-operator/pkg/cvo/internal/dynamicclient"
38
38
"github.com/openshift/cluster-version-operator/pkg/internal"
39
39
"github.com/openshift/cluster-version-operator/pkg/payload"
40
+ "github.com/openshift/cluster-version-operator/pkg/payload/precondition"
41
+ preconditioncv "github.com/openshift/cluster-version-operator/pkg/payload/precondition/clusterversion"
40
42
"github.com/openshift/cluster-version-operator/pkg/verify"
41
43
)
42
44
@@ -113,11 +115,18 @@ type Operator struct {
113
115
queue workqueue.RateLimitingInterface
114
116
// availableUpdatesQueue tracks checking for updates from the update server.
115
117
availableUpdatesQueue workqueue.RateLimitingInterface
118
+ // upgradeableQueue tracks checking for upgradeable.
119
+ upgradeableQueue workqueue.RateLimitingInterface
116
120
117
121
// statusLock guards access to modifying available updates
118
122
statusLock sync.Mutex
119
123
availableUpdates * availableUpdates
120
124
125
+ // upgradeableStatusLock guards access to modifying Upgradeable conditions
126
+ upgradeableStatusLock sync.Mutex
127
+ upgradeable * upgradeable
128
+ upgradeableChecks []upgradeableCheck
129
+
121
130
// verifier, if provided, will be used to check an update before it is executed.
122
131
// Any error will prevent an update payload from being accessed.
123
132
verifier verify.Interface
@@ -171,11 +180,9 @@ func New(
171
180
172
181
queue : workqueue .NewNamedRateLimitingQueue (workqueue .DefaultControllerRateLimiter (), "clusterversion" ),
173
182
availableUpdatesQueue : workqueue .NewNamedRateLimitingQueue (workqueue .DefaultControllerRateLimiter (), "availableupdates" ),
183
+ upgradeableQueue : workqueue .NewNamedRateLimitingQueue (workqueue .DefaultControllerRateLimiter (), "upgradeable" ),
174
184
}
175
185
176
- optr .proxyLister = proxyInformer .Lister ()
177
- proxyInformer .Informer ().AddEventHandler (optr .eventHandler ())
178
-
179
186
cvInformer .Informer ().AddEventHandler (optr .eventHandler ())
180
187
181
188
optr .coLister = coInformer .Lister ()
@@ -184,8 +191,12 @@ func New(
184
191
optr .cvLister = cvInformer .Lister ()
185
192
optr .cacheSynced = append (optr .cacheSynced , cvInformer .Informer ().HasSynced )
186
193
194
+ optr .proxyLister = proxyInformer .Lister ()
187
195
optr .cmConfigLister = cmConfigInformer .Lister ().ConfigMaps (internal .ConfigNamespace )
188
196
197
+ // make sure this is initialized after all the listers are initialized
198
+ optr .upgradeableChecks = optr .defaultUpgradeableChecks ()
199
+
189
200
if enableMetrics {
190
201
if err := optr .registerMetrics (coInformer .Informer ()); err != nil {
191
202
panic (err )
@@ -225,9 +236,10 @@ func (optr *Operator) InitializeFromPayload(restConfig *rest.Config, burstRestCo
225
236
226
237
// after the verifier has been loaded, initialize the sync worker with a payload retriever
227
238
// which will consume the verifier
228
- optr .configSync = NewSyncWorker (
239
+ optr .configSync = NewSyncWorkerWithPreconditions (
229
240
optr .defaultPayloadRetriever (),
230
241
NewResourceBuilder (restConfig , burstRestConfig , optr .coLister ),
242
+ optr .defaultPreconditionChecks (),
231
243
optr .minimumUpdateCheckInterval ,
232
244
wait.Backoff {
233
245
Duration : time .Second * 10 ,
@@ -263,6 +275,7 @@ func (optr *Operator) Run(ctx context.Context, workers int) {
263
275
go runThrottledStatusNotifier (stopCh , optr .statusInterval , 2 , optr .configSync .StatusCh (), func () { optr .queue .Add (optr .queueKey ()) })
264
276
go optr .configSync .Start (ctx , 16 )
265
277
go wait .Until (func () { optr .worker (optr .availableUpdatesQueue , optr .availableUpdatesSync ) }, time .Second , stopCh )
278
+ go wait .Until (func () { optr .worker (optr .upgradeableQueue , optr .upgradeableSync ) }, time .Second , stopCh )
266
279
go wait .Until (func () {
267
280
defer close (workerStopCh )
268
281
@@ -292,10 +305,12 @@ func (optr *Operator) eventHandler() cache.ResourceEventHandler {
292
305
AddFunc : func (obj interface {}) {
293
306
optr .queue .Add (workQueueKey )
294
307
optr .availableUpdatesQueue .Add (workQueueKey )
308
+ optr .upgradeableQueue .Add (workQueueKey )
295
309
},
296
310
UpdateFunc : func (old , new interface {}) {
297
311
optr .queue .Add (workQueueKey )
298
312
optr .availableUpdatesQueue .Add (workQueueKey )
313
+ optr .upgradeableQueue .Add (workQueueKey )
299
314
},
300
315
DeleteFunc : func (obj interface {}) {
301
316
optr .queue .Add (workQueueKey )
@@ -432,6 +447,29 @@ func (optr *Operator) availableUpdatesSync(key string) error {
432
447
return optr .syncAvailableUpdates (config )
433
448
}
434
449
450
+ // upgradeableSync is triggered on cluster version change (and periodic requeues) to
451
+ // sync upgradeableCondition. It only modifies cluster version.
452
+ func (optr * Operator ) upgradeableSync (key string ) error {
453
+ startTime := time .Now ()
454
+ klog .V (4 ).Infof ("Started syncing upgradeable %q (%v)" , key , startTime )
455
+ defer func () {
456
+ klog .V (4 ).Infof ("Finished syncing upgradeable %q (%v)" , key , time .Since (startTime ))
457
+ }()
458
+
459
+ config , err := optr .cvLister .Get (optr .name )
460
+ if apierrors .IsNotFound (err ) {
461
+ return nil
462
+ }
463
+ if err != nil {
464
+ return err
465
+ }
466
+ if errs := validation .ValidateClusterVersion (config ); len (errs ) > 0 {
467
+ return nil
468
+ }
469
+
470
+ return optr .syncUpgradeable (config )
471
+ }
472
+
435
473
// isOlderThanLastUpdate returns true if the cluster version is older than
436
474
// the last update we saw.
437
475
func (optr * Operator ) isOlderThanLastUpdate (config * configv1.ClusterVersion ) bool {
@@ -608,3 +646,9 @@ func hasReachedLevel(cv *configv1.ClusterVersion, desired configv1.Update) bool
608
646
}
609
647
return desired .Image == cv .Status .History [0 ].Image
610
648
}
649
+
650
+ func (optr * Operator ) defaultPreconditionChecks () precondition.List {
651
+ return []precondition.Precondition {
652
+ preconditioncv .NewUpgradeable (optr .cvLister ),
653
+ }
654
+ }
0 commit comments