Skip to content

Commit e940d2b

Browse files
committed
refactor: Block.WaitUntilExecuted instead of synchronous enqueue
1 parent f176688 commit e940d2b

File tree

7 files changed

+51
-51
lines changed

7 files changed

+51
-51
lines changed

block.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,16 +43,20 @@ func (vm *VM) AcceptBlock(ctx context.Context, b *blocks.Block) error {
4343
vm.last.settled.Store(b.LastSettled())
4444
vm.last.accepted.Store(b)
4545

46+
// This MUST NOT happen before the database and [VM.last] are updated to
47+
// reflect that the block has been accepted.
48+
if err := vm.exec.EnqueueAccepted(ctx, b); err != nil {
49+
return err
50+
}
4651
// When the chain is bootstrapping, avalanchego expects to be able to call
4752
// `Verify` and `Accept` in a loop over blocks. Reporting an error during
4853
// either `Verify` or `Accept` is considered FATAL during this process.
4954
// Therefore, we must ensure that avalanchego does not get too far ahead of
5055
// the execution thread and FATAL during block verification.
51-
execSynchronous := vm.consensusState.Get() == snow.Bootstrapping
52-
// This MUST NOT happen before the database and [VM.last] are updated to
53-
// reflect that the block has been accepted.
54-
if err := vm.exec.EnqueueAccepted(ctx, b, execSynchronous); err != nil {
55-
return err
56+
if vm.consensusState.Get() == snow.Bootstrapping {
57+
if err := b.WaitUntilExecuted(ctx); err != nil {
58+
return fmt.Errorf("waiting for block %d to execute: %v", b.Height(), err)
59+
}
5660
}
5761

5862
vm.logger().Debug(

blocks/block.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ type Block struct {
2929
ancestry atomic.Pointer[ancestry]
3030
execution atomic.Pointer[executionResults]
3131

32+
executed chan struct{} // closed after `execution` is set
33+
settled chan struct{} // closed after `ancestry` is cleared
34+
3235
log logging.Logger
3336
}
3437

@@ -43,7 +46,9 @@ func InMemoryBlockCount() uint64 {
4346
// New constructs a new Block.
4447
func New(eth *types.Block, parent, lastSettled *Block, log logging.Logger) (*Block, error) {
4548
b := &Block{
46-
Block: eth,
49+
Block: eth,
50+
executed: make(chan struct{}),
51+
settled: make(chan struct{}),
4752
}
4853
// TODO(arr4n) change to runtime.AddCleanup after the Go version has been
4954
// bumped to >=1.24.0.

blocks/execution.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package blocks
22

33
import (
4+
"context"
45
"fmt"
56
"slices"
67
"time"
@@ -93,9 +94,21 @@ func (b *Block) markExecuted(e *executionResults) error {
9394
b.log.Error("Block re-marked as executed")
9495
return fmt.Errorf("block %d re-marked as executed", b.Height())
9596
}
97+
close(b.executed)
9698
return nil
9799
}
98100

101+
// WaitUntilExecuted blocks until either [Block.MarkExecuted] is called or the
102+
// [context.Context] is cancelled.
103+
func (b *Block) WaitUntilExecuted(ctx context.Context) error {
104+
select {
105+
case <-ctx.Done():
106+
return ctx.Err()
107+
case <-b.executed:
108+
return nil
109+
}
110+
}
111+
99112
// Executed reports whether [Block.MarkExecuted] has been called and returned
100113
// without error.
101114
func (b *Block) Executed() bool {

blocks/settlement.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package blocks
22

3-
import "slices"
3+
import (
4+
"context"
5+
"slices"
6+
)
47

58
type ancestry struct {
69
parent, lastSettled *Block
@@ -17,11 +20,23 @@ func (b *Block) MarkSettled() {
1720
b.log.Fatal("Block re-settled")
1821
}
1922
if b.ancestry.CompareAndSwap(a, nil) {
23+
close(b.settled)
2024
return
2125
}
2226
b.log.Fatal("Block ancestry changed")
2327
}
2428

29+
// WaitUntilSettled blocks until either [Block.MarkSettled] is called or the
30+
// [context.Context] is cancelled.
31+
func (b *Block) WaitUntilSettled(ctx context.Context) error {
32+
select {
33+
case <-ctx.Done():
34+
return ctx.Err()
35+
case <-b.settled:
36+
return nil
37+
}
38+
}
39+
2540
// ParentBlock returns the block's parent unless [Block.MarkSettled] has been
2641
// called, in which case it returns nil.
2742
func (b *Block) ParentBlock() *Block {

sae_test.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -398,11 +398,7 @@ func TestBasicE2E(t *testing.T) {
398398

399399
t.Run("subscriptions", func(t *testing.T) {
400400
t.Run("head_events", func(t *testing.T) {
401-
for n := len(acceptedBlocks); !acceptedBlocks[n-1].Executed(); {
402-
// There's no need to block on execution in production so
403-
// making the executed bool a channel is overkill.
404-
runtime.Gosched()
405-
}
401+
require.NoError(t, acceptedBlocks[len(acceptedBlocks)-1].WaitUntilExecuted(ctx))
406402

407403
headSub.Unsubscribe()
408404
require.NoErrorf(t, <-headSub.Err(), "receive on %T.Err()", headSub)

saexec/execution.go

Lines changed: 6 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -23,42 +23,24 @@ import (
2323
"github.com/ava-labs/strevm/queue"
2424
)
2525

26-
// Enqueue pushes a new block to the FIFO queue. It is non-blocking unless the
27-
// `synchronous` argument is true, in which case it returns when either the
28-
// [context.Context] is cancelled or the block has been executed.
29-
//
30-
// The `synchronous` argument SHOULD be true i.f.f. the chain is bootstrapping.
31-
func (e *Executor) EnqueueAccepted(ctx context.Context, block *blocks.Block, synchronous bool) error {
32-
err := e.queue.UseThenSignal(ctx, func(q *queue.FIFO[*blocks.Block]) error {
26+
// Enqueue pushes a new block to the FIFO queue.
27+
func (e *Executor) EnqueueAccepted(ctx context.Context, block *blocks.Block) error {
28+
return e.queue.UseThenSignal(ctx, func(q *queue.FIFO[*blocks.Block]) error {
3329
q.Push(block)
34-
e.queueCleared.Block()
3530
return nil
3631
})
37-
if err != nil || !synchronous {
38-
return err
39-
}
40-
return e.queueCleared.Wait(ctx)
4132
}
4233

4334
func (e *Executor) processQueue() {
4435
ctx := e.quitCtx()
4536

4637
for {
47-
type pop struct {
48-
block *blocks.Block
49-
emptyAfter bool
50-
}
51-
52-
popped, err := sink.FromMonitor(ctx, e.queue,
38+
block, err := sink.FromMonitor(ctx, e.queue,
5339
func(q *queue.FIFO[*blocks.Block]) bool {
5440
return q.Len() > 0
5541
},
56-
func(q *queue.FIFO[*blocks.Block]) (pop, error) {
57-
b := q.Pop()
58-
return pop{
59-
block: b,
60-
emptyAfter: q.Len() == 0,
61-
}, nil
42+
func(q *queue.FIFO[*blocks.Block]) (*blocks.Block, error) {
43+
return q.Pop(), nil
6244
},
6345
)
6446
if errors.Is(err, context.Canceled) {
@@ -72,7 +54,6 @@ func (e *Executor) processQueue() {
7254
return
7355
}
7456

75-
block := popped.block
7657
switch err := e.execute(ctx, block); {
7758
case errors.Is(err, context.Canceled):
7859
return
@@ -86,13 +67,6 @@ func (e *Executor) processQueue() {
8667
)
8768
return
8869
}
89-
90-
// This may race with a concurrent call to [VM.AcceptBlock], but that is
91-
// documented and also acceptable as we only ever Wait() inside
92-
// [VM.AcceptBlock].
93-
if popped.emptyAfter {
94-
e.queueCleared.Open()
95-
}
9670
}
9771
}
9872

saexec/saexec.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,6 @@ type Executor struct {
3737
queue sink.Monitor[*queue.FIFO[*blocks.Block]]
3838
lastExecuted *atomic.Pointer[blocks.Block] // shared with VM
3939

40-
// During bootstrapping we need to know when the queue has been executed to
41-
// avoid verification returning an error due to waiting on the execution
42-
// stream for settlement. This [sink.Gate] is only valid in the absence of
43-
// concurrent calls to [Executor.EnqueueAccepted].
44-
queueCleared sink.Gate
45-
4640
headEvents event.FeedOf[core.ChainHeadEvent]
4741
chainEvents event.FeedOf[core.ChainEvent]
4842
logEvents event.FeedOf[[]*types.Log]
@@ -119,7 +113,6 @@ func (e *Executor) init() error {
119113
func (e *Executor) Run(quit <-chan struct{}, ready chan<- struct{}) {
120114
e.quit = quit
121115
e.queue = sink.NewMonitor(new(queue.FIFO[*blocks.Block]))
122-
e.queueCleared = sink.NewGate()
123116
e.spawn(e.processQueue)
124117

125118
close(ready)

0 commit comments

Comments
 (0)