Skip to content

Commit 96eff65

Browse files
authored
Merge pull request #856 from onflow/mpeter/event-subscriber-add-reconnect-logic
Add reconnect logic to `RPCEventSubscriber`
2 parents b7aecfe + 29b5098 commit 96eff65

File tree

1 file changed

+56
-12
lines changed

1 file changed

+56
-12
lines changed

services/ingestion/event_subscriber.go

Lines changed: 56 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,12 @@ import (
55
"errors"
66
"fmt"
77
"sort"
8+
"time"
89

910
"github.com/onflow/cadence/common"
1011
"github.com/onflow/flow-go/fvm/evm/events"
12+
"google.golang.org/grpc/codes"
13+
"google.golang.org/grpc/status"
1114

1215
"github.com/onflow/flow-evm-gateway/models"
1316
errs "github.com/onflow/flow-evm-gateway/models/errors"
@@ -119,26 +122,42 @@ func (r *RPCEventSubscriber) Subscribe(ctx context.Context) <-chan models.BlockE
119122
// Subscribing to EVM specific events and handle any disconnection errors
120123
// as well as context cancellations.
121124
func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-chan models.BlockEvents {
122-
eventsChan := make(chan models.BlockEvents)
125+
// create the channel with a buffer size of 1,
126+
// to avoid blocking on the two error cases below
127+
eventsChan := make(chan models.BlockEvents, 1)
123128

124129
_, err := r.client.GetBlockHeaderByHeight(ctx, height)
125130
if err != nil {
126131
err = fmt.Errorf("failed to subscribe for events, the block height %d doesn't exist: %w", height, err)
127132
eventsChan <- models.NewBlockEventsError(err)
133+
close(eventsChan)
128134
return eventsChan
129135
}
130136

131-
// we always use heartbeat interval of 1 to have the least amount of delay from the access node
132-
eventStream, errChan, err := r.client.SubscribeEventsByBlockHeight(
133-
ctx,
134-
height,
135-
blocksFilter(r.chain),
136-
access.WithHeartbeatInterval(1),
137-
)
138-
if err != nil {
137+
var blockEventsStream <-chan flow.BlockEvents
138+
var errChan <-chan error
139+
140+
lastReceivedHeight := height
141+
connect := func(height uint64) error {
142+
var err error
143+
144+
// we always use heartbeat interval of 1 to have the
145+
// least amount of delay from the access node
146+
blockEventsStream, errChan, err = r.client.SubscribeEventsByBlockHeight(
147+
ctx,
148+
height,
149+
blocksFilter(r.chain),
150+
access.WithHeartbeatInterval(1),
151+
)
152+
153+
return err
154+
}
155+
156+
if err := connect(lastReceivedHeight); err != nil {
139157
eventsChan <- models.NewBlockEventsError(
140158
fmt.Errorf("failed to subscribe to events by block height: %d, with: %w", height, err),
141159
)
160+
close(eventsChan)
142161
return eventsChan
143162
}
144163

@@ -153,8 +172,9 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha
153172
r.logger.Info().Msg("event ingestion received done signal")
154173
return
155174

156-
case blockEvents, ok := <-eventStream:
175+
case blockEvents, ok := <-blockEventsStream:
157176
if !ok {
177+
// typically we receive an error in the errChan before the channels are closed
158178
var err error
159179
err = errs.ErrDisconnected
160180
if ctx.Err() != nil {
@@ -183,10 +203,13 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha
183203
},
184204
)
185205

206+
lastReceivedHeight = blockEvents.Height
207+
186208
eventsChan <- evmEvents
187209

188210
case err, ok := <-errChan:
189211
if !ok {
212+
// typically we receive an error in the errChan before the channels are closed
190213
var err error
191214
err = errs.ErrDisconnected
192215
if ctx.Err() != nil {
@@ -196,8 +219,29 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha
196219
return
197220
}
198221

199-
eventsChan <- models.NewBlockEventsError(fmt.Errorf("%w: %w", errs.ErrDisconnected, err))
200-
return
222+
switch status.Code(err) {
223+
case codes.NotFound:
224+
// we can get not found when reconnecting after a disconnect/restart before the
225+
// next block is finalized. just wait briefly and try again
226+
time.Sleep(200 * time.Millisecond)
227+
case codes.DeadlineExceeded, codes.Internal:
228+
// these are sometimes returned when the stream is disconnected by a middleware or the server
229+
default:
230+
// skip reconnect on all other errors
231+
eventsChan <- models.NewBlockEventsError(fmt.Errorf("%w: %w", errs.ErrDisconnected, err))
232+
return
233+
}
234+
235+
if err := connect(lastReceivedHeight + 1); err != nil {
236+
eventsChan <- models.NewBlockEventsError(
237+
fmt.Errorf(
238+
"failed to resubscribe for events on height: %d, with: %w",
239+
lastReceivedHeight+1,
240+
err,
241+
),
242+
)
243+
return
244+
}
201245
}
202246
}
203247
}()

0 commit comments

Comments
 (0)