Skip to content

Commit ec8a3b9

Browse files
committed
refactor(queueinformer): use kubestate resource events
- Remove QueueIndexer type - Remove informer requirement from QueueInformer (takes the place of QueueIndexers) - Replace factory method parameters with config options - Use ResourceEvents as queue items - Replace stop channel with context for the QueueInformer Operator - Bump requeue limit to 8 - Add an interface that describes QueueInformer Operator - Expose Operator run channels with getter methods - Process deletion events from queue without using an indexer (but never requeue them on failure) - Add legacy sync handler adapter functions - Swap namespace and name parameter order in ResourceQueue methods
1 parent 915d096 commit ec8a3b9

File tree

6 files changed

+625
-381
lines changed

6 files changed

+625
-381
lines changed

pkg/lib/queueinformer/config.go

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
package queueinformer
2+
3+
import (
4+
"github.com/pkg/errors"
5+
"github.com/sirupsen/logrus"
6+
"k8s.io/client-go/tools/cache"
7+
"k8s.io/client-go/util/workqueue"
8+
9+
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate"
10+
"github.com/operator-framework/operator-lifecycle-manager/pkg/metrics"
11+
)
12+
13+
type queueInformerConfig struct {
14+
provider metrics.MetricsProvider
15+
logger *logrus.Logger
16+
queue workqueue.RateLimitingInterface
17+
informer cache.SharedIndexInformer
18+
indexer cache.Indexer
19+
keyFunc KeyFunc
20+
syncer kubestate.Syncer
21+
}
22+
23+
// Option applies an option to the given queue informer config.
24+
type Option func(config *queueInformerConfig)
25+
26+
// apply sequentially applies the given options to the config.
27+
func (c *queueInformerConfig) apply(options []Option) {
28+
for _, option := range options {
29+
option(c)
30+
}
31+
}
32+
33+
func newInvalidConfigError(msg string) error {
34+
return errors.Errorf("invalid queue informer config: %s", msg)
35+
}
36+
37+
func (c *queueInformerConfig) complete() {
38+
if c.indexer == nil && c.informer != nil {
39+
// Extract indexer from informer if
40+
c.indexer = c.informer.GetIndexer()
41+
}
42+
}
43+
44+
// validate returns an error if the config isn't valid.
45+
func (c *queueInformerConfig) validate() (err error) {
46+
switch config := c; {
47+
case config.provider == nil:
48+
err = newInvalidConfigError("nil metrics provider")
49+
case config.logger == nil:
50+
err = newInvalidConfigError("nil logger")
51+
case config.queue == nil:
52+
err = newInvalidConfigError("nil queue")
53+
case config.indexer == nil && config.informer == nil:
54+
err = newInvalidConfigError("nil indexer and informer")
55+
case config.keyFunc == nil:
56+
err = newInvalidConfigError("nil key function")
57+
case config.syncer == nil:
58+
err = newInvalidConfigError("nil syncer")
59+
}
60+
61+
return
62+
}
63+
64+
func defaultKeyFunc(obj interface{}) (string, bool) {
65+
// Get keys nested in resource events up to depth 2
66+
keyable := false
67+
for d := 0; d < 2 && !keyable; d++ {
68+
switch v := obj.(type) {
69+
case string:
70+
return v, true
71+
case kubestate.ResourceEvent:
72+
obj = v.Resource()
73+
default:
74+
keyable = true
75+
}
76+
}
77+
78+
k, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
79+
if err != nil {
80+
return k, false
81+
}
82+
83+
return k, true
84+
}
85+
86+
func defaultConfig() *queueInformerConfig {
87+
return &queueInformerConfig{
88+
provider: metrics.NewMetricsNil(),
89+
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "default"),
90+
logger: logrus.New(),
91+
keyFunc: defaultKeyFunc,
92+
}
93+
}
94+
95+
// WithMetricsProvider configures the QueueInformer's MetricsProvider as provider.
96+
func WithMetricsProvider(provider metrics.MetricsProvider) Option {
97+
return func(config *queueInformerConfig) {
98+
config.provider = provider
99+
}
100+
}
101+
102+
// WithLogger configures logger as the QueueInformer's Logger.
103+
func WithLogger(logger *logrus.Logger) Option {
104+
return func(config *queueInformerConfig) {
105+
config.logger = logger
106+
}
107+
}
108+
109+
// WithQueue sets the queue used by a QueueInformer.
110+
func WithQueue(queue workqueue.RateLimitingInterface) Option {
111+
return func(config *queueInformerConfig) {
112+
config.queue = queue
113+
}
114+
}
115+
116+
// WithInformer sets the informer used by a QueueInformer.
117+
func WithInformer(informer cache.SharedIndexInformer) Option {
118+
return func(config *queueInformerConfig) {
119+
config.informer = informer
120+
}
121+
}
122+
123+
// WithIndexer sets the indexer used by a QueueInformer.
124+
func WithIndexer(indexer cache.Indexer) Option {
125+
return func(config *queueInformerConfig) {
126+
config.indexer = indexer
127+
}
128+
}
129+
130+
// WithKeyFunc sets the key func used by a QueueInformer.
131+
func WithKeyFunc(keyFunc KeyFunc) Option {
132+
return func(config *queueInformerConfig) {
133+
config.keyFunc = keyFunc
134+
}
135+
}
136+
137+
// WithSyncer sets the syncer invoked by a QueueInformer.
138+
func WithSyncer(syncer kubestate.Syncer) Option {
139+
return func(config *queueInformerConfig) {
140+
config.syncer = syncer
141+
}
142+
}

