Skip to content

Commit c214e58

Browse files
committed
Stop the server when the context is cancelled
Signed-off-by: Richard Wall <[email protected]>
1 parent 9db7f18 commit c214e58

File tree

1 file changed

+77
-12
lines changed

1 file changed

+77
-12
lines changed

pkg/agent/run.go

Lines changed: 77 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"errors"
88
"fmt"
99
"io"
10+
"net"
1011
"net/http"
1112
"os"
1213
"strings"
@@ -25,6 +26,7 @@ import (
2526
"k8s.io/client-go/kubernetes"
2627
clientgocorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
2728
"k8s.io/client-go/tools/record"
29+
"k8s.io/klog/v2"
2830
"sigs.k8s.io/controller-runtime/pkg/manager"
2931

3032
"github.com/jetstack/preflight/api"
@@ -50,7 +52,12 @@ const schemaVersion string = "v2.0.0"
5052
// Run starts the agent process
5153
func Run(cmd *cobra.Command, args []string) {
5254
logs.Log.Printf("Preflight agent version: %s (%s)", version.PreflightVersion, version.Commit)
53-
ctx, cancel := context.WithCancel(context.Background())
55+
ctx, cancel := context.WithCancel(
56+
klog.NewContext(
57+
context.Background(),
58+
klog.Background(),
59+
),
60+
)
5461
defer cancel()
5562

5663
file, err := os.Open(Flags.ConfigFilePath)
@@ -83,11 +90,13 @@ func Run(cmd *cobra.Command, args []string) {
8390
}
8491
}()
8592

86-
group.Go(func() error {
93+
{
8794
server := http.NewServeMux()
95+
const serverAddress = ":8081"
96+
log := klog.FromContext(ctx).WithName("APIServer").WithValues("addr", serverAddress)
8897

8998
if Flags.Profiling {
90-
logs.Log.Printf("pprof profiling was enabled.")
99+
log.Info("Profiling endpoints enabled", "path", "/debug/pprof")
91100
server.HandleFunc("/debug/pprof/", pprof.Index)
92101
server.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
93102
server.HandleFunc("/debug/pprof/profile", pprof.Profile)
@@ -96,7 +105,7 @@ func Run(cmd *cobra.Command, args []string) {
96105
}
97106

98107
if Flags.Prometheus {
99-
logs.Log.Printf("Prometheus was enabled.\nRunning prometheus on port :8081")
108+
log.Info("Metrics endpoints enabled", "path", "/metrics")
100109
prometheus.MustRegister(metricPayloadSize)
101110
server.Handle("/metrics", promhttp.Handler())
102111
}
@@ -105,21 +114,32 @@ func Run(cmd *cobra.Command, args []string) {
105114
// what "ready" means for the agent, we just return 200 OK inconditionally.
106115
// The goal is to satisfy some Kubernetes distributions, like OpenShift,
107116
// that require a liveness and health probe to be present for each pod.
117+
log.Info("Healthz endpoints enabled", "path", "/healthz")
108118
server.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
109119
w.WriteHeader(http.StatusOK)
110120
})
121+
log.Info("Readyz endpoints enabled", "path", "/readyz")
111122
server.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) {
112123
w.WriteHeader(http.StatusOK)
113124
})
114125

115-
err := http.ListenAndServe(":8081", server)
116-
if err != nil && !errors.Is(err, http.ErrServerClosed) {
117-
return fmt.Errorf("failed to run the health check server: %s", err)
118-
}
119-
// The agent must stop if the management server stops
120-
cancel()
121-
return nil
122-
})
126+
group.Go(func() error {
127+
err := listenAndServe(
128+
klog.NewContext(gctx, log),
129+
&http.Server{
130+
Addr: serverAddress,
131+
Handler: server,
132+
BaseContext: func(_ net.Listener) context.Context {
133+
return gctx
134+
},
135+
},
136+
)
137+
if err != nil {
138+
return fmt.Errorf("APIServer: %s", err)
139+
}
140+
return nil
141+
})
142+
}
123143

124144
_, isVenConn := preflightClient.(*client.VenConnClient)
125145
if isVenConn {
@@ -412,3 +432,48 @@ func postData(config CombinedConfig, preflightClient client.Client, readings []*
412432

413433
return nil
414434
}
435+
436+
// listenAndServe starts the supplied HTTP server and stops it gracefully when
437+
// the supplied context is cancelled.
438+
// It returns when the graceful server shutdown is complete or when the server
439+
// exits with an error.
440+
// If the server fails to start, it returns the server error.
441+
// If the server fails to shutdown gracefully, it returns the shutdown error.
442+
// The server is given 3 seconds to shutdown gracefully before it is stopped
443+
// forcefully.
444+
func listenAndServe(ctx context.Context, server *http.Server) error {
445+
log := klog.FromContext(ctx).WithName("ListenAndServe")
446+
447+
log.V(1).Info("Starting")
448+
449+
listenCTX, listenCancelCause := context.WithCancelCause(context.WithoutCancel(ctx))
450+
go func() {
451+
err := server.ListenAndServe()
452+
listenCancelCause(fmt.Errorf("ListenAndServe: %s", err))
453+
}()
454+
455+
select {
456+
case <-listenCTX.Done():
457+
log.V(1).Info("Shutdown skipped", "reason", "Server already stopped")
458+
return context.Cause(listenCTX)
459+
460+
case <-ctx.Done():
461+
log.V(1).Info("Shutting down")
462+
}
463+
464+
shutdownCTX, shutdownCancel := context.WithTimeout(context.WithoutCancel(ctx), time.Second*3)
465+
shutdownErr := server.Shutdown(shutdownCTX)
466+
shutdownCancel()
467+
if shutdownErr != nil {
468+
shutdownErr = fmt.Errorf("Shutdown: %s", shutdownErr)
469+
}
470+
471+
closeErr := server.Close()
472+
if closeErr != nil {
473+
closeErr = fmt.Errorf("Close: %s", closeErr)
474+
}
475+
476+
log.V(1).Info("Shutdown complete")
477+
478+
return errors.Join(shutdownErr, closeErr)
479+
}

0 commit comments

Comments
 (0)