Skip to content

Commit e7406e4

Browse files
committed
Rename RPCBlockHeaderSubscriber to RPCBlockTrackingSubscriber
1 parent 4fd4a5d commit e7406e4

File tree

2 files changed

+14
-14
lines changed

2 files changed

+14
-14
lines changed

bootstrap/bootstrap.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ func (b *Bootstrap) StartEventIngestion(ctx context.Context) error {
142142
chainID := b.config.FlowNetworkID
143143

144144
// create event subscriber
145-
subscriber := ingestion.NewRPCBlockHeaderSubscriber(
145+
subscriber := ingestion.NewRPCBlockTrackingSubscriber(
146146
b.logger,
147147
b.client,
148148
chainID,

services/ingestion/block_header_subscriber.go renamed to services/ingestion/block_tracking_subscriber.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ import (
1818
"github.com/rs/zerolog"
1919
)
2020

21-
var _ EventSubscriber = &RPCEventSubscriber{}
21+
var _ EventSubscriber = &RPCBlockTrackingSubscriber{}
2222

23-
type RPCBlockHeaderSubscriber struct {
23+
type RPCBlockTrackingSubscriber struct {
2424
logger zerolog.Logger
2525

2626
client *requester.CrossSporkClient
@@ -32,15 +32,15 @@ type RPCBlockHeaderSubscriber struct {
3232
recoveredEvents []flow.Event
3333
}
3434

35-
func NewRPCBlockHeaderSubscriber(
35+
func NewRPCBlockTrackingSubscriber(
3636
logger zerolog.Logger,
3737
client *requester.CrossSporkClient,
3838
chainID flowGo.ChainID,
3939
keyLock requester.KeyLock,
4040
startHeight uint64,
41-
) *RPCBlockHeaderSubscriber {
41+
) *RPCBlockTrackingSubscriber {
4242
logger = logger.With().Str("component", "subscriber").Logger()
43-
return &RPCBlockHeaderSubscriber{
43+
return &RPCBlockTrackingSubscriber{
4444
logger: logger,
4545

4646
client: client,
@@ -55,7 +55,7 @@ func NewRPCBlockHeaderSubscriber(
5555
// to listen all new events in the current spork.
5656
//
5757
// If error is encountered during backfill the subscription will end and the response chanel will be closed.
58-
func (r *RPCBlockHeaderSubscriber) Subscribe(ctx context.Context) <-chan models.BlockEvents {
58+
func (r *RPCBlockTrackingSubscriber) Subscribe(ctx context.Context) <-chan models.BlockEvents {
5959
// buffered channel so that the decoding of the events can happen in parallel to other operations
6060
eventsChan := make(chan models.BlockEvents, 1000)
6161

@@ -107,7 +107,7 @@ func (r *RPCBlockHeaderSubscriber) Subscribe(ctx context.Context) <-chan models.
107107
//
108108
// Subscribing to EVM specific events and handle any disconnection errors
109109
// as well as context cancellations.
110-
func (r *RPCBlockHeaderSubscriber) subscribe(ctx context.Context, height uint64) <-chan models.BlockEvents {
110+
func (r *RPCBlockTrackingSubscriber) subscribe(ctx context.Context, height uint64) <-chan models.BlockEvents {
111111
eventsChan := make(chan models.BlockEvents)
112112

113113
_, err := r.client.GetBlockHeaderByHeight(ctx, height)
@@ -227,7 +227,7 @@ func (r *RPCBlockHeaderSubscriber) subscribe(ctx context.Context, height uint64)
227227

228228
// backfill returns a channel that is filled with block events from the provided fromCadenceHeight up to the first
229229
// height in the current spork.
230-
func (r *RPCBlockHeaderSubscriber) backfill(ctx context.Context, fromCadenceHeight uint64) <-chan models.BlockEvents {
230+
func (r *RPCBlockTrackingSubscriber) backfill(ctx context.Context, fromCadenceHeight uint64) <-chan models.BlockEvents {
231231
eventsChan := make(chan models.BlockEvents)
232232

233233
go func() {
@@ -264,7 +264,7 @@ func (r *RPCBlockHeaderSubscriber) backfill(ctx context.Context, fromCadenceHeig
264264

265265
// / backfillSporkFromHeight will fill the eventsChan with block events from the provided fromHeight up to the first height in the spork that comes
266266
// after the spork of the provided fromHeight.
267-
func (r *RPCBlockHeaderSubscriber) backfillSporkFromHeight(ctx context.Context, fromCadenceHeight uint64, eventsChan chan<- models.BlockEvents) (uint64, error) {
267+
func (r *RPCBlockTrackingSubscriber) backfillSporkFromHeight(ctx context.Context, fromCadenceHeight uint64, eventsChan chan<- models.BlockEvents) (uint64, error) {
268268
evmAddress := common.Address(systemcontracts.SystemContractsForChain(r.chain).EVMContract.Address)
269269

270270
blockExecutedEvent := common.NewAddressLocation(
@@ -358,7 +358,7 @@ func (r *RPCBlockHeaderSubscriber) backfillSporkFromHeight(ctx context.Context,
358358
// accumulateBlockEvents will keep fetching `EVM.TransactionExecuted` events
359359
// until it finds their `EVM.BlockExecuted` event.
360360
// At that point it will return the valid models.BlockEvents.
361-
func (r *RPCBlockHeaderSubscriber) accumulateBlockEvents(
361+
func (r *RPCBlockTrackingSubscriber) accumulateBlockEvents(
362362
ctx context.Context,
363363
block flow.BlockEvents,
364364
blockExecutedEventType string,
@@ -440,7 +440,7 @@ func (r *RPCBlockHeaderSubscriber) accumulateBlockEvents(
440440
// An inconsistent response could be an EVM block that references EVM
441441
// transactions which are not present in the response. It falls back
442442
// to using grpc requests instead of streaming.
443-
func (r *RPCBlockHeaderSubscriber) fetchMissingData(
443+
func (r *RPCBlockTrackingSubscriber) fetchMissingData(
444444
ctx context.Context,
445445
blockEvents flow.BlockEvents,
446446
) models.BlockEvents {
@@ -477,7 +477,7 @@ func (r *RPCBlockHeaderSubscriber) fetchMissingData(
477477
// accumulateEventsMissingBlock will keep receiving transaction events until it can produce a valid
478478
// EVM block event containing a block and transactions. At that point it will reset the recovery mode
479479
// and return the valid block events.
480-
func (r *RPCBlockHeaderSubscriber) accumulateEventsMissingBlock(events flow.BlockEvents) models.BlockEvents {
480+
func (r *RPCBlockTrackingSubscriber) accumulateEventsMissingBlock(events flow.BlockEvents) models.BlockEvents {
481481
txEvents := events.Events
482482
// Sort `EVM.TransactionExecuted` events
483483
sort.Slice(txEvents, func(i, j int) bool {
@@ -507,7 +507,7 @@ func (r *RPCBlockHeaderSubscriber) accumulateEventsMissingBlock(events flow.Bloc
507507
// in which case we might miss one of the events (missing transaction), or it can be
508508
// due to a failure from the system transaction which commits an EVM block, which results
509509
// in missing EVM block event but present transactions.
510-
func (r *RPCBlockHeaderSubscriber) recover(
510+
func (r *RPCBlockTrackingSubscriber) recover(
511511
ctx context.Context,
512512
events flow.BlockEvents,
513513
err error,

0 commit comments

Comments
 (0)