Skip to content

Commit ffb5c8b

Browse files
committed
Fix cases where an operator's ready channel may never close.
In at least three error/cancellation scenarios, an operator would finish running without closing its ready channel. Users (i.e. olm, catalog, and package-server) block first on <-o.Ready() and then on <-o.Done(), so if the operator terminates without closing its ready channel, they will block indefinitely, which prevents recovery via automatic restarts.
1 parent 8e442f2 commit ffb5c8b

File tree

3 files changed

+121
-28
lines changed

3 files changed

+121
-28
lines changed

pkg/lib/queueinformer/config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ func WithSyncer(syncer kubestate.Syncer) Option {
161161
}
162162

163163
type operatorConfig struct {
164-
discovery discovery.DiscoveryInterface
164+
serverVersion discovery.ServerVersionInterface
165165
queueInformers []*QueueInformer
166166
informers []cache.SharedIndexInformer
167167
logger *logrus.Logger
@@ -217,7 +217,7 @@ func WithNumWorkers(numWorkers int) OperatorOption {
217217
// validate returns an error if the config isn't valid.
218218
func (c *operatorConfig) validate() (err error) {
219219
switch config := c; {
220-
case config.discovery == nil:
220+
case config.serverVersion == nil:
221221
err = newInvalidOperatorConfigError("discovery client nil")
222222
case config.numWorkers < 1:
223223
err = newInvalidOperatorConfigError("must specify at least one worker per queue")

pkg/lib/queueinformer/queueinformer_operator.go

Lines changed: 31 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type Operator interface {
4646
}
4747

4848
type operator struct {
49-
discovery discovery.DiscoveryInterface
49+
serverVersion discovery.ServerVersionInterface
5050
queueInformers []*QueueInformer
5151
informers []cache.SharedIndexInformer
5252
hasSynced cache.InformerSynced
@@ -161,26 +161,34 @@ func (o *operator) RunInformers(ctx context.Context) {
161161
// Run starts the operator's control loops.
// The work is wrapped in o.reconcileOnce.Do and launched on a new goroutine, so Run
// returns without waiting for startup to finish.
// NOTE(review): reconcileOnce is presumably a sync.Once, which would make repeated
// calls to Run no-ops — confirm against the operator struct definition.
162162
func (o *operator) Run(ctx context.Context) {
163163
o.reconcileOnce.Do(func() {
164-
go o.run(ctx)
164+
go func() {
165+
// Teardown for the goroutine: shut down every queue informer's queue, then close
// the atLevel and done channels. Runs whenever the goroutine exits.
defer func() {
166+
for _, queueInformer := range o.queueInformers {
167+
queueInformer.queue.ShutDown()
168+
}
169+
close(o.atLevel)
170+
close(o.done)
171+
}()
172+
// start closes o.ready via its own deferred close, so ready is closed by the
// time start returns, on every return path.
o.start(ctx)
173+
// Wait for the context to be done before the deferred teardown runs.
<-ctx.Done()
174+
}()
165175
})
166176
}
167177

168-
func (o *operator) run(ctx context.Context) {
169-
defer func() {
170-
close(o.atLevel)
171-
close(o.done)
172-
}()
173-
174-
for _, queueInformer := range o.queueInformers {
175-
defer queueInformer.queue.ShutDown()
176-
}
178+
func (o *operator) start(ctx context.Context) {
179+
defer close(o.ready)
177180

181+
// goroutine will be unnecessary after https://github.com/kubernetes/enhancements/pull/1503
178182
errs := make(chan error)
179183
go func() {
180184
defer close(errs)
181-
v, err := o.discovery.ServerVersion()
185+
v, err := o.serverVersion.ServerVersion()
182186
if err != nil {
183-
errs <- errors.Wrap(err, "communicating with server failed")
187+
select {
188+
case errs <- errors.Wrap(err, "communicating with server failed"):
189+
case <-ctx.Done():
190+
// don't block send forever on cancellation
191+
}
184192
return
185193
}
186194
o.logger.Infof("connection established. cluster-version: %v", v)
@@ -212,9 +220,6 @@ func (o *operator) run(ctx context.Context) {
212220
go o.worker(ctx, queueInformer)
213221
}
214222
}
215-
216-
close(o.ready)
217-
<-ctx.Done()
218223
}
219224

220225
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
@@ -293,10 +298,10 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer)
293298
return true
294299
}
295300

296-
// NewOperator returns a new Operator configured to manage the cluster with the given discovery client.
297-
func NewOperator(disc discovery.DiscoveryInterface, options ...OperatorOption) (Operator, error) {
301+
// NewOperator returns a new Operator configured to manage the cluster with the given server version client.
302+
func NewOperator(sv discovery.ServerVersionInterface, options ...OperatorOption) (Operator, error) {
298303
config := defaultOperatorConfig()
299-
config.discovery = disc
304+
config.serverVersion = sv
300305
config.apply(options)
301306
if err := config.validate(); err != nil {
302307
return nil, err
@@ -306,14 +311,14 @@ func NewOperator(disc discovery.DiscoveryInterface, options ...OperatorOption) (
306311

307312
}
308313

309-
func newOperatorFromConfig(config *operatorConfig) (Operator, error) {
314+
func newOperatorFromConfig(config *operatorConfig) (*operator, error) {
310315
op := &operator{
311-
discovery: config.discovery,
312-
numWorkers: config.numWorkers,
313-
logger: config.logger,
314-
ready: make(chan struct{}),
315-
done: make(chan struct{}),
316-
atLevel: make(chan error, 25),
316+
serverVersion: config.serverVersion,
317+
numWorkers: config.numWorkers,
318+
logger: config.logger,
319+
ready: make(chan struct{}),
320+
done: make(chan struct{}),
321+
atLevel: make(chan error, 25),
317322
}
318323
op.syncCh = op.atLevel
319324

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package queueinformer
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/pkg/errors"
9+
"k8s.io/apimachinery/pkg/version"
10+
)
11+
12+
// versionFunc is a function type with a ServerVersion method, letting a test assign
// a plain function to the operator's serverVersion field.
type versionFunc func() (*version.Info, error)
13+
14+
// ServerVersion invokes the wrapped function and returns its results.
// A nil versionFunc returns an empty version.Info and a nil error.
func (f versionFunc) ServerVersion() (*version.Info, error) {
15+
if f == nil {
16+
return &version.Info{}, nil
17+
}
18+
// Convert f back to its underlying function type and call it.
return (func() (*version.Info, error))(f)()
19+
}
20+
21+
// TestOperatorRunReadyChannelClosed runs the operator under several error and
// cancellation scenarios and checks that a receive on o.Ready() completes within one
// second of calling Run. A scenario that times out is reported with t.Error.
func TestOperatorRunReadyChannelClosed(t *testing.T) {
22+
for _, tc := range []struct {
23+
name string
24+
// set up the operator under test and return a cleanup func to be invoked when the test completes
25+
of func(cancel context.CancelFunc, o *operator) func()
26+
}{
27+
{
28+
// The server version lookup returns an error.
name: "error getting server version",
29+
of: func(cancel context.CancelFunc, o *operator) func() {
30+
o.serverVersion = versionFunc(func() (*version.Info, error) {
31+
return nil, errors.New("test error")
32+
})
33+
return func() {}
34+
},
35+
},
36+
{
37+
// The server version lookup cancels the context and then does not return until
// the cleanup func closes done.
name: "context cancelled while getting server version",
38+
of: func(cancel context.CancelFunc, o *operator) func() {
39+
done := make(chan struct{})
40+
o.serverVersion = versionFunc(func() (*version.Info, error) {
41+
// The deferred receive holds this call open until done is closed by the cleanup func.
defer func() {
42+
<-done
43+
}()
44+
cancel()
45+
return nil, errors.New("test error")
46+
})
47+
return func() {
48+
close(done)
49+
}
50+
},
51+
},
52+
{
53+
// hasSynced cancels the context and reports that the caches are not synced.
name: "context cancelled before cache sync",
54+
of: func(cancel context.CancelFunc, o *operator) func() {
55+
o.hasSynced = func() bool {
56+
cancel()
57+
return false
58+
}
59+
return func() {}
60+
},
61+
},
62+
} {
63+
t.Run(tc.name, func(t *testing.T) {
64+
ctx, cancel := context.WithCancel(context.Background())
65+
defer cancel()
66+
o, err := newOperatorFromConfig(defaultOperatorConfig())
67+
if err != nil {
68+
t.Fatalf("could not create operator from default config: %s", err)
69+
}
70+
// Defaults before the case's setup func runs: a nil versionFunc (returns an empty
// version.Info and no error) and a hasSynced that always reports true.
o.serverVersion = versionFunc(nil)
71+
o.hasSynced = func() bool { return true }
72+
73+
done := func() {}
74+
if tc.of != nil {
75+
done = tc.of(cancel, o)
76+
}
77+
defer done()
78+
79+
o.Run(ctx)
80+
81+
// Pass if the ready channel yields within one second; otherwise record a failure.
select {
82+
case <-o.Ready():
83+
case <-time.After(time.Second):
84+
t.Error("timed out before ready channel closed")
85+
}
86+
})
87+
}
88+
}

0 commit comments

Comments
 (0)