Skip to content

Commit fe7eabf

Browse files
committed
Add reconnect logic to RPCEventSubscriber
1 parent b7aecfe commit fe7eabf

File tree

1 file changed

+52
-11
lines changed

1 file changed

+52
-11
lines changed

services/ingestion/event_subscriber.go

Lines changed: 52 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -5,9 +5,12 @@ import (
55
"errors"
66
"fmt"
77
"sort"
8+
"time"
89

910
"github.com/onflow/cadence/common"
1011
"github.com/onflow/flow-go/fvm/evm/events"
12+
"google.golang.org/grpc/codes"
13+
"google.golang.org/grpc/status"
1114

1215
"github.com/onflow/flow-evm-gateway/models"
1316
errs "github.com/onflow/flow-evm-gateway/models/errors"
@@ -128,17 +131,30 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha
128131
return eventsChan
129132
}
130133

131-
// we always use heartbeat interval of 1 to have the least amount of delay from the access node
132-
eventStream, errChan, err := r.client.SubscribeEventsByBlockHeight(
133-
ctx,
134-
height,
135-
blocksFilter(r.chain),
136-
access.WithHeartbeatInterval(1),
137-
)
138-
if err != nil {
134+
var blockEventsStream <-chan flow.BlockEvents
135+
var errChan <-chan error
136+
137+
lastReceivedHeight := height
138+
connect := func(height uint64) error {
139+
var err error
140+
141+
// we always use heartbeat interval of 1 to have the
142+
// least amount of delay from the access node
143+
blockEventsStream, errChan, err = r.client.SubscribeEventsByBlockHeight(
144+
ctx,
145+
height,
146+
blocksFilter(r.chain),
147+
access.WithHeartbeatInterval(1),
148+
)
149+
150+
return err
151+
}
152+
153+
if err := connect(lastReceivedHeight); err != nil {
139154
eventsChan <- models.NewBlockEventsError(
140155
fmt.Errorf("failed to subscribe to events by block height: %d, with: %w", height, err),
141156
)
157+
close(eventsChan)
142158
return eventsChan
143159
}
144160

@@ -153,8 +169,9 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha
153169
r.logger.Info().Msg("event ingestion received done signal")
154170
return
155171

156-
case blockEvents, ok := <-eventStream:
172+
case blockEvents, ok := <-blockEventsStream:
157173
if !ok {
174+
// typically we receive an error on errChan before the channels are closed, so a bare close is reported as a disconnect
158175
var err error
159176
err = errs.ErrDisconnected
160177
if ctx.Err() != nil {
@@ -183,10 +200,13 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha
183200
},
184201
)
185202

203+
lastReceivedHeight = blockEvents.Height
204+
186205
eventsChan <- evmEvents
187206

188207
case err, ok := <-errChan:
189208
if !ok {
209+
// typically we receive an error on errChan before the channels are closed, so a bare close is reported as a disconnect
190210
var err error
191211
err = errs.ErrDisconnected
192212
if ctx.Err() != nil {
@@ -196,8 +216,29 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha
196216
return
197217
}
198218

199-
eventsChan <- models.NewBlockEventsError(fmt.Errorf("%w: %w", errs.ErrDisconnected, err))
200-
return
219+
switch status.Code(err) {
220+
case codes.NotFound:
221+
// we can get codes.NotFound when reconnecting after a disconnect/restart before the
222+
// next block is finalized. just wait briefly and try again
223+
time.Sleep(200 * time.Millisecond)
224+
case codes.DeadlineExceeded, codes.Internal:
225+
// these are sometimes returned when the stream is disconnected by a middleware or the server; no wait is needed, so leave the switch and reconnect right away
226+
default:
227+
// skip reconnect on all other errors
228+
eventsChan <- models.NewBlockEventsError(fmt.Errorf("%w: %w", errs.ErrDisconnected, err))
229+
return
230+
}
231+
232+
if err := connect(lastReceivedHeight + 1); err != nil {
233+
eventsChan <- models.NewBlockEventsError(
234+
fmt.Errorf(
235+
"failed to resubscribe for events on height: %d, with: %w",
236+
lastReceivedHeight+1,
237+
err,
238+
),
239+
)
240+
return
241+
}
201242
}
202243
}
203244
}()

0 commit comments

Comments
 (0)