Skip to content

Commit ca18d33

Browse files
committed
Handle ingestion of missing EVM.BlockExecuted event in backfill process
1 parent 6727d4a commit ca18d33

File tree

4 files changed

+162
-35
lines changed

4 files changed

+162
-35
lines changed

models/events.go

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,6 @@ type CadenceEvents struct {
4646

4747
// NewCadenceEvents decodes the events into evm types.
4848
func NewCadenceEvents(events flow.BlockEvents) (*CadenceEvents, error) {
49-
// first we sort all the events in the block, by their TransactionIndex,
50-
// and then we also sort events in the same transaction, by their EventIndex.
51-
sort.Slice(events.Events, func(i, j int) bool {
52-
if events.Events[i].TransactionIndex != events.Events[j].TransactionIndex {
53-
return events.Events[i].TransactionIndex < events.Events[j].TransactionIndex
54-
}
55-
return events.Events[i].EventIndex < events.Events[j].EventIndex
56-
})
57-
5849
e, err := decodeCadenceEvents(events)
5950
if err != nil {
6051
return nil, err
@@ -219,11 +210,41 @@ type BlockEvents struct {
219210
Err error
220211
}
221212

222-
func NewBlockEvents(events flow.BlockEvents) BlockEvents {
223-
blockEvents, err := NewCadenceEvents(events)
213+
// NewMultiBlockEvents will decode any possible `EVM.TransactionExecuted` &
214+
// `EVM.BlockExecuted` events and populate the resulting `Block`, `Transaction` &
215+
// `Receipt` values.
216+
// The `EVM.TransactionExecuted` events are expected to be properly sorted by
217+
// the caller.
218+
// Use this method when dealing with `flow.BlockEvents` from multiple Flow blocks.
219+
// The `EVM.TransactionExecuted` events could be produced at a Flow block, that
220+
// comes prior to the Flow block that produced the `EVM.BlockExecuted` event.
221+
func NewMultiBlockEvents(events flow.BlockEvents) BlockEvents {
222+
cdcEvents, err := NewCadenceEvents(events)
223+
return BlockEvents{
224+
Events: cdcEvents,
225+
Err: err,
226+
}
227+
}
228+
229+
// NewSingleBlockEvents will decode any possible `EVM.TransactionExecuted` &
230+
// `EVM.BlockExecuted` events and populate the resulting `Block`, `Transaction` &
231+
// `Receipt` values.
232+
// The `EVM.TransactionExecuted` events will be sorted by `TransactionIndex` &
233+
// `EventIndex`, prior to decoding.
234+
// Use this method when dealing with `flow.BlockEvents` from a single Flow block.
235+
func NewSingleBlockEvents(events flow.BlockEvents) BlockEvents {
236+
// first we sort all the events in the block, by their TransactionIndex,
237+
// and then we also sort events in the same transaction, by their EventIndex.
238+
sort.Slice(events.Events, func(i, j int) bool {
239+
if events.Events[i].TransactionIndex != events.Events[j].TransactionIndex {
240+
return events.Events[i].TransactionIndex < events.Events[j].TransactionIndex
241+
}
242+
return events.Events[i].EventIndex < events.Events[j].EventIndex
243+
})
224244

245+
cdcEvents, err := NewCadenceEvents(events)
225246
return BlockEvents{
226-
Events: blockEvents,
247+
Events: cdcEvents,
227248
Err: err,
228249
}
229250
}

models/events_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,8 +201,9 @@ func TestCadenceEvents_Block(t *testing.T) {
201201
blockEvents.Events = append(blockEvents.Events, blockEvent)
202202

203203
// parse the EventStreaming API response
204-
cdcEvents, err := NewCadenceEvents(blockEvents)
205-
require.NoError(t, err)
204+
blkEvents := NewSingleBlockEvents(blockEvents)
205+
require.NoError(t, blkEvents.Err)
206+
cdcEvents := blkEvents.Events
206207

207208
// assert that Flow events are sorted by their TransactionIndex and EventIndex fields
208209
assert.Equal(

services/ingestion/engine_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ func TestSerialBlockIngestion(t *testing.T) {
104104
}).
105105
Once()
106106

107-
eventsChan <- models.NewBlockEvents(flow.BlockEvents{
107+
eventsChan <- models.NewSingleBlockEvents(flow.BlockEvents{
108108
Events: []flow.Event{{
109109
Type: string(blockEvent.Etype),
110110
Value: blockCdc,
@@ -318,7 +318,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
318318
}).
319319
Once()
320320

321-
eventsChan <- models.NewBlockEvents(flow.BlockEvents{
321+
eventsChan <- models.NewSingleBlockEvents(flow.BlockEvents{
322322
Events: []flow.Event{{
323323
Type: string(blockEvent.Etype),
324324
Value: blockCdc,
@@ -420,7 +420,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
420420
}).
421421
Once()
422422

423-
eventsChan <- models.NewBlockEvents(flow.BlockEvents{
423+
eventsChan <- models.NewSingleBlockEvents(flow.BlockEvents{
424424
Events: []flow.Event{
425425
// first transaction
426426
{
@@ -557,7 +557,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
557557
// and it will make the first block be swapped with second block out-of-order
558558
events[1], events[2] = events[2], events[1]
559559

560-
eventsChan <- models.NewBlockEvents(flow.BlockEvents{
560+
eventsChan <- models.NewSingleBlockEvents(flow.BlockEvents{
561561
Events: events,
562562
Height: latestCadenceHeight + 1,
563563
})

services/ingestion/event_subscriber.go

Lines changed: 122 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha
160160
return
161161
}
162162

163-
evmEvents := models.NewBlockEvents(blockEvents)
163+
evmEvents := models.NewSingleBlockEvents(blockEvents)
164164
// if events contain an error, or we are in a recovery mode
165165
if evmEvents.Err != nil || r.recovery {
166166
evmEvents = r.recover(ctx, blockEvents, evmEvents.Err)
@@ -237,6 +237,18 @@ const maxRangeForGetEvents = uint64(249)
237237
func (r *RPCEventSubscriber) backfillSporkFromHeight(ctx context.Context, fromCadenceHeight uint64, eventsChan chan<- models.BlockEvents) (uint64, error) {
238238
evmAddress := common.Address(systemcontracts.SystemContractsForChain(r.chain).EVMContract.Address)
239239

240+
blockExecutedEvent := common.NewAddressLocation(
241+
nil,
242+
evmAddress,
243+
string(events.EventTypeBlockExecuted),
244+
).ID()
245+
246+
transactionExecutedEvent := common.NewAddressLocation(
247+
nil,
248+
evmAddress,
249+
string(events.EventTypeTransactionExecuted),
250+
).ID()
251+
240252
lastHeight, err := r.client.GetLatestHeightForSpork(ctx, fromCadenceHeight)
241253
if err != nil {
242254
eventsChan <- models.NewBlockEventsError(err)
@@ -257,18 +269,6 @@ func (r *RPCEventSubscriber) backfillSporkFromHeight(ctx context.Context, fromCa
257269
endHeight = lastHeight
258270
}
259271

260-
blockExecutedEvent := common.NewAddressLocation(
261-
nil,
262-
evmAddress,
263-
string(events.EventTypeBlockExecuted),
264-
).ID()
265-
266-
transactionExecutedEvent := common.NewAddressLocation(
267-
nil,
268-
evmAddress,
269-
string(events.EventTypeTransactionExecuted),
270-
).ID()
271-
272272
blocks, err := r.client.GetEventsForHeightRange(ctx, blockExecutedEvent, startHeight, endHeight)
273273
if err != nil {
274274
return 0, fmt.Errorf("failed to get block events: %w", err)
@@ -299,7 +299,22 @@ func (r *RPCEventSubscriber) backfillSporkFromHeight(ctx context.Context, fromCa
299299
// append the transaction events to the block events
300300
blocks[i].Events = append(blocks[i].Events, transactions[i].Events...)
301301

302-
evmEvents := models.NewBlockEvents(blocks[i])
302+
evmEvents := models.NewSingleBlockEvents(blocks[i])
303+
if evmEvents.Err != nil && errors.Is(evmEvents.Err, errs.ErrMissingBlock) {
304+
evmEvents, err = r.accumulateBlockEvents(
305+
ctx,
306+
blocks[i],
307+
blockExecutedEvent,
308+
transactionExecutedEvent,
309+
)
310+
if err != nil {
311+
return 0, err
312+
}
313+
eventsChan <- evmEvents
314+
// advance the height
315+
fromCadenceHeight = evmEvents.Events.CadenceHeight() + 1
316+
break
317+
}
303318
eventsChan <- evmEvents
304319

305320
// advance the height
@@ -310,6 +325,86 @@ func (r *RPCEventSubscriber) backfillSporkFromHeight(ctx context.Context, fromCa
310325
return fromCadenceHeight, nil
311326
}
312327

328+
// accumulateBlockEvents will keep fetching `EVM.TransactionExecuted` events
329+
// until it finds their `EVM.BlockExecuted` event.
330+
// At that point it will return the valid models.BlockEvents.
331+
func (r *RPCEventSubscriber) accumulateBlockEvents(
332+
ctx context.Context,
333+
block flow.BlockEvents,
334+
blockExecutedEventType string,
335+
txExecutedEventType string,
336+
) (models.BlockEvents, error) {
337+
evmEvents := models.NewSingleBlockEvents(block)
338+
currentHeight := block.Height
339+
transactionEvents := make([]flow.Event, 0)
340+
341+
for evmEvents.Err != nil && errors.Is(evmEvents.Err, errs.ErrMissingBlock) {
342+
blocks, err := r.client.GetEventsForHeightRange(
343+
ctx,
344+
blockExecutedEventType,
345+
currentHeight,
346+
currentHeight+maxRangeForGetEvents,
347+
)
348+
if err != nil {
349+
return models.BlockEvents{}, fmt.Errorf("failed to get block events: %w", err)
350+
}
351+
352+
transactions, err := r.client.GetEventsForHeightRange(
353+
ctx,
354+
txExecutedEventType,
355+
currentHeight,
356+
currentHeight+maxRangeForGetEvents,
357+
)
358+
if err != nil {
359+
return models.BlockEvents{}, fmt.Errorf("failed to get block events: %w", err)
360+
}
361+
362+
if len(transactions) != len(blocks) {
363+
return models.BlockEvents{}, fmt.Errorf("transactions and blocks have different length")
364+
}
365+
366+
// sort both, just in case
367+
sort.Slice(blocks, func(i, j int) bool {
368+
return blocks[i].Height < blocks[j].Height
369+
})
370+
sort.Slice(transactions, func(i, j int) bool {
371+
return transactions[i].Height < transactions[j].Height
372+
})
373+
374+
for i := range blocks {
375+
if transactions[i].Height != blocks[i].Height {
376+
return models.BlockEvents{}, fmt.Errorf("transactions and blocks have different height")
377+
}
378+
379+
// If no EVM.BlockExecuted event found, keep accumulating the incoming
380+
// EVM.TransactionExecuted events, until we find the EVM.BlockExecuted
381+
// event that includes them.
382+
if len(blocks[i].Events) == 0 {
383+
txEvents := transactions[i].Events
384+
// Sort `EVM.TransactionExecuted` events
385+
sort.Slice(txEvents, func(i, j int) bool {
386+
if txEvents[i].TransactionIndex != txEvents[j].TransactionIndex {
387+
return txEvents[i].TransactionIndex < txEvents[j].TransactionIndex
388+
}
389+
return txEvents[i].EventIndex < txEvents[j].EventIndex
390+
})
391+
transactionEvents = append(transactionEvents, txEvents...)
392+
} else {
393+
blocks[i].Events = append(blocks[i].Events, transactionEvents...)
394+
// We use `models.NewMultiBlockEvents`, as the `transactionEvents`
395+
// are coming from different Flow blocks.
396+
evmEvents = models.NewMultiBlockEvents(blocks[i])
397+
if evmEvents.Err == nil {
398+
return evmEvents, nil
399+
}
400+
}
401+
402+
currentHeight = blocks[i].Height + 1
403+
}
404+
}
405+
return evmEvents, nil
406+
}
407+
313408
// fetchMissingData is used as a backup mechanism for fetching EVM-related
314409
// events, when the event streaming API returns an inconsistent response.
315410
// An inconsistent response could be an EVM block that references EVM
@@ -346,17 +441,27 @@ func (r *RPCEventSubscriber) fetchMissingData(
346441
blockEvents.Events = append(blockEvents.Events, recoveredEvents[0].Events...)
347442
}
348443

349-
return models.NewBlockEvents(blockEvents)
444+
return models.NewSingleBlockEvents(blockEvents)
350445
}
351446

352447
// accumulateEventsMissingBlock will keep receiving transaction events until it can produce a valid
353448
// EVM block event containing a block and transactions. At that point it will reset the recovery mode
354449
// and return the valid block events.
355450
func (r *RPCEventSubscriber) accumulateEventsMissingBlock(events flow.BlockEvents) models.BlockEvents {
356-
r.recoveredEvents = append(r.recoveredEvents, events.Events...)
451+
txEvents := events.Events
452+
// Sort `EVM.TransactionExecuted` events
453+
sort.Slice(txEvents, func(i, j int) bool {
454+
if txEvents[i].TransactionIndex != txEvents[j].TransactionIndex {
455+
return txEvents[i].TransactionIndex < txEvents[j].TransactionIndex
456+
}
457+
return txEvents[i].EventIndex < txEvents[j].EventIndex
458+
})
459+
r.recoveredEvents = append(r.recoveredEvents, txEvents...)
357460
events.Events = r.recoveredEvents
358461

359-
recovered := models.NewBlockEvents(events)
462+
// We use `models.NewMultiBlockEvents`, as the `transactionEvents`
463+
// are coming from different Flow blocks.
464+
recovered := models.NewMultiBlockEvents(events)
360465
r.recovery = recovered.Err != nil
361466

362467
if !r.recovery {

0 commit comments

Comments
 (0)