Skip to content

Commit d2a8c73

Browse files
Merge pull request #1163 from DavidHurta/cvo-configuration-controller
OTA-209: Add CVOConfiguration controller
2 parents 36d66b0 + 63878c0 commit d2a8c73

File tree

118 files changed

+7269
-5
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

118 files changed

+7269
-5
lines changed

cmd/start.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func init() {
3939
cmd.PersistentFlags().StringVar(&opts.PromQLTarget.KubeSvc.Name, "metrics-service", opts.PromQLTarget.KubeSvc.Name, "The name of the remote PromQL query service. Must be specified when --use-dns-for-services is disabled.")
4040
cmd.PersistentFlags().BoolVar(&opts.PromQLTarget.UseDNS, "use-dns-for-services", opts.PromQLTarget.UseDNS, "Configures the CVO to use DNS for resolution of services in the cluster.")
4141
cmd.PersistentFlags().StringVar(&opts.PrometheusURLString, "metrics-url", opts.PrometheusURLString, "The URL used to access the remote PromQL query service.")
42-
cmd.PersistentFlags().BoolVar(&opts.InjectClusterIdIntoPromQL, "hypershift", opts.InjectClusterIdIntoPromQL, "This options indicates whether the CVO is running inside a hosted control plane.")
42+
cmd.PersistentFlags().BoolVar(&opts.HyperShift, "hypershift", opts.HyperShift, "This options indicates whether the CVO is running inside a hosted control plane.")
4343
cmd.PersistentFlags().StringVar(&opts.UpdateService, "update-service", opts.UpdateService, "The preferred update service. If set, this option overrides any upstream value configured in ClusterVersion spec.")
4444
cmd.PersistentFlags().StringSliceVar(&opts.AlwaysEnableCapabilities, "always-enable-capabilities", opts.AlwaysEnableCapabilities, "List of the cluster capabilities which will always be implicitly enabled.")
4545
rootCmd.AddCommand(cmd)
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package configuration
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
apierrors "k8s.io/apimachinery/pkg/api/errors"
9+
"k8s.io/client-go/tools/cache"
10+
"k8s.io/client-go/util/workqueue"
11+
"k8s.io/klog/v2"
12+
13+
operatorv1alpha1 "github.com/openshift/api/operator/v1alpha1"
14+
operatorclientset "github.com/openshift/client-go/operator/clientset/versioned"
15+
cvoclientv1alpha1 "github.com/openshift/client-go/operator/clientset/versioned/typed/operator/v1alpha1"
16+
operatorexternalversions "github.com/openshift/client-go/operator/informers/externalversions"
17+
operatorlistersv1alpha1 "github.com/openshift/client-go/operator/listers/operator/v1alpha1"
18+
)
19+
20+
const ClusterVersionOperatorConfigurationName = "cluster"
21+
22+
type ClusterVersionOperatorConfiguration struct {
23+
queueKey string
24+
// queue tracks checking for the CVO configuration.
25+
//
26+
// The type any is used to comply with the worker method of the cvo.Operator struct.
27+
queue workqueue.TypedRateLimitingInterface[any]
28+
29+
client cvoclientv1alpha1.ClusterVersionOperatorInterface
30+
lister operatorlistersv1alpha1.ClusterVersionOperatorLister
31+
factory operatorexternalversions.SharedInformerFactory
32+
33+
started bool
34+
}
35+
36+
func (config *ClusterVersionOperatorConfiguration) Queue() workqueue.TypedRateLimitingInterface[any] {
37+
return config.queue
38+
}
39+
40+
// clusterVersionOperatorEventHandler queues an update for the cluster version operator on any change to the given object.
41+
// Callers should use this with an informer.
42+
func (config *ClusterVersionOperatorConfiguration) clusterVersionOperatorEventHandler() cache.ResourceEventHandler {
43+
return cache.ResourceEventHandlerFuncs{
44+
AddFunc: func(_ interface{}) {
45+
config.queue.Add(config.queueKey)
46+
},
47+
UpdateFunc: func(_, _ interface{}) {
48+
config.queue.Add(config.queueKey)
49+
},
50+
DeleteFunc: func(_ interface{}) {
51+
config.queue.Add(config.queueKey)
52+
},
53+
}
54+
}
55+
56+
// NewClusterVersionOperatorConfiguration returns ClusterVersionOperatorConfiguration, which might be used
57+
// to synchronize with the ClusterVersionOperator resource.
58+
func NewClusterVersionOperatorConfiguration(client operatorclientset.Interface, factory operatorexternalversions.SharedInformerFactory) *ClusterVersionOperatorConfiguration {
59+
return &ClusterVersionOperatorConfiguration{
60+
queueKey: fmt.Sprintf("ClusterVersionOperator/%s", ClusterVersionOperatorConfigurationName),
61+
queue: workqueue.NewTypedRateLimitingQueueWithConfig[any](
62+
workqueue.DefaultTypedControllerRateLimiter[any](),
63+
workqueue.TypedRateLimitingQueueConfig[any]{Name: "configuration"}),
64+
client: client.OperatorV1alpha1().ClusterVersionOperators(),
65+
factory: factory,
66+
}
67+
}
68+
69+
// Start initializes and starts the configuration's informers. Must be run before Sync is called.
70+
// Blocks until informers caches are synchronized or the context is cancelled.
71+
func (config *ClusterVersionOperatorConfiguration) Start(ctx context.Context) error {
72+
informer := config.factory.Operator().V1alpha1().ClusterVersionOperators()
73+
if _, err := informer.Informer().AddEventHandler(config.clusterVersionOperatorEventHandler()); err != nil {
74+
return err
75+
}
76+
config.lister = informer.Lister()
77+
78+
config.factory.Start(ctx.Done())
79+
synced := config.factory.WaitForCacheSync(ctx.Done())
80+
for _, ok := range synced {
81+
if !ok {
82+
return fmt.Errorf("caches failed to sync: %w", ctx.Err())
83+
}
84+
}
85+
86+
config.started = true
87+
return nil
88+
}
89+
90+
func (config *ClusterVersionOperatorConfiguration) Sync(ctx context.Context, key string) error {
91+
if !config.started {
92+
panic("ClusterVersionOperatorConfiguration instance was not properly started before its synchronization.")
93+
}
94+
startTime := time.Now()
95+
klog.V(2).Infof("Started syncing CVO configuration %q", key)
96+
defer func() {
97+
klog.V(2).Infof("Finished syncing CVO configuration (%v)", time.Since(startTime))
98+
}()
99+
100+
desiredConfig, err := config.lister.Get(ClusterVersionOperatorConfigurationName)
101+
if apierrors.IsNotFound(err) {
102+
// TODO: Set default values
103+
return nil
104+
}
105+
if err != nil {
106+
return err
107+
}
108+
return config.sync(ctx, desiredConfig)
109+
}
110+
111+
func (config *ClusterVersionOperatorConfiguration) sync(_ context.Context, _ *operatorv1alpha1.ClusterVersionOperator) error {
112+
klog.Infof("ClusterVersionOperator configuration has been synced")
113+
return nil
114+
}

