Skip to content

Commit faddf78

Browse files
authored
fix: ensure our ENR cache has enough room for max sessions + remove temp fixes (#1739)
1 parent 9ad8def commit faddf78

File tree

7 files changed

+52
-140
lines changed

7 files changed

+52
-140
lines changed

bin/trin/src/cli.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use ethportal_api::{
1717
};
1818
use portalnet::{
1919
bootnodes::Bootnodes,
20-
config::{PortalnetConfig, NODE_ADDR_CACHE_CAPACITY},
20+
config::{PortalnetConfig, DISCV5_SESSION_CACHE_CAPACITY},
2121
constants::{
2222
DEFAULT_DISCOVERY_PORT, DEFAULT_NETWORK, DEFAULT_UTP_TRANSFER_LIMIT,
2323
DEFAULT_WEB3_HTTP_ADDRESS, DEFAULT_WEB3_IPC_PATH, DEFAULT_WEB3_WS_PORT,
@@ -433,7 +433,7 @@ impl TrinConfig {
433433
bootnodes: self.bootnodes.to_enrs(self.network.network()),
434434
no_stun: self.no_stun,
435435
no_upnp: self.no_upnp,
436-
node_addr_cache_capacity: NODE_ADDR_CACHE_CAPACITY,
436+
discv5_session_cache_capacity: DISCV5_SESSION_CACHE_CAPACITY,
437437
disable_poke: self.disable_poke,
438438
trusted_block_root: self.trusted_block_root,
439439
utp_transfer_limit: self.utp_transfer_limit,

bin/trin/src/lib.rs

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -77,18 +77,7 @@ pub async fn run_trin(
7777
// Initialize and spawn uTP socket
7878
let (utp_talk_reqs_tx, utp_talk_reqs_rx) = mpsc::unbounded_channel();
7979

80-
// Set the enr_cache_capacity to the maximum uTP limit between all active networks. This is
81-
// a trade off between memory usage and increased searches from the networks for each Enr.
82-
// utp_transfer_limit is 2x as it would be utp_transfer_limit for incoming and
83-
// utp_transfer_limit for outgoing
84-
let enr_cache_capacity =
85-
portalnet_config.utp_transfer_limit * 2 * trin_config.portal_subnetworks.len();
86-
let discv5_utp_socket = Discv5UdpSocket::new(
87-
Arc::clone(&discovery),
88-
utp_talk_reqs_rx,
89-
header_oracle.clone(),
90-
enr_cache_capacity,
91-
);
80+
let discv5_utp_socket = Discv5UdpSocket::new(Arc::clone(&discovery), utp_talk_reqs_rx);
9281
let utp_socket = UtpSocket::with_socket(discv5_utp_socket);
9382
let utp_socket = Arc::new(utp_socket);
9483

crates/portalnet/src/config.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,9 @@ use ethportal_api::types::{enr::Enr, network::Network};
55

66
use crate::{bootnodes::Bootnodes, constants::DEFAULT_UTP_TRANSFER_LIMIT};
77

8-
/// Capacity of the cache for observed `NodeAddress` values.
9-
/// Provides capacity for 32 full k-buckets. This capacity will be shared among all active portal
10-
/// subnetworks.
11-
pub const NODE_ADDR_CACHE_CAPACITY: usize = discv5::kbucket::MAX_NODES_PER_BUCKET * 32;
8+
/// Discv5 session cache capacity.
9+
/// Provides capacity for 1000 nodes, to match Discv5's default session_cache_capacity value.
10+
pub const DISCV5_SESSION_CACHE_CAPACITY: usize = 1000;
1211

1312
#[derive(Clone)]
1413
pub struct PortalnetConfig {
@@ -18,10 +17,10 @@ pub struct PortalnetConfig {
1817
pub bootnodes: Vec<Enr>,
1918
pub no_stun: bool,
2019
pub no_upnp: bool,
21-
pub node_addr_cache_capacity: usize,
20+
pub discv5_session_cache_capacity: usize,
2221
pub disable_poke: bool,
2322
pub trusted_block_root: Option<B256>,
24-
// the max number of concurrent utp transfers
23+
/// the max number of concurrent utp transfers
2524
pub utp_transfer_limit: usize,
2625
}
2726

@@ -35,7 +34,7 @@ impl Default for PortalnetConfig {
3534
bootnodes: Bootnodes::default().to_enrs(Network::Mainnet),
3635
no_stun: false,
3736
no_upnp: false,
38-
node_addr_cache_capacity: NODE_ADDR_CACHE_CAPACITY,
37+
discv5_session_cache_capacity: DISCV5_SESSION_CACHE_CAPACITY,
3938
disable_poke: false,
4039
trusted_block_root: None,
4140
utp_transfer_limit: DEFAULT_UTP_TRANSFER_LIMIT,

crates/portalnet/src/discovery.rs

Lines changed: 27 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,8 @@ use ethportal_api::{
2323
};
2424
use lru::LruCache;
2525
use parking_lot::RwLock;
26-
use tokio::sync::{mpsc, RwLock as TokioRwLock};
26+
use tokio::sync::mpsc;
2727
use tracing::{debug, info, warn};
28-
use trin_validation::oracle::HeaderOracle;
2928
use utp_rs::{
3029
peer::{ConnectionPeer, Peer},
3130
udp::AsyncUdpSocket,
@@ -159,6 +158,11 @@ impl Discovery {
159158

160159
let discv5_config = ConfigBuilder::new(listen_config)
161160
.request_timeout(Duration::from_secs(3))
161+
// Set the session cache capacity to match our node address cache capacity. If our cache
162+
// is smaller than the session cache capacity, it can lead to problems where we can't
163+
// send replies to nodes that we have a session with, as we wouldn't have enough room
164+
// to store all the Enr's from all of our current established connections.
165+
.session_cache_capacity(portal_config.discv5_session_cache_capacity)
162166
.build();
163167
let discv5 = Discv5::new(enr, enr_key, discv5_config)
164168
.map_err(|e| format!("Failed to create discv5 instance: {e}"))?;
@@ -173,7 +177,9 @@ impl Discovery {
173177
.map_err(|e| format!("Failed to add bootnode enr: {e}"))?;
174178
}
175179

176-
let node_addr_cache = LruCache::new(portal_config.node_addr_cache_capacity);
180+
// We set the node address cache capacity to double the session cache capacity to pad for
181+
// inconsistencies between the two caches.
182+
let node_addr_cache = LruCache::new(portal_config.discv5_session_cache_capacity * 2);
177183
let node_addr_cache = Arc::new(RwLock::new(node_addr_cache));
178184

179185
Ok(Self {
@@ -213,16 +219,6 @@ impl Discovery {
213219
let _ = talk_req_tx.send(talk_req).await;
214220
}
215221
Event::SessionEstablished(enr, socket_addr) => {
216-
// TODO: this is a temporary fix to prevent caching of eth2 nodes
217-
// and will be updated to a more stable solution as soon as it
218-
// validates the theory of what is causing the issue on mainnet.
219-
if enr.get_decodable::<String>(ENR_PORTAL_CLIENT_KEY).is_none() {
220-
debug!(
221-
enr = ?enr,
222-
"discv5 session established with node that does not have a portal client key, not caching"
223-
);
224-
continue;
225-
}
226222
if let Some(old) = node_addr_cache.write().put(
227223
enr.node_id(),
228224
NodeAddress {
@@ -290,8 +286,20 @@ impl Discovery {
290286
}
291287

292288
/// Looks up the ENR for `node_id`.
289+
///
290+
/// First, it checks the cache for the `NodeAddress` and returns the `Enr` if found.
291+
/// If not found, it queries the Discv5 routing table for the `node_id` and returns the `Enr` if
292+
/// found.
293293
pub fn find_enr(&self, node_id: &NodeId) -> Option<Enr> {
294-
self.discv5.find_enr(node_id)
294+
if let Some(enr) = self.cached_node_addr(node_id) {
295+
return Some(enr.enr);
296+
}
297+
298+
if let Some(enr) = self.discv5.find_enr(node_id) {
299+
return Some(enr);
300+
}
301+
302+
None
295303
}
296304

297305
/// Adds `enr` to the discv5 routing table.
@@ -363,24 +371,16 @@ impl Discovery {
363371
pub struct Discv5UdpSocket {
364372
talk_request_receiver: mpsc::UnboundedReceiver<TalkRequest>,
365373
discv5: Arc<Discovery>,
366-
enr_cache: Arc<TokioRwLock<LruCache<NodeId, Enr>>>,
367-
header_oracle: Arc<TokioRwLock<HeaderOracle>>,
368374
}
369375

370376
impl Discv5UdpSocket {
371377
pub fn new(
372378
discv5: Arc<Discovery>,
373379
talk_request_receiver: mpsc::UnboundedReceiver<TalkRequest>,
374-
header_oracle: Arc<TokioRwLock<HeaderOracle>>,
375-
enr_cache_capacity: usize,
376380
) -> Self {
377-
let enr_cache = LruCache::new(enr_cache_capacity);
378-
let enr_cache = Arc::new(TokioRwLock::new(enr_cache));
379381
Self {
380382
discv5,
381383
talk_request_receiver,
382-
enr_cache,
383-
header_oracle,
384384
}
385385
}
386386
}
@@ -437,16 +437,15 @@ impl AsyncUdpSocket<UtpPeer> for Discv5UdpSocket {
437437
let peer_id = *peer.id();
438438
let peer_enr = peer.peer().cloned();
439439
let discv5 = Arc::clone(&self.discv5);
440-
let enr_cache = Arc::clone(&self.enr_cache);
441-
let header_oracle = Arc::clone(&self.header_oracle);
442440
let data = buf.to_vec();
443441
tokio::spawn(async move {
444442
let enr = match peer_enr {
445443
Some(enr) => enr.0,
446-
None => match find_enr(&peer_id, &discv5, enr_cache, header_oracle).await {
447-
Ok(enr) => enr,
448-
Err(err) => {
449-
warn!(%err, "unable to send uTP talk request, ENR not found");
444+
None => match discv5.find_enr(&peer_id) {
445+
Some(enr) => enr,
446+
None => {
447+
debug!(node_id = %peer_id, "uTP packet to unknown target");
448+
warn!( "unable to send uTP talk request, ENR not found for talk req destination");
450449
return;
451450
}
452451
},
@@ -483,54 +482,3 @@ impl AsyncUdpSocket<UtpPeer> for Discv5UdpSocket {
483482
}
484483
}
485484
}
486-
487-
async fn find_enr(
488-
node_id: &NodeId,
489-
discv5: &Arc<Discovery>,
490-
enr_cache: Arc<TokioRwLock<LruCache<NodeId, Enr>>>,
491-
header_oracle: Arc<TokioRwLock<HeaderOracle>>,
492-
) -> io::Result<Enr> {
493-
if let Some(cached_enr) = enr_cache.write().await.get(node_id).cloned() {
494-
return Ok(cached_enr);
495-
}
496-
497-
if let Some(enr) = discv5.find_enr(node_id) {
498-
enr_cache.write().await.put(*node_id, enr.clone());
499-
return Ok(enr);
500-
}
501-
502-
if let Some(enr) = discv5.cached_node_addr(node_id) {
503-
enr_cache.write().await.put(*node_id, enr.enr.clone());
504-
return Ok(enr.enr);
505-
}
506-
507-
let history_jsonrpc_tx = header_oracle.read().await.history_jsonrpc_tx();
508-
if let Ok(history_jsonrpc_tx) = history_jsonrpc_tx {
509-
if let Ok(enr) = HeaderOracle::history_get_enr(node_id, history_jsonrpc_tx).await {
510-
enr_cache.write().await.put(*node_id, enr.clone());
511-
return Ok(enr);
512-
}
513-
}
514-
515-
let state_jsonrpc_tx = header_oracle.read().await.state_jsonrpc_tx();
516-
if let Ok(state_jsonrpc_tx) = state_jsonrpc_tx {
517-
if let Ok(enr) = HeaderOracle::state_get_enr(node_id, state_jsonrpc_tx).await {
518-
enr_cache.write().await.put(*node_id, enr.clone());
519-
return Ok(enr);
520-
}
521-
}
522-
523-
let beacon_jsonrpc_tx = header_oracle.read().await.beacon_jsonrpc_tx();
524-
if let Ok(beacon_jsonrpc_tx) = beacon_jsonrpc_tx {
525-
if let Ok(enr) = HeaderOracle::beacon_get_enr(node_id, beacon_jsonrpc_tx).await {
526-
enr_cache.write().await.put(*node_id, enr.clone());
527-
return Ok(enr);
528-
}
529-
}
530-
531-
debug!(node_id = %node_id, "uTP packet to unknown target");
532-
Err(io::Error::new(
533-
io::ErrorKind::Other,
534-
"ENR not found for talk req destination",
535-
))
536-
}

crates/portalnet/src/overlay/service/manager.rs

Lines changed: 11 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1037,9 +1037,9 @@ impl<
10371037
// if we're unable to find the ENR for the source node we throw an error
10381038
// since the enr is required for the accept queue, and it is expected to be present
10391039
let enr = self.find_enr(source).ok_or_else(|| {
1040-
OverlayRequestError::AcceptError(
1041-
"handle_offer: unable to find ENR for NodeId".to_string(),
1042-
)
1040+
OverlayRequestError::AcceptError(format!(
1041+
"handle_offer: unable to find ENR for NodeId: source={source:?}"
1042+
))
10431043
})?;
10441044
for (i, key) in content_keys.iter().enumerate() {
10451045
// Accept content if within radius and not already present in the data store.
@@ -2281,19 +2281,14 @@ impl<
22812281

22822282
/// Returns an ENR if one is known for the given NodeId.
22832283
pub fn find_enr(&self, node_id: &NodeId) -> Option<Enr> {
2284-
// Check whether we know this node id in our X's Portal Network's routing table.
2285-
if let Some(node) = self.kbuckets.entry(*node_id).present_or_pending() {
2286-
return Some(node.enr);
2287-
}
2288-
2289-
// Check whether this node id is in our discv5 routing table
2284+
// Check whether this node id is in our enr_session_cache or discv5 routing table
22902285
if let Some(enr) = self.discovery.find_enr(node_id) {
22912286
return Some(enr);
22922287
}
22932288

2294-
// Check whether this node id is in our discovery ENR cache
2295-
if let Some(node_addr) = self.discovery.cached_node_addr(node_id) {
2296-
return Some(node_addr.enr);
2289+
// Check whether we know this node id in our X's Portal Network's routing table.
2290+
if let Some(node) = self.kbuckets.entry(*node_id).present_or_pending() {
2291+
return Some(node.enr);
22972292
}
22982293

22992294
// Check the existing find node queries for the ENR.
@@ -2473,14 +2468,11 @@ mod tests {
24732468
use parking_lot::lock_api::Mutex;
24742469
use rstest::*;
24752470
use serial_test::serial;
2476-
use tokio::{
2477-
sync::{mpsc::unbounded_channel, RwLock as TokioRwLock},
2478-
time::timeout,
2479-
};
2471+
use tokio::{sync::mpsc::unbounded_channel, time::timeout};
24802472
use tokio_test::{assert_pending, assert_ready, task};
24812473
use trin_metrics::portalnet::PORTALNET_METRICS;
24822474
use trin_storage::{DistanceFunction, MemoryContentStore};
2483-
use trin_validation::{oracle::HeaderOracle, validator::MockValidator};
2475+
use trin_validation::validator::MockValidator;
24842476

24852477
use super::*;
24862478
use crate::{
@@ -2510,15 +2502,9 @@ mod tests {
25102502
};
25112503
let discovery = Arc::new(Discovery::new(portal_config, MAINNET.clone()).unwrap());
25122504

2513-
let header_oracle = HeaderOracle::default();
2514-
let header_oracle = Arc::new(TokioRwLock::new(header_oracle));
25152505
let (_utp_talk_req_tx, utp_talk_req_rx) = unbounded_channel();
2516-
let discv5_utp = crate::discovery::Discv5UdpSocket::new(
2517-
Arc::clone(&discovery),
2518-
utp_talk_req_rx,
2519-
header_oracle,
2520-
50,
2521-
);
2506+
let discv5_utp =
2507+
crate::discovery::Discv5UdpSocket::new(Arc::clone(&discovery), utp_talk_req_rx);
25222508
let utp_socket = utp_rs::socket::UtpSocket::with_socket(discv5_utp);
25232509
let metrics = OverlayMetricsReporter {
25242510
overlay_metrics: PORTALNET_METRICS.overlay(),

crates/portalnet/tests/overlay.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@ use portalnet::{
2626
},
2727
};
2828
use tokio::{
29-
sync::{mpsc, mpsc::unbounded_channel, RwLock as TokioRwLock},
29+
sync::{mpsc, mpsc::unbounded_channel},
3030
time::{self, Duration},
3131
};
3232
use trin_storage::{ContentStore, DistanceFunction, MemoryContentStore};
33-
use trin_validation::{oracle::HeaderOracle, validator::MockValidator};
33+
use trin_validation::validator::MockValidator;
3434
use utp_rs::socket::UtpSocket;
3535

3636
async fn init_overlay(
@@ -49,11 +49,8 @@ async fn init_overlay(
4949
let store = MemoryContentStore::new(node_id, DistanceFunction::Xor);
5050
let store = Arc::new(Mutex::new(store));
5151

52-
let header_oracle = HeaderOracle::default();
53-
let header_oracle = Arc::new(TokioRwLock::new(header_oracle));
5452
let (_utp_talk_req_tx, utp_talk_req_rx) = unbounded_channel();
55-
let discv5_utp =
56-
Discv5UdpSocket::new(Arc::clone(&discovery), utp_talk_req_rx, header_oracle, 50);
53+
let discv5_utp = Discv5UdpSocket::new(Arc::clone(&discovery), utp_talk_req_rx);
5754
let utp_socket = Arc::new(UtpSocket::with_socket(discv5_utp));
5855

5956
let validator = Arc::new(MockValidator {});

testing/utp/src/lib.rs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ use tokio::sync::{
2525
mpsc::{self, Receiver},
2626
RwLock,
2727
};
28-
use trin_validation::oracle::HeaderOracle;
2928
use utp_rs::{conn::ConnectionConfig, peer::Peer, socket::UtpSocket};
3029

3130
use crate::rpc::RpcServer;
@@ -179,15 +178,9 @@ pub async fn run_test_app(
179178
let enr = discovery.local_enr();
180179
let discovery = Arc::new(discovery);
181180

182-
let header_oracle = HeaderOracle::default();
183-
let header_oracle = Arc::new(RwLock::new(header_oracle));
184181
let (utp_talk_req_tx, utp_talk_req_rx) = mpsc::unbounded_channel();
185-
let discv5_utp_socket = portalnet::discovery::Discv5UdpSocket::new(
186-
Arc::clone(&discovery),
187-
utp_talk_req_rx,
188-
header_oracle,
189-
50,
190-
);
182+
let discv5_utp_socket =
183+
portalnet::discovery::Discv5UdpSocket::new(Arc::clone(&discovery), utp_talk_req_rx);
191184
let utp_socket = utp_rs::socket::UtpSocket::with_socket(discv5_utp_socket);
192185
let utp_socket = Arc::new(utp_socket);
193186

0 commit comments

Comments
 (0)