Skip to content

Commit d31e62e

Browse files
Separate handler requests from service requests (#152)
* fix stuff * separate handler requests vs service requests
1 parent 25c92df commit d31e62e

File tree

4 files changed

+145
-72
lines changed

4 files changed

+145
-72
lines changed

src/handler/active_requests.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use super::*;
22
use delay_map::HashMapDelay;
33
use more_asserts::debug_unreachable;
44

5-
pub(crate) struct ActiveRequests {
5+
pub(super) struct ActiveRequests {
66
/// A list of raw messages we are awaiting a response from the remote.
77
active_requests_mapping: HashMapDelay<NodeAddress, RequestCall>,
88
// WHOAREYOU messages do not include the source node id. We therefore maintain another
@@ -13,29 +13,26 @@ pub(crate) struct ActiveRequests {
1313
}
1414

1515
impl ActiveRequests {
16-
pub(crate) fn new(request_timeout: Duration) -> Self {
16+
pub fn new(request_timeout: Duration) -> Self {
1717
ActiveRequests {
1818
active_requests_mapping: HashMapDelay::new(request_timeout),
1919
active_requests_nonce_mapping: HashMap::new(),
2020
}
2121
}
2222

23-
pub(crate) fn insert(&mut self, node_address: NodeAddress, request_call: RequestCall) {
23+
pub fn insert(&mut self, node_address: NodeAddress, request_call: RequestCall) {
2424
let nonce = *request_call.packet().message_nonce();
2525
self.active_requests_mapping
2626
.insert(node_address.clone(), request_call);
2727
self.active_requests_nonce_mapping
2828
.insert(nonce, node_address);
2929
}
3030

31-
pub(crate) fn get(&self, node_address: &NodeAddress) -> Option<&RequestCall> {
31+
pub fn get(&self, node_address: &NodeAddress) -> Option<&RequestCall> {
3232
self.active_requests_mapping.get(node_address)
3333
}
3434

35-
pub(crate) fn remove_by_nonce(
36-
&mut self,
37-
nonce: &MessageNonce,
38-
) -> Option<(NodeAddress, RequestCall)> {
35+
pub fn remove_by_nonce(&mut self, nonce: &MessageNonce) -> Option<(NodeAddress, RequestCall)> {
3936
match self.active_requests_nonce_mapping.remove(nonce) {
4037
Some(node_address) => match self.active_requests_mapping.remove(&node_address) {
4138
Some(request_call) => Some((node_address, request_call)),
@@ -49,7 +46,7 @@ impl ActiveRequests {
4946
}
5047
}
5148

52-
pub(crate) fn remove(&mut self, node_address: &NodeAddress) -> Option<RequestCall> {
49+
pub fn remove(&mut self, node_address: &NodeAddress) -> Option<RequestCall> {
5350
match self.active_requests_mapping.remove(node_address) {
5451
Some(request_call) => {
5552
// Remove the associated nonce mapping.
@@ -75,7 +72,7 @@ impl ActiveRequests {
7572
// this makes is so that if there is a panic, the error is printed in the caller of this
7673
// function.
7774
#[track_caller]
78-
pub(crate) fn check_invariant(&self) {
75+
pub fn check_invariant(&self) {
7976
// First check that for every `MessageNonce` there is an associated `NodeAddress`.
8077
for (nonce, address) in self.active_requests_nonce_mapping.iter() {
8178
if !self.active_requests_mapping.contains_key(address) {

src/handler/mod.rs

Lines changed: 109 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,22 @@ pub struct Challenge {
153153
remote_enr: Option<Enr>,
154154
}
155155

156+
/// Request ID from the handler's perspective.
157+
#[derive(Debug, Clone)]
158+
enum HandlerReqId {
159+
/// Requests made by the handler.
160+
Internal(RequestId),
161+
/// Requests made from outside the handler.
162+
External(RequestId),
163+
}
164+
165+
/// A request queued for sending.
166+
struct PendingRequest {
167+
contact: NodeContact,
168+
request_id: HandlerReqId,
169+
request: RequestBody,
170+
}
171+
156172
/// Process to handle handshakes and sessions established from raw RPC communications between nodes.
157173
pub struct Handler {
158174
/// Configuration for the discv5 service.
@@ -169,7 +185,7 @@ pub struct Handler {
169185
/// The expected responses by SocketAddr which allows packets to pass the underlying filter.
170186
filter_expected_responses: Arc<RwLock<HashMap<SocketAddr, usize>>>,
171187
/// Requests awaiting a handshake completion.
172-
pending_requests: HashMap<NodeAddress, Vec<(NodeContact, Request)>>,
188+
pending_requests: HashMap<NodeAddress, Vec<PendingRequest>>,
173189
/// Currently in-progress outbound handshakes (WHOAREYOU packets) with peers.
174190
active_challenges: HashMapDelay<NodeAddress, Challenge>,
175191
/// Established sessions with peers.
@@ -191,6 +207,7 @@ type HandlerReturn = (
191207
mpsc::UnboundedSender<HandlerIn>,
192208
mpsc::Receiver<HandlerOut>,
193209
);
210+
194211
impl Handler {
195212
/// A new Session service which instantiates the UDP socket send/recv tasks.
196213
pub async fn spawn(
@@ -274,13 +291,13 @@ impl Handler {
274291
Some(handler_request) = self.service_recv.recv() => {
275292
match handler_request {
276293
HandlerIn::Request(contact, request) => {
277-
let id = request.id.clone();
278-
if let Err(request_error) = self.send_request(contact, *request).await {
279-
// If the sending failed report to the application
280-
if let Err(e) = self.service_send.send(HandlerOut::RequestFailed(id, request_error)).await {
281-
warn!("Failed to inform that request failed {}", e)
282-
}
283-
}
294+
let Request { id, body: request } = *request;
295+
if let Err(request_error) = self.send_request(contact, HandlerReqId::External(id.clone()), request).await {
296+
// If the sending failed report to the application
297+
if let Err(e) = self.service_send.send(HandlerOut::RequestFailed(id, request_error)).await {
298+
warn!("Failed to inform that request failed {}", e)
299+
}
300+
}
284301
}
285302
HandlerIn::Response(dst, response) => self.send_response(dst, *response).await,
286303
HandlerIn::WhoAreYou(wru_ref, enr) => self.send_challenge(wru_ref, enr).await,
@@ -395,7 +412,7 @@ impl Handler {
395412
// increment the request retry count and restart the timeout
396413
trace!(
397414
"Resending message: {} to {}",
398-
request_call.request(),
415+
request_call.body(),
399416
node_address
400417
);
401418
self.send(node_address.clone(), request_call.packet().clone())
@@ -409,7 +426,8 @@ impl Handler {
409426
async fn send_request(
410427
&mut self,
411428
contact: NodeContact,
412-
request: Request,
429+
request_id: HandlerReqId,
430+
request: RequestBody,
413431
) -> Result<(), RequestError> {
414432
let node_address = contact.node_address();
415433

@@ -426,15 +444,25 @@ impl Handler {
426444
self.pending_requests
427445
.entry(node_address)
428446
.or_insert_with(Vec::new)
429-
.push((contact, request));
447+
.push(PendingRequest {
448+
contact,
449+
request_id,
450+
request,
451+
});
430452
return Ok(());
431453
}
432454

433455
let (packet, initiating_session) = {
434456
if let Some(session) = self.sessions.get_mut(&node_address) {
435457
// Encrypt the message and send
458+
let request = match &request_id {
459+
HandlerReqId::Internal(id) | HandlerReqId::External(id) => Request {
460+
id: id.clone(),
461+
body: request.clone(),
462+
},
463+
};
436464
let packet = session
437-
.encrypt_message(self.node_id, &request.clone().encode())
465+
.encrypt_message(self.node_id, &request.encode())
438466
.map_err(|e| RequestError::EncryptionFailed(format!("{:?}", e)))?;
439467
(packet, false)
440468
} else {
@@ -450,7 +478,13 @@ impl Handler {
450478
}
451479
};
452480

453-
let call = RequestCall::new(contact, packet.clone(), request, initiating_session);
481+
let call = RequestCall::new(
482+
contact,
483+
packet.clone(),
484+
request_id,
485+
request,
486+
initiating_session,
487+
);
454488
// let the filter know we are expecting a response
455489
self.add_expected_response(node_address.socket_addr);
456490
self.send(node_address.clone(), packet).await;
@@ -593,7 +627,7 @@ impl Handler {
593627
updated_enr,
594628
&self.node_id,
595629
&challenge_data,
596-
&(request_call.request().clone().encode()),
630+
&request_call.encode(),
597631
) {
598632
Ok(v) => v,
599633
Err(e) => {
@@ -625,10 +659,7 @@ impl Handler {
625659
// not have a session for a request) and the packet is not a PING (we are not
626660
// trying to update an old session that may have expired.
627661
let connection_direction = {
628-
match (
629-
request_call.initiating_session(),
630-
&request_call.request().body,
631-
) {
662+
match (request_call.initiating_session(), &request_call.body()) {
632663
(true, RequestBody::Ping { .. }) => ConnectionDirection::Incoming,
633664
(true, _) => ConnectionDirection::Outgoing,
634665
(false, _) => ConnectionDirection::Incoming,
@@ -668,13 +699,12 @@ impl Handler {
668699
self.send(node_address.clone(), auth_packet).await;
669700

670701
let id = RequestId::random();
671-
let request = Request {
672-
id: id.clone(),
673-
body: RequestBody::FindNode { distances: vec![0] },
674-
};
675-
676-
session.awaiting_enr = Some(id);
677-
if let Err(e) = self.send_request(contact, request).await {
702+
let request = RequestBody::FindNode { distances: vec![0] };
703+
session.awaiting_enr = Some(id.clone());
704+
if let Err(e) = self
705+
.send_request(contact, HandlerReqId::Internal(id), request)
706+
.await
707+
{
678708
warn!("Failed to send Enr request {}", e)
679709
}
680710
}
@@ -802,21 +832,35 @@ impl Handler {
802832
self.pending_requests.entry(node_address)
803833
{
804834
// If it exists, there must be a request here
805-
let (contact, request) = entry.get_mut().remove(0);
835+
let PendingRequest {
836+
contact,
837+
request_id,
838+
request,
839+
} = entry.get_mut().remove(0);
806840
if entry.get().is_empty() {
807841
entry.remove();
808842
}
809-
let id = request.id.clone();
810843
trace!("Sending next awaiting message. Node: {}", contact);
811-
if let Err(request_error) = self.send_request(contact, request).await {
844+
if let Err(request_error) = self
845+
.send_request(contact, request_id.clone(), request)
846+
.await
847+
{
812848
warn!("Failed to send next awaiting request {}", request_error);
813849
// Inform the service that the request failed
814-
if let Err(e) = self
815-
.service_send
816-
.send(HandlerOut::RequestFailed(id, request_error))
817-
.await
818-
{
819-
warn!("Failed to inform that request failed {}", e);
850+
match request_id {
851+
HandlerReqId::Internal(_) => {
852+
// An internal request could not be sent. For now we do nothing about
853+
// this.
854+
}
855+
HandlerReqId::External(id) => {
856+
if let Err(e) = self
857+
.service_send
858+
.send(HandlerOut::RequestFailed(id, request_error))
859+
.await
860+
{
861+
warn!("Failed to inform that request failed {}", e);
862+
}
863+
}
820864
}
821865
}
822866
}
@@ -954,7 +998,10 @@ impl Handler {
954998
async fn handle_response(&mut self, node_address: NodeAddress, response: Response) {
955999
// Find a matching request, if any
9561000
if let Some(mut request_call) = self.active_requests.remove(&node_address) {
957-
if request_call.id() != &response.id {
1001+
let id = match request_call.id() {
1002+
HandlerReqId::Internal(id) | HandlerReqId::External(id) => id,
1003+
};
1004+
if id != &response.id {
9581005
trace!(
9591006
"Received an RPC Response to an unknown request. Likely late response. {}",
9601007
node_address
@@ -1055,13 +1102,19 @@ impl Handler {
10551102
) {
10561103
// The Request has expired, remove the session.
10571104
// Fail the current request
1058-
let request_id = request_call.request().id.clone();
1059-
if let Err(e) = self
1060-
.service_send
1061-
.send(HandlerOut::RequestFailed(request_id, error.clone()))
1062-
.await
1063-
{
1064-
warn!("Failed to inform request failure {}", e)
1105+
match request_call.id() {
1106+
HandlerReqId::Internal(_) => {
1107+
// Do not report failures on requests belonging to the handler.
1108+
}
1109+
HandlerReqId::External(id) => {
1110+
if let Err(e) = self
1111+
.service_send
1112+
.send(HandlerOut::RequestFailed(id.clone(), error.clone()))
1113+
.await
1114+
{
1115+
warn!("Failed to inform request failure {}", e)
1116+
}
1117+
}
10651118
}
10661119

10671120
let node_address = request_call.contact().node_address();
@@ -1083,13 +1136,20 @@ impl Handler {
10831136
.store(self.sessions.len(), Ordering::Relaxed);
10841137
}
10851138
if let Some(to_remove) = self.pending_requests.remove(node_address) {
1086-
for request in to_remove {
1087-
if let Err(e) = self
1088-
.service_send
1089-
.send(HandlerOut::RequestFailed(request.1.id, error.clone()))
1090-
.await
1091-
{
1092-
warn!("Failed to inform request failure {}", e)
1139+
for PendingRequest { request_id, .. } in to_remove {
1140+
match request_id {
1141+
HandlerReqId::Internal(_) => {
1142+
// Do not report failures on requests belonging to the handler.
1143+
}
1144+
HandlerReqId::External(id) => {
1145+
if let Err(e) = self
1146+
.service_send
1147+
.send(HandlerOut::RequestFailed(id, error.clone()))
1148+
.await
1149+
{
1150+
warn!("Failed to inform request failure {}", e)
1151+
}
1152+
}
10931153
}
10941154
}
10951155
}

0 commit comments

Comments
 (0)