Skip to content

Commit 8b9931b

Browse files
Merge pull request #349 from wking/graceful-metrics-shutdown
pkg/cvo/metrics: Graceful server shutdown
2 parents 9b26547 + 06cb138 commit 8b9931b

File tree

6 files changed

+203
-120
lines changed

6 files changed

+203
-120
lines changed

pkg/cvo/availableupdates.go

Lines changed: 0 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package cvo
22

33
import (
44
"crypto/tls"
5-
"crypto/x509"
65
"fmt"
76
"net/url"
87
"runtime"
@@ -11,7 +10,6 @@ import (
1110
"github.com/blang/semver"
1211
"github.com/google/uuid"
1312
"k8s.io/apimachinery/pkg/api/equality"
14-
"k8s.io/apimachinery/pkg/api/errors"
1513
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1614
"k8s.io/klog"
1715

@@ -226,54 +224,3 @@ func calculateAvailableUpdatesStatus(clusterID string, proxyURL *url.URL, tlsCon
226224
LastTransitionTime: metav1.Now(),
227225
}
228226
}
229-
230-
// getHTTPSProxyURL returns a url.URL object for the configured
231-
// https proxy only. It can be nil if does not exist or there is an error.
232-
func (optr *Operator) getHTTPSProxyURL() (*url.URL, string, error) {
233-
proxy, err := optr.proxyLister.Get("cluster")
234-
235-
if errors.IsNotFound(err) {
236-
return nil, "", nil
237-
}
238-
if err != nil {
239-
return nil, "", err
240-
}
241-
242-
if &proxy.Spec != nil {
243-
if proxy.Spec.HTTPSProxy != "" {
244-
proxyURL, err := url.Parse(proxy.Spec.HTTPSProxy)
245-
if err != nil {
246-
return nil, "", err
247-
}
248-
return proxyURL, proxy.Spec.TrustedCA.Name, nil
249-
}
250-
}
251-
return nil, "", nil
252-
}
253-
254-
func (optr *Operator) getTLSConfig(cmNameRef string) (*tls.Config, error) {
255-
cm, err := optr.cmConfigLister.Get(cmNameRef)
256-
257-
if err != nil {
258-
return nil, err
259-
}
260-
261-
certPool, _ := x509.SystemCertPool()
262-
if certPool == nil {
263-
certPool = x509.NewCertPool()
264-
}
265-
266-
if cm.Data["ca-bundle.crt"] != "" {
267-
if ok := certPool.AppendCertsFromPEM([]byte(cm.Data["ca-bundle.crt"])); !ok {
268-
return nil, fmt.Errorf("unable to add ca-bundle.crt certificates")
269-
}
270-
} else {
271-
return nil, nil
272-
}
273-
274-
config := &tls.Config{
275-
RootCAs: certPool,
276-
}
277-
278-
return config, nil
279-
}

pkg/cvo/cvo.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,6 @@ func New(
171171
proxyInformer configinformersv1.ProxyInformer,
172172
client clientset.Interface,
173173
kubeClient kubernetes.Interface,
174-
enableMetrics bool,
175174
exclude string,
176175
) *Operator {
177176
eventBroadcaster := record.NewBroadcaster()
@@ -216,11 +215,6 @@ func New(
216215
// make sure this is initialized after all the listers are initialized
217216
optr.upgradeableChecks = optr.defaultUpgradeableChecks()
218217

219-
if enableMetrics {
220-
if err := optr.registerMetrics(coInformer.Informer()); err != nil {
221-
panic(err)
222-
}
223-
}
224218
return optr
225219
}
226220

pkg/cvo/egress.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package cvo
2+
3+
import (
4+
"crypto/tls"
5+
"crypto/x509"
6+
"fmt"
7+
"net/url"
8+
9+
"k8s.io/apimachinery/pkg/api/errors"
10+
)
11+
12+
// getHTTPSProxyURL returns a url.URL object for the configured
13+
// https proxy only. It can be nil if does not exist or there is an error.
14+
func (optr *Operator) getHTTPSProxyURL() (*url.URL, string, error) {
15+
proxy, err := optr.proxyLister.Get("cluster")
16+
17+
if errors.IsNotFound(err) {
18+
return nil, "", nil
19+
}
20+
if err != nil {
21+
return nil, "", err
22+
}
23+
24+
if &proxy.Spec != nil {
25+
if proxy.Spec.HTTPSProxy != "" {
26+
proxyURL, err := url.Parse(proxy.Spec.HTTPSProxy)
27+
if err != nil {
28+
return nil, "", err
29+
}
30+
return proxyURL, proxy.Spec.TrustedCA.Name, nil
31+
}
32+
}
33+
return nil, "", nil
34+
}
35+
36+
func (optr *Operator) getTLSConfig(cmNameRef string) (*tls.Config, error) {
37+
cm, err := optr.cmConfigLister.Get(cmNameRef)
38+
39+
if err != nil {
40+
return nil, err
41+
}
42+
43+
certPool, _ := x509.SystemCertPool()
44+
if certPool == nil {
45+
certPool = x509.NewCertPool()
46+
}
47+
48+
if cm.Data["ca-bundle.crt"] != "" {
49+
if ok := certPool.AppendCertsFromPEM([]byte(cm.Data["ca-bundle.crt"])); !ok {
50+
return nil, fmt.Errorf("unable to add ca-bundle.crt certificates")
51+
}
52+
} else {
53+
return nil, nil
54+
}
55+
56+
config := &tls.Config{
57+
RootCAs: certPool,
58+
}
59+
60+
return config, nil
61+
}

pkg/cvo/metrics.go

Lines changed: 91 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,15 @@
11
package cvo
22

33
import (
4+
"context"
5+
"crypto/tls"
6+
"net"
7+
"net/http"
48
"time"
59

10+
"github.com/cockroachdb/cmux"
611
"github.com/prometheus/client_golang/prometheus"
12+
"github.com/prometheus/client_golang/prometheus/promhttp"
713
corev1 "k8s.io/api/core/v1"
814
apierrors "k8s.io/apimachinery/pkg/api/errors"
915
"k8s.io/apimachinery/pkg/labels"
@@ -16,7 +22,9 @@ import (
1622
"github.com/openshift/cluster-version-operator/pkg/internal"
1723
)
1824

19-
func (optr *Operator) registerMetrics(coInformer cache.SharedInformer) error {
25+
// RegisterMetrics initializes metrics and registers them with the
26+
// Prometheus implementation.
27+
func (optr *Operator) RegisterMetrics(coInformer cache.SharedInformer) error {
2028
m := newOperatorMetrics(optr)
2129
coInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
2230
UpdateFunc: m.clusterOperatorChanged,
@@ -92,6 +100,88 @@ version for 'cluster', or empty for 'initial'.
92100
}
93101
}
94102

103+
// RunMetrics launches an server bound to listenAddress serving
104+
// Prometheus metrics at /metrics over HTTP, and, if tlsConfig is
105+
// non-nil, also over HTTPS. Continues serving until runContext.Done()
106+
// and then attempts a clean shutdown limited by shutdownContext.Done().
107+
// Assumes runContext.Done() occurs before or simultaneously with
108+
// shutdownContext.Done().
109+
func RunMetrics(runContext context.Context, shutdownContext context.Context, listenAddress string, tlsConfig *tls.Config) error {
110+
handler := http.NewServeMux()
111+
handler.Handle("/metrics", promhttp.Handler())
112+
server := &http.Server{
113+
Handler: handler,
114+
}
115+
116+
tcpListener, err := net.Listen("tcp", listenAddress)
117+
if err != nil {
118+
return err
119+
}
120+
121+
// if a TLS connection was requested, set up a connection mux that will send TLS requests to
122+
// the TLS server but send HTTP requests to the HTTP server. Preserves the ability for legacy
123+
// HTTP, needed during upgrade, while still allowing TLS certs and end to end metrics protection.
124+
mux := cmux.New(tcpListener)
125+
126+
errorChannel := make(chan error, 1)
127+
errorChannelCount := 1
128+
129+
go func() {
130+
// match HTTP first
131+
httpListener := mux.Match(cmux.HTTP1())
132+
klog.Infof("Metrics port listening for HTTP on %v", listenAddress)
133+
errorChannel <- server.Serve(httpListener)
134+
}()
135+
136+
if tlsConfig != nil {
137+
errorChannelCount++
138+
go func() {
139+
tlsListener := tls.NewListener(mux.Match(cmux.Any()), tlsConfig)
140+
klog.Infof("Metrics port listening for HTTPS on %v", listenAddress)
141+
errorChannel <- server.Serve(tlsListener)
142+
}()
143+
}
144+
145+
errorChannelCount++
146+
go func() {
147+
errorChannel <- mux.Serve()
148+
}()
149+
150+
shutdown := false
151+
var loopError error
152+
for errorChannelCount > 0 {
153+
if shutdown {
154+
err := <-errorChannel
155+
errorChannelCount--
156+
if err != nil && err != http.ErrServerClosed && err != cmux.ErrListenerClosed {
157+
if loopError == nil {
158+
loopError = err
159+
} else if err != nil { // log the error we are discarding
160+
klog.Errorf("Failed to gracefully shut down metrics server: %s", err)
161+
}
162+
}
163+
} else {
164+
select {
165+
case <-runContext.Done(): // clean shutdown
166+
case err := <-errorChannel: // crashed before a shutdown was requested
167+
errorChannelCount--
168+
if err != nil && err != http.ErrServerClosed && err != cmux.ErrListenerClosed {
169+
loopError = err
170+
}
171+
}
172+
shutdown = true
173+
shutdownError := server.Shutdown(shutdownContext)
174+
if loopError == nil {
175+
loopError = shutdownError
176+
} else if shutdownError != nil { // log the error we are discarding
177+
klog.Errorf("Failed to gracefully shut down metrics server: %s", shutdownError)
178+
}
179+
}
180+
}
181+
182+
return loopError
183+
}
184+
95185
type conditionKey struct {
96186
Name string
97187
Type string

0 commit comments

Comments
 (0)