pkg/cvo/cvo.go

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ import (
3131
clientset "github.com/openshift/client-go/config/clientset/versioned"
3232
configinformersv1 "github.com/openshift/client-go/config/informers/externalversions/config/v1"
3333
configlistersv1 "github.com/openshift/client-go/config/listers/config/v1"
34+
operatorclientset "github.com/openshift/client-go/operator/clientset/versioned"
35+
operatorexternalversions "github.com/openshift/client-go/operator/informers/externalversions"
3436
"github.com/openshift/library-go/pkg/manifest"
3537
"github.com/openshift/library-go/pkg/verify"
3638
"github.com/openshift/library-go/pkg/verify/store"
@@ -42,6 +44,7 @@ import (
4244
"github.com/openshift/cluster-version-operator/pkg/clusterconditions"
4345
"github.com/openshift/cluster-version-operator/pkg/clusterconditions/standard"
4446
"github.com/openshift/cluster-version-operator/pkg/customsignaturestore"
47+
"github.com/openshift/cluster-version-operator/pkg/cvo/configuration"
4548
cvointernal "github.com/openshift/cluster-version-operator/pkg/cvo/internal"
4649
"github.com/openshift/cluster-version-operator/pkg/cvo/internal/dynamicclient"
4750
"github.com/openshift/cluster-version-operator/pkg/featuregates"
@@ -92,9 +95,10 @@ type Operator struct {
9295
// releaseCreated, if set, is the timestamp of the current update.
9396
releaseCreated time.Time
9497

95-
client clientset.Interface
96-
kubeClient kubernetes.Interface
97-
eventRecorder record.EventRecorder
98+
client clientset.Interface
99+
kubeClient kubernetes.Interface
100+
operatorClient operatorclientset.Interface
101+
eventRecorder record.EventRecorder
98102

99103
// minimumUpdateCheckInterval is the minimum duration to check for updates from
100104
// the update service.
@@ -141,6 +145,9 @@ type Operator struct {
141145
// conditionRegistry is used to evaluate whether a particular condition is risky or not.
142146
conditionRegistry clusterconditions.ConditionRegistry
143147

148+
// hypershift signals whether the CVO is running inside a hosted control plane.
149+
hypershift bool
150+
144151
// injectClusterIdIntoPromQL indicates whether the CVO should inject the cluster id
145152
// into PromQL queries while evaluating risks from conditional updates. This is needed
146153
// in HyperShift to differentiate between metrics from multiple hosted clusters in
@@ -175,6 +182,9 @@ type Operator struct {
175182
// alwaysEnableCapabilities is a list of the cluster capabilities which should
176183
// always be implicitly enabled.
177184
alwaysEnableCapabilities []configv1.ClusterVersionCapability
185+
186+
// configuration, if enabled, reconciles the ClusterVersionOperator configuration.
187+
configuration *configuration.ClusterVersionOperatorConfiguration
178188
}
179189

180190
// New returns a new cluster version operator.
@@ -189,10 +199,13 @@ func New(
189199
cmConfigInformer informerscorev1.ConfigMapInformer,
190200
cmConfigManagedInformer informerscorev1.ConfigMapInformer,
191201
proxyInformer configinformersv1.ProxyInformer,
202+
operatorInformerFactory operatorexternalversions.SharedInformerFactory,
192203
client clientset.Interface,
193204
kubeClient kubernetes.Interface,
205+
operatorClient operatorclientset.Interface,
194206
exclude string,
195207
clusterProfile string,
208+
hypershift bool,
196209
promqlTarget clusterconditions.PromQLTarget,
197210
injectClusterIdIntoPromQL bool,
198211
updateService string,
@@ -218,11 +231,13 @@ func New(
218231

219232
client: client,
220233
kubeClient: kubeClient,
234+
operatorClient: operatorClient,
221235
eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: namespace}),
222236
queue: workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{Name: "clusterversion"}),
223237
availableUpdatesQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{Name: "availableupdates"}),
224238
upgradeableQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{Name: "upgradeable"}),
225239

240+
hypershift: hypershift,
226241
exclude: exclude,
227242
clusterProfile: clusterProfile,
228243
conditionRegistry: standard.NewConditionRegistry(promqlTarget),
@@ -261,6 +276,8 @@ func New(
261276
// make sure this is initialized after all the listers are initialized
262277
optr.upgradeableChecks = optr.defaultUpgradeableChecks()
263278

279+
optr.configuration = configuration.NewClusterVersionOperatorConfiguration(operatorClient, operatorInformerFactory)
280+
264281
return optr, nil
265282
}
266283

@@ -407,6 +424,7 @@ func (optr *Operator) Run(runContext context.Context, shutdownContext context.Co
407424
defer optr.queue.ShutDown()
408425
defer optr.availableUpdatesQueue.ShutDown()
409426
defer optr.upgradeableQueue.ShutDown()
427+
defer optr.configuration.Queue().ShutDown()
410428
stopCh := runContext.Done()
411429

412430
klog.Infof("Starting ClusterVersionOperator with minimum reconcile period %s", optr.minimumUpdateCheckInterval)
@@ -445,6 +463,23 @@ func (optr *Operator) Run(runContext context.Context, shutdownContext context.Co
445463
resultChannel <- asyncResult{name: "available updates"}
446464
}()
447465

466+
if optr.shouldReconcileCVOConfiguration() {
467+
resultChannelCount++
468+
go func() {
469+
defer utilruntime.HandleCrash()
470+
if err := optr.configuration.Start(runContext); err != nil {
471+
utilruntime.HandleError(fmt.Errorf("unable to initialize the CVO configuration sync: %v", err))
472+
} else {
473+
wait.UntilWithContext(runContext, func(runContext context.Context) {
474+
optr.worker(runContext, optr.configuration.Queue(), optr.configuration.Sync)
475+
}, time.Second)
476+
}
477+
resultChannel <- asyncResult{name: "cvo configuration"}
478+
}()
479+
} else {
480+
klog.Infof("The ClusterVersionOperatorConfiguration feature gate is disabled or HyperShift is detected; the configuration sync routine will not run.")
481+
}
482+
448483
resultChannelCount++
449484
go func() {
450485
defer utilruntime.HandleCrash()
@@ -514,6 +549,7 @@ func (optr *Operator) Run(runContext context.Context, shutdownContext context.Co
514549
optr.queue.ShutDown()
515550
optr.availableUpdatesQueue.ShutDown()
516551
optr.upgradeableQueue.ShutDown()
552+
optr.configuration.Queue().ShutDown()
517553
}
518554
}
519555

@@ -1010,3 +1046,11 @@ func (optr *Operator) HTTPClient() (*http.Client, error) {
10101046
Transport: transport,
10111047
}, nil
10121048
}
1049+
1050+
// shouldReconcileCVOConfiguration returns whether the CVO should reconcile its configuration using the API server.
1051+
//
1052+
// enabledFeatureGates must be initialized before the function is called.
1053+
func (optr *Operator) shouldReconcileCVOConfiguration() bool {
1054+
// The relevant CRD and CR are not applied in HyperShift, which configures the CVO via a configuration file
1055+
return optr.enabledFeatureGates.CVOConfiguration() && !optr.hypershift
1056+
}

pkg/cvo/status_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ type fakeRiFlags struct {
202202
unknownVersion bool
203203
reconciliationIssuesCondition bool
204204
statusReleaseArchitecture bool
205+
cvoConfiguration bool
205206
}
206207

207208
func (f fakeRiFlags) UnknownVersion() bool {
@@ -216,6 +217,10 @@ func (f fakeRiFlags) StatusReleaseArchitecture() bool {
216217
return f.statusReleaseArchitecture
217218
}
218219

220+
func (f fakeRiFlags) CVOConfiguration() bool {
221+
return f.cvoConfiguration
222+
}
223+
219224
func TestUpdateClusterVersionStatus_UnknownVersionAndReconciliationIssues(t *testing.T) {
220225
ignoreLastTransitionTime := cmpopts.IgnoreFields(configv1.ClusterOperatorStatusCondition{}, "LastTransitionTime")
221226

pkg/featuregates/featuregates.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ type CvoGateChecker interface {
2929
// StatusReleaseArchitecture controls whether CVO populates
3030
// Release.Architecture in status properties like status.desired and status.history[].
3131
StatusReleaseArchitecture() bool
32+
33+
// CVOConfiguration controls whether the CVO reconciles the ClusterVersionOperator resource that corresponds
34+
// to its configuration.
35+
CVOConfiguration() bool
3236
}
3337

3438
type panicOnUsageBeforeInitializationFunc func()
@@ -56,6 +60,11 @@ func (p panicOnUsageBeforeInitializationFunc) UnknownVersion() bool {
5660
return false
5761
}
5862

63+
func (p panicOnUsageBeforeInitializationFunc) CVOConfiguration() bool {
64+
p()
65+
return false
66+
}
67+
5968
// CvoGates contains flags that control CVO functionality gated by product feature gates. The
6069
// names do not correspond to product feature gates, the booleans here are "smaller" (product-level
6170
// gate will enable multiple CVO behaviors).
@@ -68,6 +77,7 @@ type CvoGates struct {
6877
unknownVersion bool
6978
reconciliationIssuesCondition bool
7079
statusReleaseArchitecture bool
80+
cvoConfiguration bool
7181
}
7282

7383
func (c CvoGates) ReconciliationIssuesCondition() bool {
@@ -82,13 +92,18 @@ func (c CvoGates) UnknownVersion() bool {
8292
return c.unknownVersion
8393
}
8494

95+
func (c CvoGates) CVOConfiguration() bool {
96+
return c.cvoConfiguration
97+
}
98+
8599
// DefaultCvoGates apply when actual features for given version are unknown
86100
func DefaultCvoGates(version string) CvoGates {
87101
return CvoGates{
88102
desiredVersion: version,
89103
unknownVersion: true,
90104
reconciliationIssuesCondition: false,
91105
statusReleaseArchitecture: false,
106+
cvoConfiguration: false,
92107
}
93108
}
94109

@@ -110,6 +125,8 @@ func CvoGatesFromFeatureGate(gate *configv1.FeatureGate, version string) CvoGate
110125
enabledGates.reconciliationIssuesCondition = true
111126
case features.FeatureGateImageStreamImportMode:
112127
enabledGates.statusReleaseArchitecture = true
128+
case features.FeatureGateCVOConfiguration:
129+
enabledGates.cvoConfiguration = true
113130
}
114131
}
115132
for _, disabled := range g.Disabled {
@@ -118,6 +135,8 @@ func CvoGatesFromFeatureGate(gate *configv1.FeatureGate, version string) CvoGate
118135
enabledGates.reconciliationIssuesCondition = false
119136
case features.FeatureGateImageStreamImportMode:
120137
enabledGates.statusReleaseArchitecture = false
138+
case features.FeatureGateCVOConfiguration:
139+
enabledGates.cvoConfiguration = false
121140
}
122141
}
123142
}

0 commit comments

Comments
 (0)