Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 47 additions & 100 deletions pkg/cvo/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,68 +5,65 @@ import (
"fmt"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

operatorv1 "github.com/openshift/api/operator/v1"
operatorv1alpha1 "github.com/openshift/api/operator/v1alpha1"
operatorclientset "github.com/openshift/client-go/operator/clientset/versioned"
cvoclientv1alpha1 "github.com/openshift/client-go/operator/clientset/versioned/typed/operator/v1alpha1"
operatorexternalversions "github.com/openshift/client-go/operator/informers/externalversions"
operatorlistersv1alpha1 "github.com/openshift/client-go/operator/listers/operator/v1alpha1"
"github.com/openshift/library-go/pkg/operator/loglevel"

i "github.com/openshift/cluster-version-operator/pkg/internal"
)

const ClusterVersionOperatorConfigurationName = "cluster"

const defaultQueueKey = "ClusterVersionOperator/" + ClusterVersionOperatorConfigurationName

type configuration struct {
lastObservedGeneration int64
desiredLogLevel operatorv1.LogLevel
}

func defaultConfiguration() configuration {
return configuration{
lastObservedGeneration: 0,
desiredLogLevel: operatorv1.Normal,
}
}

type ConfigProvider interface {
start(ctx context.Context) error
get(context.Context) (configuration, error)
}

type ClusterVersionOperatorConfiguration struct {
queueKey string
// queue tracks checking for the CVO configuration.
//
// The type any is used to comply with the worker method of the cvo.Operator struct.
queue workqueue.TypedRateLimitingInterface[any]

client cvoclientv1alpha1.ClusterVersionOperatorInterface
lister operatorlistersv1alpha1.ClusterVersionOperatorLister
factory operatorexternalversions.SharedInformerFactory

started bool

desiredLogLevel operatorv1.LogLevel
lastObservedGeneration int64
configuration configuration
provider ConfigProvider

// Function to handle an update to the internal configuration.
//
// In the future, the desired configuration may be consumed via other controllers.
// This will require some sort of synchronization upon a configuration change.
// For the moment, the log level is the sole consumer of the configuration.
// Apply the log level directly here for simplicity for the time being.
handler func(configuration) error
}

func (config *ClusterVersionOperatorConfiguration) Queue() workqueue.TypedRateLimitingInterface[any] {
return config.queue
}

// clusterVersionOperatorEventHandler queues an update for the cluster version operator on any change to the given object.
// Callers should use this with an informer.
func (config *ClusterVersionOperatorConfiguration) clusterVersionOperatorEventHandler() cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(_ interface{}) {
config.queue.Add(config.queueKey)
klog.V(i.Debug).Infof("ClusterVersionOperator resource was added; queuing a sync")
},
UpdateFunc: func(_, _ interface{}) {
config.queue.Add(config.queueKey)
klog.V(i.Debug).Infof("ClusterVersionOperator resource was modified or resync period has passed; queuing a sync")
},
DeleteFunc: func(_ interface{}) {
config.queue.Add(config.queueKey)
klog.V(i.Debug).Infof("ClusterVersionOperator resource was deleted; queuing a sync")
},
}
}

