Skip to content

Commit 6a7f5b3

Browse files
committed
fix(relay): enforce RelayCountOnNodeError as hard retry limit
When quorum is disabled (no lava-quorum-* headers), RelayCountOnNodeError now acts as a hard limit on the total number of retry batches. Setting RelayCountOnNodeError=0 effectively disables all retries on node errors. When quorum is enabled, quorumParams.Max is used as the retry limit instead, allowing quorum functionality to work independently of RelayCountOnNodeError. Changes: - Modified retryCondition() in both smart router and consumer state machines to respect RelayCountOnNodeError when quorum is disabled - Updated HasRequiredNodeResults() to count node errors as "results" when RelayCountOnNodeError=0 to prevent quorum logic from triggering retries - Added comprehensive tests for retry limit behavior
1 parent 3659faf commit 6a7f5b3

File tree

5 files changed

+572
-9
lines changed

5 files changed

+572
-9
lines changed

protocol/relaycore/relay_processor.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,14 @@ func (rp *RelayProcessor) HasRequiredNodeResults(tries int) (bool, int) {
319319
}
320320
} else {
321321
// Quorum feature disabled: check if we have enough results for quorum
322-
retryForQuorumNeeded = !(resultsCount >= neededForQuorum)
322+
// When RelayCountOnNodeError is 0, treat node errors as "results" for quorum purposes
323+
// This ensures that when retries are disabled, we don't retry to replace node errors
324+
if RelayCountOnNodeError == 0 {
325+
totalNodeResults := resultsCount + nodeErrors
326+
retryForQuorumNeeded = !(totalNodeResults >= neededForQuorum)
327+
} else {
328+
retryForQuorumNeeded = !(resultsCount >= neededForQuorum)
329+
}
323330
}
324331

