Skip to content

Commit 3645fe3

Browse files
authored
Merge pull request #573 from ggriffiths/http_server_refactor
Refactor http server and register leaderelection health check
2 parents b5b7190 + 0476dce commit 3645fe3

File tree

5 files changed

+91
-70
lines changed

5 files changed

+91
-70
lines changed

cmd/snapshot-controller/main.go

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import (
2020
"context"
2121
"flag"
2222
"fmt"
23+
"net"
24+
"net/http"
2325
"os"
2426
"os/signal"
2527
"sync"
@@ -143,23 +145,15 @@ func main() {
143145
// Create and register metrics manager
144146
metricsManager := metrics.NewMetricsManager()
145147
wg := &sync.WaitGroup{}
146-
wg.Add(1)
148+
149+
mux := http.NewServeMux()
147150
if *httpEndpoint != "" {
148-
srv, err := metricsManager.StartMetricsEndpoint(*metricsPath, *httpEndpoint, promklog{}, wg)
151+
err := metricsManager.PrepareMetricsPath(mux, *metricsPath, promklog{})
149152
if err != nil {
150-
klog.Errorf("Failed to start metrics server: %s", err.Error())
153+
klog.Errorf("Failed to prepare metrics path: %s", err.Error())
151154
os.Exit(1)
152155
}
153-
defer func() {
154-
err := srv.Shutdown(context.Background())
155-
if err != nil {
156-
klog.Errorf("Failed to shutdown metrics server: %s", err.Error())
157-
}
158-
159-
klog.Infof("Metrics server successfully shutdown")
160-
wg.Done()
161-
}()
162-
klog.Infof("Metrics server successfully started on %s, %s", *httpEndpoint, *metricsPath)
156+
klog.Infof("Metrics path successfully registered at %s", *metricsPath)
163157
}
164158

165159
// Add Snapshot types to the default Kubernetes so events can be logged for them
@@ -199,6 +193,32 @@ func main() {
199193
close(stopCh)
200194
}
201195

196+
// start listening & serving http endpoint if set
197+
if *httpEndpoint != "" {
198+
l, err := net.Listen("tcp", *httpEndpoint)
199+
if err != nil {
200+
klog.Fatalf("failed to listen on address[%s], error[%v]", *httpEndpoint, err)
201+
}
202+
srv := &http.Server{Addr: l.Addr().String(), Handler: mux}
203+
go func() {
204+
defer wg.Done()
205+
if err := srv.Serve(l); err != http.ErrServerClosed {
206+
klog.Fatalf("failed to start endpoint at:%s/%s, error: %v", *httpEndpoint, *metricsPath, err)
207+
}
208+
}()
209+
klog.Infof("Metrics http server successfully started on %s, %s", *httpEndpoint, *metricsPath)
210+
211+
defer func() {
212+
err := srv.Shutdown(context.Background())
213+
if err != nil {
214+
klog.Errorf("Failed to shutdown metrics server: %s", err.Error())
215+
}
216+
217+
klog.Infof("Metrics server successfully shutdown")
218+
wg.Done()
219+
}()
220+
}
221+
202222
if !*leaderElection {
203223
run(context.TODO())
204224
} else {
@@ -210,6 +230,10 @@ func main() {
210230
klog.Fatalf("failed to create leaderelection client: %v", err)
211231
}
212232
le := leaderelection.NewLeaderElection(leClientset, lockName, run)
233+
if *httpEndpoint != "" {
234+
le.PrepareHealthCheck(mux, leaderelection.DefaultHealthCheckTimeout)
235+
}
236+
213237
if *leaderElectionNamespace != "" {
214238
le.WithNamespace(*leaderElectionNamespace)
215239
}

pkg/common-controller/framework_test.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package common_controller
1919
import (
2020
"errors"
2121
"fmt"
22-
"k8s.io/client-go/util/workqueue"
22+
"net/http"
2323
"reflect"
2424
sysruntime "runtime"
2525
"strconv"
@@ -29,6 +29,8 @@ import (
2929
"testing"
3030
"time"
3131

32+
"k8s.io/client-go/util/workqueue"
33+
3234
crdv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
3335
clientset "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned"
3436
"github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned/fake"
@@ -737,9 +739,14 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte
737739

738740
coreFactory := coreinformers.NewSharedInformerFactory(kubeClient, utils.NoResyncPeriodFunc())
739741
metricsManager := metrics.NewMetricsManager()
740-
wg := &sync.WaitGroup{}
741-
wg.Add(1)
742-
metricsManager.StartMetricsEndpoint("/metrics", "localhost:0", nil, wg)
742+
mux := http.NewServeMux()
743+
metricsManager.PrepareMetricsPath(mux, "/metrics", nil)
744+
go func() {
745+
err := http.ListenAndServe("localhost:0", mux)
746+
if err != nil {
747+
t.Errorf("failed to prepare metrics path: %v", err)
748+
}
749+
}()
743750

744751
ctrl := NewCSISnapshotCommonController(
745752
clientset,

pkg/metrics/metrics.go

Lines changed: 7 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,13 @@ limitations under the License.
1717
package metrics
1818

1919
import (
20-
"fmt"
21-
"net"
2220
"net/http"
2321
"sync"
2422
"time"
2523

2624
"github.com/prometheus/client_golang/prometheus/promhttp"
2725
"k8s.io/apimachinery/pkg/types"
2826
k8smetrics "k8s.io/component-base/metrics"
29-
klog "k8s.io/klog/v2"
3027
)
3128

3229
const (
@@ -89,12 +86,11 @@ type OperationStatus interface {
8986
var metricBuckets = []float64{0.1, 0.25, 0.5, 1, 2.5, 5, 10, 15, 30, 60, 120, 300, 600}
9087

9188
type MetricsManager interface {
92-
// StartMetricsEndpoint starts the metrics endpoint at the specified addr/pattern for
93-
// metrics managed by this MetricsManager. It spawns a goroutine to listen to
94-
// and serve HTTP requests received on addr/pattern.
95-
// If the "pattern" is empty (i.e., ""), no endpoint will be started.
89+
// PrepareMetricsPath prepares the metrics path the specified pattern for
90+
// metrics managed by this MetricsManager.
91+
// If the "pattern" is empty (i.e., ""), it will not be registered.
9692
// An error will be returned if there is any.
97-
StartMetricsEndpoint(pattern, addr string, logger promhttp.Logger, wg *sync.WaitGroup) (*http.Server, error)
93+
PrepareMetricsPath(mux *http.ServeMux, pattern string, logger promhttp.Logger) error
9894

9995
// OperationStart takes in an operation and caches its start time.
10096
// if the operation already exists, it's an no-op.
@@ -304,31 +300,15 @@ func (opMgr *operationMetricsManager) scheduleOpsInFlightMetric() {
304300
}
305301
}
306302

307-
func (opMgr *operationMetricsManager) StartMetricsEndpoint(pattern, addr string, logger promhttp.Logger, wg *sync.WaitGroup) (*http.Server, error) {
308-
if addr == "" {
309-
return nil, fmt.Errorf("metrics endpoint will not be started as endpoint address is not specified")
310-
}
311-
// start listening
312-
l, err := net.Listen("tcp", addr)
313-
if err != nil {
314-
return nil, fmt.Errorf("failed to listen on address[%s], error[%v]", addr, err)
315-
}
316-
mux := http.NewServeMux()
303+
func (opMgr *operationMetricsManager) PrepareMetricsPath(mux *http.ServeMux, pattern string, logger promhttp.Logger) error {
317304
mux.Handle(pattern, k8smetrics.HandlerFor(
318305
opMgr.registry,
319306
k8smetrics.HandlerOpts{
320307
ErrorLog: logger,
321308
ErrorHandling: k8smetrics.ContinueOnError,
322309
}))
323-
srv := &http.Server{Addr: l.Addr().String(), Handler: mux}
324-
// start serving the endpoint
325-
go func() {
326-
defer wg.Done()
327-
if err := srv.Serve(l); err != http.ErrServerClosed {
328-
klog.Fatalf("failed to start endpoint at:%s/%s, error: %v", addr, pattern, err)
329-
}
330-
}()
331-
return srv, nil
310+
311+
return nil
332312
}
333313

334314
func (opMgr *operationMetricsManager) GetRegistry() k8smetrics.KubeRegistry {

pkg/metrics/metrics_test.go

Lines changed: 35 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"io"
2323
"io/ioutil"
2424
"log"
25+
"net"
2526
"net/http"
2627
"reflect"
2728
"sort"
@@ -60,35 +61,44 @@ func (s *fakeOpStatus) String() string {
6061
return "Unknown"
6162
}
6263

63-
func initMgr() (MetricsManager, *sync.WaitGroup, *http.Server) {
64-
wg := &sync.WaitGroup{}
65-
wg.Add(1)
64+
func initMgr() (MetricsManager, *http.Server) {
6665
mgr := NewMetricsManager()
67-
srv, err := mgr.StartMetricsEndpoint(httpPattern, addr, nil, wg)
66+
mux := http.NewServeMux()
67+
err := mgr.PrepareMetricsPath(mux, httpPattern, nil)
6868
if err != nil {
6969
log.Fatalf("failed to start serving [%v]", err)
7070
}
71-
return mgr, wg, srv
71+
l, err := net.Listen("tcp", addr)
72+
if err != nil {
73+
log.Fatalf("failed to listen on address[%s], error[%v]", addr, err)
74+
}
75+
srv := &http.Server{Addr: l.Addr().String(), Handler: mux}
76+
go func() {
77+
if err := srv.Serve(l); err != http.ErrServerClosed {
78+
log.Fatalf("failed to start endpoint at:%s/%s, error: %v", addr, httpPattern, err)
79+
}
80+
}()
81+
82+
return mgr, srv
7283
}
7384

74-
func shutdown(srv *http.Server, wg *sync.WaitGroup) {
85+
func shutdown(srv *http.Server) {
7586
if err := srv.Shutdown(context.Background()); err != nil {
7687
panic(err)
7788
}
78-
wg.Wait()
7989
}
8090

8191
func TestNew(t *testing.T) {
82-
mgr, wg, srv := initMgr()
83-
defer shutdown(srv, wg)
92+
mgr, srv := initMgr()
93+
defer shutdown(srv)
8494
if mgr == nil {
8595
t.Errorf("failed testing new")
8696
}
8797
}
8898

8999
func TestDropNonExistingOperation(t *testing.T) {
90-
mgr, wg, srv := initMgr()
91-
defer shutdown(srv, wg)
100+
mgr, srv := initMgr()
101+
defer shutdown(srv)
92102
op := OperationKey{
93103
Name: "drop-non-existing-operation-should-be-noop",
94104
ResourceID: types.UID("uid"),
@@ -97,9 +107,9 @@ func TestDropNonExistingOperation(t *testing.T) {
97107
}
98108

99109
func TestRecordMetricsForNonExistingOperation(t *testing.T) {
100-
mgr, wg, srv := initMgr()
110+
mgr, srv := initMgr()
101111
srvAddr := "http://" + srv.Addr + httpPattern
102-
defer shutdown(srv, wg)
112+
defer shutdown(srv)
103113
opKey := OperationKey{
104114
Name: "no-metrics-should-be-recorded-as-operation-did-not-start",
105115
ResourceID: types.UID("uid"),
@@ -119,9 +129,9 @@ func TestRecordMetricsForNonExistingOperation(t *testing.T) {
119129
}
120130

121131
func TestDropOperation(t *testing.T) {
122-
mgr, wg, srv := initMgr()
132+
mgr, srv := initMgr()
123133
srvAddr := "http://" + srv.Addr + httpPattern
124-
defer shutdown(srv, wg)
134+
defer shutdown(srv)
125135
opKey := OperationKey{
126136
Name: "should-have-been-dropped",
127137
ResourceID: types.UID("uid"),
@@ -176,9 +186,9 @@ snapshot_controller_operation_total_seconds_count{driver_name="driver",operation
176186
}
177187

178188
func TestUnknownStatus(t *testing.T) {
179-
mgr, wg, srv := initMgr()
189+
mgr, srv := initMgr()
180190
srvAddr := "http://" + srv.Addr + httpPattern
181-
defer shutdown(srv, wg)
191+
defer shutdown(srv)
182192
opKey := OperationKey{
183193
Name: "unknown-status-operation",
184194
ResourceID: types.UID("uid"),
@@ -214,9 +224,9 @@ snapshot_controller_operation_total_seconds_count{driver_name="driver",operation
214224
}
215225

216226
func TestRecordMetrics(t *testing.T) {
217-
mgr, wg, srv := initMgr()
227+
mgr, srv := initMgr()
218228
srvAddr := "http://" + srv.Addr + httpPattern
219-
defer shutdown(srv, wg)
229+
defer shutdown(srv)
220230
// add an operation
221231
opKey := OperationKey{
222232
Name: "op1",
@@ -284,9 +294,9 @@ snapshot_controller_operation_total_seconds_count{driver_name="driver2",operatio
284294
}
285295

286296
func TestConcurrency(t *testing.T) {
287-
mgr, wg, srv := initMgr()
297+
mgr, srv := initMgr()
288298
srvAddr := "http://" + srv.Addr + httpPattern
289-
defer shutdown(srv, wg)
299+
defer shutdown(srv)
290300
success := &fakeOpStatus{
291301
statusCode: 0,
292302
}
@@ -482,8 +492,8 @@ snapshot_controller_operation_total_seconds_count{driver_name="driver5",operatio
482492
func TestInFlightMetric(t *testing.T) {
483493
inFlightCheckInterval = time.Millisecond * 50
484494

485-
mgr, wg, srv := initMgr()
486-
defer shutdown(srv, wg)
495+
mgr, srv := initMgr()
496+
defer shutdown(srv)
487497
srvAddr := "http://" + srv.Addr + httpPattern
488498

489499
// Start first operation, should be 1
@@ -710,8 +720,8 @@ func containsMetrics(expectedMfs, gotMfs []*cmg.MetricFamily) bool {
710720
}
711721

712722
func TestProcessStartTimeMetricExist(t *testing.T) {
713-
mgr, wg, srv := initMgr()
714-
defer shutdown(srv, wg)
723+
mgr, srv := initMgr()
724+
defer shutdown(srv)
715725
metricsFamilies, err := mgr.GetRegistry().Gather()
716726
if err != nil {
717727
t.Fatalf("Error fetching metrics: %v", err)

pkg/sidecar-controller/snapshot_controller_base.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626
storagelisters "github.com/kubernetes-csi/external-snapshotter/client/v4/listers/volumesnapshot/v1"
2727
"github.com/kubernetes-csi/external-snapshotter/v4/pkg/snapshotter"
2828

29-
"k8s.io/api/core/v1"
29+
v1 "k8s.io/api/core/v1"
3030
"k8s.io/apimachinery/pkg/api/errors"
3131
"k8s.io/apimachinery/pkg/labels"
3232
"k8s.io/apimachinery/pkg/util/wait"

0 commit comments

Comments
 (0)