Skip to content

Commit 9ce6a03

Browse files
committed
pkg: Add CVOConfiguration controller
The controller has an empty reconcile logic as of now. The logic will be implemented later. The goal of this commit is to introduce a new CVO controller, which can optionally (depending on the state of the CVOConfiguration feature gate) create a new informer. The purpose of the `Start` method is to make the creation of a ClusterVersionOperator informer optional, and to make the creation possible later in the CVO logic by restarting the operator informer factory. To decide whether to create the informer, feature gates must be known, which happens normally later in the run when all the other informer factories are already created and inaccessible to the main CVO controller. Related enhancement: [1] [1]: https://github.com/openshift/enhancements/blob/2890cccf20ebcb94fce901f7afb170ca680aa2d9/enhancements/update/cvo-log-level-api.md
1 parent 13fd095 commit 9ce6a03

File tree

3 files changed

+163
-4
lines changed

3 files changed

+163
-4
lines changed
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package configuration
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
apierrors "k8s.io/apimachinery/pkg/api/errors"
9+
"k8s.io/client-go/tools/cache"
10+
"k8s.io/client-go/util/workqueue"
11+
"k8s.io/klog/v2"
12+
13+
operatorv1alpha1 "github.com/openshift/api/operator/v1alpha1"
14+
operatorclientset "github.com/openshift/client-go/operator/clientset/versioned"
15+
cvoclientv1alpha1 "github.com/openshift/client-go/operator/clientset/versioned/typed/operator/v1alpha1"
16+
operatorexternalversions "github.com/openshift/client-go/operator/informers/externalversions"
17+
operatorlistersv1alpha1 "github.com/openshift/client-go/operator/listers/operator/v1alpha1"
18+
)
19+
20+
const ClusterVersionOperatorConfigurationName = "cluster"
21+
22+
type ClusterVersionOperatorConfiguration struct {
23+
queueKey string
24+
// queue tracks checking for the CVO configuration.
25+
//
26+
// The type any is used to comply with the worker method of the cvo.Operator struct.
27+
queue workqueue.TypedRateLimitingInterface[any]
28+
29+
client cvoclientv1alpha1.ClusterVersionOperatorInterface
30+
lister operatorlistersv1alpha1.ClusterVersionOperatorLister
31+
factory operatorexternalversions.SharedInformerFactory
32+
33+
started bool
34+
}
35+
36+
func (config *ClusterVersionOperatorConfiguration) Queue() workqueue.TypedRateLimitingInterface[any] {
37+
return config.queue
38+
}
39+
40+
// clusterVersionOperatorEventHandler queues an update for the cluster version operator on any change to the given object.
41+
// Callers should use this with an informer.
42+
func (config *ClusterVersionOperatorConfiguration) clusterVersionOperatorEventHandler() cache.ResourceEventHandler {
43+
return cache.ResourceEventHandlerFuncs{
44+
AddFunc: func(_ interface{}) {
45+
config.queue.Add(config.queueKey)
46+
},
47+
UpdateFunc: func(_, _ interface{}) {
48+
config.queue.Add(config.queueKey)
49+
},
50+
DeleteFunc: func(_ interface{}) {},
51+
}
52+
}
53+
54+
// NewClusterVersionOperatorConfiguration returns ClusterVersionOperatorConfiguration, which might be used
55+
// to synchronize with the ClusterVersionOperator resource.
56+
func NewClusterVersionOperatorConfiguration(client operatorclientset.Interface, factory operatorexternalversions.SharedInformerFactory) *ClusterVersionOperatorConfiguration {
57+
return &ClusterVersionOperatorConfiguration{
58+
queueKey: fmt.Sprintf("ClusterVersionOperator/%s", ClusterVersionOperatorConfigurationName),
59+
queue: workqueue.NewTypedRateLimitingQueueWithConfig[any](
60+
workqueue.DefaultTypedControllerRateLimiter[any](),
61+
workqueue.TypedRateLimitingQueueConfig[any]{Name: "configuration"}),
62+
client: client.OperatorV1alpha1().ClusterVersionOperators(),
63+
factory: factory,
64+
}
65+
}
66+
67+
// Start initializes and starts the configuration's informers. Must be run before Sync is called.
68+
// Blocks until informers caches are synchronized or the context is cancelled.
69+
func (config *ClusterVersionOperatorConfiguration) Start(ctx context.Context) error {
70+
informer := config.factory.Operator().V1alpha1().ClusterVersionOperators()
71+
if _, err := informer.Informer().AddEventHandler(config.clusterVersionOperatorEventHandler()); err != nil {
72+
return err
73+
}
74+
config.lister = informer.Lister()
75+
76+
config.factory.Start(ctx.Done())
77+
synced := config.factory.WaitForCacheSync(ctx.Done())
78+
for _, ok := range synced {
79+
if !ok {
80+
return fmt.Errorf("caches failed to sync: %w", ctx.Err())
81+
}
82+
}
83+
84+
config.started = true
85+
return nil
86+
}
87+
88+
func (config *ClusterVersionOperatorConfiguration) Sync(ctx context.Context, key string) error {
89+
if !config.started {
90+
panic("ClusterVersionOperatorConfiguration instance was not properly started before its synchronization.")
91+
}
92+
startTime := time.Now()
93+
klog.V(2).Infof("Started syncing CVO configuration %q", key)
94+
defer func() {
95+
klog.V(2).Infof("Finished syncing CVO configuration (%v)", time.Since(startTime))
96+
}()
97+
98+
desiredConfig, err := config.lister.Get(ClusterVersionOperatorConfigurationName)
99+
if apierrors.IsNotFound(err) {
100+
return nil
101+
}
102+
if err != nil {
103+
return err
104+
}
105+
return config.sync(ctx, desiredConfig)
106+
}
107+
108+
func (config *ClusterVersionOperatorConfiguration) sync(_ context.Context, _ *operatorv1alpha1.ClusterVersionOperator) error {
109+
klog.Infof("ClusterVersionOperator configuration has been synced")
110+
return nil
111+
}

