diff --git a/op-batcher/batcher/espresso.go b/op-batcher/batcher/espresso.go index 6fd2579cba6d5..4dd8677157952 100644 --- a/op-batcher/batcher/espresso.go +++ b/op-batcher/batcher/espresso.go @@ -1,6 +1,7 @@ package batcher import ( + "errors" "fmt" "strings" "time" @@ -689,27 +690,30 @@ func (l *BatchSubmitter) queueBlockToEspresso(ctx context.Context, block *types. return nil } -func (l *BatchSubmitter) espressoSyncAndRefresh(ctx context.Context, newSyncStatus *eth.SyncStatus) { - err := l.espressoStreamer.Refresh(ctx, newSyncStatus.FinalizedL1, newSyncStatus.SafeL2.Number, newSyncStatus.SafeL2.L1Origin) - if err != nil { - l.Log.Warn("Failed to refresh Espresso streamer", "err", err) - } - +// espressoSyncAndPrune is a copied implementation of syncAndPrune, but returns +// syncActions instead of the inclusiveBlockRange. +func (l *BatchSubmitter) espressoSyncAndPrune(newSyncStatus *eth.SyncStatus) syncActions { l.channelMgrMutex.Lock() defer l.channelMgrMutex.Unlock() + + // Decide appropriate actions syncActions, outOfSync := computeSyncActions(*newSyncStatus, l.prevCurrentL1, l.channelMgr.blocks, l.channelMgr.channelQueue, l.Log, l.Config.PreferLocalSafeL2) + if outOfSync { l.Log.Warn("Sequencer is out of sync, retrying next tick.") - return + return syncActions } + l.prevCurrentL1 = newSyncStatus.CurrentL1 + + // Manage existing state / garbage collection if syncActions.clearState != nil { l.channelMgr.Clear(*syncActions.clearState) - l.espressoStreamer.Reset() } else { l.channelMgr.PruneSafeBlocks(syncActions.blocksToPrune) l.channelMgr.PruneChannels(syncActions.channelsToPrune) } + return syncActions } // AdaptL1BlockRefClient is a wrapper around eth.L1BlockRef that implements the espresso.L1Client interface @@ -734,13 +738,18 @@ func (c *AdaptL1BlockRefClient) HeaderHashByNumber(ctx context.Context, number * return expectedL1BlockRef.Hash(), nil } -// Periodically refreshes the sync status and polls Espresso streamer for new batches -func (l *BatchSubmitter) espressoBatchLoadingLoop(ctx context.Context, wg *sync.WaitGroup, publishSignal chan struct{}) { - l.Log.Info("Starting EspressoBatchLoadingLoop") +// Periodically refreshes the sync status and polls Espresso streamer +// for new batches. +// +// This is equivalent to the method `blockLoadingLoop`, but targeting Espresso +// instead of the sequencer as the source of the blocks. +func (l *BatchSubmitter) espressoBatchLoadingLoop(ctx context.Context, wg *sync.WaitGroup, pendingBytesUpdated chan int64, publishSignal chan struct{}) { + l.Log.Info("Starting EspressoBatchLoadingLoop", "pollInterval", l.Config.PollInterval) defer wg.Done() - ticker := time.NewTicker(l.Config.EspressoPollInterval) + ticker := time.NewTicker(l.Config.PollInterval) defer ticker.Stop() + defer close(pendingBytesUpdated) defer close(publishSignal) for { @@ -752,64 +761,147 @@ func (l *BatchSubmitter) espressoBatchLoadingLoop(ctx context.Context, wg *sync. continue } - l.espressoSyncAndRefresh(ctx, newSyncStatus) + syncActions := l.espressoSyncAndPrune(newSyncStatus) + if clearState := syncActions.clearState; clearState != nil { + // We reset the streamer here explicitly. + // TODO: we really want to set the read position of the + // streamer to this clear state value, if possible. + l.espressoStreamer.Reset() + } + blocksToLoad := syncActions.blocksToLoad - err = l.espressoStreamer.Update(ctx) - remainingListLen := len(l.espressoStreamer.RemainingBatches) + l.espressoStreamer.Refresh(ctx, newSyncStatus.FinalizedL1, newSyncStatus.FinalizedL2.Number, newSyncStatus.FinalizedL2.L1Origin) + remainingListLen := l.espressoStreamer.RemainingBatchesLen() if remainingListLen > 0 { l.Log.Warn("Remaining list not empty.", "Number items", remainingListLen) } - var batch *derive.EspressoBatch - - for { + if blocksToLoad != nil { + // Get fresh unsafe blocks + if err := l.espressoLoadBlocksIntoState(ctx, blocksToLoad.start, blocksToLoad.end); errors.Is(err, ErrReorg) { + l.Log.Warn("error loading blocks, clearing state and waiting for node sync", "err", err) + l.waitNodeSyncAndClearState() + } else { + l.sendToThrottlingLoop(pendingBytesUpdated) + } + } + trySignal(publishSignal) // always signal the write loop to ensure we periodically publish even if we aren't loading blocks - batch = l.espressoStreamer.Next(ctx) + case <-ctx.Done(): + l.Log.Info("espressoBatchLoadingLoop returning") + return + } + } +} - if batch == nil { - break - } +// espressoLoadBlocksIntoState loads the blocks between start and end +// (inclusive). If there is a reorg, it will return an error. +// +// This is equivalent to the method `loadBlocksIntoState`, but targeting +// Espressos instead of the sequencer as the source. +func (l *BatchSubmitter) espressoLoadBlocksIntoState(ctx context.Context, start, end uint64) error { + if end < start { + return fmt.Errorf("start number is > end number %d,%d", start, end) + } + + // we don't want to print it in the 1-block case as `loadBlockIntoState` already does + if end > start { + l.Log.Info("Loading blocks into state", "start", start, "end", end) + } + + var latestBlock *types.Block + // Add all blocks to "state" + for i := start; i <= end; i++ { + block, err := l.espressoLoadBlockIntoState(ctx, i) + if errors.Is(err, ErrReorg) { + l.Log.Warn("Found L2 reorg", "block_number", i) + return err + } else if err != nil { + l.Log.Warn("Failed to load block into state", "block_number", i, "err", err) + return err + } + latestBlock = block + } - // This should happen ONLY if the batch is malformed. ToBlock has to guarantee no - // transient errors. - block, err := batch.ToBlock(l.RollupConfig) - if err != nil { - l.Log.Error("failed to convert singular batch to block", "err", err) - continue - } + l2ref, err := derive.L2BlockToBlockRef(l.RollupConfig, latestBlock) + if err != nil { + l.Log.Warn("Invalid L2 block loaded into state", "err", err) + return err + } - l.Log.Trace( - "Received block from Espresso", - "blockNr", block.NumberU64(), - "blockHash", block.Hash(), - "parentHash", block.ParentHash(), - ) - - l.channelMgrMutex.Lock() - err = l.channelMgr.AddL2Block(block) - l.channelMgrMutex.Unlock() - - if err != nil { - l.Log.Error("failed to add L2 block to channel manager", "err", err) - l.clearState(ctx) - l.espressoStreamer.Reset() - } + l.Metr.RecordL2BlocksLoaded(l2ref) + return nil +} - l.Log.Info("Added L2 block to channel manager") +// espressoLoadBlockIntoState fetches and stores a single block into `state`. +// The block retrieved is returned. +// +// This is equivalent to the method `loadBlockIntoState`, but targeting +// Espressos instead of the sequencer as the source. +func (l *BatchSubmitter) espressoLoadBlockIntoState(ctx context.Context, blockNumber uint64) (*types.Block, error) { + for { + if !l.espressoStreamer.HasNext(ctx) { + if err := l.espressoStreamer.Update(ctx); err != nil { + return nil, err } + } - trySignal(publishSignal) + batch := l.espressoStreamer.Next(ctx) - // A failure in the streamer Update can happen after the buffer has been partially filled - if err != nil { - l.Log.Error("failed to update Espresso streamer", "err", err) - continue - } + if batch == nil { + // We don't have a batch available right now, so we will need + // to wait to see if one becomes available. + l.Log.Info("No batch available from Espresso, waiting for more data", "blockNumber", blockNumber) + time.Sleep(time.Millisecond * 100) + // Let's try again + continue + } - case <-ctx.Done(): - l.Log.Info("espressoBatchLoadingLoop returning") - return + // This should happen ONLY if the batch is malformed. ToBlock has to guarantee no + // transient errors. + block, err := batch.ToBlock(l.RollupConfig) + if err != nil { + l.Log.Error("failed to convert singular batch to block", "err", err) + continue } + + blockNumberFromBatch := block.NumberU64() + if blockNumberFromBatch < blockNumber { + l.Log.Debug("Received block with number less than expected, skipping", "blockNumber", blockNumberFromBatch, "expected", blockNumber) + // We should still be able to get the block we are looking for, we + // just need to progress forward in the stream. + + // TODO: we actually know the size of the block gap here, so we + // might be able to advance the streamer to the expected block + // position. + continue + } + + if blockNumberFromBatch != blockNumber { + // We received a block that is not the one we are looking for. + // It indicates the the streamer is ahead of where we want to be + // so we need to reset the streamer, and wait for the correct + // block to be available. + l.Log.Info("Received block with number greater than expected, resetting streamer", "blockNumber", blockNumberFromBatch, "expected", blockNumber) + l.espressoStreamer.Reset() + continue + } + + l.Log.Trace( + "Received block from Espresso", + "blockNr", block.NumberU64(), + "blockHash", block.Hash(), + "parentHash", block.ParentHash(), + ) + + l.channelMgrMutex.Lock() + defer l.channelMgrMutex.Unlock() + if err := l.channelMgr.AddL2Block(block); err != nil { + return nil, fmt.Errorf("adding L2 block to state: %w", err) + } + + l.Log.Info("Added L2 block to local state", "block", eth.ToBlockID(block), "tx_count", len(block.Transactions()), "time", block.Time()) + return block, nil } } @@ -899,7 +991,13 @@ func (l *BlockLoader) nextBlockRange(newSyncStatus *eth.SyncStatus) (inclusiveBl // State empty, just enqueue all unsafe blocks if len(l.queuedBlocks) == 0 { - return inclusiveBlockRange{safeL2.Number + 1, newSyncStatus.UnsafeL2.Number}, ActionEnqueue + if newSyncStatus.UnsafeL2.Number <= safeL2.Number+1 { + // no unsafe blocks to enqueue, just retry + l.batcher.Log.Warn("no unsafe blocks to enqueue", "safeL2", safeL2, "unsafeL2", newSyncStatus.UnsafeL2) + return inclusiveBlockRange{}, ActionRetry + } + + return inclusiveBlockRange{start: safeL2.Number + 1, end: newSyncStatus.UnsafeL2.Number}, ActionEnqueue } lastQueuedBlock := l.queuedBlocks[len(l.queuedBlocks)-1] @@ -947,7 +1045,7 @@ func (l *BlockLoader) nextBlockRange(newSyncStatus *eth.SyncStatus) (inclusiveBl l.queuedBlocks = l.queuedBlocks[numFinalizedBlocks:] } - return inclusiveBlockRange{lastQueuedBlock.Number + 1, newSyncStatus.UnsafeL2.Number}, ActionEnqueue + return inclusiveBlockRange{start: lastQueuedBlock.Number + 1, end: newSyncStatus.UnsafeL2.Number}, ActionEnqueue } // blockLoadingLoop