Skip to content

Commit 496b877

Browse files
committed
refactor(packageserver): plumb context throughout
1 parent 497e554 commit 496b877

File tree

3 files changed

+38
-37
lines changed

3 files changed

+38
-37
lines changed

pkg/package-server/provider/registry.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,12 @@ import (
1515
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1616
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
1717
"k8s.io/client-go/tools/cache"
18-
"k8s.io/client-go/util/workqueue"
1918
"k8s.io/kubernetes/pkg/util/labels"
2019

2120
operatorsv1alpha1 "github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha1"
2221
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned"
2322
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/informers/externalversions"
2423
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer"
25-
"github.com/operator-framework/operator-lifecycle-manager/pkg/metrics"
2624
"github.com/operator-framework/operator-lifecycle-manager/pkg/package-server/apis/operators"
2725
)
2826

@@ -51,7 +49,7 @@ func newRegistryClient(source *operatorsv1alpha1.CatalogSource, conn *grpc.Clien
5149

5250
// RegistryProvider aggregates several `CatalogSources` and establishes gRPC connections to their registry servers.
5351
type RegistryProvider struct {
54-
*queueinformer.Operator
52+
queueinformer.Operator
5553

5654
mu sync.RWMutex
5755
globalNamespace string
@@ -60,30 +58,32 @@ type RegistryProvider struct {
6058

6159
var _ PackageManifestProvider = &RegistryProvider{}
6260

63-
func NewRegistryProvider(crClient versioned.Interface, operator *queueinformer.Operator, wakeupInterval time.Duration, watchedNamespaces []string, globalNamespace string) *RegistryProvider {
61+
func NewRegistryProvider(ctx context.Context, crClient versioned.Interface, operator queueinformer.Operator, wakeupInterval time.Duration, watchedNamespaces []string, globalNamespace string) (*RegistryProvider, error) {
6462
p := &RegistryProvider{
6563
Operator: operator,
6664

6765
globalNamespace: globalNamespace,
6866
clients: make(map[sourceKey]registryClient),
6967
}
7068

71-
sourceHandlers := &cache.ResourceEventHandlerFuncs{
72-
DeleteFunc: p.catalogSourceDeleted,
73-
}
7469
for _, namespace := range watchedNamespaces {
75-
factory := externalversions.NewSharedInformerFactoryWithOptions(crClient, wakeupInterval, externalversions.WithNamespace(namespace))
76-
sourceInformer := factory.Operators().V1alpha1().CatalogSources()
70+
informerFactory := externalversions.NewSharedInformerFactoryWithOptions(crClient, wakeupInterval, externalversions.WithNamespace(namespace))
71+
catsrcInformer := informerFactory.Operators().V1alpha1().CatalogSources()
7772

7873
// Register queue and QueueInformer
7974
logrus.WithField("namespace", namespace).Info("watching catalogsources")
80-
queueName := fmt.Sprintf("%s/catalogsources", namespace)
81-
sourceQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName)
82-
sourceQueueInformer := queueinformer.NewInformer(sourceQueue, sourceInformer.Informer(), p.syncCatalogSource, sourceHandlers, queueName, metrics.NewMetricsNil(), logrus.New())
83-
p.RegisterQueueInformer(sourceQueueInformer)
75+
catsrcQueueInformer, err := queueinformer.NewQueueInformer(
76+
ctx,
77+
queueinformer.WithInformer(catsrcInformer.Informer()),
78+
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(p.syncCatalogSource).ToSyncerWithDelete(p.catalogSourceDeleted)),
79+
)
80+
if err != nil {
81+
return nil, err
82+
}
83+
p.RegisterQueueInformer(catsrcQueueInformer)
8484
}
8585

86-
return p
86+
return p, nil
8787
}
8888

8989
func (p *RegistryProvider) getClient(key sourceKey) (registryClient, bool) {

pkg/package-server/provider/registry_test.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
package provider
33

44
import (
5+
"context"
56
"encoding/json"
67
"net"
78
"os"
@@ -77,19 +78,19 @@ func packageManifest(value packageValue) operators.PackageManifest {
7778
}
7879
}
7980

80-
func NewFakeRegistryProvider(clientObjs []runtime.Object, k8sObjs []runtime.Object, watchedNamespaces []string, globalNamespace string, stopCh <-chan struct{}) (*RegistryProvider, error) {
81+
func NewFakeRegistryProvider(ctx context.Context, clientObjs []runtime.Object, k8sObjs []runtime.Object, watchedNamespaces []string, globalNamespace string) (*RegistryProvider, error) {
8182
clientFake := fake.NewSimpleClientset(clientObjs...)
8283
k8sClientFake := k8sfake.NewSimpleClientset(k8sObjs...)
8384
opClientFake := operatorclient.NewClient(k8sClientFake, nil, nil)
8485

85-
operator, err := queueinformer.NewOperatorFromClient(opClientFake, logrus.StandardLogger())
86+
op, err := queueinformer.NewOperatorFromClient(opClientFake.KubernetesInterface().Discovery(), logrus.StandardLogger())
8687
if err != nil {
8788
return nil, err
8889
}
8990

90-
wakeupInterval := 5 * time.Minute
91+
resyncInterval := 5 * time.Minute
9192

92-
return NewRegistryProvider(clientFake, operator, wakeupInterval, watchedNamespaces, globalNamespace), nil
93+
return NewRegistryProvider(ctx, clientFake, op, resyncInterval, watchedNamespaces, globalNamespace)
9394
}
9495

9596
func catalogSource(name, namespace string) *operatorsv1alpha1.CatalogSource {
@@ -386,9 +387,9 @@ func TestRegistryProviderGet(t *testing.T) {
386387

387388
for _, test := range tests {
388389
t.Run(test.name, func(t *testing.T) {
389-
stopCh := make(chan struct{})
390-
defer close(stopCh)
391-
provider, err := NewFakeRegistryProvider(nil, nil, test.namespaces, test.globalNS, stopCh)
390+
ctx, cancel := context.WithCancel(context.TODO())
391+
defer cancel()
392+
provider, err := NewFakeRegistryProvider(ctx, nil, nil, test.namespaces, test.globalNS)
392393
require.NoError(t, err)
393394

394395
for _, cs := range test.catalogSources {
@@ -587,9 +588,9 @@ func TestRegistryProviderList(t *testing.T) {
587588

588589
for _, test := range tests {
589590
t.Run(test.name, func(t *testing.T) {
590-
stopCh := make(chan struct{})
591-
defer close(stopCh)
592-
provider, err := NewFakeRegistryProvider(nil, nil, test.namespaces, test.globalNS, stopCh)
591+
ctx, cancel := context.WithCancel(context.TODO())
592+
defer cancel()
593+
provider, err := NewFakeRegistryProvider(ctx, nil, nil, test.namespaces, test.globalNS)
593594
require.NoError(t, err)
594595

595596
for _, cs := range test.catalogSources {

pkg/package-server/server/server.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package server
22

33
import (
4+
"context"
45
"fmt"
56
"io"
67
"net"
@@ -25,12 +26,12 @@ import (
2526

2627
// NewCommandStartPackageServer provides a CLI handler for 'start master' command
2728
// with a default PackageServerOptions.
28-
func NewCommandStartPackageServer(defaults *PackageServerOptions, stopCh <-chan struct{}) *cobra.Command {
29+
func NewCommandStartPackageServer(ctx context.Context, defaults *PackageServerOptions) *cobra.Command {
2930
cmd := &cobra.Command{
3031
Short: "Launch a package API server",
3132
Long: "Launch a package API server",
3233
RunE: func(c *cobra.Command, args []string) error {
33-
if err := defaults.Run(stopCh); err != nil {
34+
if err := defaults.Run(ctx); err != nil {
3435
return err
3536
}
3637
return nil
@@ -128,7 +129,7 @@ func (o *PackageServerOptions) Config() (*apiserver.Config, error) {
128129
}, nil
129130
}
130131

131-
func (o *PackageServerOptions) Run(stopCh <-chan struct{}) error {
132+
func (o *PackageServerOptions) Run(ctx context.Context) error {
132133
if o.Debug {
133134
log.SetLevel(log.DebugLevel)
134135
}
@@ -164,12 +165,15 @@ func (o *PackageServerOptions) Run(stopCh <-chan struct{}) error {
164165
return err
165166
}
166167

167-
queueOperator, err := queueinformer.NewOperator(o.Kubeconfig, log.New())
168+
queueOperator, err := queueinformer.NewOperatorFromClient(crClient.Discovery(), log.New())
168169
if err != nil {
169170
return err
170171
}
171172

172-
sourceProvider := provider.NewRegistryProvider(crClient, queueOperator, o.WakeupInterval, o.WatchedNamespaces, o.GlobalNamespace)
173+
sourceProvider, err := provider.NewRegistryProvider(ctx, crClient, queueOperator, o.WakeupInterval, o.WatchedNamespaces, o.GlobalNamespace)
174+
if err != nil {
175+
return err
176+
}
173177
config.ProviderConfig.Provider = sourceProvider
174178

175179
// we should never need to resync, since we're not worried about missing events,
@@ -182,15 +186,11 @@ func (o *PackageServerOptions) Run(stopCh <-chan struct{}) error {
182186
return err
183187
}
184188

185-
// Ensure that provider stops after the apiserver gracefully shuts down
186-
provCh := make(chan struct{})
187-
ready, done, _ := sourceProvider.Run(provCh)
188-
<-ready
189-
190-
err = server.GenericAPIServer.PrepareRun().Run(stopCh)
191-
go func() { provCh <- struct{}{} }()
189+
sourceProvider.Run(ctx)
190+
<-sourceProvider.Ready()
192191

193-
<-done
192+
err = server.GenericAPIServer.PrepareRun().Run(ctx.Done())
193+
<-sourceProvider.Done()
194194

195195
return err
196196
}

0 commit comments

Comments
 (0)