Skip to content

Commit 7b1c6b5

Browse files
committed
fix(queueInformers): use separate queue for each namespace
- Uses a separate queue for each namespaced QueueInformer - Extends `make e2e-local` test timeout to 20 minutes - Checks for olm.operatorNamespace difference to determine copied CSVs
1 parent ecafeca commit 7b1c6b5

File tree

131 files changed

+9236
-9009
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

131 files changed

+9236
-9009
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ require (
6666
google.golang.org/grpc v1.16.0 // indirect
6767
gopkg.in/inf.v0 v0.9.1 // indirect
6868
gopkg.in/natefinch/lumberjack.v2 v2.0.0-20170531160350-a96e63847dc3 // indirect
69-
k8s.io/api v0.0.0-20180904230853-4e7be11eab3f
69+
k8s.io/api v0.0.0-20181108095152-eee84a6322ca
7070
k8s.io/apiextensions-apiserver v0.0.0-20180905004947-16750353bf97
7171
k8s.io/apimachinery v0.0.0-20181026144827-8ee1a638bafa
7272
k8s.io/apiserver v0.0.0-20181026151315-13cfe3978170

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -210,8 +210,8 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWD
210210
gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
211211
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
212212
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
213-
k8s.io/api v0.0.0-20180904230853-4e7be11eab3f h1:DLRkv8Ps4Sdx8Srj+UtGisj4whV7v/HezlHx6QqiZqE=
214-
k8s.io/api v0.0.0-20180904230853-4e7be11eab3f/go.mod h1:iuAfoD4hCxJ8Onx9kaTIt30j7jUFS00AXQi6QMi99vA=
213+
k8s.io/api v0.0.0-20181108095152-eee84a6322ca h1:0gIeW03B5m7yni69Y95oPgDXv7ow7puUMt2WqhJIKY8=
214+
k8s.io/api v0.0.0-20181108095152-eee84a6322ca/go.mod h1:iuAfoD4hCxJ8Onx9kaTIt30j7jUFS00AXQi6QMi99vA=
215215
k8s.io/apiextensions-apiserver v0.0.0-20180905004947-16750353bf97 h1:s4lWWs6JN5kWVzk5bztddkr5kgO/cGIbqTDP+QttUeQ=
216216
k8s.io/apiextensions-apiserver v0.0.0-20180905004947-16750353bf97/go.mod h1:IxkesAMoaCRoLrPJdZNZUQp9NfZnzqaVzLhb2VEQzXE=
217217
k8s.io/apimachinery v0.0.0-20181026144827-8ee1a638bafa h1:i0EOpPFWExNx7efINILpw8LJeah7gakRl1zjvwVfjiI=

pkg/controller/operators/olm/operator.go

Lines changed: 61 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,10 @@ const (
5050

5151
type Operator struct {
5252
*queueinformer.Operator
53-
csvQueue workqueue.RateLimitingInterface
54-
client versioned.Interface
55-
resolver install.StrategyResolverInterface
56-
lister operatorlister.OperatorLister
53+
csvQueues map[string]workqueue.RateLimitingInterface
54+
client versioned.Interface
55+
resolver install.StrategyResolverInterface
56+
lister operatorlister.OperatorLister
5757
recorder record.EventRecorder
5858
}
5959

@@ -75,11 +75,12 @@ func NewOperator(crClient versioned.Interface, opClient operatorclient.ClientInt
7575
}
7676

7777
op := &Operator{
78-
Operator: queueOperator,
79-
client: crClient,
80-
lister: operatorlister.NewLister(),
81-
resolver: resolver,
82-
recorder: eventRecorder,
78+
Operator: queueOperator,
79+
csvQueues: make(map[string]workqueue.RateLimitingInterface),
80+
client: crClient,
81+
lister: operatorlister.NewLister(),
82+
resolver: resolver,
83+
recorder: eventRecorder,
8384
}
8485

8586
// Set up RBAC informers
@@ -96,7 +97,7 @@ func NewOperator(crClient versioned.Interface, opClient operatorclient.ClientInt
9697
namespaceInformer.Informer(),
9798
op.syncObject,
9899
nil,
99-
"namespace",
100+
"namespaces",
100101
metrics.NewMetricsNil(),
101102
)
102103
op.RegisterQueueInformer(queueInformer)
@@ -117,7 +118,7 @@ func NewOperator(crClient versioned.Interface, opClient operatorclient.ClientInt
117118
&cache.ResourceEventHandlerFuncs{
118119
DeleteFunc: op.handleDeletion,
119120
},
120-
"namespace",
121+
"rbac",
121122
metrics.NewMetricsNil(),
122123
)
123124
for _, informer := range rbacQueueInformers {
@@ -181,95 +182,74 @@ func NewOperator(crClient versioned.Interface, opClient operatorclient.ClientInt
181182
&cache.ResourceEventHandlerFuncs{
182183
DeleteFunc: op.handleDeletion,
183184
},
184-
"services",
185+
"serviceaccounts",
185186
metrics.NewMetricsNil(),
186187
))
187188
op.lister.CoreV1().RegisterServiceAccountLister(metav1.NamespaceAll, serviceAccountInformer.Lister())
188189

189-
// Set up watch on CSVs
190-
csvInformers := []cache.SharedIndexInformer{}
190+
// csvInformers for each namespace all use the same backing queue keys are namespaced
191+
csvHandlers := &cache.ResourceEventHandlerFuncs{
192+
DeleteFunc: op.deleteClusterServiceVersion,
193+
}
191194
for _, namespace := range namespaces {
192-
log.Debugf("watching for CSVs in namespace %s", namespace)
195+
log.WithField("namespace", namespace).Infof("watching CSVs")
193196
sharedInformerFactory := externalversions.NewSharedInformerFactoryWithOptions(crClient, wakeupInterval, externalversions.WithNamespace(namespace))
194-
informer := sharedInformerFactory.Operators().V1alpha1().ClusterServiceVersions()
195-
csvInformers = append(csvInformers, informer.Informer())
196-
op.lister.OperatorsV1alpha1().RegisterClusterServiceVersionLister(namespace, informer.Lister())
197-
}
197+
csvInformer := sharedInformerFactory.Operators().V1alpha1().ClusterServiceVersions()
198+
op.lister.OperatorsV1alpha1().RegisterClusterServiceVersionLister(namespace, csvInformer.Lister())
198199

199-
// csvInformers for each namespace all use the same backing queue
200-
// queue keys are namespaced
201-
csvQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "clusterserviceversions")
202-
csvHandlers := cache.ResourceEventHandlerFuncs{
203-
DeleteFunc: op.deleteClusterServiceVersion,
204-
}
205-
queueInformers := queueinformer.New(
206-
csvQueue,
207-
csvInformers,
208-
op.syncClusterServiceVersion,
209-
&csvHandlers,
210-
"csv",
211-
metrics.NewMetricsCSV(op.client),
212-
)
213-
for _, informer := range queueInformers {
214-
op.RegisterQueueInformer(informer)
200+
// Register queue and QueueInformer
201+
queueName := fmt.Sprintf("%s/clusterserviceversions", namespace)
202+
csvQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName)
203+
csvQueueInformer := queueinformer.NewInformer(csvQueue, csvInformer.Informer(), op.syncClusterServiceVersion, csvHandlers, queueName, metrics.NewMetricsCSV(op.client))
204+
op.RegisterQueueInformer(csvQueueInformer)
205+
op.csvQueues[namespace] = csvQueue
215206
}
216-
op.csvQueue = csvQueue
217207

