Skip to content

Commit f9f88f5

Browse files
committed
config: Introduce configuration provider interface
Introduce a configuration provider interface. The goal of the interface is to abstract the underlying system of fetching the CVO configuration. Be it a Kubernetes API server, or a configuration file. For the moment, the Kubernetes API server is the sole provider. Refactor the code to adhere to the new logic. Apply a default configuration on a NotFound error (was previously noted as a TODO in the source code).
1 parent 5fafb38 commit f9f88f5

File tree

4 files changed

+130
-75
lines changed

4 files changed

+130
-75
lines changed

pkg/cvo/configuration/configuration.go

Lines changed: 29 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -5,44 +5,48 @@ import (
55
"fmt"
66
"time"
77

8-
apierrors "k8s.io/apimachinery/pkg/api/errors"
9-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
10-
"k8s.io/client-go/tools/cache"
118
"k8s.io/client-go/util/workqueue"
129
"k8s.io/klog/v2"
1310

1411
operatorv1 "github.com/openshift/api/operator/v1"
15-
operatorv1alpha1 "github.com/openshift/api/operator/v1alpha1"
1612
operatorclientset "github.com/openshift/client-go/operator/clientset/versioned"
17-
cvoclientv1alpha1 "github.com/openshift/client-go/operator/clientset/versioned/typed/operator/v1alpha1"
1813
operatorexternalversions "github.com/openshift/client-go/operator/informers/externalversions"
19-
operatorlistersv1alpha1 "github.com/openshift/client-go/operator/listers/operator/v1alpha1"
2014
"github.com/openshift/library-go/pkg/operator/loglevel"
2115

2216
i "github.com/openshift/cluster-version-operator/pkg/internal"
2317
)
2418

2519
const ClusterVersionOperatorConfigurationName = "cluster"
2620

21+
const defaultQueueKey = "ClusterVersionOperator/" + ClusterVersionOperatorConfigurationName
22+
2723
type configuration struct {
2824
lastObservedGeneration int64
2925
desiredLogLevel operatorv1.LogLevel
3026
}
3127

28+
func defaultConfiguration() configuration {
29+
return configuration{
30+
lastObservedGeneration: 0,
31+
desiredLogLevel: operatorv1.Normal,
32+
}
33+
}
34+
35+
type ConfigProvider interface {
36+
start(ctx context.Context) error
37+
get(context.Context) (configuration, error)
38+
}
39+
3240
type ClusterVersionOperatorConfiguration struct {
33-
queueKey string
3441
// queue tracks checking for the CVO configuration.
3542
//
3643
// The type any is used to comply with the worker method of the cvo.Operator struct.
3744
queue workqueue.TypedRateLimitingInterface[any]
3845

39-
client cvoclientv1alpha1.ClusterVersionOperatorInterface
40-
lister operatorlistersv1alpha1.ClusterVersionOperatorLister
41-
factory operatorexternalversions.SharedInformerFactory
42-
4346
started bool
4447

4548
configuration configuration
49+
provider ConfigProvider
4650

4751
// Function to handle an update to the internal configuration.
4852
//
@@ -57,28 +61,9 @@ func (config *ClusterVersionOperatorConfiguration) Queue() workqueue.TypedRateLi
5761
return config.queue
5862
}
5963

