Skip to content

Commit 546e19c

Browse files
njgheorghitaackintoshdivagant-martianAgeManning
authored
Add support for concurrent requests to a single peer. (#200)
Co-authored-by: ackintosh <[email protected]> Co-authored-by: Diva M <[email protected]> Co-authored-by: Age Manning <[email protected]>
1 parent aa12e38 commit 546e19c

File tree

7 files changed

+1022
-147
lines changed

7 files changed

+1022
-147
lines changed

src/discv5.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ lazy_static! {
4949
RwLock::new(crate::PermitBanList::default());
5050
}
5151

52-
mod test;
52+
pub(crate) mod test;
5353

5454
/// Events that can be produced by the `Discv5` event stream.
5555
#[derive(Debug)]

src/discv5/test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ async fn build_nodes_from_keypairs_dual_stack(
116116
}
117117

118118
/// Generate `n` deterministic keypairs from a given seed.
119-
fn generate_deterministic_keypair(n: usize, seed: u64) -> Vec<CombinedKey> {
119+
pub(crate) fn generate_deterministic_keypair(n: usize, seed: u64) -> Vec<CombinedKey> {
120120
let mut keypairs = Vec::new();
121121
for i in 0..n {
122122
let sk = {

src/handler/active_requests.rs

Lines changed: 132 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,68 +1,142 @@
11
use super::*;
22
use delay_map::HashMapDelay;
33
use more_asserts::debug_unreachable;
4+
use std::collections::hash_map::Entry;
45

56
pub(super) struct ActiveRequests {
67
/// A list of raw messages we are awaiting a response from the remote.
7-
active_requests_mapping: HashMapDelay<NodeAddress, RequestCall>,
8+
active_requests_mapping: HashMap<NodeAddress, Vec<RequestCall>>,
89
// WHOAREYOU messages do not include the source node id. We therefore maintain another
910
// mapping of active_requests via message_nonce. This allows us to match WHOAREYOU
1011
// requests with active requests sent.
11-
/// A mapping of all pending active raw requests message nonces to their NodeAddress.
12-
active_requests_nonce_mapping: HashMap<MessageNonce, NodeAddress>,
12+
/// A mapping of all active raw requests message nonces to their NodeAddress.
13+
active_requests_nonce_mapping: HashMapDelay<MessageNonce, NodeAddress>,
1314
}
1415

1516
impl ActiveRequests {
1617
pub fn new(request_timeout: Duration) -> Self {
1718
ActiveRequests {
18-
active_requests_mapping: HashMapDelay::new(request_timeout),
19-
active_requests_nonce_mapping: HashMap::new(),
19+
active_requests_mapping: HashMap::new(),
20+
active_requests_nonce_mapping: HashMapDelay::new(request_timeout),
2021
}
2122
}
2223

24+
/// Insert a new request into the active requests mapping.
2325
pub fn insert(&mut self, node_address: NodeAddress, request_call: RequestCall) {
2426
let nonce = *request_call.packet().message_nonce();
2527
self.active_requests_mapping
26-
.insert(node_address.clone(), request_call);
28+
.entry(node_address.clone())
29+
.or_default()
30+
.push(request_call);
2731
self.active_requests_nonce_mapping
2832
.insert(nonce, node_address);
2933
}
3034

31-
pub fn get(&self, node_address: &NodeAddress) -> Option<&RequestCall> {
35+
/// Update the underlying packet for the request via message nonce.
36+
pub fn update_packet(&mut self, old_nonce: MessageNonce, new_packet: Packet) {
37+
let node_address =
38+
if let Some(node_address) = self.active_requests_nonce_mapping.remove(&old_nonce) {
39+
node_address
40+
} else {
41+
debug_unreachable!("expected to find nonce in active_requests_nonce_mapping");
42+
error!("expected to find nonce in active_requests_nonce_mapping");
43+
return;
44+
};
45+
46+
self.active_requests_nonce_mapping
47+
.insert(new_packet.header.message_nonce, node_address.clone());
48+
49+
match self.active_requests_mapping.entry(node_address) {
50+
Entry::Occupied(mut requests) => {
51+
let maybe_request_call = requests
52+
.get_mut()
53+
.iter_mut()
54+
.find(|req| req.packet().message_nonce() == &old_nonce);
55+
56+
if let Some(request_call) = maybe_request_call {
57+
request_call.update_packet(new_packet);
58+
} else {
59+
debug_unreachable!("expected to find request call in active_requests_mapping");
60+
error!("expected to find request call in active_requests_mapping");
61+
}
62+
}
63+
Entry::Vacant(_) => {
64+
debug_unreachable!("expected to find node address in active_requests_mapping");
65+
error!("expected to find node address in active_requests_mapping");
66+
}
67+
}
68+
}
69+
70+
pub fn get(&self, node_address: &NodeAddress) -> Option<&Vec<RequestCall>> {
3271
self.active_requests_mapping.get(node_address)
3372
}
3473

74+
/// Remove a single request identified by its nonce.
3575
pub fn remove_by_nonce(&mut self, nonce: &MessageNonce) -> Option<(NodeAddress, RequestCall)> {
36-
match self.active_requests_nonce_mapping.remove(nonce) {
37-
Some(node_address) => match self.active_requests_mapping.remove(&node_address) {
38-
Some(request_call) => Some((node_address, request_call)),
39-
None => {
40-
debug_unreachable!("A matching request call doesn't exist");
41-
error!("A matching request call doesn't exist");
42-
None
76+
let node_address = self.active_requests_nonce_mapping.remove(nonce)?;
77+
match self.active_requests_mapping.entry(node_address.clone()) {
78+
Entry::Vacant(_) => {
79+
debug_unreachable!("expected to find node address in active_requests_mapping");
80+
error!("expected to find node address in active_requests_mapping");
81+
None
82+
}
83+
Entry::Occupied(mut requests) => {
84+
let result = requests
85+
.get()
86+
.iter()
87+
.position(|req| req.packet().message_nonce() == nonce)
88+
.map(|index| (node_address, requests.get_mut().remove(index)));
89+
if requests.get().is_empty() {
90+
requests.remove();
4391
}
44-
},
45-
None => None,
92+
result
93+
}
4694
}
4795
}
4896

49-
pub fn remove(&mut self, node_address: &NodeAddress) -> Option<RequestCall> {
50-
match self.active_requests_mapping.remove(node_address) {
51-
Some(request_call) => {
52-
// Remove the associated nonce mapping.
53-
match self
54-
.active_requests_nonce_mapping
55-
.remove(request_call.packet().message_nonce())
56-
{
57-
Some(_) => Some(request_call),
58-
None => {
59-
debug_unreachable!("A matching nonce mapping doesn't exist");
60-
error!("A matching nonce mapping doesn't exist");
61-
None
62-
}
97+
/// Remove all requests associated with a node.
98+
pub fn remove_requests(&mut self, node_address: &NodeAddress) -> Option<Vec<RequestCall>> {
99+
let requests = self.active_requests_mapping.remove(node_address)?;
100+
// Account for node addresses in `active_requests_nonce_mapping` with an empty list
101+
if requests.is_empty() {
102+
debug_unreachable!("expected to find requests in active_requests_mapping");
103+
return None;
104+
}
105+
for req in &requests {
106+
if self
107+
.active_requests_nonce_mapping
108+
.remove(req.packet().message_nonce())
109+
.is_none()
110+
{
111+
debug_unreachable!("expected to find req with nonce");
112+
error!("expected to find req with nonce");
113+
}
114+
}
115+
Some(requests)
116+
}
117+
118+
/// Remove a single request identified by its id.
119+
pub fn remove_request(
120+
&mut self,
121+
node_address: &NodeAddress,
122+
id: &RequestId,
123+
) -> Option<RequestCall> {
124+
match self.active_requests_mapping.entry(node_address.clone()) {
125+
Entry::Vacant(_) => None,
126+
Entry::Occupied(mut requests) => {
127+
let index = requests.get().iter().position(|req| {
128+
let req_id: RequestId = req.id().into();
129+
&req_id == id
130+
})?;
131+
let request_call = requests.get_mut().remove(index);
132+
if requests.get().is_empty() {
133+
requests.remove();
63134
}
135+
// Remove the associated nonce mapping.
136+
self.active_requests_nonce_mapping
137+
.remove(request_call.packet().message_nonce());
138+
Some(request_call)
64139
}
65-
None => None,
66140
}
67141
}
68142

@@ -80,10 +154,12 @@ impl ActiveRequests {
80154
}
81155
}
82156

83-
for (address, request) in self.active_requests_mapping.iter() {
84-
let nonce = request.packet().message_nonce();
85-
if !self.active_requests_nonce_mapping.contains_key(nonce) {
86-
panic!("Address {} maps to request with nonce {:?}, which does not exist in `active_requests_nonce_mapping`", address, nonce);
157+
for (address, requests) in self.active_requests_mapping.iter() {
158+
for req in requests {
159+
let nonce = req.packet().message_nonce();
160+
if !self.active_requests_nonce_mapping.contains_key(nonce) {
161+
panic!("Address {} maps to request with nonce {:?}, which does not exist in `active_requests_nonce_mapping`", address, nonce);
162+
}
87163
}
88164
}
89165
}
@@ -92,12 +168,27 @@ impl ActiveRequests {
92168
impl Stream for ActiveRequests {
93169
type Item = Result<(NodeAddress, RequestCall), String>;
94170
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
95-
match self.active_requests_mapping.poll_next_unpin(cx) {
96-
Poll::Ready(Some(Ok((node_address, request_call)))) => {
97-
// Remove the associated nonce mapping.
98-
self.active_requests_nonce_mapping
99-
.remove(request_call.packet().message_nonce());
100-
Poll::Ready(Some(Ok((node_address, request_call))))
171+
match self.active_requests_nonce_mapping.poll_next_unpin(cx) {
172+
Poll::Ready(Some(Ok((nonce, node_address)))) => {
173+
match self.active_requests_mapping.entry(node_address.clone()) {
174+
Entry::Vacant(_) => Poll::Ready(None),
175+
Entry::Occupied(mut requests) => {
176+
match requests
177+
.get()
178+
.iter()
179+
.position(|req| req.packet().message_nonce() == &nonce)
180+
{
181+
Some(index) => {
182+
let result = (node_address, requests.get_mut().remove(index));
183+
if requests.get().is_empty() {
184+
requests.remove();
185+
}
186+
Poll::Ready(Some(Ok(result)))
187+
}
188+
None => Poll::Ready(None),
189+
}
190+
}
191+
}
101192
}
102193
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
103194
Poll::Ready(None) => Poll::Ready(None),

0 commit comments

Comments
 (0)