Skip to content

Commit 6a52c02

Browse files
committed
Don't block on ctx.Done() if startup fails.
Operator startup errors aren't recoverable without restarting the process, so it's critical that the operator process is allowed to terminate in these cases.
1 parent 34f3888 commit 6a52c02

File tree

2 files changed

+21
-12
lines changed

2 files changed

+21
-12
lines changed

pkg/lib/queueinformer/queueinformer_operator.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -169,13 +169,16 @@ func (o *operator) Run(ctx context.Context) {
169169
close(o.atLevel)
170170
close(o.done)
171171
}()
172-
o.start(ctx)
172+
if err := o.start(ctx); err != nil {
173+
o.logger.WithError(err).Error("error encountered during startup")
174+
return
175+
}
173176
<-ctx.Done()
174177
}()
175178
})
176179
}
177180

178-
func (o *operator) start(ctx context.Context) {
181+
func (o *operator) start(ctx context.Context) error {
179182
defer close(o.ready)
180183

181184
// goroutine will be unnecessary after https://github.com/kubernetes/enhancements/pull/1503
@@ -197,21 +200,19 @@ func (o *operator) start(ctx context.Context) {
197200
select {
198201
case err := <-errs:
199202
if err != nil {
200-
o.logger.Infof("operator not ready: %s", err.Error())
201-
return
203+
return fmt.Errorf("operator not ready: %s", err.Error())
202204
}
203205
o.logger.Info("operator ready")
204206
case <-ctx.Done():
205-
return
207+
return nil
206208
}
207209

208210
o.logger.Info("starting informers...")
209211
o.RunInformers(ctx)
210212

211213
o.logger.Info("waiting for caches to sync...")
212214
if ok := cache.WaitForCacheSync(ctx.Done(), o.hasSynced); !ok {
213-
o.logger.Info("failed to wait for caches to sync")
214-
return
215+
return fmt.Errorf("failed to wait for caches to sync")
215216
}
216217

217218
o.logger.Info("starting workers...")
@@ -220,6 +221,8 @@ func (o *operator) start(ctx context.Context) {
220221
go o.worker(ctx, queueInformer)
221222
}
222223
}
224+
225+
return nil
223226
}
224227

225228
// worker runs a worker thread that just dequeues items, processes them, and marks them done.

pkg/lib/queueinformer/queueinformer_operator_test.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ func (f versionFunc) ServerVersion() (*version.Info, error) {
1818
return (func() (*version.Info, error))(f)()
1919
}
2020

21-
func TestOperatorRunReadyChannelClosed(t *testing.T) {
21+
func TestOperatorRunChannelClosure(t *testing.T) {
2222
for _, tc := range []struct {
2323
name string
2424
// set up the operator under test and return a cleanup func to be invoked when the test completes
@@ -78,10 +78,16 @@ func TestOperatorRunReadyChannelClosed(t *testing.T) {
7878

7979
o.Run(ctx)
8080

81-
select {
82-
case <-o.Ready():
83-
case <-time.After(time.Second):
84-
t.Error("timed out before ready channel closed")
81+
timeout := time.After(time.Second)
82+
for n, ch := range map[string]<-chan struct{}{
83+
"ready": o.Ready(),
84+
"done": o.Done(),
85+
} {
86+
select {
87+
case <-ch:
88+
case <-timeout:
89+
t.Errorf("timed out before %s channel closed", n)
90+
}
8591
}
8692
})
8793
}

0 commit comments

Comments
 (0)