Skip to content

Commit 42e8e34

Browse files
authored
feat(http): add graceful shutdown of http server by catching interruption signals (164)
Move http serving under its specific dir --- Add gracefully shutdown for http server
1 parent 00e4f18 commit 42e8e34

File tree

3 files changed

+114
-64
lines changed

3 files changed

+114
-64
lines changed

pkg/http/http.go

Lines changed: 51 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,61 +1,71 @@
11
package http
22

33
import (
4-
"bufio"
5-
"net"
4+
"context"
5+
"errors"
66
"net/http"
7+
"os"
8+
"os/signal"
9+
"syscall"
710
"time"
811

912
"k8s.io/klog/v2"
10-
)
1113

12-
func RequestMiddleware(next http.Handler) http.Handler {
13-
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
14-
start := time.Now()
14+
"github.com/manusa/kubernetes-mcp-server/pkg/mcp"
15+
)
1516

16-
lrw := &loggingResponseWriter{
17-
ResponseWriter: w,
18-
statusCode: http.StatusOK,
19-
}
17+
func Serve(ctx context.Context, mcpServer *mcp.Server, port, sseBaseUrl string) error {
18+
mux := http.NewServeMux()
19+
wrappedMux := RequestMiddleware(mux)
2020

21-
next.ServeHTTP(lrw, r)
21+
httpServer := &http.Server{
22+
Addr: ":" + port,
23+
Handler: wrappedMux,
24+
}
2225

23-
duration := time.Since(start)
24-
klog.V(5).Infof("%s %s %d %v", r.Method, r.URL.Path, lrw.statusCode, duration)
26+
sseServer := mcpServer.ServeSse(sseBaseUrl, httpServer)
27+
streamableHttpServer := mcpServer.ServeHTTP(httpServer)
28+
mux.Handle("/sse", sseServer)
29+
mux.Handle("/message", sseServer)
30+
mux.Handle("/mcp", streamableHttpServer)
31+
mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
32+
w.WriteHeader(http.StatusOK)
2533
})
26-
}
2734

28-
type loggingResponseWriter struct {
29-
http.ResponseWriter
30-
statusCode int
31-
headerWritten bool
32-
}
35+
ctx, cancel := context.WithCancel(ctx)
36+
defer cancel()
3337

34-
func (lrw *loggingResponseWriter) WriteHeader(code int) {
35-
if !lrw.headerWritten {
36-
lrw.statusCode = code
37-
lrw.headerWritten = true
38-
lrw.ResponseWriter.WriteHeader(code)
39-
}
40-
}
38+
sigChan := make(chan os.Signal, 1)
39+
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
4140

42-
func (lrw *loggingResponseWriter) Write(b []byte) (int, error) {
43-
if !lrw.headerWritten {
44-
lrw.statusCode = http.StatusOK
45-
lrw.headerWritten = true
46-
}
47-
return lrw.ResponseWriter.Write(b)
48-
}
41+
serverErr := make(chan error, 1)
42+
go func() {
43+
klog.V(0).Infof("Streaming and SSE HTTP servers starting on port %s and paths /mcp, /sse, /message", port)
44+
if err := httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
45+
serverErr <- err
46+
}
47+
}()
4948

