Skip to content

Commit 1adeb58

Browse files
committed
feat(queueinformer): add config options
- Add variadic config options to QueueInformer constructors - Make number of queue workers configurable (default to 2) - Ensure InstallPlan sync only triggers resolution *after* a change
1 parent 149a0d1 commit 1adeb58

File tree

11 files changed

+131
-50
lines changed

11 files changed

+131
-50
lines changed

cmd/catalog/main.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,7 @@ func main() {
9595

9696
logger := log.New()
9797
if *debug {
98-
// TODO: change back to debug level
99-
logger.SetLevel(log.TraceLevel)
98+
logger.SetLevel(log.DebugLevel)
10099
}
101100
logger.Infof("log level %s", logger.Level)
102101

cmd/olm/main.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,7 @@ func main() {
9191
// Set log level to debug if `debug` flag set
9292
logger := log.New()
9393
if *debug {
94-
// TODO: Switch back to debug level
95-
logger.SetLevel(log.TraceLevel)
94+
logger.SetLevel(log.DebugLevel)
9695
}
9796
logger.Infof("log level %s", logger.Level)
9897

pkg/api/apis/operators/register.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,3 @@ func addKnownTypes(scheme *runtime.Scheme) error {
4848
)
4949
return nil
5050
}
51-
52-
func init() {
53-
54-
}

pkg/controller/operators/catalog/operator.go

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
8888

8989
// Create a new queueinformer-based operator.
9090
opClient := operatorclient.NewClientFromConfig(kubeconfigPath, logger)
91-
queueOperator, err := queueinformer.NewOperatorFromClient(opClient.KubernetesInterface().Discovery(), logger)
91+
queueOperator, err := queueinformer.NewOperator(opClient.KubernetesInterface().Discovery(), queueinformer.WithOperatorLogger(logger))
9292
if err != nil {
9393
return nil, err
9494
}
@@ -550,7 +550,6 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
550550
}
551551

552552
// Trigger a resolve, will pick up any subscriptions that depend on the catalog
553-
// o.resolveNamespace(out.GetNamespace())
554553
o.nsResolveQueue.Add(out.GetNamespace())
555554

