212 changes: 155 additions & 57 deletions op-batcher/batcher/espresso.go
@@ -1,6 +1,7 @@
package batcher

import (
"errors"
"fmt"
"strings"
"time"
@@ -689,27 +690,30 @@ func (l *BatchSubmitter) queueBlockToEspresso(ctx context.Context, block *types.
return nil
}

func (l *BatchSubmitter) espressoSyncAndRefresh(ctx context.Context, newSyncStatus *eth.SyncStatus) {
err := l.espressoStreamer.Refresh(ctx, newSyncStatus.FinalizedL1, newSyncStatus.SafeL2.Number, newSyncStatus.SafeL2.L1Origin)
if err != nil {
l.Log.Warn("Failed to refresh Espresso streamer", "err", err)
}

// espressoSyncAndPrune is a copied implementation of syncAndPrune, but returns
// syncActions instead of the inclusiveBlockRange.
func (l *BatchSubmitter) espressoSyncAndPrune(newSyncStatus *eth.SyncStatus) syncActions {
l.channelMgrMutex.Lock()
defer l.channelMgrMutex.Unlock()

// Decide appropriate actions
syncActions, outOfSync := computeSyncActions(*newSyncStatus, l.prevCurrentL1, l.channelMgr.blocks, l.channelMgr.channelQueue, l.Log, l.Config.PreferLocalSafeL2)

if outOfSync {
l.Log.Warn("Sequencer is out of sync, retrying next tick.")
return
return syncActions
}

l.prevCurrentL1 = newSyncStatus.CurrentL1

// Manage existing state / garbage collection
if syncActions.clearState != nil {
l.channelMgr.Clear(*syncActions.clearState)
l.espressoStreamer.Reset()
} else {
l.channelMgr.PruneSafeBlocks(syncActions.blocksToPrune)
l.channelMgr.PruneChannels(syncActions.channelsToPrune)
}
return syncActions
}
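
For readers of this diff: the syncActions value returned here is consumed by the Espresso batch loading loop further down. Its definition is not part of this change; judging only from how its fields are used in this file, it presumably looks roughly like the sketch below (the field names match the usages above, but the types are inferred and not confirmed by this PR).

// Inferred sketch only; the real struct lives next to computeSyncActions in op-batcher.
type syncActions struct {
    clearState      *eth.BlockID         // non-nil: clear channel manager state (and, here, reset the Espresso streamer)
    blocksToPrune   int                  // safe blocks to drop from the front of the block queue
    channelsToPrune int                  // fully safe channels to drop
    blocksToLoad    *inclusiveBlockRange // next inclusive [start, end] range of unsafe blocks to load, if any
}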

// AdaptL1BlockRefClient is a wrapper around eth.L1BlockRef that implements the espresso.L1Client interface
@@ -734,13 +738,18 @@ func (c *AdaptL1BlockRefClient) HeaderHashByNumber(ctx context.Context, number *
return expectedL1BlockRef.Hash(), nil
}

// Periodically refreshes the sync status and polls Espresso streamer for new batches
func (l *BatchSubmitter) espressoBatchLoadingLoop(ctx context.Context, wg *sync.WaitGroup, publishSignal chan struct{}) {
l.Log.Info("Starting EspressoBatchLoadingLoop")
// Periodically refreshes the sync status and polls the Espresso streamer
// for new batches.
//
// This is equivalent to the method `blockLoadingLoop`, but targeting Espresso
// instead of the sequencer as the source of the blocks.
func (l *BatchSubmitter) espressoBatchLoadingLoop(ctx context.Context, wg *sync.WaitGroup, pendingBytesUpdated chan int64, publishSignal chan struct{}) {
l.Log.Info("Starting EspressoBatchLoadingLoop", "pollInterval", l.Config.PollInterval)

defer wg.Done()
ticker := time.NewTicker(l.Config.EspressoPollInterval)
ticker := time.NewTicker(l.Config.PollInterval)
defer ticker.Stop()
defer close(pendingBytesUpdated)
defer close(publishSignal)

for {
@@ -752,64 +761,147 @@ func (l *BatchSubmitter) espressoBatchLoadingLoop(ctx context.Context, wg *sync.
continue
}

l.espressoSyncAndRefresh(ctx, newSyncStatus)
syncActions := l.espressoSyncAndPrune(newSyncStatus)
if clearState := syncActions.clearState; clearState != nil {
// We reset the streamer here explicitly.
// TODO: we really want to set the read position of the
// streamer to this clear state value, if possible.
l.espressoStreamer.Reset()
}
blocksToLoad := syncActions.blocksToLoad

err = l.espressoStreamer.Update(ctx)
remainingListLen := len(l.espressoStreamer.RemainingBatches)
l.espressoStreamer.Refresh(ctx, newSyncStatus.FinalizedL1, newSyncStatus.FinalizedL2.Number, newSyncStatus.FinalizedL2.L1Origin)
remainingListLen := l.espressoStreamer.RemainingBatchesLen()
if remainingListLen > 0 {
l.Log.Warn("Remaining list not empty.", "Number items", remainingListLen)
}

var batch *derive.EspressoBatch

for {
if blocksToLoad != nil {
// Get fresh unsafe blocks
if err := l.espressoLoadBlocksIntoState(ctx, blocksToLoad.start, blocksToLoad.end); errors.Is(err, ErrReorg) {
l.Log.Warn("error loading blocks, clearing state and waiting for node sync", "err", err)
l.waitNodeSyncAndClearState()
} else {
l.sendToThrottlingLoop(pendingBytesUpdated)
}
}
trySignal(publishSignal) // always signal the write loop to ensure we periodically publish even if we aren't loading blocks

batch = l.espressoStreamer.Next(ctx)
case <-ctx.Done():
l.Log.Info("espressoBatchLoadingLoop returning")
return
}
}
}
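
The loop above leans on two non-blocking notification helpers, trySignal and sendToThrottlingLoop, which are referenced but not shown in this diff. As a rough sketch of the usual shape of such helpers (the PendingDABytes accessor below is an assumption, not something this PR defines):

// Sketch only: fire-and-forget signal so a busy consumer never blocks the loading loop.
func trySignal(sig chan struct{}) {
    select {
    case sig <- struct{}{}:
    default: // receiver is busy or already signalled; drop this signal
    }
}

// Sketch only: push the current pending-bytes figure to the throttling loop without blocking.
// The channel manager accessor and any locking around it are assumptions here.
func (l *BatchSubmitter) sendToThrottlingLoop(pendingBytesUpdated chan int64) {
    pendingBytes := l.channelMgr.PendingDABytes()
    select {
    case pendingBytesUpdated <- pendingBytes:
    default:
    }
}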

if batch == nil {
break
}
// espressoLoadBlocksIntoState loads the blocks between start and end
// (inclusive). If there is a reorg, it will return an error.
//
// This is equivalent to the method `loadBlocksIntoState`, but targeting
// Espresso instead of the sequencer as the source.
func (l *BatchSubmitter) espressoLoadBlocksIntoState(ctx context.Context, start, end uint64) error {
if end < start {
return fmt.Errorf("start number is > end number %d,%d", start, end)
}

// we don't log it in the 1-block case, since `espressoLoadBlockIntoState` already logs each block
if end > start {
l.Log.Info("Loading blocks into state", "start", start, "end", end)
}

var latestBlock *types.Block
// Add all blocks to "state"
for i := start; i <= end; i++ {
block, err := l.espressoLoadBlockIntoState(ctx, i)
if errors.Is(err, ErrReorg) {
l.Log.Warn("Found L2 reorg", "block_number", i)
return err
} else if err != nil {
l.Log.Warn("Failed to load block into state", "block_number", i, "err", err)
return err
}
latestBlock = block
}

// This should happen ONLY if the batch is malformed. ToBlock has to guarantee no
// transient errors.
block, err := batch.ToBlock(l.RollupConfig)
if err != nil {
l.Log.Error("failed to convert singular batch to block", "err", err)
continue
}
l2ref, err := derive.L2BlockToBlockRef(l.RollupConfig, latestBlock)
if err != nil {
l.Log.Warn("Invalid L2 block loaded into state", "err", err)
return err
}

l.Log.Trace(
"Received block from Espresso",
"blockNr", block.NumberU64(),
"blockHash", block.Hash(),
"parentHash", block.ParentHash(),
)

l.channelMgrMutex.Lock()
err = l.channelMgr.AddL2Block(block)
l.channelMgrMutex.Unlock()

if err != nil {
l.Log.Error("failed to add L2 block to channel manager", "err", err)
l.clearState(ctx)
l.espressoStreamer.Reset()
}
l.Metr.RecordL2BlocksLoaded(l2ref)
return nil
}

l.Log.Info("Added L2 block to channel manager")
// espressoLoadBlockIntoState fetches and stores a single block into `state`.
// The block retrieved is returned.
//
// This is equivalent to the method `loadBlockIntoState`, but targeting
// Espresso instead of the sequencer as the source.
func (l *BatchSubmitter) espressoLoadBlockIntoState(ctx context.Context, blockNumber uint64) (*types.Block, error) {
for {
if !l.espressoStreamer.HasNext(ctx) {
if err := l.espressoStreamer.Update(ctx); err != nil {
return nil, err
}
}

trySignal(publishSignal)
batch := l.espressoStreamer.Next(ctx)

// A failure in the streamer Update can happen after the buffer has been partially filled
if err != nil {
l.Log.Error("failed to update Espresso streamer", "err", err)
continue
}
if batch == nil {
// We don't have a batch available right now, so we will need
// to wait to see if one becomes available.
l.Log.Info("No batch available from Espresso, waiting for more data", "blockNumber", blockNumber)
time.Sleep(time.Millisecond * 100)
// Let's try again
continue
}

case <-ctx.Done():
l.Log.Info("espressoBatchLoadingLoop returning")
return
// This should happen ONLY if the batch is malformed. ToBlock has to guarantee no
// transient errors.
block, err := batch.ToBlock(l.RollupConfig)
if err != nil {
l.Log.Error("failed to convert singular batch to block", "err", err)
continue
}

blockNumberFromBatch := block.NumberU64()
if blockNumberFromBatch < blockNumber {
l.Log.Debug("Received block with number less than expected, skipping", "blockNumber", blockNumberFromBatch, "expected", blockNumber)
// We should still be able to reach the block we are looking for; we
// just need to progress forward in the stream.

// TODO: we actually know the size of the block gap here, so we
// might be able to advance the streamer to the expected block
// position.
continue
}

if blockNumberFromBatch != blockNumber {
// We received a block that is not the one we are looking for.
// It indicates that the streamer is ahead of where we want to be,
// so we need to reset the streamer, and wait for the correct
// block to be available.
l.Log.Info("Received block with number greater than expected, resetting streamer", "blockNumber", blockNumberFromBatch, "expected", blockNumber)
l.espressoStreamer.Reset()
continue
}

l.Log.Trace(
"Received block from Espresso",
"blockNr", block.NumberU64(),
"blockHash", block.Hash(),
"parentHash", block.ParentHash(),
)

l.channelMgrMutex.Lock()
defer l.channelMgrMutex.Unlock()
if err := l.channelMgr.AddL2Block(block); err != nil {
return nil, fmt.Errorf("adding L2 block to state: %w", err)
}

l.Log.Info("Added L2 block to local state", "block", eth.ToBlockID(block), "tx_count", len(block.Transactions()), "time", block.Time())
return block, nil
}
}
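
Purely as a reading aid (this helper does not exist in the PR), the block-number reconciliation in the loop above boils down to a three-way decision:

// Illustrative only: how espressoLoadBlockIntoState reacts to the number of the next batch.
func reconcileBlockNumber(got, want uint64) string {
    switch {
    case got < want:
        return "skip" // behind the target block: keep consuming the stream
    case got > want:
        return "reset" // ahead of the target block: reset the streamer and wait
    default:
        return "accept" // exactly the requested block: hand it to the channel manager
    }
}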

@@ -899,7 +991,13 @@ func (l *BlockLoader) nextBlockRange(newSyncStatus *eth.SyncStatus) (inclusiveBl

// State empty, just enqueue all unsafe blocks
if len(l.queuedBlocks) == 0 {
return inclusiveBlockRange{safeL2.Number + 1, newSyncStatus.UnsafeL2.Number}, ActionEnqueue
if newSyncStatus.UnsafeL2.Number <= safeL2.Number+1 {
Collaborator (review comment): Nit: can we make a firstUnsafeBlock := safeL2.Number + 1 or something like that? I feel like the next few lines would be easier to read then.

// no unsafe blocks to enqueue, just retry
l.batcher.Log.Warn("no unsafe blocks to enqueue", "safeL2", safeL2, "unsafeL2", newSyncStatus.UnsafeL2)
return inclusiveBlockRange{}, ActionRetry
}

return inclusiveBlockRange{start: safeL2.Number + 1, end: newSyncStatus.UnsafeL2.Number}, ActionEnqueue
}
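
For reference, the readability tweak the reviewer is asking for might look like the following (a hypothetical refactor, not code in this PR; behavior is unchanged):

// Hypothetical shape of the reviewer's suggestion: name the first unsafe block once.
firstUnsafeBlock := safeL2.Number + 1
if newSyncStatus.UnsafeL2.Number <= firstUnsafeBlock {
    // no unsafe blocks to enqueue, just retry
    l.batcher.Log.Warn("no unsafe blocks to enqueue", "safeL2", safeL2, "unsafeL2", newSyncStatus.UnsafeL2)
    return inclusiveBlockRange{}, ActionRetry
}
return inclusiveBlockRange{start: firstUnsafeBlock, end: newSyncStatus.UnsafeL2.Number}, ActionEnqueue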

lastQueuedBlock := l.queuedBlocks[len(l.queuedBlocks)-1]
@@ -947,7 +1045,7 @@ func (l *BlockLoader) nextBlockRange(newSyncStatus *eth.SyncStatus) (inclusiveBl
l.queuedBlocks = l.queuedBlocks[numFinalizedBlocks:]
}

return inclusiveBlockRange{lastQueuedBlock.Number + 1, newSyncStatus.UnsafeL2.Number}, ActionEnqueue
return inclusiveBlockRange{start: lastQueuedBlock.Number + 1, end: newSyncStatus.UnsafeL2.Number}, ActionEnqueue
}

// blockLoadingLoop