Skip to content

Commit d159467

Browse files
committed
Simplify RPCBlockTrackingSubscriber by embedding RPCEventSubscriber for function resuse
1 parent ce83ebb commit d159467

File tree

1 file changed

+19
-317
lines changed

1 file changed

+19
-317
lines changed

services/ingestion/block_tracking_subscriber.go

Lines changed: 19 additions & 317 deletions
Original file line numberDiff line numberDiff line change
@@ -4,32 +4,26 @@ import (
44
"context"
55
"errors"
66
"fmt"
7-
"sort"
87
"strings"
98

10-
"github.com/onflow/cadence/common"
119
"github.com/onflow/flow-evm-gateway/models"
1210
errs "github.com/onflow/flow-evm-gateway/models/errors"
1311
"github.com/onflow/flow-evm-gateway/services/requester"
1412
"github.com/onflow/flow-go-sdk"
15-
"github.com/onflow/flow-go/fvm/evm/events"
16-
"github.com/onflow/flow-go/fvm/systemcontracts"
1713
flowGo "github.com/onflow/flow-go/model/flow"
1814
"github.com/rs/zerolog"
1915
)
2016

2117
var _ EventSubscriber = &RPCBlockTrackingSubscriber{}
2218

2319
type RPCBlockTrackingSubscriber struct {
24-
logger zerolog.Logger
20+
*RPCEventSubscriber
2521

22+
logger zerolog.Logger
2623
client *requester.CrossSporkClient
2724
chain flowGo.ChainID
2825
keyLock requester.KeyLock
2926
height uint64
30-
31-
recovery bool
32-
recoveredEvents []flow.Event
3327
}
3428

