Skip to content

Commit 5cb141e

Browse files
Fix AltDA batcher (#174)
Co-authored-by: Philippe Camacho <[email protected]>
1 parent e190c0a commit 5cb141e

File tree

5 files changed

+19
-23
lines changed

5 files changed

+19
-23
lines changed

espresso/environment/2_espresso_liveness_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ func TestE2eDevNetWithEspressoEspressoDegradedLivenessViaCaffNode(t *testing.T)
283283
require.NoError(t, err, "failed to get safe L2 block ref")
284284
finalizedL1BlockRef, err := l1RefClient.L1BlockRefByLabel(streamBlocksCtx, eth.Finalized)
285285
require.NoError(t, err, "failed to get finalized L1 block ref")
286-
_, err = streamer.Refresh(streamBlocksCtx, finalizedL1BlockRef, l2BlockRef.Number, l2BlockRef.L1Origin)
286+
err = streamer.Refresh(streamBlocksCtx, finalizedL1BlockRef, l2BlockRef.Number, l2BlockRef.L1Origin)
287287
require.NoError(t, err, "failed to refresh streamer")
288288
lastTransaction := transactions[N-1]
289289

@@ -312,7 +312,7 @@ func TestE2eDevNetWithEspressoEspressoDegradedLivenessViaCaffNode(t *testing.T)
312312
safeL2, safeL2Error := l2RefClient.L2BlockRefByLabel(ctx, eth.Safe)
313313
if finalizedL1Err == nil && safeL2Error == nil {
314314
// Refresh the Streamer with the latest finalized L1 and safe L2
315-
_, err := streamer.Refresh(ctx, finalizedL1, safeL2.Number, safeL2.L1Origin)
315+
err := streamer.Refresh(ctx, finalizedL1, safeL2.Number, safeL2.L1Origin)
316316
if have, want := err, error(nil); have != want {
317317
// NOTE: we are in a go-routine here, so we are unable
318318
// to fail fatally here. Instead, we'll Fail and and

espresso/streamer.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -130,24 +130,24 @@ func (s *EspressoStreamer[B]) Reset() {
130130
}
131131

132132
// Handle both L1 reorgs and batcher restarts by updating our state in case it is
133-
// not consistent with what's on the L1. Returns true if the state was updated.
134-
func (s *EspressoStreamer[B]) Refresh(ctx context.Context, finalizedL1 eth.L1BlockRef, safeBatchNumber uint64, safeL1Origin eth.BlockID) (bool, error) {
133+
// not consistent with what's on the L1.
134+
func (s *EspressoStreamer[B]) Refresh(ctx context.Context, finalizedL1 eth.L1BlockRef, safeBatchNumber uint64, safeL1Origin eth.BlockID) error {
135135
s.FinalizedL1 = finalizedL1
136136

137137
err := s.confirmEspressoBlockHeight(safeL1Origin)
138138
if err != nil {
139-
return false, err
139+
return err
140140
}
141141

142142
// NOTE: be sure to update s.finalizedL1 before checking this condition and returning
143143
if s.fallbackBatchPos == safeBatchNumber {
144144
// This means everything is in sync, no state update needed
145-
return false, nil
145+
return nil
146146
}
147147

148148
s.fallbackBatchPos = safeBatchNumber
149149
s.Reset()
150-
return true, nil
150+
return nil
151151
}
152152

153153
func (s *EspressoStreamer[B]) CheckBatch(ctx context.Context, batch B) (BatchValidity, int) {

espresso/streamer_test.go

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -374,10 +374,7 @@ func TestStreamerSmoke(t *testing.T) {
374374

375375
// update the state of our streamer
376376
syncStatus := state.SyncStatus()
377-
updated, err := streamer.Refresh(ctx, syncStatus.FinalizedL1, syncStatus.SafeL2.Number, syncStatus.SafeL2.L1Origin)
378-
if have, want := updated, false; have != want {
379-
t.Fatalf("failed to refresh streamer state:\nhave:\n\t%v\nwant:\n\t%v\n", updated, want)
380-
}
377+
err := streamer.Refresh(ctx, syncStatus.FinalizedL1, syncStatus.SafeL2.Number, syncStatus.SafeL2.L1Origin)
381378

382379
if have, want := err, error(nil); have != want {
383380
t.Fatalf("failed to refresh streamer state encountered error:\nhave:\n\t\"%v\"\nwant:\n\t\"%v\"\n", have, want)
@@ -415,7 +412,7 @@ func TestEspressoStreamerSimpleIncremental(t *testing.T) {
415412
for i := 0; i < N; i++ {
416413
// update the state of our streamer
417414
syncStatus := state.SyncStatus()
418-
_, err := streamer.Refresh(ctx, syncStatus.FinalizedL1, syncStatus.SafeL2.Number, syncStatus.SafeL2.L1Origin)
415+
err := streamer.Refresh(ctx, syncStatus.FinalizedL1, syncStatus.SafeL2.Number, syncStatus.SafeL2.L1Origin)
419416

420417
if have, want := err, error(nil); have != want {
421418
t.Fatalf("failed to refresh streamer state encountered error:\nhave:\n\t\"%v\"\nwant:\n\t\"%v\"\n", have, want)
@@ -477,7 +474,7 @@ func TestEspressoStreamerIncrementalDelayedConsumption(t *testing.T) {
477474

478475
// update the state of our streamer
479476
syncStatus := state.SyncStatus()
480-
_, err := streamer.Refresh(ctx, syncStatus.FinalizedL1, syncStatus.SafeL2.Number, syncStatus.SafeL2.L1Origin)
477+
err := streamer.Refresh(ctx, syncStatus.FinalizedL1, syncStatus.SafeL2.Number, syncStatus.SafeL2.L1Origin)
481478

482479
for i := 0; i < N; i++ {
483480
batch, _, _, espTxnInBlock := state.CreateEspressoTxnData(
@@ -545,7 +542,7 @@ func TestStreamerEspressoOutOfOrder(t *testing.T) {
545542

546543
// update the state of our streamer
547544
syncStatus := state.SyncStatus()
548-
_, err := streamer.Refresh(ctx, syncStatus.FinalizedL1, syncStatus.SafeL2.Number, syncStatus.SafeL2.L1Origin)
545+
err := streamer.Refresh(ctx, syncStatus.FinalizedL1, syncStatus.SafeL2.Number, syncStatus.SafeL2.L1Origin)
549546

550547
if have, want := err, error(nil); have != want {
551548
t.Fatalf("failed to refresh streamer state encountered error:\nhave:\n\t\"%v\"\nwant:\n\t\"%v\"\n", have, want)
@@ -630,7 +627,7 @@ func TestEspressoStreamerDuplicationHandling(t *testing.T) {
630627

631628
// update the state of our streamer
632629
syncStatus := state.SyncStatus()
633-
_, err := streamer.Refresh(ctx, syncStatus.FinalizedL1, syncStatus.SafeL2.Number, syncStatus.SafeL2.L1Origin)
630+
err := streamer.Refresh(ctx, syncStatus.FinalizedL1, syncStatus.SafeL2.Number, syncStatus.SafeL2.L1Origin)
634631

635632
if have, want := err, error(nil); have != want {
636633
t.Fatalf("failed to refresh streamer state encountered error:\nhave:\n\t\"%v\"\nwant:\n\t\"%v\"\n", have, want)
@@ -651,7 +648,7 @@ func TestEspressoStreamerDuplicationHandling(t *testing.T) {
651648
for j := 0; j < 2; j++ {
652649
// update the state of our streamer
653650
syncStatus := state.SyncStatus()
654-
_, err := streamer.Refresh(ctx, syncStatus.FinalizedL1, syncStatus.SafeL2.Number, syncStatus.SafeL2.L1Origin)
651+
err := streamer.Refresh(ctx, syncStatus.FinalizedL1, syncStatus.SafeL2.Number, syncStatus.SafeL2.L1Origin)
655652

656653
require.NoError(t, err)
657654

op-batcher/batcher/espresso.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -656,8 +656,10 @@ func (l *BatchSubmitter) queueBlockToEspresso(ctx context.Context, block *types.
656656
}
657657

658658
func (l *BatchSubmitter) espressoSyncAndRefresh(ctx context.Context, newSyncStatus *eth.SyncStatus) {
659-
shouldClearState, err := l.streamer.Refresh(ctx, newSyncStatus.FinalizedL1, newSyncStatus.SafeL2.Number, newSyncStatus.SafeL2.L1Origin)
660-
shouldClearState = shouldClearState || err != nil
659+
err := l.streamer.Refresh(ctx, newSyncStatus.FinalizedL1, newSyncStatus.SafeL2.Number, newSyncStatus.SafeL2.L1Origin)
660+
if err != nil {
661+
l.Log.Warn("Failed to refresh Espresso streamer", "err", err)
662+
}
661663

662664
l.channelMgrMutex.Lock()
663665
defer l.channelMgrMutex.Unlock()
@@ -667,10 +669,7 @@ func (l *BatchSubmitter) espressoSyncAndRefresh(ctx context.Context, newSyncStat
667669
return
668670
}
669671
l.prevCurrentL1 = newSyncStatus.CurrentL1
670-
if syncActions.clearState == nil && shouldClearState {
671-
l.channelMgr.Clear(newSyncStatus.SafeL2.L1Origin)
672-
l.streamer.Reset()
673-
} else if syncActions.clearState != nil {
672+
if syncActions.clearState != nil {
674673
l.channelMgr.Clear(*syncActions.clearState)
675674
l.streamer.Reset()
676675
} else {

op-node/rollup/derive/attributes_queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ func CaffNextBatch(s *espresso.EspressoStreamer[EspressoBatch], ctx context.Cont
144144
return nil, false, err
145145
}
146146
// Refresh the sync status
147-
if _, err := s.Refresh(ctx, finalizedL1Block, parent.Number, parent.L1Origin); err != nil {
147+
if err := s.Refresh(ctx, finalizedL1Block, parent.Number, parent.L1Origin); err != nil {
148148
return nil, false, err
149149
}
150150

0 commit comments

Comments
 (0)