Skip to content

Commit 21f3b33

Browse files
ecordelltkashem
authored andcommitted
test(grpc): add basic connection event tests for source manager
1 parent f59c37a commit 21f3b33

File tree

7 files changed

+1085
-64
lines changed

7 files changed

+1085
-64
lines changed

cmd/olm/cleanup.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ func cleanup(logger *logrus.Logger, c operatorclient.ClientInterface, crc versio
3737
logger.WithError(err).Fatal("couldn't clean previous release")
3838
}
3939

40+
if err := waitForDelete(checkClusterServiceVersion(crc, "packageserver.v0.10.1"), deleteClusterServiceVersion(crc, "packageserver.v0.10.0")); err != nil {
41+
logger.WithError(err).Fatal("couldn't clean previous release")
42+
}
43+
4044
if err := waitForDelete(checkClusterServiceVersion(crc, "packageserver.v0.9.0"), deleteClusterServiceVersion(crc, "packageserver.v0.9.0")); err != nil {
4145
logger.WithError(err).Fatal("couldn't clean previous release")
4246
}

pkg/controller/operators/catalog/operator.go

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -136,20 +136,24 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
136136
serviceAccountQuerier: scoped.NewUserDefinedServiceAccountQuerier(logger, crClient),
137137
clientAttenuator: scoped.NewClientAttenuator(logger, config, opClient, crClient),
138138
}
139-
op.sources = grpc.NewSourceStore(logger, op.syncSourceState)
139+
op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState)
140140
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now)
141141

142142
// Set up syncing for namespace-scoped resources
143143
for _, namespace := range watchedNamespaces {
144-
// Wire OLM CR informers
144+
// Wire OLM CR sharedIndexInformers
145145
crInformerFactory := externalversions.NewSharedInformerFactoryWithOptions(op.client, resyncPeriod, externalversions.WithNamespace(namespace))
146146

147147
// Wire CSVs
148148
csvInformer := crInformerFactory.Operators().V1alpha1().ClusterServiceVersions()
149149
op.lister.OperatorsV1alpha1().RegisterClusterServiceVersionLister(namespace, csvInformer.Lister())
150-
op.RegisterInformer(csvInformer.Informer())
150+
if err := op.RegisterInformer(csvInformer.Informer()); err != nil {
151+
return nil, err
152+
}
151153

152-
csvInformer.Informer().AddIndexers(cache.Indexers{index.ProvidedAPIsIndexFuncKey: index.ProvidedAPIsIndexFunc})
154+
if err := csvInformer.Informer().AddIndexers(cache.Indexers{index.ProvidedAPIsIndexFuncKey: index.ProvidedAPIsIndexFunc}); err != nil {
155+
return nil, err
156+
}
153157
csvIndexer := csvInformer.Informer().GetIndexer()
154158
op.csvProvidedAPIsIndexer[namespace] = csvIndexer
155159

@@ -228,43 +232,43 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
228232
return nil, err
229233
}
230234

231-
// Wire k8s informers
235+
// Wire k8s sharedIndexInformers
232236
k8sInformerFactory := informers.NewSharedInformerFactoryWithOptions(op.opClient.KubernetesInterface(), resyncPeriod, informers.WithNamespace(namespace))
233-
informers := []cache.SharedIndexInformer{}
237+
sharedIndexInformers := []cache.SharedIndexInformer{}
234238

235239
// Wire Roles
236240
roleInformer := k8sInformerFactory.Rbac().V1().Roles()
237241
op.lister.RbacV1().RegisterRoleLister(namespace, roleInformer.Lister())
238-
informers = append(informers, roleInformer.Informer())
242+
sharedIndexInformers = append(sharedIndexInformers, roleInformer.Informer())
239243

240244
// Wire RoleBindings
241245
roleBindingInformer := k8sInformerFactory.Rbac().V1().RoleBindings()
242246
op.lister.RbacV1().RegisterRoleBindingLister(namespace, roleBindingInformer.Lister())
243-
informers = append(informers, roleBindingInformer.Informer())
247+
sharedIndexInformers = append(sharedIndexInformers, roleBindingInformer.Informer())
244248

245249
// Wire ServiceAccounts
246250
serviceAccountInformer := k8sInformerFactory.Core().V1().ServiceAccounts()
247251
op.lister.CoreV1().RegisterServiceAccountLister(namespace, serviceAccountInformer.Lister())
248-
informers = append(informers, serviceAccountInformer.Informer())
252+
sharedIndexInformers = append(sharedIndexInformers, serviceAccountInformer.Informer())
249253

