Skip to content

Commit 37bccd5

Browse files
committed
fix(olm): move csv copying into its own workqueue to free up csv
processing
1 parent b7594dd commit 37bccd5

File tree

18 files changed

+557
-110
lines changed

18 files changed

+557
-110
lines changed

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ require (
2929
github.com/spf13/cobra v0.0.3
3030
github.com/stretchr/testify v1.2.2
3131
go.etcd.io/bbolt v1.3.2 // indirect
32-
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c // indirect
32+
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c
3333
google.golang.org/grpc v1.16.0
3434
k8s.io/api v0.0.0-20190118113203-912cbe2bfef3
3535
k8s.io/apiextensions-apiserver v0.0.0-20190223021643-57c81b676ab1
@@ -41,5 +41,5 @@ require (
4141
k8s.io/klog v0.2.0 // indirect
4242
k8s.io/kube-aggregator v0.0.0-20190223015803-f706565beac0
4343
k8s.io/kube-openapi v0.0.0-20181031203759-72693cb1fadd
44-
k8s.io/kubernetes v1.11.8-beta.0.0.20190223014307-4e209c9383fa
44+
k8s.io/kubernetes v1.11.9-beta.0.0.20190305054513-b2539d50ae56
4545
)

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -320,5 +320,5 @@ k8s.io/kube-openapi v0.0.0-20181031203759-72693cb1fadd h1:ggv/Vfza0i5xuhUZyYyxcc
320320
k8s.io/kube-openapi v0.0.0-20181031203759-72693cb1fadd/go.mod h1:BXM9ceUBTj2QnfH2MK1odQs778ajze1RxcmP6S8RVVc=
321321
k8s.io/kubernetes v1.11.7-beta.0.0.20181219023948-b875d52ea96d/go.mod h1:ocZa8+6APFNC2tX1DZASIbocyYT5jHzqFVsY5aoB7Jk=
322322
k8s.io/kubernetes v1.11.8-beta.0.0.20190124204751-3a10094374f2/go.mod h1:ocZa8+6APFNC2tX1DZASIbocyYT5jHzqFVsY5aoB7Jk=
323-
k8s.io/kubernetes v1.11.8-beta.0.0.20190223014307-4e209c9383fa h1:aKI5IFZpu2ilGUMRIMfitvAvTBv8Th2opq5ErM7Y1SU=
324-
k8s.io/kubernetes v1.11.8-beta.0.0.20190223014307-4e209c9383fa/go.mod h1:ocZa8+6APFNC2tX1DZASIbocyYT5jHzqFVsY5aoB7Jk=
323+
k8s.io/kubernetes v1.11.9-beta.0.0.20190305054513-b2539d50ae56 h1:54Sglifzgv95ALJ0VzgQrU7IZH54lviTMUkWegwyweM=
324+
k8s.io/kubernetes v1.11.9-beta.0.0.20190305054513-b2539d50ae56/go.mod h1:ocZa8+6APFNC2tX1DZASIbocyYT5jHzqFVsY5aoB7Jk=

pkg/api/apis/operators/v1alpha1/clusterserviceversion.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ import (
88
"k8s.io/client-go/tools/record"
99
)
1010

11+
const (
12+
CopiedLabelKey = "olm.copiedFrom"
13+
)
14+
1115
// obsoleteReasons are the set of reasons that mean a CSV should no longer be processed as active
1216
var obsoleteReasons = map[ConditionReason]struct{}{
1317
CSVReasonReplaced: {},
@@ -89,6 +93,12 @@ func (c *ClusterServiceVersion) IsCopied() bool {
8993
if c.Status.Reason == CSVReasonCopied || ok && c.GetNamespace() != operatorNamespace {
9094
return true
9195
}
96+
97+
if labels := c.GetLabels(); labels != nil {
98+
if _, ok := labels[CopiedLabelKey]; ok {
99+
return true
100+
}
101+
}
92102
return false
93103
}
94104

pkg/controller/operators/catalog/operator_test.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/ghodss/yaml"
1111
"github.com/sirupsen/logrus"
1212
"github.com/stretchr/testify/require"
13+
"golang.org/x/time/rate"
1314
appsv1 "k8s.io/api/apps/v1"
1415
corev1 "k8s.io/api/core/v1"
1516
rbacv1 "k8s.io/api/rbac/v1"
@@ -516,13 +517,18 @@ func NewFakeOperator(clientObjs []runtime.Object, k8sObjs []runtime.Object, extO
516517
// Create the new operator
517518
queueOperator, err := queueinformer.NewOperatorFromClient(opClientFake, logrus.New())
518519
op := &Operator{
519-
Operator: queueOperator,
520-
client: clientFake,
521-
lister: lister,
522-
namespace: namespace,
523-
namespaceResolveQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resolver"),
524-
sources: make(map[resolver.CatalogKey]resolver.SourceRef),
525-
resolver: &fakes.FakeResolver{},
520+
Operator: queueOperator,
521+
client: clientFake,
522+
lister: lister,
523+
namespace: namespace,
524+
namespaceResolveQueue: workqueue.NewNamedRateLimitingQueue(
525+
workqueue.NewMaxOfRateLimiter(
526+
workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, 1000*time.Second),
527+
// 1 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
528+
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(1), 100)},
529+
), "resolver"),
530+
sources: make(map[resolver.CatalogKey]resolver.SourceRef),
531+
resolver: &fakes.FakeResolver{},
526532
}
527533

