Skip to content

Commit c56bb62

Browse files
authored
feat: update utp-rs dependency (#1652)
1 parent 79369a0 commit c56bb62

File tree

13 files changed

+202
-155
lines changed

13 files changed

+202
-155
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ tree_hash_derive = "0.8.0"
9595
uds_windows = "1.0.1"
9696
ureq = { version = "2.5.0", features = ["json"] }
9797
url = "2.3.1"
98-
utp-rs = { tag = "v0.1.0-alpha.15", git = "https://github.com/ethereum/utp" }
98+
utp-rs = { git = "https://github.com/ethereum/utp", tag = "v0.1.0-alpha.16" }
9999

100100
# Trin workspace crates
101101
e2store = { path = "crates/e2store" }

crates/portalnet/src/discovery.rs

Lines changed: 100 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
use std::{
2-
fmt,
3-
hash::{Hash, Hasher},
4-
io,
2+
fmt, io,
53
net::{Ipv4Addr, SocketAddr},
4+
ops::Deref,
65
str::FromStr,
76
sync::Arc,
87
time::Duration,
@@ -26,7 +25,10 @@ use parking_lot::RwLock;
2625
use tokio::sync::{mpsc, RwLock as TokioRwLock};
2726
use tracing::{debug, info, warn};
2827
use trin_validation::oracle::HeaderOracle;
29-
use utp_rs::{cid::ConnectionPeer, udp::AsyncUdpSocket};
28+
use utp_rs::{
29+
peer::{ConnectionPeer, Peer},
30+
udp::AsyncUdpSocket,
31+
};
3032

3133
use super::config::PortalnetConfig;
3234
use crate::socket;
@@ -357,113 +359,75 @@ impl Discv5UdpSocket {
357359
header_oracle,
358360
}
359361
}
360-
361-
async fn find_enr(&mut self, node_id: &NodeId) -> io::Result<UtpEnr> {
362-
if let Some(cached_enr) = self.enr_cache.write().await.get(node_id).cloned() {
363-
return Ok(UtpEnr(cached_enr));
364-
}
365-
366-
if let Some(enr) = self.discv5.find_enr(node_id) {
367-
self.enr_cache.write().await.put(*node_id, enr.clone());
368-
return Ok(UtpEnr(enr));
369-
}
370-
371-
if let Some(enr) = self.discv5.cached_node_addr(node_id) {
372-
self.enr_cache.write().await.put(*node_id, enr.enr.clone());
373-
return Ok(UtpEnr(enr.enr));
374-
}
375-
376-
let history_jsonrpc_tx = self.header_oracle.read().await.history_jsonrpc_tx();
377-
if let Ok(history_jsonrpc_tx) = history_jsonrpc_tx {
378-
if let Ok(enr) = HeaderOracle::history_get_enr(node_id, history_jsonrpc_tx).await {
379-
self.enr_cache.write().await.put(*node_id, enr.clone());
380-
return Ok(UtpEnr(enr));
381-
}
382-
}
383-
384-
let state_jsonrpc_tx = self.header_oracle.read().await.state_jsonrpc_tx();
385-
if let Ok(state_jsonrpc_tx) = state_jsonrpc_tx {
386-
if let Ok(enr) = HeaderOracle::state_get_enr(node_id, state_jsonrpc_tx).await {
387-
self.enr_cache.write().await.put(*node_id, enr.clone());
388-
return Ok(UtpEnr(enr));
389-
}
390-
}
391-
392-
let beacon_jsonrpc_tx = self.header_oracle.read().await.beacon_jsonrpc_tx();
393-
if let Ok(beacon_jsonrpc_tx) = beacon_jsonrpc_tx {
394-
if let Ok(enr) = HeaderOracle::beacon_get_enr(node_id, beacon_jsonrpc_tx).await {
395-
self.enr_cache.write().await.put(*node_id, enr.clone());
396-
return Ok(UtpEnr(enr));
397-
}
398-
}
399-
400-
debug!(node_id = %node_id, "uTP packet from unknown source");
401-
Err(io::Error::new(
402-
io::ErrorKind::Other,
403-
"ENR not found for talk req destination",
404-
))
405-
}
406362
}
407363

408364
/// A wrapper around `Enr` that implements `ConnectionPeer`.
409365
#[derive(Clone)]
410-
pub struct UtpEnr(pub Enr);
366+
pub struct UtpPeer(pub Enr);
411367