250254
// Wire Services
251255
serviceInformer := k8sInformerFactory.Core().V1().Services()
252256
op.lister.CoreV1().RegisterServiceLister(namespace, serviceInformer.Lister())
253-
informers = append(informers, serviceInformer.Informer())
257+
sharedIndexInformers = append(sharedIndexInformers, serviceInformer.Informer())
254258

255259
// Wire Pods
256260
podInformer := k8sInformerFactory.Core().V1().Pods()
257261
op.lister.CoreV1().RegisterPodLister(namespace, podInformer.Lister())
258-
informers = append(informers, podInformer.Informer())
262+
sharedIndexInformers = append(sharedIndexInformers, podInformer.Informer())
259263

260264
// Wire ConfigMaps
261265
configMapInformer := k8sInformerFactory.Core().V1().ConfigMaps()
262266
op.lister.CoreV1().RegisterConfigMapLister(namespace, configMapInformer.Lister())
263-
informers = append(informers, configMapInformer.Informer())
267+
sharedIndexInformers = append(sharedIndexInformers, configMapInformer.Informer())
264268

265269
// Generate and register QueueInformers for k8s resources
266270
k8sSyncer := queueinformer.LegacySyncHandler(op.syncObject).ToSyncerWithDelete(op.handleDeletion)
267-
for _, informer := range informers {
271+
for _, informer := range sharedIndexInformers {
268272
queueInformer, err := queueinformer.NewQueueInformer(
269273
ctx,
270274
queueinformer.WithLogger(op.logger),
@@ -294,7 +298,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
294298
if err != nil {
295299
return nil, err
296300
}
297-
op.RegisterQueueInformer(crdQueueInformer)
301+
if err := op.RegisterQueueInformer(crdQueueInformer); err != nil {
302+
return nil, err
303+
}
298304

299305
// Namespace sync for resolving subscriptions
300306
namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), resyncPeriod).Core().V1().Namespaces()
@@ -328,7 +334,7 @@ func (o *Operator) syncSourceState(state grpc.SourceState) {
328334

329335
switch state.State {
330336
case connectivity.Ready:
331-
o.resolveNamespace(state.Key.Namespace)
337+
o.nsResolveQueue.Add(state.Key.Namespace)
332338
default:
333339
if err := o.catsrcQueueSet.Requeue(state.Key.Namespace, state.Key.Name); err != nil {
334340
o.logger.WithError(err).Info("couldn't requeue catalogsource from catalog status change")
@@ -509,12 +515,6 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
509515
return err
510516
}
511517

512-
logger.Debug("updating catsrc status")
513-
if _, err := o.client.OperatorsV1alpha1().CatalogSources(out.GetNamespace()).UpdateStatus(out); err != nil {
514-
return err
515-
}
516-
logger.Debug("registry server recreated")
517-
518518
if err := o.sources.Remove(sourceKey); err != nil {
519519
o.logger.WithError(err).Debug("error closing client connection")
520520
}
@@ -672,10 +672,6 @@ func (o *Operator) syncSubscriptions(obj interface{}) error {
672672
return nil
673673
}
674674

675-
func (o *Operator) resolveNamespace(namespace string) {
676-
o.nsResolveQueue.Add(namespace)
677-
}
678-
679675
func (o *Operator) nothingToUpdate(logger *logrus.Entry, sub *v1alpha1.Subscription) bool {
680676
// Only sync if catalog has been updated since last sync time
681677
if o.sourcesLastUpdate.Before(&sub.Status.LastUpdated) && sub.Status.State != v1alpha1.SubscriptionStateNone && sub.Status.State != v1alpha1.SubscriptionStateUpgradeAvailable {

pkg/controller/operators/catalog/operator_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -787,12 +787,12 @@ func NewFakeOperator(ctx context.Context, namespace string, watchedNamespaces []
787787
// 1 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
788788
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(1), 100)},
789789
), "resolver"),
790-
resolver: &fakes.FakeResolver{},
790+
resolver: config.resolver,
791791
reconciler: config.reconciler,
792792
clientAttenuator: scoped.NewClientAttenuator(logger, &rest.Config{}, opClientFake, clientFake),
793793
serviceAccountQuerier: scoped.NewUserDefinedServiceAccountQuerier(logger, clientFake),
794794
}
795-
op.sources = grpc.NewSourceStore(config.logger, op.syncSourceState)
795+
op.sources = grpc.NewSourceStore(config.logger, 1*time.Second, 5*time.Second, op.syncSourceState)
796796
if op.reconciler == nil {
797797
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, op.opClient, "test:pod", op.now)
798798
}

