diff --git a/cmd/evm/internal/t8ntool/execution.go b/cmd/evm/internal/t8ntool/execution.go index dfdde4217396..28af534ec0bb 100644 --- a/cmd/evm/internal/t8ntool/execution.go +++ b/cmd/evm/internal/t8ntool/execution.go @@ -67,6 +67,7 @@ type ommer struct { type stEnv struct { Coinbase common.Address `json:"currentCoinbase" gencodec:"required"` Difficulty *big.Int `json:"currentDifficulty"` + Random *big.Int `json:"currentRandom"` ParentDifficulty *big.Int `json:"parentDifficulty"` GasLimit uint64 `json:"currentGasLimit" gencodec:"required"` Number uint64 `json:"currentNumber" gencodec:"required"` @@ -81,6 +82,7 @@ type stEnv struct { type stEnvMarshaling struct { Coinbase common.UnprefixedAddress Difficulty *math.HexOrDecimal256 + Random *math.HexOrDecimal256 ParentDifficulty *math.HexOrDecimal256 GasLimit math.HexOrDecimal64 Number math.HexOrDecimal64 diff --git a/cmd/evm/internal/t8ntool/gen_stenv.go b/cmd/evm/internal/t8ntool/gen_stenv.go index 1bb3c6a46b0c..a6d774cdabcf 100644 --- a/cmd/evm/internal/t8ntool/gen_stenv.go +++ b/cmd/evm/internal/t8ntool/gen_stenv.go @@ -18,6 +18,7 @@ func (s stEnv) MarshalJSON() ([]byte, error) { type stEnv struct { Coinbase common.UnprefixedAddress `json:"currentCoinbase" gencodec:"required"` Difficulty *math.HexOrDecimal256 `json:"currentDifficulty"` + Random *math.HexOrDecimal256 `json:"currentRandom"` ParentDifficulty *math.HexOrDecimal256 `json:"parentDifficulty"` GasLimit math.HexOrDecimal64 `json:"currentGasLimit" gencodec:"required"` Number math.HexOrDecimal64 `json:"currentNumber" gencodec:"required"` @@ -31,6 +32,7 @@ func (s stEnv) MarshalJSON() ([]byte, error) { var enc stEnv enc.Coinbase = common.UnprefixedAddress(s.Coinbase) enc.Difficulty = (*math.HexOrDecimal256)(s.Difficulty) + enc.Random = (*math.HexOrDecimal256)(s.Random) enc.ParentDifficulty = (*math.HexOrDecimal256)(s.ParentDifficulty) enc.GasLimit = math.HexOrDecimal64(s.GasLimit) enc.Number = math.HexOrDecimal64(s.Number) @@ -48,6 +50,7 @@ func (s *stEnv) UnmarshalJSON(input []byte) error { type stEnv struct { Coinbase *common.UnprefixedAddress `json:"currentCoinbase" gencodec:"required"` Difficulty *math.HexOrDecimal256 `json:"currentDifficulty"` + Random *math.HexOrDecimal256 `json:"currentRandom"` ParentDifficulty *math.HexOrDecimal256 `json:"parentDifficulty"` GasLimit *math.HexOrDecimal64 `json:"currentGasLimit" gencodec:"required"` Number *math.HexOrDecimal64 `json:"currentNumber" gencodec:"required"` @@ -69,6 +72,9 @@ func (s *stEnv) UnmarshalJSON(input []byte) error { if dec.Difficulty != nil { s.Difficulty = (*big.Int)(dec.Difficulty) } + if dec.Random != nil { + s.Random = (*big.Int)(dec.Random) + } if dec.ParentDifficulty != nil { s.ParentDifficulty = (*big.Int)(dec.ParentDifficulty) } diff --git a/cmd/geth/config.go b/cmd/geth/config.go index 18829bade932..7a642edd0e41 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -159,7 +159,7 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { cfg.Eth.OverrideArrowGlacier = new(big.Int).SetUint64(ctx.GlobalUint64(utils.OverrideArrowGlacierFlag.Name)) } if ctx.GlobalIsSet(utils.OverrideTerminalTotalDifficulty.Name) { - cfg.Eth.Genesis.Config.TerminalTotalDifficulty = new(big.Int).SetUint64(ctx.GlobalUint64(utils.OverrideTerminalTotalDifficulty.Name)) + cfg.Eth.OverrideTerminalTotalDifficulty = new(big.Int).SetUint64(ctx.GlobalUint64(utils.OverrideTerminalTotalDifficulty.Name)) } backend, _ := utils.RegisterEthService(stack, &cfg.Eth, ctx.GlobalBool(utils.CatalystFlag.Name)) diff --git a/consensus/beacon/consensus.go b/consensus/beacon/consensus.go index d90406438558..507a5fe6b3eb 100644 --- a/consensus/beacon/consensus.go +++ b/consensus/beacon/consensus.go @@ -181,10 +181,7 @@ func (beacon *Beacon) verifyHeader(chain consensus.ChainHeaderReader, header, pa if len(header.Extra) > 32 { return fmt.Errorf("extra-data longer than 32 bytes (%d)", len(header.Extra)) } - // Verify the seal parts. Ensure the mixhash, nonce and uncle hash are the expected value. - if header.MixDigest != (common.Hash{}) { - return errInvalidMixDigest - } + // Verify the seal parts. Ensure the nonce and uncle hash are the expected value. if header.Nonce != beaconNonce { return errInvalidNonce } diff --git a/core/blockchain.go b/core/blockchain.go index 68388ec40a61..815cafc411d6 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -31,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/common/prque" "github.com/ethereum/go-ethereum/consensus" + "github.com/ethereum/go-ethereum/consensus/beacon" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state/snapshot" @@ -1358,6 +1359,29 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { prev.Hash().Bytes()[:4], i, block.NumberU64(), block.Hash().Bytes()[:4], block.ParentHash().Bytes()[:4]) } } + if chain[len(chain)-1].Difficulty().Cmp(common.Big0) == 0 { + start := 0 + for i := 0; i < len(chain); i++ { + if chain[i].Difficulty().Cmp(common.Big0) != 0 { + start++ + } + } + if start != 0 { + newChain := types.Blocks(chain[0:start]) + if in, err := bc.insertChain(newChain, true, true); err != nil { + return in, err + } + } + for i := start; i < len(chain); i++ { + conf := bc.GetVMConfig() + conf.RandomOpcode = true + bc.SetVMConfig(*conf) + if err := bc.InsertBlockWithoutSetHead(chain[i]); err != nil { + return len(chain), err + } + } + return len(chain), nil + } // Pre-checks passed, start the full block imports if !bc.chainmu.TryLock() { return 0, errChainStopped @@ -2258,6 +2282,16 @@ func (bc *BlockChain) reportBlock(block *types.Block, receipts types.Receipts, e i, receipt.CumulativeGasUsed, receipt.GasUsed, receipt.ContractAddress.Hex(), receipt.Status, receipt.TxHash.Hex(), receipt.Logs, receipt.Bloom, receipt.PostState) } + if eng, ok := bc.engine.(*beacon.Beacon); ok { + if eng.IsPoSHeader(block.Header()) { + fmt.Println("PoSHeader") + } + if reached, err := beacon.IsTTDReached(bc, block.ParentHash(), block.NumberU64()-1); reached { + fmt.Println("TTDD reached") + } else if err != nil { + fmt.Printf("TTDReached error: %v\n", err) + } + } log.Error(fmt.Sprintf(` ########## BAD BLOCK ######### Chain config: %v diff --git a/core/blockchain_reader.go b/core/blockchain_reader.go index beaa57b0c11b..d43087e5778d 100644 --- a/core/blockchain_reader.go +++ b/core/blockchain_reader.go @@ -343,6 +343,12 @@ func (bc *BlockChain) GetVMConfig() *vm.Config { return &bc.vmConfig } +// SetVMConfig sets the vm config. +// Warning: might not be threadsafe with other components +func (bc *BlockChain) SetVMConfig(config vm.Config) { + bc.vmConfig = config +} + // SetTxLookupLimit is responsible for updating the txlookup limit to the // original one stored in db if the new mismatches with the old one. func (bc *BlockChain) SetTxLookupLimit(limit uint64) { diff --git a/core/evm.go b/core/evm.go index 6c67fc43762c..edbe7b7a4b48 100644 --- a/core/evm.go +++ b/core/evm.go @@ -61,6 +61,7 @@ func NewEVMBlockContext(header *types.Header, chain ChainContext, author *common Difficulty: new(big.Int).Set(header.Difficulty), BaseFee: baseFee, GasLimit: header.GasLimit, + Random: header.MixDigest, } } diff --git a/core/genesis.go b/core/genesis.go index 557440d08aa1..fda36b48991f 100644 --- a/core/genesis.go +++ b/core/genesis.go @@ -294,7 +294,8 @@ func (g *Genesis) ToBlock(db ethdb.Database) *types.Block { if g.GasLimit == 0 { head.GasLimit = params.GenesisGasLimit } - if g.Difficulty == nil { + emptyHash := common.Hash{} + if g.Difficulty == nil && bytes.Equal(g.Mixhash[:], emptyHash[:]) { head.Difficulty = params.GenesisDifficulty } if g.Config != nil && g.Config.IsLondon(common.Big0) { diff --git a/core/rawdb/accessors_sync.go b/core/rawdb/accessors_sync.go new file mode 100644 index 000000000000..50dfb848e4e0 --- /dev/null +++ b/core/rawdb/accessors_sync.go @@ -0,0 +1,80 @@ +// Copyright 2021 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "bytes" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" +) + +// ReadSkeletonSyncStatus retrieves the serialized sync status saved at shutdown. +func ReadSkeletonSyncStatus(db ethdb.KeyValueReader) []byte { + data, _ := db.Get(skeletonSyncStatusKey) + return data +} + +// WriteSkeletonSyncStatus stores the serialized sync status to save at shutdown. +func WriteSkeletonSyncStatus(db ethdb.KeyValueWriter, status []byte) { + if err := db.Put(skeletonSyncStatusKey, status); err != nil { + log.Crit("Failed to store skeleton sync status", "err", err) + } +} + +// DeleteSkeletonSyncStatus deletes the serialized sync status saved at the last +// shutdown +func DeleteSkeletonSyncStatus(db ethdb.KeyValueWriter) { + if err := db.Delete(skeletonSyncStatusKey); err != nil { + log.Crit("Failed to remove skeleton sync status", "err", err) + } +} + +// ReadSkeletonHeader retrieves a block header from the skeleton sync store, +func ReadSkeletonHeader(db ethdb.KeyValueReader, number uint64) *types.Header { + data, _ := db.Get(skeletonHeaderKey(number)) + if len(data) == 0 { + return nil + } + header := new(types.Header) + if err := rlp.Decode(bytes.NewReader(data), header); err != nil { + log.Error("Invalid skeleton header RLP", "number", number, "err", err) + return nil + } + return header +} + +// WriteSkeletonHeader stores a block header into the skeleton sync store. +func WriteSkeletonHeader(db ethdb.KeyValueWriter, header *types.Header) { + data, err := rlp.EncodeToBytes(header) + if err != nil { + log.Crit("Failed to RLP encode header", "err", err) + } + key := skeletonHeaderKey(header.Number.Uint64()) + if err := db.Put(key, data); err != nil { + log.Crit("Failed to store skeleton header", "err", err) + } +} + +// DeleteSkeletonHeader removes all block header data associated with a hash. +func DeleteSkeletonHeader(db ethdb.KeyValueWriter, number uint64) { + if err := db.Delete(skeletonHeaderKey(number)); err != nil { + log.Crit("Failed to delete skeleton header", "err", err) + } +} diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index b35fcba45f79..f869f36e1040 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -63,6 +63,9 @@ var ( // snapshotSyncStatusKey tracks the snapshot sync status across restarts. snapshotSyncStatusKey = []byte("SnapshotSyncStatus") + // skeletonSyncStatusKey tracks the skeleton sync status across restarts. + skeletonSyncStatusKey = []byte("SkeletonSyncStatus") + // txIndexTailKey tracks the oldest block whose transactions have been indexed. txIndexTailKey = []byte("TransactionIndexTail") @@ -92,6 +95,7 @@ var ( SnapshotAccountPrefix = []byte("a") // SnapshotAccountPrefix + account hash -> account trie value SnapshotStoragePrefix = []byte("o") // SnapshotStoragePrefix + account hash + storage hash -> storage trie value CodePrefix = []byte("c") // CodePrefix + code hash -> account code + skeletonHeaderPrefix = []byte("S") // skeletonHeaderPrefox + num (uint64 big endian) -> header PreimagePrefix = []byte("secure-key-") // PreimagePrefix + hash -> preimage configPrefix = []byte("ethereum-config-") // config prefix for the db @@ -210,6 +214,11 @@ func bloomBitsKey(bit uint, section uint64, hash common.Hash) []byte { return key } +// skeletonHeaderKey = skeletonHeaderPrefix + num (uint64 big endian) +func skeletonHeaderKey(number uint64) []byte { + return append(skeletonHeaderPrefix, encodeBlockNumber(number)...) +} + // preimageKey = PreimagePrefix + hash func preimageKey(hash common.Hash) []byte { return append(PreimagePrefix, hash.Bytes()...) diff --git a/core/state_transition.go b/core/state_transition.go index 135a9c6dbe85..701919d774fb 100644 --- a/core/state_transition.go +++ b/core/state_transition.go @@ -310,7 +310,7 @@ func (st *StateTransition) TransitionDb() (*ExecutionResult, error) { } // Set up the initial access list. - if rules := st.evm.ChainConfig().Rules(st.evm.Context.BlockNumber); rules.IsBerlin { + if rules := st.evm.ChainConfig().Rules(st.evm.Context.BlockNumber, st.evm.Config.RandomOpcode); rules.IsBerlin { st.state.PrepareAccessList(msg.From(), msg.To(), vm.ActivePrecompiles(rules), msg.AccessList()) } var ( diff --git a/core/vm/evm.go b/core/vm/evm.go index 618bbcf176cb..1e8d89f60ed2 100644 --- a/core/vm/evm.go +++ b/core/vm/evm.go @@ -75,6 +75,7 @@ type BlockContext struct { Time *big.Int // Provides information for TIME Difficulty *big.Int // Provides information for DIFFICULTY BaseFee *big.Int // Provides information for BASEFEE + Random common.Hash // Provides information for RANDOM } // TxContext provides the EVM with information about a transaction. @@ -131,7 +132,7 @@ func NewEVM(blockCtx BlockContext, txCtx TxContext, statedb StateDB, chainConfig StateDB: statedb, Config: config, chainConfig: chainConfig, - chainRules: chainConfig.Rules(blockCtx.BlockNumber), + chainRules: chainConfig.Rules(blockCtx.BlockNumber, config.RandomOpcode), } evm.interpreter = NewEVMInterpreter(evm, config) return evm diff --git a/core/vm/instructions.go b/core/vm/instructions.go index bda480f083d4..3601d0a24292 100644 --- a/core/vm/instructions.go +++ b/core/vm/instructions.go @@ -475,6 +475,12 @@ func opDifficulty(pc *uint64, interpreter *EVMInterpreter, scope *ScopeContext) return nil, nil } +func opRandom(pc *uint64, interpreter *EVMInterpreter, scope *ScopeContext) ([]byte, error) { + v := new(uint256.Int).SetBytes((interpreter.evm.Context.Random.Bytes())) + scope.Stack.push(v) + return nil, nil +} + func opGasLimit(pc *uint64, interpreter *EVMInterpreter, scope *ScopeContext) ([]byte, error) { scope.Stack.push(new(uint256.Int).SetUint64(interpreter.evm.Context.GasLimit)) return nil, nil diff --git a/core/vm/instructions_test.go b/core/vm/instructions_test.go index 560d26a0b8d4..2033a9ad1b7c 100644 --- a/core/vm/instructions_test.go +++ b/core/vm/instructions_test.go @@ -21,6 +21,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "math/big" "testing" "github.com/ethereum/go-ethereum/common" @@ -650,3 +651,36 @@ func TestCreate2Addreses(t *testing.T) { } } } + +func TestRandom(t *testing.T) { + type testcase struct { + name string + random common.Hash + } + + for _, tt := range []testcase{ + {name: "empty hash", random: common.Hash{}}, + {name: "1", random: common.Hash{0}}, + {name: "emptyCodeHash", random: emptyCodeHash}, + {name: "hash(0x010203)", random: crypto.Keccak256Hash([]byte{0x01, 0x02, 0x03})}, + } { + var ( + env = NewEVM(BlockContext{Random: tt.random}, TxContext{}, nil, params.TestChainConfig, Config{RandomOpcode: true}) + stack = newstack() + pc = uint64(0) + evmInterpreter = env.interpreter + ) + opRandom(&pc, evmInterpreter, &ScopeContext{nil, stack, nil}) + if len(stack.data) != 1 { + t.Errorf("Expected one item on stack after %v, got %d: ", tt.name, len(stack.data)) + } + actual := stack.pop() + expected, overflow := uint256.FromBig(new(big.Int).SetBytes(tt.random.Bytes())) + if overflow { + t.Errorf("Testcase %v: invalid overflow", tt.name) + } + if actual.Cmp(expected) != 0 { + t.Errorf("Testcase %v: expected %x, got %x", tt.name, expected, actual) + } + } +} diff --git a/core/vm/interpreter.go b/core/vm/interpreter.go index afa3c0f12118..51fb2ff7abfb 100644 --- a/core/vm/interpreter.go +++ b/core/vm/interpreter.go @@ -31,6 +31,7 @@ type Config struct { Tracer EVMLogger // Opcode logger NoRecursion bool // Disables call, callcode, delegate call and create NoBaseFee bool // Forces the EIP-1559 baseFee to 0 (needed for 0 price calls) + RandomOpcode bool // Enables the random opcode EnablePreimageRecording bool // Enables recording of SHA3/keccak preimages JumpTable JumpTable // EVM instruction table, automatically populated if unset @@ -74,6 +75,8 @@ func NewEVMInterpreter(evm *EVM, cfg Config) *EVMInterpreter { if cfg.JumpTable[STOP] == nil { var jt JumpTable switch { + case evm.chainRules.IsMerge: + jt = newMergeInstructionSet() case evm.chainRules.IsLondon: jt = londonInstructionSet case evm.chainRules.IsBerlin: diff --git a/core/vm/jump_table.go b/core/vm/jump_table.go index 329ad77cbf83..282e4541b7c4 100644 --- a/core/vm/jump_table.go +++ b/core/vm/jump_table.go @@ -58,11 +58,23 @@ var ( istanbulInstructionSet = newIstanbulInstructionSet() berlinInstructionSet = newBerlinInstructionSet() londonInstructionSet = newLondonInstructionSet() + mergeInstructionSet = newMergeInstructionSet() ) // JumpTable contains the EVM opcodes supported at a given fork. type JumpTable [256]*operation +func newMergeInstructionSet() JumpTable { + instructionSet := newLondonInstructionSet() + instructionSet[RANDOM] = &operation{ + execute: opRandom, + constantGas: GasQuickStep, + minStack: minStack(0, 1), + maxStack: maxStack(0, 1), + } + return instructionSet +} + // newLondonInstructionSet returns the frontier, homestead, byzantium, // contantinople, istanbul, petersburg, berlin and london instructions. func newLondonInstructionSet() JumpTable { diff --git a/core/vm/opcodes.go b/core/vm/opcodes.go index 6659ec401ff1..993157e07e8e 100644 --- a/core/vm/opcodes.go +++ b/core/vm/opcodes.go @@ -104,6 +104,7 @@ const ( CHAINID OpCode = 0x46 SELFBALANCE OpCode = 0x47 BASEFEE OpCode = 0x48 + RANDOM OpCode = 0x44 // Same as DIFFICULTY ) // 0x50 range - 'storage' and execution. @@ -279,7 +280,7 @@ var opCodeToString = map[OpCode]string{ COINBASE: "COINBASE", TIMESTAMP: "TIMESTAMP", NUMBER: "NUMBER", - DIFFICULTY: "DIFFICULTY", + DIFFICULTY: "DIFFICULTY", // TODO (MariusVanDerWijden) rename to RANDOM post merge GASLIMIT: "GASLIMIT", CHAINID: "CHAINID", SELFBALANCE: "SELFBALANCE", diff --git a/core/vm/runtime/runtime.go b/core/vm/runtime/runtime.go index 103ce3e175ff..c197d0db1000 100644 --- a/core/vm/runtime/runtime.go +++ b/core/vm/runtime/runtime.go @@ -118,7 +118,7 @@ func Execute(code, input []byte, cfg *Config) ([]byte, *state.StateDB, error) { vmenv = NewEnv(cfg) sender = vm.AccountRef(cfg.Origin) ) - if rules := cfg.ChainConfig.Rules(vmenv.Context.BlockNumber); rules.IsBerlin { + if rules := cfg.ChainConfig.Rules(vmenv.Context.BlockNumber, false); rules.IsBerlin { cfg.State.PrepareAccessList(cfg.Origin, &address, vm.ActivePrecompiles(rules), nil) } cfg.State.CreateAccount(address) @@ -150,7 +150,7 @@ func Create(input []byte, cfg *Config) ([]byte, common.Address, uint64, error) { vmenv = NewEnv(cfg) sender = vm.AccountRef(cfg.Origin) ) - if rules := cfg.ChainConfig.Rules(vmenv.Context.BlockNumber); rules.IsBerlin { + if rules := cfg.ChainConfig.Rules(vmenv.Context.BlockNumber, false); rules.IsBerlin { cfg.State.PrepareAccessList(cfg.Origin, nil, vm.ActivePrecompiles(rules), nil) } // Call the code with the given configuration. @@ -176,7 +176,7 @@ func Call(address common.Address, input []byte, cfg *Config) ([]byte, uint64, er sender := cfg.State.GetOrNewStateObject(cfg.Origin) statedb := cfg.State - if rules := cfg.ChainConfig.Rules(vmenv.Context.BlockNumber); rules.IsBerlin { + if rules := cfg.ChainConfig.Rules(vmenv.Context.BlockNumber, false); rules.IsBerlin { statedb.PrepareAccessList(cfg.Origin, &address, vm.ActivePrecompiles(rules), nil) } // Call the code with the given configuration. diff --git a/eth/api.go b/eth/api.go index f81dfa922b7a..bc2a615ad1dd 100644 --- a/eth/api.go +++ b/eth/api.go @@ -257,6 +257,16 @@ func (api *PrivateAdminAPI) ImportChain(file string) (bool, error) { return true, nil } +// NewHead requests the node to beacon-sync to the designated head header. +func (api *PrivateAdminAPI) NewHead(blob hexutil.Bytes) error { + header := new(types.Header) + if err := rlp.DecodeBytes(blob, header); err != nil { + return err + } + mode, _ := api.eth.handler.chainSync.modeAndLocalHead() + return api.eth.Downloader().BeaconSync(mode, header) +} + // PublicDebugAPI is the collection of Ethereum full node APIs exposed // over the public debugging endpoint. type PublicDebugAPI struct { diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index 059df3670286..4e6be1bce7b9 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -18,7 +18,8 @@ package catalyst import ( - "bytes" + "crypto/sha256" + "encoding/binary" "errors" "fmt" "math/big" @@ -42,11 +43,13 @@ import ( ) var ( - VALID = GenericStringResponse{"VALID"} - INVALID = GenericStringResponse{"INVALID"} - SYNCING = GenericStringResponse{"SYNCING"} - UnknownHeader = rpc.CustomError{Code: -32000, Message: "unknown header"} - UnknownPayload = rpc.CustomError{Code: -32001, Message: "unknown payload"} + VALID = GenericStringResponse{"VALID"} + SUCCESS = GenericStringResponse{"SUCCESS"} + INVALID = ForkChoiceResponse{Status: "INVALID", PayloadID: nil} + SYNCING = ForkChoiceResponse{Status: "INVALID", PayloadID: nil} + UnknownHeader = rpc.CustomError{Code: -32000, Message: "unknown header"} + UnknownPayload = rpc.CustomError{Code: -32001, Message: "unknown payload"} + InvalidPayloadID = rpc.CustomError{Code: 1, Message: "invalid payload id"} ) // Register adds catalyst APIs to the full node. @@ -82,7 +85,7 @@ type ConsensusAPI struct { eth *eth.Ethereum les *les.LightEthereum engine consensus.Engine // engine is the post-merge consensus engine, only for block creation - preparedBlocks map[int]*ExecutableData + preparedBlocks map[uint64]*ExecutableDataV1 } func NewConsensusAPI(eth *eth.Ethereum, les *les.LightEthereum) *ConsensusAPI { @@ -111,7 +114,7 @@ func NewConsensusAPI(eth *eth.Ethereum, les *les.LightEthereum) *ConsensusAPI { eth: eth, les: les, engine: engine, - preparedBlocks: make(map[int]*ExecutableData), + preparedBlocks: make(map[uint64]*ExecutableDataV1), } } @@ -130,6 +133,7 @@ type blockExecutionEnv struct { func (env *blockExecutionEnv) commitTransaction(tx *types.Transaction, coinbase common.Address) error { vmconfig := *env.chain.GetVMConfig() + vmconfig.RandomOpcode = true snap := env.state.Snapshot() receipt, err := core.ApplyTransaction(env.chain.Config(), env.chain, &coinbase, env.gasPool, env.state, env.header, tx, &env.header.GasUsed, vmconfig) if err != nil { @@ -171,103 +175,135 @@ func (api *ConsensusAPI) makeEnv(parent *types.Block, header *types.Header) (*bl return env, nil } -func (api *ConsensusAPI) PreparePayload(params AssembleBlockParams) (*PayloadResponse, error) { - data, err := api.assembleBlock(params) - if err != nil { - return nil, err +func (api *ConsensusAPI) GetPayloadV1(payloadID hexutil.Bytes) (*ExecutableDataV1, error) { + hash := []byte(payloadID) + if len(hash) < 8 { + return nil, &InvalidPayloadID } - id := len(api.preparedBlocks) - api.preparedBlocks[id] = data - return &PayloadResponse{PayloadID: uint64(id)}, nil -} - -func (api *ConsensusAPI) GetPayload(PayloadID hexutil.Uint64) (*ExecutableData, error) { - data, ok := api.preparedBlocks[int(PayloadID)] + id := binary.BigEndian.Uint64(hash[:8]) + data, ok := api.preparedBlocks[id] if !ok { return nil, &UnknownPayload } return data, nil } -// ConsensusValidated is called to mark a block as valid, so -// that data that is no longer needed can be removed. -func (api *ConsensusAPI) ConsensusValidated(params ConsensusValidatedParams) error { - switch params.Status { - case VALID.Status: - return nil - case INVALID.Status: - // TODO (MariusVanDerWijden) delete the block from the bc - return nil - default: - return errors.New("invalid params.status") +func (api *ConsensusAPI) ForkchoiceUpdatedV1(heads ForkchoiceStateV1, PayloadAttributes *PayloadAttributesV1) (ForkChoiceResponse, error) { + if heads.HeadBlockHash == (common.Hash{}) { + return ForkChoiceResponse{Status: SUCCESS.Status, PayloadID: nil}, nil + } + if err := api.checkTerminalTotalDifficulty(heads.HeadBlockHash); err != nil { + if block := api.eth.BlockChain().GetBlockByHash(heads.HeadBlockHash); block == nil { + // TODO (MariusVanDerWijden) trigger sync + return SYNCING, nil + } + return INVALID, err + } + // If the finalized block is set, check if it is in our blockchain + if heads.FinalizedBlockHash != (common.Hash{}) { + if block := api.eth.BlockChain().GetBlockByHash(heads.FinalizedBlockHash); block == nil { + // TODO (MariusVanDerWijden) trigger sync + return SYNCING, nil + } } + // SetHead + if err := api.setHead(heads.HeadBlockHash); err != nil { + return INVALID, err + } + // Assemble block (if needed) + if PayloadAttributes != nil { + data, err := api.assembleBlock(heads.HeadBlockHash, PayloadAttributes) + if err != nil { + return INVALID, err + } + hash := computePayloadId(heads.HeadBlockHash, PayloadAttributes) + id := binary.BigEndian.Uint64(hash) + api.preparedBlocks[id] = data + log.Info("Created payload", "payloadid", id) + // TODO (MariusVanDerWijden) do something with the payloadID? + hex := hexutil.Bytes(hash) + return ForkChoiceResponse{Status: SUCCESS.Status, PayloadID: &hex}, nil + } + return ForkChoiceResponse{Status: SUCCESS.Status, PayloadID: nil}, nil } -func (api *ConsensusAPI) ForkchoiceUpdated(params ForkChoiceParams) error { - var emptyHash = common.Hash{} - if !bytes.Equal(params.HeadBlockHash[:], emptyHash[:]) { - if err := api.checkTerminalTotalDifficulty(params.HeadBlockHash); err != nil { - return err - } - return api.setHead(params.HeadBlockHash) +func computePayloadId(headBlockHash common.Hash, params *PayloadAttributesV1) []byte { + time := make([]byte, 8) + binary.BigEndian.PutUint64(time, params.Timestamp) + // Hash + hasher := sha256.New() + hasher.Write(headBlockHash[:]) + hasher.Write(time) + hasher.Write(params.Random[:]) + hasher.Write(params.FeeRecipient[:]) + return hasher.Sum([]byte{})[:8] +} + +func (api *ConsensusAPI) invalid() ExecutePayloadResponse { + if api.light { + return ExecutePayloadResponse{Status: INVALID.Status, LatestValidHash: api.les.BlockChain().CurrentHeader().Hash()} } - return nil + return ExecutePayloadResponse{Status: INVALID.Status, LatestValidHash: api.eth.BlockChain().CurrentHeader().Hash()} } // ExecutePayload creates an Eth1 block, inserts it in the chain, and returns the status of the chain. -func (api *ConsensusAPI) ExecutePayload(params ExecutableData) (GenericStringResponse, error) { +func (api *ConsensusAPI) ExecutePayloadV1(params ExecutableDataV1) (ExecutePayloadResponse, error) { block, err := ExecutableDataToBlock(params) if err != nil { - return INVALID, err + return api.invalid(), err } if api.light { parent := api.les.BlockChain().GetHeaderByHash(params.ParentHash) if parent == nil { - return INVALID, fmt.Errorf("could not find parent %x", params.ParentHash) + return api.invalid(), fmt.Errorf("could not find parent %x", params.ParentHash) } if err = api.les.BlockChain().InsertHeader(block.Header()); err != nil { - return INVALID, err + return api.invalid(), err } - return VALID, nil + return ExecutePayloadResponse{Status: VALID.Status, LatestValidHash: block.Hash()}, nil } if !api.eth.BlockChain().HasBlock(block.ParentHash(), block.NumberU64()-1) { - /* - TODO (MariusVanDerWijden) reenable once sync is merged - if err := api.eth.Downloader().BeaconSync(api.eth.SyncMode(), block.Header()); err != nil { - return SYNCING, err - } - */ - return SYNCING, nil + + //TODO (MariusVanDerWijden) reenable once sync is merged + if err := api.eth.Downloader().BeaconSync(api.eth.SyncMode(), block.Header()); err != nil { + return ExecutePayloadResponse{Status: SYNCING.Status, LatestValidHash: common.Hash{}}, err + } + + // TODO (MariusVanDerWijden) we should return nil here not empty hash + return ExecutePayloadResponse{Status: SYNCING.Status, LatestValidHash: common.Hash{}}, nil } parent := api.eth.BlockChain().GetBlockByHash(params.ParentHash) td := api.eth.BlockChain().GetTd(parent.Hash(), block.NumberU64()-1) ttd := api.eth.BlockChain().Config().TerminalTotalDifficulty if td.Cmp(ttd) < 0 { - return INVALID, fmt.Errorf("can not execute payload on top of block with low td got: %v threshold %v", td, ttd) + return api.invalid(), fmt.Errorf("can not execute payload on top of block with low td got: %v threshold %v", td, ttd) } + conf := api.eth.BlockChain().GetVMConfig() + conf.RandomOpcode = true + api.eth.BlockChain().SetVMConfig(*conf) if err := api.eth.BlockChain().InsertBlockWithoutSetHead(block); err != nil { - return INVALID, err + return api.invalid(), err } merger := api.merger() if !merger.TDDReached() { merger.ReachTTD() } - return VALID, nil + return ExecutePayloadResponse{Status: VALID.Status, LatestValidHash: block.Hash()}, nil } // AssembleBlock creates a new block, inserts it into the chain, and returns the "execution // data" required for eth2 clients to process the new block. -func (api *ConsensusAPI) assembleBlock(params AssembleBlockParams) (*ExecutableData, error) { +func (api *ConsensusAPI) assembleBlock(parentHash common.Hash, params *PayloadAttributesV1) (*ExecutableDataV1, error) { if api.light { return nil, errors.New("not supported") } - log.Info("Producing block", "parentHash", params.ParentHash) + log.Info("Producing block", "parentHash", parentHash) bc := api.eth.BlockChain() - parent := bc.GetBlockByHash(params.ParentHash) + parent := bc.GetBlockByHash(parentHash) if parent == nil { - log.Warn("Cannot assemble block with parent hash to unknown block", "parentHash", params.ParentHash) - return nil, fmt.Errorf("cannot assemble block with unknown parent %s", params.ParentHash) + log.Warn("Cannot assemble block with parent hash to unknown block", "parentHash", parentHash) + return nil, fmt.Errorf("cannot assemble block with unknown parent %s", parentHash) } if params.Timestamp < parent.Time() { @@ -287,7 +323,11 @@ func (api *ConsensusAPI) assembleBlock(params AssembleBlockParams) (*ExecutableD GasLimit: parent.GasLimit(), // Keep the gas limit constant in this prototype Extra: []byte{}, // TODO (MariusVanDerWijden) properly set extra data Time: params.Timestamp, + MixDigest: params.Random, } + conf := api.eth.BlockChain().GetVMConfig() + conf.RandomOpcode = true + api.eth.BlockChain().SetVMConfig(*conf) if config := api.eth.BlockChain().Config(); config.IsLondon(header.Number) { header.BaseFee = misc.CalcBaseFee(config, parent.Header()) } @@ -376,7 +416,7 @@ func decodeTransactions(enc [][]byte) ([]*types.Transaction, error) { return txs, nil } -func ExecutableDataToBlock(params ExecutableData) (*types.Block, error) { +func ExecutableDataToBlock(params ExecutableDataV1) (*types.Block, error) { txs, err := decodeTransactions(params.Transactions) if err != nil { return nil, err @@ -401,6 +441,7 @@ func ExecutableDataToBlock(params ExecutableData) (*types.Block, error) { Time: params.Timestamp, BaseFee: params.BaseFeePerGas, Extra: params.ExtraData, + MixDigest: params.Random, // TODO (MariusVanDerWijden) add params.Random to header once required } block := types.NewBlockWithHeader(header).WithBody(txs, nil /* uncles */) @@ -410,8 +451,8 @@ func ExecutableDataToBlock(params ExecutableData) (*types.Block, error) { return block, nil } -func BlockToExecutableData(block *types.Block, random common.Hash) *ExecutableData { - return &ExecutableData{ +func BlockToExecutableData(block *types.Block, random common.Hash) *ExecutableDataV1 { + return &ExecutableDataV1{ BlockHash: block.Hash(), ParentHash: block.ParentHash(), Coinbase: block.Coinbase(), @@ -447,11 +488,7 @@ func (api *ConsensusAPI) checkTerminalTotalDifficulty(head common.Hash) error { if newHeadBlock == nil { return &UnknownHeader } - parent := api.eth.BlockChain().GetBlockByHash(newHeadBlock.ParentHash()) - if parent == nil { - return fmt.Errorf("parent unavailable: %v", newHeadBlock.ParentHash()) - } - td := api.eth.BlockChain().GetTd(parent.Hash(), parent.NumberU64()) + td := api.eth.BlockChain().GetTd(newHeadBlock.Hash(), newHeadBlock.NumberU64()) if td != nil && td.Cmp(api.eth.BlockChain().Config().TerminalTotalDifficulty) < 0 { return errors.New("total difficulty not reached yet") } @@ -460,11 +497,6 @@ func (api *ConsensusAPI) checkTerminalTotalDifficulty(head common.Hash) error { // setHead is called to perform a force choice. func (api *ConsensusAPI) setHead(newHead common.Hash) error { - // Trigger the transition if it's the first `NewHead` event. - merger := api.merger() - if !merger.PoSFinalized() { - merger.FinalizePoS() - } log.Info("Setting head", "head", newHead) if api.light { headHeader := api.les.BlockChain().CurrentHeader() @@ -478,10 +510,20 @@ func (api *ConsensusAPI) setHead(newHead common.Hash) error { if err := api.les.BlockChain().SetChainHead(newHeadHeader); err != nil { return err } + // Trigger the transition if it's the first `NewHead` event. + merger := api.merger() + if !merger.PoSFinalized() { + merger.FinalizePoS() + } return nil } headBlock := api.eth.BlockChain().CurrentBlock() if headBlock.Hash() == newHead { + // Trigger the transition if it's the first `NewHead` event. + merger := api.merger() + if !merger.PoSFinalized() { + merger.FinalizePoS() + } return nil } newHeadBlock := api.eth.BlockChain().GetBlockByHash(newHead) @@ -491,6 +533,12 @@ func (api *ConsensusAPI) setHead(newHead common.Hash) error { if err := api.eth.BlockChain().SetChainHead(newHeadBlock); err != nil { return err } + // Trigger the transition if it's the first `NewHead` event. + merger := api.merger() + if !merger.PoSFinalized() { + merger.FinalizePoS() + } + // TODO (MariusVanDerWijden) are we really synced now? api.eth.SetSynced() return nil } diff --git a/eth/catalyst/api_test.go b/eth/catalyst/api_test.go index 98130673eb74..012b83f9c912 100644 --- a/eth/catalyst/api_test.go +++ b/eth/catalyst/api_test.go @@ -85,11 +85,10 @@ func TestEth2AssembleBlock(t *testing.T) { t.Fatalf("error signing transaction, err=%v", err) } ethservice.TxPool().AddLocal(tx) - blockParams := AssembleBlockParams{ - ParentHash: blocks[9].Hash(), - Timestamp: blocks[9].Time() + 5, + blockParams := PayloadAttributesV1{ + Timestamp: blocks[9].Time() + 5, } - execData, err := api.assembleBlock(blockParams) + execData, err := api.assembleBlock(blocks[9].Hash(), &blockParams) if err != nil { t.Fatalf("error producing block, err=%v", err) } @@ -107,11 +106,10 @@ func TestEth2AssembleBlockWithAnotherBlocksTxs(t *testing.T) { // Put the 10th block's tx in the pool and produce a new block api.insertTransactions(blocks[9].Transactions()) - blockParams := AssembleBlockParams{ - ParentHash: blocks[8].Hash(), - Timestamp: blocks[8].Time() + 5, + blockParams := PayloadAttributesV1{ + Timestamp: blocks[8].Time() + 5, } - execData, err := api.assembleBlock(blockParams) + execData, err := api.assembleBlock(blocks[8].Hash(), &blockParams) if err != nil { t.Fatalf("error producing block, err=%v", err) } @@ -126,14 +124,20 @@ func TestSetHeadBeforeTotalDifficulty(t *testing.T) { defer n.Close() api := NewConsensusAPI(ethservice, nil) - - if err := api.ForkchoiceUpdated(ForkChoiceParams{HeadBlockHash: blocks[5].Hash()}); err == nil { + fcState := ForkchoiceStateV1{ + HeadBlockHash: blocks[5].Hash(), + SafeBlockHash: common.Hash{}, + FinalizedBlockHash: common.Hash{}, + } + if _, err := api.ForkchoiceUpdatedV1(fcState, nil); err == nil { t.Errorf("fork choice updated before total terminal difficulty should fail") } } func TestEth2PrepareAndGetPayload(t *testing.T) { genesis, blocks := generatePreMergeChain(10) + // We need to properly set the terminal total difficulty + genesis.Config.TerminalTotalDifficulty.Sub(genesis.Config.TerminalTotalDifficulty, blocks[9].Difficulty()) n, ethservice := startEthService(t, genesis, blocks[:9]) defer n.Close() @@ -141,15 +145,20 @@ func TestEth2PrepareAndGetPayload(t *testing.T) { // Put the 10th block's tx in the pool and produce a new block api.insertTransactions(blocks[9].Transactions()) - blockParams := AssembleBlockParams{ - ParentHash: blocks[8].Hash(), - Timestamp: blocks[8].Time() + 5, + blockParams := PayloadAttributesV1{ + Timestamp: blocks[8].Time() + 5, + } + fcState := ForkchoiceStateV1{ + HeadBlockHash: blocks[8].Hash(), + SafeBlockHash: common.Hash{}, + FinalizedBlockHash: common.Hash{}, } - respID, err := api.PreparePayload(blockParams) + _, err := api.ForkchoiceUpdatedV1(fcState, &blockParams) if err != nil { t.Fatalf("error preparing payload, err=%v", err) } - execData, err := api.GetPayload(hexutil.Uint64(respID.PayloadID)) + payloadID := computePayloadId(fcState.HeadBlockHash, &blockParams) + execData, err := api.GetPayloadV1(hexutil.Bytes(payloadID)) if err != nil { t.Fatalf("error getting payload, err=%v", err) } @@ -201,9 +210,8 @@ func TestEth2NewBlock(t *testing.T) { tx, _ := types.SignTx(types.NewContractCreation(nonce, new(big.Int), 1000000, big.NewInt(2*params.InitialBaseFee), logCode), types.LatestSigner(ethservice.BlockChain().Config()), testKey) ethservice.TxPool().AddLocal(tx) - execData, err := api.assembleBlock(AssembleBlockParams{ - ParentHash: parent.Hash(), - Timestamp: parent.Time() + 5, + execData, err := api.assembleBlock(parent.Hash(), &PayloadAttributesV1{ + Timestamp: parent.Time() + 5, }) if err != nil { t.Fatalf("Failed to create the executable data %v", err) @@ -212,7 +220,7 @@ func TestEth2NewBlock(t *testing.T) { if err != nil { t.Fatalf("Failed to convert executable data to block %v", err) } - newResp, err := api.ExecutePayload(*execData) + newResp, err := api.ExecutePayloadV1(*execData) if err != nil || newResp.Status != "VALID" { t.Fatalf("Failed to insert block: %v", err) } @@ -220,8 +228,12 @@ func TestEth2NewBlock(t *testing.T) { t.Fatalf("Chain head shouldn't be updated") } checkLogEvents(t, newLogCh, rmLogsCh, 0, 0) - - if err := api.ForkchoiceUpdated(ForkChoiceParams{HeadBlockHash: block.Hash(), FinalizedBlockHash: block.Hash()}); err != nil { + fcState := ForkchoiceStateV1{ + HeadBlockHash: block.Hash(), + SafeBlockHash: block.Hash(), + FinalizedBlockHash: block.Hash(), + } + if _, err := api.ForkchoiceUpdatedV1(fcState, nil); err != nil { t.Fatalf("Failed to insert block: %v", err) } if ethservice.BlockChain().CurrentBlock().NumberU64() != block.NumberU64() { @@ -238,9 +250,8 @@ func TestEth2NewBlock(t *testing.T) { ) parent = preMergeBlocks[len(preMergeBlocks)-1] for i := 0; i < 10; i++ { - execData, err := api.assembleBlock(AssembleBlockParams{ - ParentHash: parent.Hash(), - Timestamp: parent.Time() + 6, + execData, err := api.assembleBlock(parent.Hash(), &PayloadAttributesV1{ + Timestamp: parent.Time() + 6, }) if err != nil { t.Fatalf("Failed to create the executable data %v", err) @@ -249,7 +260,7 @@ func TestEth2NewBlock(t *testing.T) { if err != nil { t.Fatalf("Failed to convert executable data to block %v", err) } - newResp, err := api.ExecutePayload(*execData) + newResp, err := api.ExecutePayloadV1(*execData) if err != nil || newResp.Status != "VALID" { t.Fatalf("Failed to insert block: %v", err) } @@ -257,10 +268,12 @@ func TestEth2NewBlock(t *testing.T) { t.Fatalf("Chain head shouldn't be updated") } - if err := api.ConsensusValidated(ConsensusValidatedParams{BlockHash: block.Hash(), Status: "VALID"}); err != nil { - t.Fatalf("Failed to insert block: %v", err) + fcState := ForkchoiceStateV1{ + HeadBlockHash: block.Hash(), + SafeBlockHash: block.Hash(), + FinalizedBlockHash: block.Hash(), } - if err := api.ForkchoiceUpdated(ForkChoiceParams{FinalizedBlockHash: block.Hash(), HeadBlockHash: block.Hash()}); err != nil { + if _, err := api.ForkchoiceUpdatedV1(fcState, nil); err != nil { t.Fatalf("Failed to insert block: %v", err) } if ethservice.BlockChain().CurrentBlock().NumberU64() != block.NumberU64() { @@ -360,33 +373,41 @@ func TestFullAPI(t *testing.T) { tx, _ := types.SignTx(types.NewContractCreation(nonce, new(big.Int), 1000000, big.NewInt(2*params.InitialBaseFee), logCode), types.LatestSigner(ethservice.BlockChain().Config()), testKey) ethservice.TxPool().AddLocal(tx) - params := AssembleBlockParams{ - ParentHash: parent.Hash(), + params := PayloadAttributesV1{ Timestamp: parent.Time() + 1, Random: crypto.Keccak256Hash([]byte{byte(i)}), FeeRecipient: parent.Coinbase(), } - resp, err := api.PreparePayload(params) + fcState := ForkchoiceStateV1{ + HeadBlockHash: parent.Hash(), + SafeBlockHash: common.Hash{}, + FinalizedBlockHash: common.Hash{}, + } + resp, err := api.ForkchoiceUpdatedV1(fcState, ¶ms) if err != nil { - t.Fatalf("can't prepare payload: %v", err) + t.Fatalf("error preparing payload, err=%v", err) + } + if resp.Status != SUCCESS.Status { + t.Fatalf("error preparing payload, invalid status: %v", resp.Status) } - payload, err := api.GetPayload(hexutil.Uint64(resp.PayloadID)) + payloadID := computePayloadId(parent.Hash(), ¶ms) + payload, err := api.GetPayloadV1(hexutil.Bytes(payloadID)) if err != nil { t.Fatalf("can't get payload: %v", err) } - execResp, err := api.ExecutePayload(*payload) + execResp, err := api.ExecutePayloadV1(*payload) if err != nil { t.Fatalf("can't execute payload: %v", err) } if execResp.Status != VALID.Status { t.Fatalf("invalid status: %v", execResp.Status) } - - if err := api.ConsensusValidated(ConsensusValidatedParams{BlockHash: payload.BlockHash, Status: VALID.Status}); err != nil { - t.Fatalf("failed to validate consensus: %v", err) + fcState = ForkchoiceStateV1{ + HeadBlockHash: payload.BlockHash, + SafeBlockHash: payload.ParentHash, + FinalizedBlockHash: payload.ParentHash, } - - if err := api.ForkchoiceUpdated(ForkChoiceParams{HeadBlockHash: payload.BlockHash, FinalizedBlockHash: payload.BlockHash}); err != nil { + if _, err := api.ForkchoiceUpdatedV1(fcState, nil); err != nil { t.Fatalf("Failed to insert block: %v", err) } if ethservice.BlockChain().CurrentBlock().NumberU64() != payload.Number { diff --git a/eth/catalyst/api_types.go b/eth/catalyst/api_types.go index ff0aea39bf24..2b684e564f4d 100644 --- a/eth/catalyst/api_types.go +++ b/eth/catalyst/api_types.go @@ -23,26 +23,24 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" ) -//go:generate go run github.com/fjl/gencodec -type AssembleBlockParams -field-override assembleBlockParamsMarshaling -out gen_blockparams.go +//go:generate go run github.com/fjl/gencodec -type PayloadAttributesV1 -field-override payloadAttributesMarshaling -out gen_blockparams.go // Structure described at https://github.com/ethereum/execution-apis/pull/74 -type AssembleBlockParams struct { - ParentHash common.Hash `json:"parentHash" gencodec:"required"` +type PayloadAttributesV1 struct { Timestamp uint64 `json:"timestamp" gencodec:"required"` Random common.Hash `json:"random" gencodec:"required"` FeeRecipient common.Address `json:"feeRecipient" gencodec:"required"` } -// JSON type overrides for assembleBlockParams. -type assembleBlockParamsMarshaling struct { +// JSON type overrides for PayloadAttributesV1. +type payloadAttributesMarshaling struct { Timestamp hexutil.Uint64 } -//go:generate go run github.com/fjl/gencodec -type ExecutableData -field-override executableDataMarshaling -out gen_ed.go +//go:generate go run github.com/fjl/gencodec -type ExecutableDataV1 -field-override executableDataMarshaling -out gen_ed.go -// Structure described at https://github.com/ethereum/execution-apis/pull/74/files -type ExecutableData struct { - BlockHash common.Hash `json:"blockHash" gencodec:"required"` +// Structure described at https://github.com/ethereum/execution-apis/src/engine/specification.md +type ExecutableDataV1 struct { ParentHash common.Hash `json:"parentHash" gencodec:"required"` Coinbase common.Address `json:"coinbase" gencodec:"required"` StateRoot common.Hash `json:"stateRoot" gencodec:"required"` @@ -55,6 +53,7 @@ type ExecutableData struct { Timestamp uint64 `json:"timestamp" gencodec:"required"` ExtraData []byte `json:"extraData" gencodec:"required"` BaseFeePerGas *big.Int `json:"baseFeePerGas" gencodec:"required"` + BlockHash common.Hash `json:"blockHash" gencodec:"required"` Transactions [][]byte `json:"transactions" gencodec:"required"` } @@ -93,12 +92,23 @@ type GenericStringResponse struct { Status string `json:"status"` } +type ExecutePayloadResponse struct { + Status string `json:"status"` + LatestValidHash common.Hash `json:"latestValidHash"` +} + type ConsensusValidatedParams struct { BlockHash common.Hash `json:"blockHash"` Status string `json:"status"` } -type ForkChoiceParams struct { +type ForkChoiceResponse struct { + Status string `json:"status"` + PayloadID *hexutil.Bytes `json:"payloadId"` +} + +type ForkchoiceStateV1 struct { HeadBlockHash common.Hash `json:"headBlockHash"` + SafeBlockHash common.Hash `json:"safeBlockHash"` FinalizedBlockHash common.Hash `json:"finalizedBlockHash"` } diff --git a/eth/catalyst/gen_blockparams.go b/eth/catalyst/gen_blockparams.go index 9928c12908a9..04ea6faf0bac 100644 --- a/eth/catalyst/gen_blockparams.go +++ b/eth/catalyst/gen_blockparams.go @@ -10,51 +10,44 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" ) -var _ = (*assembleBlockParamsMarshaling)(nil) +var _ = (*payloadAttributesMarshaling)(nil) // MarshalJSON marshals as JSON. -func (a AssembleBlockParams) MarshalJSON() ([]byte, error) { - type AssembleBlockParams struct { - ParentHash common.Hash `json:"parentHash" gencodec:"required"` +func (p PayloadAttributesV1) MarshalJSON() ([]byte, error) { + type PayloadAttributesV1 struct { Timestamp hexutil.Uint64 `json:"timestamp" gencodec:"required"` Random common.Hash `json:"random" gencodec:"required"` FeeRecipient common.Address `json:"feeRecipient" gencodec:"required"` } - var enc AssembleBlockParams - enc.ParentHash = a.ParentHash - enc.Timestamp = hexutil.Uint64(a.Timestamp) - enc.Random = a.Random - enc.FeeRecipient = a.FeeRecipient + var enc PayloadAttributesV1 + enc.Timestamp = hexutil.Uint64(p.Timestamp) + enc.Random = p.Random + enc.FeeRecipient = p.FeeRecipient return json.Marshal(&enc) } // UnmarshalJSON unmarshals from JSON. -func (a *AssembleBlockParams) UnmarshalJSON(input []byte) error { - type AssembleBlockParams struct { - ParentHash *common.Hash `json:"parentHash" gencodec:"required"` +func (p *PayloadAttributesV1) UnmarshalJSON(input []byte) error { + type PayloadAttributesV1 struct { Timestamp *hexutil.Uint64 `json:"timestamp" gencodec:"required"` Random *common.Hash `json:"random" gencodec:"required"` FeeRecipient *common.Address `json:"feeRecipient" gencodec:"required"` } - var dec AssembleBlockParams + var dec PayloadAttributesV1 if err := json.Unmarshal(input, &dec); err != nil { return err } - if dec.ParentHash == nil { - return errors.New("missing required field 'parentHash' for AssembleBlockParams") - } - a.ParentHash = *dec.ParentHash if dec.Timestamp == nil { - return errors.New("missing required field 'timestamp' for AssembleBlockParams") + return errors.New("missing required field 'timestamp' for PayloadAttributesV1") } - a.Timestamp = uint64(*dec.Timestamp) + p.Timestamp = uint64(*dec.Timestamp) if dec.Random == nil { - return errors.New("missing required field 'random' for AssembleBlockParams") + return errors.New("missing required field 'random' for PayloadAttributesV1") } - a.Random = *dec.Random + p.Random = *dec.Random if dec.FeeRecipient == nil { - return errors.New("missing required field 'feeRecipient' for AssembleBlockParams") + return errors.New("missing required field 'feeRecipient' for PayloadAttributesV1") } - a.FeeRecipient = *dec.FeeRecipient + p.FeeRecipient = *dec.FeeRecipient return nil } diff --git a/eth/catalyst/gen_ed.go b/eth/catalyst/gen_ed.go index 2953ab820fc2..babfb44df0bd 100644 --- a/eth/catalyst/gen_ed.go +++ b/eth/catalyst/gen_ed.go @@ -14,9 +14,8 @@ import ( var _ = (*executableDataMarshaling)(nil) // MarshalJSON marshals as JSON. -func (e ExecutableData) MarshalJSON() ([]byte, error) { - type ExecutableData struct { - BlockHash common.Hash `json:"blockHash" gencodec:"required"` +func (e ExecutableDataV1) MarshalJSON() ([]byte, error) { + type ExecutableDataV1 struct { ParentHash common.Hash `json:"parentHash" gencodec:"required"` Coinbase common.Address `json:"coinbase" gencodec:"required"` StateRoot common.Hash `json:"stateRoot" gencodec:"required"` @@ -29,10 +28,10 @@ func (e ExecutableData) MarshalJSON() ([]byte, error) { Timestamp hexutil.Uint64 `json:"timestamp" gencodec:"required"` ExtraData hexutil.Bytes `json:"extraData" gencodec:"required"` BaseFeePerGas *hexutil.Big `json:"baseFeePerGas" gencodec:"required"` + BlockHash common.Hash `json:"blockHash" gencodec:"required"` Transactions []hexutil.Bytes `json:"transactions" gencodec:"required"` } - var enc ExecutableData - enc.BlockHash = e.BlockHash + var enc ExecutableDataV1 enc.ParentHash = e.ParentHash enc.Coinbase = e.Coinbase enc.StateRoot = e.StateRoot @@ -45,6 +44,7 @@ func (e ExecutableData) MarshalJSON() ([]byte, error) { enc.Timestamp = hexutil.Uint64(e.Timestamp) enc.ExtraData = e.ExtraData enc.BaseFeePerGas = (*hexutil.Big)(e.BaseFeePerGas) + enc.BlockHash = e.BlockHash if e.Transactions != nil { enc.Transactions = make([]hexutil.Bytes, len(e.Transactions)) for k, v := range e.Transactions { @@ -55,9 +55,8 @@ func (e ExecutableData) MarshalJSON() ([]byte, error) { } // UnmarshalJSON unmarshals from JSON. -func (e *ExecutableData) UnmarshalJSON(input []byte) error { - type ExecutableData struct { - BlockHash *common.Hash `json:"blockHash" gencodec:"required"` +func (e *ExecutableDataV1) UnmarshalJSON(input []byte) error { + type ExecutableDataV1 struct { ParentHash *common.Hash `json:"parentHash" gencodec:"required"` Coinbase *common.Address `json:"coinbase" gencodec:"required"` StateRoot *common.Hash `json:"stateRoot" gencodec:"required"` @@ -70,66 +69,67 @@ func (e *ExecutableData) UnmarshalJSON(input []byte) error { Timestamp *hexutil.Uint64 `json:"timestamp" gencodec:"required"` ExtraData *hexutil.Bytes `json:"extraData" gencodec:"required"` BaseFeePerGas *hexutil.Big `json:"baseFeePerGas" gencodec:"required"` + BlockHash *common.Hash `json:"blockHash" gencodec:"required"` Transactions []hexutil.Bytes `json:"transactions" gencodec:"required"` } - var dec ExecutableData + var dec ExecutableDataV1 if err := json.Unmarshal(input, &dec); err != nil { return err } - if dec.BlockHash == nil { - return errors.New("missing required field 'blockHash' for ExecutableData") - } - e.BlockHash = *dec.BlockHash if dec.ParentHash == nil { - return errors.New("missing required field 'parentHash' for ExecutableData") + return errors.New("missing required field 'parentHash' for ExecutableDataV1") } e.ParentHash = *dec.ParentHash if dec.Coinbase == nil { - return errors.New("missing required field 'coinbase' for ExecutableData") + return errors.New("missing required field 'coinbase' for ExecutableDataV1") } e.Coinbase = *dec.Coinbase if dec.StateRoot == nil { - return errors.New("missing required field 'stateRoot' for ExecutableData") + return errors.New("missing required field 'stateRoot' for ExecutableDataV1") } e.StateRoot = *dec.StateRoot if dec.ReceiptRoot == nil { - return errors.New("missing required field 'receiptRoot' for ExecutableData") + return errors.New("missing required field 'receiptRoot' for ExecutableDataV1") } e.ReceiptRoot = *dec.ReceiptRoot if dec.LogsBloom == nil { - return errors.New("missing required field 'logsBloom' for ExecutableData") + return errors.New("missing required field 'logsBloom' for ExecutableDataV1") } e.LogsBloom = *dec.LogsBloom if dec.Random == nil { - return errors.New("missing required field 'random' for ExecutableData") + return errors.New("missing required field 'random' for ExecutableDataV1") } e.Random = *dec.Random if dec.Number == nil { - return errors.New("missing required field 'blockNumber' for ExecutableData") + return errors.New("missing required field 'blockNumber' for ExecutableDataV1") } e.Number = uint64(*dec.Number) if dec.GasLimit == nil { - return errors.New("missing required field 'gasLimit' for ExecutableData") + return errors.New("missing required field 'gasLimit' for ExecutableDataV1") } e.GasLimit = uint64(*dec.GasLimit) if dec.GasUsed == nil { - return errors.New("missing required field 'gasUsed' for ExecutableData") + return errors.New("missing required field 'gasUsed' for ExecutableDataV1") } e.GasUsed = uint64(*dec.GasUsed) if dec.Timestamp == nil { - return errors.New("missing required field 'timestamp' for ExecutableData") + return errors.New("missing required field 'timestamp' for ExecutableDataV1") } e.Timestamp = uint64(*dec.Timestamp) if dec.ExtraData == nil { - return errors.New("missing required field 'extraData' for ExecutableData") + return errors.New("missing required field 'extraData' for ExecutableDataV1") } e.ExtraData = *dec.ExtraData if dec.BaseFeePerGas == nil { - return errors.New("missing required field 'baseFeePerGas' for ExecutableData") + return errors.New("missing required field 'baseFeePerGas' for ExecutableDataV1") } e.BaseFeePerGas = (*big.Int)(dec.BaseFeePerGas) + if dec.BlockHash == nil { + return errors.New("missing required field 'blockHash' for ExecutableDataV1") + } + e.BlockHash = *dec.BlockHash if dec.Transactions == nil { - return errors.New("missing required field 'transactions' for ExecutableData") + return errors.New("missing required field 'transactions' for ExecutableDataV1") } e.Transactions = make([][]byte, len(dec.Transactions)) for k, v := range dec.Transactions { diff --git a/eth/downloader/beaconsync.go b/eth/downloader/beaconsync.go new file mode 100644 index 000000000000..6c0b76016053 --- /dev/null +++ b/eth/downloader/beaconsync.go @@ -0,0 +1,221 @@ +// Copyright 2021 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package downloader + +import ( + "sync" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" +) + +// beaconBackfiller is the chain and state backfilling that can be commenced once +// the skeleton syncer has successfully reverse downloaded all the headers up to +// the genesis block or an existing header in the database. Its operation is fully +// directed by the skeleton sync's head/tail events. +type beaconBackfiller struct { + downloader *Downloader // Downloader to direct via this callback implementation + syncMode SyncMode // Sync mode to use for backfilling the skeleton chains + success func() // Callback to run on successful sync cycle completion + filling bool // Flag whether the downloader is backfilling or not + lock sync.Mutex // Mutex protecting the sync lock +} + +// newBeaconBackfiller is a helper method to create the backfiller. +func newBeaconBackfiller(dl *Downloader, success func()) backfiller { + return &beaconBackfiller{ + downloader: dl, + success: success, + } +} + +// suspend cancels any background downloader threads. +func (b *beaconBackfiller) suspend() { + b.downloader.Cancel() +} + +// resume starts the downloader threads for backfilling state and chain data. +func (b *beaconBackfiller) resume() { + b.lock.Lock() + if b.filling { + // If a previous filling cycle is still running, just ignore this start + // request. // TODO(karalabe): We should make this channel driven + b.lock.Unlock() + return + } + b.filling = true + mode := b.syncMode + b.lock.Unlock() + + // Start the backfilling on its own thread since the downloader does not have + // its own lifecycle runloop. + go func() { + // Set the backfiller to non-filling when download completes + defer func() { + b.lock.Lock() + b.filling = false + b.lock.Unlock() + }() + // If the downloader fails, report an error as in beacon chain mode there + // should be no errors as long as the chain we're syncing to is valid. + if err := b.downloader.synchronise("", common.Hash{}, nil, mode, true); err != nil { + log.Error("Beacon backfilling failed", "err", err) + return + } + // Synchronization succeeded. Since this happens async, notify the outer + // context to disable snap syncing and enable transaction propagation. + if b.success != nil { + b.success() + } + }() +} + +// setMode updates the sync mode from the current one to the requested one. If +// there's an active sync in progress, it will be cancelled and restarted. +func (b *beaconBackfiller) setMode(mode SyncMode) { + // Update the old sync mode and track if it was changed + b.lock.Lock() + updated := b.syncMode != mode + filling := b.filling + b.syncMode = mode + b.lock.Unlock() + + // If the sync mode was changed mid-sync, restart. This should never ever + // really happen, we just handle it to detect programming errors. + if !updated || !filling { + return + } + log.Error("Downloader sync mode changed mid-run", "old", mode.String(), "new", mode.String()) + b.suspend() + b.resume() +} + +// BeaconSync is the Ethereum 2 version of the chain synchronization, where the +// chain is not downloaded from genesis onward, rather from trusted head announces +// backwards. +// +// Internally backfilling and state sync is done the same way, but the header +// retrieval and scheduling is replaced. +func (d *Downloader) BeaconSync(mode SyncMode, head *types.Header) error { + // When the downloader starts a sync cycle, it needs to be aware of the sync + // mode to use (full, snap). To keep the skeleton chain oblivious, inject the + // mode into the backfiller directly. + // + // Super crazy dangerous type cast. Should be fine (TM), we're only using a + // different backfiller implementation for skeleton tests. + d.skeleton.filler.(*beaconBackfiller).setMode(mode) + + // Signal the skeleton sync to switch to a new head, however it wants + if err := d.skeleton.Sync(head); err != nil { + return err + } + return nil +} + +// findBeaconAncestor tries to locate the common ancestor link of the local chain +// and the beacon chain just requested. In the general case when our node was in +// sync and on the correct chain, checking the top N links should already get us +// a match. In the rare scenario when we ended up on a long reorganisation (i.e. +// none of the head links match), we do a binary search to find the ancestor. +func (d *Downloader) findBeaconAncestor() uint64 { + // Figure out the current local head position + var head *types.Header + + switch d.getMode() { + case FullSync: + head = d.blockchain.CurrentBlock().Header() + case SnapSync: + head = d.blockchain.CurrentFastBlock().Header() + default: + head = d.lightchain.CurrentHeader() + } + number := head.Number.Uint64() + + // If the head is present in the skeleton chain, return that + if head.Hash() == d.skeleton.Header(number).Hash() { + return number + } + // Head header not present, binary search to find the ancestor + start, end := uint64(0), number + for start+1 < end { + // Split our chain interval in two, and request the hash to cross check + check := (start + end) / 2 + + h := d.skeleton.Header(check) + n := h.Number.Uint64() + + var known bool + switch d.getMode() { + case FullSync: + known = d.blockchain.HasBlock(h.Hash(), n) + case SnapSync: + known = d.blockchain.HasFastBlock(h.Hash(), n) + default: + known = d.lightchain.HasHeader(h.Hash(), n) + } + if !known { + end = check + continue + } + start = check + } + return start +} + +// fetchBeaconHeaders feeds skeleton headers to the downloader queue for scheduling +// until sync errors or is finished. +func (d *Downloader) fetchBeaconHeaders(from uint64) error { + head, err := d.skeleton.Head() + if err != nil { + return err + } + for { + // Retrieve a batch of headers and feed it to the header processor + headers := make([]*types.Header, 0, maxHeadersProcess) + for i := 0; i < maxHeadersProcess && from <= head.Number.Uint64(); i++ { + headers = append(headers, d.skeleton.Header(from)) + from++ + } + select { + case d.headerProcCh <- headers: + case <-d.cancelCh: + return errCanceled + } + // If we still have headers to import, loop and keep pushing them + if from <= head.Number.Uint64() { + continue + } + // If the pivot block is committed, signal header sync termination + if atomic.LoadInt32(&d.committed) == 1 { + d.headerProcCh <- nil + return nil + } + // State sync still going, wait a bit for new headers and retry + select { + case <-time.After(fsHeaderContCheck): + case <-d.cancelCh: + return errCanceled + } + head, err = d.skeleton.Head() + if err != nil { + return err + } + } +} diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 95b90d3b5710..50b6adbdd71a 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -30,7 +30,6 @@ import ( "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state/snapshot" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/snap" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" @@ -118,6 +117,9 @@ type Downloader struct { // Channels headerProcCh chan []*types.Header // Channel to feed the header processor new tasks + // Skeleton sync + skeleton *skeleton // Header skeleton to backfill the chain with (eth2 mode) + // State sync pivotHeader *types.Header // Pivot block header to dynamically push the syncing state root pivotLock sync.RWMutex // Lock protecting pivot header reads from updates @@ -196,7 +198,7 @@ type BlockChain interface { } // New creates a new downloader to fetch hashes and blocks from remote peers. -func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader { +func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn, success func()) *Downloader { if lightchain == nil { lightchain = chain } @@ -215,6 +217,8 @@ func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, SnapSyncer: snap.NewSyncer(stateDb), stateSyncStart: make(chan *stateSync), } + dl.skeleton = newSkeleton(stateDb, dl.peers, dropPeer, newBeaconBackfiller(dl, success)) + go dl.stateFetcher() return dl } @@ -314,10 +318,10 @@ func (d *Downloader) UnregisterPeer(id string) error { return nil } -// Synchronise tries to sync up our local block chain with a remote peer, both +// LegacySync tries to sync up our local block chain with a remote peer, both // adding various sanity checks as well as wrapping it with various log entries. -func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error { - err := d.synchronise(id, head, td, mode) +func (d *Downloader) LegacySync(id string, head common.Hash, td *big.Int, mode SyncMode) error { + err := d.synchronise(id, head, td, mode, false) switch err { case nil, errBusy, errCanceled: @@ -343,7 +347,7 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode // synchronise will select the peer and use it for synchronising. If an empty string is given // it will use the best peer possible and synchronize if its TD is higher than our own. If any of the // checks fail an error will be returned. This method is synchronous -func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode) error { +func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode, beaconMode bool) error { // Mock out the synchronisation if testing if d.synchroniseMock != nil { return d.synchroniseMock(id, hash) @@ -404,11 +408,14 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode atomic.StoreUint32(&d.mode, uint32(mode)) // Retrieve the origin peer and initiate the downloading process - p := d.peers.Peer(id) - if p == nil { - return errUnknownPeer + var p *peerConnection + if !beaconMode { // Beacon mode doesn't need a peer to sync from + p = d.peers.Peer(id) + if p == nil { + return errUnknownPeer + } } - return d.syncWithPeer(p, hash, td) + return d.syncWithPeer(p, hash, td, beaconMode) } func (d *Downloader) getMode() SyncMode { @@ -417,7 +424,7 @@ func (d *Downloader) getMode() SyncMode { // syncWithPeer starts a block synchronization based on the hash chain from the // specified peer and head hash. -func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) { +func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int, beaconMode bool) (err error) { d.mux.Post(StartEvent{}) defer func() { // reset on error @@ -428,33 +435,57 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I d.mux.Post(DoneEvent{latest}) } }() - if p.version < eth.ETH66 { - return fmt.Errorf("%w: advertized %d < required %d", errTooOld, p.version, eth.ETH66) - } mode := d.getMode() - log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", mode) + if !beaconMode { + log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", mode) + } else { + log.Debug("Backfilling with the network", "mode", mode) + } defer func(start time.Time) { log.Debug("Synchronisation terminated", "elapsed", common.PrettyDuration(time.Since(start))) }(time.Now()) // Look up the sync boundaries: the common ancestor and the target block - latest, pivot, err := d.fetchHead(p) - if err != nil { - return err - } - if mode == SnapSync && pivot == nil { - // If no pivot block was returned, the head is below the min full block - // threshold (i.e. new chain). In that case we won't really snap sync - // anyway, but still need a valid pivot block to avoid some code hitting - // nil panics on an access. - pivot = d.blockchain.CurrentBlock().Header() + var latest, pivot *types.Header + if !beaconMode { + // In legacy mode, use the master peer to retrieve the headers from + latest, pivot, err = d.fetchHead(p) + if err != nil { + return err + } + if mode == SnapSync && pivot == nil { + // If no pivot block was returned, the head is below the min full block + // threshold (i.e. new chain). In that case we won't really snap sync + // anyway, but still need a valid pivot block to avoid some code hitting + // nil panics on an access. + pivot = d.blockchain.CurrentBlock().Header() + } + } else { + // In beacon mode, user the skeleton chain to retrieve the headers from + latest, err = d.skeleton.Head() + if err != nil { + return err + } + // Opposed to legacy mode, in beacon mode we trust the chain we've been + // told to sync to, so no need to leave a gap between the pivot and head + // to full sync. Still, the downloader's been architected to do a full + // block import after the pivot, so make it off by one to avoid having + // to special case everything internally. + pivot = d.skeleton.Header(latest.Number.Uint64() - 1) } height := latest.Number.Uint64() - origin, err := d.findAncestor(p, latest) - if err != nil { - return err + var origin uint64 + if !beaconMode { + // In legacy mode, reach out to the network and find the ancestor + origin, err = d.findAncestor(p, latest) + if err != nil { + return err + } + } else { + // In beacon mode, use the skeleton chain for the ancestor lookup + origin = d.findBeaconAncestor() } d.syncStatsLock.Lock() if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin { @@ -525,11 +556,19 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I if d.syncInitHook != nil { d.syncInitHook(origin, height) } + var headerFetcher func() error + if !beaconMode { + // In legacy mode, headers are retrieved from the network + headerFetcher = func() error { return d.fetchHeaders(p, origin+1, latest.Number.Uint64()) } + } else { + // In beacon mode, headers are served by the skeleton syncer + headerFetcher = func() error { return d.fetchBeaconHeaders(origin + 1) } + } fetchers := []func() error{ - func() error { return d.fetchHeaders(p, origin+1, latest.Number.Uint64()) }, // Headers are always retrieved - func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and snap sync - func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during snap sync - func() error { return d.processHeaders(origin+1, td) }, + headerFetcher, // Headers are always retrieved + func() error { return d.fetchBodies(origin+1, beaconMode) }, // Bodies are retrieved during normal and snap sync + func() error { return d.fetchReceipts(origin+1, beaconMode) }, // Receipts are retrieved during snap sync + func() error { return d.processHeaders(origin+1, td, beaconMode) }, } if mode == SnapSync { d.pivotLock.Lock() @@ -1125,7 +1164,7 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ( log.Debug("Filling up skeleton", "from", from) d.queue.ScheduleSkeleton(from, skeleton) - err := d.concurrentFetch((*headerQueue)(d)) + err := d.concurrentFetch((*headerQueue)(d), false) if err != nil { log.Debug("Skeleton fill failed", "err", err) } @@ -1139,9 +1178,9 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ( // fetchBodies iteratively downloads the scheduled block bodies, taking any // available peers, reserving a chunk of blocks for each, waiting for delivery // and also periodically checking for timeouts. -func (d *Downloader) fetchBodies(from uint64) error { +func (d *Downloader) fetchBodies(from uint64, beaconMode bool) error { log.Debug("Downloading block bodies", "origin", from) - err := d.concurrentFetch((*bodyQueue)(d)) + err := d.concurrentFetch((*bodyQueue)(d), beaconMode) log.Debug("Block body download terminated", "err", err) return err @@ -1150,9 +1189,9 @@ func (d *Downloader) fetchBodies(from uint64) error { // fetchReceipts iteratively downloads the scheduled block receipts, taking any // available peers, reserving a chunk of receipts for each, waiting for delivery // and also periodically checking for timeouts. -func (d *Downloader) fetchReceipts(from uint64) error { +func (d *Downloader) fetchReceipts(from uint64, beaconMode bool) error { log.Debug("Downloading receipts", "origin", from) - err := d.concurrentFetch((*receiptQueue)(d)) + err := d.concurrentFetch((*receiptQueue)(d), beaconMode) log.Debug("Receipt download terminated", "err", err) return err @@ -1161,7 +1200,7 @@ func (d *Downloader) fetchReceipts(from uint64) error { // processHeaders takes batches of retrieved headers from an input channel and // keeps processing and scheduling them into the header chain and downloader's // queue until the stream ends or a failure occurs. -func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { +func (d *Downloader) processHeaders(origin uint64, td *big.Int, beaconMode bool) error { // Keep a count of uncertain headers to roll back var ( rollback uint64 // Zero means no rollback (fine as you can't unroll the genesis) @@ -1209,35 +1248,40 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { case <-d.cancelCh: } } - // If no headers were retrieved at all, the peer violated its TD promise that it had a - // better chain compared to ours. The only exception is if its promised blocks were - // already imported by other means (e.g. fetcher): - // - // R , L : Both at block 10 - // R: Mine block 11, and propagate it to L - // L: Queue block 11 for import - // L: Notice that R's head and TD increased compared to ours, start sync - // L: Import of block 11 finishes - // L: Sync begins, and finds common ancestor at 11 - // L: Request new headers up from 11 (R's TD was higher, it must have something) - // R: Nothing to give - if mode != LightSync { - head := d.blockchain.CurrentBlock() - if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.NumberU64())) > 0 { - return errStallingPeer + // If we're in legacy sync mode, we need to check total difficulty + // violations from malicious peers. That is not needed in beacon + // mode and we can skip to terminating sync. + if !beaconMode { + // If no headers were retrieved at all, the peer violated its TD promise that it had a + // better chain compared to ours. The only exception is if its promised blocks were + // already imported by other means (e.g. fetcher): + // + // R , L : Both at block 10 + // R: Mine block 11, and propagate it to L + // L: Queue block 11 for import + // L: Notice that R's head and TD increased compared to ours, start sync + // L: Import of block 11 finishes + // L: Sync begins, and finds common ancestor at 11 + // L: Request new headers up from 11 (R's TD was higher, it must have something) + // R: Nothing to give + if mode != LightSync { + head := d.blockchain.CurrentBlock() + if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.NumberU64())) > 0 { + return errStallingPeer + } } - } - // If snap or light syncing, ensure promised headers are indeed delivered. This is - // needed to detect scenarios where an attacker feeds a bad pivot and then bails out - // of delivering the post-pivot blocks that would flag the invalid content. - // - // This check cannot be executed "as is" for full imports, since blocks may still be - // queued for processing when the header download completes. However, as long as the - // peer gave us something useful, we're already happy/progressed (above check). - if mode == SnapSync || mode == LightSync { - head := d.lightchain.CurrentHeader() - if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 { - return errStallingPeer + // If snap or light syncing, ensure promised headers are indeed delivered. This is + // needed to detect scenarios where an attacker feeds a bad pivot and then bails out + // of delivering the post-pivot blocks that would flag the invalid content. + // + // This check cannot be executed "as is" for full imports, since blocks may still be + // queued for processing when the header download completes. However, as long as the + // peer gave us something useful, we're already happy/progressed (above check). + if mode == SnapSync || mode == LightSync { + head := d.lightchain.CurrentHeader() + if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 { + return errStallingPeer + } } } // Disable any rollback and return diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index e6e092b22f5a..566e0febee23 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -75,7 +75,7 @@ func newTester() *downloadTester { chain: chain, peers: make(map[string]*downloadTesterPeer), } - tester.downloader = New(0, db, trie.NewSyncBloom(1, db), new(event.TypeMux), tester.chain, nil, tester.dropPeer) + tester.downloader = New(0, db, trie.NewSyncBloom(1, db), new(event.TypeMux), tester.chain, nil, tester.dropPeer, nil) return tester } @@ -96,7 +96,7 @@ func (dl *downloadTester) sync(id string, td *big.Int, mode SyncMode) error { td = dl.peers[id].chain.GetTd(head.Hash(), head.NumberU64()) } // Synchronise with the chosen peer and ensure proper cleanup afterwards - err := dl.downloader.synchronise(id, head.Hash(), td, mode) + err := dl.downloader.synchronise(id, head.Hash(), td, mode, false) select { case <-dl.downloader.cancelCh: // Ok, downloader fully cancelled after sync cycle @@ -933,7 +933,7 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol uint) { // Simulate a synchronisation and check the required result tester.downloader.synchroniseMock = func(string, common.Hash) error { return tt.result } - tester.downloader.Synchronise(id, tester.chain.Genesis().Hash(), big.NewInt(1000), FullSync) + tester.downloader.LegacySync(id, tester.chain.Genesis().Hash(), big.NewInt(1000), FullSync) if _, ok := tester.peers[id]; !ok != tt.drop { t.Errorf("test %d: peer drop mismatch for %v: have %v, want %v", i, tt.result, !ok, tt.drop) } diff --git a/eth/downloader/fetchers_concurrent.go b/eth/downloader/fetchers_concurrent.go index 4bade2b4c3dd..a0aa197175a3 100644 --- a/eth/downloader/fetchers_concurrent.go +++ b/eth/downloader/fetchers_concurrent.go @@ -76,7 +76,7 @@ type typedQueue interface { // concurrentFetch iteratively downloads scheduled block parts, taking available // peers, reserving a chunk of fetch requests for each and waiting for delivery // or timeouts. -func (d *Downloader) concurrentFetch(queue typedQueue) error { +func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error { // Create a delivery channel to accept responses from all peers responses := make(chan *eth.Response) @@ -127,7 +127,7 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error { finished := false for { // Short circuit if we lost all our peers - if d.peers.Len() == 0 { + if d.peers.Len() == 0 && !beaconMode { return errNoPeers } // If there's nothing more to fetch, wait or terminate @@ -209,7 +209,7 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error { } // Make sure that we have peers available for fetching. If all peers have been tried // and all failed throw an error - if !progressed && !throttled && len(pending) == 0 && len(idles) == d.peers.Len() && queued > 0 { + if !progressed && !throttled && len(pending) == 0 && len(idles) == d.peers.Len() && queued > 0 && !beaconMode { return errPeersUnavailable } } diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 324fdb9cd51f..d74d23e74d55 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -294,19 +294,19 @@ func (ps *peerSet) AllPeers() []*peerConnection { // peerCapacitySort implements sort.Interface. // It sorts peer connections by capacity (descending). type peerCapacitySort struct { - p []*peerConnection - tp []int + peers []*peerConnection + caps []int } func (ps *peerCapacitySort) Len() int { - return len(ps.p) + return len(ps.peers) } func (ps *peerCapacitySort) Less(i, j int) bool { - return ps.tp[i] > ps.tp[j] + return ps.caps[i] > ps.caps[j] } func (ps *peerCapacitySort) Swap(i, j int) { - ps.p[i], ps.p[j] = ps.p[j], ps.p[i] - ps.tp[i], ps.tp[j] = ps.tp[j], ps.tp[i] + ps.peers[i], ps.peers[j] = ps.peers[j], ps.peers[i] + ps.caps[i], ps.caps[j] = ps.caps[j], ps.caps[i] } diff --git a/eth/downloader/skeleton.go b/eth/downloader/skeleton.go new file mode 100644 index 000000000000..7c604cbffac7 --- /dev/null +++ b/eth/downloader/skeleton.go @@ -0,0 +1,965 @@ +// Copyright 2021 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package downloader + +import ( + "encoding/json" + "errors" + "math/rand" + "sort" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/protocols/eth" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" +) + +// scratchHeaders is the number of headers to store in a scratch space to allow +// concurrent downloads. A header is about 0.5KB in size, so there is no worry +// about using too much memory. The only catch is that we can only validate gaps +// afer they're linked to the head, so the bigger the scratch space, the larger +// potential for invalid headers. +// +// The current scratch space of 131072 headers is expected to use 64MB RAM. +const scratchHeaders = 131072 + +// requestHeaders is the number of header to request from a remote peer in a single +// network packet. Although the skeleton downloader takes into consideration peer +// capacities when picking idlers, the packet size was decided to remain constant +// since headers are relatively small and it's easier to work with fixed batches +// vs. dynamic interval fillings. +const requestHeaders = 512 + +// errSyncLinked is an internal helper error to signal that the current sync +// cycle linked up to the genesis block, this the skeleton syncer should ping +// the backfiller to resume. Since we already have that logic on sync start, +// piggie-back on that instead of 2 entrypoints. +var errSyncLinked = errors.New("sync linked") + +// errSyncMerged is an internal helper error to signal that the current sync +// cycle merged with a previously aborted subchain, thus the skeleton syncer +// should abort and restart with the new state. +var errSyncMerged = errors.New("sync merged") + +// errSyncReorged is an internal helper error to signal that the head chain of +// the current sync cycle was (partially) reorged, thus the skeleton syncer +// should abort and restart with the new state. +var errSyncReorged = errors.New("sync reorged") + +// errTerminated is returned if the sync mechanism was terminated for this run of +// the process. This is usually the case when Geth is shutting down and some events +// might still be propagating. +var errTerminated = errors.New("terminated") + +func init() { + // Tuning parameters is nice, but the scratch space must be assignable in + // full to peers. It's a useless cornercase to support a dangling half-group. + if scratchHeaders%requestHeaders != 0 { + panic("Please make scratchHeaders divisible by requestHeaders") + } +} + +// subchain is a contiguous header chain segment that is backed by the database, +// but may not be linked to the live chain. The skeleton downloader may produce +// a new one of these every time it is restarted until the subchain grows large +// enough to connect with a previous subchain. +// +// The subchains use the exact same database namespace and are not disjoint from +// each other. As such, extending one to overlap the other entails reducing the +// second one first. This combined buffer model is used to avoid having to move +// data on disk when two subchains are joined together. +type subchain struct { + Head uint64 // Block number of the newest header in the subchain + Tail uint64 // Block number of the oldest header in the subchain + Next common.Hash // Block hash of the next oldest header in the subchain +} + +// skeletonProgress is a database entry to allow suspending and resuming a chain +// sync. As the skeleton header chain is downloaded backwards, restarts can and +// will produce temporarilly disjoint subchains. There is no way to restart a +// suspended skeleton sync without prior knowlege of all prior suspension points. +type skeletonProgress struct { + Subchains []*subchain // Disjoint subchains downloaded until now +} + +// headerRequest tracks a pending header request to ensure responses are to +// actual requests and to validate any security constraints. +// +// Concurrency note: header requests and responses are handled concurrently from +// the main runloop to allow Keccak256 hash verifications on the peer's thread and +// to drop on invalid response. The request struct must contain all the data to +// construct the response without accessing runloop internals (i.e. subchains). +// That is only included to allow the runloop to match a response to the task being +// synced without having yet another set of maps. +type headerRequest struct { + peer string // Peer to which this request is assigned + id uint64 // Request ID of this request + + deliver chan *headerResponse // Channel to deliver successful response on + revert chan *headerRequest // Channel to deliver request failure on + cancel chan struct{} // Channel to track sync cancellation + stale chan struct{} // Channel to signal the request was dropped + + head uint64 // Head number of the requested batch of headers +} + +// headerResponse is an already verified remote response to a header request. +type headerResponse struct { + peer *peerConnection // Peer from which this response originates + reqid uint64 // Request ID that this response fulfils + headers []*types.Header // Chain of headers +} + +// backfiller is a callback interface through which the skeleton sync can tell +// the downloader that it should suspend or resume backfilling on specific head +// events (e.g. suspend on forks or gaps, resume on successfull linkups). +type backfiller interface { + // suspend requests the backfiller to abort any running full or snap sync + // based on the skeleton chain as it might be invalid. The backfiller should + // gracefully handle multiple consecutive suspends without a resume, even + // on initial sartup. + suspend() + + // resume requests the backfiller to start running fill or snap sync based on + // the skeleton chain as it has successfully been linked. Appending new heads + // to the end of the chain will not result in suspend/resume cycles. + resume() +} + +// skeleton represents a header chain synchronized after the Ethereum 2 merge, +// where blocks aren't validated any more via PoW in a forward fashion, rather +// are dictated and extended at the head via the beacon chain and backfilled on +// the original Ethereum 1 block sync protocol. +// +// Since the skeleton is grown backwards from head to genesis, it is handled as +// a separate entity, not mixed in with the logical sequential transition of the +// blocks. Once the skeleton is connected to an existing, validated chain, the +// headers will be moved into the main downloader for filling and execution. +// +// Opposed to the Ethereum 1 block synchronization which is trustless (and uses a +// master peer to minimize the attack surface), Ethereum 2 block synchronization +// starts from a trusted head. As such, there is no need for a master peer any +// more and headers can be requested fully concurrently (though some batches might +// be discarded if they don't link up correctly). +// +// Although a skeleton is part of a sync cycle, it is not recreated, rather stays +// alive throughout the lifetime of the downloader. This allows it to be extended +// concurrently with the sync cycle, since extensions arrive from an API surface, +// not from within (vs. Ethereum 1 sync). +// +// Since the skeleton tracks the entire header chain until it is cosumed by the +// forward block filling, it needs 0.5KB/block storage. At current mainnet sizes +// this is only possible with a disk backend. Since the skeleton is separate from +// the node's header chain, storing the headers ephemerally until sync finishes +// is wasted disk IO, but it's a price we're going to pay to keep things simple +// for now. +type skeleton struct { + db ethdb.Database // Database backing the skeleton + filler backfiller // Chain syncer suspended/resumed by head events + + peers *peerSet // Set of peers we can sync from + idles map[string]*peerConnection // Set of idle peers in the current sync cycle + drop peerDropFn // Drops a peer for misbehaving + + progress *skeletonProgress // Sync progress tracker for resumption and metrics + started time.Time // Timestamp when the skeleton syncer was created + logged time.Time // Timestamp when progress was last logged to the user + pulled uint64 // Number of headers downloaded in this run + + scratchSpace []*types.Header // Scratch space to accumulate headers in (first = recent) + scratchOwners []string // Peer IDs owning chunks of the scratch space (pend or delivered) + scratchHead uint64 // Block number of the first item in the scratch space + + requests map[uint64]*headerRequest // Header requests currently running + + headEvents chan *types.Header // Notification channel for new heads + terminate chan chan error // Termination channel to abort sync + terminated chan struct{} // Channel to signal that the syner is dead + + // Callback hooks used during testing + syncStarting func() // callback triggered after a sync cycle is inited but before started +} + +// newSkeleton creates a new sync skeleton that tracks a potentially dangling +// header chain until it's linked into an existing set of blocks. +func newSkeleton(db ethdb.Database, peers *peerSet, drop peerDropFn, filler backfiller) *skeleton { + sk := &skeleton{ + db: db, + filler: filler, + peers: peers, + drop: drop, + requests: make(map[uint64]*headerRequest), + headEvents: make(chan *types.Header), + terminate: make(chan chan error), + terminated: make(chan struct{}), + } + go sk.startup() + return sk +} + +// startup is an initial background loop which waits for an event to start or +// tear the syncer down. This is required to make the skeleton sync loop once +// per process but at the same time not start before the beacon chain announces +// a new (existing) head. +func (s *skeleton) startup() { + // Close a notification channel so anyone sending us events will know if the + // sync loop was torn down for good. + defer close(s.terminated) + + // Wait for startup or teardown + select { + case errc := <-s.terminate: + // No head was announced but Geth is shutting down + errc <- nil + return + + case head := <-s.headEvents: + // New head announced, start syncing to it, looping every time a current + // cycle is terminated due to a chain event (head reorg, old chain merge) + s.started = time.Now() + + for { + // If the sync cycle terminated or was terminated, propagate up when + // higher layers request termination. There's no fancy explicit error + // signalling as the sync loop should never terminate (TM). + newhead, err := s.sync(head) + switch { + case err == errSyncLinked: + // Sync cycle linked up to the genesis block. Tear down the loop + // and restart it so, it can properly notify the backfiller. Don't + // account a new head. + head = nil + + case err == errSyncMerged: + // Subchains were merged, we just need to reinit the internal + // start to continue on the tail of the merged chain. Don't + // announce a new head, + head = nil + + case err == errSyncReorged: + // The subchain being synced got modified at the head in a + // way that requires resyncing it. Restart sync with the new + // head to force a cleanup. + head = newhead + + case err == errTerminated: + // Sync was requested to be terminated from within, stop and + // return (no need to pass a message, was already done internally) + return + + default: + // Sync either successfully terminated or failed with an unhandled + // error. Abort and wait until Geth requests a termination. + errc := <-s.terminate + errc <- err + return + } + } + } +} + +// Terminate tears down the syncer indefinitely. +func (s *skeleton) Terminate() error { + // Request termination and fetch any errors + errc := make(chan error) + s.terminate <- errc + err := <-errc + + // Wait for full shutdown (not necessary, but cleaner) + <-s.terminated + return err +} + +// Sync starts or resumes a previous sync cycle to download and maintain a reverse +// header chain starting at the head and leading towards genesis to an available +// ancestor. +// +// This method does not block, rather it just waits until the syncer receives the +// fed header. What the syncer does with it is the syncer's problem. +func (s *skeleton) Sync(head *types.Header) error { + log.Trace("New skeleton head announced", "number", head.Number, "hash", head.Hash()) + select { + case s.headEvents <- head: + return nil + case <-s.terminated: + return errTerminated + } +} + +// sync is the internal version of Sync that executes a single sync cycle, either +// until some termination condition is reached, or until the current cycle merges +// with a previously aborted run. +func (s *skeleton) sync(head *types.Header) (*types.Header, error) { + // If we're continuing a previous merge interrupt, just access the existing + // old state without initing from disk. + if head == nil { + head = rawdb.ReadSkeletonHeader(s.db, s.progress.Subchains[0].Head) + } else { + // Otherwise, initialize the sync, trimming and previous leftovers until + // we're consistent with the newly requested chain head + s.initSync(head) + } + // Create the scratch space to fill with concurrently downloaded headers + s.scratchSpace = make([]*types.Header, scratchHeaders) + defer func() { s.scratchSpace = nil }() // don't hold on to references after sync + + s.scratchOwners = make([]string, scratchHeaders/requestHeaders) + defer func() { s.scratchOwners = nil }() // don't hold on to references after sync + + s.scratchHead = s.progress.Subchains[0].Tail - 1 // tail must not be 0! + + // If the sync is already done, resume the backfiller. When the loop stops, + // terminate the backfiller too. + if s.scratchHead == 0 { + s.filler.resume() + } + defer s.filler.suspend() + + // Create a set of unique channels for this sync cycle. We need these to be + // ephemeral so a data race doesn't accidentally deliver something stale on + // a persistent channel across syncs (yup, this happened) + var ( + requestFails = make(chan *headerRequest) + responses = make(chan *headerResponse) + ) + cancel := make(chan struct{}) + defer close(cancel) + + log.Debug("Starting reverse header sync cycle", "head", head.Number, "hash", head.Hash(), "cont", s.scratchHead) + + // Whether sync completed or not, disregard any future packets + defer func() { + log.Debug("Terminating reverse header sync cycle", "head", head.Number, "hash", head.Hash(), "cont", s.scratchHead) + s.requests = make(map[uint64]*headerRequest) + }() + + // Start tracking idle peers for task assignments + peering := make(chan *peeringEvent, 64) // arbitrary buffer, just some burst protection + + peeringSub := s.peers.SubscribeEvents(peering) + defer peeringSub.Unsubscribe() + + s.idles = make(map[string]*peerConnection) + for _, peer := range s.peers.AllPeers() { + s.idles[peer.id] = peer + } + // Nofity any tester listening for startup events + if s.syncStarting != nil { + s.syncStarting() + } + for { + // Something happened, try to assign new tasks to any idle peers + s.assingTasks(responses, requestFails, cancel) + + // Wait for something to happen + select { + case event := <-peering: + // A peer joined or left, the tasks queue and allocations need to be + // checked for potential assignment or reassignment + peerid := event.peer.id + if event.join { + s.idles[peerid] = event.peer + } else { + s.revertRequests(peerid) + delete(s.idles, peerid) + } + + case errc := <-s.terminate: + errc <- nil + return nil, errTerminated + + case head := <-s.headEvents: + // New head was announced, try to integrate it. If successful, nothing + // needs to be done as the head simply extended the last range. For now + // we don't seamlessly integrate reorgs to keep things simple. If the + // network starts doing many mini reorgs, it might be worthwhile handling + // a limited depth without an error. + if reorged := s.processNewHead(head); reorged { + return head, errSyncReorged + } + // New head was integrated into the skeleton chain. If the backfiller + // is still running, it will pick it up. If it already terminated, + // a new cycle needs to be spun up. + if s.scratchHead == 0 { + s.filler.resume() + } + + case req := <-requestFails: + s.revertRequest(req) + + case res := <-responses: + // Process the batch of headers. If though processing we managed to + // link the curret subchain to a previously downloaded one, abort the + // sync and restart with the merged subchains. We could probably hack + // the internal state to switch the scratch space over to the tail of + // the extended subchain, but since the scenario is rare, it's cleaner + // to rely on the restart mechanism than a stateful modification. + if merged := s.processResponse(res); merged { + return nil, errSyncMerged + } + // If we've just reached the genesis block, tear down the sync cycle + // and restart it to resume the backfiller. We could just as well do + // a signalling here, but it's a tad cleaner to have only one entry + // pathway to suspending/resuming it. + return nil, errSyncLinked + } + } +} + +// initSync attempts to get the skeleton sync into a consistent state wrt any +// past state on disk and the newly requested head to sync to. If the new head +// is nil, the method will return and continue from the previous head. +func (s *skeleton) initSync(head *types.Header) { + // Extract the head number, we'll need it all over + number := head.Number.Uint64() + + // Retrieve the previously saved sync progress + if status := rawdb.ReadSkeletonSyncStatus(s.db); len(status) > 0 { + s.progress = new(skeletonProgress) + if err := json.Unmarshal(status, s.progress); err != nil { + log.Error("Failed to decode skeleton sync status", "err", err) + } else { + // Previous sync was available, print some continuation logs + for _, subchain := range s.progress.Subchains { + log.Debug("Restarting skeleton subchain", "head", subchain.Head, "tail", subchain.Tail) + } + // Create a new subchain for the head (unless the last can be extended), + // trimming anything it would overwrite + headchain := &subchain{ + Head: number, + Tail: number, + Next: head.ParentHash, + } + for len(s.progress.Subchains) > 0 { + // If the last chain is above the new head, delete altogether + lastchain := s.progress.Subchains[0] + if lastchain.Tail >= headchain.Tail { + log.Debug("Dropping skeleton subchain", "head", lastchain.Head, "tail", lastchain.Tail) + s.progress.Subchains = s.progress.Subchains[1:] + continue + } + // Otherwise truncate the last chain if needed and abort trimming + if lastchain.Head >= headchain.Tail { + log.Debug("Trimming skeleton subchain", "oldhead", lastchain.Head, "newhead", headchain.Tail-1, "tail", lastchain.Tail) + lastchain.Head = headchain.Tail - 1 + } + break + } + // If the last subchain can be extended, we're lucky. Otherwise create + // a new subchain sync task. + var extended bool + if n := len(s.progress.Subchains); n > 0 { + lastchain := s.progress.Subchains[0] + if lastchain.Head == headchain.Tail-1 { + lasthead := rawdb.ReadSkeletonHeader(s.db, lastchain.Head) + if lasthead.Hash() == head.ParentHash { + log.Debug("Extended skeleton subchain with new head", "head", headchain.Tail, "tail", lastchain.Tail) + lastchain.Head = headchain.Tail + extended = true + } + } + } + if !extended { + log.Debug("Created new skeleton subchain", "head", number, "tail", number) + s.progress.Subchains = append([]*subchain{headchain}, s.progress.Subchains...) + } + // Update the database with the new sync stats and insert the new + // head header. We won't delete any trimmed skeleton headers since + // those will be outside the index space of the many subchains and + // the database space will be reclaimed eventually when processing + // blocks above the current head (TODO(karalabe): don't forget). + batch := s.db.NewBatch() + + rawdb.WriteSkeletonHeader(batch, head) + s.saveSyncStatus(batch) + + if err := batch.Write(); err != nil { + log.Crit("Failed to write skeleton sync status", "err", err) + } + return + } + } + // Either we've failed to decode the previus state, or there was none. Start + // a fresh sync with a single subchain represented by the currently sent + // chain head. + s.progress = &skeletonProgress{ + Subchains: []*subchain{ + { + Head: number, + Tail: number, + Next: head.ParentHash, + }, + }, + } + batch := s.db.NewBatch() + + rawdb.WriteSkeletonHeader(batch, head) + s.saveSyncStatus(batch) + + if err := batch.Write(); err != nil { + log.Crit("Failed to write initial skeleton sync status", "err", err) + } + log.Debug("Created initial skeleton subchain", "head", number, "tail", number) +} + +// saveSyncStatus marshals the remaining sync tasks into leveldb. +func (s *skeleton) saveSyncStatus(db ethdb.KeyValueWriter) { + status, err := json.Marshal(s.progress) + if err != nil { + panic(err) // This can only fail during implementation + } + rawdb.WriteSkeletonSyncStatus(db, status) +} + +// processNewHead does the internal shuffling for a new head marker and either +// accepts and integrates it into the skeleton or requests a reorg. Upon reorg, +// the syncer will tear itself down and restart with a fresh head. It is simpler +// to reconstruct the sync state than to mutate it and hope for the best. +func (s *skeleton) processNewHead(head *types.Header) bool { + // If the header cannot be inserted without interruption, return an error for + // the outer loop to tear down the skeleton sync and restart it + number := head.Number.Uint64() + + lastchain := s.progress.Subchains[0] + if lastchain.Tail >= number { + log.Warn("Beacon chain reorged", "tail", lastchain.Tail, "newHead", number) + return true + } + if lastchain.Head+1 < number { + log.Warn("Beacon chain gapped", "head", lastchain.Head, "newHead", number) + return true + } + if parent := rawdb.ReadSkeletonHeader(s.db, number-1); parent.Hash() != head.ParentHash { + log.Warn("Beacon chain forked", "ancestor", parent.Number, "hash", parent.Hash(), "want", head.ParentHash) + return true + } + // New header seems to be in the last subchain range. Unwind any extra headers + // from the chain tip and insert the new head. We won't delete any trimmed + // skeleton headers since those will be outside the index space of the many + // subchains and the database space will be reclaimed eventually when processing + // blocks above the current head (TODO(karalabe): don't forget). + batch := s.db.NewBatch() + + rawdb.WriteSkeletonHeader(batch, head) + lastchain.Head = number + s.saveSyncStatus(batch) + + if err := batch.Write(); err != nil { + log.Crit("Failed to write skeleton sync status", "err", err) + } + return false +} + +// assingTasks attempts to match idle peers to pending header retrievals. +func (s *skeleton) assingTasks(success chan *headerResponse, fail chan *headerRequest, cancel chan struct{}) { + // Sort the peers by download capacity to use faster ones if many available + idlers := &peerCapacitySort{ + peers: make([]*peerConnection, 0, len(s.idles)), + caps: make([]int, 0, len(s.idles)), + } + targetTTL := s.peers.rates.TargetTimeout() + for _, peer := range s.idles { + idlers.peers = append(idlers.peers, peer) + idlers.caps = append(idlers.caps, s.peers.rates.Capacity(peer.id, eth.BlockHeadersMsg, targetTTL)) + } + if len(idlers.peers) == 0 { + return + } + sort.Sort(idlers) + + // Find header regions not yet downloading and fill them + for task, owner := range s.scratchOwners { + // If we're out of idle peers, stop assigning tasks + if len(idlers.peers) == 0 { + return + } + // Skip any tasks already filling + if owner != "" { + continue + } + // If we've reached the genesis, stop assigning tasks + if uint64(task*requestHeaders) >= s.scratchHead { + return + } + // Found a task and have peers available, assign it + idle := idlers.peers[0] + + idlers.peers = idlers.peers[1:] + idlers.caps = idlers.caps[1:] + + // Matched a pending task to an idle peer, allocate a unique request id + var reqid uint64 + for { + reqid = uint64(rand.Int63()) + if reqid == 0 { + continue + } + if _, ok := s.requests[reqid]; ok { + continue + } + break + } + // Generate the network query and send it to the peer + req := &headerRequest{ + peer: idle.id, + id: reqid, + deliver: success, + revert: fail, + cancel: cancel, + stale: make(chan struct{}), + head: s.scratchHead - uint64(task*requestHeaders), + } + s.requests[reqid] = req + delete(s.idles, idle.id) + + // Generate the network query and send it to the peer + go s.executeTask(idle, req) + + // Inject the request into the task to block further assignments + s.scratchOwners[task] = idle.id + } +} + +// executeTask executes a single fetch request, blocking until either a result +// arrives or a timeouts / cancellation is triggered. The method should be run +// on its own goroutine and will deliver on the requested channels. +func (s *skeleton) executeTask(peer *peerConnection, req *headerRequest) { + start := time.Now() + resCh := make(chan *eth.Response) + + // Figure out how many headers to fetch. Usually this will be a full batch, + // but for the very tail of the chain, trim the request to the number left. + // Since nodes may or may not return the genesis header for a batch request, + // don't even request it. The parent hash of block #1 is enough to link. + requestCount := requestHeaders + if req.head < requestHeaders { + requestCount = int(req.head) + } + peer.log.Trace("Fetching skeleton headers", "from", req.head, "count", requestCount) + netreq, err := peer.peer.RequestHeadersByNumber(req.head, requestCount, 0, true, resCh) + if err != nil { + peer.log.Trace("Failed to request headers", "err", err) + s.scheduleRevertRequest(req) + return + } + defer netreq.Close() + + // Wait until the response arrives, the request is cancelled or times out + ttl := s.peers.rates.TargetTimeout() + + timeoutTimer := time.NewTimer(ttl) + defer timeoutTimer.Stop() + + select { + case <-req.cancel: + peer.log.Debug("Header request cancelled") + s.scheduleRevertRequest(req) + + case <-timeoutTimer.C: + // Header retrieval timed out, update the metrics + peer.log.Trace("Header request timed out", "elapsed", ttl) + headerTimeoutMeter.Mark(1) + s.peers.rates.Update(peer.id, eth.BlockHeadersMsg, 0, 0) + s.scheduleRevertRequest(req) + + case res := <-resCh: + // Headers successfully retrieved, update the metrics + headers := *res.Res.(*eth.BlockHeadersPacket) + + headerReqTimer.Update(time.Since(start)) + s.peers.rates.Update(peer.id, eth.BlockHeadersMsg, res.Time, len(headers)) + + // Cross validate the headers with the requests + switch { + case len(headers) == 0: + // No headers were delivered, reject the response and reschedule + peer.log.Debug("No headers delivered") + res.Done <- errors.New("no headers delivered") + s.scheduleRevertRequest(req) + + case headers[0].Number.Uint64() != req.head: + // Header batch anchored at non-requested number + peer.log.Debug("Invalid header response head", "have", headers[0].Number, "want", req.head) + res.Done <- errors.New("invalid header batch anchor") + s.scheduleRevertRequest(req) + + case headers[0].Number.Uint64() >= requestHeaders && len(headers) != requestHeaders: + // Invalid number of non-genesis headers delivered, reject the response and reschedule + peer.log.Debug("Invalid non-genesis header count", "have", len(headers), "want", requestHeaders) + res.Done <- errors.New("not enough non-genesis headers delivered") + s.scheduleRevertRequest(req) + + case headers[0].Number.Uint64() < requestHeaders && uint64(len(headers)) != headers[0].Number.Uint64(): + // Invalid number of genesis headers delivered, reject the response and reschedule + peer.log.Debug("Invalid genesis header count", "have", len(headers), "want", headers[0].Number.Uint64()) + res.Done <- errors.New("not enough genesis headers delivered") + s.scheduleRevertRequest(req) + + default: + // Packet seems structurally valid, check hash progression and if it + // is correct too, deliver for storage + for i := 0; i < len(headers)-1; i++ { + if headers[i].ParentHash != headers[i+1].Hash() { + peer.log.Debug("Invalid genesis header count", "have", len(headers), "want", headers[0].Number.Uint64()) + res.Done <- errors.New("not enough genesis headers delivered") + s.scheduleRevertRequest(req) + return + } + } + // Hash chain is valid. The delivery might still be junk as we're + // downloading batches concurrently (so no way to link the headers + // until gaps are filled); in that case, we'll nuke the peer when + // we detect the fault. + res.Done <- nil + + select { + case req.deliver <- &headerResponse{ + peer: peer, + reqid: req.id, + headers: headers, + }: + case <-req.cancel: + } + } + } +} + +// revertRequests locates all the currently pending reuqests from a particular +// peer and reverts them, rescheduling for others to fulfill. +func (s *skeleton) revertRequests(peer string) { + // Gather the requests first, revertals need the lock too + var requests []*headerRequest + for _, req := range s.requests { + if req.peer == peer { + requests = append(requests, req) + } + } + // Revert all the requests matching the peer + for _, req := range requests { + s.revertRequest(req) + } +} + +// scheduleRevertRequest asks the event loop to clean up a request and return +// all failed retrieval tasks to the scheduler for reassignment. +func (s *skeleton) scheduleRevertRequest(req *headerRequest) { + select { + case req.revert <- req: + // Sync event loop notified + case <-req.cancel: + // Sync cycle got cancelled + case <-req.stale: + // Request already reverted + } +} + +// revertRequest cleans up a request and returns all failed retrieval tasks to +// the scheduler for reassignment. +// +// Note, this needs to run on the event runloop thread to reschedule to idle peers. +// On peer threads, use scheduleRevertRequest. +func (s *skeleton) revertRequest(req *headerRequest) { + log.Trace("Reverting header request", "peer", req.peer, "reqid", req.id) + select { + case <-req.stale: + log.Trace("Header request already reverted", "peer", req.peer, "reqid", req.id) + return + default: + } + close(req.stale) + + // Remove the request from the tracked set + delete(s.requests, req.id) + + // Remove the request from the tracked set and mark the task as not-pending, + // ready for resheduling + s.scratchOwners[(s.scratchHead-req.head)/requestHeaders] = "" +} + +func (s *skeleton) processResponse(res *headerResponse) bool { + res.peer.log.Trace("Processing header response", "head", res.headers[0].Number, "hash", res.headers[0].Hash(), "count", len(res.headers)) + + // Whether or not the response is valid, we can mark the peer as idle and + // notify the scheduler to assign a new task. If the response is invalid, + // we'll drop the peer in a bit. + s.idles[res.peer.id] = res.peer + + // Ensure the response is for a valid request + if _, ok := s.requests[res.reqid]; !ok { + // Request stale, perhaps the peer timed out but came through in the end + res.peer.log.Warn("Unexpected header packet") + return false + } + delete(s.requests, res.reqid) + + // Insert the delivered headers into the scratch space independent of the + // content or continuation; those will be validated in a moment + head := res.headers[0].Number.Uint64() + copy(s.scratchSpace[s.scratchHead-head:], res.headers) + + // If there's still a gap in the head of the scratch space, abort + if s.scratchSpace[0] == nil { + return false + } + // Try to consume any head headers, validating the boundary conditions + var merged bool // Whether subchains were merged + + batch := s.db.NewBatch() + for s.scratchSpace[0] != nil { + // Next batch of headers available, cross-reference with the subchain + // we are extending and either accept or discard + if s.progress.Subchains[0].Next != s.scratchSpace[0].Hash() { + // Print a log messages to track what's going on + tail := s.progress.Subchains[0].Tail + want := s.progress.Subchains[0].Next + have := s.scratchSpace[0].Hash() + + log.Warn("Invalid skeleton headers", "peer", s.scratchOwners[0], "number", tail-1, "want", want, "have", have) + + // The peer delivered junk, or at least not the subchain we are + // syncing to. Free up the scratch space and assignment, reassign + // and drop the original peer. + for i := 0; i < requestHeaders; i++ { + s.scratchSpace[i] = nil + } + s.drop(s.scratchOwners[0]) + s.scratchOwners[0] = "" + break + } + // Scratch delivery matches required subchain, deliver the batch of + // headers and push the subchain forward + var consumed int + for _, header := range s.scratchSpace[:requestHeaders] { + if header != nil { // nil when the genesis is reached + consumed++ + + rawdb.WriteSkeletonHeader(batch, header) + s.pulled++ + + s.progress.Subchains[0].Tail-- + s.progress.Subchains[0].Next = header.ParentHash + } + } + // Batch of headers consumed, shift the download window forward + head := s.progress.Subchains[0].Head + tail := s.progress.Subchains[0].Tail + next := s.progress.Subchains[0].Next + + log.Trace("Primary subchain extended", "head", head, "tail", tail, "next", next) + + copy(s.scratchSpace, s.scratchSpace[requestHeaders:]) + for i := 0; i < requestHeaders; i++ { + s.scratchSpace[scratchHeaders-i-1] = nil + } + copy(s.scratchOwners, s.scratchOwners[1:]) + s.scratchOwners[scratchHeaders/requestHeaders-1] = "" + + s.scratchHead -= uint64(consumed) + + // If the subchain extended into the next subchain, we need to handle + // the overlap. Since there could be many overlaps (come on), do this + // in a loop. + for len(s.progress.Subchains) > 1 && s.progress.Subchains[1].Head >= s.progress.Subchains[0].Tail { + // Extract some stats from the second subchain + head := s.progress.Subchains[1].Head + tail := s.progress.Subchains[1].Tail + next := s.progress.Subchains[1].Next + + // Since we just overwrote part of the next subchain, we need to trim + // its head independent of matching or mismatching content + if s.progress.Subchains[1].Tail >= s.progress.Subchains[0].Tail { + // Fully overwritten, get rid of the subchain as a whole + log.Debug("Previous subchain fully overwritten", "head", head, "tail", tail, "next", next) + s.progress.Subchains = append(s.progress.Subchains[:1], s.progress.Subchains[2:]...) + continue + } else { + // Partially overwritten, trim the head to the overwritten size + log.Debug("Previous subchain partially overwritten", "head", head, "tail", tail, "next", next) + s.progress.Subchains[1].Head = s.progress.Subchains[0].Tail - 1 + } + // If the old subchain is an extension of the new one, merge the two + // and let the skeleton syncer restart (to clean internal state) + if rawdb.ReadSkeletonHeader(s.db, s.progress.Subchains[1].Head).Hash() == s.progress.Subchains[0].Next { + log.Debug("Previous subchain merged", "head", head, "tail", tail, "next", next) + s.progress.Subchains[0].Tail = s.progress.Subchains[1].Tail + s.progress.Subchains[0].Next = s.progress.Subchains[1].Next + + s.progress.Subchains = append(s.progress.Subchains[:1], s.progress.Subchains[2:]...) + merged = true + } + } + } + s.saveSyncStatus(batch) + if err := batch.Write(); err != nil { + log.Crit("Failed to write skeleton headers and progress", "err", err) + } + // Print a progress report to make the UX a bit nicer + left := s.progress.Subchains[0].Tail - 1 + if time.Since(s.logged) > 8*time.Second || left == 0 { + s.logged = time.Now() + + if s.pulled == 0 { + log.Info("Beacon sync starting", "left", left) + } else { + eta := float64(time.Since(s.started)) / float64(s.pulled) * float64(left) + log.Info("Syncing beacon headers", "downloaded", s.pulled, "left", left, "eta", common.PrettyDuration(eta)) + } + } + return merged +} + +// Head retrieves the current head tracked by the skeleton syncer. This method +// is meant to be used by the backfiller, whose life cycle is controlled by the +// skeleton syncer. +// +// Note, the method will not use the internal state of the skeleton, but will +// rather blindly pull stuff from the database. This is fine, because the back- +// filler will only run when the skeleton chain is fully downloaded and stable. +// There might be new heads appended, but those are stomic from the perspective +// of this method. Any head reorg will first tear down the backfiller and only +// then make the modification. +func (s *skeleton) Head() (*types.Header, error) { + // Read the current sync progress from disk and figure out the current head. + // Although there's a lot of error handling here, these are mostly as sanity + // checks to avoid crashing if a programming error happens. These should not + // happen in live code. + status := rawdb.ReadSkeletonSyncStatus(s.db) + if len(status) == 0 { + return nil, errors.New("beacon sync not yet started") + } + s.progress = new(skeletonProgress) + if err := json.Unmarshal(status, s.progress); err != nil { + return nil, err + } + if s.progress.Subchains[0].Tail != 1 { + return nil, errors.New("beacon sync not yet finished") + } + return rawdb.ReadSkeletonHeader(s.db, s.progress.Subchains[0].Head), nil +} + +// Header retrieves a specific header tracked by the skeleton syncer. This method +// is meant to be used by the backfiller, whose life cycle is controlled by the +// skeleton syncer. +// +// Note, outside the permitted runtimes, this method might return nil results and +// subsequent calls might return headers from different chains. +func (s *skeleton) Header(number uint64) *types.Header { + return rawdb.ReadSkeletonHeader(s.db, number) +} diff --git a/eth/downloader/skeleton_test.go b/eth/downloader/skeleton_test.go new file mode 100644 index 000000000000..7c2e07a43866 --- /dev/null +++ b/eth/downloader/skeleton_test.go @@ -0,0 +1,257 @@ +// Copyright 2021 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package downloader + +import ( + "encoding/json" + "math/big" + "os" + "testing" + + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" +) + +// hookedBackfiller is a tester backfiller with all interface methods mocked and +// hooked so tests can implement only the things they need. +type hookedBackfiller struct { + // suspendHook is an optional hook to be called when the filler is requested + // to be suspended. + suspendHook func() + + // resumeHook is an optional hook to be called when the filler is requested + // to be resumed. + resumeHook func() +} + +// suspend requests the backfiller to abort any running full or snap sync +// based on the skeleton chain as it might be invalid. The backfiller should +// gracefully handle multiple consecutive suspends without a resume, even +// on initial sartup. +func (hf *hookedBackfiller) suspend() { + if hf.suspendHook != nil { + hf.suspendHook() + } +} + +// resume requests the backfiller to start running fill or snap sync based on +// the skeleton chain as it has successfully been linked. Appending new heads +// to the end of the chain will not result in suspend/resume cycles. +func (hf *hookedBackfiller) resume() { + if hf.resumeHook != nil { + hf.resumeHook() + } +} + +// newNoopBackfiller creates a hooked backfiller with all callbacks disabled, +// essentially acting as a noop. +func newNoopBackfiller() backfiller { + return new(hookedBackfiller) +} + +// Tests various sync initialzations based on previous leftovers in the database +// and announced heads. +func TestSkeletonSyncInit(t *testing.T) { + log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true)))) + + // Create a few key headers + var ( + genesis = &types.Header{Number: big.NewInt(0)} + block49 = &types.Header{Number: big.NewInt(49)} + block49B = &types.Header{Number: big.NewInt(49), Extra: []byte("B")} + block50 = &types.Header{Number: big.NewInt(50), ParentHash: block49.Hash()} + ) + tests := []struct { + headers []*types.Header // Database content (beside the genesis) + oldstate []*subchain // Old sync state with various interrupted subchains + head *types.Header // New head header to announce to reorg to + newstate []*subchain // Expected sync state after the reorg + }{ + // Completely empty database with only the genesis set. The sync is expected + // to create a single subchain with the requested head. + { + head: block50, + newstate: []*subchain{{Head: 50, Tail: 50}}, + }, + // Empty database with only the genesis set with a leftover empty sync + // progess. This is a synthetic case, just for the sake of covering things. + { + oldstate: []*subchain{}, + head: block50, + newstate: []*subchain{{Head: 50, Tail: 50}}, + }, + // A single leftover subchain is present, older than the new head. The + // old subchain should be left as is and a new one appended to the sync + // status. + { + oldstate: []*subchain{{Head: 10, Tail: 5}}, + head: block50, + newstate: []*subchain{ + {Head: 50, Tail: 50}, + {Head: 10, Tail: 5}, + }, + }, + // Multiple leftover subchains are present, older than the new head. The + // old subchains should be left as is and a new one appended to the sync + // status. + { + oldstate: []*subchain{ + {Head: 20, Tail: 15}, + {Head: 10, Tail: 5}, + }, + head: block50, + newstate: []*subchain{ + {Head: 50, Tail: 50}, + {Head: 20, Tail: 15}, + {Head: 10, Tail: 5}, + }, + }, + // A single leftover subchain is present, newer than the new head. The + // newer subchain should be deleted and a fresh one created for the head. + { + oldstate: []*subchain{{Head: 65, Tail: 60}}, + head: block50, + newstate: []*subchain{{Head: 50, Tail: 50}}, + }, + // Multiple leftover subchain is present, newer than the new head. The + // newer subchains should be deleted and a fresh one created for the head. + { + oldstate: []*subchain{ + {Head: 75, Tail: 70}, + {Head: 65, Tail: 60}, + }, + head: block50, + newstate: []*subchain{{Head: 50, Tail: 50}}, + }, + + // Two leftover subchains are present, one fully older and one fully + // newer than the announced head. The head should delete the newer one, + // keeping the older one. + { + oldstate: []*subchain{ + {Head: 65, Tail: 60}, + {Head: 10, Tail: 5}, + }, + head: block50, + newstate: []*subchain{ + {Head: 50, Tail: 50}, + {Head: 10, Tail: 5}, + }, + }, + // Multiple leftover subchains are present, some fully older and some + // fully newer than the announced head. The head should delete the newer + // ones, keeping the older ones. + { + oldstate: []*subchain{ + {Head: 75, Tail: 70}, + {Head: 65, Tail: 60}, + {Head: 20, Tail: 15}, + {Head: 10, Tail: 5}, + }, + head: block50, + newstate: []*subchain{ + {Head: 50, Tail: 50}, + {Head: 20, Tail: 15}, + {Head: 10, Tail: 5}, + }, + }, + // A single leftover subchain is present and the new head is extending + // it with one more header. We expect the subchain head to be pushed + // forward. + { + headers: []*types.Header{block49}, + oldstate: []*subchain{{Head: 49, Tail: 5}}, + head: block50, + newstate: []*subchain{{Head: 50, Tail: 5}}, + }, + // A single leftover subchain is present and although the new head does + // extend it number wise, the hash chain does not link up. We expect a + // new subchain to be created for the dangling head. + { + headers: []*types.Header{block49B}, + oldstate: []*subchain{{Head: 49, Tail: 5}}, + head: block50, + newstate: []*subchain{ + {Head: 50, Tail: 50}, + {Head: 49, Tail: 5}, + }, + }, + // A single leftover subchain is present. A new head is announced that + // links into the middle of it, correctly anchoring into an existing + // header. We expect the old subchain to be truncated and extended with + // the new head. + { + headers: []*types.Header{block49}, + oldstate: []*subchain{{Head: 100, Tail: 5}}, + head: block50, + newstate: []*subchain{{Head: 50, Tail: 5}}, + }, + // A single leftover subchain is present. A new head is announced that + // links into the middle of it, but does not anchor into an existing + // header. We expect the old subchain to be truncated and a new chain + // be created for the dangling head. + { + headers: []*types.Header{block49B}, + oldstate: []*subchain{{Head: 100, Tail: 5}}, + head: block50, + newstate: []*subchain{ + {Head: 50, Tail: 50}, + {Head: 49, Tail: 5}, + }, + }, + } + for i, tt := range tests { + // Create a fresh database and initialize it with the starting state + db := rawdb.NewMemoryDatabase() + + rawdb.WriteHeader(db, genesis) + for _, header := range tt.headers { + rawdb.WriteSkeletonHeader(db, header) + } + if tt.oldstate != nil { + blob, _ := json.Marshal(&skeletonProgress{Subchains: tt.oldstate}) + rawdb.WriteSkeletonSyncStatus(db, blob) + } + // Create a skeleton sync and run a cycle + wait := make(chan struct{}) + + skeleton := newSkeleton(db, newPeerSet(), func(string) {}, newNoopBackfiller()) + skeleton.syncStarting = func() { close(wait) } + skeleton.Sync(tt.head) + + <-wait + skeleton.Terminate() + + // Ensure the correct resulting sync status + var progress skeletonProgress + json.Unmarshal(rawdb.ReadSkeletonSyncStatus(db), &progress) + + if len(progress.Subchains) != len(tt.newstate) { + t.Errorf("test %d: subchain count mismatch: have %d, want %d", i, len(progress.Subchains), len(tt.newstate)) + continue + } + for j := 0; j < len(progress.Subchains); j++ { + if progress.Subchains[j].Head != tt.newstate[j].Head { + t.Errorf("test %d: subchain %d head mismatch: have %d, want %d", i, j, progress.Subchains[j].Head, tt.newstate[j].Head) + } + if progress.Subchains[j].Tail != tt.newstate[j].Tail { + t.Errorf("test %d: subchain %d tail mismatch: have %d, want %d", i, j, progress.Subchains[j].Tail, tt.newstate[j].Tail) + } + } + } +} diff --git a/eth/handler.go b/eth/handler.go index 26a90495b2ef..829092518569 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -173,6 +173,26 @@ func newHandler(config *handlerConfig) (*handler, error) { h.checkpointNumber = (config.Checkpoint.SectionIndex+1)*params.CHTFrequency - 1 h.checkpointHash = config.Checkpoint.SectionHead } + // If sync succeeds, pass a callback to potentially disable snap sync mode + // and enable transaction propagation. + success := func() { + // If we were running snap sync and it finished, disable doing another + // round on next sync cycle + if atomic.LoadUint32(&h.snapSync) == 1 { + log.Info("Snap sync complete, auto disabling") + atomic.StoreUint32(&h.snapSync, 0) + } + // If we've successfully finished a sync cycle and passed any required + // checkpoint, enable accepting transactions from the network + head := h.chain.CurrentBlock() + if head.NumberU64() >= h.checkpointNumber { + // Checkpoint passed, sanity check the timestamp to have a fallback mechanism + // for non-checkpointed (number = 0) private networks. + if head.Time() >= uint64(time.Now().AddDate(0, -1, 0).Unix()) { + atomic.StoreUint32(&h.acceptTxs, 1) + } + } + } // Construct the downloader (long sync) and its backing state bloom if snap // sync is requested. The downloader is responsible for deallocating the state // bloom when it's done. @@ -183,7 +203,7 @@ func newHandler(config *handlerConfig) (*handler, error) { if atomic.LoadUint32(&h.snapSync) == 1 && atomic.LoadUint32(&h.snapSync) == 0 { h.stateBloom = trie.NewSyncBloom(config.BloomCache, config.Database) } - h.downloader = downloader.New(h.checkpointNumber, config.Database, h.stateBloom, h.eventMux, h.chain, nil, h.removePeer) + h.downloader = downloader.New(h.checkpointNumber, config.Database, h.stateBloom, h.eventMux, h.chain, nil, h.removePeer, success) // Construct the fetcher (short sync) validator := func(header *types.Header) error { diff --git a/eth/sync.go b/eth/sync.go index b8ac67d3b2d1..85cdad10a4dd 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -227,7 +227,7 @@ func (h *handler) doSync(op *chainSyncOp) error { } } // Run the sync cycle, and disable snap sync if we're past the pivot block - err := h.downloader.Synchronise(op.peer.ID(), op.head, op.td, op.mode) + err := h.downloader.LegacySync(op.peer.ID(), op.head, op.td, op.mode) if err != nil { return err } diff --git a/eth/tracers/js/tracer.go b/eth/tracers/js/tracer.go index b8e035e6f3f4..e77fc01e868e 100644 --- a/eth/tracers/js/tracer.go +++ b/eth/tracers/js/tracer.go @@ -697,7 +697,7 @@ func (jst *jsTracer) CaptureStart(env *vm.EVM, from common.Address, to common.Ad jst.ctx["block"] = env.Context.BlockNumber.Uint64() jst.dbWrapper.db = env.StateDB // Update list of precompiles based on current block - rules := env.ChainConfig().Rules(env.Context.BlockNumber) + rules := env.ChainConfig().Rules(env.Context.BlockNumber, false) jst.activePrecompiles = vm.ActivePrecompiles(rules) // Compute intrinsic gas diff --git a/eth/tracers/native/4byte.go b/eth/tracers/native/4byte.go index e60e82de479a..2624ec403ee2 100644 --- a/eth/tracers/native/4byte.go +++ b/eth/tracers/native/4byte.go @@ -83,7 +83,7 @@ func (t *fourByteTracer) CaptureStart(env *vm.EVM, from common.Address, to commo t.env = env // Update list of precompiles based on current block - rules := env.ChainConfig().Rules(env.Context.BlockNumber) + rules := env.ChainConfig().Rules(env.Context.BlockNumber, false) t.activePrecompiles = vm.ActivePrecompiles(rules) // Save the outer calldata also diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 65e34752bf41..acb3f8b2a0dd 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -1433,7 +1433,7 @@ func AccessList(ctx context.Context, b Backend, blockNrOrHash rpc.BlockNumberOrH to = crypto.CreateAddress(args.from(), uint64(*args.Nonce)) } // Retrieve the precompiles since they don't need to be added to the access list - precompiles := vm.ActivePrecompiles(b.ChainConfig().Rules(header.Number)) + precompiles := vm.ActivePrecompiles(b.ChainConfig().Rules(header.Number, header.Difficulty == nil)) // Create an initial tracer prevTracer := logger.NewAccessListTracer(nil, args.from(), to, precompiles) diff --git a/internal/web3ext/web3ext.go b/internal/web3ext/web3ext.go index c4bdbaeb8d20..acc38dcd40ee 100644 --- a/internal/web3ext/web3ext.go +++ b/internal/web3ext/web3ext.go @@ -152,6 +152,11 @@ web3._extend({ call: 'admin_importChain', params: 1 }), + new web3._extend.Method({ + name: 'newHead', + call: 'admin_newHead', + params: 1 + }), new web3._extend.Method({ name: 'sleepBlocks', call: 'admin_sleepBlocks', diff --git a/miner/stress/beacon/main.go b/miner/stress/beacon/main.go index dc5c229410cf..70005e20dbe9 100644 --- a/miner/stress/beacon/main.go +++ b/miner/stress/beacon/main.go @@ -30,7 +30,6 @@ import ( "github.com/ethereum/go-ethereum/accounts/keystore" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/fdlimit" - "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" @@ -138,25 +137,30 @@ func newNode(typ nodetype, genesis *core.Genesis, enodes []*enode.Node) *ethNode } } -func (n *ethNode) assembleBlock(parentHash common.Hash, parentTimestamp uint64) (*catalyst.ExecutableData, error) { +func (n *ethNode) assembleBlock(parentHash common.Hash, parentTimestamp uint64) (*catalyst.ExecutableDataV1, error) { if n.typ != eth2MiningNode { return nil, errors.New("invalid node type") } - payload, err := n.api.PreparePayload(catalyst.AssembleBlockParams{ - ParentHash: parentHash, - Timestamp: uint64(time.Now().Unix()), - }) + payloadAttribute := catalyst.PayloadAttributesV1{ + Timestamp: uint64(time.Now().Unix()), + } + fcState := catalyst.ForkchoiceStateV1{ + HeadBlockHash: parentHash, + SafeBlockHash: common.Hash{}, + FinalizedBlockHash: common.Hash{}, + } + payload, err := n.api.ForkchoiceUpdatedV1(fcState, &payloadAttribute) if err != nil { return nil, err } - return n.api.GetPayload(hexutil.Uint64(payload.PayloadID)) + return n.api.GetPayloadV1(*payload.PayloadID) } -func (n *ethNode) insertBlock(eb catalyst.ExecutableData) error { +func (n *ethNode) insertBlock(eb catalyst.ExecutableDataV1) error { if !eth2types(n.typ) { return errors.New("invalid node type") } - newResp, err := n.api.ExecutePayload(eb) + newResp, err := n.api.ExecutePayloadV1(eb) if err != nil { return err } else if newResp.Status != "VALID" { @@ -165,7 +169,7 @@ func (n *ethNode) insertBlock(eb catalyst.ExecutableData) error { return nil } -func (n *ethNode) insertBlockAndSetHead(parent *types.Header, ed catalyst.ExecutableData) error { +func (n *ethNode) insertBlockAndSetHead(parent *types.Header, ed catalyst.ExecutableDataV1) error { if !eth2types(n.typ) { return errors.New("invalid node type") } @@ -176,7 +180,12 @@ func (n *ethNode) insertBlockAndSetHead(parent *types.Header, ed catalyst.Execut if err != nil { return err } - if err := n.api.ConsensusValidated(catalyst.ConsensusValidatedParams{BlockHash: block.Hash(), Status: "VALID"}); err != nil { + fcState := catalyst.ForkchoiceStateV1{ + HeadBlockHash: block.ParentHash(), + SafeBlockHash: common.Hash{}, + FinalizedBlockHash: common.Hash{}, + } + if _, err := n.api.ForkchoiceUpdatedV1(fcState, nil); err != nil { return err } return nil @@ -275,7 +284,12 @@ func (mgr *nodeManager) run() { nodes = append(nodes, mgr.getNodes(eth2NormalNode)...) nodes = append(nodes, mgr.getNodes(eth2LightClient)...) for _, node := range append(nodes) { - node.api.ConsensusValidated(catalyst.ConsensusValidatedParams{BlockHash: oldest.Hash(), Status: catalyst.VALID.Status}) + fcState := catalyst.ForkchoiceStateV1{ + HeadBlockHash: oldest.Hash(), + SafeBlockHash: common.Hash{}, + FinalizedBlockHash: common.Hash{}, + } + node.api.ForkchoiceUpdatedV1(fcState, nil) } log.Info("Finalised eth2 block", "number", oldest.NumberU64(), "hash", oldest.Hash()) waitFinalise = waitFinalise[1:] diff --git a/miner/worker.go b/miner/worker.go index 54932a474deb..041c03af8029 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -1040,7 +1040,13 @@ func (w *worker) commit(uncles []*types.Header, interval func(), update bool, st return err } - if w.isRunning() && !w.merger.TDDReached() { + // If we're post merge, just ignore + td, ttd := w.chain.GetTd(block.ParentHash(), block.NumberU64()-1), w.chain.Config().TerminalTotalDifficulty + if td != nil && ttd != nil && td.Cmp(ttd) >= 0 { + return nil + } + + if w.isRunning() { if interval != nil { interval() } diff --git a/params/config.go b/params/config.go index f767c1c4b92b..eaff7d0af29a 100644 --- a/params/config.go +++ b/params/config.go @@ -267,7 +267,7 @@ var ( AllCliqueProtocolChanges = &ChainConfig{big.NewInt(1337), big.NewInt(0), nil, false, big.NewInt(0), common.Hash{}, big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), nil, nil, nil, &CliqueConfig{Period: 0, Epoch: 30000}} TestChainConfig = &ChainConfig{big.NewInt(1), big.NewInt(0), nil, false, big.NewInt(0), common.Hash{}, big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), big.NewInt(0), nil, new(EthashConfig), nil} - TestRules = TestChainConfig.Rules(new(big.Int)) + TestRules = TestChainConfig.Rules(new(big.Int), false) ) // TrustedCheckpoint represents a set of post-processed trie roots (CHT and @@ -661,11 +661,12 @@ type Rules struct { ChainID *big.Int IsHomestead, IsEIP150, IsEIP155, IsEIP158 bool IsByzantium, IsConstantinople, IsPetersburg, IsIstanbul bool - IsBerlin, IsLondon bool + IsBerlin, IsLondon, IsMerge bool } // Rules ensures c's ChainID is not nil. -func (c *ChainConfig) Rules(num *big.Int) Rules { +func (c *ChainConfig) Rules(num *big.Int, isMerge bool) Rules { + // TODO (MariusVanDerWijden) replace isMerge after the merge once we know the merge block chainID := c.ChainID if chainID == nil { chainID = new(big.Int) @@ -682,5 +683,6 @@ func (c *ChainConfig) Rules(num *big.Int) Rules { IsIstanbul: c.IsIstanbul(num), IsBerlin: c.IsBerlin(num), IsLondon: c.IsLondon(num), + IsMerge: isMerge, } } diff --git a/tests/gen_stenv.go b/tests/gen_stenv.go index ecf7af850382..29fbce121385 100644 --- a/tests/gen_stenv.go +++ b/tests/gen_stenv.go @@ -17,7 +17,8 @@ var _ = (*stEnvMarshaling)(nil) func (s stEnv) MarshalJSON() ([]byte, error) { type stEnv struct { Coinbase common.UnprefixedAddress `json:"currentCoinbase" gencodec:"required"` - Difficulty *math.HexOrDecimal256 `json:"currentDifficulty" gencodec:"required"` + Difficulty *math.HexOrDecimal256 `json:"currentDifficulty" gencodec:"optional"` + Random *math.HexOrDecimal256 `json:"currentRandom" gencodec:"optional"` GasLimit math.HexOrDecimal64 `json:"currentGasLimit" gencodec:"required"` Number math.HexOrDecimal64 `json:"currentNumber" gencodec:"required"` Timestamp math.HexOrDecimal64 `json:"currentTimestamp" gencodec:"required"` @@ -26,6 +27,7 @@ func (s stEnv) MarshalJSON() ([]byte, error) { var enc stEnv enc.Coinbase = common.UnprefixedAddress(s.Coinbase) enc.Difficulty = (*math.HexOrDecimal256)(s.Difficulty) + enc.Random = (*math.HexOrDecimal256)(s.Random) enc.GasLimit = math.HexOrDecimal64(s.GasLimit) enc.Number = math.HexOrDecimal64(s.Number) enc.Timestamp = math.HexOrDecimal64(s.Timestamp) @@ -37,7 +39,8 @@ func (s stEnv) MarshalJSON() ([]byte, error) { func (s *stEnv) UnmarshalJSON(input []byte) error { type stEnv struct { Coinbase *common.UnprefixedAddress `json:"currentCoinbase" gencodec:"required"` - Difficulty *math.HexOrDecimal256 `json:"currentDifficulty" gencodec:"required"` + Difficulty *math.HexOrDecimal256 `json:"currentDifficulty" gencodec:"optional"` + Random *math.HexOrDecimal256 `json:"currentRandom" gencodec:"optional"` GasLimit *math.HexOrDecimal64 `json:"currentGasLimit" gencodec:"required"` Number *math.HexOrDecimal64 `json:"currentNumber" gencodec:"required"` Timestamp *math.HexOrDecimal64 `json:"currentTimestamp" gencodec:"required"` @@ -51,10 +54,12 @@ func (s *stEnv) UnmarshalJSON(input []byte) error { return errors.New("missing required field 'currentCoinbase' for stEnv") } s.Coinbase = common.Address(*dec.Coinbase) - if dec.Difficulty == nil { - return errors.New("missing required field 'currentDifficulty' for stEnv") + if dec.Difficulty != nil { + s.Difficulty = (*big.Int)(dec.Difficulty) + } + if dec.Random != nil { + s.Random = (*big.Int)(dec.Random) } - s.Difficulty = (*big.Int)(dec.Difficulty) if dec.GasLimit == nil { return errors.New("missing required field 'currentGasLimit' for stEnv") } diff --git a/tests/state_test_util.go b/tests/state_test_util.go index f7fb08bfbc8d..8a5920f29588 100644 --- a/tests/state_test_util.go +++ b/tests/state_test_util.go @@ -80,7 +80,8 @@ type stPostState struct { type stEnv struct { Coinbase common.Address `json:"currentCoinbase" gencodec:"required"` - Difficulty *big.Int `json:"currentDifficulty" gencodec:"required"` + Difficulty *big.Int `json:"currentDifficulty" gencodec:"optional"` + Random *big.Int `json:"currentRandom" gencodec:"optional"` GasLimit uint64 `json:"currentGasLimit" gencodec:"required"` Number uint64 `json:"currentNumber" gencodec:"required"` Timestamp uint64 `json:"currentTimestamp" gencodec:"required"` @@ -90,6 +91,7 @@ type stEnv struct { type stEnvMarshaling struct { Coinbase common.UnprefixedAddress Difficulty *math.HexOrDecimal256 + Random *math.HexOrDecimal256 GasLimit math.HexOrDecimal64 Number math.HexOrDecimal64 Timestamp math.HexOrDecimal64 @@ -218,8 +220,12 @@ func (t *StateTest) RunNoVerify(subtest StateSubtest, vmconfig vm.Config, snapsh context := core.NewEVMBlockContext(block.Header(), nil, &t.json.Env.Coinbase) context.GetHash = vmTestBlockHash context.BaseFee = baseFee + if t.json.Env.Random != nil { + vmconfig.RandomOpcode = true + context.Random = common.BigToHash(t.json.Env.Random) + context.Difficulty = nil + } evm := vm.NewEVM(context, txContext, statedb, config, vmconfig) - // Execute the message. snapshot := statedb.Snapshot() gaspool := new(core.GasPool) @@ -268,7 +274,7 @@ func MakePreState(db ethdb.Database, accounts core.GenesisAlloc, snapshotter boo } func (t *StateTest) genesis(config *params.ChainConfig) *core.Genesis { - return &core.Genesis{ + genesis := &core.Genesis{ Config: config, Coinbase: t.json.Env.Coinbase, Difficulty: t.json.Env.Difficulty, @@ -277,6 +283,12 @@ func (t *StateTest) genesis(config *params.ChainConfig) *core.Genesis { Timestamp: t.json.Env.Timestamp, Alloc: t.json.Pre, } + if t.json.Env.Random != nil { + // Post-Merge + genesis.Mixhash = common.BigToHash(t.json.Env.Random) + genesis.Difficulty = nil + } + return genesis } func (tx *stTransaction) toMessage(ps stPostState, baseFee *big.Int) (core.Message, error) { diff --git a/tests/testdata b/tests/testdata index 092a8834dc44..5d534e37b80e 160000 --- a/tests/testdata +++ b/tests/testdata @@ -1 +1 @@ -Subproject commit 092a8834dc445e683103689d6f0e75a5d380a190 +Subproject commit 5d534e37b80e9310e8c7751f805ca481a451123e