Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,12 @@ func (n *Node) stopRPC() {
n.ws.stop()
n.httpAuth.stop()
n.wsAuth.stop()

n.http.wait()
n.ws.wait()
n.httpAuth.wait()
n.wsAuth.wait()

n.ipc.stop()
n.stopInProc()
}
Expand Down
45 changes: 39 additions & 6 deletions node/rpcstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,13 @@ type httpServer struct {
timeouts rpc.HTTPTimeouts
mux http.ServeMux // registered handlers go here

mu sync.Mutex
server *http.Server
listener net.Listener // non-nil when server is running
mu sync.Mutex
server *http.Server
listener net.Listener // non-nil when server is running
ready bool
shutdownWG sync.WaitGroup // WG to wait for shutdown

// HTTP RPC handler things.

httpConfig httpConfig
httpHandler atomic.Value // *rpcHandler

Expand All @@ -93,6 +94,9 @@ type httpServer struct {

const (
shutdownTimeout = 5 * time.Second
// give pending requests stopPendingRequestTimeout the time to finish when the server is stopped
// if readiness probe period is 5 seconds, this is enough time for health check to be triggered
stopPendingRequestTimeout = 7 * time.Second
)

func newHTTPServer(log log.Logger, timeouts rpc.HTTPTimeouts) *httpServer {
Expand Down Expand Up @@ -167,6 +171,7 @@ func (h *httpServer) start() error {
}
h.log.Info("WebSocket enabled", "url", url)
}
h.ready = true
// if server is websocket only, return after logging
if !h.rpcAllowed() {
return nil
Expand Down Expand Up @@ -197,6 +202,22 @@ func (h *httpServer) start() error {
}

func (h *httpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// server health probe endpoints
if r.Method == http.MethodGet {
// readiness probe fails during shutdown
if r.URL.Path == "/readyz" {
if h.ready {
w.WriteHeader(http.StatusNoContent)
} else {
w.WriteHeader(http.StatusServiceUnavailable)
}
return
// liveness probe always succeeds
} else if r.URL.Path == "/livez" {
w.WriteHeader(http.StatusNoContent)
return
}
}
// check if ws request and serve if ws enabled
ws := h.wsHandler.Load().(*rpcHandler)
if ws != nil && isWebsocket(r) {
Expand Down Expand Up @@ -257,8 +278,20 @@ func validatePrefix(what, path string) error {
// stop shuts down the HTTP server.
func (h *httpServer) stop() {
h.mu.Lock()
defer h.mu.Unlock()
h.doStop()
// unit test executes stop multiple times, so we cannot increment the WG in the start method
h.shutdownWG = sync.WaitGroup{}
h.shutdownWG.Add(1)
h.ready = false
time.AfterFunc(stopPendingRequestTimeout, func() {
defer h.mu.Unlock()
h.doStop()
h.shutdownWG.Done()
})
}

// wait waits for the server to shutdown.
func (h *httpServer) wait() {
h.shutdownWG.Wait()
}

func (h *httpServer) doStop() {
Expand Down