Skip to content

Commit fb7f42c

Browse files
committed
introduce NetworkManager component
1 parent 6c8c70c commit fb7f42c

File tree

19 files changed

+714
-300
lines changed

19 files changed

+714
-300
lines changed

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ license = "MIT OR Apache-2.0"
66
exclude = [".github/"]
77

88
[workspace]
9-
members = [
9+
members = [ "crates/network",
1010
"crates/scroll-wire"
1111
]
1212

13-
resolver = "2"
13+
resolver = "2"

crates/network/Cargo.toml

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
[package]
2+
name = "network"
3+
version.workspace = true
4+
edition.workspace = true
5+
rust-version.workspace = true
6+
license.workspace = true
7+
exclude.workspace = true
8+
9+
[dependencies]
10+
reth-eth-wire-types = { path = "../../../reth/crates/net/eth-wire-types" }
11+
reth-network = { path = "../../../reth/crates/net/network" }
12+
reth-network-api = { path = "../../../reth/crates/net/network-api" }
13+
reth-network-types = { path = "../../../reth/crates/net/network-types" }
14+
reth-network-peers = { path = "../../../reth/crates/net/peers" }
15+
reth-primitives = { path = "../../../reth/crates/primitives" }
16+
reth-discv5 = { path = "../../../reth/crates/net/discv5" }
17+
reth-tasks = { path = "../../../reth/crates/tasks" }
18+
reth-storage-api = { path = "../../../reth/crates/storage/storage-api" }
19+
tokio = {version = "1.39", default-features = false, features = ["full"] }
20+
tokio-stream = "0.1"
21+
scroll-wire = { path = "../scroll-wire" }
22+
alloy-primitives = { version = "0.8.15", default-features = false, features = [
23+
"map-foldhash",
24+
] }
25+
secp256k1 = { version = "0.29", default-features = false, features = ["global-context", "rand-std", "recovery"] }
26+
parking_lot = "0.12"
27+
futures = "0.3"
28+
reth-scroll-chainspec = { path = "../../../reth/crates/scroll/chainspec" }
29+

crates/network/src/handle.rs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
use alloy_primitives::PrimitiveSignature;
2+
use reth_network::NetworkHandle as RethNetworkHandle;
3+
use reth_network_peers::PeerId;
4+
use reth_primitives::Block;
5+
use secp256k1::SecretKey;
6+
use std::sync::Arc;
7+
use tokio::sync::{mpsc::UnboundedSender, oneshot};
8+
9+
/// A _sharable_ frontend used to communicate with the [`NetworkManager`].
10+
#[derive(Debug, Clone)]
11+
pub struct NetworkHandle {
12+
/// A reference to the inner network handle.
13+
pub(crate) inner: Arc<NetworkInner>,
14+
}
15+
16+
impl NetworkHandle {
17+
/// Creates a new [`NetworkHandle`] instance from the given [`UnboundedSender`] and [`RethNetworkHandle`].
18+
pub fn new(
19+
to_manager_tx: UnboundedSender<NetworkHandleMessage>,
20+
inner_network_handle: RethNetworkHandle,
21+
) -> Self {
22+
let inner = NetworkInner {
23+
to_manager_tx,
24+
inner_network_handle,
25+
};
26+
Self {
27+
inner: Arc::new(inner),
28+
}
29+
}
30+
}
31+
32+
/// The inner state of the [`NetworkHandle`].
33+
#[derive(Debug)]
34+
pub struct NetworkInner {
35+
/// The sender half of the channel set up between this type and the [`NetworkManager`].
36+
pub(crate) to_manager_tx: UnboundedSender<NetworkHandleMessage>,
37+
/// A reference to the inner network handle which is used to communicate with the inner network.
38+
pub inner_network_handle: RethNetworkHandle,
39+
}
40+
41+
impl NetworkHandle {
42+
/// Returns a reference to the inner network handle.
43+
pub fn inner(&self) -> &RethNetworkHandle {
44+
&self.inner.inner_network_handle
45+
}
46+
47+
/// Returns the peer id of the network handle.
48+
pub fn peer_id(&self) -> &PeerId {
49+
&self.inner.inner_network_handle.peer_id()
50+
}
51+
52+
/// Sends a message to the network manager.
53+
pub fn send_message(&self, msg: NetworkHandleMessage) {
54+
let _ = self.inner.to_manager_tx.send(msg);
55+
}
56+
57+
/// Announces a block to the network.
58+
pub fn announce_block(&self, block: Block, signature: PrimitiveSignature) {
59+
self.send_message(NetworkHandleMessage::AnnounceBlock { block, signature });
60+
}
61+
62+
/// Shuts down the network handle.
63+
pub async fn shutdown(&self) -> Result<(), oneshot::error::RecvError> {
64+
let (tx, rx) = oneshot::channel();
65+
self.send_message(NetworkHandleMessage::Shutdown(tx));
66+
rx.await
67+
}
68+
69+
/// Returns the secret key of the network handle.
70+
pub fn secret_key(&self) -> &SecretKey {
71+
&self.inner.inner_network_handle.secret_key()
72+
}
73+
}
74+
75+
/// A message type used for communication between the [`NetworkHandle`] and the [`super::NetworkManager`].
76+
pub enum NetworkHandleMessage {
77+
AnnounceBlock {
78+
block: Block,
79+
signature: PrimitiveSignature,
80+
},
81+
Shutdown(oneshot::Sender<()>),
82+
}

crates/network/src/import.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
use alloy_primitives::PrimitiveSignature;
2+
use reth_network_peers::PeerId;
3+
use scroll_wire::NewBlock;
4+
use std::task::{Context, Poll};
5+
6+
/// A trait for importing new blocks from the network.
7+
pub trait BlockImport: std::fmt::Debug + Send + Sync {
8+
/// Called when a new block is received from the network.
9+
fn on_new_block(
10+
&mut self,
11+
peer_id: PeerId,
12+
block: reth_primitives::Block,
13+
signature: PrimitiveSignature,
14+
);
15+
16+
/// Polls the block import type for results of block import.
17+
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<BlockImportOutcome>;
18+
}
19+
20+
/// The outcome of a block import operation.
21+
pub struct BlockImportOutcome {
22+
/// The peer that the block was received from.
23+
pub peer: PeerId,
24+
/// The result of the block import operation.
25+
pub result: Result<BlockValidation, BlockImportError>,
26+
}
27+
28+
/// The result of a block validation operation.
29+
pub enum BlockValidation {
30+
/// The block header is valid.
31+
ValidHeader { new_block: NewBlock },
32+
/// The block is valid.
33+
ValidBlock { new_block: NewBlock },
34+
}
35+
36+
/// An error that can occur during block import.
37+
pub enum BlockImportError {
38+
/// An error occurred during consensus.
39+
Consensus(ConsensusError),
40+
}
41+
42+
/// A consensus related error that can occur during block import.
43+
pub enum ConsensusError {
44+
/// The block is invalid.
45+
InvalidBlock,
46+
/// The state root is invalid.
47+
InvalidStateRoot,
48+
/// The signature is invalid.
49+
InvalidSignature,
50+
}
51+
52+
/// A block import type that does nothing.
53+
#[derive(Debug)]
54+
pub struct NoopBlockImport;
55+
56+
impl BlockImport for NoopBlockImport {
57+
fn on_new_block(
58+
&mut self,
59+
_peer_id: PeerId,
60+
_block: reth_primitives::Block,
61+
_signature: PrimitiveSignature,
62+
) {
63+
println!("received new block");
64+
}
65+
66+
fn poll(&mut self, _cx: &mut std::task::Context<'_>) -> std::task::Poll<BlockImportOutcome> {
67+
std::task::Poll::Pending
68+
}
69+
}

crates/network/src/lib.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
mod import;
2+
pub use import::{BlockImport, BlockImportOutcome, BlockValidation, NoopBlockImport};
3+
4+
mod handle;
5+
use handle::{NetworkHandle, NetworkHandleMessage};
6+
7+
mod manager;
8+
pub use manager::NetworkManager;
9+
10+
pub use reth_network::NetworkConfigBuilder;

crates/network/src/main.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
use alloy_primitives::PrimitiveSignature;
2+
use network::{NetworkConfigBuilder, NetworkManager, NoopBlockImport};
3+
use reth_network::{NetworkInfo, Peers};
4+
use reth_scroll_chainspec::SCROLL_MAINNET;
5+
6+
#[tokio::main]
7+
async fn main() {
8+
let config_1 =
9+
NetworkConfigBuilder::<reth_network::EthNetworkPrimitives>::with_rng_secret_key()
10+
.disable_discovery()
11+
.build_with_noop_provider((*SCROLL_MAINNET).clone());
12+
let network_1 = NetworkManager::new(config_1, NoopBlockImport).await;
13+
let network_1_handle = network_1.handle();
14+
let peer_1_id = *network_1_handle.peer_id();
15+
let peer_1_addr = network_1_handle.inner().local_addr();
16+
17+
let config_2 =
18+
NetworkConfigBuilder::<reth_network::EthNetworkPrimitives>::with_rng_secret_key()
19+
.disable_discovery()
20+
.listener_addr(std::net::SocketAddr::V4(std::net::SocketAddrV4::new(
21+
std::net::Ipv4Addr::UNSPECIFIED,
22+
0,
23+
)))
24+
.build_with_noop_provider((*SCROLL_MAINNET).clone());
25+
let network_2 = NetworkManager::new(config_2, NoopBlockImport).await;
26+
let network_2_handle = network_2.handle();
27+
let peer_2_id = *network_2_handle.peer_id();
28+
let peer_2_addr = network_2_handle.inner().local_addr();
29+
30+
tokio::spawn(network_1);
31+
tokio::spawn(network_2);
32+
33+
network_1_handle.inner().add_peer(peer_2_id, peer_2_addr);
34+
network_2_handle.inner().add_peer(peer_1_id, peer_1_addr);
35+
36+
let signature = PrimitiveSignature::try_from(&[0u8; 65][..]).unwrap();
37+
let block = reth_primitives::Block::default();
38+
39+
for _ in 0..100 {
40+
network_1_handle.announce_block(block.clone(), signature);
41+
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
42+
}
43+
}

0 commit comments

Comments
 (0)