Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
e881243
fix: update consumer script to enable selection stats in RPC command
avitenzer Jan 5, 2026
d0a89c8
refactor: enhance session management with composition pattern
avitenzer Jan 5, 2026
4360bcd
feat: implement direct RPC connection handling and error mapping
avitenzer Jan 6, 2026
4c640dd
feat: enhance logging and session management for direct RPC relay
avitenzer Jan 6, 2026
7695329
refactor: clean up logging and improve RPC session handling
avitenzer Jan 6, 2026
a0a5700
feat: enhance direct RPC session tracking and block synchronization
avitenzer Jan 6, 2026
df09018
feat: implement REST support and enhance direct RPC handling
avitenzer Jan 7, 2026
4579e26
feat: add comprehensive REST integration tests for DirectRPCRelaySender
avitenzer Jan 7, 2026
86bc2a1
added lava smart router init script
avitenzer Jan 7, 2026
5cf3fc0
feat: enhance error handling and testing for REST responses
avitenzer Jan 9, 2026
00a4d13
feat: add Smart Router configuration for local Lava node
avitenzer Jan 11, 2026
72f9e3a
fix: create minimal endpoint for ChainTracker in RPCSmartRouter
avitenzer Jan 11, 2026
014a123
Merge branch 'main' into feat/direct-rpc
avitenzer Jan 11, 2026
57185c9
feat: update global latest block height in RPCSmartRouter
avitenzer Jan 11, 2026
f69a22f
feat: add GetNodeUrl method to DirectRPCConnection interfaces
avitenzer Jan 11, 2026
eff1396
refactor: unify WebSocket subscription management across components
avitenzer Jan 13, 2026
34f665e
feat: implement multi-client subscription handling with unique router…
avitenzer Jan 14, 2026
dbdc056
feat: add gRPC support with dynamic message handling and connection p…
avitenzer Jan 15, 2026
c346a27
feat: implement gRPC streaming subscription management with connectio…
avitenzer Jan 15, 2026
4d03f75
feat: add direct RPC configuration for Ethereum endpoints and gRPC re…
avitenzer Jan 18, 2026
3054aa2
feat: implement health checking for direct RPC endpoints in smart rou…
avitenzer Jan 18, 2026
059e4cc
feat: enhance DirectWSSubscriptionManager with Tendermint and EVM sub…
avitenzer Jan 19, 2026
4743cf9
feat: enhance block height extraction and subscription response handl…
avitenzer Jan 19, 2026
5bd871f
feat: update smart router initialization script to support gRPC endpo…
avitenzer Jan 19, 2026
7da10c6
feat: enhance smart router initialization script to support Tendermin…
avitenzer Jan 20, 2026
84a61b3
Merge branch 'main' into feat/direct-rpc
avitenzer Jan 20, 2026
6f44174
feat: update smart router configuration to support additional Tenderm…
avitenzer Jan 20, 2026
8052c19
feat: add NoOpWSSubscriptionManager and enhance endpoint definitions
avitenzer Jan 21, 2026
5643837
refactor: clean up log messages and comments for clarity
avitenzer Jan 21, 2026
05c4de8
feat: add comprehensive tests for gRPC subscription manager and relat…
avitenzer Jan 22, 2026
9f81fe2
refactor: rename test function and remove unused compression tests
avitenzer Jan 22, 2026
180ef1e
refactor: update terminology from providers to endpoints in RPCSmartR…
avitenzer Jan 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion protocol/chainlib/chainlib.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/lavanet/lava/v5/utils"
pairingtypes "github.com/lavanet/lava/v5/x/pairing/types"
spectypes "github.com/lavanet/lava/v5/x/spec/types"
"google.golang.org/grpc"
)

