Skip to content

Commit e6f84a6

Browse files
authored
Move the request call into its own file (#149)
1 parent 46fb65c commit e6f84a6

File tree

4 files changed

+139
-73
lines changed

4 files changed

+139
-73
lines changed

src/handler/active_requests.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ impl ActiveRequests {
2121
}
2222

2323
pub(crate) fn insert(&mut self, node_address: NodeAddress, request_call: RequestCall) {
24-
let nonce = *request_call.packet.message_nonce();
24+
let nonce = *request_call.packet().message_nonce();
2525
self.active_requests_mapping
2626
.insert(node_address.clone(), request_call);
2727
self.active_requests_nonce_mapping
@@ -55,7 +55,7 @@ impl ActiveRequests {
5555
// Remove the associated nonce mapping.
5656
match self
5757
.active_requests_nonce_mapping
58-
.remove(request_call.packet.message_nonce())
58+
.remove(request_call.packet().message_nonce())
5959
{
6060
Some(_) => Some(request_call),
6161
None => {
@@ -84,7 +84,7 @@ impl ActiveRequests {
8484
}
8585

8686
for (address, request) in self.active_requests_mapping.iter() {
87-
let nonce = request.packet.message_nonce();
87+
let nonce = request.packet().message_nonce();
8888
if !self.active_requests_nonce_mapping.contains_key(nonce) {
8989
panic!("Address {} maps to request with nonce {:?}, which does not exist in `active_requests_nonce_mapping`", address, nonce);
9090
}
@@ -99,7 +99,7 @@ impl Stream for ActiveRequests {
9999
Poll::Ready(Some(Ok((node_address, request_call)))) => {
100100
// Remove the associated nonce mapping.
101101
self.active_requests_nonce_mapping
102-
.remove(request_call.packet.message_nonce());
102+
.remove(request_call.packet().message_nonce());
103103
Poll::Ready(Some(Ok((node_address, request_call))))
104104
}
105105
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),

src/handler/mod.rs

Lines changed: 30 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ use tracing::{debug, error, trace, warn};
5555

5656
mod active_requests;
5757
mod crypto;
58+
mod request_call;
5859
mod session;
5960
mod tests;
6061

@@ -64,6 +65,7 @@ use crate::metrics::METRICS;
6465

6566
use crate::lru_time_cache::LruTimeCache;
6667
use active_requests::ActiveRequests;
68+
use request_call::RequestCall;
6769
use session::Session;
6870

6971
// The time interval to check banned peer timeouts and unban peers when the timeout has elapsed (in
@@ -151,49 +153,6 @@ pub struct Challenge {
151153
remote_enr: Option<Enr>,
152154
}
153155

154-
/// A request to a node that we are waiting for a response.
155-
#[derive(Debug)]
156-
pub(crate) struct RequestCall {
157-
contact: NodeContact,
158-
/// The raw discv5 packet sent.
159-
packet: Packet,
160-
/// The unencrypted message. Required if need to re-encrypt and re-send.
161-
request: Request,
162-
/// Handshakes attempted.
163-
handshake_sent: bool,
164-
/// The number of times this request has been re-sent.
165-
retries: u8,
166-
/// If we receive a Nodes Response with a total greater than 1. This keeps track of the
167-
/// remaining responses expected.
168-
remaining_responses: Option<u64>,
169-
/// Signifies if we are initiating the session with a random packet. This is only used to
170-
/// determine the connection direction of the session.
171-
initiating_session: bool,
172-
}
173-
174-
impl RequestCall {
175-
fn new(
176-
contact: NodeContact,
177-
packet: Packet,
178-
request: Request,
179-
initiating_session: bool,
180-
) -> Self {
181-
RequestCall {
182-
contact,
183-
packet,
184-
request,
185-
handshake_sent: false,
186-
retries: 1,
187-
remaining_responses: None,
188-
initiating_session,
189-
}
190-
}
191-
192-
fn id(&self) -> &RequestId {
193-
&self.request.id
194-
}
195-
}
196-
197156
/// Process to handle handshakes and sessions established from raw RPC communications between nodes.
198157
pub struct Handler {
199158
/// Configuration for the discv5 service.
@@ -432,7 +391,7 @@ impl Handler {
432391
node_address: NodeAddress,
433392
mut request_call: RequestCall,
434393
) {
435-
if request_call.retries >= self.request_retries {
394+
if request_call.retries() >= self.request_retries {
436395
trace!("Request timed out with {}", node_address);
437396
// Remove the request from the awaiting packet_filter
438397
self.remove_expected_response(node_address.socket_addr);
@@ -443,12 +402,12 @@ impl Handler {
443402
// increment the request retry count and restart the timeout
444403
trace!(
445404
"Resending message: {} to {}",
446-
request_call.request,
405+
request_call.request(),
447406
node_address
448407
);
449-
self.send(node_address.clone(), request_call.packet.clone())
408+
self.send(node_address.clone(), request_call.packet().clone())
450409
.await;
451-
request_call.retries += 1;
410+
request_call.increment_retries();
452411
self.active_requests.insert(node_address, request_call);
453412
}
454413
}
@@ -599,26 +558,26 @@ impl Handler {
599558
};
600559

601560
// double check the message nonces match
602-
if request_call.packet.message_nonce() != &request_nonce {
561+
if request_call.packet().message_nonce() != &request_nonce {
603562
// This could theoretically happen if a peer uses the same node id across
604563
// different connections.
605-
warn!("Received a WHOAREYOU from a non expected source. Source: {}, message_nonce {} , expected_nonce: {}", request_call.contact, hex::encode(request_call.packet.message_nonce()), hex::encode(request_nonce));
564+
warn!("Received a WHOAREYOU from a non expected source. Source: {}, message_nonce {} , expected_nonce: {}", request_call.contact(), hex::encode(request_call.packet().message_nonce()), hex::encode(request_nonce));
606565
// NOTE: Both mappings are removed in this case.
607566
return;
608567
}
609568

610569
trace!(
611570
"Received a WHOAREYOU packet response. Source: {}",
612-
request_call.contact
571+
request_call.contact()
613572
);
614573

615574
// We do not allow multiple WHOAREYOU packets for a single challenge request. If we have
616575
// already sent a WHOAREYOU ourselves, we drop sessions who send us a WHOAREYOU in
617576
// response.
618-
if request_call.handshake_sent {
577+
if request_call.handshake_sent() {
619578
warn!(
620579
"Authentication response already sent. Dropping session. Node: {}",
621-
request_call.contact
580+
request_call.contact()
622581
);
623582
self.fail_request(request_call, RequestError::InvalidRemotePacket, true)
624583
.await;
@@ -636,12 +595,12 @@ impl Handler {
636595

637596
// Generate a new session and authentication packet
638597
let (auth_packet, mut session) = match Session::encrypt_with_header(
639-
&request_call.contact,
598+
request_call.contact(),
640599
self.key.clone(),
641600
updated_enr,
642601
&self.node_id,
643602
&challenge_data,
644-
&(request_call.request.clone().encode()),
603+
&(request_call.request().clone().encode()),
645604
) {
646605
Ok(v) => v,
647606
Err(e) => {
@@ -665,15 +624,18 @@ impl Handler {
665624
//
666625
// All sent requests must have an associated node_id. Therefore the following
667626
// must not panic.
668-
let node_address = request_call.contact.node_address();
669-
match request_call.contact.enr() {
627+
let node_address = request_call.contact().node_address();
628+
match request_call.contact().enr() {
670629
Some(enr) => {
671630
// NOTE: Here we decide if the session is outgoing or ingoing. The condition for an
672631
// outgoing session is that we originally sent a RANDOM packet (signifying we did
673632
// not have a session for a request) and the packet is not a PING (we are not
674633
// trying to update an old session that may have expired.
675634
let connection_direction = {
676-
match (&request_call.initiating_session, &request_call.request.body) {
635+
match (
636+
request_call.initiating_session(),
637+
&request_call.request().body,
638+
) {
677639
(true, RequestBody::Ping { .. }) => ConnectionDirection::Incoming,
678640
(true, _) => ConnectionDirection::Outgoing,
679641
(false, _) => ConnectionDirection::Incoming,
@@ -682,9 +644,9 @@ impl Handler {
682644

683645
// We already know the ENR. Send the handshake response packet
684646
trace!("Sending Authentication response to node: {}", node_address);
685-
request_call.packet = auth_packet.clone();
686-
request_call.handshake_sent = true;
687-
request_call.initiating_session = false;
647+
request_call.update_packet(auth_packet.clone());
648+
request_call.set_handshake_sent();
649+
request_call.set_initiating_session(false);
688650
// Reinsert the request_call
689651
self.insert_active_request(request_call);
690652
// Send the actual packet to the send task.
@@ -704,10 +666,10 @@ impl Handler {
704666
// Don't know the ENR. Establish the session, but request an ENR also
705667

706668
// Send the Auth response
707-
let contact = request_call.contact.clone();
669+
let contact = request_call.contact().clone();
708670
trace!("Sending Authentication response to node: {}", node_address);
709-
request_call.packet = auth_packet.clone();
710-
request_call.handshake_sent = true;
671+
request_call.update_packet(auth_packet.clone());
672+
request_call.set_handshake_sent();
711673
// Reinsert the request_call
712674
self.insert_active_request(request_call);
713675
self.send(node_address.clone(), auth_packet).await;
@@ -1016,7 +978,7 @@ impl Handler {
1016978
if let ResponseBody::Nodes { total, .. } = response.body {
1017979
if total > 1 {
1018980
// This is a multi-response Nodes response
1019-
if let Some(remaining_responses) = request_call.remaining_responses.as_mut() {
981+
if let Some(remaining_responses) = request_call.remaining_responses_mut() {
1020982
*remaining_responses -= 1;
1021983
if remaining_responses != &0 {
1022984
// more responses remaining, add back the request and send the response
@@ -1034,7 +996,7 @@ impl Handler {
1034996
}
1035997
} else {
1036998
// This is the first instance
1037-
request_call.remaining_responses = Some(total - 1);
999+
*request_call.remaining_responses_mut() = Some(total - 1);
10381000
// add back the request and send the response
10391001
self.active_requests
10401002
.insert(node_address.clone(), request_call);
@@ -1074,7 +1036,7 @@ impl Handler {
10741036

10751037
/// Inserts a request and associated auth_tag mapping.
10761038
fn insert_active_request(&mut self, request_call: RequestCall) {
1077-
let node_address = request_call.contact.node_address();
1039+
let node_address = request_call.contact().node_address();
10781040

10791041
// adds the mapping of message nonce to node address
10801042
self.active_requests.insert(node_address, request_call);
@@ -1100,7 +1062,7 @@ impl Handler {
11001062
) {
11011063
// The Request has expired, remove the session.
11021064
// Fail the current request
1103-
let request_id = request_call.request.id;
1065+
let request_id = request_call.request().id.clone();
11041066
if let Err(e) = self
11051067
.service_send
11061068
.send(HandlerOut::RequestFailed(request_id, error.clone()))
@@ -1109,7 +1071,7 @@ impl Handler {
11091071
warn!("Failed to inform request failure {}", e)
11101072
}
11111073

1112-
let node_address = request_call.contact.node_address();
1074+
let node_address = request_call.contact().node_address();
11131075
self.fail_session(&node_address, error, remove_session)
11141076
.await;
11151077
}

src/handler/request_call.rs

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
pub use crate::node_info::{NodeAddress, NodeContact};
2+
use crate::{
3+
packet::Packet,
4+
rpc::{Request, RequestId},
5+
};
6+
7+
/// A request to a node that we are waiting for a response.
8+
#[derive(Debug)]
9+
pub(crate) struct RequestCall {
10+
contact: NodeContact,
11+
/// The raw discv5 packet sent.
12+
packet: Packet,
13+
/// The unencrypted message. Required if need to re-encrypt and re-send.
14+
request: Request,
15+
/// Handshakes attempted.
16+
handshake_sent: bool,
17+
/// The number of times this request has been re-sent.
18+
retries: u8,
19+
/// If we receive a Nodes Response with a total greater than 1. This keeps track of the
20+
/// remaining responses expected.
21+
remaining_responses: Option<u64>,
22+
/// Signifies if we are initiating the session with a random packet. This is only used to
23+
/// determine the connection direction of the session.
24+
initiating_session: bool,
25+
}
26+
27+
impl RequestCall {
28+
pub fn new(
29+
contact: NodeContact,
30+
packet: Packet,
31+
request: Request,
32+
initiating_session: bool,
33+
) -> Self {
34+
RequestCall {
35+
contact,
36+
packet,
37+
request,
38+
handshake_sent: false,
39+
retries: 1,
40+
remaining_responses: None,
41+
initiating_session,
42+
}
43+
}
44+
45+
/// Returns the contact associated with this call.
46+
pub fn contact(&self) -> &NodeContact {
47+
&self.contact
48+
}
49+
50+
/// Returns the id associated with this call.
51+
pub fn id(&self) -> &RequestId {
52+
&self.request.id
53+
}
54+
55+
/// Returns the associated request for this call.
56+
pub fn request(&self) -> &Request {
57+
&self.request
58+
}
59+
60+
/// Returns the packet associated with this call.
61+
pub fn packet(&self) -> &Packet {
62+
&self.packet
63+
}
64+
65+
/// The number of times this request has been resent.
66+
pub fn retries(&self) -> u8 {
67+
self.retries
68+
}
69+
70+
/// Increments the number of retries for this call
71+
pub fn increment_retries(&mut self) {
72+
self.retries += 1;
73+
}
74+
75+
/// Returns whether the handshake has been sent for this call or not.
76+
pub fn handshake_sent(&self) -> bool {
77+
self.handshake_sent
78+
}
79+
80+
/// Indicates the handshake has been sent.
81+
pub fn set_handshake_sent(&mut self) {
82+
self.handshake_sent = true;
83+
}
84+
85+
/// Indicates a session has been initiated for this call.
86+
pub fn set_initiating_session(&mut self, state: bool) {
87+
self.initiating_session = state;
88+
}
89+
90+
/// Returns whether a session is being initiated for this call.
91+
pub fn initiating_session(&self) -> bool {
92+
self.initiating_session
93+
}
94+
95+
/// Updates the underlying packet for the call.
96+
pub fn update_packet(&mut self, packet: Packet) {
97+
self.packet = packet;
98+
}
99+
100+
/// Gets a mutable reference to the remaining repsonses.
101+
pub fn remaining_responses_mut(&mut self) -> &mut Option<u64> {
102+
&mut self.remaining_responses
103+
}
104+
}

src/handler/tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ async fn test_active_requests_insert() {
247247
let request_call = RequestCall::new(contact, packet, request, initiating_session);
248248

249249
// insert the pair and verify the mapping remains in sync
250-
let nonce = *request_call.packet.message_nonce();
250+
let nonce = *request_call.packet().message_nonce();
251251
active_requests.insert(node_address, request_call);
252252
active_requests.check_invariant();
253253
active_requests.remove_by_nonce(&nonce);

0 commit comments

Comments
 (0)