556555
return nil
@@ -915,18 +914,6 @@ func (o *Operator) syncInstallPlans(obj interface{}) (syncError error) {
915914

916915
logger.Info("syncing")
917916

918-
defer func() {
919-
// make sure to notify subscription loop of installplan changes
920-
if owners := ownerutil.GetOwnersByKind(plan, v1alpha1.SubscriptionKind); len(owners) > 0 {
921-
for _, owner := range owners {
922-
logger.WithField("owner", owner).Debug("requeueing installplan owner")
923-
o.subQueueSet.Requeue(plan.GetNamespace(), owner.Name)
924-
}
925-
} else {
926-
logger.Trace("no installplan owner subscriptions found to requeue")
927-
}
928-
}()
929-
930917
if len(plan.Status.Plan) == 0 {
931918
logger.Info("skip processing installplan without status - subscription sync responsible for initial status")
932919
return
@@ -943,6 +930,18 @@ func (o *Operator) syncInstallPlans(obj interface{}) (syncError error) {
943930
return
944931
}
945932

933+
defer func() {
934+
// Notify subscription loop of installplan changes
935+
if owners := ownerutil.GetOwnersByKind(plan, v1alpha1.SubscriptionKind); len(owners) > 0 {
936+
for _, owner := range owners {
937+
logger.WithField("owner", owner).Debug("requeueing installplan owner")
938+
o.subQueueSet.Requeue(plan.GetNamespace(), owner.Name)
939+
}
940+
} else {
941+
logger.Trace("no installplan owner subscriptions found to requeue")
942+
}
943+
}()
944+
946945
// Update InstallPlan with status of transition. Log errors if we can't write them to the status.
947946
if _, err := o.client.OperatorsV1alpha1().InstallPlans(plan.GetNamespace()).UpdateStatus(outInstallPlan); err != nil {
948947
logger = logger.WithField("updateError", err.Error())

pkg/controller/operators/catalog/operator_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -664,7 +664,7 @@ func NewFakeOperator(ctx context.Context, namespace string, watchedNamespaces []
664664
}
665665

666666
// Create the new operator
667-
queueOperator, err := queueinformer.NewOperatorFromClient(opClientFake.KubernetesInterface().Discovery(), logrus.New())
667+
queueOperator, err := queueinformer.NewOperator(opClientFake.KubernetesInterface().Discovery())
668668
for _, informer := range sharedInformers {
669669
queueOperator.RegisterInformer(informer)
670670
}

pkg/controller/operators/olm/operator.go

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -83,18 +83,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
8383
return nil, err
8484
}
8585

86-
// // Create a new client for OLM types (CRs)
87-
// crClient, err := client.NewClient(kubeconfigPath)
88-
// if err != nil {
89-
// return nil, err
90-
// }
91-
92-
// internalClient, err := client.NewInternalClient(kubeconfigPath)
93-
// if err != nil {
94-
// return nil, err
95-
// }
96-
97-
queueOperator, err := queueinformer.NewOperatorFromClient(config.operatorClient.KubernetesInterface().Discovery(), config.logger)
86+
queueOperator, err := queueinformer.NewOperator(config.operatorClient.KubernetesInterface().Discovery(), queueinformer.WithOperatorLogger(config.logger))
9887
if err != nil {
9988
return nil, err
10089
}

pkg/controller/operators/olm/requirements.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ func (a *Operator) permissionStatus(strategyDetailsDeployment *install.StrategyD
331331

332332
statuses := []v1alpha1.RequirementStatus{}
333333
for key, status := range statusesSet {
334-
a.logger.WithField("key", key).WithField("status", status).Debugf("appending permission status")
334+
a.logger.WithField("key", key).WithField("status", status).Tracef("appending permission status")
335335
statuses = append(statuses, status)
336336
}
337337

pkg/lib/queueinformer/config.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package queueinformer
33
import (
44
"github.com/pkg/errors"
55
"github.com/sirupsen/logrus"
6+
"k8s.io/client-go/discovery"
67
"k8s.io/client-go/tools/cache"
78
"k8s.io/client-go/util/workqueue"
89

@@ -140,3 +141,76 @@ func WithSyncer(syncer kubestate.Syncer) Option {
140141
config.syncer = syncer
141142
}
142143
}
144+
145+
type operatorConfig struct {
146+
discovery discovery.DiscoveryInterface
147+
queueInformers []*QueueInformer
148+
informers []cache.SharedIndexInformer
149+
logger *logrus.Logger
150+
numWorkers int
151+
}
152+
153+
type OperatorOption func(*operatorConfig)
154+
155+
// apply sequentially applies the given options to the config.
156+
func (c *operatorConfig) apply(options []OperatorOption) {
157+
for _, option := range options {
158+
option(c)
159+
}
160+
}
161+
162+
func newInvalidOperatorConfigError(msg string) error {
163+
return errors.Errorf("invalid queue informer operator config: %s", msg)
164+
}
165+
166+
// WithOperatorLogger sets the logger used by an Operator.
167+
func WithOperatorLogger(logger *logrus.Logger) OperatorOption {
168+
return func(config *operatorConfig) {
169+
config.logger = logger
170+
}
171+
}
172+
173+
// WithQueueInformers registers a set of initial QueueInformers with an Operator.
174+
// If the QueueInformer is configured with a SharedIndexInformer, that SharedIndexInformer
175+
// is registered with the Operator automatically.
176+
func WithQueueInformers(queueInformers ...*QueueInformer) OperatorOption {
177+
return func(config *operatorConfig) {
178+
config.queueInformers = queueInformers
179+
}
180+
}
181+
182+
// WithQueueInformers registers a set of initial Informers with an Operator.
183+
func WithInformers(informers ...cache.SharedIndexInformer) OperatorOption {
184+
return func(config *operatorConfig) {
185+
config.informers = informers
186+
}
187+
}
188+
189+
// WithNumWorkers sets the number of workers an Operator uses to process each queue.
190+
// It translates directly to the number of queue items processed in parallel for a given queue.
191+
// Specifying zero or less workers is an invariant and will cause an error upon configuration.
192+
// Specifying one worker indicates that each queue will only have one item processed at a time.
193+
func WithNumWorkers(numWorkers int) OperatorOption {
194+
return func(config *operatorConfig) {
195+
config.numWorkers = numWorkers
196+
}
197+
}
198+
199+
// validate returns an error if the config isn't valid.
200+
func (c *operatorConfig) validate() (err error) {
201+
switch config := c; {
202+
case config.discovery == nil:
203+
err = newInvalidOperatorConfigError("discovery client nil")
204+
case config.numWorkers < 1:
205+
err = newInvalidOperatorConfigError("must specify at least one worker per queue")
206+
}
207+
208+
return
209+
}
210+
211+
func defaultOperatorConfig() *operatorConfig {
212+
return &operatorConfig{
213+
logger: logrus.New(),
214+
numWorkers: 2,
215+
}
216+
}

pkg/lib/queueinformer/queueinformer_operator.go

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ type operator struct {
5151
informers []cache.SharedIndexInformer
5252
hasSynced cache.InformerSynced
5353
mu sync.RWMutex
54+
numWorkers int
5455
runInformersOnce sync.Once
5556
reconcileOnce sync.Once
5657
logger *logrus.Logger
@@ -206,8 +207,9 @@ func (o *operator) run(ctx context.Context) {
206207

207208
o.logger.Info("starting workers...")
208209
for _, queueInformer := range o.queueInformers {
209-
go o.worker(ctx, queueInformer)
210-
go o.worker(ctx, queueInformer)
210+
for w := 0; w < o.numWorkers; w++ {
211+
go o.worker(ctx, queueInformer)
212+
}
211213
}
212214

213215
close(o.ready)
@@ -249,7 +251,7 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer)
249251
resource, exists, err := loop.indexer.GetByKey(key)
250252
if err != nil {
251253
logger.WithError(err).Error("cache get failed")
252-
// queue.Forget(item)
254+
queue.Forget(item)
253255
return true
254256
}
255257
if !exists {
@@ -283,18 +285,41 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer)
283285
return true
284286
}
285287

286-
// NewOperatorFromClient returns a new Operator configured to manage the cluster with the given discovery client.
287-
func NewOperatorFromClient(disc discovery.DiscoveryInterface, logger *logrus.Logger) (Operator, error) {
288+
// NewOperator returns a new Operator configured to manage the cluster with the given discovery client.
289+
func NewOperator(disc discovery.DiscoveryInterface, options ...OperatorOption) (Operator, error) {
290+
config := defaultOperatorConfig()
291+
config.discovery = disc
292+
config.apply(options)
293+
if err := config.validate(); err != nil {
294+
return nil, err
295+
}
296+
297+
return newOperatorFromConfig(config)
298+
299+
}
300+
301+
func newOperatorFromConfig(config *operatorConfig) (Operator, error) {
288302
op := &operator{
289-
discovery: disc,
290-
queueInformers: []*QueueInformer{},
291-
informers: []cache.SharedIndexInformer{},
292-
logger: logger,
293-
ready: make(chan struct{}),
294-
done: make(chan struct{}),
295-
atLevel: make(chan error, 25),
303+
discovery: config.discovery,
304+
numWorkers: config.numWorkers,
305+
logger: config.logger,
306+
ready: make(chan struct{}),
307+
done: make(chan struct{}),
308+
atLevel: make(chan error, 25),
296309
}
297310
op.syncCh = op.atLevel
298311

312+
// Register QueueInformers and Informers
313+
for _, queueInformer := range op.queueInformers {
314+
if err := op.RegisterQueueInformer(queueInformer); err != nil {
315+
return nil, err
316+
}
317+
}
318+
for _, informer := range op.informers {
319+
if err := op.RegisterInformer(informer); err != nil {
320+
return nil, err
321+
}
322+
}
323+
299324
return op, nil
300325
}

pkg/package-server/provider/registry_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func NewFakeRegistryProvider(ctx context.Context, clientObjs []runtime.Object, k
8383
k8sClientFake := k8sfake.NewSimpleClientset(k8sObjs...)
8484
opClientFake := operatorclient.NewClient(k8sClientFake, nil, nil)
8585

86-
op, err := queueinformer.NewOperatorFromClient(opClientFake.KubernetesInterface().Discovery(), logrus.StandardLogger())
86+
op, err := queueinformer.NewOperator(opClientFake.KubernetesInterface().Discovery())
8787
if err != nil {
8888
return nil, err
8989
}

0 commit comments

Comments
 (0)