// NewClusterVersionOperatorConfiguration returns ClusterVersionOperatorConfiguration, which might be used
// to synchronize with the ClusterVersionOperator resource.
func NewClusterVersionOperatorConfiguration(client operatorclientset.Interface, factory operatorexternalversions.SharedInformerFactory) *ClusterVersionOperatorConfiguration {
func NewClusterVersionOperatorConfiguration() *ClusterVersionOperatorConfiguration {
var desiredLogLevel operatorv1.LogLevel
if currentLogLevel, notFound := loglevel.GetLogLevel(); notFound {
klog.Warningf("The current log level could not be found; assuming the 'Normal' level is the currently desired")
Expand All @@ -76,33 +73,28 @@ func NewClusterVersionOperatorConfiguration(client operatorclientset.Interface,
}

return &ClusterVersionOperatorConfiguration{
queueKey: fmt.Sprintf("ClusterVersionOperator/%s", ClusterVersionOperatorConfigurationName),
queue: workqueue.NewTypedRateLimitingQueueWithConfig[any](
workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{Name: "configuration"}),
client: client.OperatorV1alpha1().ClusterVersionOperators(),
factory: factory,
desiredLogLevel: desiredLogLevel,
configuration: configuration{desiredLogLevel: desiredLogLevel},
handler: func(c configuration) error { return applyLogLevel(c.desiredLogLevel) },
}
}

func (config *ClusterVersionOperatorConfiguration) UsingKubeAPIServer(client operatorclientset.Interface, factory operatorexternalversions.SharedInformerFactory) *ClusterVersionOperatorConfiguration {
config.provider = newConfigProviderKubeAPIServer(client, factory, config.queue)
return config
}

// Start initializes and starts the configuration's informers. Must be run before Sync is called.
// Blocks until informers caches are synchronized or the context is cancelled.
func (config *ClusterVersionOperatorConfiguration) Start(ctx context.Context) error {
informer := config.factory.Operator().V1alpha1().ClusterVersionOperators()
if _, err := informer.Informer().AddEventHandler(config.clusterVersionOperatorEventHandler()); err != nil {
return err
if config.provider == nil {
return fmt.Errorf("the configuration provider must be initialized")
}
config.lister = informer.Lister()

config.factory.Start(ctx.Done())
synced := config.factory.WaitForCacheSync(ctx.Done())
for _, ok := range synced {
if !ok {
return fmt.Errorf("caches failed to sync: %w", ctx.Err())
}
if err := config.provider.start(ctx); err != nil {
return err
}

config.started = true
return nil
}
Expand All @@ -117,62 +109,17 @@ func (config *ClusterVersionOperatorConfiguration) Sync(ctx context.Context, key
klog.V(i.Normal).Infof("Finished syncing CVO configuration (%v)", time.Since(startTime))
}()

desiredConfig, err := config.lister.Get(ClusterVersionOperatorConfigurationName)
if apierrors.IsNotFound(err) {
// TODO: Set default values
return nil
}
desiredConfig, err := config.provider.get(ctx)
if err != nil {
return err
}
return config.sync(ctx, desiredConfig)
}

// sync synchronizes the local configuration based on the desired configuration
// and updates the status of the Kubernetes resource if needed.
//
// desiredConfig is a read-only representation of the desired configuration.
func (config *ClusterVersionOperatorConfiguration) sync(ctx context.Context, desiredConfig *operatorv1alpha1.ClusterVersionOperator) error {
if desiredConfig.Status.ObservedGeneration != desiredConfig.Generation {
newConfig := desiredConfig.DeepCopy()
newConfig.Status.ObservedGeneration = desiredConfig.Generation
_, err := config.client.UpdateStatus(ctx, newConfig, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to update the ClusterVersionOperator resource: %w", err)
}
}

config.lastObservedGeneration = desiredConfig.Generation
config.desiredLogLevel = desiredConfig.Spec.OperatorLogLevel
if config.desiredLogLevel == "" {
config.desiredLogLevel = operatorv1.Normal
if desiredConfig.desiredLogLevel == "" {
desiredConfig.desiredLogLevel = operatorv1.Normal
}
config.configuration = desiredConfig

currentLogLevel, notFound := loglevel.GetLogLevel()
if notFound {
klog.Warningf("The current log level could not be found; an attempt to set the log level to the desired level will be made")
}

if !notFound && currentLogLevel == config.desiredLogLevel {
klog.V(i.Debug).Infof("No need to update the current CVO log level '%s'; it is already set to the desired value", currentLogLevel)
} else {
if err := loglevel.SetLogLevel(config.desiredLogLevel); err != nil {
return fmt.Errorf("failed to set the log level to %q: %w", config.desiredLogLevel, err)
}

// E2E testing will be checking for existence or absence of these logs
switch config.desiredLogLevel {
case operatorv1.Normal:
klog.V(i.Normal).Infof("Successfully updated the log level from '%s' to 'Normal'", currentLogLevel)
case operatorv1.Debug:
klog.V(i.Debug).Infof("Successfully updated the log level from '%s' to 'Debug'", currentLogLevel)
case operatorv1.Trace:
klog.V(i.Trace).Infof("Successfully updated the log level from '%s' to 'Trace'", currentLogLevel)
case operatorv1.TraceAll:
klog.V(i.TraceAll).Infof("Successfully updated the log level from '%s' to 'TraceAll'", currentLogLevel)
default:
klog.Errorf("The CVO logging level has unexpected value '%s'", config.desiredLogLevel)
}
if config.handler != nil {
return config.handler(config.configuration)
}
return nil
}
Loading