Skip to content

Commit 1748ddc

Browse files
perf(l1): improved discovery and peer initialization (#5147)
**Motivation** We are too aggressive on our DiscoveryServer and Initiator. On start we send up to 5000 Udp messages every 5 seconds and start up to 5000 Tcp connections every 3 seconds. After several tests it was found that on one hand it is not necessary to be that eager, and on the other hand it caused several issues regarding the tokio runtime (ie. it was blocked for more than 10ms in several places, causing some problems when trying to synchronize from the local machine). **Description** This PR changes the way Lookup is performed: instead of trying a lot of connections at the same time every 3 seconds, it distributes the connection attempts in time, making it more gentle with the CPU. It also lowers the attempt rate. - Our Discovery lookup is more frequent, but tries 1 node at a time. - Our Connection attempt is more frequent, but tries 1 node at a time. - We match both processes rate to avoid having both, too much pending connections, or no pending connections at all. - Also noticed that spawning some GenServers in dedicated threads was no longer necessary, and it created a lot of threads, as every `start_on_thread` creates it's own Tokio Runtime with it's thread pool. - Also removed a spawn_blocking inside a GenServer that may cause a contention issue. ** Pending (for a future PR) ** - Will create a command line argument to set up the lookup frequency or eagerness: #5148 - Will make the lookup frequency adjustable at runtime, depending on the amount of connected peers: #5149 --------- Co-authored-by: Tomás Grüner <[email protected]>
1 parent 59d2ccb commit 1748ddc

File tree

6 files changed

+93
-92
lines changed

6 files changed

+93
-92
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
## Perf
44

5+
### 2025-10-31
6+
7+
- Improved discovery and peer initialization [#5147](https://github.com/lambdaclass/ethrex/pull/5147)
8+
59
### 2025-10-30
610

711
- Pipeline Merkleization and Execution [#5084](https://github.com/lambdaclass/ethrex/pull/5084)

crates/networking/p2p/discv4/peer_table.rs

Lines changed: 32 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ use std::{
1717
time::{Duration, Instant},
1818
};
1919
use thiserror::Error;
20-
use tracing::debug;
2120

2221
const MAX_SCORE: i64 = 50;
2322
const MIN_SCORE: i64 = -50;
@@ -127,7 +126,7 @@ pub struct PeerTable {
127126
impl PeerTable {
128127
pub fn spawn(target_peers: usize) -> PeerTable {
129128
PeerTable {
130-
handle: PeerTableServer::new(target_peers).start_on_thread(),
129+
handle: PeerTableServer::new(target_peers).start(),
131130
}
132131
}
133132

@@ -320,25 +319,20 @@ impl PeerTable {
320319
}
321320
}
322321

323-
/// Get all contacts available to initiate a connection
324-
pub async fn get_contacts_to_initiate(
325-
&mut self,
326-
amount: usize,
327-
) -> Result<Vec<Contact>, PeerTableError> {
328-
match self
329-
.handle
330-
.call(CallMessage::GetContactsToInitiate(amount))
331-
.await?
332-
{
333-
OutMessage::Contacts(contacts) => Ok(contacts),
322+
/// Provide a contact to initiate a connection
323+
pub async fn get_contact_to_initiate(&mut self) -> Result<Option<Contact>, PeerTableError> {
324+
match self.handle.call(CallMessage::GetContactToInitiate).await? {
325+
OutMessage::Contact(contact) => Ok(Some(contact)),
326+
OutMessage::NotFound => Ok(None),
334327
_ => unreachable!(),
335328
}
336329
}
337330

338-
/// Get all contacts available for lookup
339-
pub async fn get_contacts_for_lookup(&mut self) -> Result<Vec<Contact>, PeerTableError> {
340-
match self.handle.call(CallMessage::GetContactsForLookup).await? {
341-
OutMessage::Contacts(contacts) => Ok(contacts),
331+
/// Provide a contact to perform Discovery lookup
332+
pub async fn get_contact_for_lookup(&mut self) -> Result<Option<Contact>, PeerTableError> {
333+
match self.handle.call(CallMessage::GetContactForLookup).await? {
334+
OutMessage::Contact(contact) => Ok(Some(contact)),
335+
OutMessage::NotFound => Ok(None),
342336
_ => unreachable!(),
343337
}
344338
}
@@ -574,10 +568,7 @@ impl PeerTableServer {
574568
}
575569
}
576570

577-
fn get_contacts_to_initiate(&mut self, max_amount: usize) -> Vec<Contact> {
578-
let mut contacts = Vec::new();
579-
let mut tried_connections = 0;
580-
571+
fn get_contact_to_initiate(&mut self) -> Option<Contact> {
581572
for contact in self.contacts.values() {
582573
let node_id = contact.node.node_id();
583574
if !self.peers.contains_key(&node_id)
@@ -587,29 +578,23 @@ impl PeerTableServer {
587578
{
588579
self.already_tried_peers.insert(node_id);
589580

590-
contacts.push(contact.clone());
591-
592-
tried_connections += 1;
593-
if tried_connections >= max_amount {
594-
break;
595-
}
581+
return Some(contact.clone());
596582
}
597583
}
598-
599-
if tried_connections < max_amount {
600-
debug!("Resetting list of tried peers.");
601-
self.already_tried_peers.clear();
602-
}
603-
604-
contacts
584+
// No untried contact found, resetting tried peers.
585+
tracing::info!("Resetting list of tried peers.");
586+
self.already_tried_peers.clear();
587+
None
605588
}
606589

607-
fn get_contacts_for_lookup(&mut self) -> Vec<Contact> {
590+
fn get_contact_for_lookup(&mut self) -> Option<Contact> {
608591
self.contacts
609592
.values()
610593
.filter(|c| c.n_find_node_sent < MAX_FIND_NODE_PER_PEER && !c.disposable)
594+
.collect::<Vec<_>>()
595+
.choose(&mut rand::rngs::OsRng)
596+
.cloned()
611597
.cloned()
612-
.collect()
613598
}
614599

615600
fn get_contacts_to_revalidate(&mut self, revalidation_interval: Duration) -> Vec<Contact> {
@@ -635,7 +620,7 @@ impl PeerTableServer {
635620
if sender_ip != contact.node.ip {
636621
return OutMessage::IpMismatch;
637622
}
638-
OutMessage::ValidContact(contact.clone())
623+
OutMessage::Contact(contact.clone())
639624
}
640625

641626
fn get_closest_nodes(&mut self, node_id: H256) -> Vec<Node> {
@@ -814,8 +799,8 @@ enum CallMessage {
814799
PeerCountByCapabilities { capabilities: Vec<Capability> },
815800
TargetReached,
816801
TargetPeersReached,
817-
GetContactsToInitiate(usize),
818-
GetContactsForLookup,
802+
GetContactToInitiate,
803+
GetContactForLookup,
819804
GetContactsToRevalidate(Duration),
820805
GetBestPeer { capabilities: Vec<Capability> },
821806
GetScore { node_id: H256 },
@@ -844,7 +829,7 @@ pub enum OutMessage {
844829
TargetReached(bool),
845830
IsNew(bool),
846831
Nodes(Vec<Node>),
847-
ValidContact(Contact),
832+
Contact(Contact),
848833
InvalidContact,
849834
UnknownContact,
850835
IpMismatch,
@@ -890,12 +875,14 @@ impl GenServer for PeerTableServer {
890875
CallMessage::TargetPeersReached => CallResponse::Reply(Self::OutMsg::TargetReached(
891876
self.peers.len() >= self.target_peers,
892877
)),
893-
CallMessage::GetContactsToInitiate(amount) => CallResponse::Reply(
894-
Self::OutMsg::Contacts(self.get_contacts_to_initiate(amount)),
878+
CallMessage::GetContactToInitiate => CallResponse::Reply(
879+
self.get_contact_to_initiate()
880+
.map_or(Self::OutMsg::NotFound, Self::OutMsg::Contact),
881+
),
882+
CallMessage::GetContactForLookup => CallResponse::Reply(
883+
self.get_contact_for_lookup()
884+
.map_or(Self::OutMsg::NotFound, Self::OutMsg::Contact),
895885
),
896-
CallMessage::GetContactsForLookup => {
897-
CallResponse::Reply(Self::OutMsg::Contacts(self.get_contacts_for_lookup()))
898-
}
899886
CallMessage::GetContactsToRevalidate(revalidation_interval) => CallResponse::Reply(
900887
Self::OutMsg::Contacts(self.get_contacts_to_revalidate(revalidation_interval)),
901888
),

crates/networking/p2p/discv4/server.rs

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ const REVALIDATION_INTERVAL: Duration = Duration::from_secs(12 * 60 * 60); // 12
3939
/// The initial interval between peer lookups, until the number of peers reaches
4040
/// [target_peers](DiscoverySideCarState::target_peers), or the number of
4141
/// contacts reaches [target_contacts](DiscoverySideCarState::target_contacts).
42-
const INITIAL_LOOKUP_INTERVAL: Duration = Duration::from_secs(5);
43-
const LOOKUP_INTERVAL: Duration = Duration::from_secs(5 * 60); // 5 minutes
42+
pub const INITIAL_LOOKUP_INTERVAL: Duration = Duration::from_millis(100); // 10 per second
43+
pub const LOOKUP_INTERVAL: Duration = Duration::from_millis(600); // 100 per minute
44+
const CHANGE_FIND_NODE_MESSAGE_INTERVAL: Duration = Duration::from_secs(5);
4445
const PRUNE_INTERVAL: Duration = Duration::from_secs(5);
4546

4647
#[derive(Debug, thiserror::Error)]
@@ -65,6 +66,7 @@ pub enum InMessage {
6566
Revalidate,
6667
Lookup,
6768
Prune,
69+
ChangeFindNodeMessage,
6870
Shutdown,
6971
}
7072

@@ -80,6 +82,9 @@ pub struct DiscoveryServer {
8082
signer: SecretKey,
8183
udp_socket: Arc<UdpSocket>,
8284
peer_table: PeerTable,
85+
/// The last `FindNode` message sent, cached due to message
86+
/// signatures being expensive.
87+
find_node_message: BytesMut,
8388
}
8489

8590
impl DiscoveryServer {
@@ -100,6 +105,7 @@ impl DiscoveryServer {
100105
signer,
101106
udp_socket,
102107
peer_table: peer_table.clone(),
108+
find_node_message: Self::random_message(&signer),
103109
};
104110

105111
info!(count = bootnodes.len(), "Adding bootnodes");
@@ -111,7 +117,7 @@ impl DiscoveryServer {
111117
.new_contacts(bootnodes, local_node.node_id())
112118
.await?;
113119

114-
discovery_server.start_on_thread();
120+
discovery_server.start();
115121
Ok(())
116122
}
117123

@@ -202,6 +208,18 @@ impl DiscoveryServer {
202208
Ok(())
203209
}
204210

211+
/// Generate and store a FindNodeMessage with a random key. We then send the same message on Disovery lookup.
212+
/// We change this message every CHANGE_FIND_NODE_MESSAGE_INTERVAL.
213+
fn random_message(signer: &SecretKey) -> BytesMut {
214+
let expiration: u64 = get_msg_expiration_from_seconds(EXPIRATION_SECONDS);
215+
let random_priv_key = SecretKey::new(&mut OsRng);
216+
let random_pub_key = public_key_from_signing_key(&random_priv_key);
217+
let msg = Message::FindNode(FindNodeMessage::new(random_pub_key, expiration));
218+
let mut buf = BytesMut::new();
219+
msg.encode_with_header(&mut buf, signer);
220+
buf
221+
}
222+
205223
async fn revalidate(&mut self) -> Result<(), DiscoveryServerError> {
206224
for contact in self
207225
.peer_table
@@ -214,17 +232,9 @@ impl DiscoveryServer {
214232
}
215233

216234
async fn lookup(&mut self) -> Result<(), DiscoveryServerError> {
217-
// Sending the same FindNode message to all contacts to optimize message creation and encoding
218-
let expiration: u64 = get_msg_expiration_from_seconds(EXPIRATION_SECONDS);
219-
let random_priv_key = SecretKey::new(&mut OsRng);
220-
let random_pub_key = public_key_from_signing_key(&random_priv_key);
221-
let msg = Message::FindNode(FindNodeMessage::new(random_pub_key, expiration));
222-
let mut buf = BytesMut::new();
223-
msg.encode_with_header(&mut buf, &self.signer);
224-
225-
for contact in self.peer_table.get_contacts_for_lookup().await? {
226-
if self.udp_socket.send_to(&buf, &contact.node.udp_addr()).await.inspect_err(
227-
|e| error!(sending = ?msg, addr = ?&contact.node.udp_addr(), err=?e, "Error sending message"),
235+
if let Some(contact) = self.peer_table.get_contact_for_lookup().await? {
236+
if self.udp_socket.send_to(&self.find_node_message, &contact.node.udp_addr()).await.inspect_err(
237+
|e| error!(sending = "FindNode", addr = ?&contact.node.udp_addr(), err=?e, "Error sending message"),
228238
).is_err() {
229239
self.peer_table
230240
.set_disposable(&contact.node.node_id())
@@ -454,7 +464,7 @@ impl DiscoveryServer {
454464
debug!(received = message_type, to = %format!("{sender_public_key:#x}"), "IP address mismatch, skipping");
455465
Err(DiscoveryServerError::InvalidContact)
456466
}
457-
PeerTableOutMessage::ValidContact(contact) => Ok(contact),
467+
PeerTableOutMessage::Contact(contact) => Ok(contact),
458468
_ => unreachable!(),
459469
}
460470
}
@@ -505,8 +515,12 @@ impl GenServer for DiscoveryServer {
505515
InMessage::Revalidate,
506516
);
507517
send_interval(PRUNE_INTERVAL, handle.clone(), InMessage::Prune);
518+
send_interval(
519+
CHANGE_FIND_NODE_MESSAGE_INTERVAL,
520+
handle.clone(),
521+
InMessage::ChangeFindNodeMessage,
522+
);
508523
let _ = handle.clone().cast(InMessage::Lookup).await;
509-
510524
send_message_on(handle.clone(), tokio::signal::ctrl_c(), InMessage::Shutdown);
511525

512526
Ok(Success(self))
@@ -548,6 +562,9 @@ impl GenServer for DiscoveryServer {
548562
.await
549563
.inspect_err(|e| error!(err=?e, "Error Pruning peer table"));
550564
}
565+
Self::CastMsg::ChangeFindNodeMessage => {
566+
self.find_node_message = Self::random_message(&self.signer);
567+
}
551568
Self::CastMsg::Shutdown => return CastResponse::Stop,
552569
}
553570
CastResponse::NoReply

crates/networking/p2p/rlpx/connection/handshake.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ pub(crate) async fn perform(
6363
eth_version: Arc<RwLock<EthCapVersion>>,
6464
) -> Result<(Established, SplitStream<Framed<TcpStream, RLPxCodec>>), PeerConnectionError> {
6565
let (context, node, framed) = match state {
66-
ConnectionState::Initiator(Initiator { context, node, .. }) => {
66+
ConnectionState::Initiator(Initiator { context, node }) => {
6767
let addr = SocketAddr::new(node.ip, node.tcp_port);
6868
let mut stream = match tcp_stream(addr).await {
6969
Ok(result) => result,

crates/networking/p2p/rlpx/connection/server.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1061,14 +1061,11 @@ async fn handle_incoming_message(
10611061
}
10621062
Message::GetByteCodes(req) => {
10631063
let storage_clone = state.storage.clone();
1064-
let response =
1065-
tokio::task::spawn_blocking(move || process_byte_codes_request(req, storage_clone))
1066-
.await
1067-
.map_err(|_| {
1068-
PeerConnectionError::InternalError(
1069-
"Failed to execute bytecode retrieval task".to_string(),
1070-
)
1071-
})??;
1064+
let response = process_byte_codes_request(req, storage_clone).map_err(|_| {
1065+
PeerConnectionError::InternalError(
1066+
"Failed to execute bytecode retrieval task".to_string(),
1067+
)
1068+
})?;
10721069
send(state, Message::ByteCodes(response)).await?
10731070
}
10741071
Message::GetTrieNodes(req) => {

0 commit comments

Comments
 (0)