325332
if !retryForQuorumNeeded {

protocol/relaycore/relay_processor_test.go

Lines changed: 281 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1933,3 +1933,284 @@ func TestQuorumDisabledScenario(t *testing.T) {
19331933
result = rp.getRequiredQuorumSize(3)
19341934
require.Equal(t, 2, result, "Quorum enabled with 3 responses: ceil(0.66 * 3) = 2")
19351935
}
1936+
1937+
// TestRelayCountOnNodeErrorZeroDisablesRetries tests that when RelayCountOnNodeError=0,
1938+
// node errors are counted towards quorum and retries are disabled.
1939+
func TestRelayCountOnNodeErrorZeroDisablesRetries(t *testing.T) {
1940+
ctx := context.Background()
1941+
// Create a minimal protocol message to avoid nil pointer panic
1942+
chainParser, _, _, closeServer, _, err := chainlib.CreateChainLibMocks(ctx, "LAV1", spectypes.APIInterfaceRest, nil, nil, "../../", nil)
1943+
if closeServer != nil {
1944+
defer closeServer()
1945+
}
1946+
require.NoError(t, err)
1947+
chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
1948+
require.NoError(t, err)
1949+
protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "")
1950+
1951+
// Quorum disabled params (Min=1)
1952+
quorumParams := common.QuorumParams{Rate: 1, Max: 1, Min: 1}
1953+
1954+
tests := []struct {
1955+
name string
1956+
relayCountOnNodeError int
1957+
successResults int
1958+
nodeErrors int
1959+
expectedHasResults bool
1960+
description string
1961+
}{
1962+
{
1963+
name: "RelayCountOnNodeError=0 with only node errors should NOT retry",
1964+
relayCountOnNodeError: 0,
1965+
successResults: 0,
1966+
nodeErrors: 1,
1967+
expectedHasResults: true, // Node error counts as result when flag is 0
1968+
description: "When RelayCountOnNodeError=0, node errors should count toward quorum",
1969+
},
1970+
{
1971+
name: "RelayCountOnNodeError=2 with only node errors SHOULD retry",
1972+
relayCountOnNodeError: 2,
1973+
successResults: 0,
1974+
nodeErrors: 1,
1975+
expectedHasResults: false, // Node error does NOT count as result when flag > 0
1976+
description: "When RelayCountOnNodeError>0, should retry to get a success",
1977+
},
1978+
{
1979+
name: "RelayCountOnNodeError=0 with success should NOT retry",
1980+
relayCountOnNodeError: 0,
1981+
successResults: 1,
1982+
nodeErrors: 0,
1983+
expectedHasResults: true,
1984+
description: "Success should always meet quorum",
1985+
},
1986+
{
1987+
name: "RelayCountOnNodeError=2 with success should NOT retry",
1988+
relayCountOnNodeError: 2,
1989+
successResults: 1,
1990+
nodeErrors: 0,
1991+
expectedHasResults: true,
1992+
description: "Success should always meet quorum",
1993+
},
1994+
{
1995+
name: "RelayCountOnNodeError=0 with multiple node errors should NOT retry",
1996+
relayCountOnNodeError: 0,
1997+
successResults: 0,
1998+
nodeErrors: 3,
1999+
expectedHasResults: true, // Node errors count as results
2000+
description: "Multiple node errors should count toward quorum when flag is 0",
2001+
},
2002+
}
2003+
2004+
for _, tt := range tests {
2005+
t.Run(tt.name, func(t *testing.T) {
2006+
// Save original value and restore after test
2007+
originalValue := RelayCountOnNodeError
2008+
RelayCountOnNodeError = tt.relayCountOnNodeError
2009+
defer func() { RelayCountOnNodeError = originalValue }()
2010+
2011+
// Create fresh usedProviders for each test
2012+
usedProviders := lavasession.NewUsedProviders(nil)
2013+
2014+
relayProcessor := NewRelayProcessor(
2015+
ctx,
2016+
quorumParams,
2017+
nil,
2018+
RelayProcessorMetrics,
2019+
RelayProcessorMetrics,
2020+
RelayRetriesManagerInstance,
2021+
newMockRelayStateMachineWithSelection(protocolMessage, usedProviders, Quorum),
2022+
qos.NewQoSManager(),
2023+
)
2024+
2025+
// Set up providers
2026+
consumerSessionsMap := lavasession.ConsumerSessionsMap{}
2027+
totalResponses := tt.successResults + tt.nodeErrors
2028+
for i := 0; i < totalResponses; i++ {
2029+
consumerSessionsMap[fmt.Sprintf("provider%d", i)] = &lavasession.SessionInfo{}
2030+
}
2031+
relayProcessor.GetUsedProviders().AddUsed(consumerSessionsMap, nil)
2032+
2033+
// Add success results (status 200, no error)
2034+
for i := 0; i < tt.successResults; i++ {
2035+
provider := fmt.Sprintf("provider%d", i)
2036+
relayProcessor.GetUsedProviders().RemoveUsed(provider, lavasession.NewRouterKey(nil), nil)
2037+
mockResponse := &RelayResponse{
2038+
RelayResult: common.RelayResult{
2039+
Request: &pairingtypes.RelayRequest{
2040+
RelaySession: &pairingtypes.RelaySession{},
2041+
RelayData: &pairingtypes.RelayPrivateData{},
2042+
},
2043+
Reply: &pairingtypes.RelayReply{Data: []byte(`{"result": "success"}`)},
2044+
ProviderInfo: common.ProviderInfo{ProviderAddress: provider},
2045+
StatusCode: 200, // Success status code
2046+
},
2047+
Err: nil,
2048+
}
2049+
relayProcessor.SetResponse(mockResponse)
2050+
}
2051+
2052+
// Add node error results (status 500, triggers CheckResponseError)
2053+
// REST API expects error format: {"message": "...", "code": ...} at top level
2054+
for i := 0; i < tt.nodeErrors; i++ {
2055+
provider := fmt.Sprintf("provider%d", i+tt.successResults)
2056+
relayProcessor.GetUsedProviders().RemoveUsed(provider, lavasession.NewRouterKey(nil), nil)
2057+
mockResponse := &RelayResponse{
2058+
RelayResult: common.RelayResult{
2059+
Request: &pairingtypes.RelayRequest{
2060+
RelaySession: &pairingtypes.RelaySession{},
2061+
RelayData: &pairingtypes.RelayPrivateData{},
2062+
},
2063+
Reply: &pairingtypes.RelayReply{Data: []byte(`{"message": "Slot was skipped", "code": 500}`)},
2064+
ProviderInfo: common.ProviderInfo{ProviderAddress: provider},
2065+
StatusCode: 500, // Node error status code - triggers CheckResponseError
2066+
},
2067+
Err: nil, // Must be nil to go through setValidResponse -> CheckResponseError
2068+
}
2069+
relayProcessor.SetResponse(mockResponse)
2070+
}
2071+
2072+
// Wait for responses to be processed
2073+
waitCtx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
2074+
defer cancel()
2075+
relayProcessor.WaitForResults(waitCtx)
2076+
2077+
// Test HasRequiredNodeResults
2078+
hasResults, _ := relayProcessor.HasRequiredNodeResults(1)
2079+
2080+
require.Equal(t, tt.expectedHasResults, hasResults,
2081+
"%s: HasRequiredNodeResults mismatch", tt.description)
2082+
})
2083+
}
2084+
}
2085+
2086+
// TestRelayCountOnNodeErrorZeroBatchRequest simulates the real-world scenario
2087+
// where a batch request returns partial errors (like Solana skipped slots)
2088+
func TestRelayCountOnNodeErrorZeroBatchRequest(t *testing.T) {
2089+
ctx := context.Background()
2090+
chainParser, _, _, closeServer, _, err := chainlib.CreateChainLibMocks(ctx, "LAV1", spectypes.APIInterfaceRest, nil, nil, "../../", nil)
2091+
if closeServer != nil {
2092+
defer closeServer()
2093+
}
2094+
require.NoError(t, err)
2095+
chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
2096+
require.NoError(t, err)
2097+
protocolMessage := chainlib.NewProtocolMessage(chainMsg, nil, nil, "", "")
2098+
2099+
quorumParams := common.QuorumParams{Rate: 1, Max: 1, Min: 1}
2100+
2101+
t.Run("Batch request with node error should not retry when RelayCountOnNodeError=0", func(t *testing.T) {
2102+
// Save and set RelayCountOnNodeError to 0
2103+
originalValue := RelayCountOnNodeError
2104+
RelayCountOnNodeError = 0
2105+
defer func() { RelayCountOnNodeError = originalValue }()
2106+
2107+
// Create fresh usedProviders for this test
2108+
usedProviders := lavasession.NewUsedProviders(nil)
2109+
2110+
relayProcessor := NewRelayProcessor(
2111+
ctx,
2112+
quorumParams,
2113+
nil,
2114+
RelayProcessorMetrics,
2115+
RelayProcessorMetrics,
2116+
RelayRetriesManagerInstance,
2117+
newMockRelayStateMachineWithSelection(protocolMessage, usedProviders, Quorum),
2118+
qos.NewQoSManager(),
2119+
)
2120+
2121+
// Set up one provider
2122+
consumerSessionsMap := lavasession.ConsumerSessionsMap{
2123+
"quicknode": &lavasession.SessionInfo{},
2124+
}
2125+
relayProcessor.GetUsedProviders().AddUsed(consumerSessionsMap, nil)
2126+
2127+
// Remove provider from used (simulates response received)
2128+
relayProcessor.GetUsedProviders().RemoveUsed("quicknode", lavasession.NewRouterKey(nil), nil)
2129+
2130+
// Simulate node error response (status 500 triggers CheckResponseError)
2131+
// REST API expects error format: {"message": "...", "code": ...} at top level
2132+
nodeErrorResponse := []byte(`{"message": "Slot was skipped", "code": 500}`)
2133+
mockResponse := &RelayResponse{
2134+
RelayResult: common.RelayResult{
2135+
Request: &pairingtypes.RelayRequest{
2136+
RelaySession: &pairingtypes.RelaySession{},
2137+
RelayData: &pairingtypes.RelayPrivateData{},
2138+
},
2139+
Reply: &pairingtypes.RelayReply{Data: nodeErrorResponse},
2140+
ProviderInfo: common.ProviderInfo{ProviderAddress: "quicknode"},
2141+
StatusCode: 500, // Node error status code
2142+
},
2143+
Err: nil, // Must be nil to go through setValidResponse
2144+
}
2145+
relayProcessor.SetResponse(mockResponse)
2146+
2147+
// Wait for response
2148+
waitCtx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
2149+
defer cancel()
2150+
relayProcessor.WaitForResults(waitCtx)
2151+
2152+
// With RelayCountOnNodeError=0, should have required results (no retry needed)
2153+
hasResults, nodeErrorCount := relayProcessor.HasRequiredNodeResults(1)
2154+
2155+
require.True(t, hasResults, "With RelayCountOnNodeError=0, batch node error should count as result")
2156+
require.Equal(t, 1, nodeErrorCount, "Should have 1 node error")
2157+
})
2158+
2159+
t.Run("Batch request with node error SHOULD retry when RelayCountOnNodeError=2", func(t *testing.T) {
2160+
// Save and set RelayCountOnNodeError to 2
2161+
originalValue := RelayCountOnNodeError
2162+
RelayCountOnNodeError = 2
2163+
defer func() { RelayCountOnNodeError = originalValue }()
2164+
2165+
// Create fresh usedProviders for this test
2166+
usedProviders := lavasession.NewUsedProviders(nil)
2167+
2168+
relayProcessor := NewRelayProcessor(
2169+
ctx,
2170+
quorumParams,
2171+
nil,
2172+
RelayProcessorMetrics,
2173+
RelayProcessorMetrics,
2174+
RelayRetriesManagerInstance,
2175+
newMockRelayStateMachineWithSelection(protocolMessage, usedProviders, Quorum),
2176+
qos.NewQoSManager(),
2177+
)
2178+
2179+
// Set up one provider
2180+
consumerSessionsMap := lavasession.ConsumerSessionsMap{
2181+
"quicknode": &lavasession.SessionInfo{},
2182+
}
2183+
relayProcessor.GetUsedProviders().AddUsed(consumerSessionsMap, nil)
2184+
2185+
// Remove provider from used (simulates response received)
2186+
relayProcessor.GetUsedProviders().RemoveUsed("quicknode", lavasession.NewRouterKey(nil), nil)
2187+
2188+
// Simulate node error response (status 500 triggers CheckResponseError)
2189+
// REST API expects error format: {"message": "...", "code": ...} at top level
2190+
nodeErrorResponse := []byte(`{"message": "Slot was skipped", "code": 500}`)
2191+
mockResponse := &RelayResponse{
2192+
RelayResult: common.RelayResult{
2193+
Request: &pairingtypes.RelayRequest{
2194+
RelaySession: &pairingtypes.RelaySession{},
2195+
RelayData: &pairingtypes.RelayPrivateData{},
2196+
},
2197+
Reply: &pairingtypes.RelayReply{Data: nodeErrorResponse},
2198+
ProviderInfo: common.ProviderInfo{ProviderAddress: "quicknode"},
2199+
StatusCode: 500, // Node error status code
2200+
},
2201+
Err: nil, // Must be nil to go through setValidResponse
2202+
}
2203+
relayProcessor.SetResponse(mockResponse)
2204+
2205+
// Wait for response
2206+
waitCtx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
2207+
defer cancel()
2208+
relayProcessor.WaitForResults(waitCtx)
2209+
2210+
// With RelayCountOnNodeError=2, should NOT have required results (retry needed)
2211+
hasResults, nodeErrorCount := relayProcessor.HasRequiredNodeResults(1)
2212+
2213+
require.False(t, hasResults, "With RelayCountOnNodeError=2, should retry to get a success")
2214+
require.Equal(t, 1, nodeErrorCount, "Should have 1 node error")
2215+
})
2216+
}

