diff --git a/pkg/cvo/configuration/configuration.go b/pkg/cvo/configuration/configuration.go index 66782421e..9e1e0861e 100644 --- a/pkg/cvo/configuration/configuration.go +++ b/pkg/cvo/configuration/configuration.go @@ -5,18 +5,12 @@ import ( "fmt" "time" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" operatorv1 "github.com/openshift/api/operator/v1" - operatorv1alpha1 "github.com/openshift/api/operator/v1alpha1" operatorclientset "github.com/openshift/client-go/operator/clientset/versioned" - cvoclientv1alpha1 "github.com/openshift/client-go/operator/clientset/versioned/typed/operator/v1alpha1" operatorexternalversions "github.com/openshift/client-go/operator/informers/externalversions" - operatorlistersv1alpha1 "github.com/openshift/client-go/operator/listers/operator/v1alpha1" "github.com/openshift/library-go/pkg/operator/loglevel" i "github.com/openshift/cluster-version-operator/pkg/internal" @@ -24,49 +18,52 @@ import ( const ClusterVersionOperatorConfigurationName = "cluster" +const defaultQueueKey = "ClusterVersionOperator/" + ClusterVersionOperatorConfigurationName + +type configuration struct { + lastObservedGeneration int64 + desiredLogLevel operatorv1.LogLevel +} + +func defaultConfiguration() configuration { + return configuration{ + lastObservedGeneration: 0, + desiredLogLevel: operatorv1.Normal, + } +} + +type ConfigProvider interface { + start(ctx context.Context) error + get(context.Context) (configuration, error) +} + type ClusterVersionOperatorConfiguration struct { - queueKey string // queue tracks checking for the CVO configuration. // // The type any is used to comply with the worker method of the cvo.Operator struct. queue workqueue.TypedRateLimitingInterface[any] - client cvoclientv1alpha1.ClusterVersionOperatorInterface - lister operatorlistersv1alpha1.ClusterVersionOperatorLister - factory operatorexternalversions.SharedInformerFactory - started bool - desiredLogLevel operatorv1.LogLevel - lastObservedGeneration int64 + configuration configuration + provider ConfigProvider + + // Function to handle an update to the internal configuration. + // + // In the future, the desired configuration may be consumed via other controllers. + // This will require some sort of synchronization upon a configuration change. + // For the moment, the log level is the sole consumer of the configuration. + // Apply the log level directly here for simplicity for the time being. + handler func(configuration) error } func (config *ClusterVersionOperatorConfiguration) Queue() workqueue.TypedRateLimitingInterface[any] { return config.queue } -// clusterVersionOperatorEventHandler queues an update for the cluster version operator on any change to the given object. -// Callers should use this with an informer. -func (config *ClusterVersionOperatorConfiguration) clusterVersionOperatorEventHandler() cache.ResourceEventHandler { - return cache.ResourceEventHandlerFuncs{ - AddFunc: func(_ interface{}) { - config.queue.Add(config.queueKey) - klog.V(i.Debug).Infof("ClusterVersionOperator resource was added; queuing a sync") - }, - UpdateFunc: func(_, _ interface{}) { - config.queue.Add(config.queueKey) - klog.V(i.Debug).Infof("ClusterVersionOperator resource was modified or resync period has passed; queuing a sync") - }, - DeleteFunc: func(_ interface{}) { - config.queue.Add(config.queueKey) - klog.V(i.Debug).Infof("ClusterVersionOperator resource was deleted; queuing a sync") - }, - } -} - // NewClusterVersionOperatorConfiguration returns ClusterVersionOperatorConfiguration, which might be used // to synchronize with the ClusterVersionOperator resource. -func NewClusterVersionOperatorConfiguration(client operatorclientset.Interface, factory operatorexternalversions.SharedInformerFactory) *ClusterVersionOperatorConfiguration { +func NewClusterVersionOperatorConfiguration() *ClusterVersionOperatorConfiguration { var desiredLogLevel operatorv1.LogLevel if currentLogLevel, notFound := loglevel.GetLogLevel(); notFound { klog.Warningf("The current log level could not be found; assuming the 'Normal' level is the currently desired") @@ -76,33 +73,28 @@ func NewClusterVersionOperatorConfiguration(client operatorclientset.Interface, } return &ClusterVersionOperatorConfiguration{ - queueKey: fmt.Sprintf("ClusterVersionOperator/%s", ClusterVersionOperatorConfigurationName), queue: workqueue.NewTypedRateLimitingQueueWithConfig[any]( workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{Name: "configuration"}), - client: client.OperatorV1alpha1().ClusterVersionOperators(), - factory: factory, - desiredLogLevel: desiredLogLevel, + configuration: configuration{desiredLogLevel: desiredLogLevel}, + handler: func(c configuration) error { return applyLogLevel(c.desiredLogLevel) }, } } +func (config *ClusterVersionOperatorConfiguration) UsingKubeAPIServer(client operatorclientset.Interface, factory operatorexternalversions.SharedInformerFactory) *ClusterVersionOperatorConfiguration { + config.provider = newConfigProviderKubeAPIServer(client, factory, config.queue) + return config +} + // Start initializes and starts the configuration's informers. Must be run before Sync is called. // Blocks until informers caches are synchronized or the context is cancelled. func (config *ClusterVersionOperatorConfiguration) Start(ctx context.Context) error { - informer := config.factory.Operator().V1alpha1().ClusterVersionOperators() - if _, err := informer.Informer().AddEventHandler(config.clusterVersionOperatorEventHandler()); err != nil { - return err + if config.provider == nil { + return fmt.Errorf("the configuration provider must be initialized") } - config.lister = informer.Lister() - - config.factory.Start(ctx.Done()) - synced := config.factory.WaitForCacheSync(ctx.Done()) - for _, ok := range synced { - if !ok { - return fmt.Errorf("caches failed to sync: %w", ctx.Err()) - } + if err := config.provider.start(ctx); err != nil { + return err } - config.started = true return nil } @@ -117,62 +109,17 @@ func (config *ClusterVersionOperatorConfiguration) Sync(ctx context.Context, key klog.V(i.Normal).Infof("Finished syncing CVO configuration (%v)", time.Since(startTime)) }() - desiredConfig, err := config.lister.Get(ClusterVersionOperatorConfigurationName) - if apierrors.IsNotFound(err) { - // TODO: Set default values - return nil - } + desiredConfig, err := config.provider.get(ctx) if err != nil { return err } - return config.sync(ctx, desiredConfig) -} - -// sync synchronizes the local configuration based on the desired configuration -// and updates the status of the Kubernetes resource if needed. -// -// desiredConfig is a read-only representation of the desired configuration. -func (config *ClusterVersionOperatorConfiguration) sync(ctx context.Context, desiredConfig *operatorv1alpha1.ClusterVersionOperator) error { - if desiredConfig.Status.ObservedGeneration != desiredConfig.Generation { - newConfig := desiredConfig.DeepCopy() - newConfig.Status.ObservedGeneration = desiredConfig.Generation - _, err := config.client.UpdateStatus(ctx, newConfig, metav1.UpdateOptions{}) - if err != nil { - return fmt.Errorf("failed to update the ClusterVersionOperator resource: %w", err) - } - } - - config.lastObservedGeneration = desiredConfig.Generation - config.desiredLogLevel = desiredConfig.Spec.OperatorLogLevel - if config.desiredLogLevel == "" { - config.desiredLogLevel = operatorv1.Normal + if desiredConfig.desiredLogLevel == "" { + desiredConfig.desiredLogLevel = operatorv1.Normal } + config.configuration = desiredConfig - currentLogLevel, notFound := loglevel.GetLogLevel() - if notFound { - klog.Warningf("The current log level could not be found; an attempt to set the log level to the desired level will be made") - } - - if !notFound && currentLogLevel == config.desiredLogLevel { - klog.V(i.Debug).Infof("No need to update the current CVO log level '%s'; it is already set to the desired value", currentLogLevel) - } else { - if err := loglevel.SetLogLevel(config.desiredLogLevel); err != nil { - return fmt.Errorf("failed to set the log level to %q: %w", config.desiredLogLevel, err) - } - - // E2E testing will be checking for existence or absence of these logs - switch config.desiredLogLevel { - case operatorv1.Normal: - klog.V(i.Normal).Infof("Successfully updated the log level from '%s' to 'Normal'", currentLogLevel) - case operatorv1.Debug: - klog.V(i.Debug).Infof("Successfully updated the log level from '%s' to 'Debug'", currentLogLevel) - case operatorv1.Trace: - klog.V(i.Trace).Infof("Successfully updated the log level from '%s' to 'Trace'", currentLogLevel) - case operatorv1.TraceAll: - klog.V(i.TraceAll).Infof("Successfully updated the log level from '%s' to 'TraceAll'", currentLogLevel) - default: - klog.Errorf("The CVO logging level has unexpected value '%s'", config.desiredLogLevel) - } + if config.handler != nil { + return config.handler(config.configuration) } return nil } diff --git a/pkg/cvo/configuration/configuration_test.go b/pkg/cvo/configuration/configuration_test.go index 8ed7602b5..4c97b44f5 100644 --- a/pkg/cvo/configuration/configuration_test.go +++ b/pkg/cvo/configuration/configuration_test.go @@ -17,17 +17,29 @@ import ( operatorexternalversions "github.com/openshift/client-go/operator/informers/externalversions" ) -func TestClusterVersionOperatorConfiguration_sync(t *testing.T) { +func TestClusterVersionOperatorConfiguration_APIServerSync(t *testing.T) { tests := []struct { name string - config operatorv1alpha1.ClusterVersionOperator - expectedConfig operatorv1alpha1.ClusterVersionOperator - internalConfig ClusterVersionOperatorConfiguration - expectedInternalConfig ClusterVersionOperatorConfiguration + config *operatorv1alpha1.ClusterVersionOperator + expectedConfig *operatorv1alpha1.ClusterVersionOperator + internalConfig configuration + expectedInternalConfig configuration + handlerFunctionCalled bool }{ + { + name: "the configuration resource does not exist in the cluster -> default configuration", + config: nil, + expectedConfig: nil, + internalConfig: configuration{}, + expectedInternalConfig: configuration{ + lastObservedGeneration: 0, + desiredLogLevel: operatorv1.Normal, + }, + handlerFunctionCalled: true, + }, { name: "first sync run correctly updates the status", - config: operatorv1alpha1.ClusterVersionOperator{ + config: &operatorv1alpha1.ClusterVersionOperator{ ObjectMeta: metav1.ObjectMeta{ Generation: 1, }, @@ -35,7 +47,7 @@ func TestClusterVersionOperatorConfiguration_sync(t *testing.T) { OperatorLogLevel: operatorv1.Normal, }, }, - expectedConfig: operatorv1alpha1.ClusterVersionOperator{ + expectedConfig: &operatorv1alpha1.ClusterVersionOperator{ ObjectMeta: metav1.ObjectMeta{ Generation: 1, }, @@ -46,18 +58,19 @@ func TestClusterVersionOperatorConfiguration_sync(t *testing.T) { ObservedGeneration: 1, }, }, - internalConfig: ClusterVersionOperatorConfiguration{ + internalConfig: configuration{ desiredLogLevel: operatorv1.Normal, lastObservedGeneration: 0, }, - expectedInternalConfig: ClusterVersionOperatorConfiguration{ + expectedInternalConfig: configuration{ desiredLogLevel: operatorv1.Normal, lastObservedGeneration: 1, }, + handlerFunctionCalled: true, }, { name: "sync updates observed generation correctly", - config: operatorv1alpha1.ClusterVersionOperator{ + config: &operatorv1alpha1.ClusterVersionOperator{ ObjectMeta: metav1.ObjectMeta{ Generation: 3, }, @@ -68,7 +81,7 @@ func TestClusterVersionOperatorConfiguration_sync(t *testing.T) { ObservedGeneration: 2, }, }, - expectedConfig: operatorv1alpha1.ClusterVersionOperator{ + expectedConfig: &operatorv1alpha1.ClusterVersionOperator{ ObjectMeta: metav1.ObjectMeta{ Generation: 3, }, @@ -79,18 +92,19 @@ func TestClusterVersionOperatorConfiguration_sync(t *testing.T) { ObservedGeneration: 3, }, }, - internalConfig: ClusterVersionOperatorConfiguration{ + internalConfig: configuration{ desiredLogLevel: operatorv1.Normal, lastObservedGeneration: 2, }, - expectedInternalConfig: ClusterVersionOperatorConfiguration{ + expectedInternalConfig: configuration{ desiredLogLevel: operatorv1.Normal, lastObservedGeneration: 3, }, + handlerFunctionCalled: true, }, { name: "sync updates desired log level correctly", - config: operatorv1alpha1.ClusterVersionOperator{ + config: &operatorv1alpha1.ClusterVersionOperator{ ObjectMeta: metav1.ObjectMeta{ Generation: 4, }, @@ -101,7 +115,7 @@ func TestClusterVersionOperatorConfiguration_sync(t *testing.T) { ObservedGeneration: 3, }, }, - expectedConfig: operatorv1alpha1.ClusterVersionOperator{ + expectedConfig: &operatorv1alpha1.ClusterVersionOperator{ ObjectMeta: metav1.ObjectMeta{ Generation: 4, }, @@ -112,18 +126,19 @@ func TestClusterVersionOperatorConfiguration_sync(t *testing.T) { ObservedGeneration: 4, }, }, - internalConfig: ClusterVersionOperatorConfiguration{ + internalConfig: configuration{ desiredLogLevel: operatorv1.Normal, lastObservedGeneration: 3, }, - expectedInternalConfig: ClusterVersionOperatorConfiguration{ + expectedInternalConfig: configuration{ desiredLogLevel: operatorv1.Trace, lastObservedGeneration: 4, }, + handlerFunctionCalled: true, }, { name: "number of not observed generations does not impact sync", - config: operatorv1alpha1.ClusterVersionOperator{ + config: &operatorv1alpha1.ClusterVersionOperator{ ObjectMeta: metav1.ObjectMeta{ Generation: 40, }, @@ -134,7 +149,7 @@ func TestClusterVersionOperatorConfiguration_sync(t *testing.T) { ObservedGeneration: 3, }, }, - expectedConfig: operatorv1alpha1.ClusterVersionOperator{ + expectedConfig: &operatorv1alpha1.ClusterVersionOperator{ ObjectMeta: metav1.ObjectMeta{ Generation: 40, }, @@ -145,131 +160,74 @@ func TestClusterVersionOperatorConfiguration_sync(t *testing.T) { ObservedGeneration: 40, }, }, - internalConfig: ClusterVersionOperatorConfiguration{ + internalConfig: configuration{ desiredLogLevel: operatorv1.Normal, lastObservedGeneration: 3, }, - expectedInternalConfig: ClusterVersionOperatorConfiguration{ + expectedInternalConfig: configuration{ desiredLogLevel: operatorv1.TraceAll, lastObservedGeneration: 40, }, + handlerFunctionCalled: true, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { // Initialize testing logic - client := operatorclientsetfake.NewClientset(&tt.config) - tt.internalConfig.client = client.OperatorV1alpha1().ClusterVersionOperators() - ctx, cancelFunc := context.WithDeadline(context.Background(), time.Now().Add(time.Minute)) - - // Run tested functionality - if err := tt.internalConfig.sync(ctx, &tt.config); err != nil { - t.Errorf("unexpected error %v", err) - } - - // Verify results - if tt.internalConfig.lastObservedGeneration != tt.expectedInternalConfig.lastObservedGeneration { - t.Errorf("unexpected 'lastObservedGeneration' value; wanted=%v, got=%v", tt.expectedInternalConfig.lastObservedGeneration, tt.internalConfig.lastObservedGeneration) - } - if tt.internalConfig.desiredLogLevel != tt.expectedInternalConfig.desiredLogLevel { - t.Errorf("unexpected 'desiredLogLevel' value; wanted=%v, got=%v", tt.expectedInternalConfig.desiredLogLevel, tt.internalConfig.desiredLogLevel) - } - - config, err := client.OperatorV1alpha1().ClusterVersionOperators().Get(ctx, "", metav1.GetOptions{}) - if err != nil { - t.Errorf("unexpected error %v", err) - } - if diff := cmp.Diff(tt.expectedConfig, *config, cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ManagedFields")); diff != "" { - t.Errorf("unexpected config (-want, +got) = %v", diff) - } - - // Shutdown created resources - cancelFunc() - }) - } -} - -func TestClusterVersionOperatorConfiguration_Sync(t *testing.T) { - tests := []struct { - name string - config *operatorv1alpha1.ClusterVersionOperator - expectedConfig *operatorv1alpha1.ClusterVersionOperator - }{ - { - name: "the configuration resource does not exist in the cluster -> ignore", - config: nil, - expectedConfig: nil, - }, - { - name: "Sync updates the ClusterVersionOperator resource", - config: &operatorv1alpha1.ClusterVersionOperator{ - ObjectMeta: metav1.ObjectMeta{ - Name: "cluster", - Generation: 4, - }, - Spec: operatorv1alpha1.ClusterVersionOperatorSpec{ - OperatorLogLevel: operatorv1.Trace, - }, - Status: operatorv1alpha1.ClusterVersionOperatorStatus{ - ObservedGeneration: 3, - }, - }, - expectedConfig: &operatorv1alpha1.ClusterVersionOperator{ - ObjectMeta: metav1.ObjectMeta{ - Name: "cluster", - Generation: 4, - }, - Spec: operatorv1alpha1.ClusterVersionOperatorSpec{ - OperatorLogLevel: operatorv1.Trace, - }, - Status: operatorv1alpha1.ClusterVersionOperatorStatus{ - ObservedGeneration: 4, - }, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // Initialize testing logic - var client *operatorclientsetfake.Clientset + client := operatorclientsetfake.NewClientset() if tt.config != nil { + tt.config.Name = ClusterVersionOperatorConfigurationName + tt.expectedConfig.Name = ClusterVersionOperatorConfigurationName client = operatorclientsetfake.NewClientset(tt.config) - } else { - client = operatorclientsetfake.NewClientset() } - factory := operatorexternalversions.NewSharedInformerFactoryWithOptions(client, time.Minute) - cvoConfiguration := NewClusterVersionOperatorConfiguration(client, factory) - defer cvoConfiguration.queue.ShutDown() + configController := NewClusterVersionOperatorConfiguration().UsingKubeAPIServer(client, factory) + + called := false + configController.handler = func(_ configuration) error { + called = true + return nil + } ctx, cancelFunc := context.WithDeadline(context.Background(), time.Now().Add(time.Minute)) - defer cancelFunc() - err := cvoConfiguration.Start(ctx) - if err != nil { + if err := configController.Start(ctx); err != nil { t.Errorf("unexpected error %v", err) } + configController.configuration = tt.internalConfig // Run tested functionality - err = cvoConfiguration.Sync(ctx, "ClusterVersionOperator/cluster") - if err != nil { + if err := configController.Sync(ctx, "key"); err != nil { t.Errorf("unexpected error %v", err) } // Verify results - config, err := client.OperatorV1alpha1().ClusterVersionOperators().Get(ctx, "cluster", metav1.GetOptions{}) + if configController.configuration.lastObservedGeneration != tt.expectedInternalConfig.lastObservedGeneration { + t.Errorf("unexpected 'lastObservedGeneration' value; wanted=%v, got=%v", tt.expectedInternalConfig.lastObservedGeneration, configController.configuration.lastObservedGeneration) + } + if configController.configuration.desiredLogLevel != tt.expectedInternalConfig.desiredLogLevel { + t.Errorf("unexpected 'desiredLogLevel' value; wanted=%v, got=%v", tt.expectedInternalConfig.desiredLogLevel, configController.configuration.desiredLogLevel) + } + + config, err := client.OperatorV1alpha1().ClusterVersionOperators().Get(ctx, ClusterVersionOperatorConfigurationName, metav1.GetOptions{}) if err != nil && !apierrors.IsNotFound(err) { t.Errorf("unexpected error %v", err) } - switch { - case apierrors.IsNotFound(err) && tt.expectedConfig != nil: - t.Errorf("expected config to be '%v', got NotFound", *tt.expectedConfig) - case err == nil: - if diff := cmp.Diff(*tt.expectedConfig, *config, cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ManagedFields")); diff != "" { - t.Errorf("unexpected config (-want, +got) = %v", diff) - } + // Set nil to differentiate between nonexisting configurations + if apierrors.IsNotFound(err) { + config = nil + } + if diff := cmp.Diff(tt.expectedConfig, config, cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ManagedFields")); diff != "" { + t.Errorf("unexpected config (-want, +got) = %v", diff) + } + + if tt.handlerFunctionCalled != called { + t.Errorf("unexpected handler function execution; wanted=%v, got=%v", tt.handlerFunctionCalled, called) } + + // Shutdown created resources + cancelFunc() }) } } diff --git a/pkg/cvo/configuration/loglevel.go b/pkg/cvo/configuration/loglevel.go new file mode 100644 index 000000000..0e87f47dc --- /dev/null +++ b/pkg/cvo/configuration/loglevel.go @@ -0,0 +1,42 @@ +package configuration + +import ( + "fmt" + + "k8s.io/klog/v2" + + operatorv1 "github.com/openshift/api/operator/v1" + "github.com/openshift/library-go/pkg/operator/loglevel" + + i "github.com/openshift/cluster-version-operator/pkg/internal" +) + +func applyLogLevel(level operatorv1.LogLevel) error { + currentLogLevel, notFound := loglevel.GetLogLevel() + if notFound { + klog.Warningf("The current log level could not be found; an attempt to set the log level to the desired level will be made") + } + + if !notFound && currentLogLevel == level { + klog.V(i.Debug).Infof("No need to update the current CVO log level '%s'; it is already set to the desired value", currentLogLevel) + } else { + if err := loglevel.SetLogLevel(level); err != nil { + return fmt.Errorf("failed to set the log level to %q: %w", level, err) + } + + // E2E testing will be checking for existence or absence of these logs + switch level { + case operatorv1.Normal: + klog.V(i.Normal).Infof("Successfully updated the log level from '%s' to 'Normal'", currentLogLevel) + case operatorv1.Debug: + klog.V(i.Debug).Infof("Successfully updated the log level from '%s' to 'Debug'", currentLogLevel) + case operatorv1.Trace: + klog.V(i.Trace).Infof("Successfully updated the log level from '%s' to 'Trace'", currentLogLevel) + case operatorv1.TraceAll: + klog.V(i.TraceAll).Infof("Successfully updated the log level from '%s' to 'TraceAll'", currentLogLevel) + default: + klog.Errorf("The CVO logging level has unexpected value '%s'", level) + } + } + return nil +} diff --git a/pkg/cvo/configuration/resource.go b/pkg/cvo/configuration/resource.go new file mode 100644 index 000000000..43f98fcd7 --- /dev/null +++ b/pkg/cvo/configuration/resource.go @@ -0,0 +1,98 @@ +package configuration + +import ( + "context" + "fmt" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + + operatorclientset "github.com/openshift/client-go/operator/clientset/versioned" + cvoclientv1alpha1 "github.com/openshift/client-go/operator/clientset/versioned/typed/operator/v1alpha1" + operatorexternalversions "github.com/openshift/client-go/operator/informers/externalversions" + operatorlistersv1alpha1 "github.com/openshift/client-go/operator/listers/operator/v1alpha1" + + i "github.com/openshift/cluster-version-operator/pkg/internal" +) + +type ConfigProviderKubeAPIServer struct { + queueAdd func(any) + queueKey string + + client cvoclientv1alpha1.ClusterVersionOperatorInterface + lister operatorlistersv1alpha1.ClusterVersionOperatorLister + factory operatorexternalversions.SharedInformerFactory +} + +func newConfigProviderKubeAPIServer(client operatorclientset.Interface, factory operatorexternalversions.SharedInformerFactory, queue workqueue.TypedRateLimitingInterface[any]) *ConfigProviderKubeAPIServer { + return &ConfigProviderKubeAPIServer{ + queueAdd: queue.Add, + queueKey: defaultQueueKey, + client: client.OperatorV1alpha1().ClusterVersionOperators(), + factory: factory, + } +} + +func (c *ConfigProviderKubeAPIServer) start(ctx context.Context) error { + informer := c.factory.Operator().V1alpha1().ClusterVersionOperators() + if _, err := informer.Informer().AddEventHandler(c.clusterVersionOperatorEventHandler()); err != nil { + return err + } + c.lister = informer.Lister() + + c.factory.Start(ctx.Done()) + synced := c.factory.WaitForCacheSync(ctx.Done()) + for _, ok := range synced { + if !ok { + return fmt.Errorf("caches failed to sync: %w", ctx.Err()) + } + } + return nil +} + +func (c *ConfigProviderKubeAPIServer) get(ctx context.Context) (configuration, error) { + config := defaultConfiguration() + + desiredConfig, err := c.lister.Get(ClusterVersionOperatorConfigurationName) + if apierrors.IsNotFound(err) { + return config, nil + } + if err != nil { + return config, err + } + + if desiredConfig.Status.ObservedGeneration != desiredConfig.Generation { + newConfig := desiredConfig.DeepCopy() + newConfig.Status.ObservedGeneration = desiredConfig.Generation + _, err := c.client.UpdateStatus(ctx, newConfig, metav1.UpdateOptions{}) + if err != nil { + return config, fmt.Errorf("failed to update the ClusterVersionOperator resource: %w", err) + } + } + + config.lastObservedGeneration = desiredConfig.Generation + config.desiredLogLevel = desiredConfig.Spec.OperatorLogLevel + return config, nil +} + +// clusterVersionOperatorEventHandler queues an update for the cluster version operator on any change to the given object. +// Callers should use this with an informer. +func (c *ConfigProviderKubeAPIServer) clusterVersionOperatorEventHandler() cache.ResourceEventHandler { + return cache.ResourceEventHandlerFuncs{ + AddFunc: func(_ interface{}) { + c.queueAdd(c.queueKey) + klog.V(i.Debug).Infof("ClusterVersionOperator resource was added; queuing a sync") + }, + UpdateFunc: func(_, _ interface{}) { + c.queueAdd(c.queueKey) + klog.V(i.Debug).Infof("ClusterVersionOperator resource was modified or resync period has passed; queuing a sync") + }, + DeleteFunc: func(_ interface{}) { + c.queueAdd(c.queueKey) + klog.V(i.Debug).Infof("ClusterVersionOperator resource was deleted; queuing a sync") + }, + } +} diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go index 6f186479f..aff635983 100644 --- a/pkg/cvo/cvo.go +++ b/pkg/cvo/cvo.go @@ -290,7 +290,9 @@ func New( // make sure this is initialized after all the listers are initialized optr.upgradeableChecks = optr.defaultUpgradeableChecks() - optr.configuration = configuration.NewClusterVersionOperatorConfiguration(operatorClient, operatorInformerFactory) + if shouldReconcileCVOConfiguration(cvoGates, optr.hypershift) { + optr.configuration = configuration.NewClusterVersionOperatorConfiguration().UsingKubeAPIServer(operatorClient, operatorInformerFactory) + } return optr, nil } @@ -444,7 +446,9 @@ func (optr *Operator) Run(runContext context.Context, shutdownContext context.Co defer optr.queue.ShutDown() defer optr.availableUpdatesQueue.ShutDown() defer optr.upgradeableQueue.ShutDown() - defer optr.configuration.Queue().ShutDown() + if optr.configuration != nil { + defer optr.configuration.Queue().ShutDown() + } stopCh := runContext.Done() klog.Infof("Starting ClusterVersionOperator with minimum reconcile period %s", optr.minimumUpdateCheckInterval) @@ -457,6 +461,12 @@ func (optr *Operator) Run(runContext context.Context, shutdownContext context.Co return fmt.Errorf("caches never synchronized: %w", runContext.Err()) } + if optr.configuration != nil { + if err := optr.configuration.Start(runContext); err != nil { + return fmt.Errorf("unable to initialize the CVO configuration controller: %v", err) + } + } + // trigger the first cluster version reconcile always optr.queue.Add(optr.queueKey()) @@ -483,17 +493,13 @@ func (optr *Operator) Run(runContext context.Context, shutdownContext context.Co resultChannel <- asyncResult{name: "available updates"} }() - if optr.shouldReconcileCVOConfiguration() { + if optr.configuration != nil { resultChannelCount++ go func() { defer utilruntime.HandleCrash() - if err := optr.configuration.Start(runContext); err != nil { - utilruntime.HandleError(fmt.Errorf("unable to initialize the CVO configuration sync: %v", err)) - } else { - wait.UntilWithContext(runContext, func(runContext context.Context) { - optr.worker(runContext, optr.configuration.Queue(), optr.configuration.Sync) - }, time.Second) - } + wait.UntilWithContext(runContext, func(runContext context.Context) { + optr.worker(runContext, optr.configuration.Queue(), optr.configuration.Sync) + }, time.Second) resultChannel <- asyncResult{name: "cvo configuration"} }() } else { @@ -569,7 +575,9 @@ func (optr *Operator) Run(runContext context.Context, shutdownContext context.Co optr.queue.ShutDown() optr.availableUpdatesQueue.ShutDown() optr.upgradeableQueue.ShutDown() - optr.configuration.Queue().ShutDown() + if optr.configuration != nil { + optr.configuration.Queue().ShutDown() + } } } @@ -1070,7 +1078,7 @@ func (optr *Operator) HTTPClient() (*http.Client, error) { // shouldReconcileCVOConfiguration returns whether the CVO should reconcile its configuration using the API server. // // enabledFeatureGates must be initialized before the function is called. -func (optr *Operator) shouldReconcileCVOConfiguration() bool { +func shouldReconcileCVOConfiguration(enabledFeatureGates featuregates.CvoGateChecker, isHypershift bool) bool { // The relevant CRD and CR are not applied in HyperShift, which configures the CVO via a configuration file - return optr.enabledFeatureGates.CVOConfiguration() && !optr.hypershift + return enabledFeatureGates.CVOConfiguration() && !isHypershift }