Skip to content

Commit 0c5e011

Browse files
committed
Receive notification of CSV(s) being watched
OLM operator watches all CSV resource(s) across all namespace(s). We need a mechanism to receive notification of CSV reconciliation events for add/update/delete. Add an interface that a caller can implement to tap into these events.
1 parent e73878d commit 0c5e011

File tree

4 files changed

+62
-1
lines changed

4 files changed

+62
-1
lines changed

pkg/controller/operators/olm/operator.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ type Operator struct {
6969
apiLabeler labeler.Labeler
7070
csvSetGenerator csvutility.SetGenerator
7171
csvReplaceFinder csvutility.ReplaceFinder
72+
csvNotification csvutility.WatchNotification
7273
}
7374

7475
func NewOperator(ctx context.Context, options ...OperatorOption) (*Operator, error) {
@@ -401,6 +402,14 @@ func (a *Operator) GetReplaceFinder() csvutility.ReplaceFinder {
401402
return a.csvReplaceFinder
402403
}
403404

405+
func (a *Operator) RegisterCSVWatchNotification(csvNotification csvutility.WatchNotification) {
406+
if csvNotification == nil {
407+
return
408+
}
409+
410+
a.csvNotification = csvNotification
411+
}
412+
404413
func (a *Operator) syncObject(obj interface{}) (syncError error) {
405414
// Assert as metav1.Object
406415
metaObj, ok := obj.(metav1.Object)
@@ -481,6 +490,10 @@ func (a *Operator) handleClusterServiceVersionDeletion(obj interface{}) {
481490
}
482491
}
483492

493+
if a.csvNotification != nil {
494+
a.csvNotification.OnDelete(clusterServiceVersion)
495+
}
496+
484497
logger := a.logger.WithFields(logrus.Fields{
485498
"id": queueinformer.NewLoopID(),
486499
"csv": clusterServiceVersion.GetName(),
@@ -640,6 +653,10 @@ func (a *Operator) syncClusterServiceVersion(obj interface{}) (syncError error)
640653
})
641654
logger.Debug("syncing CSV")
642655

656+
if a.csvNotification != nil {
657+
a.csvNotification.OnAddOrUpdate(clusterServiceVersion)
658+
}
659+
643660
if clusterServiceVersion.IsCopied() {
644661
logger.Debug("skipping copied csv transition, schedule for gc check")
645662
a.csvGCQueueSet.Requeue(clusterServiceVersion.GetNamespace(), clusterServiceVersion.GetName())
@@ -1201,7 +1218,7 @@ func (a *Operator) transitionCSVState(in v1alpha1.ClusterServiceVersion) (out *v
12011218
}
12021219
}
12031220
} else {
1204-
syncError = fmt.Errorf("csv marked as replacement, but no replacmenet csv found in cluster")
1221+
syncError = fmt.Errorf("CSV marked as replacement, but no replacement CSV found in cluster.")
12051222
}
12061223
case v1alpha1.CSVPhaseDeleting:
12071224
syncError = a.client.OperatorsV1alpha1().ClusterServiceVersions(out.GetNamespace()).Delete(out.GetName(), metav1.NewDeleteOptions(0))

pkg/lib/csv/csvset.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ func NewSetGenerator(logger *logrus.Logger, lister operatorlister.OperatorLister
2222
// CSV name; if metav1.NamespaceAll gets the set across all namespaces
2323
type SetGenerator interface {
2424
WithNamespace(namespace string, phase v1alpha1.ClusterServiceVersionPhase) map[string]*v1alpha1.ClusterServiceVersion
25+
WithNamespaceAndLabels(namespace string, phase v1alpha1.ClusterServiceVersionPhase, selector labels.Selector) map[string]*v1alpha1.ClusterServiceVersion
2526
}
2627

2728
type csvSet struct {
@@ -35,6 +36,12 @@ func (s *csvSet) WithNamespace(namespace string, phase v1alpha1.ClusterServiceVe
3536
return s.with(namespace, phase, labels.Everything())
3637
}
3738

39+
// WithNamespaceAndLabels returns all ClusterServiceVersion resource(s) that
40+
// matches the specified phase and label selector from a given namespace.
41+
func (s *csvSet) WithNamespaceAndLabels(namespace string, phase v1alpha1.ClusterServiceVersionPhase, selector labels.Selector) map[string]*v1alpha1.ClusterServiceVersion {
42+
return s.with(namespace, phase, selector)
43+
}
44+
3845
func (s *csvSet) with(namespace string, phase v1alpha1.ClusterServiceVersionPhase, selector labels.Selector) map[string]*v1alpha1.ClusterServiceVersion {
3946
csvsInNamespace, err := s.lister.OperatorsV1alpha1().ClusterServiceVersionLister().ClusterServiceVersions(namespace).List(selector)
4047

pkg/lib/csv/notification.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package csv
2+
3+
import (
4+
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha1"
5+
)
6+
7+
// WatchNotification is an sink interface that can be used to get notification
8+
// of CSV reconciliation request(s) received by the operator.
9+
type WatchNotification interface {
10+
// OnAddOrUpdate is invoked when a add or update reconciliation request has
11+
// been received by the operator.
12+
OnAddOrUpdate(in *v1alpha1.ClusterServiceVersion)
13+
14+
// OnDelete is invoked when a delete reconciliation request has
15+
// been received by the operator.
16+
OnDelete(in *v1alpha1.ClusterServiceVersion)
17+
}

pkg/lib/csv/replace_finder.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ func NewReplaceFinder(logger *logrus.Logger, client versioned.Interface) Replace
2020
type ReplaceFinder interface {
2121
IsBeingReplaced(in *v1alpha1.ClusterServiceVersion, csvsInNamespace map[string]*v1alpha1.ClusterServiceVersion) (replacedBy *v1alpha1.ClusterServiceVersion)
2222
IsReplacing(in *v1alpha1.ClusterServiceVersion) *v1alpha1.ClusterServiceVersion
23+
GetFinalCSVInReplacing(in *v1alpha1.ClusterServiceVersion, csvsInNamespace map[string]*v1alpha1.ClusterServiceVersion) (replacedBy *v1alpha1.ClusterServiceVersion)
2324
}
2425

2526
type replace struct {
@@ -68,3 +69,22 @@ func (r *replace) IsReplacing(in *v1alpha1.ClusterServiceVersion) *v1alpha1.Clus
6869

6970
return previous
7071
}
72+
73+
// GetFinalCSVInReplacing returns the most recent ClustererviceVersion that is
74+
// in the replace chain.
75+
//
76+
// If the corresponding ClusterServiceVersion is not found nil is returned.
77+
func (r *replace) GetFinalCSVInReplacing(in *v1alpha1.ClusterServiceVersion, csvsInNamespace map[string]*v1alpha1.ClusterServiceVersion) (replacedBy *v1alpha1.ClusterServiceVersion) {
78+
current := in
79+
for {
80+
next := r.IsBeingReplaced(current, csvsInNamespace)
81+
if next == nil {
82+
break
83+
}
84+
85+
replacedBy = next
86+
current = next
87+
}
88+
89+
return
90+
}

0 commit comments

Comments
 (0)