Skip to content

Commit 70a8358

Browse files
committed
Switch to use an interceptor
1 parent 138e285 commit 70a8358

File tree

3 files changed

+150
-27
lines changed

3 files changed

+150
-27
lines changed

bootstrap/bootstrap.go

Lines changed: 63 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"time"
99

1010
pebbleDB "github.com/cockroachdb/pebble"
11-
"github.com/onflow/flow-evm-gateway/metrics"
1211
"github.com/onflow/flow-go-sdk/access"
1312
"github.com/onflow/flow-go-sdk/access/grpc"
1413
"github.com/onflow/flow-go/fvm/environment"
@@ -21,9 +20,12 @@ import (
2120
"github.com/rs/zerolog"
2221
"github.com/sethvargo/go-limiter/memorystore"
2322
grpcOpts "google.golang.org/grpc"
23+
"google.golang.org/grpc/codes"
24+
"google.golang.org/grpc/status"
2425

2526
"github.com/onflow/flow-evm-gateway/api"
2627
"github.com/onflow/flow-evm-gateway/config"
28+
"github.com/onflow/flow-evm-gateway/metrics"
2729
"github.com/onflow/flow-evm-gateway/models"
2830
errs "github.com/onflow/flow-evm-gateway/models/errors"
2931
"github.com/onflow/flow-evm-gateway/services/ingestion"
@@ -33,6 +35,19 @@ import (
3335
"github.com/onflow/flow-evm-gateway/storage/pebble"
3436
)
3537

38+
const (
39+
// DefaultMaxMessageSize is the default maximum message size for gRPC responses
40+
DefaultMaxMessageSize = 1024 * 1024 * 1024
41+
42+
// DefaultResourceExhaustedRetryDelay is the default delay between retries when the server returns
43+
// a ResourceExhausted error.
44+
DefaultResourceExhaustedRetryDelay = 100 * time.Millisecond
45+
46+
// DefaultResourceExhaustedMaxRetryDelay is the default max request duration when retrying server
47+
// ResourceExhausted errors.
48+
DefaultResourceExhaustedMaxRetryDelay = 30 * time.Second
49+
)
50+
3651
type Storages struct {
3752
Storage *pebble.Storage
3853
Registers *pebble.RegisterStorage
@@ -452,7 +467,13 @@ func setupCrossSporkClient(config config.Config, logger zerolog.Logger) (*reques
452467
// create access client with cross-spork capabilities
453468
currentSporkClient, err := grpc.NewClient(
454469
config.AccessNodeHost,
455-
grpc.WithGRPCDialOptions(grpcOpts.WithDefaultCallOptions(grpcOpts.MaxCallRecvMsgSize(1024*1024*1024))),
470+
grpc.WithGRPCDialOptions(
471+
grpcOpts.WithDefaultCallOptions(grpcOpts.MaxCallRecvMsgSize(DefaultMaxMessageSize)),
472+
grpcOpts.WithUnaryInterceptor(retryInterceptor(
473+
DefaultResourceExhaustedMaxRetryDelay,
474+
DefaultResourceExhaustedRetryDelay,
475+
)),
476+
),
456477
)
457478
if err != nil {
458479
return nil, fmt.Errorf(
@@ -487,6 +508,44 @@ func setupCrossSporkClient(config config.Config, logger zerolog.Logger) (*reques
487508
return client, nil
488509
}
489510

511+
// retryInterceptor is a gRPC client interceptor that retries the request when the server returns
512+
// a ResourceExhausted error
513+
func retryInterceptor(maxDuration, pauseDuration time.Duration) grpcOpts.UnaryClientInterceptor {
514+
return func(
515+
ctx context.Context,
516+
method string,
517+
req, reply interface{},
518+
cc *grpcOpts.ClientConn,
519+
invoker grpcOpts.UnaryInvoker,
520+
opts ...grpcOpts.CallOption,
521+
) error {
522+
start := time.Now()
523+
attempts := 0
524+
for {
525+
err := invoker(ctx, method, req, reply, cc, opts...)
526+
if err == nil {
527+
return nil
528+
}
529+
530+
if status.Code(err) != codes.ResourceExhausted {
531+
return err
532+
}
533+
534+
attempts++
535+
duration := time.Since(start)
536+
if duration >= maxDuration {
537+
return fmt.Errorf("request failed (attempts: %d, duration: %v): %w", attempts, duration, err)
538+
}
539+
540+
select {
541+
case <-ctx.Done():
542+
return ctx.Err()
543+
case <-time.After(pauseDuration):
544+
}
545+
}
546+
}
547+
}
548+
490549
// setupStorage creates storage and initializes it with configured starting cadence height
491550
// in case such a height doesn't already exist in the database.
492551
func setupStorage(
@@ -570,9 +629,9 @@ func setupStorage(
570629
Stringer("fvm_address_for_evm_storage_account", storageAddress).
571630
Msgf("database initialized with cadence height: %d", cadenceHeight)
572631
}
573-
//else {
632+
// else {
574633
// // TODO(JanezP): verify storage account owner is correct
575-
//}
634+
// }
576635

577636
return db, &Storages{
578637
Storage: store,

bootstrap/bootstrap_test.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package bootstrap
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/stretchr/testify/assert"
9+
"google.golang.org/grpc"
10+
"google.golang.org/grpc/codes"
11+
"google.golang.org/grpc/status"
12+
)
13+
14+
func TestRetryInterceptor(t *testing.T) {
15+
expecterErr := status.Error(codes.ResourceExhausted, "resource exhausted")
16+
interceptor := retryInterceptor(100*time.Millisecond, 10*time.Millisecond)
17+
18+
testCases := []struct {
19+
name string
20+
invoker func(callCount int) error
21+
maxRequestTime time.Duration
22+
callCount int // expect exact count
23+
minCallCount int // min, for when using a timeout
24+
expectedErr error
25+
}{
26+
{
27+
name: "no error",
28+
invoker: func(callCount int) error {
29+
return nil
30+
},
31+
maxRequestTime: 10 * time.Millisecond,
32+
callCount: 1,
33+
expectedErr: nil,
34+
},
35+
{
36+
name: "succeeds on 3rd attempt",
37+
invoker: func(callCount int) error {
38+
if callCount >= 3 {
39+
return nil
40+
}
41+
return expecterErr
42+
},
43+
maxRequestTime: 40 * time.Millisecond,
44+
callCount: 3,
45+
expectedErr: nil,
46+
},
47+
{
48+
name: "fails after timeout",
49+
invoker: func(callCount int) error {
50+
return expecterErr
51+
},
52+
maxRequestTime: 150 * time.Millisecond, // add a buffer for test slowness
53+
minCallCount: 10,
54+
expectedErr: expecterErr,
55+
},
56+
}
57+
58+
for _, tc := range testCases {
59+
t.Run(tc.name, func(t *testing.T) {
60+
t.Parallel()
61+
62+
callCount := 0
63+
invoker := func(context.Context, string, any, any, *grpc.ClientConn, ...grpc.CallOption) error {
64+
callCount++
65+
return tc.invoker(callCount)
66+
}
67+
68+
start := time.Now()
69+
err := interceptor(
70+
context.Background(), "", nil, nil, nil,
71+
invoker,
72+
)
73+
if tc.expectedErr != nil {
74+
assert.ErrorIs(t, err, tc.expectedErr)
75+
} else {
76+
assert.NoError(t, err)
77+
}
78+
79+
if tc.minCallCount > 0 {
80+
assert.GreaterOrEqual(t, callCount, tc.minCallCount)
81+
} else {
82+
assert.Equal(t, callCount, tc.callCount)
83+
}
84+
assert.LessOrEqual(t, time.Since(start), tc.maxRequestTime)
85+
})
86+
}
87+
}

services/ingestion/event_subscriber.go

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,9 @@ import (
55
"errors"
66
"fmt"
77
"sort"
8-
"time"
98

109
"github.com/onflow/cadence/common"
1110
"github.com/onflow/flow-go/fvm/evm/events"
12-
"google.golang.org/grpc/codes"
13-
"google.golang.org/grpc/status"
1411

1512
"github.com/onflow/flow-evm-gateway/models"
1613
errs "github.com/onflow/flow-evm-gateway/models/errors"
@@ -281,21 +278,11 @@ func (r *RPCEventSubscriber) backfillSporkFromHeight(ctx context.Context, fromCa
281278

282279
blocks, err := r.client.GetEventsForHeightRange(ctx, blockExecutedEvent, startHeight, endHeight)
283280
if err != nil {
284-
// if we are rate limited by the AN, wait a bit and try again
285-
if status.Code(err) == codes.ResourceExhausted {
286-
time.Sleep(100 * time.Millisecond)
287-
continue
288-
}
289281
return 0, fmt.Errorf("failed to get block events: %w", err)
290282
}
291283

292284
transactions, err := r.client.GetEventsForHeightRange(ctx, transactionExecutedEvent, startHeight, endHeight)
293285
if err != nil {
294-
// if we are rate limited by the AN, wait a bit and try again
295-
if status.Code(err) == codes.ResourceExhausted {
296-
time.Sleep(100 * time.Millisecond)
297-
continue
298-
}
299286
return 0, fmt.Errorf("failed to get block events: %w", err)
300287
}
301288

@@ -366,11 +353,6 @@ func (r *RPCEventSubscriber) accumulateBlockEvents(
366353
currentHeight+maxRangeForGetEvents,
367354
)
368355
if err != nil {
369-
// if we are rate limited by the AN, wait a bit and try again
370-
if status.Code(err) == codes.ResourceExhausted {
371-
time.Sleep(100 * time.Millisecond)
372-
continue
373-
}
374356
return models.BlockEvents{}, fmt.Errorf("failed to get block events: %w", err)
375357
}
376358

@@ -381,11 +363,6 @@ func (r *RPCEventSubscriber) accumulateBlockEvents(
381363
currentHeight+maxRangeForGetEvents,
382364
)
383365
if err != nil {
384-
// if we are rate limited by the AN, wait a bit and try again
385-
if status.Code(err) == codes.ResourceExhausted {
386-
time.Sleep(100 * time.Millisecond)
387-
continue
388-
}
389366
return models.BlockEvents{}, fmt.Errorf("failed to get block events: %w", err)
390367
}
391368

0 commit comments

Comments
 (0)