Skip to content

Commit 79a0de7

Browse files
committed
Handle resource exhaustion errors in the ingestion engine
1 parent 6d11b34 commit 79a0de7

File tree

1 file changed

+23
-0
lines changed

1 file changed

+23
-0
lines changed

services/ingestion/event_subscriber.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,12 @@ import (
55
"errors"
66
"fmt"
77
"sort"
8+
"time"
89

910
"github.com/onflow/cadence/common"
1011
"github.com/onflow/flow-go/fvm/evm/events"
12+
"google.golang.org/grpc/codes"
13+
"google.golang.org/grpc/status"
1114

1215
"github.com/onflow/flow-evm-gateway/models"
1316
errs "github.com/onflow/flow-evm-gateway/models/errors"
@@ -271,11 +274,21 @@ func (r *RPCEventSubscriber) backfillSporkFromHeight(ctx context.Context, fromCa
271274

272275
blocks, err := r.client.GetEventsForHeightRange(ctx, blockExecutedEvent, startHeight, endHeight)
273276
if err != nil {
277+
// if we are rate limited by the AN, wait a bit and try again
278+
if status.Code(err) == codes.ResourceExhausted {
279+
time.Sleep(100 * time.Millisecond)
280+
continue
281+
}
274282
return 0, fmt.Errorf("failed to get block events: %w", err)
275283
}
276284

277285
transactions, err := r.client.GetEventsForHeightRange(ctx, transactionExecutedEvent, startHeight, endHeight)
278286
if err != nil {
287+
// if we are rate limited by the AN, wait a bit and try again
288+
if status.Code(err) == codes.ResourceExhausted {
289+
time.Sleep(100 * time.Millisecond)
290+
continue
291+
}
279292
return 0, fmt.Errorf("failed to get block events: %w", err)
280293
}
281294

@@ -346,6 +359,11 @@ func (r *RPCEventSubscriber) accumulateBlockEvents(
346359
currentHeight+maxRangeForGetEvents,
347360
)
348361
if err != nil {
362+
// if we are rate limited by the AN, wait a bit and try again
363+
if status.Code(err) == codes.ResourceExhausted {
364+
time.Sleep(100 * time.Millisecond)
365+
continue
366+
}
349367
return models.BlockEvents{}, fmt.Errorf("failed to get block events: %w", err)
350368
}
351369

@@ -356,6 +374,11 @@ func (r *RPCEventSubscriber) accumulateBlockEvents(
356374
currentHeight+maxRangeForGetEvents,
357375
)
358376
if err != nil {
377+
// if we are rate limited by the AN, wait a bit and try again
378+
if status.Code(err) == codes.ResourceExhausted {
379+
time.Sleep(100 * time.Millisecond)
380+
continue
381+
}
359382
return models.BlockEvents{}, fmt.Errorf("failed to get block events: %w", err)
360383
}
361384

0 commit comments

Comments
 (0)