Skip to content

Commit 2cf6f06

Browse files
committed
feat(subscriptions): add subscription catalog health syncer
- Add Subscription catalog health Syncer, Reconciler, and typestates
1 parent 943145e commit 2cf6f06

File tree

7 files changed

+1785
-0
lines changed

7 files changed

+1785
-0
lines changed
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
package subscription
2+
3+
import (
4+
"github.com/pkg/errors"
5+
"github.com/sirupsen/logrus"
6+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
7+
utilclock "k8s.io/apimachinery/pkg/util/clock"
8+
"k8s.io/client-go/tools/cache"
9+
"k8s.io/client-go/util/workqueue"
10+
11+
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned"
12+
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/reconciler"
13+
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate"
14+
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorlister"
15+
)
16+
17+
type syncerConfig struct {
18+
logger *logrus.Logger
19+
clock utilclock.Clock
20+
client versioned.Interface
21+
lister operatorlister.OperatorLister
22+
subscriptionInformer cache.SharedIndexInformer
23+
catalogInformer cache.SharedIndexInformer
24+
subscriptionQueue workqueue.RateLimitingInterface
25+
reconcilers kubestate.ReconcilerChain
26+
registryReconcilerFactory reconciler.RegistryReconcilerFactory
27+
globalCatalogNamespace string
28+
}
29+
30+
// SyncerOption is a configuration option for a subscription syncer.
31+
type SyncerOption func(*syncerConfig)
32+
33+
func defaultSyncerConfig() *syncerConfig {
34+
return &syncerConfig{
35+
logger: logrus.New(),
36+
clock: utilclock.RealClock{},
37+
reconcilers: kubestate.ReconcilerChain{},
38+
}
39+
}
40+
41+
func (s *syncerConfig) apply(options []SyncerOption) {
42+
for _, option := range options {
43+
option(s)
44+
}
45+
}
46+
47+
// WithLogger sets a syncer's logger.
48+
func WithLogger(logger *logrus.Logger) SyncerOption {
49+
return func(config *syncerConfig) {
50+
config.logger = logger
51+
}
52+
}
53+
54+
// WithClock sets a syncer's clock.
55+
func WithClock(clock utilclock.Clock) SyncerOption {
56+
return func(config *syncerConfig) {
57+
config.clock = clock
58+
}
59+
}
60+
61+
// WithClient sets a syncer's OLM client.
62+
func WithClient(client versioned.Interface) SyncerOption {
63+
return func(config *syncerConfig) {
64+
config.client = client
65+
}
66+
}
67+
68+
// WithSubscriptionInformer sets the informer a syncer will extract its subscription indexer from.
69+
func WithSubscriptionInformer(subscriptionInformer cache.SharedIndexInformer) SyncerOption {
70+
return func(config *syncerConfig) {
71+
config.subscriptionInformer = subscriptionInformer
72+
}
73+
}
74+
75+
// WithCatalogInformer sets the informer a syncer will wire dependent subscription notifications to.
76+
func WithCatalogInformer(catalogInformer cache.SharedIndexInformer) SyncerOption {
77+
return func(config *syncerConfig) {
78+
config.catalogInformer = catalogInformer
79+
}
80+
}
81+
82+
// WithOperatorLister sets a syncer's operator lister.
83+
func WithOperatorLister(lister operatorlister.OperatorLister) SyncerOption {
84+
return func(config *syncerConfig) {
85+
config.lister = lister
86+
}
87+
}
88+
89+
// WithSubscriptionQueue sets a syncer's subscription queue.
90+
func WithSubscriptionQueue(subscriptionQueue workqueue.RateLimitingInterface) SyncerOption {
91+
return func(config *syncerConfig) {
92+
config.subscriptionQueue = subscriptionQueue
93+
}
94+
}
95+
96+
// WithAppendedReconcilers adds the given reconcilers to the end of a syncer's reconciler chain, to be
97+
// invoked after its default reconcilers have been called.
98+
func WithAppendedReconcilers(reconcilers ...kubestate.Reconciler) SyncerOption {
99+
return func(config *syncerConfig) {
100+
// Add non-nil reconcilers to the chain
101+
for _, rec := range reconcilers {
102+
if rec != nil {
103+
config.reconcilers = append(config.reconcilers, rec)
104+
}
105+
}
106+
}
107+
}
108+
109+
// WithRegistryReconcilerFactory sets a syncer's registry reconciler factory.
110+
func WithRegistryReconcilerFactory(r reconciler.RegistryReconcilerFactory) SyncerOption {
111+
return func(config *syncerConfig) {
112+
config.registryReconcilerFactory = r
113+
}
114+
}
115+
116+
// WithGlobalCatalogNamespace sets a syncer's global catalog namespace.
117+
func WithGlobalCatalogNamespace(namespace string) SyncerOption {
118+
return func(config *syncerConfig) {
119+
config.globalCatalogNamespace = namespace
120+
}
121+
}
122+
123+
func newInvalidConfigError(msg string) error {
124+
return errors.Errorf("invalid subscription syncer config: %s", msg)
125+
}
126+
127+
func (s *syncerConfig) validate() (err error) {
128+
switch {
129+
case s.logger == nil:
130+
err = newInvalidConfigError("nil logger")
131+
case s.clock == nil:
132+
err = newInvalidConfigError("nil clock")
133+
case s.client == nil:
134+
err = newInvalidConfigError("nil client")
135+
case s.lister == nil:
136+
err = newInvalidConfigError("nil lister")
137+
case s.subscriptionInformer == nil:
138+
err = newInvalidConfigError("nil subscription informer")
139+
case s.catalogInformer == nil:
140+
err = newInvalidConfigError("nil catalog informer")
141+
case s.subscriptionQueue == nil:
142+
err = newInvalidConfigError("nil subscription queue")
143+
case len(s.reconcilers) == 0:
144+
err = newInvalidConfigError("no reconcilers")
145+
case s.registryReconcilerFactory == nil:
146+
err = newInvalidConfigError("nil reconciler factory")
147+
case s.globalCatalogNamespace == metav1.NamespaceAll:
148+
err = newInvalidConfigError("global catalog namespace cannot be namespace all")
149+
}
150+
151+
return
152+
}
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
package subscription
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
8+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
9+
"k8s.io/apimachinery/pkg/labels"
10+
utilerrors "k8s.io/apimachinery/pkg/util/errors"
11+
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
12+
13+
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/reference"
14+
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha1"
15+
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned"
16+
listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1"
17+
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/reconciler"
18+
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate"
19+
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer"
20+
)
21+
22+
// catalogHealthReconciler reconciles catalog health status for subscriptions.
23+
type catalogHealthReconciler struct {
24+
now func() *metav1.Time
25+
client versioned.Interface
26+
catalogLister listers.CatalogSourceLister
27+
registryReconcilerFactory reconciler.RegistryReconcilerFactory
28+
globalCatalogNamespace string
29+
}
30+
31+
// Reconcile reconciles subscription catalog health conditions.
32+
func (c *catalogHealthReconciler) Reconcile(ctx context.Context, in kubestate.State) (out kubestate.State, err error) {
33+
next := in
34+
var prev kubestate.State
35+
36+
// loop until this state can no longer transition
37+
for err == nil && out == nil && next != nil && !next.Terminal() && prev != next {
38+
select {
39+
case <-ctx.Done():
40+
err = errors.New("subscription catalog health reconciliation timed out")
41+
default:
42+
switch s := next.(type) {
43+
case CatalogHealthKnownState:
44+
// Target state already known, no work to do
45+
out = s
46+
case CatalogHealthState:
47+
// Gather catalog health and transition state
48+
ns := s.Subscription().GetNamespace()
49+
var catalogHealth []v1alpha1.SubscriptionCatalogHealth
50+
catalogHealth, err = c.catalogHealth(ns)
51+
if err != nil {
52+
break
53+
}
54+
55+
prev = s
56+
next, err = s.UpdateHealth(c.now(), c.client.OperatorsV1alpha1().Subscriptions(ns), catalogHealth...)
57+
case SubscriptionExistsState:
58+
if s == nil {
59+
break
60+
}
61+
if s.Subscription() == nil {
62+
break
63+
}
64+
65+
// Set up fresh state
66+
next = NewCatalogHealthState(s)
67+
default:
68+
// Ignore all other typestates
69+
utilruntime.HandleError(fmt.Errorf("unexpected subscription state in catalog health reconciler %T", next))
70+
out = s
71+
}
72+
}
73+
}
74+
75+
if prev == next {
76+
out = prev
77+
}
78+
79+
return
80+
}
81+
82+
// catalogHealth gets the health of catalogs that can affect Susbcriptions in the given namespace.
83+
// This means all catalogs in the given namespace, as well as any catalogs in the operator's global catalog namespace.
84+
func (c *catalogHealthReconciler) catalogHealth(namespace string) ([]v1alpha1.SubscriptionCatalogHealth, error) {
85+
catalogs, err := c.catalogLister.CatalogSources(namespace).List(labels.Everything())
86+
if err != nil {
87+
return nil, err
88+
}
89+
90+
if namespace != c.globalCatalogNamespace {
91+
globals, err := c.catalogLister.CatalogSources(c.globalCatalogNamespace).List(labels.Everything())
92+
if err != nil {
93+
return nil, err
94+
}
95+
96+
catalogs = append(catalogs, globals...)
97+
}
98+
99+
catalogHealth := make([]v1alpha1.SubscriptionCatalogHealth, len(catalogs))
100+
now := c.now()
101+
var errs []error
102+
for i, catalog := range catalogs {
103+
h, err := c.health(now, catalog)
104+
if err != nil {
105+
errs = append(errs, err)
106+
continue
107+
}
108+
109+
// Prevent assignment when any error has been encountered since the results will be discarded
110+
if errs == nil {
111+
catalogHealth[i] = *h
112+
}
113+
}
114+
115+
if errs != nil || len(catalogHealth) == 0 {
116+
// Assign meaningful zero value
117+
catalogHealth = nil
118+
}
119+
120+
return catalogHealth, utilerrors.NewAggregate(errs)
121+
}
122+
123+
// health returns a SusbcriptionCatalogHealth for the given catalog with the given now.
124+
func (c *catalogHealthReconciler) health(now *metav1.Time, catalog *v1alpha1.CatalogSource) (*v1alpha1.SubscriptionCatalogHealth, error) {
125+
healthy, err := c.healthy(catalog)
126+
if err != nil {
127+
return nil, err
128+
}
129+
130+
ref, err := reference.GetReference(catalog)
131+
if err != nil {
132+
return nil, err
133+
}
134+
if ref == nil {
135+
return nil, errors.New("nil reference")
136+
}
137+
138+
h := &v1alpha1.SubscriptionCatalogHealth{
139+
CatalogSourceRef: ref,
140+
// TODO: Should LastUpdated be set here, or at time of subscription update?
141+
LastUpdated: now,
142+
Healthy: healthy,
143+
}
144+
145+
return h, nil
146+
}
147+
148+
// healthy returns true if the given catalog is healthy, false otherwise, and any error encountered
149+
// while checking the catalog's registry server.
150+
func (c *catalogHealthReconciler) healthy(catalog *v1alpha1.CatalogSource) (bool, error) {
151+
return c.registryReconcilerFactory.ReconcilerForSource(catalog).CheckRegistryServer(catalog)
152+
}
153+
154+
// ReconcilerFromLegacySyncHandler returns a reconciler that invokes the given legacy sync handler and on delete funcs.
155+
// Since the reconciler does not return an updated kubestate, it MUST be the last reconciler in a given chain.
156+
func ReconcilerFromLegacySyncHandler(sync queueinformer.LegacySyncHandler, onDelete func(obj interface{})) kubestate.Reconciler {
157+
var rec kubestate.ReconcilerFunc = func(ctx context.Context, in kubestate.State) (out kubestate.State, err error) {
158+
out = in
159+
switch s := in.(type) {
160+
case SubscriptionExistsState:
161+
if sync != nil {
162+
err = sync(s.Subscription())
163+
}
164+
case SubscriptionDeletedState:
165+
if onDelete != nil {
166+
onDelete(s.Subscription())
167+
}
168+
case SubscriptionState:
169+
if sync != nil {
170+
err = sync(s.Subscription())
171+
}
172+
default:
173+
utilruntime.HandleError(fmt.Errorf("unexpected subscription state in legacy reconciler: %T", s))
174+
}
175+
176+
return
177+
}
178+
179+
return rec
180+
}

0 commit comments

Comments
 (0)