Skip to content

Commit d848b97

Browse files
committed
fix(operatorgroups): gc copied csvs from a separate queue
1 parent 26943c9 commit d848b97

File tree

7 files changed

+59
-88
lines changed

7 files changed

+59
-88
lines changed

go.sum

Lines changed: 2 additions & 71 deletions
Large diffs are not rendered by default.

pkg/controller/operators/olm/operator.go

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ type Operator struct {
5858
lister operatorlister.OperatorLister
5959
recorder record.EventRecorder
6060
copyQueueIndexer *queueinformer.QueueIndexer
61+
gcQueueIndexer *queueinformer.QueueIndexer
6162
}
6263

6364
func NewOperator(logger *logrus.Logger, crClient versioned.Interface, opClient operatorclient.ClientInterface, strategyResolver install.StrategyResolverInterface, wakeupInterval time.Duration, namespaces []string) (*Operator, error) {
@@ -261,6 +262,12 @@ func NewOperator(logger *logrus.Logger, crClient versioned.Interface, opClient o
261262
op.RegisterQueueIndexer(csvQueueIndexer)
262263
op.copyQueueIndexer = csvQueueIndexer
263264

265+
// Register separate queue for copying csvs
266+
csvGCQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csvGC")
267+
csvGCQueueIndexer := queueinformer.NewQueueIndexer(csvGCQueue, csvIndexes, op.syncGcCsv, "csvGC", logger, metrics.NewMetricsNil())
268+
op.RegisterQueueIndexer(csvGCQueueIndexer)
269+
op.gcQueueIndexer = csvGCQueueIndexer
270+
264271
// Set up watch on deployments
265272
depHandlers := &cache.ResourceEventHandlerFuncs{
266273
// TODO: pass closure that forgets queue item after calling custom deletion handler.
@@ -435,7 +442,7 @@ func (a *Operator) handleClusterServiceVersionDeletion(obj interface{}) {
435442
for _, namespace := range namespaces {
436443
if namespace != operatorNamespace {
437444
logger.WithField("targetNamespace", namespace).Debug("requeueing child csv for deletion")
438-
a.csvQueueSet.Requeue(clusterServiceVersion.GetName(), namespace)
445+
a.gcQueueIndexer.Add(fmt.Sprintf("%s/%s", namespace, clusterServiceVersion.GetName()))
439446
}
440447
}
441448

@@ -478,33 +485,34 @@ func (a *Operator) removeDanglingChildCSVs(csv *v1alpha1.ClusterServiceVersion)
478485
operatorNamespace, ok := csv.Annotations[v1alpha2.OperatorGroupNamespaceAnnotationKey]
479486
if !ok {
480487
logger.Debug("missing operator namespace annotation on copied CSV")
481-
return a.deleteChild(csv)
488+
return a.deleteChild(csv, logger)
482489
}
483490

484491
logger = logger.WithField("parentNamespace", operatorNamespace)
485492
parent, err := a.lister.OperatorsV1alpha1().ClusterServiceVersionLister().ClusterServiceVersions(operatorNamespace).Get(csv.GetName())
486493
if k8serrors.IsNotFound(err) || k8serrors.IsGone(err) || parent == nil {
487494
logger.Debug("deleting copied CSV since parent is missing")
488-
return a.deleteChild(csv)
495+
return a.deleteChild(csv, logger)
489496
}
490497

491498
if parent.Status.Phase == v1alpha1.CSVPhaseFailed && parent.Status.Reason == v1alpha1.CSVReasonInterOperatorGroupOwnerConflict {
492499
logger.Debug("deleting copied CSV since parent has intersecting operatorgroup conflict")
493-
return a.deleteChild(csv)
500+
return a.deleteChild(csv, logger)
494501
}
495502

496503
if annotations := parent.GetAnnotations(); annotations != nil {
497504
if !resolver.NewNamespaceSetFromString(annotations[v1alpha2.OperatorGroupTargetsAnnotationKey]).Contains(csv.GetNamespace()) {
498505
logger.WithField("parentTargets", annotations[v1alpha2.OperatorGroupTargetsAnnotationKey]).
499506
Debug("deleting copied CSV since parent no longer lists this as a target namespace")
500-
return a.deleteChild(csv)
507+
return a.deleteChild(csv, logger)
501508
}
502509
}
503510

504511
return nil
505512
}
506513

507-
func (a *Operator) deleteChild(csv *v1alpha1.ClusterServiceVersion) error {
514+
func (a *Operator) deleteChild(csv *v1alpha1.ClusterServiceVersion, logger *logrus.Entry) error {
515+
logger.Debug("gcing csv")
508516
return a.client.OperatorsV1alpha1().ClusterServiceVersions(csv.GetNamespace()).Delete(csv.GetName(), &metav1.DeleteOptions{})
509517
}
510518

@@ -609,6 +617,19 @@ func (a *Operator) syncCopyCSV(obj interface{}) (syncError error) {
609617
return
610618
}
611619

620+
func (a *Operator) syncGcCsv(obj interface{}) (syncError error) {
621+
clusterServiceVersion, ok := obj.(*v1alpha1.ClusterServiceVersion)
622+
if !ok {
623+
a.Log.Debugf("wrong type: %#v", obj)
624+
return fmt.Errorf("casting ClusterServiceVersion failed")
625+
}
626+
if clusterServiceVersion.IsCopied() {
627+
syncError = a.removeDanglingChildCSVs(clusterServiceVersion)
628+
return
629+
}
630+
return
631+
}
632+
612633
// operatorGroupFromAnnotations returns the OperatorGroup for the CSV only if the CSV is active one in the group
613634
func (a *Operator) operatorGroupFromAnnotations(logger *logrus.Entry, csv *v1alpha1.ClusterServiceVersion) *v1alpha2.OperatorGroup {
614635
annotations := csv.GetAnnotations()
@@ -712,8 +733,8 @@ func (a *Operator) transitionCSVState(in v1alpha1.ClusterServiceVersion) (out *v
712733
now := timeNow()
713734

714735
if out.IsCopied() {
715-
logger.Debug("skipping copied csv transition")
716-
syncError = a.removeDanglingChildCSVs(out)
736+
logger.Debug("skipping copied csv transition, schedule for gc check")
737+
a.gcQueueIndexer.Enqueue(out)
717738
return
718739
}
719740

pkg/controller/operators/olm/operatorgroup.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ var (
3333
AdminVerbs = []string{"*"}
3434
EditVerbs = []string{"create", "update", "patch", "delete"}
3535
ViewVerbs = []string{"get", "list", "watch"}
36+
Suffices = []string{AdminSuffix, EditSuffix, ViewSuffix}
3637
VerbsForSuffix = map[string][]string{
3738
AdminSuffix: AdminVerbs,
3839
EditSuffix: EditVerbs,
@@ -294,8 +295,11 @@ func (a *Operator) ensureRBACInTargetNamespace(csv *v1alpha1.ClusterServiceVersi
294295
}
295296
ruleChecker := install.NewCSVRuleChecker(a.lister.RbacV1().RoleLister(), a.lister.RbacV1().RoleBindingLister(), a.lister.RbacV1().ClusterRoleLister(), a.lister.RbacV1().ClusterRoleBindingLister(), csv)
296297

298+
logger := a.Log.WithField("opgroup", operatorGroup.GetName()).WithField("csv", csv.GetName())
299+
297300
// if OperatorGroup is global (all namespaces) we generate cluster roles / cluster role bindings instead
298301
if len(targetNamespaces) == 1 && targetNamespaces[0] == corev1.NamespaceAll {
302+
logger.Debug("opgroup is global")
299303

300304
// synthesize cluster permissions to verify rbac
301305
for _, p := range strategyDetailsDeployment.Permissions {
@@ -309,8 +313,10 @@ func (a *Operator) ensureRBACInTargetNamespace(csv *v1alpha1.ClusterServiceVersi
309313

310314
// operator already has access at the cluster scope
311315
if permMet {
316+
logger.Debug("global operator has correct global permissions")
312317
return nil
313318
}
319+
logger.Debug("lift roles/rolebindings to clusterroles/rolebindings")
314320
if err := a.ensureSingletonRBAC(operatorGroup.GetNamespace(), csv); err != nil {
315321
return err
316322
}
@@ -347,6 +353,7 @@ func (a *Operator) ensureSingletonRBAC(operatorNamespace string, csv *v1alpha1.C
347353
}
348354

349355
for _, r := range ownedRoles {
356+
a.Log.Debug("processing role")
350357
_, err := a.lister.RbacV1().ClusterRoleLister().Get(r.GetName())
351358
if err != nil {
352359
clusterRole := &rbacv1.ClusterRole{
@@ -363,6 +370,7 @@ func (a *Operator) ensureSingletonRBAC(operatorNamespace string, csv *v1alpha1.C
363370
if _, err := a.OpClient.CreateClusterRole(clusterRole); err != nil {
364371
return err
365372
}
373+
a.Log.Debug("created cluster role")
366374
}
367375
}
368376

@@ -578,24 +586,19 @@ func (a *Operator) copyToNamespace(csv *v1alpha1.ClusterServiceVersion, namespac
578586
return nil
579587
}
580588

581-
// TODO: do we want to do this? or just let the dangling CSV clean it up
582589
func (a *Operator) pruneFromNamespace(operatorGroupName, namespace string) error {
583590
fetchedCSVs, err := a.lister.OperatorsV1alpha1().ClusterServiceVersionLister().ClusterServiceVersions(namespace).List(labels.Everything())
584591
if err != nil {
585592
return err
586593
}
587594

588-
errlist := []error{}
589595
for _, csv := range fetchedCSVs {
590596
if csv.IsCopied() && csv.GetAnnotations()[v1alpha2.OperatorGroupAnnotationKey] == operatorGroupName {
591597
a.Log.Debugf("Found CSV '%v' in namespace %v to delete", csv.GetName(), namespace)
592-
err := a.client.OperatorsV1alpha1().ClusterServiceVersions(namespace).Delete(csv.GetName(), &metav1.DeleteOptions{})
593-
if err != nil {
594-
errlist = append(errlist, err)
595-
}
598+
a.gcQueueIndexer.Enqueue(csv)
596599
}
597600
}
598-
return errors.NewAggregate(errlist)
601+
return nil
599602
}
600603

601604
func (a *Operator) setOperatorGroupAnnotations(obj *metav1.ObjectMeta, op *v1alpha2.OperatorGroup, addTargets bool) {

pkg/lib/queueinformer/queueindexer.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ func (q *QueueIndexer) Enqueue(obj interface{}) {
3838
q.queue.Add(key)
3939
}
4040

41+
func (q *QueueIndexer) Add(key string) {
42+
q.queue.Add(key)
43+
}
44+
4145
// keyFunc turns an object into a key for the queue. In the future will use a (name, namespace) struct as key
4246
func (q *QueueIndexer) keyFunc(obj interface{}) (string, bool) {
4347
k, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)

pkg/lib/queueinformer/queueinformer_operator.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,10 +140,12 @@ func (o *Operator) Run(stopc <-chan struct{}) (ready, done chan struct{}, atLeve
140140
o.Log.Info("starting workers...")
141141
for _, queueInformer := range o.queueInformers {
142142
go o.worker(queueInformer)
143+
go o.worker(queueInformer)
143144
}
144145

145146
for _, queueIndexer := range o.queueIndexers {
146147
go o.indexerWorker(queueIndexer)
148+
go o.indexerWorker(queueIndexer)
147149
}
148150
ready <- struct{}{}
149151
<-stopc
@@ -204,7 +206,7 @@ func (o *Operator) sync(loop *QueueInformer, key string) error {
204206
return loop.syncHandler(obj)
205207
}
206208

207-
// This provides the same function as above, but for indexes and queues not fed by informers.
209+
// This provides the same function as above, but for queues that are not auto-fed by informers.
208210
// indexerWorker runs a worker thread that just dequeues items, processes them, and marks them done.
209211
// It enforces that the syncHandler is never invoked concurrently with the same key.
210212
func (o *Operator) indexerWorker(loop *QueueIndexer) {

test/e2e/operator_groups_e2e_test.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ func TestOperatorGroup(t *testing.T) {
130130
// Verify the copied CSV transitions to FAILED
131131
// Delete CSV
132132
// Verify copied CVS is deleted
133+
defer cleaner.NotifyTestComplete(t, true)
133134

134135
c := newKubeClient(t)
135136
crc := newCRClient(t)
@@ -464,6 +465,8 @@ func TestOperatorGroupInstallModeSupport(t *testing.T) {
464465
// Update csvA to have AllNamespaces supported=true
465466
// Ensure csvA transitions to Pending
466467

468+
defer cleaner.NotifyTestComplete(t, true)
469+
467470
// Generate namespaceA and namespaceB
468471
nsA := genName("a")
469472
nsB := genName("b")
@@ -695,6 +698,8 @@ func TestOperatorGroupIntersection(t *testing.T) {
695698
// Wait for operatorGroupB to have providedAPI annotation with crdB's Kind.version.group
696699
// Wait for csvB to have a CSV with a copied status in namespace C
697700

701+
defer cleaner.NotifyTestComplete(t, true)
702+
698703
// Create a catalog for csvA, csvB, and csvD
699704
pkgA := genName("a-")
700705
pkgB := genName("b-")
@@ -927,6 +932,8 @@ func TestStaticProviderOperatorGroup(t *testing.T) {
927932
// Wait for KindA.version.group providedAPI annotation to be removed from operatorGroupC's providedAPIs annotation
928933
// Ensure KindA.version.group providedAPI annotation on operatorGroupA
929934

935+
defer cleaner.NotifyTestComplete(t, true)
936+
930937
// Create a catalog for csvA, csvB
931938
pkgA := genName("a-")
932939
pkgB := genName("b-")
@@ -1118,6 +1125,7 @@ func TestStaticProviderOperatorGroup(t *testing.T) {
11181125
// TODO: Test Subscriptions with depedencies and transitive dependencies in intersecting OperatorGroups
11191126
// TODO: Test Subscription upgrade paths with + and - providedAPIs
11201127
func TestCSVCopyWatchingAllNamespaces(t *testing.T) {
1128+
defer cleaner.NotifyTestComplete(t, true)
11211129
c := newKubeClient(t)
11221130
crc := newCRClient(t)
11231131
csvName := genName("another-csv-") // must be lowercase for DNS-1123 validation
@@ -1298,7 +1306,7 @@ func TestCSVCopyWatchingAllNamespaces(t *testing.T) {
12981306
require.NoError(t, err)
12991307
}()
13001308

1301-
err = wait.Poll(pollInterval, pollDuration, func() (bool, error) {
1309+
err = wait.Poll(pollInterval, 2*pollDuration, func() (bool, error) {
13021310
_, fetchErr := crc.OperatorsV1alpha1().ClusterServiceVersions(otherNamespaceName).Get(csvName, metav1.GetOptions{})
13031311
if fetchErr != nil {
13041312
if errors.IsNotFound(fetchErr) {

test/e2e/packagemanifest_e2e_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ func fetchPackageManifest(t *testing.T, pmc pmversioned.Interface, namespace, na
4545
}
4646

4747
func TestPackageManifestLoading(t *testing.T) {
48+
defer cleaner.NotifyTestComplete(t, true)
49+
4850
// create a simple catalogsource
4951
packageName := genName("nginx")
5052
stableChannel := "stable"

0 commit comments

Comments
 (0)