
Commit f9bf450

Authored by charlesma4, jelias2, and claude
feat(proxyd): add healthcheck and drain functionality from Uniswap fork (#534)
* feat(proxyd): add healthcheck and drain functionality from Uniswap fork
* Various fixes based on PR comments
* feat(proxyd): prometheus metrics for probe worker
* lint
* enforce nonnegative probe cfg vals
* fix(proxyd): Use client TLS config for probe worker
* remove insecureSkipVerify config var
* Update proxyd/backend.go
  Co-authored-by: Jacob Elias <19310318+jelias2@users.noreply.github.com>
* fixes for comments
* configurable graceful shutdown duration
* Cant repro test failures locally, trying adding 0s shutdown duration to all tests
* fix test
* fix: change graceful shutdown default to 0s and improve drain behavior (#2)

  Changes:
  - Set graceful_shutdown_seconds default to 0 (no delay) instead of 10s
  - Remove graceful_shutdown_seconds = 0 from all test configs (no longer needed)
  - Fix transport resource leak in ProbeWorker by closing connections on Stop()
  - Make Drain() actually reject new RPC and WebSocket requests during drain period

  Rationale: The healthcheck and drain functionality should be disabled by default. The previous 10-second default forced unnecessary changes to all existing test configurations. Now configs only need to set graceful_shutdown_seconds if they want to enable the drain delay.

  Co-authored-by: Claude Sonnet 4.5 <noreply@anthropic.com>

---------

Co-authored-by: Jacob Elias <19310318+jelias2@users.noreply.github.com>
Co-authored-by: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent 81a603d commit f9bf450

34 files changed: +858 −26 lines
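For context, a minimal sketch of the drain-then-shutdown sequencing the commit message describes. The server type and its Drain/Shutdown methods below are hypothetical stand-ins, not proxyd's actual API; only graceful_shutdown_seconds and Drain() are named in the commit message itself.

package main

import (
	"fmt"
	"time"
)

// drainableServer is a hypothetical stand-in; the real proxyd server types and
// method names are not shown in this diff. It only illustrates the ordering the
// commit message describes: stop accepting new RPC/WebSocket work, wait out the
// configured drain period, then shut down.
type drainableServer struct{ draining bool }

func (s *drainableServer) Drain()    { s.draining = true } // new requests rejected from here on
func (s *drainableServer) Shutdown() { fmt.Println("server stopped") }

func main() {
	// graceful_shutdown_seconds now defaults to 0 (no delay); set it > 0 to
	// give load balancers time to observe the drain before the process exits.
	gracefulShutdownSeconds := 0

	srv := &drainableServer{}
	srv.Drain()
	time.Sleep(time.Duration(gracefulShutdownSeconds) * time.Second)
	srv.Shutdown()
}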

proxyd/backend.go

Lines changed: 51 additions & 0 deletions
@@ -16,6 +16,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	sw "github.com/ethereum-optimism/infra/proxyd/pkg/avg-sliding-window"
@@ -322,6 +323,11 @@ type Backend struct {
 	maxLatencyThreshold   time.Duration
 	maxErrorRateThreshold float64
 
+	probeSpec    *ProbeSpec
+	probeURL     string
+	ProbeWorker  *ProbeWorker
+	healthyProbe atomic.Bool
+
 	latencySlidingWindow            *sw.AvgSlidingWindow
 	networkRequestsSlidingWindow    *sw.AvgSlidingWindow
 	intermittentErrorsSlidingWindow *sw.AvgSlidingWindow
@@ -474,6 +480,32 @@ func WithIntermittentNetworkErrorSlidingWindow(sw *sw.AvgSlidingWindow) BackendOpt {
 	}
 }
 
+func WithProbe(probeURL string, probeFailureThreshold int, probeSuccessThreshold int, probePeriodSeconds int, probeTimeoutSeconds int) BackendOpt {
+	return func(b *Backend) {
+		b.probeURL = probeURL
+		probeSpec := ProbeSpec{
+			// default values
+			FailureThreshold: 1,
+			SuccessThreshold: 2,
+			Period:           4 * time.Second,
+			Timeout:          1 * time.Second,
+		}
+		if probeFailureThreshold > 0 {
+			probeSpec.FailureThreshold = probeFailureThreshold
+		}
+		if probeSuccessThreshold > 0 {
+			probeSpec.SuccessThreshold = probeSuccessThreshold
+		}
+		if probePeriodSeconds > 0 {
+			probeSpec.Period = time.Duration(probePeriodSeconds) * time.Second
+		}
+		if probeTimeoutSeconds > 0 {
+			probeSpec.Timeout = time.Duration(probeTimeoutSeconds) * time.Second
+		}
+		b.probeSpec = &probeSpec
+	}
+}
+
 type indexedReqRes struct {
 	index int
 	req   *RPCReq
@@ -527,7 +559,9 @@ func NewBackend(
 		networkRequestsSlidingWindow:    sw.NewSlidingWindow(),
 		intermittentErrorsSlidingWindow: sw.NewSlidingWindow(),
 		allowedStatusCodes:              []int{400, 413}, // Alchemy returns a 400 on bad JSONs, and Quicknode returns a 413 on too large requests
+
 	}
+	backend.healthyProbe.Store(true)
 
 	backend.Override(opts...)
 
@@ -890,6 +924,9 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
 
 // IsHealthy checks if the backend is able to serve traffic, based on dynamic parameters
 func (b *Backend) IsHealthy() bool {
+	if !b.IsProbeHealthy() {
+		return false
+	}
 	errorRate := b.ErrorRate()
 	avgLatency := time.Duration(b.latencySlidingWindow.Avg())
 	if errorRate >= b.maxErrorRateThreshold {
@@ -901,6 +938,20 @@ func (b *Backend) IsHealthy() bool {
 	return true
 }
 
+func (b *Backend) IsProbeHealthy() bool {
+	if b.probeSpec == nil {
+		return true
+	}
+	return b.healthyProbe.Load()
+}
+
+func (b *Backend) SetProbeHealth(healthy bool) {
+	if b.probeSpec == nil {
+		return
+	}
+	b.healthyProbe.Store(healthy)
+}
+
 // ErrorRate returns the instant error rate of the backend
 func (b *Backend) ErrorRate() (errorRate float64) {
 	// we only really start counting the error rate after a minimum of 10 requests
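WithProbe treats any non-positive argument as "use the default", so callers only have to set the knobs they care about. A minimal sketch (not part of the commit) of that fallback behavior; it is assumed to sit inside package proxyd, since probeSpec is unexported, and the probe URL is a placeholder:

// Sketch only: shows how zero arguments to WithProbe fall back to the built-in defaults.
func ExampleWithProbe() {
	b := &Backend{}
	WithProbe("http://backend.internal/healthz", 3, 0, 0, 0)(b) // placeholder URL
	fmt.Println(b.probeSpec.FailureThreshold) // 3  (explicitly set)
	fmt.Println(b.probeSpec.SuccessThreshold) // 2  (default)
	fmt.Println(b.probeSpec.Period)           // 4s (default)
	fmt.Println(b.probeSpec.Timeout)          // 1s (default)
}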

proxyd/backend_probe.go

Lines changed: 210 additions & 0 deletions
@@ -0,0 +1,210 @@ (new file)

// backend_probe.go implements HTTP health check probing for backend servers.
//
// The probe system is inspired by Kubernetes liveness/readiness probes and provides:
//   - Periodic HTTP health checks against a configurable endpoint
//   - Configurable success/failure thresholds to prevent flapping
//   - Async operation via a background goroutine per backend
//
// # Usage
//
// When a backend is configured with a probe_url, a ProbeWorker runs in the background,
// periodically checking the endpoint. The backend is only marked unhealthy after
// FailureThreshold consecutive failures, and only marked healthy after SuccessThreshold
// consecutive successes. This threshold behavior prevents health status from flapping
// due to transient network issues.
//
// # Configuration
//
// ProbeSpec controls the probe behavior:
//   - FailureThreshold: consecutive failures before marking unhealthy (default: 1)
//   - SuccessThreshold: consecutive successes before marking healthy (default: 2)
//   - Period: interval between probes (default: 4s)
//   - Timeout: HTTP request timeout per probe (default: 4s)
//
// # HTTP Probe Behavior
//
// The probe sends a GET request to the configured URL. Success is determined by:
//   - 2xx status codes: success
//   - 3xx (redirects), 4xx, 5xx, or connection errors: failure
//
// The probe uses a custom dialer with SO_LINGER set (borrowed from Kubernetes) to ensure
// clean connection teardown, and disables keep-alives to get fresh connection state each probe.
package proxyd

import (
	"crypto/tls"
	"fmt"
	"io"
	"math/rand"
	"net"
	"net/http"
	"net/url"
	"syscall"
	"time"
)

type ProbeSpec struct {
	FailureThreshold int
	SuccessThreshold int
	Period           time.Duration
	Timeout          time.Duration
}

// borrowed from https://github.com/kubernetes/kubernetes/blob/b53b9fb5573323484af9a19cf3f5bfe80760abba/pkg/probe/dialer_others.go#L37
// probeDialer is a dialer that sets the SO_LINGER option to 1 second.
func probeDialer() *net.Dialer {
	dialer := &net.Dialer{
		Control: func(network, address string, c syscall.RawConn) error {
			return c.Control(func(fd uintptr) {
				_ = syscall.SetsockoptLinger(int(fd), syscall.SOL_SOCKET, syscall.SO_LINGER, &syscall.Linger{Onoff: 1, Linger: 1})
			})
		},
	}
	return dialer
}

var defaultTransport = http.DefaultTransport.(*http.Transport)

func doHTTPProbe(req *http.Request, client *http.Client) (bool, string) {
	res, err := client.Do(req)
	if err != nil {
		// Convert errors into failures to catch timeouts.
		return false, err.Error()
	}
	defer res.Body.Close()
	if _, err = io.ReadAll(res.Body); err != nil {
		return false, err.Error()
	}
	if res.StatusCode >= http.StatusOK && res.StatusCode < http.StatusBadRequest {
		if res.StatusCode >= http.StatusMultipleChoices { // Redirect
			return false, fmt.Sprintf("HTTP Probe result is a redirect: %s", res.Status)
		}
		return true, ""
	}
	return false, fmt.Sprintf("HTTP probe failed with statuscode: %d", res.StatusCode)
}

type ProbeWorker struct {
	stopCh        chan struct{}
	spec          ProbeSpec
	transport     *http.Transport
	req           *http.Request
	resultHandler func(bool, string)
	lastResult    bool
	resultRun     int
	backendName   string
}

func NewProbeWorker(
	backendName string,
	probeUrl string,
	probeSpec ProbeSpec,
	resultHandler func(bool, string),
	tlsConfig *tls.Config,
) (*ProbeWorker, error) {

	u, err := url.Parse(probeUrl)
	if err != nil {
		return nil, err
	}
	req, err := http.NewRequest("GET", u.String(), nil)
	if err != nil {
		return nil, err
	}

	req.Header = http.Header{
		"User-Agent": {"proxyd-probe"},
		"Accept":     {"*/*"},
	}

	// Use provided TLS config or fall back to default (secure) configuration
	if tlsConfig == nil {
		tlsConfig = &tls.Config{}
	}

	transport := &http.Transport{
		TLSClientConfig:     tlsConfig,
		TLSHandshakeTimeout: defaultTransport.TLSHandshakeTimeout,
		DisableKeepAlives:   true,
		DisableCompression:  true,
		DialContext:         probeDialer().DialContext,
		IdleConnTimeout:     defaultTransport.IdleConnTimeout,
	}

	return &ProbeWorker{
		stopCh:        make(chan struct{}, 1), // Buffer so stop() can be non-blocking.
		spec:          probeSpec,
		resultHandler: resultHandler,
		transport:     transport,
		req:           req,
		backendName:   backendName,
	}, nil
}

func (w *ProbeWorker) run() {
	probeTickerPeriod := w.spec.Period

	// first wait period is random to avoid simultaneous probes
	time.Sleep(time.Duration(rand.Float64() * float64(probeTickerPeriod)))

	probeTicker := time.NewTicker(probeTickerPeriod)

	defer func() {
		// Clean up.
		probeTicker.Stop()
	}()

probeLoop:
	for {
		w.doProbe()
		// Wait for next probe tick.
		select {
		case <-w.stopCh:
			break probeLoop
		case <-probeTicker.C:
			// continue
		}
	}
}

func (w *ProbeWorker) Stop() {
	select {
	case w.stopCh <- struct{}{}:
	default: // Non-blocking.
	}
}

func (w *ProbeWorker) Start() {
	go w.run()
}

func (w *ProbeWorker) doProbe() {
	client := &http.Client{
		Timeout:   w.spec.Timeout,
		Transport: w.transport,
	}

	start := time.Now()
	result, message := doHTTPProbe(w.req, client)
	duration := time.Since(start)

	RecordBackendProbeDuration(w.backendName, duration)
	RecordBackendProbeCheck(w.backendName, result)

	if w.lastResult == result {
		w.resultRun++
	} else {
		w.lastResult = result
		w.resultRun = 1
	}

	if (!result && w.resultRun < int(w.spec.FailureThreshold)) ||
		(result && w.resultRun < int(w.spec.SuccessThreshold)) {
		// Success or failure is below threshold - leave the probe state unchanged.
		return
	}

	RecordBackendProbeHealthy(w.backendName, result)
	w.resultHandler(result, message)
}
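To tie the two files together: a sketch of how a ProbeWorker might be wired to a Backend's probe health, using only symbols added in this commit. The backend label, the URL, and the startProbe helper itself are placeholders; proxyd's actual wiring is not shown in this diff.

// Sketch only; assumes package proxyd. The result handler runs only after the
// configured success/failure threshold is crossed, so transient blips do not
// flip the backend's health.
func startProbe(backend *Backend) (*ProbeWorker, error) {
	spec := ProbeSpec{
		FailureThreshold: 1,
		SuccessThreshold: 2,
		Period:           4 * time.Second,
		Timeout:          1 * time.Second,
	}
	worker, err := NewProbeWorker(
		"example-backend",                 // label used in the probe metrics
		"http://backend.internal/healthz", // placeholder probe URL
		spec,
		func(healthy bool, msg string) {
			// No-op unless the backend was built with WithProbe; otherwise this
			// flips the atomic flag read by IsHealthy via IsProbeHealthy.
			backend.SetProbeHealth(healthy)
		},
		nil, // nil TLS config falls back to a default (secure) configuration
	)
	if err != nil {
		return nil, err
	}
	worker.Start() // probes on a jittered ticker in a background goroutine
	return worker, nil
}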
