Skip to content

Commit 1949a0a

Browse files
janezpodhostnikm-Peter
authored andcommitted
Refactor re-execution
1 parent a9fe319 commit 1949a0a

File tree

3 files changed

+52
-25
lines changed

3 files changed

+52
-25
lines changed

services/ingestion/engine.go

Lines changed: 51 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,29 @@ func (e *Engine) Run(ctx context.Context) error {
145145
}
146146
}
147147

148+
// withBatch will execute the provided function with a new batch, and commit the batch
149+
// afterwards if no error is returned.
150+
func (e *Engine) withBatch(f func(batch *pebbleDB.Batch) error) error {
151+
batch := e.store.NewBatch()
152+
defer func(batch *pebbleDB.Batch) {
153+
err := batch.Close()
154+
if err != nil {
155+
e.log.Fatal().Err(err).Msg("failed to close batch")
156+
}
157+
}(batch)
158+
159+
err := f(batch)
160+
if err != nil {
161+
return err
162+
}
163+
164+
if err := batch.Commit(pebbleDB.Sync); err != nil {
165+
return fmt.Errorf("failed to commit batch: %w", err)
166+
}
167+
168+
return nil
169+
}
170+
148171
// processEvents converts the events to block and transactions and indexes them.
149172
//
150173
// BlockEvents are received by the access node API and contain Cadence height (always a single Flow block),
@@ -163,14 +186,36 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error {
163186
Int("cadence-event-length", events.Length()).
164187
Msg("received new cadence evm events")
165188

166-
batch := e.store.NewBatch()
167-
defer func(batch *pebbleDB.Batch) {
168-
err := batch.Close()
169-
if err != nil {
170-
e.log.Fatal().Err(err).Msg("failed to close batch")
189+
err := e.withBatch(
190+
func(batch *pebbleDB.Batch) error {
191+
return e.indexEvents(events, batch)
192+
},
193+
)
194+
if err != nil {
195+
return fmt.Errorf("failed to index events for cadence block %d: %w", events.CadenceHeight(), err)
196+
}
197+
198+
e.collector.CadenceHeightIndexed(events.CadenceHeight())
199+
200+
if events.Empty() {
201+
return nil // nothing else to do this was heartbeat event with not event payloads
202+
}
203+
204+
// emit block event and logs, only after we successfully commit the data
205+
e.blocksPublisher.Publish(events.Block())
206+
for _, r := range events.Receipts() {
207+
if len(r.Logs) > 0 {
208+
e.logsPublisher.Publish(r.Logs)
171209
}
172-
}(batch)
210+
}
211+
212+
e.collector.EVMTransactionIndexed(len(events.Transactions()))
213+
e.collector.EVMHeightIndexed(events.Block().Height)
214+
return nil
215+
}
173216

217+
// indexEvents will replay the evm transactions using the block events and index all results.
218+
func (e *Engine) indexEvents(events *models.CadenceEvents, batch *pebbleDB.Batch) error {
174219
// if heartbeat interval with no data still update the cadence height
175220
if events.Empty() {
176221
if err := e.blocks.SetLatestCadenceHeight(events.CadenceHeight(), batch); err != nil {
@@ -180,7 +225,6 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error {
180225
err,
181226
)
182227
}
183-
e.collector.CadenceHeightIndexed(events.CadenceHeight())
184228
return nil // nothing else to do this was heartbeat event with not event payloads
185229
}
186230

@@ -262,22 +306,6 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error {
262306
}
263307
}
264308

265-
if err := batch.Commit(pebbleDB.Sync); err != nil {
266-
return fmt.Errorf("failed to commit indexed data for Cadence block %d: %w", events.CadenceHeight(), err)
267-
}
268-
269-
// emit block event and logs, only after we successfully commit the data
270-
e.blocksPublisher.Publish(events.Block())
271-
272-
for _, r := range events.Receipts() {
273-
if len(r.Logs) > 0 {
274-
e.logsPublisher.Publish(r.Logs)
275-
}
276-
}
277-
278-
e.collector.EVMTransactionIndexed(len(events.Transactions()))
279-
e.collector.EVMHeightIndexed(events.Block().Height)
280-
e.collector.CadenceHeightIndexed(events.CadenceHeight())
281309
return nil
282310
}
283311

services/ingestion/engine_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ func TestSerialBlockIngestion(t *testing.T) {
164164
go func() {
165165
err := engine.Run(context.Background())
166166
assert.ErrorIs(t, err, models.ErrInvalidHeight)
167-
assert.EqualError(t, err, "invalid height: received new block: 20, non-sequential of latest block: 11")
167+
assert.ErrorContains(t, err, "invalid height: received new block: 20, non-sequential of latest block: 11")
168168
close(waitErr)
169169
}()
170170

storage/index.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ type BlockIndexer interface {
4242
LatestCadenceHeight() (uint64, error)
4343

4444
// SetLatestCadenceHeight sets the latest Cadence height.
45-
// Batch is required to batch multiple indexer operations, skipped if nil.
4645
SetLatestCadenceHeight(cadenceHeight uint64, batch *pebble.Batch) error
4746

4847
// GetCadenceHeight returns the Cadence height that matches the

0 commit comments

Comments
 (0)