Skip to content

Commit 3732b3f

Browse files
Merge branch 'master' into discv5.2
2 parents 372a782 + f78d538 commit 3732b3f

File tree

7 files changed

+223
-44
lines changed

7 files changed

+223
-44
lines changed

.github/workflows/build.yml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@ jobs:
1818
- uses: actions/checkout@v3
1919
- name: Get latest version of stable rust
2020
run: rustup update stable
21-
- name: Install protobuf compiler for the libp2p-core dependency
22-
uses: arduino/setup-protoc@v1
2321
- name: Lint code for quality and style with Clippy
2422
run: cargo clippy --workspace --tests --all-features -- -D warnings
2523
release-tests-ubuntu:
@@ -40,8 +38,6 @@ jobs:
4038
- uses: actions/checkout@v3
4139
- name: Get latest version of stable rust
4240
run: rustup update stable
43-
- name: Install protobuf compiler for the libp2p-core dependency
44-
uses: arduino/setup-protoc@v1
4541
- name: Run tests in release
4642
run: cargo test --all --release --all-features
4743
check-rustdoc-links:

Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ exclude = [".gitignore", ".github/*"]
1414
[dependencies]
1515
enr = { version = "0.8.1", features = ["k256", "ed25519"] }
1616
tokio = { version = "1.15.0", features = ["net", "sync", "macros", "rt"] }
17-
libp2p-core = { version = "0.36.0", optional = true }
17+
libp2p-core = { version = "0.40.0", optional = true }
18+
libp2p-identity = { version = "0.2.1", features = ["ed25519", "secp256k1"], optional = true }
1819
zeroize = { version = "1.4.3", features = ["zeroize_derive"] }
1920
futures = "0.3.19"
2021
uint = { version = "0.9.1", default-features = false }
@@ -48,5 +49,5 @@ clap = { version = "3.1", features = ["derive"] }
4849
if-addrs = "0.10.1"
4950

5051
[features]
51-
libp2p = ["libp2p-core"]
52+
libp2p = ["libp2p-core", "libp2p-identity"]
5253
serde = ["enr/serde"]

src/handler/mod.rs

Lines changed: 74 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,12 @@ use session::Session;
7373
// seconds).
7474
const BANNED_NODES_CHECK: u64 = 300; // Check every 5 minutes.
7575