pkg/cvo/cvo.go

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ import (
3131
clientset "github.com/openshift/client-go/config/clientset/versioned"
3232
configinformersv1 "github.com/openshift/client-go/config/informers/externalversions/config/v1"
3333
configlistersv1 "github.com/openshift/client-go/config/listers/config/v1"
34+
operatorclientset "github.com/openshift/client-go/operator/clientset/versioned"
35+
operatorexternalversions "github.com/openshift/client-go/operator/informers/externalversions"
3436
"github.com/openshift/library-go/pkg/manifest"
3537
"github.com/openshift/library-go/pkg/verify"
3638
"github.com/openshift/library-go/pkg/verify/store"
@@ -43,6 +45,7 @@ import (
4345
"github.com/openshift/cluster-version-operator/pkg/clusterconditions"
4446
"github.com/openshift/cluster-version-operator/pkg/clusterconditions/standard"
4547
"github.com/openshift/cluster-version-operator/pkg/customsignaturestore"
48+
"github.com/openshift/cluster-version-operator/pkg/cvo/configuration"
4649
cvointernal "github.com/openshift/cluster-version-operator/pkg/cvo/internal"
4750
"github.com/openshift/cluster-version-operator/pkg/cvo/internal/dynamicclient"
4851
"github.com/openshift/cluster-version-operator/pkg/featuregates"
@@ -93,9 +96,10 @@ type Operator struct {
9396
// releaseCreated, if set, is the timestamp of the current update.
9497
releaseCreated time.Time
9598

96-
client clientset.Interface
97-
kubeClient kubernetes.Interface
98-
eventRecorder record.EventRecorder
99+
client clientset.Interface
100+
kubeClient kubernetes.Interface
101+
operatorClient operatorclientset.Interface
102+
eventRecorder record.EventRecorder
99103

100104
// minimumUpdateCheckInterval is the minimum duration to check for updates from
101105
// the update service.
@@ -176,6 +180,9 @@ type Operator struct {
176180
// alwaysEnableCapabilities is a list of the cluster capabilities which should
177181
// always be implicitly enabled.
178182
alwaysEnableCapabilities []configv1.ClusterVersionCapability
183+
184+
// configuration, if enabled, reconciles the ClusterVersionOperator configuration.
185+
configuration *configuration.ClusterVersionOperatorConfiguration
179186
}
180187

181188
// New returns a new cluster version operator.
@@ -190,8 +197,10 @@ func New(
190197
cmConfigInformer informerscorev1.ConfigMapInformer,
191198
cmConfigManagedInformer informerscorev1.ConfigMapInformer,
192199
proxyInformer configinformersv1.ProxyInformer,
200+
operatorInformerFactory operatorexternalversions.SharedInformerFactory,
193201
client clientset.Interface,
194202
kubeClient kubernetes.Interface,
203+
operatorClient operatorclientset.Interface,
195204
exclude string,
196205
clusterProfile string,
197206
promqlTarget clusterconditions.PromQLTarget,
@@ -219,6 +228,7 @@ func New(
219228

220229
client: client,
221230
kubeClient: kubeClient,
231+
operatorClient: operatorClient,
222232
eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: namespace}),
223233
queue: workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{Name: "clusterversion"}),
224234
availableUpdatesQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{Name: "availableupdates"}),
@@ -262,6 +272,8 @@ func New(
262272
// make sure this is initialized after all the listers are initialized
263273
optr.upgradeableChecks = optr.defaultUpgradeableChecks()
264274

275+
optr.configuration = configuration.NewClusterVersionOperatorConfiguration(operatorClient, operatorInformerFactory)
276+
265277
return optr, nil
266278
}
267279

@@ -408,6 +420,7 @@ func (optr *Operator) Run(runContext context.Context, shutdownContext context.Co
408420
defer optr.queue.ShutDown()
409421
defer optr.availableUpdatesQueue.ShutDown()
410422
defer optr.upgradeableQueue.ShutDown()
423+
defer optr.configuration.Queue().ShutDown()
411424
stopCh := runContext.Done()
412425

413426
klog.Infof("Starting ClusterVersionOperator with minimum reconcile period %s", optr.minimumUpdateCheckInterval)
@@ -446,6 +459,23 @@ func (optr *Operator) Run(runContext context.Context, shutdownContext context.Co
446459
resultChannel <- asyncResult{name: "available updates"}
447460
}()
448461

462+
if optr.enabledFeatureGates.CVOConfiguration() {
463+
resultChannelCount++
464+
go func() {
465+
defer utilruntime.HandleCrash()
466+
if err := optr.configuration.Start(runContext); err != nil {
467+
utilruntime.HandleError(fmt.Errorf("unable to initialize the CVO configuration sync: %v", err))
468+
} else {
469+
wait.UntilWithContext(runContext, func(runContext context.Context) {
470+
optr.worker(runContext, optr.configuration.Queue(), optr.configuration.Sync)
471+
}, time.Second)
472+
}
473+
resultChannel <- asyncResult{name: "cvo configuration"}
474+
}()
475+
} else {
476+
klog.Infof("The ClusterVersionOperatorConfiguration feature gate is disabled; skipping initialization of configuration sync routine")
477+
}
478+
449479
resultChannelCount++
450480
go func() {
451481
defer utilruntime.HandleCrash()
@@ -515,6 +545,7 @@ func (optr *Operator) Run(runContext context.Context, shutdownContext context.Co
515545
optr.queue.ShutDown()
516546
optr.availableUpdatesQueue.ShutDown()
517547
optr.upgradeableQueue.ShutDown()
548+
optr.configuration.Queue().ShutDown()
518549
}
519550
}
520551

pkg/start/start.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
v1 "k8s.io/api/core/v1"
1818
apierrors "k8s.io/apimachinery/pkg/api/errors"
1919
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
20+
"k8s.io/apimachinery/pkg/fields"
2021
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2122
"k8s.io/apimachinery/pkg/util/wait"
2223
"k8s.io/client-go/informers"
@@ -34,12 +35,15 @@ import (
3435
configv1 "github.com/openshift/api/config/v1"
3536
clientset "github.com/openshift/client-go/config/clientset/versioned"
3637
"github.com/openshift/client-go/config/informers/externalversions"
38+
operatorclientset "github.com/openshift/client-go/operator/clientset/versioned"
39+
operatorexternalversions "github.com/openshift/client-go/operator/informers/externalversions"
3740
"github.com/openshift/library-go/pkg/config/clusterstatus"
3841
libgoleaderelection "github.com/openshift/library-go/pkg/config/leaderelection"
3942

4043
"github.com/openshift/cluster-version-operator/pkg/autoupdate"
4144
"github.com/openshift/cluster-version-operator/pkg/clusterconditions"
4245
"github.com/openshift/cluster-version-operator/pkg/cvo"
46+
"github.com/openshift/cluster-version-operator/pkg/cvo/configuration"
4347
"github.com/openshift/cluster-version-operator/pkg/featuregates"
4448
"github.com/openshift/cluster-version-operator/pkg/internal"
4549
"github.com/openshift/cluster-version-operator/pkg/payload"
@@ -220,6 +224,7 @@ func (o *Options) run(ctx context.Context, controllerCtx *Context, lock resource
220224
controllerCtx.OpenshiftConfigInformerFactory.Start(informersDone)
221225
controllerCtx.OpenshiftConfigManagedInformerFactory.Start(informersDone)
222226
controllerCtx.InformerFactory.Start(informersDone)
227+
controllerCtx.OperatorInformerFactory.Start(informersDone)
223228

224229
allSynced := controllerCtx.CVInformerFactory.WaitForCacheSync(informersDone)
225230
for _, synced := range allSynced {
@@ -390,6 +395,10 @@ func (cb *ClientBuilder) KubeClientOrDie(name string, configFns ...func(*rest.Co
390395
return kubernetes.NewForConfigOrDie(rest.AddUserAgent(cb.RestConfig(configFns...), name))
391396
}
392397

398+
func (cb *ClientBuilder) OperatorClientOrDie(name string, configFns ...func(*rest.Config)) operatorclientset.Interface {
399+
return operatorclientset.NewForConfigOrDie(rest.AddUserAgent(cb.RestConfig(configFns...), name))
400+
}
401+
393402
func newClientBuilder(kubeconfig string) (*ClientBuilder, error) {
394403
clientCfg := clientcmd.NewDefaultClientConfigLoadingRules()
395404
clientCfg.ExplicitPath = kubeconfig
@@ -449,6 +458,7 @@ type Context struct {
449458
OpenshiftConfigInformerFactory informers.SharedInformerFactory
450459
OpenshiftConfigManagedInformerFactory informers.SharedInformerFactory
451460
InformerFactory externalversions.SharedInformerFactory
461+
OperatorInformerFactory operatorexternalversions.SharedInformerFactory
452462

453463
fgLister configlistersv1.FeatureGateLister
454464
}
@@ -458,14 +468,18 @@ type Context struct {
458468
func (o *Options) NewControllerContext(cb *ClientBuilder, alwaysEnableCapabilities []configv1.ClusterVersionCapability) (*Context, error) {
459469
client := cb.ClientOrDie("shared-informer")
460470
kubeClient := cb.KubeClientOrDie(internal.ConfigNamespace, useProtobuf)
471+
operatorClient := cb.OperatorClientOrDie("operator-client")
461472

462473
cvInformer := externalversions.NewFilteredSharedInformerFactory(client, resyncPeriod(o.ResyncInterval), "", func(opts *metav1.ListOptions) {
463474
opts.FieldSelector = fmt.Sprintf("metadata.name=%s", o.Name)
464475
})
465476
openshiftConfigInformer := informers.NewSharedInformerFactoryWithOptions(kubeClient, resyncPeriod(o.ResyncInterval), informers.WithNamespace(internal.ConfigNamespace))
466477
openshiftConfigManagedInformer := informers.NewSharedInformerFactoryWithOptions(kubeClient, resyncPeriod(o.ResyncInterval), informers.WithNamespace(internal.ConfigManagedNamespace))
467-
468478
sharedInformers := externalversions.NewSharedInformerFactory(client, resyncPeriod(o.ResyncInterval))
479+
operatorInformerFactory := operatorexternalversions.NewSharedInformerFactoryWithOptions(operatorClient, o.ResyncInterval,
480+
operatorexternalversions.WithTweakListOptions(func(opts *metav1.ListOptions) {
481+
opts.FieldSelector = fields.OneTermEqualSelector("metadata.name", configuration.ClusterVersionOperatorConfigurationName).String()
482+
}))
469483

470484
coInformer := sharedInformers.Config().V1().ClusterOperators()
471485

@@ -482,8 +496,10 @@ func (o *Options) NewControllerContext(cb *ClientBuilder, alwaysEnableCapabiliti
482496
openshiftConfigInformer.Core().V1().ConfigMaps(),
483497
openshiftConfigManagedInformer.Core().V1().ConfigMaps(),
484498
sharedInformers.Config().V1().Proxies(),
499+
operatorInformerFactory,
485500
cb.ClientOrDie(o.Namespace),
486501
cvoKubeClient,
502+
operatorClient,
487503
o.Exclude,
488504
o.ClusterProfile,
489505
o.PromQLTarget,
@@ -505,6 +521,7 @@ func (o *Options) NewControllerContext(cb *ClientBuilder, alwaysEnableCapabiliti
505521
OpenshiftConfigInformerFactory: openshiftConfigInformer,
506522
OpenshiftConfigManagedInformerFactory: openshiftConfigManagedInformer,
507523
InformerFactory: sharedInformers,
524+
OperatorInformerFactory: operatorInformerFactory,
508525
CVO: cvo,
509526
StopOnFeatureGateChange: featureChangeStopper,
510527

0 commit comments

Comments
 (0)