diff --git a/crates/net/eth-wire-types/src/block_range.rs b/crates/net/eth-wire-types/src/block_range.rs new file mode 100644 index 00000000000..2678c95006d --- /dev/null +++ b/crates/net/eth-wire-types/src/block_range.rs @@ -0,0 +1,56 @@ +//! Block range related types introduced in eth/70 (EIP-7975). + +use alloy_rlp::{RlpDecodable, RlpEncodable}; +use reth_codecs_derive::add_arbitrary_tests; + +/// The block range a peer can currently serve (inclusive bounds). +#[derive(Clone, Copy, Debug, PartialEq, Eq, RlpEncodable, RlpDecodable)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))] +#[add_arbitrary_tests(rlp)] +pub struct BlockRange { + /// Earliest block number the peer can serve. + pub start_block: u64, + /// Latest block number the peer can serve. + pub end_block: u64, +} + +impl BlockRange { + /// Returns true if the start/end pair forms a valid range. + pub const fn is_valid(&self) -> bool { + self.start_block <= self.end_block + } +} + +/// eth/70 request for the peer's current block range. +/// +/// The payload is empty; the request id is carried by the [`crate::message::RequestPair`]. +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, RlpEncodable, RlpDecodable)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))] +#[add_arbitrary_tests(rlp)] +pub struct RequestBlockRange; + +/// eth/70 response to [`RequestBlockRange`]. +#[derive(Clone, Copy, Debug, PartialEq, Eq, RlpEncodable, RlpDecodable)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))] +#[add_arbitrary_tests(rlp)] +pub struct SendBlockRange { + /// The earliest block which is available. + pub start_block: u64, + /// The latest block which is available. + pub end_block: u64, +} + +impl SendBlockRange { + /// Returns the contained range. + pub const fn as_range(&self) -> BlockRange { + BlockRange { start_block: self.start_block, end_block: self.end_block } + } + + /// Constructs from a [`BlockRange`]. + pub const fn from_range(range: BlockRange) -> Self { + Self { start_block: range.start_block, end_block: range.end_block } + } +} diff --git a/crates/net/eth-wire-types/src/broadcast.rs b/crates/net/eth-wire-types/src/broadcast.rs index 1900cf004aa..9855f6f6cb8 100644 --- a/crates/net/eth-wire-types/src/broadcast.rs +++ b/crates/net/eth-wire-types/src/broadcast.rs @@ -169,7 +169,7 @@ impl NewPooledTransactionHashes { matches!(version, EthVersion::Eth67 | EthVersion::Eth66) } Self::Eth68(_) => { - matches!(version, EthVersion::Eth68 | EthVersion::Eth69) + matches!(version, EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70) } } } diff --git a/crates/net/eth-wire-types/src/capability.rs b/crates/net/eth-wire-types/src/capability.rs index f7cd00671f8..d35e4c17ee9 100644 --- a/crates/net/eth-wire-types/src/capability.rs +++ b/crates/net/eth-wire-types/src/capability.rs @@ -100,6 +100,16 @@ impl Capability { Self::eth(EthVersion::Eth68) } + /// Returns the [`EthVersion::Eth69`] capability. + pub const fn eth_69() -> Self { + Self::eth(EthVersion::Eth69) + } + + /// Returns the [`EthVersion::Eth70`] capability. + pub const fn eth_70() -> Self { + Self::eth(EthVersion::Eth70) + } + /// Whether this is eth v66 protocol. #[inline] pub fn is_eth_v66(&self) -> bool { @@ -118,10 +128,26 @@ impl Capability { self.name == "eth" && self.version == 68 } + /// Whether this is eth v69. + #[inline] + pub fn is_eth_v69(&self) -> bool { + self.name == "eth" && self.version == 69 + } + + /// Whether this is eth v70. + #[inline] + pub fn is_eth_v70(&self) -> bool { + self.name == "eth" && self.version == 70 + } + /// Whether this is any eth version. #[inline] pub fn is_eth(&self) -> bool { - self.is_eth_v66() || self.is_eth_v67() || self.is_eth_v68() + self.is_eth_v66() || + self.is_eth_v67() || + self.is_eth_v68() || + self.is_eth_v69() || + self.is_eth_v70() } } @@ -141,7 +167,7 @@ impl From for Capability { #[cfg(any(test, feature = "arbitrary"))] impl<'a> arbitrary::Arbitrary<'a> for Capability { fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result { - let version = u.int_in_range(66..=69)?; // Valid eth protocol versions are 66-69 + let version = u.int_in_range(66..=70)?; // Valid eth protocol versions are 66-70 // Only generate valid eth protocol name for now since it's the only supported protocol Ok(Self::new_static("eth", version)) } @@ -155,6 +181,8 @@ pub struct Capabilities { eth_66: bool, eth_67: bool, eth_68: bool, + eth_69: bool, + eth_70: bool, } impl Capabilities { @@ -164,6 +192,8 @@ impl Capabilities { eth_66: value.iter().any(Capability::is_eth_v66), eth_67: value.iter().any(Capability::is_eth_v67), eth_68: value.iter().any(Capability::is_eth_v68), + eth_69: value.iter().any(Capability::is_eth_v69), + eth_70: value.iter().any(Capability::is_eth_v70), inner: value, } } @@ -182,7 +212,7 @@ impl Capabilities { /// Whether the peer supports `eth` sub-protocol. #[inline] pub const fn supports_eth(&self) -> bool { - self.eth_68 || self.eth_67 || self.eth_66 + self.eth_70 || self.eth_69 || self.eth_68 || self.eth_67 || self.eth_66 } /// Whether this peer supports eth v66 protocol. @@ -202,6 +232,18 @@ impl Capabilities { pub const fn supports_eth_v68(&self) -> bool { self.eth_68 } + + /// Whether this peer supports eth v69 protocol. + #[inline] + pub const fn supports_eth_v69(&self) -> bool { + self.eth_69 + } + + /// Whether this peer supports eth v70 protocol. + #[inline] + pub const fn supports_eth_v70(&self) -> bool { + self.eth_70 + } } impl From> for Capabilities { @@ -224,6 +266,8 @@ impl Decodable for Capabilities { eth_66: inner.iter().any(Capability::is_eth_v66), eth_67: inner.iter().any(Capability::is_eth_v67), eth_68: inner.iter().any(Capability::is_eth_v68), + eth_69: inner.iter().any(Capability::is_eth_v69), + eth_70: inner.iter().any(Capability::is_eth_v70), inner, }) } diff --git a/crates/net/eth-wire-types/src/lib.rs b/crates/net/eth-wire-types/src/lib.rs index b7d27227846..7a0a8233cc5 100644 --- a/crates/net/eth-wire-types/src/lib.rs +++ b/crates/net/eth-wire-types/src/lib.rs @@ -12,7 +12,10 @@ extern crate alloc; mod status; -pub use status::{Status, StatusBuilder, StatusEth69, StatusMessage, UnifiedStatus}; +pub use status::{Status, StatusBuilder, StatusEth69, StatusEth70, StatusMessage, UnifiedStatus}; + +mod block_range; +pub use block_range::{BlockRange, RequestBlockRange, SendBlockRange}; pub mod version; pub use version::{EthVersion, ProtocolVersion}; diff --git a/crates/net/eth-wire-types/src/message.rs b/crates/net/eth-wire-types/src/message.rs index 5f36115204b..f8404a2531b 100644 --- a/crates/net/eth-wire-types/src/message.rs +++ b/crates/net/eth-wire-types/src/message.rs @@ -1,4 +1,4 @@ -//! Implements Ethereum wire protocol for versions 66, 67, and 68. +//! Implements Ethereum wire protocol for versions 66 through 70. //! Defines structs/enums for messages, request-response pairs, and broadcasts. //! Handles compatibility with [`EthVersion`]. //! @@ -9,8 +9,8 @@ use super::{ broadcast::NewBlockHashes, BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders, GetNodeData, GetPooledTransactions, GetReceipts, NewPooledTransactionHashes66, - NewPooledTransactionHashes68, NodeData, PooledTransactions, Receipts, Status, StatusEth69, - Transactions, + NewPooledTransactionHashes68, NodeData, PooledTransactions, Receipts, RequestBlockRange, + SendBlockRange, Status, StatusEth69, StatusEth70, Transactions, }; use crate::{ status::StatusMessage, BlockRangeUpdate, EthNetworkPrimitives, EthVersion, NetworkPrimitives, @@ -66,10 +66,12 @@ impl ProtocolMessage { // For EIP-7642 (https://github.com/ethereum/EIPs/blob/master/EIPS/eip-7642.md): // pre-merge (legacy) status messages include total difficulty, whereas eth/69 omits it. let message = match message_type { - EthMessageID::Status => EthMessage::Status(if version < EthVersion::Eth69 { - StatusMessage::Legacy(Status::decode(buf)?) - } else { + EthMessageID::Status => EthMessage::Status(if version >= EthVersion::Eth70 { + StatusMessage::Eth70(StatusEth70::decode(buf)?) + } else if version >= EthVersion::Eth69 { StatusMessage::Eth69(StatusEth69::decode(buf)?) + } else { + StatusMessage::Legacy(Status::decode(buf)?) }), EthMessageID::NewBlockHashes => { EthMessage::NewBlockHashes(NewBlockHashes::decode(buf)?) @@ -99,6 +101,18 @@ impl ProtocolMessage { EthMessageID::PooledTransactions => { EthMessage::PooledTransactions(RequestPair::decode(buf)?) } + EthMessageID::RequestBlockRange => { + if version < EthVersion::Eth70 { + return Err(MessageError::Invalid(version, EthMessageID::RequestBlockRange)) + } + EthMessage::RequestBlockRange(RequestPair::decode(buf)?) + } + EthMessageID::SendBlockRange => { + if version < EthVersion::Eth70 { + return Err(MessageError::Invalid(version, EthMessageID::SendBlockRange)) + } + EthMessage::SendBlockRange(RequestPair::decode(buf)?) + } EthMessageID::GetNodeData => { if version >= EthVersion::Eth67 { return Err(MessageError::Invalid(version, EthMessageID::GetNodeData)) @@ -205,6 +219,9 @@ impl From> for ProtocolBroadcastMes /// /// The `eth/69` announces the historical block range served by the node. Removes total difficulty /// information. And removes the Bloom field from receipts transferred over the protocol. +/// +/// The `eth/70` (EIP-7975) extends `Status` with `blockRange` and adds `RequestBlockRange` / +/// `SendBlockRange` messages to query/announce the currently served block interval. #[derive(Clone, Debug, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub enum EthMessage { @@ -253,6 +270,10 @@ pub enum EthMessage { serde(bound = "N::PooledTransaction: serde::Serialize + serde::de::DeserializeOwned") )] PooledTransactions(RequestPair>), + /// Represents a `RequestBlockRange` request-response pair. + RequestBlockRange(RequestPair), + /// Represents a `SendBlockRange` request-response pair. + SendBlockRange(RequestPair), /// Represents a `GetNodeData` request-response pair. GetNodeData(RequestPair), /// Represents a `NodeData` request-response pair. @@ -298,6 +319,8 @@ impl EthMessage { Self::BlockBodies(_) => EthMessageID::BlockBodies, Self::GetPooledTransactions(_) => EthMessageID::GetPooledTransactions, Self::PooledTransactions(_) => EthMessageID::PooledTransactions, + Self::RequestBlockRange(_) => EthMessageID::RequestBlockRange, + Self::SendBlockRange(_) => EthMessageID::SendBlockRange, Self::GetNodeData(_) => EthMessageID::GetNodeData, Self::NodeData(_) => EthMessageID::NodeData, Self::GetReceipts(_) => EthMessageID::GetReceipts, @@ -315,7 +338,8 @@ impl EthMessage { Self::GetBlockHeaders(_) | Self::GetReceipts(_) | Self::GetPooledTransactions(_) | - Self::GetNodeData(_) + Self::GetNodeData(_) | + Self::RequestBlockRange(_) ) } @@ -328,7 +352,8 @@ impl EthMessage { Self::Receipts69(_) | Self::BlockHeaders(_) | Self::BlockBodies(_) | - Self::NodeData(_) + Self::NodeData(_) | + Self::SendBlockRange(_) ) } } @@ -348,6 +373,8 @@ impl Encodable for EthMessage { Self::BlockBodies(bodies) => bodies.encode(out), Self::GetPooledTransactions(request) => request.encode(out), Self::PooledTransactions(transactions) => transactions.encode(out), + Self::RequestBlockRange(request) => request.encode(out), + Self::SendBlockRange(range) => range.encode(out), Self::GetNodeData(request) => request.encode(out), Self::NodeData(data) => data.encode(out), Self::GetReceipts(request) => request.encode(out), @@ -371,6 +398,8 @@ impl Encodable for EthMessage { Self::BlockBodies(bodies) => bodies.length(), Self::GetPooledTransactions(request) => request.length(), Self::PooledTransactions(transactions) => transactions.length(), + Self::RequestBlockRange(request) => request.length(), + Self::SendBlockRange(range) => range.length(), Self::GetNodeData(request) => request.length(), Self::NodeData(data) => data.length(), Self::GetReceipts(request) => request.length(), @@ -452,6 +481,10 @@ pub enum EthMessageID { GetPooledTransactions = 0x09, /// Represents pooled transactions. PooledTransactions = 0x0a, + /// Requests block range (eth/70). + RequestBlockRange = 0x0b, + /// Responds with block range (eth/70). + SendBlockRange = 0x0c, /// Requests node data. GetNodeData = 0x0d, /// Represents node data. @@ -483,6 +516,8 @@ impl EthMessageID { Self::NewPooledTransactionHashes => 0x08, Self::GetPooledTransactions => 0x09, Self::PooledTransactions => 0x0a, + Self::RequestBlockRange => 0x0b, + Self::SendBlockRange => 0x0c, Self::GetNodeData => 0x0d, Self::NodeData => 0x0e, Self::GetReceipts => 0x0f, @@ -494,10 +529,9 @@ impl EthMessageID { /// Returns the max value for the given version. pub const fn max(version: EthVersion) -> u8 { - if version.is_eth69() { - Self::BlockRangeUpdate.to_u8() - } else { - Self::Receipts.to_u8() + match version { + EthVersion::Eth70 | EthVersion::Eth69 => Self::BlockRangeUpdate.to_u8(), + _ => Self::Receipts.to_u8(), } } @@ -562,6 +596,8 @@ impl TryFrom for EthMessageID { 0x08 => Ok(Self::NewPooledTransactionHashes), 0x09 => Ok(Self::GetPooledTransactions), 0x0a => Ok(Self::PooledTransactions), + 0x0b => Ok(Self::RequestBlockRange), + 0x0c => Ok(Self::SendBlockRange), 0x0d => Ok(Self::GetNodeData), 0x0e => Ok(Self::NodeData), 0x0f => Ok(Self::GetReceipts), diff --git a/crates/net/eth-wire-types/src/status.rs b/crates/net/eth-wire-types/src/status.rs index 7c9bbe1af49..eb771bcacde 100644 --- a/crates/net/eth-wire-types/src/status.rs +++ b/crates/net/eth-wire-types/src/status.rs @@ -1,4 +1,4 @@ -use crate::EthVersion; +use crate::{BlockRange, EthVersion}; use alloy_chains::{Chain, NamedChain}; use alloy_hardforks::{EthereumHardfork, ForkId, Head}; use alloy_primitives::{hex, B256, U256}; @@ -9,11 +9,11 @@ use reth_codecs_derive::add_arbitrary_tests; /// `UnifiedStatus` is an internal superset of all ETH status fields for all `eth/` versions. /// -/// This type can be converted into [`Status`] or [`StatusEth69`] depending on the version and -/// unsupported fields are stripped out. +/// This type can be converted into [`Status`], [`StatusEth69`], or [`StatusEth70`] depending on +/// the negotiated version and unsupported fields are stripped out. #[derive(Clone, Debug, PartialEq, Eq, Copy)] pub struct UnifiedStatus { - /// The eth protocol version (e.g. eth/66 to eth/69). + /// The eth protocol version (e.g. eth/66 to eth/70). pub version: EthVersion, /// The chain ID identifying the peer’s network. pub chain: Chain, @@ -29,6 +29,8 @@ pub struct UnifiedStatus { pub earliest_block: Option, /// The latest block number this node has (eth/69 only). pub latest_block: Option, + /// The block range this node can serve (eth/70). + pub block_range: Option, } impl Default for UnifiedStatus { @@ -45,6 +47,7 @@ impl Default for UnifiedStatus { total_difficulty: Some(U256::from(17_179_869_184u64)), earliest_block: Some(0), latest_block: Some(0), + block_range: None, } } } @@ -68,6 +71,7 @@ impl UnifiedStatus { .total_difficulty(Some(head.total_difficulty)) .earliest_block(Some(0)) .latest_block(Some(head.number)) + .block_range(Some(BlockRange { start_block: 0, end_block: head.number })) .build() } @@ -76,6 +80,7 @@ impl UnifiedStatus { pub const fn set_history_range(&mut self, earliest: u64, latest: u64) { self.earliest_block = Some(earliest); self.latest_block = Some(latest); + self.block_range = Some(BlockRange { start_block: earliest, end_block: latest }); } /// Sets the [`EthVersion`] for the status. @@ -83,6 +88,13 @@ impl UnifiedStatus { self.version = v; } + /// Sets the block range for eth/70 peers. + pub const fn set_block_range(&mut self, block_range: BlockRange) { + self.block_range = Some(block_range); + self.earliest_block = Some(block_range.start_block); + self.latest_block = Some(block_range.end_block); + } + /// Consume this `UnifiedStatus` and produce the legacy [`Status`] message used by all /// `eth/66`–`eth/68`. pub fn into_legacy(self) -> Status { @@ -109,12 +121,33 @@ impl UnifiedStatus { } } + /// Consume this `UnifiedStatus` and produce the [`StatusEth70`] message used by `eth/70`. + pub fn into_eth70(self) -> StatusEth70 { + let block_range = self + .block_range + .or_else(|| { + self.earliest_block + .zip(self.latest_block) + .map(|(start_block, end_block)| BlockRange { start_block, end_block }) + }) + .unwrap_or(BlockRange { start_block: 0, end_block: 0 }); + + StatusEth70 { + version: self.version, + chain: self.chain, + genesis: self.genesis, + forkid: self.forkid, + block_range, + blockhash: self.blockhash, + } + } + /// Convert this `UnifiedStatus` into the appropriate `StatusMessage` variant based on version. pub fn into_message(self) -> StatusMessage { - if self.version >= EthVersion::Eth69 { - StatusMessage::Eth69(self.into_eth69()) - } else { - StatusMessage::Legacy(self.into_legacy()) + match self.version { + v if v >= EthVersion::Eth70 => StatusMessage::Eth70(self.into_eth70()), + v if v >= EthVersion::Eth69 => StatusMessage::Eth69(self.into_eth69()), + _ => StatusMessage::Legacy(self.into_legacy()), } } @@ -130,6 +163,7 @@ impl UnifiedStatus { total_difficulty: Some(s.total_difficulty), earliest_block: None, latest_block: None, + block_range: None, }, StatusMessage::Eth69(e) => Self { version: e.version, @@ -140,6 +174,18 @@ impl UnifiedStatus { total_difficulty: None, earliest_block: Some(e.earliest), latest_block: Some(e.latest), + block_range: Some(BlockRange { start_block: e.earliest, end_block: e.latest }), + }, + StatusMessage::Eth70(e) => Self { + version: e.version, + chain: e.chain, + genesis: e.genesis, + forkid: e.forkid, + blockhash: e.blockhash, + total_difficulty: None, + earliest_block: Some(e.block_range.start_block), + latest_block: Some(e.block_range.end_block), + block_range: Some(e.block_range), }, } } @@ -153,11 +199,17 @@ pub struct StatusBuilder { impl StatusBuilder { /// Consumes the builder and returns the constructed [`UnifiedStatus`]. - pub const fn build(self) -> UnifiedStatus { + pub const fn build(mut self) -> UnifiedStatus { + // If earliest/latest were set (eth/69 path), also populate block_range so eth/70 can reuse + // the same values without requiring a separate setter. eth/69 conversion ignores + // block_range, so this is safe for all versions. + if let (Some(start), Some(end)) = (self.status.earliest_block, self.status.latest_block) { + self.status.block_range = Some(BlockRange { start_block: start, end_block: end }); + } self.status } - /// Sets the eth protocol version (e.g., eth/66, eth/69). + /// Sets the eth protocol version (e.g., eth/66, eth/70). pub const fn version(mut self, version: EthVersion) -> Self { self.status.version = version; self @@ -204,6 +256,18 @@ impl StatusBuilder { self.status.latest_block = latest; self } + + /// Sets the block range, if known (eth/70). + pub const fn block_range(mut self, block_range: Option) -> Self { + if let Some(block_range) = block_range { + self.status.block_range = Some(block_range); + self.status.earliest_block = Some(block_range.start_block); + self.status.latest_block = Some(block_range.end_block); + } else { + self.status.block_range = None; + } + self + } } /// The status message is used in the eth protocol handshake to ensure that peers are on the same @@ -378,8 +442,83 @@ impl Debug for StatusEth69 { } } -/// `StatusMessage` can store either the Legacy version (with TD) or the -/// eth/69 version (omits TD). +/// Status message for `eth/70` including the block range as per EIP-7975. +#[derive(Copy, Clone, PartialEq, Eq, RlpEncodable, RlpDecodable)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))] +#[add_arbitrary_tests(rlp)] +pub struct StatusEth70 { + /// The current protocol version (eth/70). + pub version: EthVersion, + + /// The chain id, as introduced in EIP-155. + pub chain: Chain, + + /// The genesis hash of the peer's chain. + pub genesis: B256, + + /// Fork identifier as defined by EIP-2124. + pub forkid: ForkId, + + /// The block range the peer can serve. + pub block_range: BlockRange, + + /// Hash of the latest block this node has (current head). + pub blockhash: B256, +} + +impl Display for StatusEth70 { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + let hexed_blockhash = hex::encode(self.blockhash); + let hexed_genesis = hex::encode(self.genesis); + write!( + f, + "StatusEth70 {{ version: {}, chain: {}, genesis: {}, forkid: {:X?}, block_range: [{}, {}], blockhash: {} }}", + self.version, + self.chain, + hexed_genesis, + self.forkid, + self.block_range.start_block, + self.block_range.end_block, + hexed_blockhash, + ) + } +} + +impl Debug for StatusEth70 { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + let hexed_blockhash = hex::encode(self.blockhash); + let hexed_genesis = hex::encode(self.genesis); + if f.alternate() { + write!( + f, + "StatusEth70 {{\n\tversion: {:?},\n\tchain: {:?},\n\tgenesis: {},\n\tforkid: {:X?},\n\tblock_range: [{}, {}],\n\tblockhash: {}\n}}", + self.version, + self.chain, + hexed_genesis, + self.forkid, + self.block_range.start_block, + self.block_range.end_block, + hexed_blockhash + ) + } else { + write!( + f, + "StatusEth70 {{ version: {:?}, chain: {:?}, genesis: {}, forkid: {:X?}, block_range: [{}, {}], blockhash: {} }}", + self.version, + self.chain, + hexed_genesis, + self.forkid, + self.block_range.start_block, + self.block_range.end_block, + hexed_blockhash + ) + } + } +} + +/// `StatusMessage` can store either the Legacy version (with TD), the +/// eth/69 version (omits TD), or the eth/70 version (with block range). #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum StatusMessage { @@ -387,6 +526,8 @@ pub enum StatusMessage { Legacy(Status), /// The new `eth/69` status with no `total_difficulty`. Eth69(StatusEth69), + /// The `eth/70` status which includes the `block_range`. + Eth70(StatusEth70), } impl StatusMessage { @@ -395,6 +536,7 @@ impl StatusMessage { match self { Self::Legacy(legacy_status) => legacy_status.genesis, Self::Eth69(status_69) => status_69.genesis, + Self::Eth70(status_70) => status_70.genesis, } } @@ -403,6 +545,7 @@ impl StatusMessage { match self { Self::Legacy(legacy_status) => legacy_status.version, Self::Eth69(status_69) => status_69.version, + Self::Eth70(status_70) => status_70.version, } } @@ -411,6 +554,7 @@ impl StatusMessage { match self { Self::Legacy(legacy_status) => &legacy_status.chain, Self::Eth69(status_69) => &status_69.chain, + Self::Eth70(status_70) => &status_70.chain, } } @@ -419,6 +563,7 @@ impl StatusMessage { match self { Self::Legacy(legacy_status) => legacy_status.forkid, Self::Eth69(status_69) => status_69.forkid, + Self::Eth70(status_70) => status_70.forkid, } } @@ -427,6 +572,7 @@ impl StatusMessage { match self { Self::Legacy(legacy_status) => legacy_status.blockhash, Self::Eth69(status_69) => status_69.blockhash, + Self::Eth70(status_70) => status_70.blockhash, } } } @@ -436,6 +582,7 @@ impl Encodable for StatusMessage { match self { Self::Legacy(s) => s.encode(out), Self::Eth69(s) => s.encode(out), + Self::Eth70(s) => s.encode(out), } } @@ -443,6 +590,7 @@ impl Encodable for StatusMessage { match self { Self::Legacy(s) => s.length(), Self::Eth69(s) => s.length(), + Self::Eth70(s) => s.length(), } } } @@ -452,12 +600,13 @@ impl Display for StatusMessage { match self { Self::Legacy(s) => Display::fmt(s, f), Self::Eth69(s69) => Display::fmt(s69, f), + Self::Eth70(s70) => Display::fmt(s70, f), } } } #[cfg(test)] mod tests { - use crate::{EthVersion, Status, StatusEth69, StatusMessage, UnifiedStatus}; + use crate::{BlockRange, EthVersion, Status, StatusEth69, StatusMessage, UnifiedStatus}; use alloy_consensus::constants::MAINNET_GENESIS_HASH; use alloy_genesis::Genesis; use alloy_hardforks::{EthereumHardfork, ForkHash, ForkId, Head}; @@ -546,6 +695,30 @@ mod tests { assert_eq!(unified_status, roundtripped_unified_status); } + #[test] + fn roundtrip_eth70() { + let unified_status = UnifiedStatus::builder() + .version(EthVersion::Eth70) + .chain(Chain::mainnet()) + .genesis(MAINNET_GENESIS_HASH) + .forkid(ForkId { hash: ForkHash([0xb7, 0x15, 0x07, 0x7d]), next: 0 }) + .blockhash(b256!("0xfeb27336ca7923f8fab3bd617fcb6e75841538f71c1bcfc267d7838489d9e13d")) + .total_difficulty(None) + .block_range(Some(BlockRange { start_block: 1, end_block: 2 })) + .build(); + + let status_message = unified_status.into_message(); + let roundtripped_unified_status = UnifiedStatus::from_message(status_message); + assert_eq!(unified_status, roundtripped_unified_status); + + if let StatusMessage::Eth70(status70) = status_message { + assert_eq!(status70.block_range.start_block, 1); + assert_eq!(status70.block_range.end_block, 2); + } else { + panic!("expected StatusMessage::Eth70 variant"); + } + } + #[test] fn encode_eth69_status_message() { let expected = hex!("f8544501a0d4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3c684b715077d8083ed14f2840112a880a0feb27336ca7923f8fab3bd617fcb6e75841538f71c1bcfc267d7838489d9e13d"); diff --git a/crates/net/eth-wire-types/src/version.rs b/crates/net/eth-wire-types/src/version.rs index 8b2e3a424d9..6553bd2e41e 100644 --- a/crates/net/eth-wire-types/src/version.rs +++ b/crates/net/eth-wire-types/src/version.rs @@ -27,6 +27,8 @@ pub enum EthVersion { Eth68 = 68, /// The `eth` protocol version 69. Eth69 = 69, + /// The `eth` protocol version 70. + Eth70 = 70, } impl EthVersion { @@ -55,6 +57,11 @@ impl EthVersion { pub const fn is_eth69(&self) -> bool { matches!(self, Self::Eth69) } + + /// Returns true if the version is eth/70 + pub const fn is_eth70(&self) -> bool { + matches!(self, Self::Eth70) + } } /// RLP encodes `EthVersion` as a single byte (66-69). @@ -96,6 +103,7 @@ impl TryFrom<&str> for EthVersion { "67" => Ok(Self::Eth67), "68" => Ok(Self::Eth68), "69" => Ok(Self::Eth69), + "70" => Ok(Self::Eth70), _ => Err(ParseVersionError(s.to_string())), } } @@ -120,6 +128,7 @@ impl TryFrom for EthVersion { 67 => Ok(Self::Eth67), 68 => Ok(Self::Eth68), 69 => Ok(Self::Eth69), + 70 => Ok(Self::Eth70), _ => Err(ParseVersionError(u.to_string())), } } @@ -149,6 +158,7 @@ impl From for &'static str { EthVersion::Eth67 => "67", EthVersion::Eth68 => "68", EthVersion::Eth69 => "69", + EthVersion::Eth70 => "70", } } } @@ -195,7 +205,7 @@ impl Decodable for ProtocolVersion { #[cfg(test)] mod tests { - use super::{EthVersion, ParseVersionError}; + use super::EthVersion; use alloy_rlp::{Decodable, Encodable, Error as RlpError}; use bytes::BytesMut; @@ -205,7 +215,7 @@ mod tests { assert_eq!(EthVersion::Eth67, EthVersion::try_from("67").unwrap()); assert_eq!(EthVersion::Eth68, EthVersion::try_from("68").unwrap()); assert_eq!(EthVersion::Eth69, EthVersion::try_from("69").unwrap()); - assert_eq!(Err(ParseVersionError("70".to_string())), EthVersion::try_from("70")); + assert_eq!(EthVersion::Eth70, EthVersion::try_from("70").unwrap()); } #[test] @@ -214,12 +224,18 @@ mod tests { assert_eq!(EthVersion::Eth67, "67".parse().unwrap()); assert_eq!(EthVersion::Eth68, "68".parse().unwrap()); assert_eq!(EthVersion::Eth69, "69".parse().unwrap()); - assert_eq!(Err(ParseVersionError("70".to_string())), "70".parse::()); + assert_eq!(EthVersion::Eth70, "70".parse().unwrap()); } #[test] fn test_eth_version_rlp_encode() { - let versions = [EthVersion::Eth66, EthVersion::Eth67, EthVersion::Eth68, EthVersion::Eth69]; + let versions = [ + EthVersion::Eth66, + EthVersion::Eth67, + EthVersion::Eth68, + EthVersion::Eth69, + EthVersion::Eth70, + ]; for version in versions { let mut encoded = BytesMut::new(); @@ -236,7 +252,7 @@ mod tests { (67_u8, Ok(EthVersion::Eth67)), (68_u8, Ok(EthVersion::Eth68)), (69_u8, Ok(EthVersion::Eth69)), - (70_u8, Err(RlpError::Custom("invalid eth version"))), + (70_u8, Ok(EthVersion::Eth70)), (65_u8, Err(RlpError::Custom("invalid eth version"))), ]; diff --git a/crates/net/eth-wire/src/capability.rs b/crates/net/eth-wire/src/capability.rs index 9b706a02cf9..9691acb4399 100644 --- a/crates/net/eth-wire/src/capability.rs +++ b/crates/net/eth-wire/src/capability.rs @@ -418,6 +418,8 @@ mod tests { Capability::new_static("eth", 66), Capability::new_static("eth", 67), Capability::new_static("eth", 68), + Capability::new_static("eth", 69), + Capability::new_static("eth", 70), ] .into(); @@ -425,6 +427,8 @@ mod tests { assert!(capabilities.supports_eth_v66()); assert!(capabilities.supports_eth_v67()); assert!(capabilities.supports_eth_v68()); + assert!(capabilities.supports_eth_v69()); + assert!(capabilities.supports_eth_v70()); } #[test] diff --git a/crates/net/eth-wire/src/handshake.rs b/crates/net/eth-wire/src/handshake.rs index f604f1fca11..6e24e6951f0 100644 --- a/crates/net/eth-wire/src/handshake.rs +++ b/crates/net/eth-wire/src/handshake.rs @@ -218,6 +218,19 @@ where return Err(EthHandshakeError::BlockhashZero.into()); } } + if let StatusMessage::Eth70(s) = their_status_message { + if !s.block_range.is_valid() { + return Err(EthHandshakeError::EarliestBlockGreaterThanLatestBlock { + got: s.block_range.start_block, + latest: s.block_range.end_block, + } + .into()); + } + + if s.blockhash.is_zero() { + return Err(EthHandshakeError::BlockhashZero.into()); + } + } Ok(UnifiedStatus::from_message(their_status_message)) } diff --git a/crates/net/eth-wire/src/hello.rs b/crates/net/eth-wire/src/hello.rs index 40deebb6310..9cad7223a01 100644 --- a/crates/net/eth-wire/src/hello.rs +++ b/crates/net/eth-wire/src/hello.rs @@ -260,10 +260,11 @@ mod tests { assert_eq!(hello_encoded.len(), hello.length()); } + //TODO: add test for eth70 here once we have fully support it #[test] - fn test_default_protocols_include_eth69() { - // ensure that the default protocol list includes Eth69 as the latest version + fn test_default_protocols_still_include_eth69() { + // ensure that older eth/69 remains advertised for compatibility let secret_key = SecretKey::new(&mut rand_08::thread_rng()); let id = pk2id(&secret_key.public_key(SECP256K1)); let hello = HelloMessageWithProtocols::builder(id).build(); diff --git a/crates/net/eth-wire/src/protocol.rs b/crates/net/eth-wire/src/protocol.rs index 16ec62b7cd7..18fb3fd0252 100644 --- a/crates/net/eth-wire/src/protocol.rs +++ b/crates/net/eth-wire/src/protocol.rs @@ -84,5 +84,6 @@ mod tests { assert_eq!(Protocol::eth(EthVersion::Eth67).messages(), 17); assert_eq!(Protocol::eth(EthVersion::Eth68).messages(), 17); assert_eq!(Protocol::eth(EthVersion::Eth69).messages(), 18); + assert_eq!(Protocol::eth(EthVersion::Eth70).messages(), 18); } } diff --git a/crates/net/network/src/session/active.rs b/crates/net/network/src/session/active.rs index 98ccbad3b95..4c41ee69b7d 100644 --- a/crates/net/network/src/session/active.rs +++ b/crates/net/network/src/session/active.rs @@ -26,7 +26,8 @@ use metrics::Gauge; use reth_eth_wire::{ errors::{EthHandshakeError, EthStreamError}, message::{EthBroadcastMessage, MessageError, RequestPair}, - Capabilities, DisconnectP2P, DisconnectReason, EthMessage, NetworkPrimitives, NewBlockPayload, + BlockRange, Capabilities, DisconnectP2P, DisconnectReason, EthMessage, NetworkPrimitives, + NewBlockPayload, RequestBlockRange, SendBlockRange, }; use reth_eth_wire_types::RawCapabilityMessage; use reth_metrics::common::mpsc::MeteredPollSender; @@ -51,6 +52,9 @@ use tracing::{debug, trace}; /// since the last update. The interval is set to one epoch duration in seconds. pub(super) const RANGE_UPDATE_INTERVAL: Duration = Duration::from_secs(EPOCH_SLOTS * 12); +/// Minimum interval between `RequestBlockRange` queries for eth/70 peers. +pub(super) const RANGE_REQUEST_INTERVAL: Duration = Duration::from_secs(60); + // Constants for timeout updating. /// Minimum timeout value @@ -135,6 +139,10 @@ pub(crate) struct ActiveSession { /// The last latest block number we sent in a range update /// Used to avoid sending unnecessary updates when block height hasn't changed significantly pub(crate) last_sent_latest_block: Option, + /// The last time we requested the remote block range (eth/70). + pub(crate) last_range_request: Option, + /// The last time we updated the remote block range information. + pub(crate) last_range_update: Option, } impl ActiveSession { @@ -276,6 +284,37 @@ impl ActiveSession { EthMessage::Receipts69(resp) => { on_response!(resp, GetReceipts69) } + EthMessage::RequestBlockRange(req) => { + let response = EthMessage::SendBlockRange(RequestPair { + request_id: req.request_id, + message: SendBlockRange::from_range(BlockRange { + start_block: self.local_range_info.earliest(), + end_block: self.local_range_info.latest(), + }), + }); + self.queued_outgoing.push_back(response.into()); + OnIncomingMessageOutcome::Ok + } + EthMessage::SendBlockRange(msg) => { + let range = msg.message.as_range(); + if range.start_block > range.end_block { + return OnIncomingMessageOutcome::BadMessage { + error: EthStreamError::InvalidMessage(MessageError::Other(format!( + "invalid block range: start ({}) > end ({})", + range.start_block, range.end_block + ))), + message: EthMessage::SendBlockRange(msg), + }; + } + + if let Some(range_info) = self.range_info.as_ref() { + let latest_hash = range_info.latest_hash(); + range_info.update(range.start_block, range.end_block, latest_hash); + } + self.last_range_update = Some(Instant::now()); + + OnIncomingMessageOutcome::Ok + } EthMessage::BlockRangeUpdate(msg) => { // Validate that earliest <= latest according to the spec if msg.earliest > msg.latest { @@ -301,6 +340,7 @@ impl ActiveSession { if let Some(range_info) = self.range_info.as_ref() { range_info.update(msg.earliest, msg.latest, msg.latest_hash); } + self.last_range_update = Some(Instant::now()); OnIncomingMessageOutcome::Ok } @@ -356,6 +396,42 @@ impl ActiveSession { } } + /// Request our peer's block range only when a throttle allows it. + fn maybe_request_block_range(&mut self, tick: Option) { + if self.conn.version() < EthVersion::Eth70 { + return + } + + let needs_now = self.last_range_update.is_some() || self.last_range_request.is_some(); + let now_for_eval = tick.or_else(|| needs_now.then(Instant::now)); + + let stale_update = self.last_range_update.is_none_or(|last| { + // compare against interval tick (or freshly captured) to determine if the + // current view is stale. + let now = now_for_eval.expect("now must be present when last_range_update is set"); + now.saturating_duration_since(last) >= RANGE_REQUEST_INTERVAL + }); + let can_request = self.last_range_request.is_none_or(|last| { + let now = now_for_eval.expect("now must be present when last_range_request is set"); + now.saturating_duration_since(last) >= RANGE_REQUEST_INTERVAL + }); + + if stale_update && can_request { + // Only allocate a request ID and enqueue a message when both + // staleness and request-rate checks pass. + let now = now_for_eval.unwrap_or_else(Instant::now); + let request_id = self.next_id(); + self.queued_outgoing.push_back( + EthMessage::RequestBlockRange(RequestPair { + request_id, + message: RequestBlockRange, + }) + .into(), + ); + self.last_range_request = Some(now); + } + } + /// Returns the deadline timestamp at which the request times out fn request_deadline(&self) -> Instant { Instant::now() + @@ -734,9 +810,12 @@ impl Future for ActiveSession { } } + let mut interval_tick = None; if let Some(interval) = &mut this.range_update_interval { // Check if we should send a range update based on block height changes - while interval.poll_tick(cx).is_ready() { + while let Poll::Ready(now) = interval.poll_tick(cx) { + let now: Instant = now.into_std(); + interval_tick = Some(now); let current_latest = this.local_range_info.latest(); let should_send = if let Some(last_sent) = this.last_sent_latest_block { // Only send if block height has advanced by at least one epoch (32 blocks) @@ -746,14 +825,26 @@ impl Future for ActiveSession { }; if should_send { - this.queued_outgoing.push_back( - EthMessage::BlockRangeUpdate(this.local_range_info.to_message()).into(), - ); + // Prefer eth/70 SendBlockRange; fall back to eth/69 BlockRangeUpdate. + let msg = if this.conn.version() >= EthVersion::Eth70 { + EthMessage::SendBlockRange(RequestPair { + request_id: 0, + message: SendBlockRange::from_range(BlockRange { + start_block: this.local_range_info.earliest(), + end_block: this.local_range_info.latest(), + }), + }) + } else { + EthMessage::BlockRangeUpdate(this.local_range_info.to_message()) + }; + this.queued_outgoing.push_back(msg.into()); this.last_sent_latest_block = Some(current_latest); } } } + this.maybe_request_block_range(interval_tick); + while this.internal_request_timeout_interval.poll_tick(cx).is_ready() { // check for timed out requests if this.check_timed_out_requests(Instant::now()) && @@ -1073,6 +1164,8 @@ mod tests { ), range_update_interval: None, last_sent_latest_block: None, + last_range_request: None, + last_range_update: None, } } ev => { diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index 17528e2fcfa..fba877a5cab 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -19,7 +19,7 @@ use futures::{future::Either, io, FutureExt, StreamExt}; use reth_ecies::{stream::ECIESStream, ECIESError}; use reth_eth_wire::{ errors::EthStreamError, handshake::EthRlpxHandshake, multiplex::RlpxProtocolMultiplexer, - BlockRangeUpdate, Capabilities, DisconnectReason, EthStream, EthVersion, + BlockRange, BlockRangeUpdate, Capabilities, DisconnectReason, EthStream, EthVersion, HelloMessageWithProtocols, NetworkPrimitives, UnauthedP2PStream, UnifiedStatus, HANDSHAKE_TIMEOUT, }; @@ -538,6 +538,11 @@ impl SessionManager { // negotiated version let version = conn.version(); + let remote_range_info = + status.earliest_block.zip(status.latest_block).map(|(earliest, latest)| { + BlockRangeInfo::new(earliest, latest, status.blockhash) + }); + // Configure the interval at which the range information is updated, starting with // ETH69 let range_update_interval = (conn.version() >= EthVersion::Eth69).then(|| { @@ -568,10 +573,12 @@ impl SessionManager { internal_request_timeout: Arc::clone(&timeout), protocol_breach_request_timeout: self.protocol_breach_request_timeout, terminate_message: None, - range_info: None, + range_info: remote_range_info.clone(), local_range_info: self.local_range_info.clone(), range_update_interval, last_sent_latest_block: None, + last_range_request: None, + last_range_update: remote_range_info.as_ref().map(|_| Instant::now()), }; self.spawn(session); @@ -608,7 +615,7 @@ impl SessionManager { messages, direction, timeout, - range_info: None, + range_info: remote_range_info, }) } PendingSessionEvent::Disconnected { remote_addr, session_id, direction, error } => { @@ -692,6 +699,10 @@ impl SessionManager { self.status.earliest_block = Some(block_range_update.earliest); self.status.latest_block = Some(block_range_update.latest); self.status.blockhash = block_range_update.latest_hash; + self.status.block_range = Some(BlockRange { + start_block: block_range_update.earliest, + end_block: block_range_update.latest, + }); // Update the shared local range info that gets propagated to active sessions self.local_range_info.update( diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index 11b0e14dcaf..9f37f29e8d2 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -1921,7 +1921,9 @@ impl PooledTransactionsHashesBuilder { fn new(version: EthVersion) -> Self { match version { EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()), - EthVersion::Eth68 | EthVersion::Eth69 => Self::Eth68(Default::default()), + EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70 => { + Self::Eth68(Default::default()) + } } }