Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 34 additions & 2 deletions plugin/evm/block_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ type blockBuilder struct {
// but at least after a minimum delay of minBlockBuildingRetryDelay.
lastBuildParentHash common.Hash
lastBuildTime time.Time

blockNumLock sync.RWMutex
pendingBlockNum uint64
finalBlockNum uint64
}

// NewBlockBuilder creates a new block builder. extraMempool is an optional mempool (can be nil) that
Expand Down Expand Up @@ -104,10 +108,13 @@ func (b *blockBuilder) awaitSubmittedTxs() {
extraChan = b.extraMempool.SubscribePendingTxs()
}

events := make(chan core.NewTxPoolReorgEvent)
sub := b.txPool.SubscribeNewReorgEvent(events)

b.shutdownWg.Add(1)
go b.ctx.Log.RecoverAndPanic(func() {
defer b.shutdownWg.Done()

defer sub.Unsubscribe()
for {
select {
case <-txSubmitChan:
Expand All @@ -118,6 +125,13 @@ func (b *blockBuilder) awaitSubmittedTxs() {
b.signalCanBuild()
case <-b.shutdownChan:
return
case event := <-events:
if event.Head == nil || event.Head.Number == nil {
log.Warn("nil head or block number in tx pool reorg event")
continue
}
b.setFinalBlockNum(event.Head.Number.Uint64())
b.signalCanBuild()
}
}
})
Expand Down Expand Up @@ -156,7 +170,7 @@ func (b *blockBuilder) waitForEvent(ctx context.Context, currentHeader *types.He
func (b *blockBuilder) waitForNeedToBuild(ctx context.Context) (time.Time, common.Hash, error) {
b.buildBlockLock.Lock()
defer b.buildBlockLock.Unlock()
for !b.needToBuild() {
for !b.needToBuild() || b.pendingPoolUpdate() {
if err := b.pendingSignal.Wait(ctx); err != nil {
return time.Time{}, common.Hash{}, err
}
Expand Down Expand Up @@ -199,3 +213,21 @@ func minNextBlockTime(parent *types.Header) time.Time {
requiredDelay := time.Duration(acp226DelayExcess.Delay()) * time.Millisecond
return parentTime.Add(requiredDelay)
}

func (b *blockBuilder) setPendingBlockNum(pendingBlockNum uint64) {
b.blockNumLock.Lock()
defer b.blockNumLock.Unlock()
b.pendingBlockNum = pendingBlockNum
}

func (b *blockBuilder) setFinalBlockNum(num uint64) {
b.blockNumLock.Lock()
defer b.blockNumLock.Unlock()
b.finalBlockNum = num
}

func (b *blockBuilder) pendingPoolUpdate() bool {
b.blockNumLock.RLock()
defer b.blockNumLock.RUnlock()
return b.pendingBlockNum != b.finalBlockNum
}
18 changes: 17 additions & 1 deletion plugin/evm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -1023,7 +1023,12 @@ func (vm *VM) SetPreference(ctx context.Context, blkID ids.ID) error {
return fmt.Errorf("failed to set preference to %s: %w", blkID, err)
}

return vm.blockChain.SetPreference(block.GetEthBlock())
err = vm.blockChain.SetPreference(block.GetEthBlock())
if err != nil {
return err
}
vm.setPendingBlock(block.GetEthBlock().NumberU64())
return err
}

// GetBlockIDAtHeight returns the canonical block at [height].
Expand Down Expand Up @@ -1200,6 +1205,17 @@ func attachEthService(handler *rpc.Server, apis []rpc.API, names []string) error
return nil
}

func (vm *VM) setPendingBlock(blockNum uint64) {
vm.builderLock.Lock()
defer vm.builderLock.Unlock()

if vm.builder == nil {
return
}

vm.builder.setPendingBlockNum(blockNum)
}

func (vm *VM) stateSyncEnabled(lastAcceptedHeight uint64) bool {
if vm.config.StateSyncEnabled != nil {
// if the config is set, use that
Expand Down
23 changes: 19 additions & 4 deletions plugin/evm/vm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"crypto/ecdsa"
"encoding/json"
"errors"
"fmt"
"math/big"
"os"
Expand Down Expand Up @@ -1597,14 +1598,28 @@ func TestWaitForEvent(t *testing.T) {
},
},
{
name: "WaitForEvent doesn't return if mempool is empty",
name: "WaitForEvent doesn't return once a block is built and accepted",
testCase: func(t *testing.T, vm *VM) {
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
defer cancel()

var wg sync.WaitGroup
wg.Add(1)

signedTx := newSignedLegacyTx(t, vm.chainConfig, vmtest.TestKeys[0].ToECDSA(), 0, &vmtest.TestEthAddrs[1], big.NewInt(1), 21000, vmtest.InitialBaseFee, nil)

err := errors.Join(vm.txPool.AddRemotesSync([]*types.Transaction{signedTx})...)
require.NoError(t, err)

blk, err := vm.BuildBlock(context.Background())
require.NoError(t, err)

require.NoError(t, blk.Verify(context.Background()))

require.NoError(t, vm.SetPreference(context.Background(), blk.ID()))

require.NoError(t, blk.Accept(context.Background()))

ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
defer cancel()

// We run WaitForEvent in a goroutine to ensure it can be safely called concurrently.
go func() {
defer wg.Done()
Expand Down
7 changes: 7 additions & 0 deletions plugin/evm/wrapped_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,13 @@ func (b *wrappedBlock) Accept(context.Context) error {
"id", blkID,
"height", b.Height(),
)

if b.ethBlock != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we set this here? I think setting this in SetPreference should be enough?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we later call vm.blockchain.Accept() and then setPreference.

vm.setPendingBlock(b.ethBlock.NumberU64())
} else {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this happen?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, we do check this in verification

log.Warn("nil eth block")
}

// Call Accept for relevant precompile logs. Note we do this prior to
// calling Accept on the blockChain so any side effects (eg warp signatures)
// take place before the accepted log is emitted to subscribers.
Expand Down