528534
op.reconciler = &reconciler.RegistryReconcilerFactory{

pkg/controller/operators/olm/operator.go

Lines changed: 83 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@ import (
1212
k8serrors "k8s.io/apimachinery/pkg/api/errors"
1313
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1414
"k8s.io/apimachinery/pkg/labels"
15-
"k8s.io/apimachinery/pkg/runtime"
16-
"k8s.io/apimachinery/pkg/types"
1715
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
1816
"k8s.io/client-go/informers"
1917
"k8s.io/client-go/tools/cache"
@@ -50,13 +48,14 @@ const (
5048

5149
type Operator struct {
5250
*queueinformer.Operator
53-
csvQueueSet queueinformer.ResourceQueueSet
54-
ogQueueSet queueinformer.ResourceQueueSet
55-
client versioned.Interface
56-
resolver install.StrategyResolverInterface
57-
apiReconciler resolver.APIIntersectionReconciler
58-
lister operatorlister.OperatorLister
59-
recorder record.EventRecorder
51+
csvQueueSet queueinformer.ResourceQueueSet
52+
ogQueueSet queueinformer.ResourceQueueSet
53+
client versioned.Interface
54+
resolver install.StrategyResolverInterface
55+
apiReconciler resolver.APIIntersectionReconciler
56+
lister operatorlister.OperatorLister
57+
recorder record.EventRecorder
58+
copyQueueIndexer *queueinformer.QueueIndexer
6059
}
6160

6261
func NewOperator(logger *logrus.Logger, crClient versioned.Interface, opClient operatorclient.ClientInterface, strategyResolver install.StrategyResolverInterface, wakeupInterval time.Duration, namespaces []string) (*Operator, error) {
@@ -146,7 +145,10 @@ func NewOperator(logger *logrus.Logger, crClient versioned.Interface, opClient o
146145
workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "namespaces"),
147146
namespaceInformer.Informer(),
148147
op.syncObject,
149-
nil,
148+
&cache.ResourceEventHandlerFuncs{
149+
DeleteFunc: op.namespaceAddedOrRemoved,
150+
AddFunc: op.namespaceAddedOrRemoved,
151+
},
150152
"namespaces",
151153
metrics.NewMetricsNil(),
152154
logger,
@@ -229,6 +231,8 @@ func NewOperator(logger *logrus.Logger, crClient versioned.Interface, opClient o
229231
))
230232
op.lister.CoreV1().RegisterServiceAccountLister(metav1.NamespaceAll, serviceAccountInformer.Lister())
231233

234+
csvIndexes := map[string]cache.Indexer{}
235+
232236
// csvInformers for each namespace all use the same backing queue keys are namespaced
233237
csvHandlers := &cache.ResourceEventHandlerFuncs{
234238
DeleteFunc: op.handleClusterServiceVersionDeletion,
@@ -245,8 +249,16 @@ func NewOperator(logger *logrus.Logger, crClient versioned.Interface, opClient o
245249
csvQueueInformer := queueinformer.NewInformer(csvQueue, csvInformer.Informer(), op.syncClusterServiceVersion, csvHandlers, queueName, metrics.NewMetricsCSV(op.lister.OperatorsV1alpha1().ClusterServiceVersionLister()), logger)
246250
op.RegisterQueueInformer(csvQueueInformer)
247251
op.csvQueueSet[namespace] = csvQueue
252+
253+
csvIndexes[namespace] = csvInformer.Informer().GetIndexer()
248254
}
249255

256+
// Register separate queue for copying csvs
257+
csvCopyQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csvCopy")
258+
csvQueueIndexer := queueinformer.NewQueueIndexer(csvCopyQueue, csvIndexes, op.syncCopyCSV, "csvCopy", logger, metrics.NewMetricsNil())
259+
op.RegisterQueueIndexer(csvQueueIndexer)
260+
op.copyQueueIndexer = csvQueueIndexer
261+
250262
// Set up watch on deployments
251263
depHandlers := &cache.ResourceEventHandlerFuncs{
252264
// TODO: pass closure that forgets queue item after calling custom deletion handler.
@@ -283,35 +295,19 @@ func NewOperator(logger *logrus.Logger, crClient versioned.Interface, opClient o
283295
}
284296

285297
func (a *Operator) syncObject(obj interface{}) (syncError error) {
286-
// Assert as runtime.Object
287-
runtimeObj, ok := obj.(runtime.Object)
288-
if !ok {
289-
syncError = errors.New("object sync: casting to runtime.Object failed")
290-
a.Log.Warn(syncError.Error())
291-
return
292-
}
293-
294-
gvk := runtimeObj.GetObjectKind().GroupVersionKind()
295-
logger := a.Log.WithFields(logrus.Fields{
296-
"group": gvk.Group,
297-
"version": gvk.Version,
298-
"kind": gvk.Kind,
299-
})
300-
301298
// Assert as metav1.Object
302299
metaObj, ok := obj.(metav1.Object)
303300
if !ok {
304301
syncError = errors.New("object sync: casting to metav1.Object failed")
305-
logger.Warn(syncError.Error())
302+
a.Log.Warn(syncError.Error())
306303
return
307304
}
308-
logger = a.Log.WithFields(logrus.Fields{
305+
logger := a.Log.WithFields(logrus.Fields{
309306
"name": metaObj.GetName(),
310307
"namespace": metaObj.GetNamespace(),
308+
"sel": metaObj.GetSelfLink(),
311309
})
312310

313-
logger.Debug("syncing")
314-
315311
// Requeue all owner CSVs
316312
if ownerutil.IsOwnedByKind(metaObj, v1alpha1.ClusterServiceVersionKind) {
317313
logger.Debug("requeueing owner CSVs")
@@ -324,29 +320,34 @@ func (a *Operator) syncObject(obj interface{}) (syncError error) {
324320
a.requeueOwnerCSVs(metaObj)
325321
}
326322

327-
// TODO: only check this on namespace add/delete, not on every namespace sync
323+
return nil
324+
}
325+
326+
func (a *Operator) namespaceAddedOrRemoved(obj interface{}) {
328327
// Check to see if any operator groups are associated with this namespace
329328
namespace, ok := obj.(*corev1.Namespace)
330329
if !ok {
331-
return nil
330+
return
332331
}
333332

333+
logger := a.Log.WithFields(logrus.Fields{
334+
"name": namespace.GetName(),
335+
})
336+
334337
operatorGroupList, err := a.lister.OperatorsV1alpha2().OperatorGroupLister().OperatorGroups(metav1.NamespaceAll).List(labels.Everything())
335338
if err != nil {
336-
syncError = fmt.Errorf("lister failed: %v", err)
337-
logger.Warn(syncError.Error())
339+
logger.WithError(err).Warn("lister failed")
338340
return
339341
}
340342

341343
for _, group := range operatorGroupList {
342344
if resolver.NewNamespaceSet(group.Status.Namespaces).Contains(namespace.GetName()) {
343345
if err := a.ogQueueSet.Requeue(group.Name, group.Namespace); err != nil {
344-
logger.Warn(err)
346+
logger.WithError(err).Warn("error requeuing operatorgroup")
345347
}
346348
}
347349
}
348-
349-
return nil
350+
return
350351
}
351352

352353
func (a *Operator) handleClusterServiceVersionDeletion(obj interface{}) {
@@ -482,8 +483,9 @@ func (a *Operator) removeDanglingChildCSVs(csv *v1alpha1.ClusterServiceVersion)
482483
}
483484

484485
if annotations := parent.GetAnnotations(); annotations != nil {
485-
if resolver.NewNamespaceSet(strings.Split(annotations[v1alpha2.OperatorGroupTargetsAnnotationKey], ",")).Contains(csv.GetNamespace()) {
486-
logger.Debug("deleting copied CSV since parent no longer lists this as a target namespace")
486+
if !resolver.NewNamespaceSetFromString(annotations[v1alpha2.OperatorGroupTargetsAnnotationKey]).Contains(csv.GetNamespace()) {
487+
logger.WithField("parentTargets", annotations[v1alpha2.OperatorGroupTargetsAnnotationKey]).
488+
Debug("deleting copied CSV since parent no longer lists this as a target namespace")
487489
return a.deleteChild(csv)
488490
}
489491
}
@@ -513,6 +515,10 @@ func (a *Operator) syncClusterServiceVersion(obj interface{}) (syncError error)
513515

514516
outCSV, syncError := a.transitionCSVState(*clusterServiceVersion)
515517

518+
if outCSV == nil {
519+
return
520+
}
521+
516522
// status changed, update CSV
517523
if !(outCSV.Status.LastUpdateTime == clusterServiceVersion.Status.LastUpdateTime &&
518524
outCSV.Status.Phase == clusterServiceVersion.Status.Phase &&
@@ -536,27 +542,55 @@ func (a *Operator) syncClusterServiceVersion(obj interface{}) (syncError error)
536542
return
537543
}
538544

539-
operatorGroup := a.operatorGroupForActiveCSV(logger, outCSV)
545+
a.copyQueueIndexer.Enqueue(outCSV)
546+
547+
return
548+
}
549+
550+
func (a *Operator) syncCopyCSV(obj interface{}) (syncError error) {
551+
clusterServiceVersion, ok := obj.(*v1alpha1.ClusterServiceVersion)
552+
if !ok {
553+
a.Log.Debugf("wrong type: %#v", obj)
554+
return fmt.Errorf("casting ClusterServiceVersion failed")
555+
}
556+
557+
logger := a.Log.WithFields(logrus.Fields{
558+
"id": queueinformer.NewLoopID(),
559+
"csv": clusterServiceVersion.GetName(),
560+
"namespace": clusterServiceVersion.GetNamespace(),
561+
"phase": clusterServiceVersion.Status.Phase,
562+
})
563+
564+
logger.Debug("copying CSV")
565+
566+
operatorGroup := a.operatorGroupForActiveCSV(logger, clusterServiceVersion)
540567
if operatorGroup == nil {
541-
logger.WithField("reason", "no operatorgroup found for active CSV").Info("skipping CSV resource copy to target namespaces")
568+
logger.WithField("reason", "no operatorgroup found for active CSV").Debug("skipping CSV resource copy to target namespaces")
569+
return
570+
}
571+
572+
if len(operatorGroup.Status.Namespaces) == 1 && operatorGroup.Status.Namespaces[0] == operatorGroup.GetNamespace() {
573+
logger.Debug("skipping copy for OwnNamespace operatorgroup")
542574
return
543575
}
544576

545577
// Check if we need to do any copying / annotation for the operatorgroup
546578
if err := a.ensureCSVsInNamespaces(clusterServiceVersion, operatorGroup, resolver.NewNamespaceSet(operatorGroup.Status.Namespaces)); err != nil {
547579
logger.WithError(err).Info("couldn't copy CSV to target namespaces")
580+
syncError = err
548581
}
549582

550583
// Ensure operator has access to targetnamespaces
551-
if err := a.ensureRBACInTargetNamespace(outCSV, operatorGroup); err != nil {
584+
if err := a.ensureRBACInTargetNamespace(clusterServiceVersion, operatorGroup); err != nil {
552585
logger.WithError(err).Info("couldn't ensure RBAC in target namespaces")
586+
syncError = err
553587
}
554588

555589
// Ensure cluster roles exist for using provided apis
556-
if err := a.ensureClusterRolesForCSV(outCSV, operatorGroup); err != nil {
590+
if err := a.ensureClusterRolesForCSV(clusterServiceVersion, operatorGroup); err != nil {
557591
logger.WithError(err).Info("couldn't ensure clusterroles for provided api types")
592+
syncError = err
558593
}
559-
560594
return
561595
}
562596

@@ -1214,51 +1248,22 @@ func (a *Operator) requeueOwnerCSVs(ownee metav1.Object) {
12141248

12151249
// Attempt to requeue CSV owners in the same namespace as the object
12161250
owners := ownerutil.GetOwnersByKind(ownee, v1alpha1.ClusterServiceVersionKind)
1217-
if len(owners) == 0 {
1218-
logger.Debugf("No ownerreferences found")
1219-
return
1220-
}
1221-
1222-
if ownee.GetNamespace() != metav1.NamespaceAll {
1251+
if len(owners) > 0 && ownee.GetNamespace() != metav1.NamespaceAll {
12231252
for _, ownerCSV := range owners {
12241253
// Since cross-namespace CSVs can't exist we're guaranteed the owner will be in the same namespace
12251254
err := a.csvQueueSet.Requeue(ownerCSV.Name, ownee.GetNamespace())
12261255
if err != nil {
1227-
a.Log.Warn(err.Error())
1256+
logger.Warn(err.Error())
12281257
}
12291258
}
1230-
12311259
return
12321260
}
12331261

1234-
// Get all existing CSVs from the indexer
1235-
csvs, err := a.lister.OperatorsV1alpha1().ClusterServiceVersionLister().List(labels.Everything())
1236-
if err != nil {
1237-
logger.Warnf("error attempting to list all CSVs in indexer: %s", err.Error())
1238-
return
1239-
}
1240-
if len(csvs) == 0 {
1241-
logger.Infof("no existing CSVs found")
1242-
return
1243-
}
1244-
1245-
csvSet := make(map[types.UID]*v1alpha1.ClusterServiceVersion, len(csvs))
1246-
for _, csv := range csvs {
1247-
csvSet[csv.GetUID()] = csv
1248-
}
1249-
logger.WithField("clusterwide", len(csvs)).Debug("number of csvs")
1250-
1251-
// Requeue existing owner CSVs
1252-
for _, owner := range owners {
1253-
csv, ok := csvSet[owner.UID]
1254-
if !ok {
1255-
logger.Warnf("owner %v does not exist", owner.UID)
1256-
continue
1257-
}
1258-
1259-
err = a.csvQueueSet.Requeue(csv.GetName(), csv.GetNamespace())
1262+
// Requeue owners based on labels
1263+
if name, ns, ok := ownerutil.GetOwnerByKindLabel(ownee, v1alpha1.ClusterServiceVersionKind); ok {
1264+
err := a.csvQueueSet.Requeue(name, ns)
12601265
if err != nil {
1261-
a.Log.Warn(err.Error())
1266+
logger.Warn(err.Error())
12621267
}
12631268
}
12641269
}

0 commit comments

Comments
 (0)