protocol/rpcconsumer/consumer_relay_state_machine.go

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -183,12 +183,38 @@ func (crsm *ConsumerRelayStateMachine) retryCondition(numberOfRetriesLaunched in
183183
return false
184184
}
185185

186-
if crsm.resultsChecker.GetQuorumParams().Enabled() && numberOfRetriesLaunched > crsm.resultsChecker.GetQuorumParams().Max {
187-
return false
188-
} else if numberOfRetriesLaunched >= MaximumNumberOfTickerRelayRetries {
186+
// Determine retry limit based on quorum settings:
187+
// - If quorum is enabled (user set lava-quorum-* headers): use quorumParams.Max
188+
// - If quorum is disabled (no headers): use RelayCountOnNodeError
189+
if crsm.resultsChecker.GetQuorumParams().Enabled() {
190+
// Quorum enabled: respect quorum.Max as the retry limit
191+
if numberOfRetriesLaunched > crsm.resultsChecker.GetQuorumParams().Max {
192+
utils.LavaFormatTrace("[StateMachine] retryCondition: quorum Max limit reached",
193+
utils.LogAttr("GUID", crsm.ctx),
194+
utils.LogAttr("numberOfRetriesLaunched", numberOfRetriesLaunched),
195+
utils.LogAttr("quorumMax", crsm.resultsChecker.GetQuorumParams().Max))
196+
return false
197+
}
198+
} else {
199+
// Quorum disabled: use RelayCountOnNodeError as the hard limit
200+
// numberOfRetriesLaunched is the batch number (1 = initial request done, considering retry)
201+
// If RelayCountOnNodeError=0, no retries allowed (batch 1 > 0, return false)
202+
// If RelayCountOnNodeError=2, allow 2 retries (batches 1,2 ok; batch 3 > 2, return false)
203+
if numberOfRetriesLaunched > relaycore.RelayCountOnNodeError {
204+
utils.LavaFormatTrace("[StateMachine] retryCondition: RelayCountOnNodeError limit reached",
205+
utils.LogAttr("GUID", crsm.ctx),
206+
utils.LogAttr("numberOfRetriesLaunched", numberOfRetriesLaunched),
207+
utils.LogAttr("RelayCountOnNodeError", relaycore.RelayCountOnNodeError))
208+
return false
209+
}
210+
}
211+
212+
// Absolute maximum safety limit
213+
if numberOfRetriesLaunched >= MaximumNumberOfTickerRelayRetries {
189214
return false
190215
}
191-
// best result sends to top 10 providers anyway.
216+
217+
// BestResult mode doesn't retry (sends to top providers at once)
192218
return crsm.selection != relaycore.BestResult
193219
}
194220

