@@ -8,18 +8,13 @@ import (
8
8
"fmt"
9
9
"io/ioutil"
10
10
"math/rand"
11
- "net"
12
- "net/http"
13
11
"os"
14
12
"os/signal"
15
13
"sync"
16
14
"syscall"
17
15
"time"
18
16
19
- "github.com/cockroachdb/cmux"
20
-
21
17
"github.com/google/uuid"
22
- "github.com/prometheus/client_golang/prometheus/promhttp"
23
18
v1 "k8s.io/api/core/v1"
24
19
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25
20
"k8s.io/client-go/informers"
@@ -188,56 +183,23 @@ func (o *Options) makeTLSConfig() (*tls.Config, error) {
188
183
}
189
184
190
185
func (o * Options ) run (ctx context.Context , controllerCtx * Context , lock * resourcelock.ConfigMapLock ) {
191
- // listen on metrics
186
+ runContext , runCancel := context .WithCancel (ctx )
187
+ shutdownContext , shutdownCancel := context .WithCancel (ctx )
188
+ errorChannel := make (chan error , 1 )
189
+ errorChannelCount := 0
192
190
if o .ListenAddr != "" {
193
- handler := http .NewServeMux ()
194
- handler .Handle ("/metrics" , promhttp .Handler ())
195
- tcpl , err := net .Listen ("tcp" , o .ListenAddr )
196
- if err != nil {
197
- klog .Fatalf ("Listen error: %v" , err )
198
- }
199
-
200
- // if a TLS connection was requested, set up a connection mux that will send TLS requests to
201
- // the TLS server but send HTTP requests to the HTTP server. Preserves the ability for legacy
202
- // HTTP, needed during upgrade, while still allowing TLS certs and end to end metrics protection.
203
- m := cmux .New (tcpl )
204
-
205
- // match HTTP first
206
- httpl := m .Match (cmux .HTTP1 ())
207
- go func () {
208
- s := & http.Server {
209
- Handler : handler ,
210
- }
211
- if err := s .Serve (httpl ); err != cmux .ErrListenerClosed {
212
- klog .Fatalf ("HTTP serve error: %v" , err )
213
- }
214
- }()
215
-
191
+ var tlsConfig * tls.Config
216
192
if o .ServingCertFile != "" || o .ServingKeyFile != "" {
217
- tlsConfig , err := o .makeTLSConfig ()
193
+ var err error
194
+ tlsConfig , err = o .makeTLSConfig ()
218
195
if err != nil {
219
196
klog .Fatalf ("Failed to create TLS config: %v" , err )
220
197
}
221
-
222
- tlsListener := tls .NewListener (m .Match (cmux .Any ()), tlsConfig )
223
- klog .Infof ("Metrics port listening for HTTP and HTTPS on %v" , o .ListenAddr )
224
- go func () {
225
- s := & http.Server {
226
- Handler : handler ,
227
- }
228
- if err := s .Serve (tlsListener ); err != cmux .ErrListenerClosed {
229
- klog .Fatalf ("HTTPS serve error: %v" , err )
230
- }
231
- }()
232
-
233
- go func () {
234
- if err := m .Serve (); err != nil {
235
- klog .Errorf ("CMUX serve error: %v" , err )
236
- }
237
- }()
238
- } else {
239
- klog .Infof ("Metrics port listening for HTTP on %v" , o .ListenAddr )
240
198
}
199
+ errorChannelCount ++
200
+ go func () {
201
+ errorChannel <- cvo .RunMetrics (runContext , shutdownContext , o .ListenAddr , tlsConfig )
202
+ }()
241
203
}
242
204
243
205
exit := make (chan struct {})
@@ -253,9 +215,9 @@ func (o *Options) run(ctx context.Context, controllerCtx *Context, lock *resourc
253
215
RetryPeriod : retryPeriod ,
254
216
Callbacks : leaderelection.LeaderCallbacks {
255
217
OnStartedLeading : func (localCtx context.Context ) {
256
- controllerCtx .Start (ctx )
218
+ controllerCtx .Start (runContext )
257
219
select {
258
- case <- ctx .Done ():
220
+ case <- runContext .Done ():
259
221
// WARNING: this is not completely safe until we have Kube 1.14 and ReleaseOnCancel
260
222
// and client-go ContextCancelable, which allows us to block new API requests before
261
223
// we step down. However, the CVO isn't that sensitive to races and can tolerate
@@ -284,6 +246,34 @@ func (o *Options) run(ctx context.Context, controllerCtx *Context, lock *resourc
284
246
},
285
247
})
286
248
249
+ for errorChannelCount > 0 {
250
+ var shutdownTimer * time.Timer
251
+ if shutdownTimer == nil { // running
252
+ select {
253
+ case <- runContext .Done ():
254
+ shutdownTimer = time .NewTimer (2 * time .Minute )
255
+ case err := <- errorChannel :
256
+ errorChannelCount --
257
+ if err != nil {
258
+ klog .Error (err )
259
+ runCancel () // this will cause shutdownTimer initialization in the next loop
260
+ }
261
+ }
262
+ } else { // shutting down
263
+ select {
264
+ case <- shutdownTimer .C : // never triggers after the channel is stopped, although it would not matter much if it did because subsequent cancel calls do nothing.
265
+ shutdownCancel ()
266
+ shutdownTimer .Stop ()
267
+ case err := <- errorChannel :
268
+ errorChannelCount --
269
+ if err != nil {
270
+ klog .Error (err )
271
+ runCancel ()
272
+ }
273
+ }
274
+ }
275
+ }
276
+
287
277
<- exit
288
278
}
289
279
0 commit comments