@@ -46,7 +46,7 @@ type Operator interface {
46
46
}
47
47
48
48
type operator struct {
49
- discovery discovery.DiscoveryInterface
49
+ serverVersion discovery.ServerVersionInterface
50
50
queueInformers []* QueueInformer
51
51
informers []cache.SharedIndexInformer
52
52
hasSynced cache.InformerSynced
@@ -161,26 +161,34 @@ func (o *operator) RunInformers(ctx context.Context) {
161
161
// Run starts the operator's control loops.
162
162
func (o * operator ) Run (ctx context.Context ) {
163
163
o .reconcileOnce .Do (func () {
164
- go o .run (ctx )
164
+ go func () {
165
+ defer func () {
166
+ for _ , queueInformer := range o .queueInformers {
167
+ queueInformer .queue .ShutDown ()
168
+ }
169
+ close (o .atLevel )
170
+ close (o .done )
171
+ }()
172
+ o .start (ctx )
173
+ <- ctx .Done ()
174
+ }()
165
175
})
166
176
}
167
177
168
- func (o * operator ) run (ctx context.Context ) {
169
- defer func () {
170
- close (o .atLevel )
171
- close (o .done )
172
- }()
173
-
174
- for _ , queueInformer := range o .queueInformers {
175
- defer queueInformer .queue .ShutDown ()
176
- }
178
+ func (o * operator ) start (ctx context.Context ) {
179
+ defer close (o .ready )
177
180
181
+ // goroutine will be unnecessary after https://github.com/kubernetes/enhancements/pull/1503
178
182
errs := make (chan error )
179
183
go func () {
180
184
defer close (errs )
181
- v , err := o .discovery .ServerVersion ()
185
+ v , err := o .serverVersion .ServerVersion ()
182
186
if err != nil {
183
- errs <- errors .Wrap (err , "communicating with server failed" )
187
+ select {
188
+ case errs <- errors .Wrap (err , "communicating with server failed" ):
189
+ case <- ctx .Done ():
190
+ // don't block send forever on cancellation
191
+ }
184
192
return
185
193
}
186
194
o .logger .Infof ("connection established. cluster-version: %v" , v )
@@ -212,9 +220,6 @@ func (o *operator) run(ctx context.Context) {
212
220
go o .worker (ctx , queueInformer )
213
221
}
214
222
}
215
-
216
- close (o .ready )
217
- <- ctx .Done ()
218
223
}
219
224
220
225
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
@@ -293,10 +298,10 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer)
293
298
return true
294
299
}
295
300
296
- // NewOperator returns a new Operator configured to manage the cluster with the given discovery client.
297
- func NewOperator (disc discovery.DiscoveryInterface , options ... OperatorOption ) (Operator , error ) {
301
+ // NewOperator returns a new Operator configured to manage the cluster with the given server version client.
302
+ func NewOperator (sv discovery.ServerVersionInterface , options ... OperatorOption ) (Operator , error ) {
298
303
config := defaultOperatorConfig ()
299
- config .discovery = disc
304
+ config .serverVersion = sv
300
305
config .apply (options )
301
306
if err := config .validate (); err != nil {
302
307
return nil , err
@@ -306,14 +311,14 @@ func NewOperator(disc discovery.DiscoveryInterface, options ...OperatorOption) (
306
311
307
312
}
308
313
309
- func newOperatorFromConfig (config * operatorConfig ) (Operator , error ) {
314
+ func newOperatorFromConfig (config * operatorConfig ) (* operator , error ) {
310
315
op := & operator {
311
- discovery : config .discovery ,
312
- numWorkers : config .numWorkers ,
313
- logger : config .logger ,
314
- ready : make (chan struct {}),
315
- done : make (chan struct {}),
316
- atLevel : make (chan error , 25 ),
316
+ serverVersion : config .serverVersion ,
317
+ numWorkers : config .numWorkers ,
318
+ logger : config .logger ,
319
+ ready : make (chan struct {}),
320
+ done : make (chan struct {}),
321
+ atLevel : make (chan error , 25 ),
317
322
}
318
323
op .syncCh = op .atLevel
319
324
0 commit comments