@@ -4,32 +4,26 @@ import (
44 "context"
55 "errors"
66 "fmt"
7- "sort"
87 "strings"
98
10- "github.com/onflow/cadence/common"
119 "github.com/onflow/flow-evm-gateway/models"
1210 errs "github.com/onflow/flow-evm-gateway/models/errors"
1311 "github.com/onflow/flow-evm-gateway/services/requester"
1412 "github.com/onflow/flow-go-sdk"
15- "github.com/onflow/flow-go/fvm/evm/events"
16- "github.com/onflow/flow-go/fvm/systemcontracts"
1713 flowGo "github.com/onflow/flow-go/model/flow"
1814 "github.com/rs/zerolog"
1915)
2016
2117var _ EventSubscriber = & RPCBlockTrackingSubscriber {}
2218
2319type RPCBlockTrackingSubscriber struct {
24- logger zerolog. Logger
20+ * RPCEventSubscriber
2521
22+ logger zerolog.Logger
2623 client * requester.CrossSporkClient
2724 chain flowGo.ChainID
2825 keyLock requester.KeyLock
2926 height uint64
30-
31- recovery bool
32- recoveredEvents []flow.Event
3327}
3428
3529func NewRPCBlockTrackingSubscriber (
@@ -39,14 +33,22 @@ func NewRPCBlockTrackingSubscriber(
3933 keyLock requester.KeyLock ,
4034 startHeight uint64 ,
4135) * RPCBlockTrackingSubscriber {
36+ eventSubscriber := NewRPCEventSubscriber (
37+ logger ,
38+ client ,
39+ chainID ,
40+ keyLock ,
41+ startHeight ,
42+ )
4243 logger = logger .With ().Str ("component" , "subscriber" ).Logger ()
43- return & RPCBlockTrackingSubscriber {
44- logger : logger ,
4544
46- client : client ,
47- chain : chainID ,
48- keyLock : keyLock ,
49- height : startHeight ,
45+ return & RPCBlockTrackingSubscriber {
46+ RPCEventSubscriber : eventSubscriber ,
47+ logger : logger ,
48+ client : client ,
49+ chain : chainID ,
50+ keyLock : keyLock ,
51+ height : startHeight ,
5052 }
5153}
5254
@@ -179,6 +181,9 @@ func (r *RPCBlockTrackingSubscriber) subscribe(ctx context.Context, height uint6
179181 return
180182 }
181183 blockEvent := evts [0 ]
184+ blockEvents .BlockID = blockEvent .BlockID
185+ blockEvents .BlockTimestamp = blockEvent .BlockTimestamp
186+ blockEvents .Height = blockEvent .Height
182187 blockEvents .Events = append (blockEvents .Events , blockEvent .Events ... )
183188 }
184189
@@ -238,306 +243,3 @@ func (r *RPCBlockTrackingSubscriber) subscribe(ctx context.Context, height uint6
238243
239244 return eventsChan
240245}
241-
242- // backfill returns a channel that is filled with block events from the provided fromCadenceHeight up to the first
243- // height in the current spork.
244- func (r * RPCBlockTrackingSubscriber ) backfill (ctx context.Context , fromCadenceHeight uint64 ) <- chan models.BlockEvents {
245- eventsChan := make (chan models.BlockEvents )
246-
247- go func () {
248- defer func () {
249- close (eventsChan )
250- }()
251-
252- for {
253- // check if the current fromCadenceHeight is still in past sporks, and if not return since we are done with backfilling
254- if ! r .client .IsPastSpork (fromCadenceHeight ) {
255- r .logger .Info ().
256- Uint64 ("height" , fromCadenceHeight ).
257- Msg ("completed backfilling" )
258-
259- return
260- }
261-
262- var err error
263- fromCadenceHeight , err = r .backfillSporkFromHeight (ctx , fromCadenceHeight , eventsChan )
264- if err != nil {
265- r .logger .Error ().Err (err ).Msg ("error backfilling spork" )
266- eventsChan <- models .NewBlockEventsError (err )
267- return
268- }
269-
270- r .logger .Info ().
271- Uint64 ("next-cadence-height" , fromCadenceHeight ).
272- Msg ("reached the end of spork, checking next spork" )
273- }
274- }()
275-
276- return eventsChan
277- }
278-
279- // / backfillSporkFromHeight will fill the eventsChan with block events from the provided fromHeight up to the first height in the spork that comes
280- // after the spork of the provided fromHeight.
281- func (r * RPCBlockTrackingSubscriber ) backfillSporkFromHeight (ctx context.Context , fromCadenceHeight uint64 , eventsChan chan <- models.BlockEvents ) (uint64 , error ) {
282- evmAddress := common .Address (systemcontracts .SystemContractsForChain (r .chain ).EVMContract .Address )
283-
284- blockExecutedEvent := common .NewAddressLocation (
285- nil ,
286- evmAddress ,
287- string (events .EventTypeBlockExecuted ),
288- ).ID ()
289-
290- transactionExecutedEvent := common .NewAddressLocation (
291- nil ,
292- evmAddress ,
293- string (events .EventTypeTransactionExecuted ),
294- ).ID ()
295-
296- lastHeight , err := r .client .GetLatestHeightForSpork (ctx , fromCadenceHeight )
297- if err != nil {
298- eventsChan <- models .NewBlockEventsError (err )
299- return 0 , err
300- }
301-
302- r .logger .Info ().
303- Uint64 ("start-height" , fromCadenceHeight ).
304- Uint64 ("last-spork-height" , lastHeight ).
305- Msg ("backfilling spork" )
306-
307- for fromCadenceHeight < lastHeight {
308- r .logger .Debug ().Msg (fmt .Sprintf ("backfilling [%d / %d] ..." , fromCadenceHeight , lastHeight ))
309-
310- startHeight := fromCadenceHeight
311- endHeight := fromCadenceHeight + maxRangeForGetEvents
312- if endHeight > lastHeight {
313- endHeight = lastHeight
314- }
315-
316- blocks , err := r .client .GetEventsForHeightRange (ctx , blockExecutedEvent , startHeight , endHeight )
317- if err != nil {
318- return 0 , fmt .Errorf ("failed to get block events: %w" , err )
319- }
320-
321- transactions , err := r .client .GetEventsForHeightRange (ctx , transactionExecutedEvent , startHeight , endHeight )
322- if err != nil {
323- return 0 , fmt .Errorf ("failed to get block events: %w" , err )
324- }
325-
326- if len (transactions ) != len (blocks ) {
327- return 0 , fmt .Errorf ("transactions and blocks have different length" )
328- }
329-
330- // sort both, just in case
331- sort .Slice (blocks , func (i , j int ) bool {
332- return blocks [i ].Height < blocks [j ].Height
333- })
334- sort .Slice (transactions , func (i , j int ) bool {
335- return transactions [i ].Height < transactions [j ].Height
336- })
337-
338- for i := range transactions {
339- if transactions [i ].Height != blocks [i ].Height {
340- return 0 , fmt .Errorf ("transactions and blocks have different height" )
341- }
342-
343- // append the transaction events to the block events
344- blocks [i ].Events = append (blocks [i ].Events , transactions [i ].Events ... )
345-
346- evmEvents := models .NewSingleBlockEvents (blocks [i ])
347- if evmEvents .Err != nil && errors .Is (evmEvents .Err , errs .ErrMissingBlock ) {
348- evmEvents , err = r .accumulateBlockEvents (
349- ctx ,
350- blocks [i ],
351- blockExecutedEvent ,
352- transactionExecutedEvent ,
353- )
354- if err != nil {
355- return 0 , err
356- }
357- eventsChan <- evmEvents
358- // advance the height
359- fromCadenceHeight = evmEvents .Events .CadenceHeight () + 1
360- break
361- }
362- eventsChan <- evmEvents
363-
364- // advance the height
365- fromCadenceHeight = evmEvents .Events .CadenceHeight () + 1
366- }
367-
368- }
369- return fromCadenceHeight , nil
370- }
371-
372- // accumulateBlockEvents will keep fetching `EVM.TransactionExecuted` events
373- // until it finds their `EVM.BlockExecuted` event.
374- // At that point it will return the valid models.BlockEvents.
375- func (r * RPCBlockTrackingSubscriber ) accumulateBlockEvents (
376- ctx context.Context ,
377- block flow.BlockEvents ,
378- blockExecutedEventType string ,
379- txExecutedEventType string ,
380- ) (models.BlockEvents , error ) {
381- evmEvents := models .NewSingleBlockEvents (block )
382- currentHeight := block .Height
383- transactionEvents := make ([]flow.Event , 0 )
384-
385- for evmEvents .Err != nil && errors .Is (evmEvents .Err , errs .ErrMissingBlock ) {
386- blocks , err := r .client .GetEventsForHeightRange (
387- ctx ,
388- blockExecutedEventType ,
389- currentHeight ,
390- currentHeight + maxRangeForGetEvents ,
391- )
392- if err != nil {
393- return models.BlockEvents {}, fmt .Errorf ("failed to get block events: %w" , err )
394- }
395-
396- transactions , err := r .client .GetEventsForHeightRange (
397- ctx ,
398- txExecutedEventType ,
399- currentHeight ,
400- currentHeight + maxRangeForGetEvents ,
401- )
402- if err != nil {
403- return models.BlockEvents {}, fmt .Errorf ("failed to get block events: %w" , err )
404- }
405-
406- if len (transactions ) != len (blocks ) {
407- return models.BlockEvents {}, fmt .Errorf ("transactions and blocks have different length" )
408- }
409-
410- // sort both, just in case
411- sort .Slice (blocks , func (i , j int ) bool {
412- return blocks [i ].Height < blocks [j ].Height
413- })
414- sort .Slice (transactions , func (i , j int ) bool {
415- return transactions [i ].Height < transactions [j ].Height
416- })
417-
418- for i := range blocks {
419- if transactions [i ].Height != blocks [i ].Height {
420- return models.BlockEvents {}, fmt .Errorf ("transactions and blocks have different height" )
421- }
422-
423- // If no EVM.BlockExecuted event found, keep accumulating the incoming
424- // EVM.TransactionExecuted events, until we find the EVM.BlockExecuted
425- // event that includes them.
426- if len (blocks [i ].Events ) == 0 {
427- txEvents := transactions [i ].Events
428- // Sort `EVM.TransactionExecuted` events
429- sort .Slice (txEvents , func (i , j int ) bool {
430- if txEvents [i ].TransactionIndex != txEvents [j ].TransactionIndex {
431- return txEvents [i ].TransactionIndex < txEvents [j ].TransactionIndex
432- }
433- return txEvents [i ].EventIndex < txEvents [j ].EventIndex
434- })
435- transactionEvents = append (transactionEvents , txEvents ... )
436- } else {
437- blocks [i ].Events = append (blocks [i ].Events , transactionEvents ... )
438- // We use `models.NewMultiBlockEvents`, as the `transactionEvents`
439- // are coming from different Flow blocks.
440- evmEvents = models .NewMultiBlockEvents (blocks [i ])
441- if evmEvents .Err == nil {
442- return evmEvents , nil
443- }
444- }
445-
446- currentHeight = blocks [i ].Height + 1
447- }
448- }
449- return evmEvents , nil
450- }
451-
452- // fetchMissingData is used as a backup mechanism for fetching EVM-related
453- // events, when the event streaming API returns an inconsistent response.
454- // An inconsistent response could be an EVM block that references EVM
455- // transactions which are not present in the response. It falls back
456- // to using grpc requests instead of streaming.
457- func (r * RPCBlockTrackingSubscriber ) fetchMissingData (
458- ctx context.Context ,
459- blockEvents flow.BlockEvents ,
460- ) models.BlockEvents {
461- // remove existing events
462- blockEvents .Events = nil
463-
464- for _ , eventType := range blocksFilter (r .chain ).EventTypes {
465- recoveredEvents , err := r .client .GetEventsForHeightRange (
466- ctx ,
467- eventType ,
468- blockEvents .Height ,
469- blockEvents .Height ,
470- )
471- if err != nil {
472- return models .NewBlockEventsError (err )
473- }
474-
475- if len (recoveredEvents ) != 1 {
476- return models .NewBlockEventsError (
477- fmt .Errorf (
478- "received %d but expected 1 event for height %d" ,
479- len (recoveredEvents ),
480- blockEvents .Height ,
481- ),
482- )
483- }
484-
485- blockEvents .Events = append (blockEvents .Events , recoveredEvents [0 ].Events ... )
486- }
487-
488- return models .NewSingleBlockEvents (blockEvents )
489- }
490-
491- // accumulateEventsMissingBlock will keep receiving transaction events until it can produce a valid
492- // EVM block event containing a block and transactions. At that point it will reset the recovery mode
493- // and return the valid block events.
494- func (r * RPCBlockTrackingSubscriber ) accumulateEventsMissingBlock (events flow.BlockEvents ) models.BlockEvents {
495- txEvents := events .Events
496- // Sort `EVM.TransactionExecuted` events
497- sort .Slice (txEvents , func (i , j int ) bool {
498- if txEvents [i ].TransactionIndex != txEvents [j ].TransactionIndex {
499- return txEvents [i ].TransactionIndex < txEvents [j ].TransactionIndex
500- }
501- return txEvents [i ].EventIndex < txEvents [j ].EventIndex
502- })
503- r .recoveredEvents = append (r .recoveredEvents , txEvents ... )
504- events .Events = r .recoveredEvents
505-
506- // We use `models.NewMultiBlockEvents`, as the `transactionEvents`
507- // are coming from different Flow blocks.
508- recovered := models .NewMultiBlockEvents (events )
509- r .recovery = recovered .Err != nil
510-
511- if ! r .recovery {
512- r .recoveredEvents = nil
513- }
514-
515- return recovered
516- }
517-
518- // recover tries to recover from an invalid data sent over the event stream.
519- //
520- // An invalid data can be a cause of corrupted index or network issue from the source,
521- // in which case we might miss one of the events (missing transaction), or it can be
522- // due to a failure from the system transaction which commits an EVM block, which results
523- // in missing EVM block event but present transactions.
524- func (r * RPCBlockTrackingSubscriber ) recover (
525- ctx context.Context ,
526- events flow.BlockEvents ,
527- err error ,
528- ) models.BlockEvents {
529- r .logger .Warn ().Err (err ).Msgf (
530- "failed to parse EVM block events for Flow height: %d, entering recovery" ,
531- events .Height ,
532- )
533-
534- if errors .Is (err , errs .ErrMissingBlock ) || r .recovery {
535- return r .accumulateEventsMissingBlock (events )
536- }
537-
538- if errors .Is (err , errs .ErrMissingTransactions ) {
539- return r .fetchMissingData (ctx , events )
540- }
541-
542- return models .NewBlockEventsError (err )
543- }
0 commit comments