76+
// The one-time session timeout.
77+
const ONE_TIME_SESSION_TIMEOUT: u64 = 30;
78+
79+
// The maximum number of established one-time sessions to maintain.
80+
const ONE_TIME_SESSION_CACHE_CAPACITY: usize = 100;
81+
7682
/// Messages sent from the application layer to `Handler`.
7783
#[derive(Debug, Clone, PartialEq)]
7884
#[allow(clippy::large_enum_variant)]
@@ -191,6 +197,8 @@ pub struct Handler {
191197
active_challenges: HashMapDelay<NodeAddress, Challenge>,
192198
/// Established sessions with peers.
193199
sessions: LruTimeCache<NodeAddress, Session>,
200+
/// Established sessions with peers for a specific request, stored just one per node.
201+
one_time_sessions: LruTimeCache<NodeAddress, (RequestId, Session)>,
194202
/// The channel to receive messages from the application layer.
195203
service_recv: mpsc::UnboundedReceiver<HandlerIn>,
196204
/// The channel to send messages to the application layer.
@@ -281,6 +289,10 @@ impl Handler {
281289
config.session_timeout,
282290
Some(config.session_cache_capacity),
283291
),
292+
one_time_sessions: LruTimeCache::new(
293+
Duration::from_secs(ONE_TIME_SESSION_TIMEOUT),
294+
Some(ONE_TIME_SESSION_CACHE_CAPACITY),
295+
),
284296
active_challenges: HashMapDelay::new(config.request_timeout),
285297
service_recv,
286298
service_send,
@@ -516,23 +528,23 @@ impl Handler {
516528
response: Response,
517529
) {
518530
// Check for an established session
519-
if let Some(session) = self.sessions.get_mut(&node_address) {
520-
// Encrypt the message and send
521-
let packet = match session.encrypt_message::<P>(self.node_id, &response.encode()) {
522-
Ok(packet) => packet,
523-
Err(e) => {
524-
warn!("Could not encrypt response: {:?}", e);
525-
return;
526-
}
527-
};
528-
self.send(node_address, packet).await;
531+
let packet = if let Some(session) = self.sessions.get_mut(&node_address) {
532+
session.encrypt_message::<P>(self.node_id, &response.encode())
533+
} else if let Some(mut session) = self.remove_one_time_session(&node_address, &response.id)
534+
{
535+
session.encrypt_message::<P>(self.node_id, &response.encode())
529536
} else {
530537
// Either the session is being established or has expired. We simply drop the
531538
// response in this case.
532-
warn!(
539+
return warn!(
533540
"Session is not established. Dropping response {} for node: {}",
534541
response, node_address.node_id
535542
);
543+
};
544+
545+
match packet {
546+
Ok(packet) => self.send(node_address, packet).await,
547+
Err(e) => warn!("Could not encrypt response: {:?}", e),
536548
}
537549
}
538550

@@ -780,7 +792,7 @@ impl Handler {
780792
ephem_pubkey,
781793
enr_record,
782794
) {
783-
Ok((session, enr)) => {
795+
Ok((mut session, enr)) => {
784796
// Receiving an AuthResponse must give us an up-to-date view of the node ENR.
785797
// Verify the ENR is valid
786798
if self.verify_enr(&enr, &node_address) {
@@ -820,6 +832,38 @@ impl Handler {
820832
);
821833
self.fail_session(&node_address, RequestError::InvalidRemoteEnr, true)
822834
.await;
835+
836+
// Respond to PING request even if the ENR or NodeAddress don't match
837+
// so that the source node can notice its external IP address has been changed.
838+
let maybe_ping_request = match session.decrypt_message(
839+
message_nonce,
840+
message,
841+
authenticated_data,
842+
) {
843+
Ok(m) => match Message::decode(&m) {
844+
Ok(Message::Request(request)) if request.msg_type() == 1 => {
845+
Some(request)
846+
}
847+
_ => None,
848+
},
849+
_ => None,
850+
};
851+
if let Some(request) = maybe_ping_request {
852+
debug!(
853+
"Responding to a PING request using a one-time session. node_address: {}",
854+
node_address
855+
);
856+
self.one_time_sessions
857+
.insert(node_address.clone(), (request.id.clone(), session));
858+
if let Err(e) = self
859+
.service_send
860+
.send(HandlerOut::Request(node_address.clone(), Box::new(request)))
861+
.await
862+
{
863+
warn!("Failed to report request to application {}", e);
864+
self.one_time_sessions.remove(&node_address);
865+
}
866+
}
823867
}
824868
}
825869
Err(Discv5Error::InvalidChallengeSignature(challenge)) => {
@@ -1119,6 +1163,24 @@ impl Handler {
11191163
}
11201164
}
11211165

1166+
/// Remove one-time session by the given NodeAddress and RequestId if exists.
1167+
fn remove_one_time_session(
1168+
&mut self,
1169+
node_address: &NodeAddress,
1170+
request_id: &RequestId,
1171+
) -> Option<Session> {
1172+
match self.one_time_sessions.peek(node_address) {
1173+
Some((id, _)) if id == request_id => {
1174+
let (_, session) = self
1175+
.one_time_sessions
1176+
.remove(node_address)
1177+
.expect("one-time session must exist");
1178+
Some(session)
1179+
}
1180+
_ => None,
1181+
}
1182+
}
1183+
11221184
/// A request has failed.
11231185
async fn fail_request(
11241186
&mut self,

src/handler/session.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,3 +265,11 @@ impl Session {
265265
Ok((packet, session))
266266
}
267267
}
268+
269+
#[cfg(test)]
270+
pub(crate) fn build_dummy_session() -> Session {
271+
Session::new(Keys {
272+
encryption_key: [0; 16],
273+
decryption_key: [0; 16],
274+
})
275+
}

src/handler/tests.rs

Lines changed: 101 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@ use crate::{
99
};
1010
use std::net::{Ipv4Addr, Ipv6Addr};
1111

12-
use crate::{handler::HandlerOut::RequestFailed, RequestError::SelfRequest};
12+
use crate::{
13+
handler::{session::build_dummy_session, HandlerOut::RequestFailed},
14+
RequestError::SelfRequest,
15+
};
1316
use active_requests::ActiveRequests;
1417
use enr::EnrBuilder;
1518
use std::time::Duration;
@@ -21,6 +24,66 @@ fn init() {
2124
.try_init();
2225
}
2326

27+
async fn build_handler<P: ProtocolIdentity>() -> Handler {
28+
let config = Discv5ConfigBuilder::new(ListenConfig::default()).build();
29+
let key = CombinedKey::generate_secp256k1();
30+
let enr = EnrBuilder::new("v4")
31+
.ip4(Ipv4Addr::LOCALHOST)
32+
.udp4(9000)
33+
.build(&key)
34+
.unwrap();
35+
let mut listen_sockets = SmallVec::default();
36+
listen_sockets.push((Ipv4Addr::LOCALHOST, 9000).into());
37+
let node_id = enr.node_id();
38+
let filter_expected_responses = Arc::new(RwLock::new(HashMap::new()));
39+
40+
let socket = {
41+
let socket_config = {
42+
let filter_config = FilterConfig {
43+
enabled: config.enable_packet_filter,
44+
rate_limiter: config.filter_rate_limiter.clone(),
45+
max_nodes_per_ip: config.filter_max_nodes_per_ip,
46+
max_bans_per_ip: config.filter_max_bans_per_ip,
47+
};
48+
49+
socket::SocketConfig {
50+
executor: config.executor.clone().expect("Executor must exist"),
51+
filter_config,
52+
listen_config: config.listen_config.clone(),
53+
local_node_id: node_id,
54+
expected_responses: filter_expected_responses.clone(),
55+
ban_duration: config.ban_duration,
56+
}
57+
};
58+
59+
Socket::new::<P>(socket_config).await.unwrap()
60+
};
61+
let (_, service_recv) = mpsc::unbounded_channel();
62+
let (service_send, _) = mpsc::channel(50);
63+
let (_, exit) = oneshot::channel();
64+
65+
Handler {
66+
request_retries: config.request_retries,
67+
node_id,
68+
enr: Arc::new(RwLock::new(enr)),
69+
key: Arc::new(RwLock::new(key)),
70+
active_requests: ActiveRequests::new(config.request_timeout),
71+
pending_requests: HashMap::new(),
72+
filter_expected_responses,
73+
sessions: LruTimeCache::new(config.session_timeout, Some(config.session_cache_capacity)),
74+
one_time_sessions: LruTimeCache::new(
75+
Duration::from_secs(ONE_TIME_SESSION_TIMEOUT),
76+
Some(ONE_TIME_SESSION_CACHE_CAPACITY),
77+
),
78+
active_challenges: HashMapDelay::new(config.request_timeout),
79+
service_recv,
80+
service_send,
81+
listen_sockets,
82+
socket,
83+
exit,
84+
}
85+
}
86+
2487
macro_rules! arc_rw {
2588
( $x: expr ) => {
2689
Arc::new(RwLock::new($x))
@@ -353,3 +416,40 @@ async fn test_self_request_ipv6() {
353416
handler_out
354417
);
355418
}
419+
420+
#[tokio::test]
421+
async fn remove_one_time_session() {
422+
let mut handler = build_handler::<DefaultProtocolId>().await;
423+
424+
let enr = {
425+
let key = CombinedKey::generate_secp256k1();
426+
EnrBuilder::new("v4")
427+
.ip4(Ipv4Addr::LOCALHOST)
428+
.udp4(9000)
429+
.build(&key)
430+
.unwrap()
431+
};
432+
let node_address = NodeAddress::new("127.0.0.1:9000".parse().unwrap(), enr.node_id());
433+
let request_id = RequestId::random();
434+
let session = build_dummy_session();
435+
handler
436+
.one_time_sessions
437+
.insert(node_address.clone(), (request_id.clone(), session));
438+
439+
let other_request_id = RequestId::random();
440+
assert!(handler
441+
.remove_one_time_session(&node_address, &other_request_id)
442+
.is_none());
443+
assert_eq!(1, handler.one_time_sessions.len());
444+
445+
let other_node_address = NodeAddress::new("127.0.0.1:9001".parse().unwrap(), enr.node_id());
446+
assert!(handler
447+
.remove_one_time_session(&other_node_address, &request_id)
448+
.is_none());
449+
assert_eq!(1, handler.one_time_sessions.len());
450+
451+
assert!(handler
452+
.remove_one_time_session(&node_address, &request_id)
453+
.is_some());
454+
assert_eq!(0, handler.one_time_sessions.len());
455+
}

src/node_info.rs

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ use enr::{CombinedPublicKey, NodeId};
44
use std::net::SocketAddr;
55

66
#[cfg(feature = "libp2p")]
7-
use libp2p_core::{identity::PublicKey, multiaddr::Protocol, multihash, Multiaddr};
7+
use libp2p_core::{multiaddr::Protocol, Multiaddr};
8+
#[cfg(feature = "libp2p")]
9+
use libp2p_identity::{KeyType, PublicKey};
810

911
/// This type relaxes the requirement of having an ENR to connect to a node, to allow for unsigned
1012
/// connection types, such as multiaddrs.
@@ -94,36 +96,34 @@ impl NodeContact {
9496
Protocol::Udp(port) => udp_port = Some(port),
9597
Protocol::Ip4(addr) => ip_addr = Some(addr.into()),
9698
Protocol::Ip6(addr) => ip_addr = Some(addr.into()),
97-
Protocol::P2p(multihash) => p2p = Some(multihash),
99+
Protocol::P2p(peer_id) => p2p = Some(peer_id),
98100
_ => {}
99101
}
100102
}
101103

102104
let udp_port = udp_port.ok_or("A UDP port must be specified in the multiaddr")?;
103105
let ip_addr = ip_addr.ok_or("An IP address must be specified in the multiaddr")?;
104-
let multihash = p2p.ok_or("The p2p protocol must be specified in the multiaddr")?;
105-
106-
// verify the correct key type
107-
if multihash.code() != u64::from(multihash::Code::Identity) {
108-
return Err("The key type is unsupported");
109-
}
110-
111-
let public_key: CombinedPublicKey =
112-
match PublicKey::from_protobuf_encoding(&multihash.to_bytes()[2..])
113-
.map_err(|_| "Invalid public key")?
114-
{
115-
PublicKey::Secp256k1(pk) => {
116-
enr::k256::ecdsa::VerifyingKey::from_sec1_bytes(&pk.encode_uncompressed())
117-
.expect("Libp2p key conversion, always valid")
118-
.into()
119-
}
120-
PublicKey::Ed25519(pk) => {
121-
enr::ed25519_dalek::VerifyingKey::from_bytes(&pk.encode())
122-
.expect("Libp2p key conversion, always valid")
123-
.into()
124-
}
106+
let peer_id = p2p.ok_or("The p2p protocol must be specified in the multiaddr")?;
107+
108+
let public_key: CombinedPublicKey = {
109+
let pk = PublicKey::try_decode_protobuf(&peer_id.to_bytes()[2..])
110+
.map_err(|_| "Invalid public key")?;
111+
match pk.key_type() {
112+
KeyType::Secp256k1 => enr::k256::ecdsa::VerifyingKey::from_sec1_bytes(
113+
&pk.try_into_secp256k1()
114+
.expect("Must be secp256k1")
115+
.to_bytes_uncompressed(),
116+
)
117+
.expect("Libp2p key conversion, always valid")
118+
.into(),
119+
KeyType::Ed25519 => enr::ed25519_dalek::VerifyingKey::from_bytes(
120+
&pk.try_into_ed25519().expect("Must be ed25519").to_bytes(),
121+
)
122+
.expect("Libp2p key conversion, always valid")
123+
.into(),
125124
_ => return Err("The key type is not supported"),
126-
};
125+
}
126+
};
127127

128128
Ok(NodeContact {
129129
public_key,

0 commit comments

Comments
 (0)