412-
impl UtpEnr {
413-
pub fn node_id(&self) -> NodeId {
414-
self.0.node_id()
368+
impl Deref for UtpPeer {
369+
type Target = Enr;
370+
371+
fn deref(&self) -> &Self::Target {
372+
&self.0
415373
}
374+
}
416375

376+
impl UtpPeer {
417377
pub fn client(&self) -> Option<String> {
418-
self.0
419-
.get_decodable::<String>(ENR_PORTAL_CLIENT_KEY)
378+
self.get_decodable::<String>(ENR_PORTAL_CLIENT_KEY)
420379
.and_then(|v| v.ok())
421380
}
422381
}
423382

424-
impl std::fmt::Debug for UtpEnr {
383+
impl std::fmt::Debug for UtpPeer {
425384
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
426385
let peer_client_type = self.client().unwrap_or_else(|| "Unknown".to_string());
427-
f.debug_struct("UtpEnr")
386+
f.debug_struct("UtpPeer")
428387
.field("enr", &self.0)
429388
.field("Peer Client Type", &peer_client_type)
430389
.finish()
431390
}
432391
}
433392

434-
// Why are we implementing Hash, PartialEq, Eq for UtpEnr?
435-
// UtpEnr is used as an element of the key for a Connections HashTable in our uTP library.
436-
// Enr's can change and are not stable, so if we initiate a ``connect_with_cid`` we are inserting
437-
// our known Enr for the peer, but if the peer has a more upto date Enr, values will be different
438-
// and the Hash for the old Enr and New Enr will be different, along with equating the two structs
439-
// will return false. This leads us to a situation where our peer sends us a uTP messages back and
440-
// our code thinks the same peer is instead 2 different peers causing uTP to ignore the messages. We
441-
// fixed this by implementing Eq and Hash only using the NodeId of the Enr as it is the only stable
442-
// non-updatable field in the Enr.
443-
impl Hash for UtpEnr {
444-
fn hash<H: Hasher>(&self, state: &mut H) {
445-
self.0.node_id().hash(state);
393+
impl ConnectionPeer for UtpPeer {
394+
type Id = NodeId;
395+
396+
fn id(&self) -> Self::Id {
397+
self.node_id()
446398
}
447-
}
448399

449-
impl PartialEq for UtpEnr {
450-
fn eq(&self, other: &Self) -> bool {
451-
self.0.node_id() == other.0.node_id()
400+
fn consolidate(a: Self, b: Self) -> Self {
401+
assert!(a.id() == b.id());
402+
if a.seq() >= b.seq() {
403+
a
404+
} else {
405+
b
406+
}
452407
}
453408
}
454409

455-
impl Eq for UtpEnr {}
456-
457-
impl ConnectionPeer for UtpEnr {}
458-
459410
#[async_trait]
460-
impl AsyncUdpSocket<UtpEnr> for Discv5UdpSocket {
461-
async fn send_to(&mut self, buf: &[u8], target: &UtpEnr) -> io::Result<usize> {
411+
impl AsyncUdpSocket<UtpPeer> for Discv5UdpSocket {
412+
async fn send_to(&mut self, buf: &[u8], peer: &Peer<UtpPeer>) -> io::Result<usize> {
413+
let peer_id = *peer.id();
414+
let peer_enr = peer.peer().cloned();
462415
let discv5 = Arc::clone(&self.discv5);
463-
let target = target.0.clone();
416+
let enr_cache = Arc::clone(&self.enr_cache);
417+
let header_oracle = Arc::clone(&self.header_oracle);
464418
let data = buf.to_vec();
465419
tokio::spawn(async move {
466-
match discv5.send_talk_req(target, Subnetwork::Utp, data).await {
420+
let enr = match peer_enr {
421+
Some(enr) => enr.0,
422+
None => match find_enr(&peer_id, &discv5, enr_cache, header_oracle).await {
423+
Ok(enr) => enr,
424+
Err(err) => {
425+
warn!(%err, "unable to send uTP talk request, ENR not found");
426+
return;
427+
}
428+
},
429+
};
430+
match discv5.send_talk_req(enr, Subnetwork::Utp, data).await {
467431
// We drop the talk response because it is ignored in the uTP protocol.
468432
Ok(..) => {}
469433
Err(err) => match err {
@@ -476,11 +440,10 @@ impl AsyncUdpSocket<UtpEnr> for Discv5UdpSocket {
476440
Ok(buf.len())
477441
}
478442

479-
async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, UtpEnr)> {
443+
async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, Peer<UtpPeer>)> {
480444
match self.talk_request_receiver.recv().await {
481445
Some(talk_req) => {
482-
let src_node_id = talk_req.node_id();
483-
let enr = self.find_enr(src_node_id).await?;
446+
let node_id = *talk_req.node_id();
484447
let packet = talk_req.body();
485448
let n = std::cmp::min(buf.len(), packet.len());
486449
buf[..n].copy_from_slice(&packet[..n]);
@@ -490,9 +453,60 @@ impl AsyncUdpSocket<UtpEnr> for Discv5UdpSocket {
490453
warn!(%err, "failed to respond to uTP talk request");
491454
}
492455

493-
Ok((n, enr))
456+
Ok((n, Peer::new_id(node_id)))
494457
}
495458
None => Err(io::Error::from(io::ErrorKind::NotConnected)),
496459
}
497460
}
498461
}
462+
463+
async fn find_enr(
464+
node_id: &NodeId,
465+
discv5: &Arc<Discovery>,
466+
enr_cache: Arc<TokioRwLock<LruCache<NodeId, Enr>>>,
467+
header_oracle: Arc<TokioRwLock<HeaderOracle>>,
468+
) -> io::Result<Enr> {
469+
if let Some(cached_enr) = enr_cache.write().await.get(node_id).cloned() {
470+
return Ok(cached_enr);
471+
}
472+
473+
if let Some(enr) = discv5.find_enr(node_id) {
474+
enr_cache.write().await.put(*node_id, enr.clone());
475+
return Ok(enr);
476+
}
477+
478+
if let Some(enr) = discv5.cached_node_addr(node_id) {
479+
enr_cache.write().await.put(*node_id, enr.enr.clone());
480+
return Ok(enr.enr);
481+
}
482+
483+
let history_jsonrpc_tx = header_oracle.read().await.history_jsonrpc_tx();
484+
if let Ok(history_jsonrpc_tx) = history_jsonrpc_tx {
485+
if let Ok(enr) = HeaderOracle::history_get_enr(node_id, history_jsonrpc_tx).await {
486+
enr_cache.write().await.put(*node_id, enr.clone());
487+
return Ok(enr);
488+
}
489+
}
490+
491+
let state_jsonrpc_tx = header_oracle.read().await.state_jsonrpc_tx();
492+
if let Ok(state_jsonrpc_tx) = state_jsonrpc_tx {
493+
if let Ok(enr) = HeaderOracle::state_get_enr(node_id, state_jsonrpc_tx).await {
494+
enr_cache.write().await.put(*node_id, enr.clone());
495+
return Ok(enr);
496+
}
497+
}
498+
499+
let beacon_jsonrpc_tx = header_oracle.read().await.beacon_jsonrpc_tx();
500+
if let Ok(beacon_jsonrpc_tx) = beacon_jsonrpc_tx {
501+
if let Ok(enr) = HeaderOracle::beacon_get_enr(node_id, beacon_jsonrpc_tx).await {
502+
enr_cache.write().await.put(*node_id, enr.clone());
503+
return Ok(enr);
504+
}
505+
}
506+
507+
debug!(node_id = %node_id, "uTP packet to unknown target");
508+
Err(io::Error::new(
509+
io::ErrorKind::Other,
510+
"ENR not found for talk req destination",
511+
))
512+
}

crates/portalnet/src/overlay/protocol.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ use utp_rs::socket::UtpSocket;
4242
use super::{ping_extensions::PingExtension, service::OverlayService};
4343
use crate::{
4444
bootnodes::Bootnode,
45-
discovery::{Discovery, UtpEnr},
45+
discovery::{Discovery, UtpPeer},
4646
events::EventEnvelope,
4747
find::query_info::{FindContentResult, RecursiveFindContentResult},
4848
overlay::{
@@ -105,7 +105,7 @@ impl<
105105
pub async fn new(
106106
config: OverlayConfig,
107107
discovery: Arc<Discovery>,
108-
utp_socket: Arc<UtpSocket<UtpEnr>>,
108+
utp_socket: Arc<UtpSocket<UtpPeer>>,
109109
store: Arc<RwLock<TStore>>,
110110
protocol: Subnetwork,
111111
validator: Arc<TValidator>,
@@ -498,10 +498,10 @@ impl<
498498
let cid = utp_rs::cid::ConnectionId {
499499
recv: conn_id,
500500
send: conn_id.wrapping_add(1),
501-
peer: UtpEnr(enr),
501+
peer_id: enr.node_id(),
502502
};
503503
self.utp_controller
504-
.connect_inbound_stream(cid)
504+
.connect_inbound_stream(cid, UtpPeer(enr))
505505
.await
506506
.map_err(|err| OverlayRequestError::ContentNotFound {
507507
message: format!("Unable to locate content on the network: {err:?}"),

0 commit comments

Comments
 (0)