Skip to content

Commit 3b24cb7

Browse files
author
Shawn Reuland
committed
#5571: fixed rpc ledger backend to retry requests if request duration limit errors are returned by rpc
1 parent ef10cd0 commit 3b24cb7

File tree

2 files changed

+81
-0
lines changed

2 files changed

+81
-0
lines changed

ingest/ledgerbackend/rpc_backend.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,16 @@ import (
99
"sync/atomic"
1010
"time"
1111

12+
"github.com/creachadair/jrpc2"
1213
"github.com/stellar/go/xdr"
1314
rpc "github.com/stellar/stellar-rpc/client"
1415
"github.com/stellar/stellar-rpc/protocol"
1516
)
1617

1718
const rpcBackendDefaultBufferSize uint32 = 10
1819
const rpcBackendDefaultWaitIntervalSeconds uint32 = 2
20+
const rpcErrRequestExceededProcessingLimitThreshold = -32001
21+
const rpcErrFailToProcessDueToInternalIssue = -32003
1922

2023
type RPCLedgerMissingError struct {
2124
Sequence uint32
@@ -250,6 +253,16 @@ func (b *RPCLedgerBackend) getBufferedLedger(ctx context.Context, sequence uint3
250253

251254
ledgers, err := b.client.GetLedgers(ctx, req)
252255
if err != nil {
256+
var rpcErr jrpc2.Error
257+
if errors.As(err, &rpcErr) &&
258+
// rpc request duration limits may trigger these errors, it is acceptable to retry
259+
(rpcErr.Code == rpcErrRequestExceededProcessingLimitThreshold ||
260+
rpcErr.Code == rpcErrFailToProcessDueToInternalIssue) {
261+
return xdr.LedgerCloseMeta{}, &RPCLedgerBeyondLatestError{
262+
Sequence: sequence,
263+
LatestLedger: 0,
264+
}
265+
}
253266
return xdr.LedgerCloseMeta{}, fmt.Errorf("failed to get ledgers starting from %d: %w", sequence, err)
254267
}
255268

ingest/ledgerbackend/rpc_backend_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"testing"
77
"time"
88

9+
"github.com/creachadair/jrpc2"
910
"github.com/stretchr/testify/assert"
1011
"github.com/stretchr/testify/mock"
1112

@@ -210,6 +211,73 @@ func TestGetLedgerBeyondLatest(t *testing.T) {
210211

211212
}
212213

214+
func TestGetLedgerBeyondLatestBasedOnDurationLimitErr(t *testing.T) {
215+
rpcBackend, mockClient := setupRPCTest(t)
216+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
217+
defer cancel()
218+
requestedSequence := uint32(100)
219+
220+
rpcGetLedgersRequest := protocol.GetLedgersRequest{
221+
StartLedger: requestedSequence,
222+
Pagination: &protocol.LedgerPaginationOptions{
223+
Limit: uint(rpcBackendDefaultBufferSize),
224+
},
225+
}
226+
// Setup err responses indicating rpc request elapsed beyond request duration
227+
// triggering specific error codes related to request duration limits which rpc may emit
228+
exceededProcessingLimitThreshold := jrpc2.Error{
229+
Code: -32001,
230+
}
231+
232+
failToProcessDueToInternalIssue := jrpc2.Error{
233+
Code: -32003,
234+
}
235+
236+
// called by PrepareRange
237+
mockClient.On("GetLedgers", ctx, rpcGetLedgersRequest).Return(protocol.GetLedgersResponse{}, exceededProcessingLimitThreshold).Once()
238+
// called by GetLedger first time
239+
mockClient.On("GetLedgers", ctx, rpcGetLedgersRequest).Return(protocol.GetLedgersResponse{}, failToProcessDueToInternalIssue).Once()
240+
241+
// Setup second call to return the requested ledger
242+
lcm := xdr.LedgerCloseMeta{
243+
V: 0,
244+
V0: &xdr.LedgerCloseMetaV0{
245+
LedgerHeader: xdr.LedgerHeaderHistoryEntry{
246+
Header: xdr.LedgerHeader{
247+
LedgerSeq: xdr.Uint32(requestedSequence),
248+
},
249+
},
250+
},
251+
}
252+
encodedLCM, err := xdr.MarshalBase64(lcm)
253+
assert.NoError(t, err)
254+
255+
secondResponse := protocol.GetLedgersResponse{
256+
LatestLedger: requestedSequence,
257+
Ledgers: []protocol.LedgerInfo{
258+
{
259+
Sequence: requestedSequence,
260+
LedgerMetadata: encodedLCM,
261+
},
262+
},
263+
}
264+
// called by GetLedger second time
265+
mockClient.On("GetLedgers", ctx, rpcGetLedgersRequest).Return(secondResponse, nil).Once()
266+
267+
preparedRange := Range{from: requestedSequence, to: requestedSequence + 10, bounded: true}
268+
rpcBackend.PrepareRange(ctx, preparedRange)
269+
270+
startTime := time.Now()
271+
actualLCM, err := rpcBackend.GetLedger(ctx, requestedSequence)
272+
duration := time.Since(startTime)
273+
274+
assert.NoError(t, err)
275+
assert.Equal(t, requestedSequence, uint32(actualLCM.V0.LedgerHeader.Header.LedgerSeq))
276+
277+
// Verify timing - GetLedger should have waited one interval and then refetched ledgers from rpc on second call
278+
assert.GreaterOrEqual(t, duration.Seconds(), float64(rpcBackendDefaultWaitIntervalSeconds))
279+
}
280+
213281
func TestGetLedgerContextTimeout(t *testing.T) {
214282
rpcBackend, mockClient := setupRPCTest(t)
215283
sequence := uint32(100)

0 commit comments

Comments
 (0)