218-
// set up watch on deployments
219-
depInformers := []cache.SharedIndexInformer{}
220-
for _, namespace := range namespaces {
221-
log.Debugf("watching deployments in namespace %s", namespace)
222-
informer := informers.NewSharedInformerFactoryWithOptions(opClient.KubernetesInterface(), wakeupInterval, informers.WithNamespace(namespace)).Apps().V1().Deployments()
223-
depInformers = append(depInformers, informer.Informer())
224-
op.lister.AppsV1().RegisterDeploymentLister(namespace, informer.Lister())
208+
// Set up watch on deployments
209+
depHandlers := &cache.ResourceEventHandlerFuncs{
210+
DeleteFunc: op.handleDeletion,
225211
}
212+
for _, namespace := range namespaces {
213+
log.WithField("namespace", namespace).Infof("watching deployments")
214+
depInformer := informers.NewSharedInformerFactoryWithOptions(opClient.KubernetesInterface(), wakeupInterval, informers.WithNamespace(namespace)).Apps().V1().Deployments()
215+
op.lister.AppsV1().RegisterDeploymentLister(namespace, depInformer.Lister())
226216

227-
depQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csv-deployments")
228-
depQueueInformers := queueinformer.New(
229-
depQueue,
230-
depInformers,
231-
op.syncDeployment,
232-
&cache.ResourceEventHandlerFuncs{
233-
DeleteFunc: op.handleDeletion,
234-
},
235-
"deployment",
236-
metrics.NewMetricsNil(),
237-
)
238-
for _, informer := range depQueueInformers {
239-
op.RegisterQueueInformer(informer)
217+
// Register queue and QueueInformer
218+
queueName := fmt.Sprintf("%s/csv-deployments", namespace)
219+
depQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName)
220+
depQueueInformer := queueinformer.NewInformer(depQueue, depInformer.Informer(), op.syncDeployment, depHandlers, queueName, metrics.NewMetricsNil())
221+
op.RegisterQueueInformer(depQueueInformer)
240222
}
241223

242224
// Create an informer for the operator group
243-
operatorGroupInformers := []cache.SharedIndexInformer{}
244225
for _, namespace := range namespaces {
245-
informerFactory := externalversions.NewSharedInformerFactoryWithOptions(crClient, wakeupInterval, externalversions.WithNamespace(namespace))
246-
informer := informerFactory.Operators().V1alpha2().OperatorGroups()
247-
operatorGroupInformers = append(operatorGroupInformers, informer.Informer())
248-
op.lister.OperatorsV1alpha2().RegisterOperatorGroupLister(namespace, informer.Lister())
249-
}
250-
251-
// Register OperatorGroup informers.
252-
operatorGroupQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "operatorgroups")
253-
operatorGroupQueueInformer := queueinformer.New(
254-
operatorGroupQueue,
255-
operatorGroupInformers,
256-
op.syncOperatorGroups,
257-
nil,
258-
"operatorgroups",
259-
metrics.NewMetricsNil(),
260-
)
261-
for _, informer := range operatorGroupQueueInformer {
262-
op.RegisterQueueInformer(informer)
226+
log.WithField("namespace", namespace).Infof("watching OperatorGroups")
227+
sharedInformerFactory := externalversions.NewSharedInformerFactoryWithOptions(crClient, wakeupInterval, externalversions.WithNamespace(namespace))
228+
operatorGroupInformer := sharedInformerFactory.Operators().V1alpha2().OperatorGroups()
229+
op.lister.OperatorsV1alpha2().RegisterOperatorGroupLister(namespace, operatorGroupInformer.Lister())
230+
231+
// Register queue and QueueInformer
232+
queueName := fmt.Sprintf("%s/operatorgroups", namespace)
233+
operatorGroupQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName)
234+
operatorGroupQueueInformer := queueinformer.NewInformer(operatorGroupQueue, operatorGroupInformer.Informer(), op.syncOperatorGroups, nil, queueName, metrics.NewMetricsNil())
235+
op.RegisterQueueInformer(operatorGroupQueueInformer)
263236
}
264237

