Skip to content

Commit 3844817

Browse files
committed
feat(relay): enforce RelayCountOnNodeError and add separate protocol error retries
When quorum is disabled (no lava-quorum-* headers), RelayCountOnNodeError now acts as a hard limit on the total number of retry batches. Setting RelayCountOnNodeError=0 effectively disables all retries on node errors. When quorum is enabled, quorumParams.Max is used as the retry limit instead, allowing quorum functionality to work independently of RelayCountOnNodeError.

Additionally, this commit introduces a new `--set-retry-count-on-protocol-error` CLI flag that controls protocol error retries independently from node error retries.

Changes:
- Modified retryCondition() in both smart router and consumer state machines to respect RelayCountOnNodeError when quorum is disabled
- Updated HasRequiredNodeResults() to count node errors as "results" when RelayCountOnNodeError=0 to prevent quorum logic from triggering retries
- Added HasRequiredNodeResults() return value for protocol error count
- Updated retryCondition() to apply the appropriate limit based on error type:
  - Only node errors: use RelayCountOnNodeError
  - Only protocol errors: use RelayCountOnProtocolError
  - Both: use max of both limits
- Added RelayCountOnProtocolError variable and CLI flags
- Added comprehensive tests for retry limit behavior

This allows users to disable node error retries (--set-retry-count-on-node-error=0) while still allowing protocol error retries for transient connection issues.
1 parent 2ae0fd6 commit 3844817

File tree

13 files changed

+835
-210
lines changed

13 files changed

+835
-210
lines changed

protocol/chainlib/chainproxy/rpcInterfaceMessages/jsonRPCMessage.go