60-
// clusterVersionOperatorEventHandler queues an update for the cluster version operator on any change to the given object.
61-
// Callers should use this with an informer.
62-
func (config *ClusterVersionOperatorConfiguration) clusterVersionOperatorEventHandler() cache.ResourceEventHandler {
63-
return cache.ResourceEventHandlerFuncs{
64-
AddFunc: func(_ interface{}) {
65-
config.queue.Add(config.queueKey)
66-
klog.V(i.Debug).Infof("ClusterVersionOperator resource was added; queuing a sync")
67-
},
68-
UpdateFunc: func(_, _ interface{}) {
69-
config.queue.Add(config.queueKey)
70-
klog.V(i.Debug).Infof("ClusterVersionOperator resource was modified or resync period has passed; queuing a sync")
71-
},
72-
DeleteFunc: func(_ interface{}) {
73-
config.queue.Add(config.queueKey)
74-
klog.V(i.Debug).Infof("ClusterVersionOperator resource was deleted; queuing a sync")
75-
},
76-
}
77-
}
78-
7964
// NewClusterVersionOperatorConfiguration returns ClusterVersionOperatorConfiguration, which might be used
8065
// to synchronize with the ClusterVersionOperator resource.
81-
func NewClusterVersionOperatorConfiguration(client operatorclientset.Interface, factory operatorexternalversions.SharedInformerFactory) *ClusterVersionOperatorConfiguration {
66+
func NewClusterVersionOperatorConfiguration() *ClusterVersionOperatorConfiguration {
8267
var desiredLogLevel operatorv1.LogLevel
8368
if currentLogLevel, notFound := loglevel.GetLogLevel(); notFound {
8469
klog.Warningf("The current log level could not be found; assuming the 'Normal' level is the currently desired")
@@ -88,34 +73,28 @@ func NewClusterVersionOperatorConfiguration(client operatorclientset.Interface,
8873
}
8974

9075
return &ClusterVersionOperatorConfiguration{
91-
queueKey: fmt.Sprintf("ClusterVersionOperator/%s", ClusterVersionOperatorConfigurationName),
9276
queue: workqueue.NewTypedRateLimitingQueueWithConfig[any](
9377
workqueue.DefaultTypedControllerRateLimiter[any](),
9478
workqueue.TypedRateLimitingQueueConfig[any]{Name: "configuration"}),
95-
client: client.OperatorV1alpha1().ClusterVersionOperators(),
96-
factory: factory,
9779
configuration: configuration{desiredLogLevel: desiredLogLevel},
9880
handler: func(c configuration) error { return applyLogLevel(c.desiredLogLevel) },
9981
}
10082
}
10183

84+
func (config *ClusterVersionOperatorConfiguration) UsingKubeAPIServer(client operatorclientset.Interface, factory operatorexternalversions.SharedInformerFactory) *ClusterVersionOperatorConfiguration {
85+
config.provider = newConfigProviderKubeAPIServer(client, factory, config.queue)
86+
return config
87+
}
88+
10289
// Start initializes and starts the configuration's informers. Must be run before Sync is called.
10390
// Blocks until informers caches are synchronized or the context is cancelled.
10491
func (config *ClusterVersionOperatorConfiguration) Start(ctx context.Context) error {
105-
informer := config.factory.Operator().V1alpha1().ClusterVersionOperators()
106-
if _, err := informer.Informer().AddEventHandler(config.clusterVersionOperatorEventHandler()); err != nil {
107-
return err
92+
if config.provider == nil {
93+
return fmt.Errorf("the configuration provider must be initialized")
10894
}
109-
config.lister = informer.Lister()
110-
111-
config.factory.Start(ctx.Done())
112-
synced := config.factory.WaitForCacheSync(ctx.Done())
113-
for _, ok := range synced {
114-
if !ok {
115-
return fmt.Errorf("caches failed to sync: %w", ctx.Err())
116-
}
95+
if err := config.provider.start(ctx); err != nil {
96+
return err
11797
}
118-
11998
config.started = true
12099
return nil
121100
}
@@ -130,36 +109,14 @@ func (config *ClusterVersionOperatorConfiguration) Sync(ctx context.Context, key
130109
klog.V(i.Normal).Infof("Finished syncing CVO configuration (%v)", time.Since(startTime))
131110
}()
132111

133-
desiredConfig, err := config.lister.Get(ClusterVersionOperatorConfigurationName)
134-
if apierrors.IsNotFound(err) {
135-
// TODO: Set default values
136-
return nil
137-
}
112+
desiredConfig, err := config.provider.get(ctx)
138113
if err != nil {
139114
return err
140115
}
141-
return config.sync(ctx, desiredConfig)
142-
}
143-
144-
// sync synchronizes the local configuration based on the desired configuration
145-
// and updates the status of the Kubernetes resource if needed.
146-
//
147-
// desiredConfig is a read-only representation of the desired configuration.
148-
func (config *ClusterVersionOperatorConfiguration) sync(ctx context.Context, desiredConfig *operatorv1alpha1.ClusterVersionOperator) error {
149-
if desiredConfig.Status.ObservedGeneration != desiredConfig.Generation {
150-
newConfig := desiredConfig.DeepCopy()
151-
newConfig.Status.ObservedGeneration = desiredConfig.Generation
152-
_, err := config.client.UpdateStatus(ctx, newConfig, metav1.UpdateOptions{})
153-
if err != nil {
154-
return fmt.Errorf("failed to update the ClusterVersionOperator resource: %w", err)
155-
}
156-
}
157-
158-
config.configuration.lastObservedGeneration = desiredConfig.Generation
159-
config.configuration.desiredLogLevel = desiredConfig.Spec.OperatorLogLevel
160-
if config.configuration.desiredLogLevel == "" {
161-
config.configuration.desiredLogLevel = operatorv1.Normal
116+
if desiredConfig.desiredLogLevel == "" {
117+
desiredConfig.desiredLogLevel = operatorv1.Normal
162118
}
119+
config.configuration = desiredConfig
163120

164121
if config.handler != nil {
165122
return config.handler(config.configuration)

pkg/cvo/configuration/configuration_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ func TestClusterVersionOperatorConfiguration_sync(t *testing.T) {
169169
client := operatorclientsetfake.NewClientset(&tt.config)
170170
factory := operatorexternalversions.NewSharedInformerFactoryWithOptions(client, time.Minute)
171171

172-
configController := NewClusterVersionOperatorConfiguration(client, factory)
172+
configController := NewClusterVersionOperatorConfiguration().UsingKubeAPIServer(client, factory)
173173

174174
called := false
175175
configController.handler = func(_ configuration) error {
@@ -265,7 +265,7 @@ func TestClusterVersionOperatorConfiguration_Sync(t *testing.T) {
265265
}
266266

267267
factory := operatorexternalversions.NewSharedInformerFactoryWithOptions(client, time.Minute)
268-
cvoConfiguration := NewClusterVersionOperatorConfiguration(client, factory)
268+
cvoConfiguration := NewClusterVersionOperatorConfiguration().UsingKubeAPIServer(client, factory)
269269
defer cvoConfiguration.queue.ShutDown()
270270

271271
ctx, cancelFunc := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))

pkg/cvo/configuration/resource.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package configuration
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
apierrors "k8s.io/apimachinery/pkg/api/errors"
8+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
9+
"k8s.io/client-go/tools/cache"
10+
"k8s.io/client-go/util/workqueue"
11+
"k8s.io/klog/v2"
12+
13+
operatorclientset "github.com/openshift/client-go/operator/clientset/versioned"
14+
cvoclientv1alpha1 "github.com/openshift/client-go/operator/clientset/versioned/typed/operator/v1alpha1"
15+
operatorexternalversions "github.com/openshift/client-go/operator/informers/externalversions"
16+
operatorlistersv1alpha1 "github.com/openshift/client-go/operator/listers/operator/v1alpha1"
17+
18+
i "github.com/openshift/cluster-version-operator/pkg/internal"
19+
)
20+
21+
type ConfigProviderKubeAPIServer struct {
22+
queueAdd func(any)
23+
queueKey string
24+
25+
client cvoclientv1alpha1.ClusterVersionOperatorInterface
26+
lister operatorlistersv1alpha1.ClusterVersionOperatorLister
27+
factory operatorexternalversions.SharedInformerFactory
28+
}
29+
30+
func newConfigProviderKubeAPIServer(client operatorclientset.Interface, factory operatorexternalversions.SharedInformerFactory, queue workqueue.TypedRateLimitingInterface[any]) *ConfigProviderKubeAPIServer {
31+
return &ConfigProviderKubeAPIServer{
32+
queueAdd: queue.Add,
33+
queueKey: defaultQueueKey,
34+
client: client.OperatorV1alpha1().ClusterVersionOperators(),
35+
factory: factory,
36+
}
37+
}
38+
39+
func (c *ConfigProviderKubeAPIServer) start(ctx context.Context) error {
40+
informer := c.factory.Operator().V1alpha1().ClusterVersionOperators()
41+
if _, err := informer.Informer().AddEventHandler(c.clusterVersionOperatorEventHandler()); err != nil {
42+
return err
43+
}
44+
c.lister = informer.Lister()
45+
46+
c.factory.Start(ctx.Done())
47+
synced := c.factory.WaitForCacheSync(ctx.Done())
48+
for _, ok := range synced {
49+
if !ok {
50+
return fmt.Errorf("caches failed to sync: %w", ctx.Err())
51+
}
52+
}
53+
return nil
54+
}
55+
56+
func (c *ConfigProviderKubeAPIServer) get(ctx context.Context) (configuration, error) {
57+
config := defaultConfiguration()
58+
59+
desiredConfig, err := c.lister.Get(ClusterVersionOperatorConfigurationName)
60+
if apierrors.IsNotFound(err) {
61+
return config, nil
62+
}
63+
if err != nil {
64+
return config, err
65+
}
66+
67+
if desiredConfig.Status.ObservedGeneration != desiredConfig.Generation {
68+
newConfig := desiredConfig.DeepCopy()
69+
newConfig.Status.ObservedGeneration = desiredConfig.Generation
70+
_, err := c.client.UpdateStatus(ctx, newConfig, metav1.UpdateOptions{})
71+
if err != nil {
72+
return config, fmt.Errorf("failed to update the ClusterVersionOperator resource: %w", err)
73+
}
74+
}
75+
76+
config.lastObservedGeneration = desiredConfig.Generation
77+
config.desiredLogLevel = desiredConfig.Spec.OperatorLogLevel
78+
return config, nil
79+
}
80+
81+
// clusterVersionOperatorEventHandler queues an update for the cluster version operator on any change to the given object.
82+
// Callers should use this with an informer.
83+
func (c *ConfigProviderKubeAPIServer) clusterVersionOperatorEventHandler() cache.ResourceEventHandler {
84+
return cache.ResourceEventHandlerFuncs{
85+
AddFunc: func(_ interface{}) {
86+
c.queueAdd(c.queueKey)
87+
klog.V(i.Debug).Infof("ClusterVersionOperator resource was added; queuing a sync")
88+
},
89+
UpdateFunc: func(_, _ interface{}) {
90+
c.queueAdd(c.queueKey)
91+
klog.V(i.Debug).Infof("ClusterVersionOperator resource was modified or resync period has passed; queuing a sync")
92+
},
93+
DeleteFunc: func(_ interface{}) {
94+
c.queueAdd(c.queueKey)
95+
klog.V(i.Debug).Infof("ClusterVersionOperator resource was deleted; queuing a sync")
96+
},
97+
}
98+
}

pkg/cvo/cvo.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ func New(
291291
optr.upgradeableChecks = optr.defaultUpgradeableChecks()
292292

293293
if shouldReconcileCVOConfiguration(cvoGates, optr.hypershift) {
294-
optr.configuration = configuration.NewClusterVersionOperatorConfiguration(operatorClient, operatorInformerFactory)
294+
optr.configuration = configuration.NewClusterVersionOperatorConfiguration().UsingKubeAPIServer(operatorClient, operatorInformerFactory)
295295
}
296296

297297
return optr, nil

0 commit comments

Comments
 (0)