protocol/rpcsmartrouter/smartrouter_relay_state_machine.go

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -184,12 +184,38 @@ func (srsm *SmartRouterRelayStateMachine) retryCondition(numberOfRetriesLaunched
184184
return false
185185
}
186186

187-
if srsm.resultsChecker.GetQuorumParams().Enabled() && numberOfRetriesLaunched > srsm.resultsChecker.GetQuorumParams().Max {
188-
return false
189-
} else if numberOfRetriesLaunched >= MaximumNumberOfTickerRelayRetries {
187+
// Determine retry limit based on quorum settings:
188+
// - If quorum is enabled (user set lava-quorum-* headers): use quorumParams.Max
189+
// - If quorum is disabled (no headers): use RelayCountOnNodeError
190+
if srsm.resultsChecker.GetQuorumParams().Enabled() {
191+
// Quorum enabled: respect quorum.Max as the retry limit
192+
if numberOfRetriesLaunched > srsm.resultsChecker.GetQuorumParams().Max {
193+
utils.LavaFormatTrace("[StateMachine] retryCondition: quorum Max limit reached",
194+
utils.LogAttr("GUID", srsm.ctx),
195+
utils.LogAttr("numberOfRetriesLaunched", numberOfRetriesLaunched),
196+
utils.LogAttr("quorumMax", srsm.resultsChecker.GetQuorumParams().Max))
197+
return false
198+
}
199+
} else {
200+
// Quorum disabled: use RelayCountOnNodeError as the hard limit
201+
// numberOfRetriesLaunched is the batch number (1 = initial request done, considering retry)
202+
// If RelayCountOnNodeError=0, no retries allowed (batch 1 > 0, return false)
203+
// If RelayCountOnNodeError=2, allow 2 retries (batches 1,2 ok; batch 3 > 2, return false)
204+
if numberOfRetriesLaunched > relaycore.RelayCountOnNodeError {
205+
utils.LavaFormatTrace("[StateMachine] retryCondition: RelayCountOnNodeError limit reached",
206+
utils.LogAttr("GUID", srsm.ctx),
207+
utils.LogAttr("numberOfRetriesLaunched", numberOfRetriesLaunched),
208+
utils.LogAttr("RelayCountOnNodeError", relaycore.RelayCountOnNodeError))
209+
return false
210+
}
211+
}
212+
213+
// Absolute maximum safety limit
214+
if numberOfRetriesLaunched >= MaximumNumberOfTickerRelayRetries {
190215
return false
191216
}
192-
// best result sends to top 10 providers anyway.
217+
218+
// BestResult mode doesn't retry (sends to top providers at once)
193219
return srsm.selection != relaycore.BestResult
194220
}
195221

0 commit comments

Comments
 (0)