Skip to content

Commit a9c1289

Browse files
authored
Merge pull request #686 from onflow/petera/handle-resourceexhaused-ingestion
Petera/handle resourceexhaused ingestion
2 parents 2829e9e + c27ad32 commit a9c1289

File tree

2 files changed

+151
-4
lines changed

2 files changed

+151
-4
lines changed

bootstrap/bootstrap.go

Lines changed: 63 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"time"
99

1010
pebbleDB "github.com/cockroachdb/pebble"
11-
"github.com/onflow/flow-evm-gateway/metrics"
1211
"github.com/onflow/flow-go-sdk/access"
1312
"github.com/onflow/flow-go-sdk/access/grpc"
1413
"github.com/onflow/flow-go/fvm/environment"
@@ -21,9 +20,12 @@ import (
2120
"github.com/rs/zerolog"
2221
"github.com/sethvargo/go-limiter/memorystore"
2322
grpcOpts "google.golang.org/grpc"
23+
"google.golang.org/grpc/codes"
24+
"google.golang.org/grpc/status"
2425

2526
"github.com/onflow/flow-evm-gateway/api"
2627
"github.com/onflow/flow-evm-gateway/config"
28+
"github.com/onflow/flow-evm-gateway/metrics"
2729
"github.com/onflow/flow-evm-gateway/models"
2830
errs "github.com/onflow/flow-evm-gateway/models/errors"
2931
"github.com/onflow/flow-evm-gateway/services/ingestion"
@@ -33,6 +35,19 @@ import (
3335
"github.com/onflow/flow-evm-gateway/storage/pebble"
3436
)
3537

38+
const (
39+
// DefaultMaxMessageSize is the default maximum message size for gRPC responses
40+
DefaultMaxMessageSize = 1024 * 1024 * 1024
41+
42+
// DefaultResourceExhaustedRetryDelay is the default delay between retries when the server returns
43+
// a ResourceExhausted error.
44+
DefaultResourceExhaustedRetryDelay = 100 * time.Millisecond
45+
46+
// DefaultResourceExhaustedMaxRetryDelay is the default max request duration when retrying server
47+
// ResourceExhausted errors.
48+
DefaultResourceExhaustedMaxRetryDelay = 30 * time.Second
49+
)
50+
3651
type Storages struct {
3752
Storage *pebble.Storage
3853
Registers *pebble.RegisterStorage
@@ -452,7 +467,13 @@ func setupCrossSporkClient(config config.Config, logger zerolog.Logger) (*reques
452467
// create access client with cross-spork capabilities
453468
currentSporkClient, err := grpc.NewClient(
454469
config.AccessNodeHost,
455-
grpc.WithGRPCDialOptions(grpcOpts.WithDefaultCallOptions(grpcOpts.MaxCallRecvMsgSize(1024*1024*1024))),
470+
grpc.WithGRPCDialOptions(
471+
grpcOpts.WithDefaultCallOptions(grpcOpts.MaxCallRecvMsgSize(DefaultMaxMessageSize)),
472+
grpcOpts.WithUnaryInterceptor(retryInterceptor(
473+
DefaultResourceExhaustedMaxRetryDelay,
474+
DefaultResourceExhaustedRetryDelay,
475+
)),
476+
),
456477
)
457478
if err != nil {
458479
return nil, fmt.Errorf(
@@ -487,6 +508,44 @@ func setupCrossSporkClient(config config.Config, logger zerolog.Logger) (*reques
487508
return client, nil
488509
}
489510

511+
// retryInterceptor is a gRPC client interceptor that retries the request when the server returns
512+
// a ResourceExhausted error
513+
func retryInterceptor(maxDuration, pauseDuration time.Duration) grpcOpts.UnaryClientInterceptor {
514+
return func(
515+
ctx context.Context,
516+
method string,
517+
req, reply interface{},
518+
cc *grpcOpts.ClientConn,
519+
invoker grpcOpts.UnaryInvoker,
520+
opts ...grpcOpts.CallOption,
521+
) error {
522+
start := time.Now()
523+
attempts := 0
524+
for {
525+
err := invoker(ctx, method, req, reply, cc, opts...)
526+
if err == nil {
527+
return nil
528+
}
529+
530+
if status.Code(err) != codes.ResourceExhausted {
531+
return err
532+
}
533+
534+
attempts++
535+
duration := time.Since(start)
536+
if duration >= maxDuration {
537+
return fmt.Errorf("request failed (attempts: %d, duration: %v): %w", attempts, duration, err)
538+
}
539+
540+
select {
541+
case <-ctx.Done():
542+
return ctx.Err()
543+
case <-time.After(pauseDuration):
544+
}
545+
}
546+
}
547+
}
548+
490549
// setupStorage creates storage and initializes it with configured starting cadence height
491550
// in case such a height doesn't already exist in the database.
492551
func setupStorage(
@@ -570,9 +629,9 @@ func setupStorage(
570629
Stringer("fvm_address_for_evm_storage_account", storageAddress).
571630
Msgf("database initialized with cadence height: %d", cadenceHeight)
572631
}
573-
//else {
632+
// else {
574633
// // TODO(JanezP): verify storage account owner is correct
575-
//}
634+
// }
576635

577636
return db, &Storages{
578637
Storage: store,

bootstrap/bootstrap_test.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package bootstrap
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/stretchr/testify/assert"
9+
"google.golang.org/grpc"
10+
"google.golang.org/grpc/codes"
11+
"google.golang.org/grpc/status"
12+
)
13+
14+
func TestRetryInterceptor(t *testing.T) {
15+
expecterErr := status.Error(codes.ResourceExhausted, "resource exhausted")
16+
interceptor := retryInterceptor(100*time.Millisecond, 10*time.Millisecond)
17+
18+
testCases := []struct {
19+
name string
20+
invoker func(callCount int) error
21+
maxRequestTime time.Duration
22+
callCount int // expect exact count
23+
minCallCount int // min, for when using a timeout
24+
expectedErr error
25+
}{
26+
{
27+
name: "no error",
28+
invoker: func(callCount int) error {
29+
return nil
30+
},
31+
maxRequestTime: 10 * time.Millisecond,
32+
callCount: 1,
33+
expectedErr: nil,
34+
},
35+
{
36+
name: "succeeds on 3rd attempt",
37+
invoker: func(callCount int) error {
38+
if callCount >= 3 {
39+
return nil
40+
}
41+
return expecterErr
42+
},
43+
maxRequestTime: 40 * time.Millisecond,
44+
callCount: 3,
45+
expectedErr: nil,
46+
},
47+
{
48+
name: "fails after timeout",
49+
invoker: func(callCount int) error {
50+
return expecterErr
51+
},
52+
maxRequestTime: 150 * time.Millisecond, // add a buffer for test slowness
53+
minCallCount: 10,
54+
expectedErr: expecterErr,
55+
},
56+
}
57+
58+
for _, tc := range testCases {
59+
tc := tc
60+
t.Run(tc.name, func(t *testing.T) {
61+
t.Parallel()
62+
63+
callCount := 0
64+
invoker := func(context.Context, string, any, any, *grpc.ClientConn, ...grpc.CallOption) error {
65+
callCount++
66+
return tc.invoker(callCount)
67+
}
68+
69+
start := time.Now()
70+
err := interceptor(
71+
context.Background(), "", nil, nil, nil,
72+
invoker,
73+
)
74+
if tc.expectedErr != nil {
75+
assert.ErrorIs(t, err, tc.expectedErr)
76+
} else {
77+
assert.NoError(t, err)
78+
}
79+
80+
if tc.minCallCount > 0 {
81+
assert.GreaterOrEqual(t, callCount, tc.minCallCount)
82+
} else {
83+
assert.Equal(t, callCount, tc.callCount)
84+
}
85+
assert.LessOrEqual(t, time.Since(start), tc.maxRequestTime)
86+
})
87+
}
88+
}

0 commit comments

Comments
 (0)