const (
Expand Down Expand Up @@ -42,7 +43,7 @@ func NewChainListener(
healthReporter HealthReporter,
rpcConsumerLogs *metrics.RPCConsumerLogs,
chainParser ChainParser,
consumerWsSubscriptionManager *ConsumerWSSubscriptionManager,
consumerWsSubscriptionManager WSSubscriptionManager,
) (ChainListener, error) {
if listenEndpoint.NetworkAddress == INTERNAL_ADDRESS {
utils.LavaFormatDebug("skipping chain listener for internal address")
Expand Down Expand Up @@ -118,6 +119,16 @@ type HealthReporter interface {
IsHealthy() bool
}

// GRPCReflectionProvider is an optional interface that can be implemented by RelaySender
// to provide gRPC reflection support. When implemented, the gRPC listener will register
// a reflection proxy service that enables tools like grpcurl to discover services.
type GRPCReflectionProvider interface {
// GetGRPCReflectionConnection returns a gRPC connection for reflection requests.
// The cleanup function should be called when the connection is no longer needed.
// Returns nil if reflection is not supported.
GetGRPCReflectionConnection(ctx context.Context) (conn *grpc.ClientConn, cleanup func(), err error)
}

type RelaySender interface {
SendRelay(
ctx context.Context,
Expand Down
49 changes: 46 additions & 3 deletions protocol/chainlib/chainproxy/rpcInterfaceMessages/restMessage.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package rpcInterfaceMessages

import (
"fmt"
"net/url"
"strings"

Expand Down Expand Up @@ -49,6 +50,13 @@ func (rm *RestMessage) GetRawRequestHash() ([]byte, error) {
}

func (jm RestMessage) CheckResponseError(data []byte, httpStatusCode int) (hasError bool, errorMessage string) {
// Treat 5xx and 429 as node errors (triggers retries)
if httpStatusCode >= 500 || httpStatusCode == 429 {
// Server error or rate limit - treat as node error for retry logic
errorMsg := extractErrorMessage(data, httpStatusCode)
return true, errorMsg
}

// Check Cosmos SDK transaction errors (HTTP 2xx with error code in JSON)
if httpStatusCode >= 200 && httpStatusCode < 300 {
if hasError, errMsg := checkCosmosTxError(data); hasError {
Expand All @@ -58,8 +66,9 @@ func (jm RestMessage) CheckResponseError(data []byte, httpStatusCode int) (hasEr
return false, ""
}

// Check generic REST errors (non-2xx HTTP status)
return checkGenericRESTError(data)
// 4xx (except 429) are client errors - NOT node errors
// Return false so state machine doesn't retry (client error won't succeed on retry)
return false, ""
}

// checkCosmosTxError detects errors in Cosmos SDK transaction responses
Expand All @@ -78,7 +87,41 @@ func checkCosmosTxError(data []byte) (bool, string) {
return false, ""
}

// checkGenericRESTError detects errors in generic REST API responses
// extractErrorMessage attempts to extract error message from response body
// Tries common fields in order: "message", "error", raw body (truncated), or fallback to status
func extractErrorMessage(data []byte, httpStatusCode int) string {
// Try to parse as JSON and extract error message
var result map[string]interface{}
if err := json.Unmarshal(data, &result); err == nil {
// Try common error field names
if msg, ok := result["message"].(string); ok && msg != "" {
return msg
}
if msg, ok := result["error"].(string); ok && msg != "" {
return msg
}
// Try nested error.message
if errorObj, ok := result["error"].(map[string]interface{}); ok {
if msg, ok := errorObj["message"].(string); ok && msg != "" {
return msg
}
}
}

// Fallback: use raw body (truncated to 1KB)
bodyStr := string(data)
if len(bodyStr) > 1024 {
bodyStr = bodyStr[:1024] + "..."
}
if bodyStr != "" {
return bodyStr
}

// Final fallback: use HTTP status code
return fmt.Sprintf("HTTP %d", httpStatusCode)
}

// checkGenericRESTError detects errors in generic REST API responses (deprecated - kept for backward compatibility)
// Expects format: {"message": "error text", "code": <error_code>}
func checkGenericRESTError(data []byte) (bool, string) {
result := make(map[string]interface{})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -541,3 +541,93 @@
})
}
}

// TestCheckResponseError_ServerErrors tests 5xx and 429 handling (Phase 4 corrections)
func TestCheckResponseError_ServerErrors(t *testing.T) {
testCases := []struct {
name string
httpStatus int
response string
expectedError bool // Should be node error?

Check failure on line 551 in protocol/chainlib/chainproxy/rpcInterfaceMessages/restMessage_test.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofmt`-ed with `-s` (gofmt)

Check failure on line 551 in protocol/chainlib/chainproxy/rpcInterfaceMessages/restMessage_test.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofmt`-ed with `-s` (gofmt)
errorCheck func(t *testing.T, errorMessage string)
}{
{
name: "503 Service Unavailable with JSON error",
httpStatus: 503,
response: `{"error":"service temporarily unavailable"}`,
expectedError: true, // Node error - triggers retry
errorCheck: func(t *testing.T, errorMessage string) {
require.Contains(t, errorMessage, "service temporarily unavailable")
},
},
{
name: "503 with plain text body",
httpStatus: 503,
response: `Service Unavailable`,
expectedError: true, // Node error even without JSON
errorCheck: func(t *testing.T, errorMessage string) {
require.Contains(t, errorMessage, "Service Unavailable")
},
},
{
name: "500 Internal Server Error",
httpStatus: 500,
response: `{"message":"internal server error"}`,
expectedError: true, // Node error
errorCheck: func(t *testing.T, errorMessage string) {
require.Contains(t, errorMessage, "internal server error")
},
},
{
name: "502 Bad Gateway",
httpStatus: 502,
response: `Bad Gateway`,
expectedError: true, // Node error
},
{
name: "429 Rate Limit Exceeded",
httpStatus: 429,
response: `{"error":"rate limit exceeded"}`,
expectedError: true, // Node error (triggers backoff/retry)
errorCheck: func(t *testing.T, errorMessage string) {
require.Contains(t, errorMessage, "rate limit exceeded")
},
},
{
name: "429 with empty body",
httpStatus: 429,
response: ``,
expectedError: true, // Still node error
errorCheck: func(t *testing.T, errorMessage string) {
require.Equal(t, "HTTP 429", errorMessage) // Fallback to status

Check failure on line 602 in protocol/chainlib/chainproxy/rpcInterfaceMessages/restMessage_test.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed (gofumpt)
},
},
{
name: "404 Not Found (client error)",
httpStatus: 404,
response: `{"code":5,"message":"block not found"}`,
expectedError: false, // NOT a node error - client error
},
{
name: "400 Bad Request (client error)",
httpStatus: 400,
response: `{"error":"invalid request"}`,
expectedError: false, // NOT a node error
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
restMsg := RestMessage{}

hasError, errorMessage := restMsg.CheckResponseError([]byte(tc.response), tc.httpStatus)

require.Equal(t, tc.expectedError, hasError,
"Expected node error=%v for HTTP %d", tc.expectedError, tc.httpStatus)

if tc.errorCheck != nil {
tc.errorCheck(t, errorMessage)
}
})
}
}
8 changes: 4 additions & 4 deletions protocol/chainlib/chainproxy/rpcclient/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestClientSubscription_UnsubscribeBeforeRun(t *testing.T) {
select {
case <-done:
// Success - Unsubscribe completed without blocking
t.Log(" Unsubscribe completed without deadlock")
t.Log("[PASS] Unsubscribe completed without deadlock")
case <-time.After(1 * time.Second):
t.Fatal("Unsubscribe deadlocked when called before run() started")
}
Expand Down Expand Up @@ -87,7 +87,7 @@ func TestClientSubscription_UnsubscribeRace(t *testing.T) {

select {
case <-done:
t.Log(" Concurrent Unsubscribe calls completed safely")
t.Log("[PASS] Concurrent Unsubscribe calls completed safely")
case <-time.After(2 * time.Second):
t.Fatal("Concurrent Unsubscribe calls deadlocked")
}
Expand Down Expand Up @@ -127,7 +127,7 @@ func TestClientSubscription_UnsubscribeWithRunning(t *testing.T) {

select {
case <-done:
t.Log(" Subscription run() completed after Unsubscribe")
t.Log("[PASS] Subscription run() completed after Unsubscribe")
case <-time.After(2 * time.Second):
t.Fatal("Subscription run() didn't exit after Unsubscribe")
}
Expand Down Expand Up @@ -168,7 +168,7 @@ func TestClientSubscription_BufferedQuitChannelPreventsDeadlock(t *testing.T) {
// This is the key test: with buffered channel, this should succeed
select {
case <-unsubscribeComplete:
t.Log(" Buffered quit channel prevented deadlock")
t.Log("[PASS] Buffered quit channel prevented deadlock")

// Now verify the quit signal is still available if run() starts later
select {
Expand Down
4 changes: 2 additions & 2 deletions protocol/chainlib/consumer_websocket_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type ConsumerWebsocketManager struct {
apiInterface string
connectionType string
relaySender RelaySender
consumerWsSubscriptionManager *ConsumerWSSubscriptionManager
consumerWsSubscriptionManager WSSubscriptionManager
WebsocketConnectionUID string
headerRateLimit uint64
}
Expand All @@ -52,7 +52,7 @@ type ConsumerWebsocketManagerOptions struct {
ApiInterface string
ConnectionType string
RelaySender RelaySender
ConsumerWsSubscriptionManager *ConsumerWSSubscriptionManager
ConsumerWsSubscriptionManager WSSubscriptionManager
WebsocketConnectionUID string
headerRateLimit uint64
}
Expand Down
11 changes: 10 additions & 1 deletion protocol/chainlib/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,16 @@ func (apil *GrpcChainListener) Serve(ctx context.Context, cmdFlags common.Consum
return relayReply.Data, convertRelayMetaDataToMDMetaData(metadataToReply), nil
}

_, httpServer, err := grpcproxy.NewGRPCProxy(sendRelayCallback, apil.endpoint.HealthCheckPath, cmdFlags, apil.healthReporter)
// Check if the relay sender supports gRPC reflection (optional interface)
var reflectionCallback grpcproxy.ReflectionProxyCallback
if reflectionProvider, ok := apil.relaySender.(GRPCReflectionProvider); ok {
reflectionCallback = reflectionProvider.GetGRPCReflectionConnection
utils.LavaFormatInfo("gRPC reflection support enabled",
utils.LogAttr("address", apil.endpoint.NetworkAddress),
)
}

_, httpServer, err := grpcproxy.NewGRPCProxyWithReflection(sendRelayCallback, apil.endpoint.HealthCheckPath, cmdFlags, apil.healthReporter, reflectionCallback)
if err != nil {
utils.LavaFormatFatal("provider failure RegisterServer", err, utils.Attribute{Key: "listenAddr", Value: apil.endpoint.NetworkAddress})
}
Expand Down
Loading
Loading