pkg/controller/registry/grpc/source.go

Lines changed: 45 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -35,35 +35,42 @@ type SourceConn struct {
3535
}
3636

3737
type SourceStore struct {
38-
sources map[resolver.CatalogKey]SourceConn
39-
sourcesLock sync.RWMutex
40-
syncFn func(SourceState)
41-
logger *logrus.Logger
42-
notify chan SourceState
38+
sync.Once
39+
sources map[resolver.CatalogKey]SourceConn
40+
sourcesLock sync.RWMutex
41+
syncFn func(SourceState)
42+
logger *logrus.Logger
43+
notify chan SourceState
44+
timeout time.Duration
45+
readyTimeout time.Duration
4346
}
4447

45-
func NewSourceStore(logger *logrus.Logger, sync func(SourceState)) *SourceStore {
48+
func NewSourceStore(logger *logrus.Logger, timeout, readyTimeout time.Duration, sync func(SourceState)) *SourceStore {
4649
return &SourceStore{
47-
sources: make(map[resolver.CatalogKey]SourceConn),
48-
notify: make(chan SourceState),
49-
syncFn: sync,
50-
logger: logger,
50+
sources: make(map[resolver.CatalogKey]SourceConn),
51+
notify: make(chan SourceState),
52+
syncFn: sync,
53+
logger: logger,
54+
timeout: timeout,
55+
readyTimeout: readyTimeout,
5156
}
5257
}
5358

5459
func (s *SourceStore) Start(ctx context.Context) {
55-
s.logger.Warn("starting it")
60+
s.logger.Debug("starting source manager")
5661
go func() {
57-
for {
58-
select {
59-
case <-ctx.Done():
60-
s.logger.Warn("ending it")
61-
return
62-
case e := <-s.notify:
63-
s.logger.Warnf("Got event: %#v", e)
64-
s.syncFn(e)
62+
s.Do(func() {
63+
for {
64+
select {
65+
case <-ctx.Done():
66+
s.logger.Debug("closing source manager")
67+
return
68+
case e := <-s.notify:
69+
s.logger.Debugf("Got source event: %#v", e)
70+
s.syncFn(e)
71+
}
6572
}
66-
}
73+
})
6774
}()
6875
}
6976

@@ -123,37 +130,39 @@ func (s *SourceStore) Add(key resolver.CatalogKey, address string) (*SourceConn,
123130
return &source, nil
124131
}
125132

133+
func (s *SourceStore) stateTimeout(state connectivity.State) time.Duration {
134+
if state == connectivity.Ready {
135+
return s.readyTimeout
136+
}
137+
return s.timeout
138+
}
139+
126140
func (s *SourceStore) watch(ctx context.Context, key resolver.CatalogKey, source SourceConn) {
127141
state := source.ConnectionState
128142
for {
129143
select {
130144
case <-ctx.Done():
131145
return
132146
default:
133-
s.logger.Warnf("source state: %s", state.String())
134-
timeout := 10 * time.Second
135-
if state == connectivity.Ready {
136-
timeout = 10 * time.Minute
137-
}
138-
timer, _ := context.WithTimeout(context.Background(), timeout)
147+
timer, _ := context.WithTimeout(ctx, s.stateTimeout(state))
139148
if source.Conn.WaitForStateChange(timer, state) {
140149
newState := source.Conn.GetState()
141150
state = newState
142-
s.logger.Warnf("source state changed: %s", newState.String())
143151

144152
// update connection state
145-
if src := s.Get(key); src != nil {
146-
src.LastConnect = metav1.Now()
147-
src.ConnectionState = newState
148-
s.logger.Warnf("setting state")
149-
s.sourcesLock.Lock()
150-
s.sources[key] = *src
151-
s.sourcesLock.Unlock()
152-
s.logger.Warnf("state set")
153+
src := s.Get(key)
154+
if src == nil {
155+
// source was removed, cleanup this goroutine
156+
return
153157
}
154158

159+
src.LastConnect = metav1.Now()
160+
src.ConnectionState = newState
161+
s.sourcesLock.Lock()
162+
s.sources[key] = *src
163+
s.sourcesLock.Unlock()
164+
155165
// notify subscriber
156-
s.logger.Warnf("notify")
157166
s.notify <- SourceState{Key: key, State: newState}
158167
}
159168
}

0 commit comments

Comments
 (0)