diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go
index 7a0203a69382..936d0ddb384d 100644
--- a/cmd/utils/flags.go
+++ b/cmd/utils/flags.go
@@ -45,7 +45,7 @@ import (
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth"
- "github.com/ethereum/go-ethereum/eth/catalyst"
+ fcatalyst "github.com/ethereum/go-ethereum/eth/catalyst"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/eth/gasprice"
@@ -56,6 +56,7 @@ import (
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/internal/flags"
"github.com/ethereum/go-ethereum/les"
+ lcatalyst "github.com/ethereum/go-ethereum/les/catalyst"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/metrics/exp"
@@ -1718,7 +1719,7 @@ func RegisterEthService(stack *node.Node, cfg *ethconfig.Config, isCatalyst bool
}
stack.RegisterAPIs(tracers.APIs(backend.ApiBackend))
if isCatalyst {
- if err := catalyst.RegisterLight(stack, backend); err != nil {
+ if err := lcatalyst.Register(stack, backend); err != nil {
Fatalf("Failed to register the catalyst service: %v", err)
}
}
@@ -1735,7 +1736,7 @@ func RegisterEthService(stack *node.Node, cfg *ethconfig.Config, isCatalyst bool
}
}
if isCatalyst {
- if err := catalyst.Register(stack, backend); err != nil {
+ if err := fcatalyst.Register(stack, backend); err != nil {
Fatalf("Failed to register the catalyst service: %v", err)
}
}
diff --git a/core/beacon/errors.go b/core/beacon/errors.go
new file mode 100644
index 000000000000..58d52467acc5
--- /dev/null
+++ b/core/beacon/errors.go
@@ -0,0 +1,33 @@
+// Copyright 2022 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package beacon
+
+import "github.com/ethereum/go-ethereum/rpc"
+
+var (
+ VALID = "VALID"
+ INVALIDBLOCKHASH = "INVALID_BLOCK_HASH"
+ ACCEPTED = "ACCEPTED"
+ INVALID = "INVALID"
+ SYNCING = "SYNCING"
+ STATUS_SUCCESS = ForkChoiceResponse{PayloadStatus: PayloadStatusV1{Status: VALID}, PayloadID: nil}
+ STATUS_INVALID = ForkChoiceResponse{PayloadStatus: PayloadStatusV1{Status: INVALID}, PayloadID: nil}
+ STATUS_SYNCING = ForkChoiceResponse{PayloadStatus: PayloadStatusV1{Status: SYNCING}, PayloadID: nil}
+ GenericServerError = rpc.CustomError{Code: -32000, ValidationError: "Server error"}
+ UnknownPayload = rpc.CustomError{Code: -32001, ValidationError: "Unknown payload"}
+ InvalidTB = rpc.CustomError{Code: -32002, ValidationError: "Invalid terminal block"}
+)
diff --git a/eth/catalyst/gen_blockparams.go b/core/beacon/gen_blockparams.go
similarity index 99%
rename from eth/catalyst/gen_blockparams.go
rename to core/beacon/gen_blockparams.go
index ccf5c327ffa3..d3d569b7da75 100644
--- a/eth/catalyst/gen_blockparams.go
+++ b/core/beacon/gen_blockparams.go
@@ -1,6 +1,6 @@
// Code generated by github.com/fjl/gencodec. DO NOT EDIT.
-package catalyst
+package beacon
import (
"encoding/json"
diff --git a/eth/catalyst/gen_ed.go b/core/beacon/gen_ed.go
similarity index 99%
rename from eth/catalyst/gen_ed.go
rename to core/beacon/gen_ed.go
index 46eb45808bca..ac94f49a562a 100644
--- a/eth/catalyst/gen_ed.go
+++ b/core/beacon/gen_ed.go
@@ -1,6 +1,6 @@
// Code generated by github.com/fjl/gencodec. DO NOT EDIT.
-package catalyst
+package beacon
import (
"encoding/json"
diff --git a/eth/catalyst/api_types.go b/core/beacon/types.go
similarity index 57%
rename from eth/catalyst/api_types.go
rename to core/beacon/types.go
index 07636239fac3..2d06fed01c7b 100644
--- a/eth/catalyst/api_types.go
+++ b/core/beacon/types.go
@@ -1,4 +1,4 @@
-// Copyright 2020 The go-ethereum Authors
+// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see .
-package catalyst
+package beacon
import (
"fmt"
@@ -22,6 +22,8 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/trie"
)
//go:generate go run github.com/fjl/gencodec -type PayloadAttributesV1 -field-override payloadAttributesMarshaling -out gen_blockparams.go
@@ -45,7 +47,7 @@ type ExecutableDataV1 struct {
ParentHash common.Hash `json:"parentHash" gencodec:"required"`
FeeRecipient common.Address `json:"feeRecipient" gencodec:"required"`
StateRoot common.Hash `json:"stateRoot" gencodec:"required"`
- ReceiptsRoot common.Hash `json:"receiptsRoot" gencodec:"required"`
+ ReceiptsRoot common.Hash `json:"receiptsRoot" gencodec:"required"`
LogsBloom []byte `json:"logsBloom" gencodec:"required"`
Random common.Hash `json:"random" gencodec:"required"`
Number uint64 `json:"blockNumber" gencodec:"required"`
@@ -82,9 +84,10 @@ type GenericStringResponse struct {
Status string `json:"status"`
}
-type ExecutePayloadResponse struct {
+type PayloadStatusV1 struct {
Status string `json:"status"`
LatestValidHash common.Hash `json:"latestValidHash"`
+ ValidationError string `json:"validationError"`
}
type ConsensusValidatedParams struct {
@@ -112,8 +115,8 @@ func (b *PayloadID) UnmarshalText(input []byte) error {
}
type ForkChoiceResponse struct {
- Status string `json:"status"`
- PayloadID *PayloadID `json:"payloadId"`
+ PayloadStatus PayloadStatusV1 `json:"payloadStatus"`
+ PayloadID *PayloadID `json:"payloadId"`
}
type ForkchoiceStateV1 struct {
@@ -121,3 +124,82 @@ type ForkchoiceStateV1 struct {
SafeBlockHash common.Hash `json:"safeBlockHash"`
FinalizedBlockHash common.Hash `json:"finalizedBlockHash"`
}
+
+func encodeTransactions(txs []*types.Transaction) [][]byte {
+ var enc = make([][]byte, len(txs))
+ for i, tx := range txs {
+ enc[i], _ = tx.MarshalBinary()
+ }
+ return enc
+}
+
+func decodeTransactions(enc [][]byte) ([]*types.Transaction, error) {
+ var txs = make([]*types.Transaction, len(enc))
+ for i, encTx := range enc {
+ var tx types.Transaction
+ if err := tx.UnmarshalBinary(encTx); err != nil {
+ return nil, fmt.Errorf("invalid transaction %d: %v", i, err)
+ }
+ txs[i] = &tx
+ }
+ return txs, nil
+}
+
+// ExecutableDataToBlock constructs a block from executable data.
+// It verifies that the following fields:
+// len(extraData) <= 32
+// uncleHash = emptyUncleHash
+// difficulty = 0
+// and that the blockhash of the constructed block matches the parameters.
+func ExecutableDataToBlock(params ExecutableDataV1) (*types.Block, error) {
+ txs, err := decodeTransactions(params.Transactions)
+ if err != nil {
+ return nil, err
+ }
+ if len(params.ExtraData) > 32 {
+ return nil, fmt.Errorf("invalid extradata length: %v", len(params.ExtraData))
+ }
+ header := &types.Header{
+ ParentHash: params.ParentHash,
+ UncleHash: types.EmptyUncleHash,
+ Coinbase: params.FeeRecipient,
+ Root: params.StateRoot,
+ TxHash: types.DeriveSha(types.Transactions(txs), trie.NewStackTrie(nil)),
+ ReceiptHash: params.ReceiptsRoot,
+ Bloom: types.BytesToBloom(params.LogsBloom),
+ Difficulty: common.Big0,
+ Number: new(big.Int).SetUint64(params.Number),
+ GasLimit: params.GasLimit,
+ GasUsed: params.GasUsed,
+ Time: params.Timestamp,
+ BaseFee: params.BaseFeePerGas,
+ Extra: params.ExtraData,
+ MixDigest: params.Random,
+ }
+ block := types.NewBlockWithHeader(header).WithBody(txs, nil /* uncles */)
+ if block.Hash() != params.BlockHash {
+ return nil, fmt.Errorf("blockhash mismatch, want %x, got %x", params.BlockHash, block.Hash())
+ }
+ return block, nil
+}
+
+// BlockToExecutableData constructs the executableDataV1 structure by filling the
+// fields from the given block. It assumes the given block is post-merge block.
+func BlockToExecutableData(block *types.Block) *ExecutableDataV1 {
+ return &ExecutableDataV1{
+ BlockHash: block.Hash(),
+ ParentHash: block.ParentHash(),
+ FeeRecipient: block.Coinbase(),
+ StateRoot: block.Root(),
+ Number: block.NumberU64(),
+ GasLimit: block.GasLimit(),
+ GasUsed: block.GasUsed(),
+ BaseFeePerGas: block.BaseFee(),
+ Timestamp: block.Time(),
+ ReceiptsRoot: block.ReceiptHash(),
+ LogsBloom: block.Bloom().Bytes(),
+ Transactions: encodeTransactions(block.Transactions()),
+ Random: block.MixDigest(),
+ ExtraData: block.Extra(),
+ }
+}
diff --git a/core/rawdb/accessors_sync.go b/core/rawdb/accessors_sync.go
new file mode 100644
index 000000000000..50dfb848e4e0
--- /dev/null
+++ b/core/rawdb/accessors_sync.go
@@ -0,0 +1,80 @@
+// Copyright 2021 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package rawdb
+
+import (
+ "bytes"
+
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/rlp"
+)
+
+// ReadSkeletonSyncStatus retrieves the serialized sync status saved at shutdown.
+func ReadSkeletonSyncStatus(db ethdb.KeyValueReader) []byte {
+ data, _ := db.Get(skeletonSyncStatusKey)
+ return data
+}
+
+// WriteSkeletonSyncStatus stores the serialized sync status to save at shutdown.
+func WriteSkeletonSyncStatus(db ethdb.KeyValueWriter, status []byte) {
+ if err := db.Put(skeletonSyncStatusKey, status); err != nil {
+ log.Crit("Failed to store skeleton sync status", "err", err)
+ }
+}
+
+// DeleteSkeletonSyncStatus deletes the serialized sync status saved at the last
+// shutdown.
+func DeleteSkeletonSyncStatus(db ethdb.KeyValueWriter) {
+ if err := db.Delete(skeletonSyncStatusKey); err != nil {
+ log.Crit("Failed to remove skeleton sync status", "err", err)
+ }
+}
+
+// ReadSkeletonHeader retrieves a block header from the skeleton sync store.
+func ReadSkeletonHeader(db ethdb.KeyValueReader, number uint64) *types.Header {
+ data, _ := db.Get(skeletonHeaderKey(number))
+ if len(data) == 0 {
+ return nil
+ }
+ header := new(types.Header)
+ if err := rlp.Decode(bytes.NewReader(data), header); err != nil {
+ log.Error("Invalid skeleton header RLP", "number", number, "err", err)
+ return nil
+ }
+ return header
+}
+
+// WriteSkeletonHeader stores a block header into the skeleton sync store.
+func WriteSkeletonHeader(db ethdb.KeyValueWriter, header *types.Header) {
+ data, err := rlp.EncodeToBytes(header)
+ if err != nil {
+ log.Crit("Failed to RLP encode header", "err", err)
+ }
+ key := skeletonHeaderKey(header.Number.Uint64())
+ if err := db.Put(key, data); err != nil {
+ log.Crit("Failed to store skeleton header", "err", err)
+ }
+}
+
+// DeleteSkeletonHeader removes all block header data associated with a block number.
+func DeleteSkeletonHeader(db ethdb.KeyValueWriter, number uint64) {
+ if err := db.Delete(skeletonHeaderKey(number)); err != nil {
+ log.Crit("Failed to delete skeleton header", "err", err)
+ }
+}
diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go
index b35fcba45f79..f869f36e1040 100644
--- a/core/rawdb/schema.go
+++ b/core/rawdb/schema.go
@@ -63,6 +63,9 @@ var (
// snapshotSyncStatusKey tracks the snapshot sync status across restarts.
snapshotSyncStatusKey = []byte("SnapshotSyncStatus")
+ // skeletonSyncStatusKey tracks the skeleton sync status across restarts.
+ skeletonSyncStatusKey = []byte("SkeletonSyncStatus")
+
// txIndexTailKey tracks the oldest block whose transactions have been indexed.
txIndexTailKey = []byte("TransactionIndexTail")
@@ -92,6 +95,7 @@ var (
SnapshotAccountPrefix = []byte("a") // SnapshotAccountPrefix + account hash -> account trie value
SnapshotStoragePrefix = []byte("o") // SnapshotStoragePrefix + account hash + storage hash -> storage trie value
CodePrefix = []byte("c") // CodePrefix + code hash -> account code
+ skeletonHeaderPrefix = []byte("S") // skeletonHeaderPrefix + num (uint64 big endian) -> header
PreimagePrefix = []byte("secure-key-") // PreimagePrefix + hash -> preimage
configPrefix = []byte("ethereum-config-") // config prefix for the db
@@ -210,6 +214,11 @@ func bloomBitsKey(bit uint, section uint64, hash common.Hash) []byte {
return key
}
+// skeletonHeaderKey = skeletonHeaderPrefix + num (uint64 big endian)
+func skeletonHeaderKey(number uint64) []byte {
+ return append(skeletonHeaderPrefix, encodeBlockNumber(number)...)
+}
+
// preimageKey = PreimagePrefix + hash
func preimageKey(hash common.Hash) []byte {
return append(PreimagePrefix, hash.Bytes()...)
diff --git a/eth/api.go b/eth/api.go
index f81dfa922b7a..bc2a615ad1dd 100644
--- a/eth/api.go
+++ b/eth/api.go
@@ -257,6 +257,16 @@ func (api *PrivateAdminAPI) ImportChain(file string) (bool, error) {
return true, nil
}
+// NewHead requests the node to beacon-sync to the designated head header.
+func (api *PrivateAdminAPI) NewHead(blob hexutil.Bytes) error {
+ header := new(types.Header)
+ if err := rlp.DecodeBytes(blob, header); err != nil {
+ return err
+ }
+ mode, _ := api.eth.handler.chainSync.modeAndLocalHead()
+ return api.eth.Downloader().BeaconSync(mode, header)
+}
+
// PublicDebugAPI is the collection of Ethereum full node APIs exposed
// over the public debugging endpoint.
type PublicDebugAPI struct {
diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go
index 1087496d167c..c26f66ab6757 100644
--- a/eth/catalyst/api.go
+++ b/eth/catalyst/api.go
@@ -20,30 +20,19 @@ package catalyst
import (
"crypto/sha256"
"encoding/binary"
- "errors"
"fmt"
- "math/big"
"github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/consensus"
+ "github.com/ethereum/go-ethereum/core/beacon"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth"
- "github.com/ethereum/go-ethereum/les"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc"
- "github.com/ethereum/go-ethereum/trie"
+ "github.com/hashicorp/golang-lru/simplelru"
)
-var (
- VALID = GenericStringResponse{"VALID"}
- SUCCESS = GenericStringResponse{"SUCCESS"}
- INVALID = ForkChoiceResponse{Status: "INVALID", PayloadID: nil}
- SYNCING = ForkChoiceResponse{Status: "SYNCING", PayloadID: nil}
- GenericServerError = rpc.CustomError{Code: -32000, ValidationError: "Server error"}
- UnknownPayload = rpc.CustomError{Code: -32001, ValidationError: "Unknown payload"}
- InvalidTB = rpc.CustomError{Code: -32002, ValidationError: "Invalid terminal block"}
-)
+const headerCacheSize = 1024
// Register adds catalyst APIs to the full node.
func Register(stack *node.Node, backend *eth.Ethereum) error {
@@ -52,21 +41,7 @@ func Register(stack *node.Node, backend *eth.Ethereum) error {
{
Namespace: "engine",
Version: "1.0",
- Service: NewConsensusAPI(backend, nil),
- Public: true,
- },
- })
- return nil
-}
-
-// RegisterLight adds catalyst APIs to the light client.
-func RegisterLight(stack *node.Node, backend *les.LightEthereum) error {
- log.Warn("Catalyst mode enabled", "protocol", "les")
- stack.RegisterAPIs([]rpc.API{
- {
- Namespace: "engine",
- Version: "1.0",
- Service: NewConsensusAPI(nil, backend),
+ Service: NewConsensusAPI(backend),
Public: true,
},
})
@@ -74,255 +49,163 @@ func RegisterLight(stack *node.Node, backend *les.LightEthereum) error {
}
type ConsensusAPI struct {
- light bool
eth *eth.Ethereum
- les *les.LightEthereum
preparedBlocks *payloadQueue // preparedBlocks caches payloads (*ExecutableDataV1) by payload ID (PayloadID)
+ headerCache *simplelru.LRU
}
-func NewConsensusAPI(eth *eth.Ethereum, les *les.LightEthereum) *ConsensusAPI {
- if eth == nil {
- if les.BlockChain().Config().TerminalTotalDifficulty == nil {
- panic("Catalyst started without valid total difficulty")
- }
- } else {
- if eth.BlockChain().Config().TerminalTotalDifficulty == nil {
- panic("Catalyst started without valid total difficulty")
- }
- }
-
+// NewConsensusAPI creates a new consensus api for the given backend.
+// The underlying blockchain needs to have a valid terminal total difficulty set.
+func NewConsensusAPI(eth *eth.Ethereum) *ConsensusAPI {
+ if eth.BlockChain().Config().TerminalTotalDifficulty == nil {
+ panic("Catalyst started without valid total difficulty")
+ }
+ // Setup cache
+ cache, _ := simplelru.NewLRU(headerCacheSize, func(key, value interface{}) {
+ log.Trace("Evicted header from catalyst cache", "hash", key)
+ })
return &ConsensusAPI{
- light: eth == nil,
eth: eth,
- les: les,
preparedBlocks: newPayloadQueue(),
+ headerCache: cache,
}
}
-func (api *ConsensusAPI) GetPayloadV1(payloadID PayloadID) (*ExecutableDataV1, error) {
- log.Trace("Engine API request received", "method", "GetPayload", "id", payloadID)
- data := api.preparedBlocks.get(payloadID)
- if data == nil {
- return nil, &UnknownPayload
- }
- return data, nil
-}
-
-func (api *ConsensusAPI) ForkchoiceUpdatedV1(heads ForkchoiceStateV1, payloadAttributes *PayloadAttributesV1) (ForkChoiceResponse, error) {
+// ForkchoiceUpdatedV1 has several responsibilities:
+// If the method is called with an empty head block:
+// we return success, which can be used to check if the catalyst mode is enabled
+// If the total difficulty was not reached:
+// we return INVALID
+// If the finalizedBlockHash is set:
+// we check if we have the finalizedBlockHash in our db, if not we start a sync
+// We try to set our blockchain to the headBlock
+// If there are payloadAttributes:
+// we try to assemble a block with the payloadAttributes and return its payloadID
+func (api *ConsensusAPI) ForkchoiceUpdatedV1(heads beacon.ForkchoiceStateV1, payloadAttributes *beacon.PayloadAttributesV1) (beacon.ForkChoiceResponse, error) {
log.Trace("Engine API request received", "method", "ForkChoiceUpdated", "head", heads.HeadBlockHash, "finalized", heads.FinalizedBlockHash, "safe", heads.SafeBlockHash)
if heads.HeadBlockHash == (common.Hash{}) {
- return ForkChoiceResponse{Status: SUCCESS.Status, PayloadID: nil}, nil
+ return beacon.STATUS_SUCCESS, nil
}
if err := api.checkTerminalTotalDifficulty(heads.HeadBlockHash); err != nil {
- if api.light {
- if header := api.les.BlockChain().GetHeaderByHash(heads.HeadBlockHash); header == nil {
- // TODO (MariusVanDerWijden) trigger sync
- return SYNCING, nil
- }
- return INVALID, err
- } else {
- if block := api.eth.BlockChain().GetBlockByHash(heads.HeadBlockHash); block == nil {
- // TODO (MariusVanDerWijden) trigger sync
- return SYNCING, nil
- }
- return INVALID, err
+ if block := api.eth.BlockChain().GetBlockByHash(heads.HeadBlockHash); block == nil {
+ // TODO (MariusVanDerWijden) trigger sync
+ return beacon.STATUS_SYNCING, nil
}
+ return beacon.ForkChoiceResponse{PayloadStatus: api.invalid(err)}, nil
}
// If the finalized block is set, check if it is in our blockchain
if heads.FinalizedBlockHash != (common.Hash{}) {
- if api.light {
- if header := api.les.BlockChain().GetHeaderByHash(heads.FinalizedBlockHash); header == nil {
- // TODO (MariusVanDerWijden) trigger sync
- return SYNCING, nil
- }
- } else {
- if block := api.eth.BlockChain().GetBlockByHash(heads.FinalizedBlockHash); block == nil {
- // TODO (MariusVanDerWijden) trigger sync
- return SYNCING, nil
- }
+ if block := api.eth.BlockChain().GetBlockByHash(heads.FinalizedBlockHash); block == nil {
+ // TODO (MariusVanDerWijden) trigger sync
+ return beacon.STATUS_SYNCING, nil
}
}
+ if block := api.eth.BlockChain().GetBlockByHash(heads.HeadBlockHash); block == nil {
+ // block not in bc, try cache
+ header, ok := api.headerCache.Get(heads.HeadBlockHash)
+ if !ok {
+ log.Warn("Could not find head in cache, need a new_payload with it before fcu")
+ return beacon.STATUS_SYNCING, nil
+ }
+ if err := api.eth.Downloader().BeaconSync(api.eth.SyncMode(), header.(*types.Header)); err != nil {
+ return beacon.STATUS_SYNCING, err
+ }
+ // TODO (MariusVanDerWijden) trigger sync
+ return beacon.STATUS_SYNCING, nil
+ }
// SetHead
if err := api.setHead(heads.HeadBlockHash); err != nil {
- return INVALID, err
+ return beacon.ForkChoiceResponse{PayloadStatus: api.invalid(err)}, nil
}
// Assemble block (if needed). It only works for full node.
- if !api.light && payloadAttributes != nil {
+ if payloadAttributes != nil {
data, err := api.assembleBlock(heads.HeadBlockHash, payloadAttributes)
if err != nil {
- return INVALID, err
+ return beacon.STATUS_INVALID, err
}
id := computePayloadId(heads.HeadBlockHash, payloadAttributes)
api.preparedBlocks.put(id, data)
log.Info("Created payload", "payloadID", id)
- return ForkChoiceResponse{Status: SUCCESS.Status, PayloadID: &id}, nil
+ return beacon.ForkChoiceResponse{PayloadStatus: beacon.PayloadStatusV1{Status: beacon.VALID}, PayloadID: &id}, nil
}
- return ForkChoiceResponse{Status: SUCCESS.Status, PayloadID: nil}, nil
-}
-
-func computePayloadId(headBlockHash common.Hash, params *PayloadAttributesV1) PayloadID {
- // Hash
- hasher := sha256.New()
- hasher.Write(headBlockHash[:])
- binary.Write(hasher, binary.BigEndian, params.Timestamp)
- hasher.Write(params.Random[:])
- hasher.Write(params.SuggestedFeeRecipient[:])
- var out PayloadID
- copy(out[:], hasher.Sum(nil)[:8])
- return out
+ return beacon.STATUS_SUCCESS, nil
}
-func (api *ConsensusAPI) invalid() ExecutePayloadResponse {
- if api.light {
- return ExecutePayloadResponse{Status: INVALID.Status, LatestValidHash: api.les.BlockChain().CurrentHeader().Hash()}
+// GetPayloadV1 returns a cached payload by id.
+func (api *ConsensusAPI) GetPayloadV1(payloadID beacon.PayloadID) (*beacon.ExecutableDataV1, error) {
+ log.Trace("Engine API request received", "method", "GetPayload", "id", payloadID)
+ data := api.preparedBlocks.get(payloadID)
+ if data == nil {
+ return nil, &beacon.UnknownPayload
}
- return ExecutePayloadResponse{Status: INVALID.Status, LatestValidHash: api.eth.BlockChain().CurrentHeader().Hash()}
+ return data, nil
}
-// ExecutePayloadV1 creates an Eth1 block, inserts it in the chain, and returns the status of the chain.
-func (api *ConsensusAPI) ExecutePayloadV1(params ExecutableDataV1) (ExecutePayloadResponse, error) {
+// NewPayloadV1 creates an Eth1 block, inserts it in the chain, and returns the status of the chain.
+func (api *ConsensusAPI) NewPayloadV1(params beacon.ExecutableDataV1) (beacon.PayloadStatusV1, error) {
log.Trace("Engine API request received", "method", "ExecutePayload", params.BlockHash, "number", params.Number)
- block, err := ExecutableDataToBlock(params)
+ block, err := beacon.ExecutableDataToBlock(params)
if err != nil {
- return api.invalid(), err
- }
- if api.light {
- if !api.les.BlockChain().HasHeader(block.ParentHash(), block.NumberU64()-1) {
- /*
- TODO (MariusVanDerWijden) reenable once sync is merged
- if err := api.eth.Downloader().BeaconSync(api.eth.SyncMode(), block.Header()); err != nil {
- return SYNCING, err
- }
- */
- // TODO (MariusVanDerWijden) we should return nil here not empty hash
- return ExecutePayloadResponse{Status: SYNCING.Status, LatestValidHash: common.Hash{}}, nil
- }
- parent := api.les.BlockChain().GetHeaderByHash(params.ParentHash)
- td := api.les.BlockChain().GetTd(parent.Hash(), block.NumberU64()-1)
- ttd := api.les.BlockChain().Config().TerminalTotalDifficulty
- if td.Cmp(ttd) < 0 {
- return api.invalid(), fmt.Errorf("can not execute payload on top of block with low td got: %v threshold %v", td, ttd)
- }
- if err = api.les.BlockChain().InsertHeader(block.Header()); err != nil {
- return api.invalid(), err
- }
- if merger := api.merger(); !merger.TDDReached() {
- merger.ReachTTD()
- }
- return ExecutePayloadResponse{Status: VALID.Status, LatestValidHash: block.Hash()}, nil
+ return beacon.PayloadStatusV1{Status: beacon.INVALIDBLOCKHASH}, nil
}
+ // block is a valid block, store in cache for later use
+ api.headerCache.Add(block.Hash(), block.Header())
+
if !api.eth.BlockChain().HasBlock(block.ParentHash(), block.NumberU64()-1) {
/*
- TODO (MariusVanDerWijden) reenable once sync is merged
+ TODO (MariusVanDerWijden) maybe reenable once sync is merged
if err := api.eth.Downloader().BeaconSync(api.eth.SyncMode(), block.Header()); err != nil {
return SYNCING, err
}
*/
// TODO (MariusVanDerWijden) we should return nil here not empty hash
- return ExecutePayloadResponse{Status: SYNCING.Status, LatestValidHash: common.Hash{}}, nil
+ return beacon.PayloadStatusV1{Status: beacon.ACCEPTED}, nil
}
parent := api.eth.BlockChain().GetBlockByHash(params.ParentHash)
td := api.eth.BlockChain().GetTd(parent.Hash(), block.NumberU64()-1)
ttd := api.eth.BlockChain().Config().TerminalTotalDifficulty
if td.Cmp(ttd) < 0 {
- return api.invalid(), fmt.Errorf("can not execute payload on top of block with low td got: %v threshold %v", td, ttd)
+ err := fmt.Errorf("can not execute payload on top of block with low td got: %v threshold %v", td, ttd)
+ return api.invalid(err), nil
}
log.Trace("Inserting block without head", "hash", block.Hash(), "number", block.Number)
if err := api.eth.BlockChain().InsertBlockWithoutSetHead(block); err != nil {
- return api.invalid(), err
+ return api.invalid(err), nil
}
- if merger := api.merger(); !merger.TDDReached() {
+ if merger := api.eth.Merger(); !merger.TDDReached() {
merger.ReachTTD()
}
- return ExecutePayloadResponse{Status: VALID.Status, LatestValidHash: block.Hash()}, nil
+ return beacon.PayloadStatusV1{Status: beacon.VALID, LatestValidHash: block.Hash()}, nil
}
-// AssembleBlock creates a new block, inserts it into the chain, and returns the "execution
-// data" required for eth2 clients to process the new block.
-func (api *ConsensusAPI) assembleBlock(parentHash common.Hash, params *PayloadAttributesV1) (*ExecutableDataV1, error) {
- if api.light {
- return nil, errors.New("not supported")
- }
- log.Info("Producing block", "parentHash", parentHash)
- block, err := api.eth.Miner().GetSealingBlock(parentHash, params.Timestamp, params.SuggestedFeeRecipient, params.Random)
- if err != nil {
- return nil, err
- }
- return BlockToExecutableData(block), nil
+// computePayloadId computes a pseudo-random payloadid, based on the parameters.
+func computePayloadId(headBlockHash common.Hash, params *beacon.PayloadAttributesV1) beacon.PayloadID {
+ // Hash
+ hasher := sha256.New()
+ hasher.Write(headBlockHash[:])
+ binary.Write(hasher, binary.BigEndian, params.Timestamp)
+ hasher.Write(params.Random[:])
+ hasher.Write(params.SuggestedFeeRecipient[:])
+ var out beacon.PayloadID
+ copy(out[:], hasher.Sum(nil)[:8])
+ return out
}
-func encodeTransactions(txs []*types.Transaction) [][]byte {
- var enc = make([][]byte, len(txs))
- for i, tx := range txs {
- enc[i], _ = tx.MarshalBinary()
- }
- return enc
+// invalid returns a response "INVALID" with the latest valid hash set to the current head.
+func (api *ConsensusAPI) invalid(err error) beacon.PayloadStatusV1 {
+ return beacon.PayloadStatusV1{Status: beacon.INVALID, LatestValidHash: api.eth.BlockChain().CurrentHeader().Hash(), ValidationError: err.Error()}
}
-func decodeTransactions(enc [][]byte) ([]*types.Transaction, error) {
- var txs = make([]*types.Transaction, len(enc))
- for i, encTx := range enc {
- var tx types.Transaction
- if err := tx.UnmarshalBinary(encTx); err != nil {
- return nil, fmt.Errorf("invalid transaction %d: %v", i, err)
- }
- txs[i] = &tx
- }
- return txs, nil
-}
-
-func ExecutableDataToBlock(params ExecutableDataV1) (*types.Block, error) {
- txs, err := decodeTransactions(params.Transactions)
+// assembleBlock creates a new block and returns the "execution
+// data" required for beacon clients to process the new block.
+func (api *ConsensusAPI) assembleBlock(parentHash common.Hash, params *beacon.PayloadAttributesV1) (*beacon.ExecutableDataV1, error) {
+ log.Info("Producing block", "parentHash", parentHash)
+ block, err := api.eth.Miner().GetSealingBlock(parentHash, params.Timestamp, params.SuggestedFeeRecipient, params.Random)
if err != nil {
return nil, err
}
- if len(params.ExtraData) > 32 {
- return nil, fmt.Errorf("invalid extradata length: %v", len(params.ExtraData))
- }
- header := &types.Header{
- ParentHash: params.ParentHash,
- UncleHash: types.EmptyUncleHash,
- Coinbase: params.FeeRecipient,
- Root: params.StateRoot,
- TxHash: types.DeriveSha(types.Transactions(txs), trie.NewStackTrie(nil)),
- ReceiptHash: params.ReceiptsRoot,
- Bloom: types.BytesToBloom(params.LogsBloom),
- Difficulty: common.Big0,
- Number: new(big.Int).SetUint64(params.Number),
- GasLimit: params.GasLimit,
- GasUsed: params.GasUsed,
- Time: params.Timestamp,
- BaseFee: params.BaseFeePerGas,
- Extra: params.ExtraData,
- MixDigest: params.Random,
- }
- block := types.NewBlockWithHeader(header).WithBody(txs, nil /* uncles */)
- if block.Hash() != params.BlockHash {
- return nil, fmt.Errorf("blockhash mismatch, want %x, got %x", params.BlockHash, block.Hash())
- }
- return block, nil
-}
-
-// BlockToExecutableData constructs the executableDataV1 structure by filling the
-// fields from the given block. It assumes the given block is post-merge block.
-func BlockToExecutableData(block *types.Block) *ExecutableDataV1 {
- return &ExecutableDataV1{
- BlockHash: block.Hash(),
- ParentHash: block.ParentHash(),
- FeeRecipient: block.Coinbase(),
- StateRoot: block.Root(),
- Number: block.NumberU64(),
- GasLimit: block.GasLimit(),
- GasUsed: block.GasUsed(),
- BaseFeePerGas: block.BaseFee(),
- Timestamp: block.Time(),
- ReceiptsRoot: block.ReceiptHash(),
- LogsBloom: block.Bloom().Bytes(),
- Transactions: encodeTransactions(block.Transactions()),
- Random: block.MixDigest(),
- ExtraData: block.Extra(),
- }
+ return beacon.BlockToExecutableData(block), nil
}
// Used in tests to add a the list of transactions from a block to the tx pool.
@@ -335,29 +218,17 @@ func (api *ConsensusAPI) insertTransactions(txs types.Transactions) error {
func (api *ConsensusAPI) checkTerminalTotalDifficulty(head common.Hash) error {
// shortcut if we entered PoS already
- if api.merger().PoSFinalized() {
- return nil
- }
- if api.light {
- // make sure the parent has enough terminal total difficulty
- header := api.les.BlockChain().GetHeaderByHash(head)
- if header == nil {
- return &GenericServerError
- }
- td := api.les.BlockChain().GetTd(header.Hash(), header.Number.Uint64())
- if td != nil && td.Cmp(api.les.BlockChain().Config().TerminalTotalDifficulty) < 0 {
- return &InvalidTB
- }
+ if api.eth.Merger().PoSFinalized() {
return nil
}
// make sure the parent has enough terminal total difficulty
newHeadBlock := api.eth.BlockChain().GetBlockByHash(head)
if newHeadBlock == nil {
- return &GenericServerError
+ return &beacon.GenericServerError
}
td := api.eth.BlockChain().GetTd(newHeadBlock.Hash(), newHeadBlock.NumberU64())
if td != nil && td.Cmp(api.eth.BlockChain().Config().TerminalTotalDifficulty) < 0 {
- return &InvalidTB
+ return &beacon.InvalidTB
}
return nil
}
@@ -365,48 +236,22 @@ func (api *ConsensusAPI) checkTerminalTotalDifficulty(head common.Hash) error {
// setHead is called to perform a force choice.
func (api *ConsensusAPI) setHead(newHead common.Hash) error {
log.Info("Setting head", "head", newHead)
- if api.light {
- headHeader := api.les.BlockChain().CurrentHeader()
- if headHeader.Hash() == newHead {
- return nil
- }
- newHeadHeader := api.les.BlockChain().GetHeaderByHash(newHead)
- if newHeadHeader == nil {
- return &GenericServerError
- }
- if err := api.les.BlockChain().SetChainHead(newHeadHeader); err != nil {
- return err
- }
- // Trigger the transition if it's the first `NewHead` event.
- if merger := api.merger(); !merger.PoSFinalized() {
- merger.FinalizePoS()
- }
- return nil
- }
headBlock := api.eth.BlockChain().CurrentBlock()
if headBlock.Hash() == newHead {
return nil
}
newHeadBlock := api.eth.BlockChain().GetBlockByHash(newHead)
if newHeadBlock == nil {
- return &GenericServerError
+ return &beacon.GenericServerError
}
if err := api.eth.BlockChain().SetChainHead(newHeadBlock); err != nil {
return err
}
// Trigger the transition if it's the first `NewHead` event.
- if merger := api.merger(); !merger.PoSFinalized() {
+ if merger := api.eth.Merger(); !merger.PoSFinalized() {
merger.FinalizePoS()
}
// TODO (MariusVanDerWijden) are we really synced now?
api.eth.SetSynced()
return nil
}
-
-// Helper function, return the merger instance.
-func (api *ConsensusAPI) merger() *consensus.Merger {
- if api.light {
- return api.les.Merger()
- }
- return api.eth.Merger()
-}
diff --git a/eth/catalyst/api_test.go b/eth/catalyst/api_test.go
index b802fb05c86b..8fd65af7cca2 100644
--- a/eth/catalyst/api_test.go
+++ b/eth/catalyst/api_test.go
@@ -25,6 +25,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/beacon"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
@@ -78,14 +79,14 @@ func TestEth2AssembleBlock(t *testing.T) {
n, ethservice := startEthService(t, genesis, blocks)
defer n.Close()
- api := NewConsensusAPI(ethservice, nil)
+ api := NewConsensusAPI(ethservice)
signer := types.NewEIP155Signer(ethservice.BlockChain().Config().ChainID)
tx, err := types.SignTx(types.NewTransaction(uint64(10), blocks[9].Coinbase(), big.NewInt(1000), params.TxGas, big.NewInt(params.InitialBaseFee), nil), signer, testKey)
if err != nil {
t.Fatalf("error signing transaction, err=%v", err)
}
ethservice.TxPool().AddLocal(tx)
- blockParams := PayloadAttributesV1{
+ blockParams := beacon.PayloadAttributesV1{
Timestamp: blocks[9].Time() + 5,
}
execData, err := api.assembleBlock(blocks[9].Hash(), &blockParams)
@@ -102,11 +103,11 @@ func TestEth2AssembleBlockWithAnotherBlocksTxs(t *testing.T) {
n, ethservice := startEthService(t, genesis, blocks[:9])
defer n.Close()
- api := NewConsensusAPI(ethservice, nil)
+ api := NewConsensusAPI(ethservice)
// Put the 10th block's tx in the pool and produce a new block
api.insertTransactions(blocks[9].Transactions())
- blockParams := PayloadAttributesV1{
+ blockParams := beacon.PayloadAttributesV1{
Timestamp: blocks[8].Time() + 5,
}
execData, err := api.assembleBlock(blocks[8].Hash(), &blockParams)
@@ -123,14 +124,16 @@ func TestSetHeadBeforeTotalDifficulty(t *testing.T) {
n, ethservice := startEthService(t, genesis, blocks)
defer n.Close()
- api := NewConsensusAPI(ethservice, nil)
- fcState := ForkchoiceStateV1{
+ api := NewConsensusAPI(ethservice)
+ fcState := beacon.ForkchoiceStateV1{
HeadBlockHash: blocks[5].Hash(),
SafeBlockHash: common.Hash{},
FinalizedBlockHash: common.Hash{},
}
- if _, err := api.ForkchoiceUpdatedV1(fcState, nil); err == nil {
- t.Errorf("fork choice updated before total terminal difficulty should fail")
+ if resp, err := api.ForkchoiceUpdatedV1(fcState, nil); err != nil {
+ t.Errorf("fork choice updated should not error")
+ } else if resp.PayloadStatus.Status != beacon.INVALID {
+ t.Errorf("fork choice updated before total terminal difficulty should be INVALID")
}
}
@@ -141,14 +144,14 @@ func TestEth2PrepareAndGetPayload(t *testing.T) {
n, ethservice := startEthService(t, genesis, blocks[:9])
defer n.Close()
- api := NewConsensusAPI(ethservice, nil)
+ api := NewConsensusAPI(ethservice)
// Put the 10th block's tx in the pool and produce a new block
api.insertTransactions(blocks[9].Transactions())
- blockParams := PayloadAttributesV1{
+ blockParams := beacon.PayloadAttributesV1{
Timestamp: blocks[8].Time() + 5,
}
- fcState := ForkchoiceStateV1{
+ fcState := beacon.ForkchoiceStateV1{
HeadBlockHash: blocks[8].Hash(),
SafeBlockHash: common.Hash{},
FinalizedBlockHash: common.Hash{},
@@ -166,7 +169,7 @@ func TestEth2PrepareAndGetPayload(t *testing.T) {
t.Fatalf("invalid number of transactions %d != 1", len(execData.Transactions))
}
// Test invalid payloadID
- var invPayload PayloadID
+ var invPayload beacon.PayloadID
copy(invPayload[:], payloadID[:])
invPayload[0] = ^invPayload[0]
_, err = api.GetPayloadV1(invPayload)
@@ -199,7 +202,7 @@ func TestInvalidPayloadTimestamp(t *testing.T) {
ethservice.Merger().ReachTTD()
defer n.Close()
var (
- api = NewConsensusAPI(ethservice, nil)
+ api = NewConsensusAPI(ethservice)
parent = ethservice.BlockChain().CurrentBlock()
)
tests := []struct {
@@ -215,12 +218,12 @@ func TestInvalidPayloadTimestamp(t *testing.T) {
for i, test := range tests {
t.Run(fmt.Sprintf("Timestamp test: %v", i), func(t *testing.T) {
- params := PayloadAttributesV1{
+ params := beacon.PayloadAttributesV1{
Timestamp: test.time,
Random: crypto.Keccak256Hash([]byte{byte(123)}),
SuggestedFeeRecipient: parent.Coinbase(),
}
- fcState := ForkchoiceStateV1{
+ fcState := beacon.ForkchoiceStateV1{
HeadBlockHash: parent.Hash(),
SafeBlockHash: common.Hash{},
FinalizedBlockHash: common.Hash{},
@@ -242,7 +245,7 @@ func TestEth2NewBlock(t *testing.T) {
defer n.Close()
var (
- api = NewConsensusAPI(ethservice, nil)
+ api = NewConsensusAPI(ethservice)
parent = preMergeBlocks[len(preMergeBlocks)-1]
// This EVM code generates a log when the contract is created.
@@ -260,17 +263,17 @@ func TestEth2NewBlock(t *testing.T) {
tx, _ := types.SignTx(types.NewContractCreation(nonce, new(big.Int), 1000000, big.NewInt(2*params.InitialBaseFee), logCode), types.LatestSigner(ethservice.BlockChain().Config()), testKey)
ethservice.TxPool().AddLocal(tx)
- execData, err := api.assembleBlock(parent.Hash(), &PayloadAttributesV1{
+ execData, err := api.assembleBlock(parent.Hash(), &beacon.PayloadAttributesV1{
Timestamp: parent.Time() + 5,
})
if err != nil {
t.Fatalf("Failed to create the executable data %v", err)
}
- block, err := ExecutableDataToBlock(*execData)
+ block, err := beacon.ExecutableDataToBlock(*execData)
if err != nil {
t.Fatalf("Failed to convert executable data to block %v", err)
}
- newResp, err := api.ExecutePayloadV1(*execData)
+ newResp, err := api.NewPayloadV1(*execData)
if err != nil || newResp.Status != "VALID" {
t.Fatalf("Failed to insert block: %v", err)
}
@@ -278,7 +281,7 @@ func TestEth2NewBlock(t *testing.T) {
t.Fatalf("Chain head shouldn't be updated")
}
checkLogEvents(t, newLogCh, rmLogsCh, 0, 0)
- fcState := ForkchoiceStateV1{
+ fcState := beacon.ForkchoiceStateV1{
HeadBlockHash: block.Hash(),
SafeBlockHash: block.Hash(),
FinalizedBlockHash: block.Hash(),
@@ -300,17 +303,17 @@ func TestEth2NewBlock(t *testing.T) {
)
parent = preMergeBlocks[len(preMergeBlocks)-1]
for i := 0; i < 10; i++ {
- execData, err := api.assembleBlock(parent.Hash(), &PayloadAttributesV1{
+ execData, err := api.assembleBlock(parent.Hash(), &beacon.PayloadAttributesV1{
Timestamp: parent.Time() + 6,
})
if err != nil {
t.Fatalf("Failed to create the executable data %v", err)
}
- block, err := ExecutableDataToBlock(*execData)
+ block, err := beacon.ExecutableDataToBlock(*execData)
if err != nil {
t.Fatalf("Failed to convert executable data to block %v", err)
}
- newResp, err := api.ExecutePayloadV1(*execData)
+ newResp, err := api.NewPayloadV1(*execData)
if err != nil || newResp.Status != "VALID" {
t.Fatalf("Failed to insert block: %v", err)
}
@@ -318,7 +321,7 @@ func TestEth2NewBlock(t *testing.T) {
t.Fatalf("Chain head shouldn't be updated")
}
- fcState := ForkchoiceStateV1{
+ fcState := beacon.ForkchoiceStateV1{
HeadBlockHash: block.Hash(),
SafeBlockHash: block.Hash(),
FinalizedBlockHash: block.Hash(),
@@ -412,7 +415,7 @@ func TestFullAPI(t *testing.T) {
ethservice.Merger().ReachTTD()
defer n.Close()
var (
- api = NewConsensusAPI(ethservice, nil)
+ api = NewConsensusAPI(ethservice)
parent = ethservice.BlockChain().CurrentBlock()
// This EVM code generates a log when the contract is created.
logCode = common.Hex2Bytes("60606040525b7f24ec1d3ff24c2f6ff210738839dbc339cd45a5294d85c79361016243157aae7b60405180905060405180910390a15b600a8060416000396000f360606040526008565b00")
@@ -423,12 +426,12 @@ func TestFullAPI(t *testing.T) {
tx, _ := types.SignTx(types.NewContractCreation(nonce, new(big.Int), 1000000, big.NewInt(2*params.InitialBaseFee), logCode), types.LatestSigner(ethservice.BlockChain().Config()), testKey)
ethservice.TxPool().AddLocal(tx)
- params := PayloadAttributesV1{
+ params := beacon.PayloadAttributesV1{
Timestamp: parent.Time() + 1,
Random: crypto.Keccak256Hash([]byte{byte(i)}),
SuggestedFeeRecipient: parent.Coinbase(),
}
- fcState := ForkchoiceStateV1{
+ fcState := beacon.ForkchoiceStateV1{
HeadBlockHash: parent.Hash(),
SafeBlockHash: common.Hash{},
FinalizedBlockHash: common.Hash{},
@@ -437,22 +440,22 @@ func TestFullAPI(t *testing.T) {
if err != nil {
t.Fatalf("error preparing payload, err=%v", err)
}
- if resp.Status != SUCCESS.Status {
- t.Fatalf("error preparing payload, invalid status: %v", resp.Status)
+ if resp.PayloadStatus.Status != beacon.VALID {
+ t.Fatalf("error preparing payload, invalid status: %v", resp.PayloadStatus.Status)
}
payloadID := computePayloadId(parent.Hash(), ¶ms)
payload, err := api.GetPayloadV1(payloadID)
if err != nil {
t.Fatalf("can't get payload: %v", err)
}
- execResp, err := api.ExecutePayloadV1(*payload)
+ execResp, err := api.NewPayloadV1(*payload)
if err != nil {
t.Fatalf("can't execute payload: %v", err)
}
- if execResp.Status != VALID.Status {
+ if execResp.Status != beacon.VALID {
t.Fatalf("invalid status: %v", execResp.Status)
}
- fcState = ForkchoiceStateV1{
+ fcState = beacon.ForkchoiceStateV1{
HeadBlockHash: payload.BlockHash,
SafeBlockHash: payload.ParentHash,
FinalizedBlockHash: payload.ParentHash,
diff --git a/eth/catalyst/queue.go b/eth/catalyst/queue.go
index db373a6c7904..aa2ce7823d66 100644
--- a/eth/catalyst/queue.go
+++ b/eth/catalyst/queue.go
@@ -16,7 +16,11 @@
package catalyst
-import "sync"
+import (
+ "sync"
+
+ "github.com/ethereum/go-ethereum/core/beacon"
+)
// maxTrackedPayloads is the maximum number of prepared payloads the execution
// engine tracks before evicting old ones. Ideally we should only ever track the
@@ -26,8 +30,8 @@ const maxTrackedPayloads = 10
// payloadQueueItem represents an id->payload tuple to store until it's retrieved
// or evicted.
type payloadQueueItem struct {
- id PayloadID
- payload *ExecutableDataV1
+ id beacon.PayloadID
+ payload *beacon.ExecutableDataV1
}
// payloadQueue tracks the latest handful of constructed payloads to be retrieved
@@ -46,7 +50,7 @@ func newPayloadQueue() *payloadQueue {
}
// put inserts a new payload into the queue at the given id.
-func (q *payloadQueue) put(id PayloadID, data *ExecutableDataV1) {
+func (q *payloadQueue) put(id beacon.PayloadID, data *beacon.ExecutableDataV1) {
q.lock.Lock()
defer q.lock.Unlock()
@@ -58,7 +62,7 @@ func (q *payloadQueue) put(id PayloadID, data *ExecutableDataV1) {
}
// get retrieves a previously stored payload item or nil if it does not exist.
-func (q *payloadQueue) get(id PayloadID) *ExecutableDataV1 {
+func (q *payloadQueue) get(id beacon.PayloadID) *beacon.ExecutableDataV1 {
q.lock.RLock()
defer q.lock.RUnlock()
diff --git a/eth/downloader/beaconsync.go b/eth/downloader/beaconsync.go
new file mode 100644
index 000000000000..4f787a9414e0
--- /dev/null
+++ b/eth/downloader/beaconsync.go
@@ -0,0 +1,261 @@
+// Copyright 2021 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package downloader
+
+import (
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/log"
+)
+
+// beaconBackfiller is the chain and state backfilling that can be commenced once
+// the skeleton syncer has successfully reverse downloaded all the headers up to
+// the genesis block or an existing header in the database. Its operation is fully
+// directed by the skeleton sync's head/tail events.
+type beaconBackfiller struct {
+ downloader *Downloader // Downloader to direct via this callback implementation
+ syncMode SyncMode // Sync mode to use for backfilling the skeleton chains
+ success func() // Callback to run on successful sync cycle completion
+ filling bool // Flag whether the downloader is backfilling or not
+ started chan struct{} // Notification channel whether the downloader inited
+ lock sync.Mutex // Mutex protecting the sync lock
+}
+
+// newBeaconBackfiller is a helper method to create the backfiller.
+func newBeaconBackfiller(dl *Downloader, success func()) backfiller {
+ return &beaconBackfiller{
+ downloader: dl,
+ success: success,
+ }
+}
+
+// suspend cancels any background downloader threads.
+func (b *beaconBackfiller) suspend() {
+ // If no filling is running, don't waste cycles
+ b.lock.Lock()
+ filling := b.filling
+ started := b.started
+ b.lock.Unlock()
+
+ if !filling {
+ return
+ }
+ // A previous filling should be running, though it may happen that it hasn't
+ // yet started (being done on a new goroutine). Many concurrent beacon head
+ // announcements can lead to sync start/stop thrashing. In that case we need
+ // to wait for initialization before we can safely cancel it. It is safe to
+ // read this channel multiple times, it gets closed on startup.
+ <-started
+
+ // Now that we're sure the downloader successfully started up, we can cancel
+ // it safely without running the risk of data races.
+ b.downloader.Cancel()
+}
+
+// resume starts the downloader threads for backfilling state and chain data.
+func (b *beaconBackfiller) resume() {
+ b.lock.Lock()
+ if b.filling {
+ // If a previous filling cycle is still running, just ignore this start
+ // request. // TODO(karalabe): We should make this channel driven
+ b.lock.Unlock()
+ return
+ }
+ b.filling = true
+ b.started = make(chan struct{})
+ mode := b.syncMode
+ b.lock.Unlock()
+
+ // Start the backfilling on its own thread since the downloader does not have
+ // its own lifecycle runloop.
+ go func() {
+ // Set the backfiller to non-filling when download completes
+ defer func() {
+ b.lock.Lock()
+ b.filling = false
+ b.lock.Unlock()
+ }()
+ // If the downloader fails, report an error as in beacon chain mode there
+ // should be no errors as long as the chain we're syncing to is valid.
+ if err := b.downloader.synchronise("", common.Hash{}, nil, mode, true, b.started); err != nil {
+ log.Error("Beacon backfilling failed", "err", err)
+ return
+ }
+ // Synchronization succeeded. Since this happens async, notify the outer
+ // context to disable snap syncing and enable transaction propagation.
+ if b.success != nil {
+ b.success()
+ }
+ }()
+}
+
+// setMode updates the sync mode from the current one to the requested one. If
+// there's an active sync in progress, it will be cancelled and restarted.
+func (b *beaconBackfiller) setMode(mode SyncMode) {
+ // Update the old sync mode and track if it was changed
+ b.lock.Lock()
+ old, updated := b.syncMode, b.syncMode != mode
+ filling := b.filling
+ b.syncMode = mode
+ b.lock.Unlock()
+
+ // If the sync mode was changed mid-sync, restart. This should never ever
+ // really happen, we just handle it to detect programming errors.
+ if !updated || !filling {
+ return
+ }
+ log.Error("Downloader sync mode changed mid-run", "old", old.String(), "new", mode.String())
+ b.suspend()
+ b.resume()
+}
+
+// BeaconSync is the Ethereum 2 version of the chain synchronization, where the
+// chain is not downloaded from genesis onward, rather from trusted head announces
+// backwards.
+//
+// Internally backfilling and state sync is done the same way, but the header
+// retrieval and scheduling is replaced.
+func (d *Downloader) BeaconSync(mode SyncMode, head *types.Header) error {
+ // When the downloader starts a sync cycle, it needs to be aware of the sync
+ // mode to use (full, snap). To keep the skeleton chain oblivious, inject the
+ // mode into the backfiller directly.
+ //
+ // Super crazy dangerous type cast. Should be fine (TM), we're only using a
+ // different backfiller implementation for skeleton tests.
+ d.skeleton.filler.(*beaconBackfiller).setMode(mode)
+
+ // Signal the skeleton sync to switch to a new head, however it wants
+ if err := d.skeleton.Sync(head); err != nil {
+ return err
+ }
+ return nil
+}
+
+// findBeaconAncestor tries to locate the common ancestor link of the local chain
+// and the beacon chain just requested. In the general case when our node was in
+// sync and on the correct chain, checking the top N links should already get us
+// a match. In the rare scenario when we ended up on a long reorganisation (i.e.
+// none of the head links match), we do a binary search to find the ancestor.
+func (d *Downloader) findBeaconAncestor() uint64 {
+ // Figure out the current local head position
+ var chainHead *types.Header
+
+ switch d.getMode() {
+ case FullSync:
+ chainHead = d.blockchain.CurrentBlock().Header()
+ case SnapSync:
+ chainHead = d.blockchain.CurrentFastBlock().Header()
+ default:
+ chainHead = d.lightchain.CurrentHeader()
+ }
+ number := chainHead.Number.Uint64()
+
+ // If the head is present in the skeleton chain, return that
+ if chainHead.Hash() == d.skeleton.Header(number).Hash() {
+ return number
+ }
+ // Head header not present, binary search to find the ancestor
+ start, end := uint64(0), number
+
+ beaconHead, err := d.skeleton.Head()
+ if err != nil {
+ panic(fmt.Sprintf("failed to read skeleton head: %v", err)) // can't reach this method without a head
+ }
+ if number := beaconHead.Number.Uint64(); end > number {
+ // This shouldn't really happen in a healthy network, but if the consensus
+ // client feeds us a shorter chain as the canonical, we should not attempt
+ // to access non-existent skeleton items.
+ log.Warn("Beacon head lower than local chain", "beacon", number, "local", end)
+ end = number
+ }
+ for start+1 < end {
+ // Split our chain interval in two, and request the hash to cross check
+ check := (start + end) / 2
+
+ h := d.skeleton.Header(check)
+ n := h.Number.Uint64()
+
+ var known bool
+ switch d.getMode() {
+ case FullSync:
+ known = d.blockchain.HasBlock(h.Hash(), n)
+ case SnapSync:
+ known = d.blockchain.HasFastBlock(h.Hash(), n)
+ default:
+ known = d.lightchain.HasHeader(h.Hash(), n)
+ }
+ if !known {
+ end = check
+ continue
+ }
+ start = check
+ }
+ return start
+}
+
+// fetchBeaconHeaders feeds skeleton headers to the downloader queue for scheduling
+// until sync errors or is finished.
+func (d *Downloader) fetchBeaconHeaders(from uint64) error {
+ head, err := d.skeleton.Head()
+ if err != nil {
+ return err
+ }
+ for {
+ // Retrieve a batch of headers and feed it to the header processor
+ var (
+ headers = make([]*types.Header, 0, maxHeadersProcess)
+ hashes = make([]common.Hash, 0, maxHeadersProcess)
+ )
+ for i := 0; i < maxHeadersProcess && from <= head.Number.Uint64(); i++ {
+ headers = append(headers, d.skeleton.Header(from))
+ hashes = append(hashes, headers[i].Hash())
+ from++
+ }
+ select {
+ case d.headerProcCh <- &headerTask{
+ headers: headers,
+ hashes: hashes,
+ }:
+ case <-d.cancelCh:
+ return errCanceled
+ }
+ // If we still have headers to import, loop and keep pushing them
+ if from <= head.Number.Uint64() {
+ continue
+ }
+ // If the pivot block is committed, signal header sync termination
+ if atomic.LoadInt32(&d.committed) == 1 {
+ d.headerProcCh <- nil
+ return nil
+ }
+ // State sync still going, wait a bit for new headers and retry
+ select {
+ case <-time.After(fsHeaderContCheck):
+ case <-d.cancelCh:
+ return errCanceled
+ }
+ head, err = d.skeleton.Head()
+ if err != nil {
+ return err
+ }
+ }
+}
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 28ad18b81579..1b38274bc775 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -30,7 +30,6 @@ import (
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state/snapshot"
"github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/eth/protocols/snap"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
@@ -123,6 +122,9 @@ type Downloader struct {
// Channels
headerProcCh chan *headerTask // Channel to feed the header processor new tasks
+ // Skeleton sync
+ skeleton *skeleton // Header skeleton to backfill the chain with (eth2 mode)
+
// State sync
pivotHeader *types.Header // Pivot block header to dynamically push the syncing state root
pivotLock sync.RWMutex // Lock protecting pivot header reads from updates
@@ -201,7 +203,7 @@ type BlockChain interface {
}
// New creates a new downloader to fetch hashes and blocks from remote peers.
-func New(checkpoint uint64, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
+func New(checkpoint uint64, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn, success func()) *Downloader {
if lightchain == nil {
lightchain = chain
}
@@ -219,6 +221,8 @@ func New(checkpoint uint64, stateDb ethdb.Database, mux *event.TypeMux, chain Bl
SnapSyncer: snap.NewSyncer(stateDb),
stateSyncStart: make(chan *stateSync),
}
+ dl.skeleton = newSkeleton(stateDb, dl.peers, dropPeer, newBeaconBackfiller(dl, success))
+
go dl.stateFetcher()
return dl
}
@@ -318,10 +322,10 @@ func (d *Downloader) UnregisterPeer(id string) error {
return nil
}
-// Synchronise tries to sync up our local block chain with a remote peer, both
+// LegacySync tries to sync up our local block chain with a remote peer, both
// adding various sanity checks as well as wrapping it with various log entries.
-func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error {
- err := d.synchronise(id, head, td, mode)
+func (d *Downloader) LegacySync(id string, head common.Hash, td *big.Int, mode SyncMode) error {
+ err := d.synchronise(id, head, td, mode, false, nil)
switch err {
case nil, errBusy, errCanceled:
@@ -347,7 +351,21 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode
// synchronise will select the peer and use it for synchronising. If an empty string is given
// it will use the best peer possible and synchronize if its TD is higher than our own. If any of the
// checks fail an error will be returned. This method is synchronous
-func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode) error {
+func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode, beaconMode bool, beaconPing chan struct{}) error {
+ // The beacon header syncer is async. It will start this synchronization and
+ // will continue doing other tasks. However, if synchornization needs to be
+ // cancelled, the syncer needs to know if we reached the startup point (and
+ // inited the cancel cannel) or not yet. Make sure that we'll signal even in
+ // case of a failure.
+ if beaconPing != nil {
+ defer func() {
+ select {
+ case <-beaconPing: // already notified
+ default:
+ close(beaconPing) // weird exit condition, notify that it's safe to cancel (the nothing)
+ }
+ }()
+ }
// Mock out the synchronisation if testing
if d.synchroniseMock != nil {
return d.synchroniseMock(id, hash)
@@ -402,11 +420,17 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
atomic.StoreUint32(&d.mode, uint32(mode))
// Retrieve the origin peer and initiate the downloading process
- p := d.peers.Peer(id)
- if p == nil {
- return errUnknownPeer
+ var p *peerConnection
+ if !beaconMode { // Beacon mode doesn't need a peer to sync from
+ p = d.peers.Peer(id)
+ if p == nil {
+ return errUnknownPeer
+ }
}
- return d.syncWithPeer(p, hash, td)
+ if beaconPing != nil {
+ close(beaconPing)
+ }
+ return d.syncWithPeer(p, hash, td, beaconMode)
}
func (d *Downloader) getMode() SyncMode {
@@ -415,7 +439,7 @@ func (d *Downloader) getMode() SyncMode {
// syncWithPeer starts a block synchronization based on the hash chain from the
// specified peer and head hash.
-func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
+func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int, beaconMode bool) (err error) {
d.mux.Post(StartEvent{})
defer func() {
// reset on error
@@ -426,33 +450,57 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
d.mux.Post(DoneEvent{latest})
}
}()
- if p.version < eth.ETH66 {
- return fmt.Errorf("%w: advertized %d < required %d", errTooOld, p.version, eth.ETH66)
- }
mode := d.getMode()
- log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", mode)
+ if !beaconMode {
+ log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", mode)
+ } else {
+ log.Debug("Backfilling with the network", "mode", mode)
+ }
defer func(start time.Time) {
log.Debug("Synchronisation terminated", "elapsed", common.PrettyDuration(time.Since(start)))
}(time.Now())
// Look up the sync boundaries: the common ancestor and the target block
- latest, pivot, err := d.fetchHead(p)
- if err != nil {
- return err
- }
+ var latest, pivot *types.Header
+ if !beaconMode {
+ // In legacy mode, use the master peer to retrieve the headers from
+ latest, pivot, err = d.fetchHead(p)
+ if err != nil {
+ return err
+ }
+ } else {
+ // In beacon mode, use the skeleton chain to retrieve the headers from
+ latest, err = d.skeleton.Head()
+ if err != nil {
+ return err
+ }
+ // Opposed to legacy mode, in beacon mode we trust the chain we've been
+ // told to sync to, so no need to leave a gap between the pivot and head
+ // to full sync. Still, the downloader's been architected to do a full
+ // block import after the pivot, so make it off by one to avoid having
+ // to special case everything internally.
+ pivot = d.skeleton.Header(latest.Number.Uint64() - 1)
+ }
+ // If no pivot block was returned, the head is below the min full block
+ // threshold (i.e. new chain). In that case we won't really snap sync
+ // anyway, but still need a valid pivot block to avoid some code hitting
+ // nil panics on access.
if mode == SnapSync && pivot == nil {
- // If no pivot block was returned, the head is below the min full block
- // threshold (i.e. new chain). In that case we won't really snap sync
- // anyway, but still need a valid pivot block to avoid some code hitting
- // nil panics on an access.
pivot = d.blockchain.CurrentBlock().Header()
}
height := latest.Number.Uint64()
- origin, err := d.findAncestor(p, latest)
- if err != nil {
- return err
+ var origin uint64
+ if !beaconMode {
+ // In legacy mode, reach out to the network and find the ancestor
+ origin, err = d.findAncestor(p, latest)
+ if err != nil {
+ return err
+ }
+ } else {
+ // In beacon mode, use the skeleton chain for the ancestor lookup
+ origin = d.findBeaconAncestor()
}
d.syncStatsLock.Lock()
if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
@@ -523,11 +571,19 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
if d.syncInitHook != nil {
d.syncInitHook(origin, height)
}
+ var headerFetcher func() error
+ if !beaconMode {
+ // In legacy mode, headers are retrieved from the network
+ headerFetcher = func() error { return d.fetchHeaders(p, origin+1, latest.Number.Uint64()) }
+ } else {
+ // In beacon mode, headers are served by the skeleton syncer
+ headerFetcher = func() error { return d.fetchBeaconHeaders(origin + 1) }
+ }
fetchers := []func() error{
- func() error { return d.fetchHeaders(p, origin+1, latest.Number.Uint64()) }, // Headers are always retrieved
- func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and snap sync
- func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during snap sync
- func() error { return d.processHeaders(origin+1, td) },
+ headerFetcher, // Headers are always retrieved
+ func() error { return d.fetchBodies(origin+1, beaconMode) }, // Bodies are retrieved during normal and snap sync
+ func() error { return d.fetchReceipts(origin+1, beaconMode) }, // Receipts are retrieved during snap sync
+ func() error { return d.processHeaders(origin+1, td, beaconMode) },
}
if mode == SnapSync {
d.pivotLock.Lock()
@@ -1127,7 +1183,7 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) (
log.Debug("Filling up skeleton", "from", from)
d.queue.ScheduleSkeleton(from, skeleton)
- err := d.concurrentFetch((*headerQueue)(d))
+ err := d.concurrentFetch((*headerQueue)(d), false)
if err != nil {
log.Debug("Skeleton fill failed", "err", err)
}
@@ -1141,9 +1197,9 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) (
// fetchBodies iteratively downloads the scheduled block bodies, taking any
// available peers, reserving a chunk of blocks for each, waiting for delivery
// and also periodically checking for timeouts.
-func (d *Downloader) fetchBodies(from uint64) error {
+func (d *Downloader) fetchBodies(from uint64, beaconMode bool) error {
log.Debug("Downloading block bodies", "origin", from)
- err := d.concurrentFetch((*bodyQueue)(d))
+ err := d.concurrentFetch((*bodyQueue)(d), beaconMode)
log.Debug("Block body download terminated", "err", err)
return err
@@ -1152,9 +1208,9 @@ func (d *Downloader) fetchBodies(from uint64) error {
// fetchReceipts iteratively downloads the scheduled block receipts, taking any
// available peers, reserving a chunk of receipts for each, waiting for delivery
// and also periodically checking for timeouts.
-func (d *Downloader) fetchReceipts(from uint64) error {
+func (d *Downloader) fetchReceipts(from uint64, beaconMode bool) error {
log.Debug("Downloading receipts", "origin", from)
- err := d.concurrentFetch((*receiptQueue)(d))
+ err := d.concurrentFetch((*receiptQueue)(d), beaconMode)
log.Debug("Receipt download terminated", "err", err)
return err
@@ -1163,7 +1219,7 @@ func (d *Downloader) fetchReceipts(from uint64) error {
// processHeaders takes batches of retrieved headers from an input channel and
// keeps processing and scheduling them into the header chain and downloader's
// queue until the stream ends or a failure occurs.
-func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
+func (d *Downloader) processHeaders(origin uint64, td *big.Int, beaconMode bool) error {
// Keep a count of uncertain headers to roll back
var (
rollback uint64 // Zero means no rollback (fine as you can't unroll the genesis)
@@ -1211,35 +1267,40 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
case <-d.cancelCh:
}
}
- // If no headers were retrieved at all, the peer violated its TD promise that it had a
- // better chain compared to ours. The only exception is if its promised blocks were
- // already imported by other means (e.g. fetcher):
- //
- // R , L : Both at block 10
- // R: Mine block 11, and propagate it to L
- // L: Queue block 11 for import
- // L: Notice that R's head and TD increased compared to ours, start sync
- // L: Import of block 11 finishes
- // L: Sync begins, and finds common ancestor at 11
- // L: Request new headers up from 11 (R's TD was higher, it must have something)
- // R: Nothing to give
- if mode != LightSync {
- head := d.blockchain.CurrentBlock()
- if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.NumberU64())) > 0 {
- return errStallingPeer
+ // If we're in legacy sync mode, we need to check total difficulty
+ // violations from malicious peers. That is not needed in beacon
+ // mode and we can skip to terminating sync.
+ if !beaconMode {
+ // If no headers were retrieved at all, the peer violated its TD promise that it had a
+ // better chain compared to ours. The only exception is if its promised blocks were
+ // already imported by other means (e.g. fetcher):
+ //
+ // R , L : Both at block 10
+ // R: Mine block 11, and propagate it to L
+ // L: Queue block 11 for import
+ // L: Notice that R's head and TD increased compared to ours, start sync
+ // L: Import of block 11 finishes
+ // L: Sync begins, and finds common ancestor at 11
+ // L: Request new headers up from 11 (R's TD was higher, it must have something)
+ // R: Nothing to give
+ if mode != LightSync {
+ head := d.blockchain.CurrentBlock()
+ if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.NumberU64())) > 0 {
+ return errStallingPeer
+ }
}
- }
- // If snap or light syncing, ensure promised headers are indeed delivered. This is
- // needed to detect scenarios where an attacker feeds a bad pivot and then bails out
- // of delivering the post-pivot blocks that would flag the invalid content.
- //
- // This check cannot be executed "as is" for full imports, since blocks may still be
- // queued for processing when the header download completes. However, as long as the
- // peer gave us something useful, we're already happy/progressed (above check).
- if mode == SnapSync || mode == LightSync {
- head := d.lightchain.CurrentHeader()
- if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {
- return errStallingPeer
+ // If snap or light syncing, ensure promised headers are indeed delivered. This is
+ // needed to detect scenarios where an attacker feeds a bad pivot and then bails out
+ // of delivering the post-pivot blocks that would flag the invalid content.
+ //
+ // This check cannot be executed "as is" for full imports, since blocks may still be
+ // queued for processing when the header download completes. However, as long as the
+ // peer gave us something useful, we're already happy/progressed (above check).
+ if mode == SnapSync || mode == LightSync {
+ head := d.lightchain.CurrentHeader()
+ if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {
+ return errStallingPeer
+ }
}
}
// Disable any rollback and return
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index 70c6a51215b5..ffae433c48fb 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -75,7 +75,7 @@ func newTester() *downloadTester {
chain: chain,
peers: make(map[string]*downloadTesterPeer),
}
- tester.downloader = New(0, db, new(event.TypeMux), tester.chain, nil, tester.dropPeer)
+ tester.downloader = New(0, db, new(event.TypeMux), tester.chain, nil, tester.dropPeer, nil)
return tester
}
@@ -96,7 +96,7 @@ func (dl *downloadTester) sync(id string, td *big.Int, mode SyncMode) error {
td = dl.peers[id].chain.GetTd(head.Hash(), head.NumberU64())
}
// Synchronise with the chosen peer and ensure proper cleanup afterwards
- err := dl.downloader.synchronise(id, head.Hash(), td, mode)
+ err := dl.downloader.synchronise(id, head.Hash(), td, mode, false, nil)
select {
case <-dl.downloader.cancelCh:
// Ok, downloader fully cancelled after sync cycle
@@ -971,7 +971,7 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol uint) {
// Simulate a synchronisation and check the required result
tester.downloader.synchroniseMock = func(string, common.Hash) error { return tt.result }
- tester.downloader.Synchronise(id, tester.chain.Genesis().Hash(), big.NewInt(1000), FullSync)
+ tester.downloader.LegacySync(id, tester.chain.Genesis().Hash(), big.NewInt(1000), FullSync)
if _, ok := tester.peers[id]; !ok != tt.drop {
t.Errorf("test %d: peer drop mismatch for %v: have %v, want %v", i, tt.result, !ok, tt.drop)
}
diff --git a/eth/downloader/fetchers_concurrent.go b/eth/downloader/fetchers_concurrent.go
index 4bade2b4c3dd..a0aa197175a3 100644
--- a/eth/downloader/fetchers_concurrent.go
+++ b/eth/downloader/fetchers_concurrent.go
@@ -76,7 +76,7 @@ type typedQueue interface {
// concurrentFetch iteratively downloads scheduled block parts, taking available
// peers, reserving a chunk of fetch requests for each and waiting for delivery
// or timeouts.
-func (d *Downloader) concurrentFetch(queue typedQueue) error {
+func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error {
// Create a delivery channel to accept responses from all peers
responses := make(chan *eth.Response)
@@ -127,7 +127,7 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error {
finished := false
for {
// Short circuit if we lost all our peers
- if d.peers.Len() == 0 {
+ if d.peers.Len() == 0 && !beaconMode {
return errNoPeers
}
// If there's nothing more to fetch, wait or terminate
@@ -209,7 +209,7 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error {
}
// Make sure that we have peers available for fetching. If all peers have been tried
// and all failed throw an error
- if !progressed && !throttled && len(pending) == 0 && len(idles) == d.peers.Len() && queued > 0 {
+ if !progressed && !throttled && len(pending) == 0 && len(idles) == d.peers.Len() && queued > 0 && !beaconMode {
return errPeersUnavailable
}
}
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go
index 324fdb9cd51f..d74d23e74d55 100644
--- a/eth/downloader/peer.go
+++ b/eth/downloader/peer.go
@@ -294,19 +294,19 @@ func (ps *peerSet) AllPeers() []*peerConnection {
// peerCapacitySort implements sort.Interface.
// It sorts peer connections by capacity (descending).
type peerCapacitySort struct {
- p []*peerConnection
- tp []int
+ peers []*peerConnection
+ caps []int
}
func (ps *peerCapacitySort) Len() int {
- return len(ps.p)
+ return len(ps.peers)
}
func (ps *peerCapacitySort) Less(i, j int) bool {
- return ps.tp[i] > ps.tp[j]
+ return ps.caps[i] > ps.caps[j]
}
func (ps *peerCapacitySort) Swap(i, j int) {
- ps.p[i], ps.p[j] = ps.p[j], ps.p[i]
- ps.tp[i], ps.tp[j] = ps.tp[j], ps.tp[i]
+ ps.peers[i], ps.peers[j] = ps.peers[j], ps.peers[i]
+ ps.caps[i], ps.caps[j] = ps.caps[j], ps.caps[i]
}
diff --git a/eth/downloader/skeleton.go b/eth/downloader/skeleton.go
new file mode 100644
index 000000000000..41631752ad7d
--- /dev/null
+++ b/eth/downloader/skeleton.go
@@ -0,0 +1,965 @@
+// Copyright 2021 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package downloader
+
+import (
+ "encoding/json"
+ "errors"
+ "math/rand"
+ "sort"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/rawdb"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/eth/protocols/eth"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/log"
+)
+
+// scratchHeaders is the number of headers to store in a scratch space to allow
+// concurrent downloads. A header is about 0.5KB in size, so there is no worry
+// about using too much memory. The only catch is that we can only validate gaps
+// after they're linked to the head, so the bigger the scratch space, the larger
+// potential for invalid headers.
+//
+// The current scratch space of 131072 headers is expected to use 64MB RAM.
+const scratchHeaders = 131072
+
+// requestHeaders is the number of headers to request from a remote peer in a single
+// network packet. Although the skeleton downloader takes into consideration peer
+// capacities when picking idlers, the packet size was decided to remain constant
+// since headers are relatively small and it's easier to work with fixed batches
+// vs. dynamic interval fillings.
+const requestHeaders = 512
+
+// errSyncLinked is an internal helper error to signal that the current sync
+// cycle linked up to the genesis block, thus the skeleton syncer should ping
+// the backfiller to resume. Since we already have that logic on sync start,
+// piggy-back on that instead of 2 entrypoints.
+var errSyncLinked = errors.New("sync linked")
+
+// errSyncMerged is an internal helper error to signal that the current sync
+// cycle merged with a previously aborted subchain, thus the skeleton syncer
+// should abort and restart with the new state.
+var errSyncMerged = errors.New("sync merged")
+
+// errSyncReorged is an internal helper error to signal that the head chain of
+// the current sync cycle was (partially) reorged, thus the skeleton syncer
+// should abort and restart with the new state.
+var errSyncReorged = errors.New("sync reorged")
+
+// errTerminated is returned if the sync mechanism was terminated for this run of
+// the process. This is usually the case when Geth is shutting down and some events
+// might still be propagating.
+var errTerminated = errors.New("terminated")
+
+func init() {
+ // Tuning parameters is nice, but the scratch space must be assignable in
+ // full to peers. It's a useless cornercase to support a dangling half-group.
+ if scratchHeaders%requestHeaders != 0 {
+ panic("Please make scratchHeaders divisible by requestHeaders")
+ }
+}
+
+// subchain is a contiguous header chain segment that is backed by the database,
+// but may not be linked to the live chain. The skeleton downloader may produce
+// a new one of these every time it is restarted until the subchain grows large
+// enough to connect with a previous subchain.
+//
+// The subchains use the exact same database namespace and are not disjoint from
+// each other. As such, extending one to overlap the other entails reducing the
+// second one first. This combined buffer model is used to avoid having to move
+// data on disk when two subchains are joined together.
+type subchain struct {
+ Head uint64 // Block number of the newest header in the subchain
+ Tail uint64 // Block number of the oldest header in the subchain
+ Next common.Hash // Block hash of the next oldest header in the subchain
+}
+
+// skeletonProgress is a database entry to allow suspending and resuming a chain
+// sync. As the skeleton header chain is downloaded backwards, restarts can and
+// will produce temporarily disjoint subchains. There is no way to restart a
+// suspended skeleton sync without prior knowledge of all prior suspension points.
+type skeletonProgress struct {
+ Subchains []*subchain // Disjoint subchains downloaded until now
+}
+
+// headerRequest tracks a pending header request to ensure responses are to
+// actual requests and to validate any security constraints.
+//
+// Concurrency note: header requests and responses are handled concurrently from
+// the main runloop to allow Keccak256 hash verifications on the peer's thread and
+// to drop on invalid response. The request struct must contain all the data to
+// construct the response without accessing runloop internals (i.e. subchains).
+// That is only included to allow the runloop to match a response to the task being
+// synced without having yet another set of maps.
+type headerRequest struct {
+ peer string // Peer to which this request is assigned
+ id uint64 // Request ID of this request
+
+ deliver chan *headerResponse // Channel to deliver successful response on
+ revert chan *headerRequest // Channel to deliver request failure on
+ cancel chan struct{} // Channel to track sync cancellation
+ stale chan struct{} // Channel to signal the request was dropped
+
+ head uint64 // Head number of the requested batch of headers
+}
+
+// headerResponse is an already verified remote response to a header request.
+type headerResponse struct {
+ peer *peerConnection // Peer from which this response originates
+ reqid uint64 // Request ID that this response fulfils
+ headers []*types.Header // Chain of headers
+}
+
+// backfiller is a callback interface through which the skeleton sync can tell
+// the downloader that it should suspend or resume backfilling on specific head
+// events (e.g. suspend on forks or gaps, resume on successful linkups).
+type backfiller interface {
+ // suspend requests the backfiller to abort any running full or snap sync
+ // based on the skeleton chain as it might be invalid. The backfiller should
+ // gracefully handle multiple consecutive suspends without a resume, even
+ // on initial startup.
+ suspend()
+
+ // resume requests the backfiller to start running fill or snap sync based on
+ // the skeleton chain as it has successfully been linked. Appending new heads
+ // to the end of the chain will not result in suspend/resume cycles.
+ resume()
+}
+
+// skeleton represents a header chain synchronized after the Ethereum 2 merge,
+// where blocks aren't validated any more via PoW in a forward fashion, rather
+// are dictated and extended at the head via the beacon chain and backfilled on
+// the original Ethereum 1 block sync protocol.
+//
+// Since the skeleton is grown backwards from head to genesis, it is handled as
+// a separate entity, not mixed in with the logical sequential transition of the
+// blocks. Once the skeleton is connected to an existing, validated chain, the
+// headers will be moved into the main downloader for filling and execution.
+//
+// Opposed to the Ethereum 1 block synchronization which is trustless (and uses a
+// master peer to minimize the attack surface), Ethereum 2 block synchronization
+// starts from a trusted head. As such, there is no need for a master peer any
+// more and headers can be requested fully concurrently (though some batches might
+// be discarded if they don't link up correctly).
+//
+// Although a skeleton is part of a sync cycle, it is not recreated, rather stays
+// alive throughout the lifetime of the downloader. This allows it to be extended
+// concurrently with the sync cycle, since extensions arrive from an API surface,
+// not from within (vs. Ethereum 1 sync).
+//
+// Since the skeleton tracks the entire header chain until it is consumed by the
+// forward block filling, it needs 0.5KB/block storage. At current mainnet sizes
+// this is only possible with a disk backend. Since the skeleton is separate from
+// the node's header chain, storing the headers ephemerally until sync finishes
+// is wasted disk IO, but it's a price we're going to pay to keep things simple
+// for now.
+type skeleton struct {
+ db ethdb.Database // Database backing the skeleton
+ filler backfiller // Chain syncer suspended/resumed by head events
+
+ peers *peerSet // Set of peers we can sync from
+ idles map[string]*peerConnection // Set of idle peers in the current sync cycle
+ drop peerDropFn // Drops a peer for misbehaving
+
+ progress *skeletonProgress // Sync progress tracker for resumption and metrics
+ started time.Time // Timestamp when the skeleton syncer was created
+ logged time.Time // Timestamp when progress was last logged to the user
+ pulled uint64 // Number of headers downloaded in this run
+
+ scratchSpace []*types.Header // Scratch space to accumulate headers in (first = recent)
+ scratchOwners []string // Peer IDs owning chunks of the scratch space (pend or delivered)
+ scratchHead uint64 // Block number of the first item in the scratch space
+
+ requests map[uint64]*headerRequest // Header requests currently running
+
+ headEvents chan *types.Header // Notification channel for new heads
+ terminate chan chan error // Termination channel to abort sync
+ terminated chan struct{} // Channel to signal that the syncer is dead
+
+ // Callback hooks used during testing
+ syncStarting func() // callback triggered after a sync cycle is inited but before started
+}
+
+// newSkeleton creates a new sync skeleton that tracks a potentially dangling
+// header chain until it's linked into an existing set of blocks.
+func newSkeleton(db ethdb.Database, peers *peerSet, drop peerDropFn, filler backfiller) *skeleton {
+ sk := &skeleton{
+ db: db,
+ filler: filler,
+ peers: peers,
+ drop: drop,
+ requests: make(map[uint64]*headerRequest),
+ headEvents: make(chan *types.Header),
+ terminate: make(chan chan error),
+ terminated: make(chan struct{}),
+ }
+ go sk.startup()
+ return sk
+}
+
+// startup is an initial background loop which waits for an event to start or
+// tear the syncer down. This is required to make the skeleton sync loop once
+// per process but at the same time not start before the beacon chain announces
+// a new (existing) head.
+func (s *skeleton) startup() {
+ // Close a notification channel so anyone sending us events will know if the
+ // sync loop was torn down for good.
+ defer close(s.terminated)
+
+ // Wait for startup or teardown
+ select {
+ case errc := <-s.terminate:
+ // No head was announced but Geth is shutting down
+ errc <- nil
+ return
+
+ case head := <-s.headEvents:
+ // New head announced, start syncing to it, looping every time a current
+ // cycle is terminated due to a chain event (head reorg, old chain merge)
+ s.started = time.Now()
+
+ for {
+ // If the sync cycle terminated or was terminated, propagate up when
+ // higher layers request termination. There's no fancy explicit error
+ // signalling as the sync loop should never terminate (TM).
+ newhead, err := s.sync(head)
+ switch {
+ case err == errSyncLinked:
+ // Sync cycle linked up to the genesis block. Tear down the loop
+ // and restart it so, it can properly notify the backfiller. Don't
+ // account a new head.
+ head = nil
+
+ case err == errSyncMerged:
+ // Subchains were merged, we just need to reinit the internal
+ // state to continue on the tail of the merged chain. Don't
+ // announce a new head.
+ head = nil
+
+ case err == errSyncReorged:
+ // The subchain being synced got modified at the head in a
+ // way that requires resyncing it. Restart sync with the new
+ // head to force a cleanup.
+ head = newhead
+
+ case err == errTerminated:
+ // Sync was requested to be terminated from within, stop and
+ // return (no need to pass a message, was already done internally)
+ return
+
+ default:
+ // Sync either successfully terminated or failed with an unhandled
+ // error. Abort and wait until Geth requests a termination.
+ errc := <-s.terminate
+ errc <- err
+ return
+ }
+ }
+ }
+}
+
+// Terminate tears down the syncer indefinitely.
+func (s *skeleton) Terminate() error {
+ // Request termination and fetch any errors
+ errc := make(chan error)
+ s.terminate <- errc
+ err := <-errc
+
+ // Wait for full shutdown (not necessary, but cleaner)
+ <-s.terminated
+ return err
+}
+
+// Sync starts or resumes a previous sync cycle to download and maintain a reverse
+// header chain starting at the head and leading towards genesis to an available
+// ancestor.
+//
+// This method does not block, rather it just waits until the syncer receives the
+// fed header. What the syncer does with it is the syncer's problem.
+func (s *skeleton) Sync(head *types.Header) error {
+ log.Trace("New skeleton head announced", "number", head.Number, "hash", head.Hash())
+ select {
+ case s.headEvents <- head:
+ return nil
+ case <-s.terminated:
+ return errTerminated
+ }
+}
+
+// sync is the internal version of Sync that executes a single sync cycle, either
+// until some termination condition is reached, or until the current cycle merges
+// with a previously aborted run.
+func (s *skeleton) sync(head *types.Header) (*types.Header, error) {
+ // If we're continuing a previous merge interrupt, just access the existing
+ // old state without initing from disk.
+ if head == nil {
+ head = rawdb.ReadSkeletonHeader(s.db, s.progress.Subchains[0].Head)
+ } else {
+ // Otherwise, initialize the sync, trimming any previous leftovers until
+ // we're consistent with the newly requested chain head
+ s.initSync(head)
+ }
+ // Create the scratch space to fill with concurrently downloaded headers
+ s.scratchSpace = make([]*types.Header, scratchHeaders)
+ defer func() { s.scratchSpace = nil }() // don't hold on to references after sync
+
+ s.scratchOwners = make([]string, scratchHeaders/requestHeaders)
+ defer func() { s.scratchOwners = nil }() // don't hold on to references after sync
+
+ s.scratchHead = s.progress.Subchains[0].Tail - 1 // tail must not be 0!
+
+ // If the sync is already done, resume the backfiller. When the loop stops,
+ // terminate the backfiller too.
+ if s.scratchHead == 0 {
+ s.filler.resume()
+ }
+ defer s.filler.suspend()
+
+ // Create a set of unique channels for this sync cycle. We need these to be
+ // ephemeral so a data race doesn't accidentally deliver something stale on
+ // a persistent channel across syncs (yup, this happened)
+ var (
+ requestFails = make(chan *headerRequest)
+ responses = make(chan *headerResponse)
+ )
+ cancel := make(chan struct{})
+ defer close(cancel)
+
+ log.Debug("Starting reverse header sync cycle", "head", head.Number, "hash", head.Hash(), "cont", s.scratchHead)
+
+ // Whether sync completed or not, disregard any future packets
+ defer func() {
+ log.Debug("Terminating reverse header sync cycle", "head", head.Number, "hash", head.Hash(), "cont", s.scratchHead)
+ s.requests = make(map[uint64]*headerRequest)
+ }()
+
+ // Start tracking idle peers for task assignments
+ peering := make(chan *peeringEvent, 64) // arbitrary buffer, just some burst protection
+
+ peeringSub := s.peers.SubscribeEvents(peering)
+ defer peeringSub.Unsubscribe()
+
+ s.idles = make(map[string]*peerConnection)
+ for _, peer := range s.peers.AllPeers() {
+ s.idles[peer.id] = peer
+ }
+ // Notify any tester listening for startup events
+ if s.syncStarting != nil {
+ s.syncStarting()
+ }
+ for {
+ // Something happened, try to assign new tasks to any idle peers
+ s.assingTasks(responses, requestFails, cancel)
+
+ // Wait for something to happen
+ select {
+ case event := <-peering:
+ // A peer joined or left, the tasks queue and allocations need to be
+ // checked for potential assignment or reassignment
+ peerid := event.peer.id
+ if event.join {
+ s.idles[peerid] = event.peer
+ } else {
+ s.revertRequests(peerid)
+ delete(s.idles, peerid)
+ }
+
+ case errc := <-s.terminate:
+ errc <- nil
+ return nil, errTerminated
+
+ case head := <-s.headEvents:
+ // New head was announced, try to integrate it. If successful, nothing
+ // needs to be done as the head simply extended the last range. For now
+ // we don't seamlessly integrate reorgs to keep things simple. If the
+ // network starts doing many mini reorgs, it might be worthwhile handling
+ // a limited depth without an error.
+ if reorged := s.processNewHead(head); reorged {
+ return head, errSyncReorged
+ }
+ // New head was integrated into the skeleton chain. If the backfiller
+ // is still running, it will pick it up. If it already terminated,
+ // a new cycle needs to be spun up.
+ if s.scratchHead == 0 {
+ s.filler.resume()
+ }
+
+ case req := <-requestFails:
+ s.revertRequest(req)
+
+ case res := <-responses:
+ // Process the batch of headers. If through processing we managed to
+ // link the current subchain to a previously downloaded one, abort the
+ // sync and restart with the merged subchains. We could probably hack
+ // the internal state to switch the scratch space over to the tail of
+ // the extended subchain, but since the scenario is rare, it's cleaner
+ // to rely on the restart mechanism than a stateful modification.
+ if merged := s.processResponse(res); merged {
+ return nil, errSyncMerged
+ }
+ // If we've just reached the genesis block, tear down the sync cycle
+ // and restart it to resume the backfiller.
+ if s.scratchHead == 0 {
+ return nil, errSyncLinked
+ }
+ }
+ }
+}
+
+// initSync attempts to get the skeleton sync into a consistent state wrt any
+// past state on disk and the newly requested head to sync to. If the new head
+// is nil, the method will return and continue from the previous head.
+func (s *skeleton) initSync(head *types.Header) {
+ // Extract the head number, we'll need it all over
+ number := head.Number.Uint64()
+
+ // Retrieve the previously saved sync progress
+ if status := rawdb.ReadSkeletonSyncStatus(s.db); len(status) > 0 {
+ s.progress = new(skeletonProgress)
+ if err := json.Unmarshal(status, s.progress); err != nil {
+ log.Error("Failed to decode skeleton sync status", "err", err)
+ } else {
+ // Previous sync was available, print some continuation logs
+ for _, subchain := range s.progress.Subchains {
+ log.Debug("Restarting skeleton subchain", "head", subchain.Head, "tail", subchain.Tail)
+ }
+ // Create a new subchain for the head (unless the last can be extended),
+ // trimming anything it would overwrite
+ headchain := &subchain{
+ Head: number,
+ Tail: number,
+ Next: head.ParentHash,
+ }
+ for len(s.progress.Subchains) > 0 {
+ // If the last chain is above the new head, delete altogether
+ lastchain := s.progress.Subchains[0]
+ if lastchain.Tail >= headchain.Tail {
+ log.Debug("Dropping skeleton subchain", "head", lastchain.Head, "tail", lastchain.Tail)
+ s.progress.Subchains = s.progress.Subchains[1:]
+ continue
+ }
+ // Otherwise truncate the last chain if needed and abort trimming
+ if lastchain.Head >= headchain.Tail {
+ log.Debug("Trimming skeleton subchain", "oldhead", lastchain.Head, "newhead", headchain.Tail-1, "tail", lastchain.Tail)
+ lastchain.Head = headchain.Tail - 1
+ }
+ break
+ }
+ // If the last subchain can be extended, we're lucky. Otherwise create
+ // a new subchain sync task.
+ var extended bool
+ if n := len(s.progress.Subchains); n > 0 {
+ lastchain := s.progress.Subchains[0]
+ if lastchain.Head == headchain.Tail-1 {
+ lasthead := rawdb.ReadSkeletonHeader(s.db, lastchain.Head)
+ if lasthead.Hash() == head.ParentHash {
+ log.Debug("Extended skeleton subchain with new head", "head", headchain.Tail, "tail", lastchain.Tail)
+ lastchain.Head = headchain.Tail
+ extended = true
+ }
+ }
+ }
+ if !extended {
+ log.Debug("Created new skeleton subchain", "head", number, "tail", number)
+ s.progress.Subchains = append([]*subchain{headchain}, s.progress.Subchains...)
+ }
+ // Update the database with the new sync stats and insert the new
+ // head header. We won't delete any trimmed skeleton headers since
+ // those will be outside the index space of the many subchains and
+ // the database space will be reclaimed eventually when processing
+ // blocks above the current head (TODO(karalabe): don't forget).
+ batch := s.db.NewBatch()
+
+ rawdb.WriteSkeletonHeader(batch, head)
+ s.saveSyncStatus(batch)
+
+ if err := batch.Write(); err != nil {
+ log.Crit("Failed to write skeleton sync status", "err", err)
+ }
+ return
+ }
+ }
+ // Either we've failed to decode the previous state, or there was none. Start
+ // a fresh sync with a single subchain represented by the currently sent
+ // chain head.
+ s.progress = &skeletonProgress{
+ Subchains: []*subchain{
+ {
+ Head: number,
+ Tail: number,
+ Next: head.ParentHash,
+ },
+ },
+ }
+ batch := s.db.NewBatch()
+
+ rawdb.WriteSkeletonHeader(batch, head)
+ s.saveSyncStatus(batch)
+
+ if err := batch.Write(); err != nil {
+ log.Crit("Failed to write initial skeleton sync status", "err", err)
+ }
+ log.Debug("Created initial skeleton subchain", "head", number, "tail", number)
+}
+
+// saveSyncStatus marshals the remaining sync tasks into leveldb.
+func (s *skeleton) saveSyncStatus(db ethdb.KeyValueWriter) {
+ status, err := json.Marshal(s.progress)
+ if err != nil {
+ panic(err) // This can only fail during implementation
+ }
+ rawdb.WriteSkeletonSyncStatus(db, status)
+}
+
+// processNewHead does the internal shuffling for a new head marker and either
+// accepts and integrates it into the skeleton or requests a reorg. Upon reorg,
+// the syncer will tear itself down and restart with a fresh head. It is simpler
+// to reconstruct the sync state than to mutate it and hope for the best.
+func (s *skeleton) processNewHead(head *types.Header) bool {
+ // If the header cannot be inserted without interruption, return an error for
+ // the outer loop to tear down the skeleton sync and restart it
+ number := head.Number.Uint64()
+
+ lastchain := s.progress.Subchains[0]
+ if lastchain.Tail >= number {
+ log.Warn("Beacon chain reorged", "tail", lastchain.Tail, "newHead", number)
+ return true
+ }
+ if lastchain.Head+1 < number {
+ log.Warn("Beacon chain gapped", "head", lastchain.Head, "newHead", number)
+ return true
+ }
+ if parent := rawdb.ReadSkeletonHeader(s.db, number-1); parent.Hash() != head.ParentHash {
+ log.Warn("Beacon chain forked", "ancestor", parent.Number, "hash", parent.Hash(), "want", head.ParentHash)
+ return true
+ }
+ // New header seems to be in the last subchain range. Unwind any extra headers
+ // from the chain tip and insert the new head. We won't delete any trimmed
+ // skeleton headers since those will be outside the index space of the many
+ // subchains and the database space will be reclaimed eventually when processing
+ // blocks above the current head (TODO(karalabe): don't forget).
+ batch := s.db.NewBatch()
+
+ rawdb.WriteSkeletonHeader(batch, head)
+ lastchain.Head = number
+ s.saveSyncStatus(batch)
+
+ if err := batch.Write(); err != nil {
+ log.Crit("Failed to write skeleton sync status", "err", err)
+ }
+ return false
+}
+
+// assingTasks attempts to match idle peers to pending header retrievals.
+//
+// NOTE(review): the method name is misspelled ("assing" -> "assign"). It is
+// not renamed here since callers outside this hunk reference the current name.
+func (s *skeleton) assingTasks(success chan *headerResponse, fail chan *headerRequest, cancel chan struct{}) {
+ // Sort the peers by download capacity to use faster ones if many available
+ idlers := &peerCapacitySort{
+ peers: make([]*peerConnection, 0, len(s.idles)),
+ caps: make([]int, 0, len(s.idles)),
+ }
+ targetTTL := s.peers.rates.TargetTimeout()
+ for _, peer := range s.idles {
+ idlers.peers = append(idlers.peers, peer)
+ idlers.caps = append(idlers.caps, s.peers.rates.Capacity(peer.id, eth.BlockHeadersMsg, targetTTL))
+ }
+ if len(idlers.peers) == 0 {
+ return
+ }
+ sort.Sort(idlers)
+
+ // Find header regions not yet downloading and fill them
+ for task, owner := range s.scratchOwners {
+ // If we're out of idle peers, stop assigning tasks
+ if len(idlers.peers) == 0 {
+ return
+ }
+ // Skip any tasks already filling
+ if owner != "" {
+ continue
+ }
+ // If we've reached the genesis, stop assigning tasks
+ if uint64(task*requestHeaders) >= s.scratchHead {
+ return
+ }
+ // Found a task and have peers available, assign it. The fastest (first
+ // after sorting) idle peer is always picked.
+ idle := idlers.peers[0]
+
+ idlers.peers = idlers.peers[1:]
+ idlers.caps = idlers.caps[1:]
+
+ // Matched a pending task to an idle peer, allocate a unique request id
+ var reqid uint64
+ for {
+ reqid = uint64(rand.Int63())
+ if reqid == 0 {
+ continue
+ }
+ if _, ok := s.requests[reqid]; ok {
+ continue
+ }
+ break
+ }
+ // Generate the network query and send it to the peer
+ req := &headerRequest{
+ peer: idle.id,
+ id: reqid,
+ deliver: success,
+ revert: fail,
+ cancel: cancel,
+ stale: make(chan struct{}),
+ head: s.scratchHead - uint64(task*requestHeaders),
+ }
+ s.requests[reqid] = req
+ delete(s.idles, idle.id)
+
+ // Launch the request on its own goroutine; results arrive back on the
+ // success/fail channels passed in by the caller
+ go s.executeTask(idle, req)
+
+ // Inject the request into the task to block further assignments
+ s.scratchOwners[task] = idle.id
+ }
+}
+
+// executeTask executes a single fetch request, blocking until either a result
+// arrives or a timeout / cancellation is triggered. The method should be run
+// on its own goroutine and will deliver on the requested channels.
+func (s *skeleton) executeTask(peer *peerConnection, req *headerRequest) {
+ start := time.Now()
+ resCh := make(chan *eth.Response)
+
+ // Figure out how many headers to fetch. Usually this will be a full batch,
+ // but for the very tail of the chain, trim the request to the number left.
+ // Since nodes may or may not return the genesis header for a batch request,
+ // don't even request it. The parent hash of block #1 is enough to link.
+ requestCount := requestHeaders
+ if req.head < requestHeaders {
+ requestCount = int(req.head)
+ }
+ peer.log.Trace("Fetching skeleton headers", "from", req.head, "count", requestCount)
+ netreq, err := peer.peer.RequestHeadersByNumber(req.head, requestCount, 0, true, resCh)
+ if err != nil {
+ peer.log.Trace("Failed to request headers", "err", err)
+ s.scheduleRevertRequest(req)
+ return
+ }
+ defer netreq.Close()
+
+ // Wait until the response arrives, the request is cancelled or times out
+ ttl := s.peers.rates.TargetTimeout()
+
+ timeoutTimer := time.NewTimer(ttl)
+ defer timeoutTimer.Stop()
+
+ select {
+ case <-req.cancel:
+ peer.log.Debug("Header request cancelled")
+ s.scheduleRevertRequest(req)
+
+ case <-timeoutTimer.C:
+ // Header retrieval timed out, update the metrics
+ peer.log.Trace("Header request timed out", "elapsed", ttl)
+ headerTimeoutMeter.Mark(1)
+ s.peers.rates.Update(peer.id, eth.BlockHeadersMsg, 0, 0)
+ s.scheduleRevertRequest(req)
+
+ case res := <-resCh:
+ // Headers successfully retrieved, update the metrics
+ headers := *res.Res.(*eth.BlockHeadersPacket)
+
+ headerReqTimer.Update(time.Since(start))
+ s.peers.rates.Update(peer.id, eth.BlockHeadersMsg, res.Time, len(headers))
+
+ // Cross validate the headers with the requests
+ switch {
+ case len(headers) == 0:
+ // No headers were delivered, reject the response and reschedule
+ peer.log.Debug("No headers delivered")
+ res.Done <- errors.New("no headers delivered")
+ s.scheduleRevertRequest(req)
+
+ case headers[0].Number.Uint64() != req.head:
+ // Header batch anchored at non-requested number
+ peer.log.Debug("Invalid header response head", "have", headers[0].Number, "want", req.head)
+ res.Done <- errors.New("invalid header batch anchor")
+ s.scheduleRevertRequest(req)
+
+ case headers[0].Number.Uint64() >= requestHeaders && len(headers) != requestHeaders:
+ // Invalid number of non-genesis headers delivered, reject the response and reschedule
+ peer.log.Debug("Invalid non-genesis header count", "have", len(headers), "want", requestHeaders)
+ res.Done <- errors.New("not enough non-genesis headers delivered")
+ s.scheduleRevertRequest(req)
+
+ case headers[0].Number.Uint64() < requestHeaders && uint64(len(headers)) != headers[0].Number.Uint64():
+ // Invalid number of genesis headers delivered, reject the response and reschedule
+ peer.log.Debug("Invalid genesis header count", "have", len(headers), "want", headers[0].Number.Uint64())
+ res.Done <- errors.New("not enough genesis headers delivered")
+ s.scheduleRevertRequest(req)
+
+ default:
+ // Packet seems structurally valid, check hash progression and if it
+ // is correct too, deliver for storage. Headers arrive in descending
+ // order, so each header's parent hash must match the next one's hash.
+ for i := 0; i < len(headers)-1; i++ {
+ if headers[i].ParentHash != headers[i+1].Hash() {
+ peer.log.Debug("Invalid hash progression", "index", i, "have", headers[i].ParentHash, "want", headers[i+1].Hash())
+ res.Done <- errors.New("invalid hash progression")
+ s.scheduleRevertRequest(req)
+ return
+ }
+ }
+ // Hash chain is valid. The delivery might still be junk as we're
+ // downloading batches concurrently (so no way to link the headers
+ // until gaps are filled); in that case, we'll nuke the peer when
+ // we detect the fault.
+ res.Done <- nil
+
+ select {
+ case req.deliver <- &headerResponse{
+ peer: peer,
+ reqid: req.id,
+ headers: headers,
+ }:
+ case <-req.cancel:
+ }
+ }
+ }
+}
+
+// revertRequests locates all the currently pending reuqests from a particular
+// peer and reverts them, rescheduling for others to fulfill.
+func (s *skeleton) revertRequests(peer string) {
+ // Gather the requests first, revertals need the lock too
+ var requests []*headerRequest
+ for _, req := range s.requests {
+ if req.peer == peer {
+ requests = append(requests, req)
+ }
+ }
+ // Revert all the requests matching the peer
+ for _, req := range requests {
+ s.revertRequest(req)
+ }
+}
+
+// scheduleRevertRequest asks the event loop to clean up a request and return
+// all failed retrieval tasks to the scheduler for reassignment.
+//
+// Unlike revertRequest, this is safe to call from request goroutines (it only
+// communicates over channels); the select also exits cleanly if the sync cycle
+// is cancelled or the request was already reverted by someone else.
+func (s *skeleton) scheduleRevertRequest(req *headerRequest) {
+ select {
+ case req.revert <- req:
+ // Sync event loop notified
+ case <-req.cancel:
+ // Sync cycle got cancelled
+ case <-req.stale:
+ // Request already reverted
+ }
+}
+
+// revertRequest cleans up a request and returns all failed retrieval tasks to
+// the scheduler for reassignment.
+//
+// Note, this needs to run on the event runloop thread to reschedule to idle peers.
+// On peer threads, use scheduleRevertRequest.
+func (s *skeleton) revertRequest(req *headerRequest) {
+ log.Trace("Reverting header request", "peer", req.peer, "reqid", req.id)
+ // If the request was already reverted, don't touch it again; closing
+ // req.stale a second time would panic
+ select {
+ case <-req.stale:
+ log.Trace("Header request already reverted", "peer", req.peer, "reqid", req.id)
+ return
+ default:
+ }
+ // Mark the request stale so pending scheduleRevertRequest calls unblock
+ close(req.stale)
+
+ // Remove the request from the tracked set
+ delete(s.requests, req.id)
+
+ // Mark the task as not-pending, ready for rescheduling to another idle peer
+ s.scratchOwners[(s.scratchHead-req.head)/requestHeaders] = ""
+}
+
+// processResponse integrates a delivered header batch into the scratch space,
+// then repeatedly consumes any batches that have become contiguous with the
+// head subchain, persisting headers and sync progress in a single batch write.
+// It returns whether any subchains were merged as a result (which requires the
+// caller to restart the sync cycle to clean up internal state).
+func (s *skeleton) processResponse(res *headerResponse) bool {
+ res.peer.log.Trace("Processing header response", "head", res.headers[0].Number, "hash", res.headers[0].Hash(), "count", len(res.headers))
+
+ // Whether or not the response is valid, we can mark the peer as idle and
+ // notify the scheduler to assign a new task. If the response is invalid,
+ // we'll drop the peer in a bit.
+ s.idles[res.peer.id] = res.peer
+
+ // Ensure the response is for a valid request
+ if _, ok := s.requests[res.reqid]; !ok {
+ // Request stale, perhaps the peer timed out but came through in the end
+ res.peer.log.Warn("Unexpected header packet")
+ return false
+ }
+ delete(s.requests, res.reqid)
+
+ // Insert the delivered headers into the scratch space independent of the
+ // content or continuation; those will be validated in a moment
+ head := res.headers[0].Number.Uint64()
+ copy(s.scratchSpace[s.scratchHead-head:], res.headers)
+
+ // If there's still a gap in the head of the scratch space, abort
+ if s.scratchSpace[0] == nil {
+ return false
+ }
+ // Try to consume any head headers, validating the boundary conditions
+ var merged bool // Whether subchains were merged
+
+ batch := s.db.NewBatch()
+ for s.scratchSpace[0] != nil {
+ // Next batch of headers available, cross-reference with the subchain
+ // we are extending and either accept or discard
+ if s.progress.Subchains[0].Next != s.scratchSpace[0].Hash() {
+ // Print a log message to track what's going on
+ tail := s.progress.Subchains[0].Tail
+ want := s.progress.Subchains[0].Next
+ have := s.scratchSpace[0].Hash()
+
+ log.Warn("Invalid skeleton headers", "peer", s.scratchOwners[0], "number", tail-1, "want", want, "have", have)
+
+ // The peer delivered junk, or at least not the subchain we are
+ // syncing to. Free up the scratch space and assignment, reassign
+ // and drop the original peer.
+ for i := 0; i < requestHeaders; i++ {
+ s.scratchSpace[i] = nil
+ }
+ s.drop(s.scratchOwners[0])
+ s.scratchOwners[0] = ""
+ break
+ }
+ // Scratch delivery matches required subchain, deliver the batch of
+ // headers and push the subchain forward
+ var consumed int
+ for _, header := range s.scratchSpace[:requestHeaders] {
+ if header != nil { // nil when the genesis is reached
+ consumed++
+
+ rawdb.WriteSkeletonHeader(batch, header)
+ s.pulled++
+
+ s.progress.Subchains[0].Tail--
+ s.progress.Subchains[0].Next = header.ParentHash
+ }
+ }
+ // Batch of headers consumed, shift the download window forward
+ head := s.progress.Subchains[0].Head
+ tail := s.progress.Subchains[0].Tail
+ next := s.progress.Subchains[0].Next
+
+ log.Trace("Primary subchain extended", "head", head, "tail", tail, "next", next)
+
+ copy(s.scratchSpace, s.scratchSpace[requestHeaders:])
+ for i := 0; i < requestHeaders; i++ {
+ s.scratchSpace[scratchHeaders-i-1] = nil
+ }
+ copy(s.scratchOwners, s.scratchOwners[1:])
+ s.scratchOwners[scratchHeaders/requestHeaders-1] = ""
+
+ s.scratchHead -= uint64(consumed)
+
+ // If the subchain extended into the next subchain, we need to handle
+ // the overlap. Since there could be many overlaps (come on), do this
+ // in a loop.
+ for len(s.progress.Subchains) > 1 && s.progress.Subchains[1].Head >= s.progress.Subchains[0].Tail {
+ // Extract some stats from the second subchain
+ head := s.progress.Subchains[1].Head
+ tail := s.progress.Subchains[1].Tail
+ next := s.progress.Subchains[1].Next
+
+ // Since we just overwrote part of the next subchain, we need to trim
+ // its head independent of matching or mismatching content
+ if s.progress.Subchains[1].Tail >= s.progress.Subchains[0].Tail {
+ // Fully overwritten, get rid of the subchain as a whole
+ log.Debug("Previous subchain fully overwritten", "head", head, "tail", tail, "next", next)
+ s.progress.Subchains = append(s.progress.Subchains[:1], s.progress.Subchains[2:]...)
+ continue
+ }
+ // Partially overwritten, trim the head to the overwritten size
+ log.Debug("Previous subchain partially overwritten", "head", head, "tail", tail, "next", next)
+ s.progress.Subchains[1].Head = s.progress.Subchains[0].Tail - 1
+
+ // If the old subchain is an extension of the new one, merge the two
+ // and let the skeleton syncer restart (to clean internal state)
+ if rawdb.ReadSkeletonHeader(s.db, s.progress.Subchains[1].Head).Hash() == s.progress.Subchains[0].Next {
+ log.Debug("Previous subchain merged", "head", head, "tail", tail, "next", next)
+ s.progress.Subchains[0].Tail = s.progress.Subchains[1].Tail
+ s.progress.Subchains[0].Next = s.progress.Subchains[1].Next
+
+ s.progress.Subchains = append(s.progress.Subchains[:1], s.progress.Subchains[2:]...)
+ merged = true
+ }
+ }
+ }
+ s.saveSyncStatus(batch)
+ if err := batch.Write(); err != nil {
+ log.Crit("Failed to write skeleton headers and progress", "err", err)
+ }
+ // Print a progress report to make the UX a bit nicer
+ left := s.progress.Subchains[0].Tail - 1
+ if time.Since(s.logged) > 8*time.Second || left == 0 {
+ s.logged = time.Now()
+
+ if s.pulled == 0 {
+ log.Info("Beacon sync starting", "left", left)
+ } else {
+ eta := float64(time.Since(s.started)) / float64(s.pulled) * float64(left)
+ log.Info("Syncing beacon headers", "downloaded", s.pulled, "left", left, "eta", common.PrettyDuration(eta))
+ }
+ }
+ return merged
+}
+
+// Head retrieves the current head tracked by the skeleton syncer. This method
+// is meant to be used by the backfiller, whose life cycle is controlled by the
+// skeleton syncer.
+//
+// Note, the method will not use the internal state of the skeleton, but will
+// rather blindly pull stuff from the database. This is fine, because the back-
+// filler will only run when the skeleton chain is fully downloaded and stable.
+// There might be new heads appended, but those are atomic from the perspective
+// of this method. Any head reorg will first tear down the backfiller and only
+// then make the modification.
+func (s *skeleton) Head() (*types.Header, error) {
+ // Read the current sync progress from disk and figure out the current head.
+ // Although there's a lot of error handling here, these are mostly as sanity
+ // checks to avoid crashing if a programming error happens. These should not
+ // happen in live code.
+ status := rawdb.ReadSkeletonSyncStatus(s.db)
+ if len(status) == 0 {
+ return nil, errors.New("beacon sync not yet started")
+ }
+ progress := new(skeletonProgress)
+ if err := json.Unmarshal(status, progress); err != nil {
+ return nil, err
+ }
+ if progress.Subchains[0].Tail != 1 {
+ return nil, errors.New("beacon sync not yet finished")
+ }
+ return rawdb.ReadSkeletonHeader(s.db, progress.Subchains[0].Head), nil
+}
+
+// Header retrieves a specific header tracked by the skeleton syncer. This method
+// is meant to be used by the backfiller, whose life cycle is controlled by the
+// skeleton syncer.
+//
+// Note, outside the permitted runtimes, this method might return nil results and
+// subsequent calls might return headers from different chains.
+func (s *skeleton) Header(number uint64) *types.Header {
+ // Blind database read; see Head for why this is safe while backfilling
+ return rawdb.ReadSkeletonHeader(s.db, number)
+}
diff --git a/eth/downloader/skeleton_test.go b/eth/downloader/skeleton_test.go
new file mode 100644
index 000000000000..7c2e07a43866
--- /dev/null
+++ b/eth/downloader/skeleton_test.go
@@ -0,0 +1,257 @@
+// Copyright 2021 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package downloader
+
+import (
+ "encoding/json"
+ "math/big"
+ "os"
+ "testing"
+
+ "github.com/ethereum/go-ethereum/core/rawdb"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/log"
+)
+
+// hookedBackfiller is a tester backfiller with all interface methods mocked and
+// hooked so tests can implement only the things they need.
+//
+// The zero value is a valid no-op backfiller: both methods check for nil hooks
+// before invoking them.
+type hookedBackfiller struct {
+ // suspendHook is an optional hook to be called when the filler is requested
+ // to be suspended.
+ suspendHook func()
+
+ // resumeHook is an optional hook to be called when the filler is requested
+ // to be resumed.
+ resumeHook func()
+}
+
+// suspend requests the backfiller to abort any running full or snap sync
+// based on the skeleton chain as it might be invalid. The backfiller should
+// gracefully handle multiple consecutive suspends without a resume, even
+// on initial startup.
+func (hf *hookedBackfiller) suspend() {
+ if hf.suspendHook != nil {
+ hf.suspendHook()
+ }
+}
+
+// resume requests the backfiller to start running fill or snap sync based on
+// the skeleton chain as it has successfully been linked. Appending new heads
+// to the end of the chain will not result in suspend/resume cycles.
+// A nil hook makes this call a no-op, mirroring suspend.
+func (hf *hookedBackfiller) resume() {
+ if hf.resumeHook != nil {
+ hf.resumeHook()
+ }
+}
+
+// newNoopBackfiller creates a hooked backfiller with all callbacks disabled,
+// essentially acting as a noop.
+func newNoopBackfiller() backfiller {
+ return &hookedBackfiller{}
+}
+
+// Tests various sync initializations based on previous leftovers in the database
+// and announced heads.
+func TestSkeletonSyncInit(t *testing.T) {
+ // NOTE(review): this rewires the global root logger to trace unconditionally,
+ // which is noisy for the whole package's test run — likely a debug leftover;
+ // consider removing (and the os/log imports) before merging.
+ log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true))))
+
+ // Create a few key headers
+ var (
+ genesis = &types.Header{Number: big.NewInt(0)}
+ block49 = &types.Header{Number: big.NewInt(49)}
+ block49B = &types.Header{Number: big.NewInt(49), Extra: []byte("B")}
+ block50 = &types.Header{Number: big.NewInt(50), ParentHash: block49.Hash()}
+ )
+ tests := []struct {
+ headers []*types.Header // Database content (beside the genesis)
+ oldstate []*subchain // Old sync state with various interrupted subchains
+ head *types.Header // New head header to announce to reorg to
+ newstate []*subchain // Expected sync state after the reorg
+ }{
+ // Completely empty database with only the genesis set. The sync is expected
+ // to create a single subchain with the requested head.
+ {
+ head: block50,
+ newstate: []*subchain{{Head: 50, Tail: 50}},
+ },
+ // Empty database with only the genesis set with a leftover empty sync
+ // progress. This is a synthetic case, just for the sake of covering things.
+ {
+ oldstate: []*subchain{},
+ head: block50,
+ newstate: []*subchain{{Head: 50, Tail: 50}},
+ },
+ // A single leftover subchain is present, older than the new head. The
+ // old subchain should be left as is and a new one appended to the sync
+ // status.
+ {
+ oldstate: []*subchain{{Head: 10, Tail: 5}},
+ head: block50,
+ newstate: []*subchain{
+ {Head: 50, Tail: 50},
+ {Head: 10, Tail: 5},
+ },
+ },
+ // Multiple leftover subchains are present, older than the new head. The
+ // old subchains should be left as is and a new one appended to the sync
+ // status.
+ {
+ oldstate: []*subchain{
+ {Head: 20, Tail: 15},
+ {Head: 10, Tail: 5},
+ },
+ head: block50,
+ newstate: []*subchain{
+ {Head: 50, Tail: 50},
+ {Head: 20, Tail: 15},
+ {Head: 10, Tail: 5},
+ },
+ },
+ // A single leftover subchain is present, newer than the new head. The
+ // newer subchain should be deleted and a fresh one created for the head.
+ {
+ oldstate: []*subchain{{Head: 65, Tail: 60}},
+ head: block50,
+ newstate: []*subchain{{Head: 50, Tail: 50}},
+ },
+ // Multiple leftover subchain is present, newer than the new head. The
+ // newer subchains should be deleted and a fresh one created for the head.
+ {
+ oldstate: []*subchain{
+ {Head: 75, Tail: 70},
+ {Head: 65, Tail: 60},
+ },
+ head: block50,
+ newstate: []*subchain{{Head: 50, Tail: 50}},
+ },
+
+ // Two leftover subchains are present, one fully older and one fully
+ // newer than the announced head. The head should delete the newer one,
+ // keeping the older one.
+ {
+ oldstate: []*subchain{
+ {Head: 65, Tail: 60},
+ {Head: 10, Tail: 5},
+ },
+ head: block50,
+ newstate: []*subchain{
+ {Head: 50, Tail: 50},
+ {Head: 10, Tail: 5},
+ },
+ },
+ // Multiple leftover subchains are present, some fully older and some
+ // fully newer than the announced head. The head should delete the newer
+ // ones, keeping the older ones.
+ {
+ oldstate: []*subchain{
+ {Head: 75, Tail: 70},
+ {Head: 65, Tail: 60},
+ {Head: 20, Tail: 15},
+ {Head: 10, Tail: 5},
+ },
+ head: block50,
+ newstate: []*subchain{
+ {Head: 50, Tail: 50},
+ {Head: 20, Tail: 15},
+ {Head: 10, Tail: 5},
+ },
+ },
+ // A single leftover subchain is present and the new head is extending
+ // it with one more header. We expect the subchain head to be pushed
+ // forward.
+ {
+ headers: []*types.Header{block49},
+ oldstate: []*subchain{{Head: 49, Tail: 5}},
+ head: block50,
+ newstate: []*subchain{{Head: 50, Tail: 5}},
+ },
+ // A single leftover subchain is present and although the new head does
+ // extend it number wise, the hash chain does not link up. We expect a
+ // new subchain to be created for the dangling head.
+ {
+ headers: []*types.Header{block49B},
+ oldstate: []*subchain{{Head: 49, Tail: 5}},
+ head: block50,
+ newstate: []*subchain{
+ {Head: 50, Tail: 50},
+ {Head: 49, Tail: 5},
+ },
+ },
+ // A single leftover subchain is present. A new head is announced that
+ // links into the middle of it, correctly anchoring into an existing
+ // header. We expect the old subchain to be truncated and extended with
+ // the new head.
+ {
+ headers: []*types.Header{block49},
+ oldstate: []*subchain{{Head: 100, Tail: 5}},
+ head: block50,
+ newstate: []*subchain{{Head: 50, Tail: 5}},
+ },
+ // A single leftover subchain is present. A new head is announced that
+ // links into the middle of it, but does not anchor into an existing
+ // header. We expect the old subchain to be truncated and a new chain
+ // be created for the dangling head.
+ {
+ headers: []*types.Header{block49B},
+ oldstate: []*subchain{{Head: 100, Tail: 5}},
+ head: block50,
+ newstate: []*subchain{
+ {Head: 50, Tail: 50},
+ {Head: 49, Tail: 5},
+ },
+ },
+ }
+ for i, tt := range tests {
+ // Create a fresh database and initialize it with the starting state
+ db := rawdb.NewMemoryDatabase()
+
+ rawdb.WriteHeader(db, genesis)
+ for _, header := range tt.headers {
+ rawdb.WriteSkeletonHeader(db, header)
+ }
+ if tt.oldstate != nil {
+ blob, _ := json.Marshal(&skeletonProgress{Subchains: tt.oldstate})
+ rawdb.WriteSkeletonSyncStatus(db, blob)
+ }
+ // Create a skeleton sync and run a cycle; syncStarting is hooked to know
+ // when the initial state has been processed and it's safe to terminate
+ wait := make(chan struct{})
+
+ skeleton := newSkeleton(db, newPeerSet(), func(string) {}, newNoopBackfiller())
+ skeleton.syncStarting = func() { close(wait) }
+ skeleton.Sync(tt.head)
+
+ <-wait
+ skeleton.Terminate()
+
+ // Ensure the correct resulting sync status
+ var progress skeletonProgress
+ json.Unmarshal(rawdb.ReadSkeletonSyncStatus(db), &progress)
+
+ if len(progress.Subchains) != len(tt.newstate) {
+ t.Errorf("test %d: subchain count mismatch: have %d, want %d", i, len(progress.Subchains), len(tt.newstate))
+ continue
+ }
+ for j := 0; j < len(progress.Subchains); j++ {
+ if progress.Subchains[j].Head != tt.newstate[j].Head {
+ t.Errorf("test %d: subchain %d head mismatch: have %d, want %d", i, j, progress.Subchains[j].Head, tt.newstate[j].Head)
+ }
+ if progress.Subchains[j].Tail != tt.newstate[j].Tail {
+ t.Errorf("test %d: subchain %d tail mismatch: have %d, want %d", i, j, progress.Subchains[j].Tail, tt.newstate[j].Tail)
+ }
+ }
+ }
+}
diff --git a/eth/handler.go b/eth/handler.go
index 921a62dba501..1e0c543d54a6 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -171,10 +171,30 @@ func newHandler(config *handlerConfig) (*handler, error) {
h.checkpointNumber = (config.Checkpoint.SectionIndex+1)*params.CHTFrequency - 1
h.checkpointHash = config.Checkpoint.SectionHead
}
+ // If sync succeeds, pass a callback to potentially disable snap sync mode
+ // and enable transaction propagation.
+ success := func() {
+ // If we were running snap sync and it finished, disable doing another
+ // round on next sync cycle
+ if atomic.LoadUint32(&h.snapSync) == 1 {
+ log.Info("Snap sync complete, auto disabling")
+ atomic.StoreUint32(&h.snapSync, 0)
+ }
+ // If we've successfully finished a sync cycle and passed any required
+ // checkpoint, enable accepting transactions from the network
+ head := h.chain.CurrentBlock()
+ if head.NumberU64() >= h.checkpointNumber {
+ // Checkpoint passed, sanity check the timestamp to have a fallback mechanism
+ // for non-checkpointed (number = 0) private networks.
+ if head.Time() >= uint64(time.Now().AddDate(0, -1, 0).Unix()) {
+ atomic.StoreUint32(&h.acceptTxs, 1)
+ }
+ }
+ }
// Construct the downloader (long sync) and its backing state bloom if snap
// sync is requested. The downloader is responsible for deallocating the state
// bloom when it's done.
- h.downloader = downloader.New(h.checkpointNumber, config.Database, h.eventMux, h.chain, nil, h.removePeer)
+ h.downloader = downloader.New(h.checkpointNumber, config.Database, h.eventMux, h.chain, nil, h.removePeer, success)
// Construct the fetcher (short sync)
validator := func(header *types.Header) error {
diff --git a/eth/sync.go b/eth/sync.go
index b8ac67d3b2d1..384b42bfaa80 100644
--- a/eth/sync.go
+++ b/eth/sync.go
@@ -113,9 +113,9 @@ func (cs *chainSyncer) loop() {
defer cs.force.Stop()
for {
- if op := cs.nextSyncOp(); op != nil {
- cs.startSync(op)
- }
+ //if op := cs.nextSyncOp(); op != nil {
+ // cs.startSync(op)
+ //}
select {
case <-cs.peerEventCh:
// Peer information changed, recheck.
@@ -227,7 +227,7 @@ func (h *handler) doSync(op *chainSyncOp) error {
}
}
// Run the sync cycle, and disable snap sync if we're past the pivot block
- err := h.downloader.Synchronise(op.peer.ID(), op.head, op.td, op.mode)
+ err := h.downloader.LegacySync(op.peer.ID(), op.head, op.td, op.mode)
if err != nil {
return err
}
diff --git a/internal/web3ext/web3ext.go b/internal/web3ext/web3ext.go
index c4bdbaeb8d20..acc38dcd40ee 100644
--- a/internal/web3ext/web3ext.go
+++ b/internal/web3ext/web3ext.go
@@ -152,6 +152,11 @@ web3._extend({
call: 'admin_importChain',
params: 1
}),
+ new web3._extend.Method({
+ name: 'newHead',
+ call: 'admin_newHead',
+ params: 1
+ }),
new web3._extend.Method({
name: 'sleepBlocks',
call: 'admin_sleepBlocks',
diff --git a/les/catalyst/api.go b/les/catalyst/api.go
new file mode 100644
index 000000000000..0e5c07995b5a
--- /dev/null
+++ b/les/catalyst/api.go
@@ -0,0 +1,178 @@
+// Copyright 2022 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// Package catalyst implements the temporary eth1/eth2 RPC integration.
+package catalyst
+
+import (
+ "errors"
+ "fmt"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/beacon"
+ "github.com/ethereum/go-ethereum/les"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/node"
+ "github.com/ethereum/go-ethereum/rpc"
+)
+
+// Register adds catalyst APIs to the light client, exposing the engine
+// namespace over the node's RPC stack.
+func Register(stack *node.Node, backend *les.LightEthereum) error {
+ log.Warn("Catalyst mode enabled", "protocol", "les")
+
+ api := NewConsensusAPI(backend)
+ stack.RegisterAPIs([]rpc.API{{
+ Namespace: "engine",
+ Version: "1.0",
+ Service: api,
+ Public: true,
+ }})
+ return nil
+}
+
+// ConsensusAPI implements the engine API methods on top of a les (light
+// client) backend.
+type ConsensusAPI struct {
+ les *les.LightEthereum // light client backend serving chain data
+}
+
+// NewConsensusAPI creates a new consensus api for the given backend.
+// The underlying blockchain needs to have a valid terminal total difficulty set,
+// otherwise the constructor panics (catalyst mode is unusable without it).
+func NewConsensusAPI(les *les.LightEthereum) *ConsensusAPI {
+ if les.BlockChain().Config().TerminalTotalDifficulty == nil {
+ panic("Catalyst started without valid total difficulty")
+ }
+ return &ConsensusAPI{les: les}
+}
+
+// ForkchoiceUpdatedV1 has several responsibilities:
+// If the method is called with an empty head block:
+// we return success, which can be used to check if the catalyst mode is enabled
+// If the total difficulty was not reached:
+// we return INVALID
+// If the finalizedBlockHash is set:
+// we check if we have the finalizedBlockHash in our db, if not we start a sync
+// We try to set our blockchain to the headBlock
+// If there are payloadAttributes:
+// we return an error since block creation is not supported in les mode
+//
+// NOTE(review): payloadAttributes are rejected only *after* setHead has already
+// moved the chain head — confirm that mutating the head before returning an
+// INVALID response is the intended behavior.
+func (api *ConsensusAPI) ForkchoiceUpdatedV1(heads beacon.ForkchoiceStateV1, payloadAttributes *beacon.PayloadAttributesV1) (beacon.ForkChoiceResponse, error) {
+ if heads.HeadBlockHash == (common.Hash{}) {
+ return beacon.STATUS_SUCCESS, nil
+ }
+ if err := api.checkTerminalTotalDifficulty(heads.HeadBlockHash); err != nil {
+ // TTD check failed: if the head is simply unknown locally, report
+ // SYNCING rather than INVALID
+ if header := api.les.BlockChain().GetHeaderByHash(heads.HeadBlockHash); header == nil {
+ // TODO (MariusVanDerWijden) trigger sync
+ return beacon.STATUS_SYNCING, nil
+ }
+ return beacon.STATUS_INVALID, err
+ }
+ // If the finalized block is set, check if it is in our blockchain
+ if heads.FinalizedBlockHash != (common.Hash{}) {
+ if header := api.les.BlockChain().GetHeaderByHash(heads.FinalizedBlockHash); header == nil {
+ // TODO (MariusVanDerWijden) trigger sync
+ return beacon.STATUS_SYNCING, nil
+ }
+ }
+ // SetHead
+ if err := api.setHead(heads.HeadBlockHash); err != nil {
+ return beacon.STATUS_INVALID, err
+ }
+ if payloadAttributes != nil {
+ return beacon.STATUS_INVALID, errors.New("not supported")
+ }
+ return beacon.STATUS_SUCCESS, nil
+}
+
+// GetPayloadV1 returns a cached payload by id. It's not supported in les mode:
+// light clients do not build blocks, so this unconditionally fails with a
+// generic server error (the payloadID argument is ignored).
+func (api *ConsensusAPI) GetPayloadV1(payloadID beacon.PayloadID) (*beacon.ExecutableDataV1, error) {
+ return nil, &beacon.GenericServerError
+}
+
+// ExecutePayloadV1 creates an Eth1 block, inserts it in the chain, and returns the status of the chain.
+//
+// In les mode only the header is inserted (via InsertHeader); transaction
+// execution is not performed by light clients.
+func (api *ConsensusAPI) ExecutePayloadV1(params beacon.ExecutableDataV1) (beacon.PayloadStatusV1, error) {
+ block, err := beacon.ExecutableDataToBlock(params)
+ if err != nil {
+ return api.invalid(), err
+ }
+ if !api.les.BlockChain().HasHeader(block.ParentHash(), block.NumberU64()-1) {
+ /*
+ TODO (MariusVanDerWijden) reenable once sync is merged
+ if err := api.eth.Downloader().BeaconSync(api.eth.SyncMode(), block.Header()); err != nil {
+ return SYNCING, err
+ }
+ */
+ // TODO (MariusVanDerWijden) we should return nil here not empty hash
+ return beacon.PayloadStatusV1{Status: beacon.SYNCING, LatestValidHash: common.Hash{}}, nil
+ }
+ // Parent is known to exist (HasHeader above), so a nil result here would
+ // indicate an inconsistency between the two lookups
+ parent := api.les.BlockChain().GetHeaderByHash(params.ParentHash)
+ if parent == nil {
+ return api.invalid(), fmt.Errorf("could not find parent %x", params.ParentHash)
+ }
+ // Only accept payloads building on top of a parent that already crossed
+ // the terminal total difficulty
+ td := api.les.BlockChain().GetTd(parent.Hash(), block.NumberU64()-1)
+ ttd := api.les.BlockChain().Config().TerminalTotalDifficulty
+ if td.Cmp(ttd) < 0 {
+ return api.invalid(), fmt.Errorf("can not execute payload on top of block with low td got: %v threshold %v", td, ttd)
+ }
+ if err = api.les.BlockChain().InsertHeader(block.Header()); err != nil {
+ return api.invalid(), err
+ }
+ // Mark the terminal total difficulty as reached on the first accepted payload
+ if merger := api.les.Merger(); !merger.TDDReached() {
+ merger.ReachTTD()
+ }
+ return beacon.PayloadStatusV1{Status: beacon.VALID, LatestValidHash: block.Hash()}, nil
+}
+
+// invalid returns a response "INVALID" with the latest valid hash set to the current head.
+//
+// NOTE(review): the reported latest valid hash is simply the current head, not
+// the actual last valid ancestor of the rejected payload — confirm this is an
+// acceptable approximation for engine API consumers.
+func (api *ConsensusAPI) invalid() beacon.PayloadStatusV1 {
+ return beacon.PayloadStatusV1{Status: beacon.INVALID, LatestValidHash: api.les.BlockChain().CurrentHeader().Hash()}
+}
+
+// checkTerminalTotalDifficulty verifies that the block identified by head has
+// crossed the terminal total difficulty, returning an error if it is still a
+// pre-merge (PoW) block or unknown locally.
+func (api *ConsensusAPI) checkTerminalTotalDifficulty(head common.Hash) error {
+ // shortcut if we entered PoS already
+ if api.les.Merger().PoSFinalized() {
+ return nil
+ }
+ // make sure the head block itself has enough total difficulty
+ header := api.les.BlockChain().GetHeaderByHash(head)
+ if header == nil {
+ return &beacon.GenericServerError
+ }
+ td := api.les.BlockChain().GetTd(header.Hash(), header.Number.Uint64())
+ // NOTE(review): a nil td (unknown total difficulty) is treated as passing
+ // here — confirm that is intentional rather than an error condition.
+ if td != nil && td.Cmp(api.les.BlockChain().Config().TerminalTotalDifficulty) < 0 {
+ return &beacon.InvalidTB
+ }
+ return nil
+}
+
+// setHead makes the light chain reorg to the given block hash as part of a
+// forkchoice update, finalizing the PoS transition on the first successful
+// head switch. It is a no-op if the hash is already the current head.
+func (api *ConsensusAPI) setHead(newHead common.Hash) error {
+ log.Info("Setting head", "head", newHead)
+
+ headHeader := api.les.BlockChain().CurrentHeader()
+ if headHeader.Hash() == newHead {
+ return nil
+ }
+ newHeadHeader := api.les.BlockChain().GetHeaderByHash(newHead)
+ if newHeadHeader == nil {
+ return &beacon.GenericServerError
+ }
+ if err := api.les.BlockChain().SetChainHead(newHeadHeader); err != nil {
+ return err
+ }
+ // Trigger the transition if it's the first `NewHead` event.
+ if merger := api.les.Merger(); !merger.PoSFinalized() {
+ merger.FinalizePoS()
+ }
+ return nil
+}
diff --git a/les/catalyst/api_test.go b/les/catalyst/api_test.go
new file mode 100644
index 000000000000..c1cbf645ccc8
--- /dev/null
+++ b/les/catalyst/api_test.go
@@ -0,0 +1,244 @@
+// Copyright 2022 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package catalyst
+
+import (
+ "math/big"
+ "testing"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/consensus/ethash"
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/beacon"
+ "github.com/ethereum/go-ethereum/core/rawdb"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/eth/downloader"
+ "github.com/ethereum/go-ethereum/eth/ethconfig"
+ "github.com/ethereum/go-ethereum/les"
+ "github.com/ethereum/go-ethereum/node"
+ "github.com/ethereum/go-ethereum/params"
+ "github.com/ethereum/go-ethereum/trie"
+)
+
+var (
+ // testKey is a private key to use for funding a tester account.
+ testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
+
+ // testAddr is the Ethereum address of the tester account.
+ testAddr = crypto.PubkeyToAddress(testKey.PublicKey)
+
+ testBalance = big.NewInt(2e18)
+)
+
+func generatePreMergeChain(n int) (*core.Genesis, []*types.Header, []*types.Block) {
+ db := rawdb.NewMemoryDatabase()
+ config := params.AllEthashProtocolChanges
+ genesis := &core.Genesis{
+ Config: config,
+ Alloc: core.GenesisAlloc{testAddr: {Balance: testBalance}},
+ ExtraData: []byte("test genesis"),
+ Timestamp: 9000,
+ BaseFee: big.NewInt(params.InitialBaseFee),
+ }
+ gblock := genesis.ToBlock(db)
+ engine := ethash.NewFaker()
+ blocks, _ := core.GenerateChain(config, gblock, engine, db, n, nil)
+ totalDifficulty := big.NewInt(0)
+
+ var headers []*types.Header
+ for _, b := range blocks {
+ totalDifficulty.Add(totalDifficulty, b.Difficulty())
+ headers = append(headers, b.Header())
+ }
+ config.TerminalTotalDifficulty = totalDifficulty
+
+ return genesis, headers, blocks
+}
+
+func TestSetHeadBeforeTotalDifficulty(t *testing.T) {
+ genesis, headers, blocks := generatePreMergeChain(10)
+ n, lesService := startLesService(t, genesis, headers)
+ defer n.Close()
+
+ api := NewConsensusAPI(lesService)
+ fcState := beacon.ForkchoiceStateV1{
+ HeadBlockHash: blocks[5].Hash(),
+ SafeBlockHash: common.Hash{},
+ FinalizedBlockHash: common.Hash{},
+ }
+ if _, err := api.ForkchoiceUpdatedV1(fcState, nil); err == nil {
+ t.Errorf("fork choice updated before total terminal difficulty should fail")
+ }
+}
+
+func TestExecutePayloadV1(t *testing.T) {
+ genesis, headers, blocks := generatePreMergeChain(10)
+ n, lesService := startLesService(t, genesis, headers[:9])
+ lesService.Merger().ReachTTD()
+ defer n.Close()
+
+ api := NewConsensusAPI(lesService)
+ fcState := beacon.ForkchoiceStateV1{
+ HeadBlockHash: blocks[8].Hash(),
+ SafeBlockHash: common.Hash{},
+ FinalizedBlockHash: common.Hash{},
+ }
+ if _, err := api.ForkchoiceUpdatedV1(fcState, nil); err != nil {
+ t.Errorf("Failed to update head %v", err)
+ }
+ block := blocks[9]
+
+ fakeBlock := types.NewBlock(&types.Header{
+ ParentHash: block.ParentHash(),
+ UncleHash: crypto.Keccak256Hash(nil),
+ Coinbase: block.Coinbase(),
+ Root: block.Root(),
+ TxHash: crypto.Keccak256Hash(nil),
+ ReceiptHash: crypto.Keccak256Hash(nil),
+ Bloom: block.Bloom(),
+ Difficulty: big.NewInt(0),
+ Number: block.Number(),
+ GasLimit: block.GasLimit(),
+ GasUsed: block.GasUsed(),
+ Time: block.Time(),
+ Extra: block.Extra(),
+ MixDigest: block.MixDigest(),
+ Nonce: types.BlockNonce{},
+ BaseFee: block.BaseFee(),
+ }, nil, nil, nil, trie.NewStackTrie(nil))
+
+ _, err := api.ExecutePayloadV1(beacon.ExecutableDataV1{
+ ParentHash: fakeBlock.ParentHash(),
+ FeeRecipient: fakeBlock.Coinbase(),
+ StateRoot: fakeBlock.Root(),
+ ReceiptsRoot: fakeBlock.ReceiptHash(),
+ LogsBloom: fakeBlock.Bloom().Bytes(),
+ Random: fakeBlock.MixDigest(),
+ Number: fakeBlock.NumberU64(),
+ GasLimit: fakeBlock.GasLimit(),
+ GasUsed: fakeBlock.GasUsed(),
+ Timestamp: fakeBlock.Time(),
+ ExtraData: fakeBlock.Extra(),
+ BaseFeePerGas: fakeBlock.BaseFee(),
+ BlockHash: fakeBlock.Hash(),
+ Transactions: encodeTransactions(fakeBlock.Transactions()),
+ })
+ if err != nil {
+ t.Errorf("Failed to execute payload %v", err)
+ }
+ headHeader := api.les.BlockChain().CurrentHeader()
+ if headHeader.Number.Uint64() != fakeBlock.NumberU64()-1 {
+ t.Fatal("Unexpected chain head update")
+ }
+ fcState = beacon.ForkchoiceStateV1{
+ HeadBlockHash: fakeBlock.Hash(),
+ SafeBlockHash: common.Hash{},
+ FinalizedBlockHash: common.Hash{},
+ }
+ if _, err := api.ForkchoiceUpdatedV1(fcState, nil); err != nil {
+ t.Fatal("Failed to update head")
+ }
+ headHeader = api.les.BlockChain().CurrentHeader()
+ if headHeader.Number.Uint64() != fakeBlock.NumberU64() {
+ t.Fatal("Failed to update chain head")
+ }
+}
+
+func TestEth2DeepReorg(t *testing.T) {
+ // TODO (MariusVanDerWijden) TestEth2DeepReorg is currently broken, because it tries to reorg
+ // before the totalTerminalDifficulty threshold
+ /*
+ genesis, preMergeBlocks := generatePreMergeChain(core.TriesInMemory * 2)
+ n, ethservice := startEthService(t, genesis, preMergeBlocks)
+ defer n.Close()
+
+ var (
+ api = NewConsensusAPI(ethservice, nil)
+ parent = preMergeBlocks[len(preMergeBlocks)-core.TriesInMemory-1]
+ head = ethservice.BlockChain().CurrentBlock().NumberU64()
+ )
+ if ethservice.BlockChain().HasBlockAndState(parent.Hash(), parent.NumberU64()) {
+ t.Errorf("Block %d not pruned", parent.NumberU64())
+ }
+ for i := 0; i < 10; i++ {
+ execData, err := api.assembleBlock(AssembleBlockParams{
+ ParentHash: parent.Hash(),
+ Timestamp: parent.Time() + 5,
+ })
+ if err != nil {
+ t.Fatalf("Failed to create the executable data %v", err)
+ }
+ block, err := ExecutableDataToBlock(ethservice.BlockChain().Config(), parent.Header(), *execData)
+ if err != nil {
+ t.Fatalf("Failed to convert executable data to block %v", err)
+ }
+ newResp, err := api.ExecutePayload(*execData)
+ if err != nil || newResp.Status != "VALID" {
+ t.Fatalf("Failed to insert block: %v", err)
+ }
+ if ethservice.BlockChain().CurrentBlock().NumberU64() != head {
+ t.Fatalf("Chain head shouldn't be updated")
+ }
+ if err := api.setHead(block.Hash()); err != nil {
+ t.Fatalf("Failed to set head: %v", err)
+ }
+ if ethservice.BlockChain().CurrentBlock().NumberU64() != block.NumberU64() {
+ t.Fatalf("Chain head should be updated")
+ }
+ parent, head = block, block.NumberU64()
+ }
+ */
+}
+
+// startLesService creates a light client node instance for testing.
+func startLesService(t *testing.T, genesis *core.Genesis, headers []*types.Header) (*node.Node, *les.LightEthereum) {
+ t.Helper()
+
+ n, err := node.New(&node.Config{})
+ if err != nil {
+ t.Fatal("can't create node:", err)
+ }
+ ethcfg := &ethconfig.Config{
+ Genesis: genesis,
+ Ethash: ethash.Config{PowMode: ethash.ModeFake},
+ SyncMode: downloader.LightSync,
+ TrieDirtyCache: 256,
+ TrieCleanCache: 256,
+ LightPeers: 10,
+ }
+ lesService, err := les.New(n, ethcfg)
+ if err != nil {
+ t.Fatal("can't create eth service:", err)
+ }
+ if err := n.Start(); err != nil {
+ t.Fatal("can't start node:", err)
+ }
+ if _, err := lesService.BlockChain().InsertHeaderChain(headers, 0); err != nil {
+ n.Close()
+ t.Fatal("can't import test headers:", err)
+ }
+ return n, lesService
+}
+
+func encodeTransactions(txs []*types.Transaction) [][]byte {
+ var enc = make([][]byte, len(txs))
+ for i, tx := range txs {
+ enc[i], _ = tx.MarshalBinary()
+ }
+ return enc
+}
diff --git a/miner/stress/beacon/main.go b/miner/stress/beacon/main.go
index 6a6a0a7222f9..5c3d29919c25 100644
--- a/miner/stress/beacon/main.go
+++ b/miner/stress/beacon/main.go
@@ -32,13 +32,15 @@ import (
"github.com/ethereum/go-ethereum/common/fdlimit"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/beacon"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth"
- "github.com/ethereum/go-ethereum/eth/catalyst"
+ fcatalyst "github.com/ethereum/go-ethereum/eth/catalyst"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/les"
+ lcatalyst "github.com/ethereum/go-ethereum/les/catalyst"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/miner"
"github.com/ethereum/go-ethereum/node"
@@ -88,24 +90,26 @@ var (
type ethNode struct {
typ nodetype
- api *catalyst.ConsensusAPI
- ethBackend *eth.Ethereum
- lesBackend *les.LightEthereum
stack *node.Node
enode *enode.Node
+ api *fcatalyst.ConsensusAPI
+ ethBackend *eth.Ethereum
+ lapi *lcatalyst.ConsensusAPI
+ lesBackend *les.LightEthereum
}
func newNode(typ nodetype, genesis *core.Genesis, enodes []*enode.Node) *ethNode {
var (
err error
- api *catalyst.ConsensusAPI
+ api *fcatalyst.ConsensusAPI
+ lapi *lcatalyst.ConsensusAPI
stack *node.Node
ethBackend *eth.Ethereum
lesBackend *les.LightEthereum
)
// Start the node and wait until it's up
if typ == eth2LightClient {
- stack, lesBackend, api, err = makeLightNode(genesis)
+ stack, lesBackend, lapi, err = makeLightNode(genesis)
} else {
stack, ethBackend, api, err = makeFullNode(genesis)
}
@@ -131,13 +135,14 @@ func newNode(typ nodetype, genesis *core.Genesis, enodes []*enode.Node) *ethNode
typ: typ,
api: api,
ethBackend: ethBackend,
+ lapi: lapi,
lesBackend: lesBackend,
stack: stack,
enode: enode,
}
}
-func (n *ethNode) assembleBlock(parentHash common.Hash, parentTimestamp uint64) (*catalyst.ExecutableDataV1, error) {
+func (n *ethNode) assembleBlock(parentHash common.Hash, parentTimestamp uint64) (*beacon.ExecutableDataV1, error) {
if n.typ != eth2MiningNode {
return nil, errors.New("invalid node type")
}
@@ -145,12 +150,12 @@ func (n *ethNode) assembleBlock(parentHash common.Hash, parentTimestamp uint64)
if timestamp <= parentTimestamp {
timestamp = parentTimestamp + 1
}
- payloadAttribute := catalyst.PayloadAttributesV1{
+ payloadAttribute := beacon.PayloadAttributesV1{
Timestamp: timestamp,
Random: common.Hash{},
SuggestedFeeRecipient: common.HexToAddress("0xdeadbeef"),
}
- fcState := catalyst.ForkchoiceStateV1{
+ fcState := beacon.ForkchoiceStateV1{
HeadBlockHash: parentHash,
SafeBlockHash: common.Hash{},
FinalizedBlockHash: common.Hash{},
@@ -162,39 +167,62 @@ func (n *ethNode) assembleBlock(parentHash common.Hash, parentTimestamp uint64)
return n.api.GetPayloadV1(*payload.PayloadID)
}
-func (n *ethNode) insertBlock(eb catalyst.ExecutableDataV1) error {
+func (n *ethNode) insertBlock(eb beacon.ExecutableDataV1) error {
if !eth2types(n.typ) {
return errors.New("invalid node type")
}
- newResp, err := n.api.ExecutePayloadV1(eb)
- if err != nil {
- return err
- } else if newResp.Status != "VALID" {
- return errors.New("failed to insert block")
+ switch n.typ {
+ case eth2NormalNode, eth2MiningNode:
+ newResp, err := n.api.NewPayloadV1(eb)
+ if err != nil {
+ return err
+ } else if newResp.Status != "VALID" {
+ return errors.New("failed to insert block")
+ }
+ return nil
+ case eth2LightClient:
+ newResp, err := n.lapi.ExecutePayloadV1(eb)
+ if err != nil {
+ return err
+ } else if newResp.Status != "VALID" {
+ return errors.New("failed to insert block")
+ }
+ return nil
+ default:
+ return errors.New("undefined node")
}
- return nil
}
-func (n *ethNode) insertBlockAndSetHead(parent *types.Header, ed catalyst.ExecutableDataV1) error {
+func (n *ethNode) insertBlockAndSetHead(parent *types.Header, ed beacon.ExecutableDataV1) error {
if !eth2types(n.typ) {
return errors.New("invalid node type")
}
if err := n.insertBlock(ed); err != nil {
return err
}
- block, err := catalyst.ExecutableDataToBlock(ed)
+ block, err := beacon.ExecutableDataToBlock(ed)
if err != nil {
return err
}
- fcState := catalyst.ForkchoiceStateV1{
+ fcState := beacon.ForkchoiceStateV1{
HeadBlockHash: block.ParentHash(),
SafeBlockHash: common.Hash{},
FinalizedBlockHash: common.Hash{},
}
- if _, err := n.api.ForkchoiceUpdatedV1(fcState, nil); err != nil {
- return err
+ switch n.typ {
+ case eth2NormalNode, eth2MiningNode:
+ if _, err := n.api.ForkchoiceUpdatedV1(fcState, nil); err != nil {
+ return err
+ }
+ return nil
+ case eth2LightClient:
+ if _, err := n.lapi.ForkchoiceUpdatedV1(fcState, nil); err != nil {
+ return err
+ }
+ return nil
+ default:
+ return errors.New("undefined node")
}
- return nil
}
type nodeManager struct {
@@ -290,7 +318,7 @@ func (mgr *nodeManager) run() {
nodes = append(nodes, mgr.getNodes(eth2NormalNode)...)
nodes = append(nodes, mgr.getNodes(eth2LightClient)...)
for _, node := range append(nodes) {
- fcState := catalyst.ForkchoiceStateV1{
+ fcState := beacon.ForkchoiceStateV1{
HeadBlockHash: oldest.Hash(),
SafeBlockHash: common.Hash{},
FinalizedBlockHash: oldest.Hash(),
@@ -336,20 +364,16 @@ func (mgr *nodeManager) run() {
log.Error("Failed to assemble the block", "err", err)
continue
}
- block, _ := catalyst.ExecutableDataToBlock(*ed)
+ block, _ := beacon.ExecutableDataToBlock(*ed)
nodes := mgr.getNodes(eth2MiningNode)
nodes = append(nodes, mgr.getNodes(eth2NormalNode)...)
+ nodes = append(nodes, mgr.getNodes(eth2LightClient)...)
for _, node := range nodes {
if err := node.insertBlockAndSetHead(parentBlock.Header(), *ed); err != nil {
log.Error("Failed to insert block", "type", node.typ, "err", err)
}
}
- for _, node := range mgr.getNodes(eth2LightClient) {
- if err := node.insertBlock(*ed); err != nil {
- log.Error("Failed to insert block", "type", node.typ, "err", err)
- }
- }
log.Info("Create and insert eth2 block", "number", ed.Number)
parentBlock = block
waitFinalise = append(waitFinalise, block)
@@ -435,7 +459,7 @@ func makeGenesis(faucets []*ecdsa.PrivateKey) *core.Genesis {
return genesis
}
-func makeFullNode(genesis *core.Genesis) (*node.Node, *eth.Ethereum, *catalyst.ConsensusAPI, error) {
+func makeFullNode(genesis *core.Genesis) (*node.Node, *eth.Ethereum, *fcatalyst.ConsensusAPI, error) {
// Define the basic configurations for the Ethereum node
datadir, _ := ioutil.TempDir("", "")
@@ -483,10 +507,10 @@ func makeFullNode(genesis *core.Genesis) (*node.Node, *eth.Ethereum, *catalyst.C
log.Crit("Failed to create the LES server", "err", err)
}
err = stack.Start()
- return stack, ethBackend, catalyst.NewConsensusAPI(ethBackend, nil), err
+ return stack, ethBackend, fcatalyst.NewConsensusAPI(ethBackend), err
}
-func makeLightNode(genesis *core.Genesis) (*node.Node, *les.LightEthereum, *catalyst.ConsensusAPI, error) {
+func makeLightNode(genesis *core.Genesis) (*node.Node, *les.LightEthereum, *lcatalyst.ConsensusAPI, error) {
// Define the basic configurations for the Ethereum node
datadir, _ := ioutil.TempDir("", "")
@@ -521,7 +545,7 @@ func makeLightNode(genesis *core.Genesis) (*node.Node, *les.LightEthereum, *cata
return nil, nil, nil, err
}
err = stack.Start()
- return stack, lesBackend, catalyst.NewConsensusAPI(nil, lesBackend), err
+ return stack, lesBackend, lcatalyst.NewConsensusAPI(lesBackend), err
}
func eth2types(typ nodetype) bool {