265238
return op, nil
266239
}
267240

268241
func (a *Operator) requeueCSV(name, namespace string) {
269-
// we can build the key directly, will need to change if queue uses different key scheme
242+
// We can build the key directly, will need to change if queue uses different key scheme
270243
key := fmt.Sprintf("%s/%s", namespace, name)
271-
log.Debugf("requeueing CSV %s", key)
272-
a.csvQueue.AddRateLimited(key)
244+
logger := log.WithField("key", key)
245+
logger.Debugf("requeueing CSV")
246+
247+
if queue, ok := a.csvQueues[namespace]; ok {
248+
queue.AddRateLimited(key)
249+
return
250+
}
251+
252+
logger.Debugf("couldn't find queue for CSV")
273253
}
274254

275255
func (a *Operator) syncDeployment(obj interface{}) (syncError error) {
@@ -398,7 +378,9 @@ func (a *Operator) syncClusterServiceVersion(obj interface{}) (syncError error)
398378
"phase": clusterServiceVersion.Status.Phase,
399379
})
400380

401-
if clusterServiceVersion.Status.Reason == v1alpha1.CSVReasonCopied {
381+
operatorNamespace, ok := clusterServiceVersion.GetAnnotations()["olm.operatorNamespace"]
382+
if clusterServiceVersion.Status.Reason == v1alpha1.CSVReasonCopied ||
383+
ok && clusterServiceVersion.GetNamespace() != operatorNamespace {
402384
logger.Info("skip sync of dummy CSV")
403385
return a.removeDanglingChildCSVs(clusterServiceVersion)
404386
}

pkg/controller/operators/olm/operator_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -133,11 +133,11 @@ func NewFakeOperator(clientObjs []runtime.Object, k8sObjs []runtime.Object, extO
133133
// Create the new operator
134134
queueOperator, err := queueinformer.NewOperatorFromClient(opClientFake)
135135
op := &Operator{
136-
Operator: queueOperator,
137-
client: clientFake,
138-
lister: operatorlister.NewLister(),
139-
resolver: resolver,
140-
csvQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "clusterserviceversions"),
136+
Operator: queueOperator,
137+
client: clientFake,
138+
lister: operatorlister.NewLister(),
139+
resolver: resolver,
140+
csvQueues: make(map[string]workqueue.RateLimitingInterface),
141141
recorder: eventRecorder,
142142
}
143143

scripts/run_e2e_local.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,4 @@ trap cleanupAndExit SIGINT SIGTERM EXIT
4242

4343
# run tests
4444
e2e_kubeconfig=${KUBECONFIG:-~/.kube/config}
45-
KUBECONFIG=${e2e_kubeconfig} NAMESPACE=${namespace} go test -v ./test/e2e/... ${1/[[:alnum:]-]*/-run ${1}}
45+
KUBECONFIG=${e2e_kubeconfig} NAMESPACE=${namespace} go test -v -timeout 20m ./test/e2e/... ${1/[[:alnum:]-]*/-run ${1}}

0 commit comments

Comments
 (0)