Skip to content

Commit 603a3fc

Browse files
committed
refactor(rpcprovider)!: use pure gRPC instead of grpc-web wrapper
BREAKING CHANGE: gRPC-web protocol is no longer supported. Browser-based clients that relied on grpc-web to communicate directly with providers will no longer work. Only native gRPC clients are supported. The previous implementation used grpc-web and h2c HTTP wrappers to support the Lava SDK (lava-sdk/lavajs) which allowed browser-based JavaScript clients to communicate with providers. Since SDK support has been removed (see #2186), this infrastructure is no longer needed. Replace the grpc-web/h2c/HTTP hybrid server with a cleaner architecture: - Use cmux to multiplex HTTP health checks and native gRPC on same port - Remove grpcweb.WrapServer and h2c.NewHandler complexity - Add gRPC health checking protocol support (grpc_health_v1) - Keep HTTP health endpoint for Kubernetes probes compatibility - Optimize cmux matching: HTTP1Fast() first, then Any() for gRPC (avoids expensive HTTP/2 header parsing at high scale) Benefits: - Native gRPC connection management (keepalives, stream limits) - Proper graceful shutdown via grpcServer.GracefulStop() - Simpler, more maintainable code - Better performance at scale (no protocol translation overhead) Also adds comprehensive test suite for provider_listener.go with 14 tests and 2 benchmarks covering HTTP health, gRPC health, relay/probe RPCs, concurrent requests, and error handling.
1 parent b5ccee3 commit 603a3fc

File tree

4 files changed

+848
-44
lines changed

4 files changed

+848
-44
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ require (
134134
github.com/rogpeppe/go-internal v1.14.1 // indirect
135135
github.com/sagikazarmark/locafero v0.4.0 // indirect
136136
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
137+
github.com/soheilhy/cmux v0.1.5 // indirect
137138
github.com/sourcegraph/conc v0.3.0 // indirect
138139
github.com/supranational/blst v0.3.13 // indirect
139140
github.com/tidwall/btree v1.6.0 // indirect

go.sum

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1218,6 +1218,8 @@ github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVs
12181218
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
12191219
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
12201220
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
1221+
github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js=
1222+
github.com/soheilhy/cmux v0.1.5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0=
12211223
github.com/sony/gobreaker v0.4.1/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
12221224
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
12231225
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
@@ -1488,6 +1490,7 @@ golang.org/x/net v0.0.0-20201010224723-4f7140c49acb/go.mod h1:sp8m0HH+o8qH0wwXwY
14881490
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
14891491
golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
14901492
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
1493+
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
14911494
golang.org/x/net v0.0.0-20201209123823-ac852fbbde11/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
14921495
golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
14931496
golang.org/x/net v0.0.0-20210220033124-5f55cee0dc0d/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=

protocol/rpcprovider/provider_listener.go

Lines changed: 96 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -2,35 +2,52 @@ package rpcprovider
22

33
import (
44
"context"
5-
"errors"
6-
"fmt"
5+
"crypto/tls"
6+
"net"
77
"net/http"
88
"strings"
99
"sync"
1010

1111
"github.com/gogo/status"
12-
"github.com/improbable-eng/grpc-web/go/grpcweb"
1312
"github.com/lavanet/lava/v5/protocol/chainlib"
14-
"github.com/lavanet/lava/v5/protocol/common"
1513
"github.com/lavanet/lava/v5/protocol/lavaprotocol/protocolerrors"
1614
"github.com/lavanet/lava/v5/protocol/lavasession"
1715
"github.com/lavanet/lava/v5/utils"
1816
pairingtypes "github.com/lavanet/lava/v5/x/pairing/types"
19-
"golang.org/x/net/http2"
20-
"golang.org/x/net/http2/h2c"
17+
"github.com/soheilhy/cmux"
2118
grpc "google.golang.org/grpc"
2219
"google.golang.org/grpc/codes"
20+
"google.golang.org/grpc/health"
21+
healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
2322
)
2423

2524
const (
2625
HealthCheckURLPathFlagName = "health-check-url-path"
2726
HealthCheckURLPathFlagDefault = "/lava/health"
2827
)
2928

29+
// isShutdownError returns true if the error is expected during graceful shutdown
30+
func isShutdownError(err error) bool {
31+
if err == nil {
32+
return false
33+
}
34+
if err == http.ErrServerClosed || err == cmux.ErrListenerClosed || err == net.ErrClosed {
35+
return true
36+
}
37+
// Check error message for common shutdown patterns
38+
errStr := err.Error()
39+
return strings.Contains(errStr, "use of closed network connection") ||
40+
strings.Contains(errStr, "server closed") ||
41+
strings.Contains(errStr, "mux: listener closed")
42+
}
43+
3044
type ProviderListener struct {
3145
networkAddress string
3246
relayServer *relayServer
33-
httpServer http.Server
47+
grpcServer *grpc.Server
48+
httpServer *http.Server
49+
healthServer *health.Server
50+
cmux cmux.CMux
3451
}
3552

3653
func (pl *ProviderListener) Key() string {
@@ -47,69 +64,104 @@ func (pl *ProviderListener) RegisterReceiver(existingReceiver RelayReceiver, end
4764
return utils.LavaFormatError("double_receiver_setup receiver already defined on this address with the same chainID and apiInterface", nil, utils.Attribute{Key: "chainID", Value: endpoint.ChainID}, utils.Attribute{Key: "apiInterface", Value: endpoint.ApiInterface})
4865
}
4966
pl.relayServer.relayReceivers[listen_endpoint.Key()] = &relayReceiverWrapper{relayReceiver: &existingReceiver, enabled: true}
67+
// Mark service as healthy when receiver is registered
68+
serviceName := endpoint.ChainID + "-" + endpoint.ApiInterface
69+
pl.healthServer.SetServingStatus(serviceName, healthgrpc.HealthCheckResponse_SERVING)
5070
utils.LavaFormatInfo("[++] Provider Listening on Address", utils.Attribute{Key: "chainID", Value: endpoint.ChainID}, utils.Attribute{Key: "apiInterface", Value: endpoint.ApiInterface}, utils.Attribute{Key: "Address", Value: endpoint.NetworkAddress})
5171
return nil
5272
}
5373

5474
func (pl *ProviderListener) Shutdown(shutdownCtx context.Context) error {
55-
if err := pl.httpServer.Shutdown(shutdownCtx); err != nil {
56-
utils.LavaFormatFatal("Provider failed to shutdown", err)
57-
}
75+
pl.healthServer.Shutdown()
76+
pl.httpServer.Shutdown(shutdownCtx)
77+
pl.grpcServer.GracefulStop()
5878
return nil
5979
}
6080

6181
func NewProviderListener(ctx context.Context, networkAddress lavasession.NetworkAddressData, healthCheckPath string) *ProviderListener {
6282
pl := &ProviderListener{networkAddress: networkAddress.Address}
6383

64-
// GRPC
6584
lis := chainlib.GetListenerWithRetryGrpc("tcp", networkAddress.Address)
66-
opts := []grpc.ServerOption{
67-
grpc.MaxRecvMsgSize(1024 * 1024 * 512), // setting receive size to 512mb for large debug responses
68-
grpc.MaxSendMsgSize(1024 * 1024 * 512), // setting send size to 512mb for large debug responses
69-
}
70-
grpcServer := grpc.NewServer(opts...)
71-
wrappedServer := grpcweb.WrapServer(grpcServer)
72-
handler := func(resp http.ResponseWriter, req *http.Request) {
73-
// Set CORS headers
74-
resp.Header().Set("Access-Control-Allow-Origin", "*")
75-
resp.Header().Set("Access-Control-Allow-Headers", fmt.Sprintf("Content-Type, x-grpc-web, %s", common.LAVA_CONSUMER_PROCESS_GUID))
76-
77-
if req.URL.Path == healthCheckPath && req.Method == http.MethodGet {
78-
resp.WriteHeader(http.StatusOK)
79-
resp.Write([]byte("Healthy"))
80-
return
81-
}
8285

83-
wrappedServer.ServeHTTP(resp, req)
86+
// Wrap with TLS if enabled
87+
if !networkAddress.DisableTLS {
88+
tlsConfig := lavasession.GetTlsConfig(networkAddress)
89+
lis = tls.NewListener(lis, tlsConfig)
90+
} else {
91+
utils.LavaFormatInfo("Running with disabled TLS configuration")
8492
}
8593

86-
pl.httpServer = http.Server{
87-
Handler: h2c.NewHandler(http.HandlerFunc(handler), &http2.Server{}),
88-
}
94+
// Create connection multiplexer to handle both HTTP and gRPC on same port
95+
mux := cmux.New(lis)
96+
// Match HTTP/1.1 first for health checks (fast prefix match)
97+
httpListener := mux.Match(cmux.HTTP1Fast())
98+
// Everything else goes to gRPC (avoids expensive header parsing)
99+
grpcListener := mux.Match(cmux.Any())
100+
pl.cmux = mux
89101

90-
var serveExecutor func() error
91-
if networkAddress.DisableTLS {
92-
utils.LavaFormatInfo("Running with disabled TLS configuration")
93-
serveExecutor = func() error {
94-
return pl.httpServer.Serve(lis)
95-
}
96-
} else {
97-
pl.httpServer.TLSConfig = lavasession.GetTlsConfig(networkAddress)
98-
serveExecutor = func() error {
99-
return pl.httpServer.ServeTLS(lis, "", "")
100-
}
102+
// Build gRPC server
103+
opts := []grpc.ServerOption{
104+
grpc.MaxRecvMsgSize(1024 * 1024 * 512), // 512MB for large debug responses
105+
grpc.MaxSendMsgSize(1024 * 1024 * 512), // 512MB for large debug responses
101106
}
107+
grpcServer := grpc.NewServer(opts...)
108+
pl.grpcServer = grpcServer
102109

110+
// Register gRPC health checking service
111+
healthServer := health.NewServer()
112+
healthgrpc.RegisterHealthServer(grpcServer, healthServer)
113+
healthServer.SetServingStatus("", healthgrpc.HealthCheckResponse_SERVING)
114+
pl.healthServer = healthServer
115+
116+
// Register relay server
103117
relayServer := &relayServer{relayReceivers: map[string]*relayReceiverWrapper{}}
104118
pl.relayServer = relayServer
105119
pairingtypes.RegisterRelayerServer(grpcServer, relayServer)
120+
121+
// Create HTTP server for health checks
122+
httpMux := http.NewServeMux()
123+
httpMux.HandleFunc(healthCheckPath, func(w http.ResponseWriter, r *http.Request) {
124+
if r.Method == http.MethodGet {
125+
w.WriteHeader(http.StatusOK)
126+
w.Write([]byte("Healthy"))
127+
} else {
128+
w.WriteHeader(http.StatusMethodNotAllowed)
129+
}
130+
})
131+
httpServer := &http.Server{Handler: httpMux}
132+
pl.httpServer = httpServer
133+
134+
// Start servers
106135
go func() {
107136
utils.LavaFormatInfo("New provider listener active", utils.Attribute{Key: "address", Value: networkAddress})
108-
if err := serveExecutor(); !errors.Is(err, http.ErrServerClosed) {
109-
utils.LavaFormatFatal("provider failed to serve", err, utils.Attribute{Key: "Address", Value: lis.Addr().String()})
137+
if err := grpcServer.Serve(grpcListener); err != nil {
138+
// Ignore expected shutdown errors
139+
if isShutdownError(err) {
140+
return
141+
}
142+
utils.LavaFormatFatal("gRPC server failed", err, utils.Attribute{Key: "Address", Value: networkAddress.Address})
143+
}
144+
}()
145+
146+
go func() {
147+
if err := httpServer.Serve(httpListener); err != nil {
148+
// Ignore expected shutdown errors
149+
if isShutdownError(err) {
150+
return
151+
}
152+
utils.LavaFormatFatal("HTTP health server failed", err, utils.Attribute{Key: "Address", Value: networkAddress.Address})
153+
}
154+
}()
155+
156+
go func() {
157+
if err := mux.Serve(); err != nil {
158+
if !isShutdownError(err) {
159+
utils.LavaFormatError("cmux serve error", err)
160+
}
110161
}
111162
utils.LavaFormatInfo("listener closed server", utils.Attribute{Key: "address", Value: networkAddress})
112163
}()
164+
113165
return pl
114166
}
115167

0 commit comments

Comments
 (0)