50-
func (lrw *loggingResponseWriter) Flush() {
51-
if flusher, ok := lrw.ResponseWriter.(http.Flusher); ok {
52-
flusher.Flush()
49+
select {
50+
case sig := <-sigChan:
51+
klog.V(0).Infof("Received signal %v, initiating graceful shutdown", sig)
52+
cancel()
53+
case <-ctx.Done():
54+
klog.V(0).Infof("Context cancelled, initiating graceful shutdown")
55+
case err := <-serverErr:
56+
klog.Errorf("HTTP server error: %v", err)
57+
return err
5358
}
54-
}
5559

56-
func (lrw *loggingResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
57-
if hijacker, ok := lrw.ResponseWriter.(http.Hijacker); ok {
58-
return hijacker.Hijack()
60+
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
61+
defer shutdownCancel()
62+
63+
klog.V(0).Infof("Shutting down HTTP server gracefully...")
64+
if err := httpServer.Shutdown(shutdownCtx); err != nil {
65+
klog.Errorf("HTTP server shutdown error: %v", err)
66+
return err
5967
}
60-
return nil, nil, http.ErrNotSupported
68+
69+
klog.V(0).Infof("HTTP server shutdown complete")
70+
return nil
6171
}

pkg/http/middleware.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package http
2+
3+
import (
4+
"bufio"
5+
"net"
6+
"net/http"
7+
"time"
8+
9+
"k8s.io/klog/v2"
10+
)
11+
12+
func RequestMiddleware(next http.Handler) http.Handler {
13+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
14+
start := time.Now()
15+
16+
lrw := &loggingResponseWriter{
17+
ResponseWriter: w,
18+
statusCode: http.StatusOK,
19+
}
20+
21+
next.ServeHTTP(lrw, r)
22+
23+
duration := time.Since(start)
24+
klog.V(5).Infof("%s %s %d %v", r.Method, r.URL.Path, lrw.statusCode, duration)
25+
})
26+
}
27+
28+
type loggingResponseWriter struct {
29+
http.ResponseWriter
30+
statusCode int
31+
headerWritten bool
32+
}
33+
34+
func (lrw *loggingResponseWriter) WriteHeader(code int) {
35+
if !lrw.headerWritten {
36+
lrw.statusCode = code
37+
lrw.headerWritten = true
38+
lrw.ResponseWriter.WriteHeader(code)
39+
}
40+
}
41+
42+
func (lrw *loggingResponseWriter) Write(b []byte) (int, error) {
43+
if !lrw.headerWritten {
44+
lrw.statusCode = http.StatusOK
45+
lrw.headerWritten = true
46+
}
47+
return lrw.ResponseWriter.Write(b)
48+
}
49+
50+
func (lrw *loggingResponseWriter) Flush() {
51+
if flusher, ok := lrw.ResponseWriter.(http.Flusher); ok {
52+
flusher.Flush()
53+
}
54+
}
55+
56+
func (lrw *loggingResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
57+
if hijacker, ok := lrw.ResponseWriter.(http.Hijacker); ok {
58+
return hijacker.Hijack()
59+
}
60+
return nil, nil, http.ErrNotSupported
61+
}

pkg/kubernetes-mcp-server/cmd/root.go

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"errors"
66
"flag"
77
"fmt"
8-
"net/http"
98
"strconv"
109
"strings"
1110

@@ -206,28 +205,8 @@ func (m *MCPServerOptions) Run() error {
206205
defer mcpServer.Close()
207206

208207
if m.StaticConfig.Port != "" {
209-
mux := http.NewServeMux()
210-
wrappedMux := internalhttp.RequestMiddleware(mux)
211-
212-
httpServer := &http.Server{
213-
Addr: ":" + m.StaticConfig.Port,
214-
Handler: wrappedMux,
215-
}
216-
217-
sseServer := mcpServer.ServeSse(m.SSEBaseUrl, httpServer)
218-
streamableHttpServer := mcpServer.ServeHTTP(httpServer)
219-
mux.Handle("/sse", sseServer)
220-
mux.Handle("/message", sseServer)
221-
mux.Handle("/mcp", streamableHttpServer)
222-
mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
223-
w.WriteHeader(http.StatusOK)
224-
})
225-
226-
klog.V(0).Infof("Streaming and SSE HTTP servers starting on port %s and paths /mcp, /sse, /message", m.StaticConfig.Port)
227-
if err := httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
228-
return err
229-
}
230-
return nil
208+
ctx := context.Background()
209+
return internalhttp.Serve(ctx, mcpServer, m.StaticConfig.Port, m.SSEBaseUrl)
231210
}
232211

233212
if err := mcpServer.ServeStdio(); err != nil && !errors.Is(err, context.Canceled) {

0 commit comments

Comments
 (0)