3529
func NewRPCBlockTrackingSubscriber(
@@ -39,14 +33,22 @@ func NewRPCBlockTrackingSubscriber(
3933
keyLock requester.KeyLock,
4034
startHeight uint64,
4135
) *RPCBlockTrackingSubscriber {
36+
eventSubscriber := NewRPCEventSubscriber(
37+
logger,
38+
client,
39+
chainID,
40+
keyLock,
41+
startHeight,
42+
)
4243
logger = logger.With().Str("component", "subscriber").Logger()
43-
return &RPCBlockTrackingSubscriber{
44-
logger: logger,
4544

46-
client: client,
47-
chain: chainID,
48-
keyLock: keyLock,
49-
height: startHeight,
45+
return &RPCBlockTrackingSubscriber{
46+
RPCEventSubscriber: eventSubscriber,
47+
logger: logger,
48+
client: client,
49+
chain: chainID,
50+
keyLock: keyLock,
51+
height: startHeight,
5052
}
5153
}
5254

@@ -179,6 +181,9 @@ func (r *RPCBlockTrackingSubscriber) subscribe(ctx context.Context, height uint6
179181
return
180182
}
181183
blockEvent := evts[0]
184+
blockEvents.BlockID = blockEvent.BlockID
185+
blockEvents.BlockTimestamp = blockEvent.BlockTimestamp
186+
blockEvents.Height = blockEvent.Height
182187
blockEvents.Events = append(blockEvents.Events, blockEvent.Events...)
183188
}
184189

@@ -238,306 +243,3 @@ func (r *RPCBlockTrackingSubscriber) subscribe(ctx context.Context, height uint6
238243

239244
return eventsChan
240245
}
241-
242-
// backfill returns a channel that is filled with block events from the provided fromCadenceHeight up to the first
243-
// height in the current spork.
244-
func (r *RPCBlockTrackingSubscriber) backfill(ctx context.Context, fromCadenceHeight uint64) <-chan models.BlockEvents {
245-
eventsChan := make(chan models.BlockEvents)
246-
247-
go func() {
248-
defer func() {
249-
close(eventsChan)
250-
}()
251-
252-
for {
253-
// check if the current fromCadenceHeight is still in past sporks, and if not return since we are done with backfilling
254-
if !r.client.IsPastSpork(fromCadenceHeight) {
255-
r.logger.Info().
256-
Uint64("height", fromCadenceHeight).
257-
Msg("completed backfilling")
258-
259-
return
260-
}
261-
262-
var err error
263-
fromCadenceHeight, err = r.backfillSporkFromHeight(ctx, fromCadenceHeight, eventsChan)
264-
if err != nil {
265-
r.logger.Error().Err(err).Msg("error backfilling spork")
266-
eventsChan <- models.NewBlockEventsError(err)
267-
return
268-
}
269-
270-
r.logger.Info().
271-
Uint64("next-cadence-height", fromCadenceHeight).
272-
Msg("reached the end of spork, checking next spork")
273-
}
274-
}()
275-
276-
return eventsChan
277-
}
278-
279-
// / backfillSporkFromHeight will fill the eventsChan with block events from the provided fromHeight up to the first height in the spork that comes
280-
// after the spork of the provided fromHeight.
281-
func (r *RPCBlockTrackingSubscriber) backfillSporkFromHeight(ctx context.Context, fromCadenceHeight uint64, eventsChan chan<- models.BlockEvents) (uint64, error) {
282-
evmAddress := common.Address(systemcontracts.SystemContractsForChain(r.chain).EVMContract.Address)
283-
284-
blockExecutedEvent := common.NewAddressLocation(
285-
nil,
286-
evmAddress,
287-
string(events.EventTypeBlockExecuted),
288-
).ID()
289-
290-
transactionExecutedEvent := common.NewAddressLocation(
291-
nil,
292-
evmAddress,
293-
string(events.EventTypeTransactionExecuted),
294-
).ID()
295-
296-
lastHeight, err := r.client.GetLatestHeightForSpork(ctx, fromCadenceHeight)
297-
if err != nil {
298-
eventsChan <- models.NewBlockEventsError(err)
299-
return 0, err
300-
}
301-
302-
r.logger.Info().
303-
Uint64("start-height", fromCadenceHeight).
304-
Uint64("last-spork-height", lastHeight).
305-
Msg("backfilling spork")
306-
307-
for fromCadenceHeight < lastHeight {
308-
r.logger.Debug().Msg(fmt.Sprintf("backfilling [%d / %d] ...", fromCadenceHeight, lastHeight))
309-
310-
startHeight := fromCadenceHeight
311-
endHeight := fromCadenceHeight + maxRangeForGetEvents
312-
if endHeight > lastHeight {
313-
endHeight = lastHeight
314-
}
315-
316-
blocks, err := r.client.GetEventsForHeightRange(ctx, blockExecutedEvent, startHeight, endHeight)
317-
if err != nil {
318-
return 0, fmt.Errorf("failed to get block events: %w", err)
319-
}
320-
321-
transactions, err := r.client.GetEventsForHeightRange(ctx, transactionExecutedEvent, startHeight, endHeight)
322-
if err != nil {
323-
return 0, fmt.Errorf("failed to get block events: %w", err)
324-
}
325-
326-
if len(transactions) != len(blocks) {
327-
return 0, fmt.Errorf("transactions and blocks have different length")
328-
}
329-
330-
// sort both, just in case
331-
sort.Slice(blocks, func(i, j int) bool {
332-
return blocks[i].Height < blocks[j].Height
333-
})
334-
sort.Slice(transactions, func(i, j int) bool {
335-
return transactions[i].Height < transactions[j].Height
336-
})
337-
338-
for i := range transactions {
339-
if transactions[i].Height != blocks[i].Height {
340-
return 0, fmt.Errorf("transactions and blocks have different height")
341-
}
342-
343-
// append the transaction events to the block events
344-
blocks[i].Events = append(blocks[i].Events, transactions[i].Events...)
345-
346-
evmEvents := models.NewSingleBlockEvents(blocks[i])
347-
if evmEvents.Err != nil && errors.Is(evmEvents.Err, errs.ErrMissingBlock) {
348-
evmEvents, err = r.accumulateBlockEvents(
349-
ctx,
350-
blocks[i],
351-
blockExecutedEvent,
352-
transactionExecutedEvent,
353-
)
354-
if err != nil {
355-
return 0, err
356-
}
357-
eventsChan <- evmEvents
358-
// advance the height
359-
fromCadenceHeight = evmEvents.Events.CadenceHeight() + 1
360-
break
361-
}
362-
eventsChan <- evmEvents
363-
364-
// advance the height
365-
fromCadenceHeight = evmEvents.Events.CadenceHeight() + 1
366-
}
367-
368-
}
369-
return fromCadenceHeight, nil
370-
}
371-
372-
// accumulateBlockEvents will keep fetching `EVM.TransactionExecuted` events
373-
// until it finds their `EVM.BlockExecuted` event.
374-
// At that point it will return the valid models.BlockEvents.
375-
func (r *RPCBlockTrackingSubscriber) accumulateBlockEvents(
376-
ctx context.Context,
377-
block flow.BlockEvents,
378-
blockExecutedEventType string,
379-
txExecutedEventType string,
380-
) (models.BlockEvents, error) {
381-
evmEvents := models.NewSingleBlockEvents(block)
382-
currentHeight := block.Height
383-
transactionEvents := make([]flow.Event, 0)
384-
385-
for evmEvents.Err != nil && errors.Is(evmEvents.Err, errs.ErrMissingBlock) {
386-
blocks, err := r.client.GetEventsForHeightRange(
387-
ctx,
388-
blockExecutedEventType,
389-
currentHeight,
390-
currentHeight+maxRangeForGetEvents,
391-
)
392-
if err != nil {
393-
return models.BlockEvents{}, fmt.Errorf("failed to get block events: %w", err)
394-
}
395-
396-
transactions, err := r.client.GetEventsForHeightRange(
397-
ctx,
398-
txExecutedEventType,
399-
currentHeight,
400-
currentHeight+maxRangeForGetEvents,
401-
)
402-
if err != nil {
403-
return models.BlockEvents{}, fmt.Errorf("failed to get block events: %w", err)
404-
}
405-
406-
if len(transactions) != len(blocks) {
407-
return models.BlockEvents{}, fmt.Errorf("transactions and blocks have different length")
408-
}
409-
410-
// sort both, just in case
411-
sort.Slice(blocks, func(i, j int) bool {
412-
return blocks[i].Height < blocks[j].Height
413-
})
414-
sort.Slice(transactions, func(i, j int) bool {
415-
return transactions[i].Height < transactions[j].Height
416-
})
417-
418-
for i := range blocks {
419-
if transactions[i].Height != blocks[i].Height {
420-
return models.BlockEvents{}, fmt.Errorf("transactions and blocks have different height")
421-
}
422-
423-
// If no EVM.BlockExecuted event found, keep accumulating the incoming
424-
// EVM.TransactionExecuted events, until we find the EVM.BlockExecuted
425-
// event that includes them.
426-
if len(blocks[i].Events) == 0 {
427-
txEvents := transactions[i].Events
428-
// Sort `EVM.TransactionExecuted` events
429-
sort.Slice(txEvents, func(i, j int) bool {
430-
if txEvents[i].TransactionIndex != txEvents[j].TransactionIndex {
431-
return txEvents[i].TransactionIndex < txEvents[j].TransactionIndex
432-
}
433-
return txEvents[i].EventIndex < txEvents[j].EventIndex
434-
})
435-
transactionEvents = append(transactionEvents, txEvents...)
436-
} else {
437-
blocks[i].Events = append(blocks[i].Events, transactionEvents...)
438-
// We use `models.NewMultiBlockEvents`, as the `transactionEvents`
439-
// are coming from different Flow blocks.
440-
evmEvents = models.NewMultiBlockEvents(blocks[i])
441-
if evmEvents.Err == nil {
442-
return evmEvents, nil
443-
}
444-
}
445-
446-
currentHeight = blocks[i].Height + 1
447-
}
448-
}
449-
return evmEvents, nil
450-
}
451-
452-
// fetchMissingData is used as a backup mechanism for fetching EVM-related
453-
// events, when the event streaming API returns an inconsistent response.
454-
// An inconsistent response could be an EVM block that references EVM
455-
// transactions which are not present in the response. It falls back
456-
// to using grpc requests instead of streaming.
457-
func (r *RPCBlockTrackingSubscriber) fetchMissingData(
458-
ctx context.Context,
459-
blockEvents flow.BlockEvents,
460-
) models.BlockEvents {
461-
// remove existing events
462-
blockEvents.Events = nil
463-
464-
for _, eventType := range blocksFilter(r.chain).EventTypes {
465-
recoveredEvents, err := r.client.GetEventsForHeightRange(
466-
ctx,
467-
eventType,
468-
blockEvents.Height,
469-
blockEvents.Height,
470-
)
471-
if err != nil {
472-
return models.NewBlockEventsError(err)
473-
}
474-
475-
if len(recoveredEvents) != 1 {
476-
return models.NewBlockEventsError(
477-
fmt.Errorf(
478-
"received %d but expected 1 event for height %d",
479-
len(recoveredEvents),
480-
blockEvents.Height,
481-
),
482-
)
483-
}
484-
485-
blockEvents.Events = append(blockEvents.Events, recoveredEvents[0].Events...)
486-
}
487-
488-
return models.NewSingleBlockEvents(blockEvents)
489-
}
490-
491-
// accumulateEventsMissingBlock will keep receiving transaction events until it can produce a valid
492-
// EVM block event containing a block and transactions. At that point it will reset the recovery mode
493-
// and return the valid block events.
494-
func (r *RPCBlockTrackingSubscriber) accumulateEventsMissingBlock(events flow.BlockEvents) models.BlockEvents {
495-
txEvents := events.Events
496-
// Sort `EVM.TransactionExecuted` events
497-
sort.Slice(txEvents, func(i, j int) bool {
498-
if txEvents[i].TransactionIndex != txEvents[j].TransactionIndex {
499-
return txEvents[i].TransactionIndex < txEvents[j].TransactionIndex
500-
}
501-
return txEvents[i].EventIndex < txEvents[j].EventIndex
502-
})
503-
r.recoveredEvents = append(r.recoveredEvents, txEvents...)
504-
events.Events = r.recoveredEvents
505-
506-
// We use `models.NewMultiBlockEvents`, as the `transactionEvents`
507-
// are coming from different Flow blocks.
508-
recovered := models.NewMultiBlockEvents(events)
509-
r.recovery = recovered.Err != nil
510-
511-
if !r.recovery {
512-
r.recoveredEvents = nil
513-
}
514-
515-
return recovered
516-
}
517-
518-
// recover tries to recover from an invalid data sent over the event stream.
519-
//
520-
// An invalid data can be a cause of corrupted index or network issue from the source,
521-
// in which case we might miss one of the events (missing transaction), or it can be
522-
// due to a failure from the system transaction which commits an EVM block, which results
523-
// in missing EVM block event but present transactions.
524-
func (r *RPCBlockTrackingSubscriber) recover(
525-
ctx context.Context,
526-
events flow.BlockEvents,
527-
err error,
528-
) models.BlockEvents {
529-
r.logger.Warn().Err(err).Msgf(
530-
"failed to parse EVM block events for Flow height: %d, entering recovery",
531-
events.Height,
532-
)
533-
534-
if errors.Is(err, errs.ErrMissingBlock) || r.recovery {
535-
return r.accumulateEventsMissingBlock(events)
536-
}
537-
538-
if errors.Is(err, errs.ErrMissingTransactions) {
539-
return r.fetchMissingData(ctx, events)
540-
}
541-
542-
return models.NewBlockEventsError(err)
543-
}

0 commit comments

Comments
 (0)