Lines changed: 12 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,6 @@ import (
1515

1616
var ErrFailedToConvertMessage = sdkerrors.New("RPC error", 1000, "failed to convert a message")
1717

18-
// BatchNodeErrorOnAny controls batch request error detection:
19-
// - false (default): batch is an error only if ALL sub-requests failed
20-
// - true: batch is an error if ANY sub-request failed (strict mode)
21-
var BatchNodeErrorOnAny = false
22-
2318
type JsonrpcMessage struct {
2419
Version string `json:"jsonrpc,omitempty"`
2520
ID json.RawMessage `json:"id,omitempty"`
@@ -233,48 +228,28 @@ func NewBatchMessage(msgs []JsonrpcMessage) (JsonrpcBatchMessage, error) {
233228
}
234229

235230
// returns whether the batch response contains an error, and the aggregated error messages of its elements
236-
// Behavior controlled by BatchNodeErrorOnAny flag:
237-
// - false (default): batch is an error only if ALL sub-requests failed
238-
// - true: batch is an error if ANY sub-request failed (strict mode)
239231
func CheckResponseErrorForJsonRpcBatch(data []byte, httpStatusCode int) (hasError bool, errorMessage string) {
240-
// Use a temporary struct that checks for both error and result presence
232+
// Use a temporary struct that omits the Result field to avoid allocating memory for large results
241233
var result []struct {
242-
Error *rpcclient.JsonError `json:"error,omitempty"`
243-
Result json.RawMessage `json:"result,omitempty"`
234+
Error *rpcclient.JsonError `json:"error,omitempty"`
244235
}
245236
err := json.Unmarshal(data, &result)
246237
if err != nil {
247238
utils.LavaFormatWarning("Failed unmarshalling CheckError", err, utils.LogAttr("data", string(data)))
248239
return false, ""
249240
}
250-
251-
hasAnySuccess := false
252-
hasAnyError := false
253-
aggregatedErrors := ""
254-
255-
for _, batchResult := range result {
256-
if batchResult.Error == nil && len(batchResult.Result) > 0 {
257-
hasAnySuccess = true
241+
aggregatedResults := ""
242+
numberOfBatchElements := len(result)
243+
for idx, batchResult := range result {
244+
if batchResult.Error == nil {
245+
continue
258246
}
259-
if batchResult.Error != nil && batchResult.Error.Message != "" {
260-
hasAnyError = true
261-
if aggregatedErrors != "" {
262-
aggregatedErrors += ",-," // add a unique comma separator between results
247+
if batchResult.Error.Message != "" {
248+
aggregatedResults += batchResult.Error.Message
249+
if idx < numberOfBatchElements-1 {
250+
aggregatedResults += ",-," // add a unique comma separator between results
263251
}
264-
aggregatedErrors += batchResult.Error.Message
265252
}
266253
}
267-
268-
// BatchNodeErrorOnAny=true (strict): error if ANY sub-request failed
269-
// BatchNodeErrorOnAny=false (default): error only if ALL sub-requests failed
270-
if BatchNodeErrorOnAny {
271-
// Strict mode: any error means batch failed
272-
return hasAnyError, aggregatedErrors
273-
}
274-
275-
// Default mode: only error if no successes at all
276-
if hasAnySuccess {
277-
return false, ""
278-
}
279-
return aggregatedErrors != "", aggregatedErrors
254+
return aggregatedResults != "", aggregatedResults
280255
}

protocol/chainlib/chainproxy/rpcInterfaceMessages/jsonRPCMessage_test.go

Lines changed: 0 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -151,135 +151,3 @@ func TestParseJsonRPCBatch(t *testing.T) {
151151
}
152152
}
153153
}
154-
155-
func TestCheckResponseErrorForJsonRpcBatch(t *testing.T) {
156-
t.Run("all_success_no_error", func(t *testing.T) {
157-
// All sub-requests succeeded
158-
data := []byte(`[
159-
{"jsonrpc":"2.0","id":1,"result":{"blockNumber":"0x123"}},
160-
{"jsonrpc":"2.0","id":2,"result":{"blockNumber":"0x124"}},
161-
{"jsonrpc":"2.0","id":3,"result":{"blockNumber":"0x125"}}
162-
]`)
163-
hasError, errorMsg := CheckResponseErrorForJsonRpcBatch(data, 200)
164-
require.False(t, hasError, "All success should not have error")
165-
require.Empty(t, errorMsg)
166-
})
167-
168-
t.Run("all_errors_should_return_error", func(t *testing.T) {
169-
// All sub-requests failed
170-
data := []byte(`[
171-
{"jsonrpc":"2.0","id":1,"error":{"code":-32000,"message":"Slot was skipped"}},
172-
{"jsonrpc":"2.0","id":2,"error":{"code":-32000,"message":"Slot was skipped"}},
173-
{"jsonrpc":"2.0","id":3,"error":{"code":-32000,"message":"Slot was skipped"}}
174-
]`)
175-
hasError, errorMsg := CheckResponseErrorForJsonRpcBatch(data, 200)
176-
require.True(t, hasError, "All errors should return error")
177-
require.Contains(t, errorMsg, "Slot was skipped")
178-
})
179-
180-
t.Run("partial_success_should_not_error", func(t *testing.T) {
181-
// Some sub-requests succeeded, some failed - should NOT be considered an error
182-
data := []byte(`[
183-
{"jsonrpc":"2.0","id":1,"result":{"blockNumber":"0x123"}},
184-
{"jsonrpc":"2.0","id":2,"error":{"code":-32000,"message":"Slot was skipped"}},
185-
{"jsonrpc":"2.0","id":3,"result":{"blockNumber":"0x125"}}
186-
]`)
187-
hasError, errorMsg := CheckResponseErrorForJsonRpcBatch(data, 200)
188-
require.False(t, hasError, "Partial success should not be considered an error")
189-
require.Empty(t, errorMsg)
190-
})
191-
192-
t.Run("single_success_among_errors_should_not_error", func(t *testing.T) {
193-
// Only one success among multiple errors - should NOT be considered an error
194-
data := []byte(`[
195-
{"jsonrpc":"2.0","id":1,"error":{"code":-32000,"message":"Slot was skipped"}},
196-
{"jsonrpc":"2.0","id":2,"error":{"code":-32000,"message":"Slot was skipped"}},
197-
{"jsonrpc":"2.0","id":3,"result":{"blockNumber":"0x125"}}
198-
]`)
199-
hasError, errorMsg := CheckResponseErrorForJsonRpcBatch(data, 200)
200-
require.False(t, hasError, "Single success should prevent error classification")
201-
require.Empty(t, errorMsg)
202-
})
203-
204-
t.Run("empty_batch_no_error", func(t *testing.T) {
205-
data := []byte(`[]`)
206-
hasError, errorMsg := CheckResponseErrorForJsonRpcBatch(data, 200)
207-
require.False(t, hasError)
208-
require.Empty(t, errorMsg)
209-
})
210-
211-
t.Run("invalid_json_no_error", func(t *testing.T) {
212-
data := []byte(`not valid json`)
213-
hasError, errorMsg := CheckResponseErrorForJsonRpcBatch(data, 200)
214-
require.False(t, hasError, "Invalid JSON should not return error (fail-open)")
215-
require.Empty(t, errorMsg)
216-
})
217-
218-
t.Run("null_result_with_no_error_is_success", func(t *testing.T) {
219-
// null result without error is still a valid response (API returned successfully)
220-
data := []byte(`[
221-
{"jsonrpc":"2.0","id":1,"result":null},
222-
{"jsonrpc":"2.0","id":2,"error":{"code":-32000,"message":"Some error"}}
223-
]`)
224-
// null result is a valid JSON-RPC response - it means the method executed successfully
225-
// and returned null. This counts as a success.
226-
hasError, _ := CheckResponseErrorForJsonRpcBatch(data, 200)
227-
require.False(t, hasError, "null result is a valid success response")
228-
})
229-
230-
t.Run("real_world_solana_batch_partial_error", func(t *testing.T) {
231-
// Real-world scenario: Solana batch request where some slots are skipped
232-
data := []byte(`[
233-
{"jsonrpc":"2.0","id":1,"result":{"slot":123,"blockhash":"abc"}},
234-
{"jsonrpc":"2.0","id":2,"error":{"code":-32007,"message":"Slot 124 was skipped, or missing in long-term storage"}},
235-
{"jsonrpc":"2.0","id":3,"result":{"slot":125,"blockhash":"def"}}
236-
]`)
237-
hasError, errorMsg := CheckResponseErrorForJsonRpcBatch(data, 200)
238-
require.False(t, hasError, "Solana batch with partial skipped slots should not trigger retry")
239-
require.Empty(t, errorMsg)
240-
})
241-
}
242-
243-
func TestCheckResponseErrorForJsonRpcBatch_StrictMode(t *testing.T) {
244-
// Save and restore original flag value
245-
originalValue := BatchNodeErrorOnAny
246-
defer func() { BatchNodeErrorOnAny = originalValue }()
247-
248-
t.Run("strict_mode_partial_success_is_error", func(t *testing.T) {
249-
BatchNodeErrorOnAny = true // Enable strict mode
250-
251-
data := []byte(`[
252-
{"jsonrpc":"2.0","id":1,"result":{"blockNumber":"0x123"}},
253-
{"jsonrpc":"2.0","id":2,"error":{"code":-32000,"message":"Slot was skipped"}},
254-
{"jsonrpc":"2.0","id":3,"result":{"blockNumber":"0x125"}}
255-
]`)
256-
hasError, errorMsg := CheckResponseErrorForJsonRpcBatch(data, 200)
257-
require.True(t, hasError, "Strict mode: partial success should be an error")
258-
require.Contains(t, errorMsg, "Slot was skipped")
259-
})
260-
261-
t.Run("strict_mode_all_success_no_error", func(t *testing.T) {
262-
BatchNodeErrorOnAny = true // Enable strict mode
263-
264-
data := []byte(`[
265-
{"jsonrpc":"2.0","id":1,"result":{"blockNumber":"0x123"}},
266-
{"jsonrpc":"2.0","id":2,"result":{"blockNumber":"0x124"}},
267-
{"jsonrpc":"2.0","id":3,"result":{"blockNumber":"0x125"}}
268-
]`)
269-
hasError, errorMsg := CheckResponseErrorForJsonRpcBatch(data, 200)
270-
require.False(t, hasError, "Strict mode: all success should not have error")
271-
require.Empty(t, errorMsg)
272-
})
273-
274-
t.Run("default_mode_partial_success_no_error", func(t *testing.T) {
275-
BatchNodeErrorOnAny = false // Default mode
276-
277-
data := []byte(`[
278-
{"jsonrpc":"2.0","id":1,"result":{"blockNumber":"0x123"}},
279-
{"jsonrpc":"2.0","id":2,"error":{"code":-32000,"message":"Slot was skipped"}},
280-
{"jsonrpc":"2.0","id":3,"result":{"blockNumber":"0x125"}}
281-
]`)
282-
hasError, _ := CheckResponseErrorForJsonRpcBatch(data, 200)
283-
require.False(t, hasError, "Default mode: partial success should not be an error")
284-
})
285-
}

protocol/common/cobra_common.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ const (
3030
SharedStateFlag = "shared-state"
3131
// Disable relay retries when we get node errors.
3232
// This feature is supposed to help with successful relays in some chains that return node errors on rare race conditions on the serviced chains.
33-
SetRelayCountOnNodeErrorFlag = "set-retry-count-on-node-error"
33+
SetRelayCountOnNodeErrorFlag = "set-retry-count-on-node-error"
34+
SetRelayCountOnProtocolErrorFlag = "set-retry-count-on-protocol-error"
3435
UseStaticSpecFlag = "use-static-spec" // allows the user to manually load a spec providing a path, this is useful to test spec changes before they hit the blockchain
3536
GitHubTokenFlag = "github-token" // GitHub personal access token for accessing private repositories and higher API rate limits
3637
EpochDurationFlag = "epoch-duration" // duration of each epoch for time-based epoch system (standalone mode)

protocol/relaycore/interfaces.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ type RelayStateMachine interface {
2525
// ResultsCheckerInf interface for checking results
2626
type ResultsCheckerInf interface {
2727
WaitForResults(ctx context.Context) error
28-
HasRequiredNodeResults(tries int) (bool, int)
28+
HasRequiredNodeResults(tries int) (bool, int, int) // returns (done, nodeErrors, protocolErrors)
2929
GetQuorumParams() common.QuorumParams
3030
}
3131

protocol/relaycore/relay_processor.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -251,9 +251,9 @@ func (rp *RelayProcessor) shouldRetryRelay(resultsCount int, hashErr error, node
251251
return false
252252
}
253253

254-
func (rp *RelayProcessor) HasRequiredNodeResults(tries int) (bool, int) {
254+
func (rp *RelayProcessor) HasRequiredNodeResults(tries int) (bool, int, int) {
255255
if rp == nil {
256-
return false, 0
256+
return false, 0, 0
257257
}
258258
rp.lock.RLock()
259259
defer rp.lock.RUnlock()
@@ -293,7 +293,7 @@ func (rp *RelayProcessor) HasRequiredNodeResults(tries int) (bool, int) {
293293
utils.LogAttr("currentQourumEqualResults", rp.currentQourumEqualResults),
294294
)
295295
}
296-
return true, nodeErrors
296+
return true, nodeErrors, protocolErrors
297297
}
298298
if rp.selection == Quorum {
299299
// We need a quorum of all node results
@@ -319,7 +319,14 @@ func (rp *RelayProcessor) HasRequiredNodeResults(tries int) (bool, int) {
319319
}
320320
} else {
321321
// Quorum feature disabled: check if we have enough results for quorum
322-
retryForQuorumNeeded = !(resultsCount >= neededForQuorum)
322+
// When RelayCountOnNodeError is 0, treat node errors as "results" for quorum purposes
323+
// This ensures that when retries are disabled, we don't retry to replace node errors
324+
if RelayCountOnNodeError == 0 {
325+
totalNodeResults := resultsCount + nodeErrors
326+
retryForQuorumNeeded = !(totalNodeResults >= neededForQuorum)
327+
} else {
328+
retryForQuorumNeeded = !(resultsCount >= neededForQuorum)
329+
}
323330
}
324331

325332
if !retryForQuorumNeeded {
@@ -338,7 +345,7 @@ func (rp *RelayProcessor) HasRequiredNodeResults(tries int) (bool, int) {
338345
utils.LogAttr("currentQourumEqualResults", rp.currentQourumEqualResults),
339346
)
340347
}
341-
return !shouldRetry, nodeErrors
348+
return !shouldRetry, nodeErrors, protocolErrors
342349
}
343350
}
344351
// on BestResult we want to retry if there is no success
@@ -354,7 +361,7 @@ func (rp *RelayProcessor) HasRequiredNodeResults(tries int) (bool, int) {
354361
utils.LogAttr("currentQourumEqualResults", rp.currentQourumEqualResults),
355362
)
356363
}
357-
return false, nodeErrors
364+
return false, nodeErrors, protocolErrors
358365
}
359366

360367
func (rp *RelayProcessor) handleResponse(response *RelayResponse) {

0 commit comments

Comments
 (0)