From 100cd6399dc7e84ecf00b02cb2e46278a3a8b6c1 Mon Sep 17 00:00:00 2001 From: Eugene Aleynikov Date: Tue, 15 Apr 2025 18:51:34 -0700 Subject: [PATCH 1/4] Better shutdown logic --- node/node.go | 6 ++++++ node/rpcstack.go | 36 ++++++++++++++++++++++++++++++++---- 2 files changed, 38 insertions(+), 4 deletions(-) diff --git a/node/node.go b/node/node.go index 24905c128e..5117966748 100644 --- a/node/node.go +++ b/node/node.go @@ -527,6 +527,12 @@ func (n *Node) stopRPC() { n.ws.stop() n.httpAuth.stop() n.wsAuth.stop() + + n.http.shutdownWait() + n.ws.shutdownWait() + n.httpAuth.shutdownWait() + n.wsAuth.shutdownWait() + n.ipc.stop() n.stopInProc() } diff --git a/node/rpcstack.go b/node/rpcstack.go index 6d3828ec2b..98a2dfebc9 100644 --- a/node/rpcstack.go +++ b/node/rpcstack.go @@ -73,9 +73,9 @@ type httpServer struct { mu sync.Mutex server *http.Server listener net.Listener // non-nil when server is running + ready bool // HTTP RPC handler things. - httpConfig httpConfig httpHandler atomic.Value // *rpcHandler @@ -93,6 +93,9 @@ type httpServer struct { const ( shutdownTimeout = 5 * time.Second + // give pending requests stopPendingRequestTimeout the time to finish when the server is stopped + // if readiness probe period is 5 seconds, this is enough time for health check to be triggered + stopPendingRequestTimeout = 6 * time.Second ) func newHTTPServer(log log.Logger, timeouts rpc.HTTPTimeouts) *httpServer { @@ -178,6 +181,7 @@ func (h *httpServer) start() error { "cors", strings.Join(h.httpConfig.CorsAllowedOrigins, ","), "vhosts", strings.Join(h.httpConfig.Vhosts, ","), ) + h.ready = true // Log all handlers mounted on server. var paths []string @@ -197,6 +201,20 @@ func (h *httpServer) start() error { } func (h *httpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // Permit dumb empty requests for remote health-checks (AWS) + if r.Method == http.MethodGet && r.ContentLength == 0 && r.URL.RawQuery == "" { + if r.URL.Path == "/readyz" { + if h.ready { + w.WriteHeader(http.StatusNoContent) + } else { + w.WriteHeader(http.StatusServiceUnavailable) + } + return + } else if r.URL.Path == "/livez" { + w.WriteHeader(http.StatusNoContent) + return + } + } // check if ws request and serve if ws enabled ws := h.wsHandler.Load().(*rpcHandler) if ws != nil && isWebsocket(r) { @@ -256,9 +274,19 @@ func validatePrefix(what, path string) error { // stop shuts down the HTTP server. func (h *httpServer) stop() { - h.mu.Lock() - defer h.mu.Unlock() - h.doStop() + h.ready = false + time.AfterFunc(stopPendingRequestTimeout, func() { + h.mu.Lock() + defer h.mu.Unlock() + h.doStop() + }) +} + +// ShutdownWait waits for the server to shutdown. +func (h *httpServer) shutdownWait() { + for h.listener != nil { + time.Sleep(100 * time.Millisecond) + } } func (h *httpServer) doStop() { From 2b883bf86637145f8bb17bea65667003fd435138 Mon Sep 17 00:00:00 2001 From: Eugene Aleynikov Date: Wed, 16 Apr 2025 11:12:45 -0700 Subject: [PATCH 2/4] Fix unit tests --- node/node.go | 8 ++++---- node/rpcstack.go | 19 ++++++++++++++----- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/node/node.go b/node/node.go index 5117966748..12b69fc2fa 100644 --- a/node/node.go +++ b/node/node.go @@ -528,10 +528,10 @@ func (n *Node) stopRPC() { n.httpAuth.stop() n.wsAuth.stop() - n.http.shutdownWait() - n.ws.shutdownWait() - n.httpAuth.shutdownWait() - n.wsAuth.shutdownWait() + n.http.wait() + n.ws.wait() + n.httpAuth.wait() + n.wsAuth.wait() n.ipc.stop() n.stopInProc() diff --git a/node/rpcstack.go b/node/rpcstack.go index 98a2dfebc9..a2af1fbd80 100644 --- a/node/rpcstack.go +++ b/node/rpcstack.go @@ -74,6 +74,7 @@ type httpServer struct { server *http.Server listener net.Listener // non-nil when server is running ready bool + shutdown chan struct{} // Channel to wait for termination notifications // HTTP RPC handler things. httpConfig httpConfig @@ -140,6 +141,7 @@ func (h *httpServer) start() error { if h.endpoint == "" || h.listener != nil { return nil // already running or not configured } + h.shutdown = make(chan struct{}) // Initialize the server. h.server = &http.Server{Handler: h} @@ -170,6 +172,7 @@ func (h *httpServer) start() error { } h.log.Info("WebSocket enabled", "url", url) } + h.ready = true // if server is websocket only, return after logging if !h.rpcAllowed() { return nil @@ -181,7 +184,6 @@ func (h *httpServer) start() error { "cors", strings.Join(h.httpConfig.CorsAllowedOrigins, ","), "vhosts", strings.Join(h.httpConfig.Vhosts, ","), ) - h.ready = true // Log all handlers mounted on server. var paths []string @@ -274,18 +276,25 @@ func validatePrefix(what, path string) error { // stop shuts down the HTTP server. func (h *httpServer) stop() { + h.mu.Lock() h.ready = false time.AfterFunc(stopPendingRequestTimeout, func() { - h.mu.Lock() defer h.mu.Unlock() h.doStop() + if h.shutdown != nil { + func() { + // Avoid panic when closing closed channel + defer func() { recover() }() + close(h.shutdown) + }() + } }) } // ShutdownWait waits for the server to shutdown. -func (h *httpServer) shutdownWait() { - for h.listener != nil { - time.Sleep(100 * time.Millisecond) +func (h *httpServer) wait() { + if h.shutdown != nil { + <-h.shutdown } } From 31cdd0d806e14a1bfd5d67b411e1d27642329fce Mon Sep 17 00:00:00 2001 From: Eugene Aleynikov Date: Wed, 16 Apr 2025 11:33:45 -0700 Subject: [PATCH 3/4] Increase timeout from 6 to 7 seconds per design document --- node/rpcstack.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/rpcstack.go b/node/rpcstack.go index a2af1fbd80..a8ffa09a9e 100644 --- a/node/rpcstack.go +++ b/node/rpcstack.go @@ -96,7 +96,7 @@ const ( shutdownTimeout = 5 * time.Second // give pending requests stopPendingRequestTimeout the time to finish when the server is stopped // if readiness probe period is 5 seconds, this is enough time for health check to be triggered - stopPendingRequestTimeout = 6 * time.Second + stopPendingRequestTimeout = 7 * time.Second ) func newHTTPServer(log log.Logger, timeouts rpc.HTTPTimeouts) *httpServer { From be87548af2ad62d93ab74adc5c832277e18ff070 Mon Sep 17 00:00:00 2001 From: Eugene Aleynikov Date: Mon, 21 Apr 2025 17:46:03 -0700 Subject: [PATCH 4/4] Address review comments --- node/rpcstack.go | 34 +++++++++++++++------------------- 1 file changed, 15 insertions(+), 19 deletions(-) diff --git a/node/rpcstack.go b/node/rpcstack.go index a8ffa09a9e..39997a6164 100644 --- a/node/rpcstack.go +++ b/node/rpcstack.go @@ -70,11 +70,11 @@ type httpServer struct { timeouts rpc.HTTPTimeouts mux http.ServeMux // registered handlers go here - mu sync.Mutex - server *http.Server - listener net.Listener // non-nil when server is running - ready bool - shutdown chan struct{} // Channel to wait for termination notifications + mu sync.Mutex + server *http.Server + listener net.Listener // non-nil when server is running + ready bool + shutdownWG sync.WaitGroup // WG to wait for shutdown // HTTP RPC handler things. httpConfig httpConfig @@ -141,7 +141,6 @@ func (h *httpServer) start() error { if h.endpoint == "" || h.listener != nil { return nil // already running or not configured } - h.shutdown = make(chan struct{}) // Initialize the server. h.server = &http.Server{Handler: h} @@ -203,8 +202,9 @@ func (h *httpServer) start() error { } func (h *httpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { - // Permit dumb empty requests for remote health-checks (AWS) - if r.Method == http.MethodGet && r.ContentLength == 0 && r.URL.RawQuery == "" { + // server health probe endpoints + if r.Method == http.MethodGet { + // readiness probe fails during shutdown if r.URL.Path == "/readyz" { if h.ready { w.WriteHeader(http.StatusNoContent) @@ -212,6 +212,7 @@ func (h *httpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusServiceUnavailable) } return + // liveness probe always succeeds } else if r.URL.Path == "/livez" { w.WriteHeader(http.StatusNoContent) return @@ -277,25 +278,20 @@ func validatePrefix(what, path string) error { // stop shuts down the HTTP server. func (h *httpServer) stop() { h.mu.Lock() + // unit test executes stop multiple times, so we cannot increment the WG in the start method + h.shutdownWG = sync.WaitGroup{} + h.shutdownWG.Add(1) h.ready = false time.AfterFunc(stopPendingRequestTimeout, func() { defer h.mu.Unlock() h.doStop() - if h.shutdown != nil { - func() { - // Avoid panic when closing closed channel - defer func() { recover() }() - close(h.shutdown) - }() - } + h.shutdownWG.Done() }) } -// ShutdownWait waits for the server to shutdown. +// wait waits for the server to shutdown. func (h *httpServer) wait() { - if h.shutdown != nil { - <-h.shutdown - } + h.shutdownWG.Wait() } func (h *httpServer) doStop() {