pkg/lib/queueinformer/config_test.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package queueinformer
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
corev1 "k8s.io/api/core/v1"
8+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
9+
"k8s.io/client-go/tools/cache"
10+
11+
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate"
12+
)
13+
14+
func TestDefaultKeyFunc(t *testing.T) {
15+
tests := []struct {
16+
description string
17+
obj interface{}
18+
expectedKey string
19+
expectedCreated bool
20+
}{
21+
{
22+
description: "String/Created",
23+
obj: "a-string-key",
24+
expectedKey: "a-string-key",
25+
expectedCreated: true,
26+
},
27+
{
28+
description: "ExplicitKey/Created",
29+
obj: cache.ExplicitKey("an-explicit-key"),
30+
expectedKey: "an-explicit-key",
31+
expectedCreated: true,
32+
},
33+
{
34+
description: "Meta/Created",
35+
obj: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "a-pod"}},
36+
expectedKey: "default/a-pod",
37+
expectedCreated: true,
38+
},
39+
{
40+
description: "Meta/NonNamespaced/Created",
41+
obj: &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "a-namespace"}},
42+
expectedKey: "a-namespace",
43+
expectedCreated: true,
44+
},
45+
{
46+
description: "ResourceEvent/String/Created",
47+
obj: kubestate.NewResourceEvent(kubestate.ResourceAdded, "a-string-key"),
48+
expectedKey: "a-string-key",
49+
expectedCreated: true,
50+
},
51+
{
52+
description: "ResourceEvent/ExplicitKey/Created",
53+
obj: kubestate.NewResourceEvent(kubestate.ResourceAdded, cache.ExplicitKey("an-explicit-key")),
54+
expectedKey: "an-explicit-key",
55+
expectedCreated: true,
56+
},
57+
{
58+
description: "ResourceEvent/Meta/Created",
59+
obj: kubestate.NewResourceEvent(kubestate.ResourceAdded, &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "a-pod"}}),
60+
expectedKey: "default/a-pod",
61+
expectedCreated: true,
62+
},
63+
{
64+
description: "ResourceEvent/Meta/NonNamespaced/Created",
65+
obj: kubestate.NewResourceEvent(kubestate.ResourceAdded, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "a-namespace"}}),
66+
expectedKey: "a-namespace",
67+
expectedCreated: true,
68+
},
69+
{
70+
description: "ResourceEvent/ResourceEvent/ExplicitKey/Created",
71+
obj: kubestate.NewResourceEvent(kubestate.ResourceAdded, kubestate.NewResourceEvent(kubestate.ResourceAdded, cache.ExplicitKey("an-explicit-key"))),
72+
expectedKey: "an-explicit-key",
73+
expectedCreated: true,
74+
},
75+
{
76+
description: "ResourceEvent/ResourceEvent/Meta/Created",
77+
obj: kubestate.NewResourceEvent(kubestate.ResourceAdded, kubestate.NewResourceEvent(kubestate.ResourceAdded, &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "a-pod"}})),
78+
expectedKey: "default/a-pod",
79+
expectedCreated: true,
80+
},
81+
{
82+
description: "Arbitrary/NotCreated",
83+
obj: struct{}{},
84+
expectedKey: "",
85+
expectedCreated: false,
86+
},
87+
{
88+
description: "ResourceEvent/Arbitrary/NotCreated",
89+
obj: kubestate.NewResourceEvent(kubestate.ResourceAdded, struct{}{}),
90+
expectedKey: "",
91+
expectedCreated: false,
92+
},
93+
{
94+
description: "ResourceEvent/ResourceEvent/Arbitrary/NotCreated",
95+
obj: kubestate.NewResourceEvent(kubestate.ResourceAdded, kubestate.NewResourceEvent(kubestate.ResourceAdded, struct{}{})),
96+
expectedKey: "",
97+
expectedCreated: false,
98+
},
99+
{
100+
description: "ResourceEvent/ResourceEvent/ResourceEvent/String/NotCreated",
101+
obj: kubestate.NewResourceEvent(kubestate.ResourceAdded, kubestate.NewResourceEvent(kubestate.ResourceAdded, kubestate.NewResourceEvent(kubestate.ResourceAdded, &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "a-pod"}}))),
102+
expectedKey: "",
103+
expectedCreated: false,
104+
},
105+
}
106+
107+
for _, tt := range tests {
108+
t.Run(tt.description, func(t *testing.T) {
109+
key, created := defaultKeyFunc(tt.obj)
110+
require.Equal(t, tt.expectedKey, key)
111+
require.Equal(t, tt.expectedCreated, created)
112+
})
113+
}
114+
}

pkg/lib/queueinformer/queueindexer.go

Lines changed: 0 additions & 65 deletions
This file was deleted.

0 commit comments

Comments
 (0)