Skip to content

Commit 6db0c9d

Browse files
sebastianst and geoknee authored
op-node/rollup/derive: Reorganize baseBatchStage (#13866)
* op-node/rollup/derive: Reorganize baseBatchStage Fixes #12567 * reword comment Co-authored-by: George Knee <[email protected]> --------- Co-authored-by: George Knee <[email protected]>
1 parent 1add88a commit 6db0c9d

File tree

3 files changed

+220
-201
lines changed

3 files changed

+220
-201
lines changed
Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
package derive
2+
3+
import (
4+
"context"
5+
"io"
6+
7+
"github.com/ethereum/go-ethereum/log"
8+
9+
"github.com/ethereum-optimism/optimism/op-node/rollup"
10+
"github.com/ethereum-optimism/optimism/op-service/eth"
11+
)
12+
13+
// NextBatchProvider is the interface required of the previous pipeline stage:
// it reports its current L1 origin, yields batches one at a time, and supports
// channel flushing via the embedded ChannelFlusher.
type NextBatchProvider interface {
	ChannelFlusher
	// Origin returns the previous stage's current L1 origin.
	Origin() eth.L1BlockRef
	// NextBatch returns the next available batch, or an error
	// (presumably io.EOF when out of data — verify against implementers).
	NextBatch(ctx context.Context) (Batch, error)
}
18+
19+
// SafeBlockFetcher provides access to L2 blocks by number, both as light
// block references and as full execution payload envelopes.
type SafeBlockFetcher interface {
	L2BlockRefByNumber(context.Context, uint64) (eth.L2BlockRef, error)
	PayloadByNumber(context.Context, uint64) (*eth.ExecutionPayloadEnvelope, error)
}
23+
24+
// The baseBatchStage is a shared implementation of basic channel stage functionality. It is
// currently shared between the legacy BatchQueue, which buffers future batches, and the
// post-Holocene BatchStage, which requires strictly ordered batches.
type baseBatchStage struct {
	log    log.Logger
	config *rollup.Config
	prev   NextBatchProvider // previous stage; source of batches and of the L1 origin
	l2     SafeBlockFetcher  // fetches L2 blocks by number

	// origin is the L1 origin this stage has progressed to;
	// kept in sync with prev.Origin() by updateOrigins.
	origin eth.L1BlockRef

	// l1Blocks contains consecutive eth.L1BlockRef sorted by time.
	// Every L1 origin of unsafe L2 blocks must be eventually included in l1Blocks.
	// Batch queue's job is to ensure below two rules:
	// If every L2 block corresponding to single L1 block becomes safe, it will be popped from l1Blocks.
	// If new L2 block's L1 origin is not included in l1Blocks, fetch and push to l1Blocks.
	// length of l1Blocks never exceeds SequencerWindowSize
	l1Blocks []eth.L1BlockRef

	// nextSpan is cached SingularBatches derived from SpanBatch
	nextSpan []*SingularBatch
}
46+
47+
func newBaseBatchStage(log log.Logger, cfg *rollup.Config, prev NextBatchProvider, l2 SafeBlockFetcher) baseBatchStage {
48+
return baseBatchStage{
49+
log: log,
50+
config: cfg,
51+
prev: prev,
52+
l2: l2,
53+
}
54+
}
55+
56+
// base returns the receiver itself, giving types that embed baseBatchStage
// a uniform way to reach the shared implementation.
func (bs *baseBatchStage) base() *baseBatchStage {
	return bs
}
59+
60+
func (bs *baseBatchStage) Log() log.Logger {
61+
if len(bs.l1Blocks) == 0 {
62+
return bs.log.New("origin", bs.origin.ID())
63+
} else {
64+
return bs.log.New("origin", bs.origin.ID(), "epoch", bs.l1Blocks[0])
65+
}
66+
}
67+
68+
// Origin returns the L1 origin of the previous stage. The entire pipeline
// shares the same origin (see updateOrigins).
func (bs *baseBatchStage) Origin() eth.L1BlockRef {
	return bs.prev.Origin()
}
71+
72+
// popNextBatch pops the next batch from the current queued up span-batch nextSpan.
73+
// The queue must be non-empty, or the function will panic.
74+
func (bs *baseBatchStage) popNextBatch(parent eth.L2BlockRef) *SingularBatch {
75+
if len(bs.nextSpan) == 0 {
76+
panic("popping non-existent span-batch, invalid state")
77+
}
78+
nextBatch := bs.nextSpan[0]
79+
bs.nextSpan = bs.nextSpan[1:]
80+
// Must set ParentHash before return. we can use parent because the parentCheck is verified in CheckBatch().
81+
nextBatch.ParentHash = parent.Hash
82+
bs.log.Debug("pop next batch from the cached span batch")
83+
return nextBatch
84+
}
85+
86+
// NextBatch return next valid batch upon the given safe head.
87+
// It also returns the boolean that indicates if the batch is the last block in the batch.
88+
func (bs *baseBatchStage) nextFromSpanBatch(parent eth.L2BlockRef) (*SingularBatch, bool) {
89+
if len(bs.nextSpan) > 0 {
90+
// There are cached singular batches derived from the span batch.
91+
// Check if the next cached batch matches the given parent block.
92+
if bs.nextSpan[0].Timestamp == parent.Time+bs.config.BlockTime {
93+
// Pop first one and return.
94+
nextBatch := bs.popNextBatch(parent)
95+
// len(bq.nextSpan) == 0 means it's the last batch of the span.
96+
return nextBatch, len(bs.nextSpan) == 0
97+
} else {
98+
// Given parent block does not match the next batch. It means the previously returned batch is invalid.
99+
// Drop cached batches and find another batch.
100+
bs.log.Warn("parent block does not match the next batch. dropped cached batches", "parent", parent.ID(), "nextBatchTime", bs.nextSpan[0].GetTimestamp())
101+
bs.nextSpan = bs.nextSpan[:0]
102+
}
103+
}
104+
return nil, false
105+
}
106+
107+
// updateOrigins advances this stage's L1 origin to that of the previous stage
// and maintains the l1Blocks window relative to the given L2 parent block.
func (bs *baseBatchStage) updateOrigins(parent eth.L2BlockRef) {
	// Note: We use the origin that we will have to determine if it's behind. This is important
	// because it's the future origin that gets saved into the l1Blocks array.
	// We always update the origin of this stage if it is not the same so after the update code
	// runs, this is consistent.
	originBehind := bs.originBehind(parent)

	// Advance origin if needed
	// Note: The entire pipeline has the same origin
	// We just don't accept batches prior to the L1 origin of the L2 safe head
	if bs.origin != bs.prev.Origin() {
		bs.origin = bs.prev.Origin()
		if !originBehind {
			bs.l1Blocks = append(bs.l1Blocks, bs.origin)
		} else {
			// This is to handle the special case of startup. At startup we call Reset & include
			// the L1 origin. That is the only time where immediately after `Reset` is called
			// originBehind is false.
			bs.l1Blocks = bs.l1Blocks[:0]
		}
		bs.log.Info("Advancing bq origin", "origin", bs.origin, "originBehind", originBehind)
	}

	// If the epoch is advanced, update bq.l1Blocks
	// Before Holocene, advancing the epoch must be done after the pipeline successfully applied the entire span batch to the chain.
	// This is because the entire span batch can be reverted after finding an invalid batch.
	// So we must preserve the existing l1Blocks to verify the epochs of the next candidate batch.
	if len(bs.l1Blocks) > 0 && parent.L1Origin.Number > bs.l1Blocks[0].Number {
		for i, l1Block := range bs.l1Blocks {
			if parent.L1Origin.Number == l1Block.Number {
				// Drop every window entry older than the parent's epoch.
				bs.l1Blocks = bs.l1Blocks[i:]
				bs.log.Debug("Advancing internal L1 blocks", "next_epoch", bs.l1Blocks[0].ID(), "next_epoch_time", bs.l1Blocks[0].Time)
				break
			}
		}
		// If we can't find the origin of parent block, we have to advance bq.origin.
	}
}
145+
146+
func (bs *baseBatchStage) originBehind(parent eth.L2BlockRef) bool {
147+
return bs.prev.Origin().Number < parent.L1Origin.Number
148+
}
149+
150+
func (bs *baseBatchStage) reset(base eth.L1BlockRef) {
151+
// Copy over the Origin from the next stage
152+
// It is set in the engine queue (two stages away) such that the L2 Safe Head origin is the progress
153+
bs.origin = base
154+
bs.l1Blocks = bs.l1Blocks[:0]
155+
// Include the new origin as an origin to build on
156+
// Note: This is only for the initialization case. During normal resets we will later
157+
// throw out this block.
158+
bs.l1Blocks = append(bs.l1Blocks, base)
159+
bs.nextSpan = bs.nextSpan[:0]
160+
}
161+
162+
// deriveNextEmptyBatch may derive an empty batch if the sequencing window is expired
// for the current epoch (the head of l1Blocks). It returns io.EOF when no empty
// batch needs to be generated: the window hasn't expired, or the next L1 block
// isn't known yet, or the epoch could simply be advanced.
// NOTE(review): indexes l1Blocks[0] unchecked — callers must ensure l1Blocks is non-empty.
func (bs *baseBatchStage) deriveNextEmptyBatch(ctx context.Context, outOfData bool, parent eth.L2BlockRef) (*SingularBatch, error) {
	epoch := bs.l1Blocks[0]
	// If the current epoch is too old compared to the L1 block we are at,
	// i.e. if the sequence window expired, we create empty batches for the current epoch
	expiryEpoch := epoch.Number + bs.config.SeqWindowSize
	// Force empty batches once the window is fully past expiry, or exactly at
	// expiry when the previous stage has run out of batch data.
	forceEmptyBatches := (expiryEpoch == bs.origin.Number && outOfData) || expiryEpoch < bs.origin.Number
	firstOfEpoch := epoch.Number == parent.L1Origin.Number+1
	nextTimestamp := parent.Time + bs.config.BlockTime

	bs.log.Trace("Potentially generating an empty batch",
		"expiryEpoch", expiryEpoch, "forceEmptyBatches", forceEmptyBatches, "nextTimestamp", nextTimestamp,
		"epoch_time", epoch.Time, "len_l1_blocks", len(bs.l1Blocks), "firstOfEpoch", firstOfEpoch)

	if !forceEmptyBatches {
		// sequence window did not expire yet, still room to receive batches for the current epoch,
		// no need to force-create empty batch(es) towards the next epoch yet.
		return nil, io.EOF
	}
	if len(bs.l1Blocks) < 2 {
		// need next L1 block to proceed towards
		return nil, io.EOF
	}

	nextEpoch := bs.l1Blocks[1]
	// Fill with empty L2 blocks of the same epoch until we meet the time of the next L1 origin,
	// to preserve that L2 time >= L1 time. If this is the first block of the epoch, always generate a
	// batch to ensure that we at least have one batch per epoch.
	if nextTimestamp < nextEpoch.Time || firstOfEpoch {
		bs.log.Info("Generating next batch", "epoch", epoch, "timestamp", nextTimestamp)
		return &SingularBatch{
			ParentHash:   parent.Hash,
			EpochNum:     rollup.Epoch(epoch.Number),
			EpochHash:    epoch.Hash,
			Timestamp:    nextTimestamp,
			Transactions: nil,
		}, nil
	}

	// At this point we have auto generated every batch for the current epoch
	// that we can, so we can advance to the next epoch.
	// TODO(12444): Instead of manually advancing the epoch here, it may be better to generate a
	// batch for the next epoch, so that updateOrigins then properly advances the origin.
	bs.log.Trace("Advancing internal L1 blocks", "next_timestamp", nextTimestamp, "next_epoch_time", nextEpoch.Time)
	bs.l1Blocks = bs.l1Blocks[1:]
	return nil, io.EOF
}

0 commit comments

Comments
 (0)