Skip to content

Commit d2e30e0

Browse files
authored
expected_responses remains after challenge has been completed (#210)
* Replace `Handler::spawn` with `build_handler()` and `Handler::start()` * Test the handler's states after the handler has been terminated * Remove expected response on handle_auth_message() * Rename variables for readability
1 parent 1439dec commit d2e30e0

File tree

2 files changed

+86
-50
lines changed

2 files changed

+86
-50
lines changed

src/handler/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -793,6 +793,8 @@ impl Handler {
793793
enr_record,
794794
) {
795795
Ok((mut session, enr)) => {
796+
// Remove the expected response for the challenge.
797+
self.remove_expected_response(node_address.socket_addr);
796798
// Receiving an AuthResponse must give us an up-to-date view of the node ENR.
797799
// Verify the ENR is valid
798800
if self.verify_enr(&enr, &node_address) {

src/handler/tests.rs

Lines changed: 84 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,16 @@ fn init() {
2424
.try_init();
2525
}
2626

27-
async fn build_handler<P: ProtocolIdentity>() -> Handler {
28-
let config = ConfigBuilder::new(ListenConfig::default()).build();
29-
let key = CombinedKey::generate_secp256k1();
30-
let enr = EnrBuilder::new("v4")
31-
.ip4(Ipv4Addr::LOCALHOST)
32-
.udp4(9000)
33-
.build(&key)
34-
.unwrap();
27+
async fn build_handler<P: ProtocolIdentity>(
28+
enr: Enr,
29+
key: CombinedKey,
30+
config: Config,
31+
) -> (
32+
oneshot::Sender<()>,
33+
mpsc::UnboundedSender<HandlerIn>,
34+
mpsc::Receiver<HandlerOut>,
35+
Handler,
36+
) {
3537
let mut listen_sockets = SmallVec::default();
3638
listen_sockets.push((Ipv4Addr::LOCALHOST, 9000).into());
3739
let node_id = enr.node_id();
@@ -58,11 +60,11 @@ async fn build_handler<P: ProtocolIdentity>() -> Handler {
5860

5961
Socket::new::<P>(socket_config).await.unwrap()
6062
};
61-
let (_, service_recv) = mpsc::unbounded_channel();
62-
let (service_send, _) = mpsc::channel(50);
63-
let (_, exit) = oneshot::channel();
63+
let (handler_send, service_recv) = mpsc::unbounded_channel();
64+
let (service_send, handler_recv) = mpsc::channel(50);
65+
let (exit_sender, exit) = oneshot::channel();
6466

65-
Handler {
67+
let handler = Handler {
6668
request_retries: config.request_retries,
6769
node_id,
6870
enr: Arc::new(RwLock::new(enr)),
@@ -81,7 +83,8 @@ async fn build_handler<P: ProtocolIdentity>() -> Handler {
8183
listen_sockets,
8284
socket,
8385
exit,
84-
}
86+
};
87+
(exit_sender, handler_send, handler_recv, handler)
8588
}
8689

8790
macro_rules! arc_rw {
@@ -194,47 +197,58 @@ async fn multiple_messages() {
194197
.udp4(sender_port)
195198
.build(&key1)
196199
.unwrap();
197-
let sender_listen_config = ListenConfig::Ipv4 {
198-
ip: sender_enr.ip4().unwrap(),
199-
port: sender_enr.udp4().unwrap(),
200-
};
201-
let sender_config = ConfigBuilder::new(sender_listen_config).build();
202200

203201
let receiver_enr = EnrBuilder::new("v4")
204202
.ip4(ip)
205203
.udp4(receiver_port)
206204
.build(&key2)
207205
.unwrap();
208-
let receiver_listen_config = ListenConfig::Ipv4 {
209-
ip: receiver_enr.ip4().unwrap(),
210-
port: receiver_enr.udp4().unwrap(),
206+
207+
// Build sender handler
208+
let (sender_exit, sender_send, mut sender_recv, mut handler) = {
209+
let sender_listen_config = ListenConfig::Ipv4 {
210+
ip: sender_enr.ip4().unwrap(),
211+
port: sender_enr.udp4().unwrap(),
212+
};
213+
let sender_config = ConfigBuilder::new(sender_listen_config).build();
214+
build_handler::<DefaultProtocolId>(sender_enr.clone(), key1, sender_config).await
215+
};
216+
let sender = async move {
217+
// Start sender handler.
218+
handler.start::<DefaultProtocolId>().await;
219+
// After the handler has been terminated test the handler's states.
220+
assert!(handler.pending_requests.is_empty());
221+
assert_eq!(0, handler.active_requests.count().await);
222+
assert!(handler.active_challenges.is_empty());
223+
assert!(handler.filter_expected_responses.read().is_empty());
211224
};
212-
let receiver_config = ConfigBuilder::new(receiver_listen_config).build();
213-
214-
let (_exit_send, sender_handler, mut sender_handler_recv) =
215-
Handler::spawn::<DefaultProtocolId>(
216-
arc_rw!(sender_enr.clone()),
217-
arc_rw!(key1),
218-
sender_config,
219-
)
220-
.await
221-
.unwrap();
222225

223-
let (_exit_recv, recv_send, mut receiver_handler) = Handler::spawn::<DefaultProtocolId>(
224-
arc_rw!(receiver_enr.clone()),
225-
arc_rw!(key2),
226-
receiver_config,
227-
)
228-
.await
229-
.unwrap();
226+
// Build receiver handler
227+
let (receiver_exit, receiver_send, mut receiver_recv, mut handler) = {
228+
let receiver_listen_config = ListenConfig::Ipv4 {
229+
ip: receiver_enr.ip4().unwrap(),
230+
port: receiver_enr.udp4().unwrap(),
231+
};
232+
let receiver_config = ConfigBuilder::new(receiver_listen_config).build();
233+
build_handler::<DefaultProtocolId>(receiver_enr.clone(), key2, receiver_config).await
234+
};
235+
let receiver = async move {
236+
// Start receiver handler.
237+
handler.start::<DefaultProtocolId>().await;
238+
// After the handler has been terminated test the handler's states.
239+
assert!(handler.pending_requests.is_empty());
240+
assert_eq!(0, handler.active_requests.count().await);
241+
assert!(handler.active_challenges.is_empty());
242+
assert!(handler.filter_expected_responses.read().is_empty());
243+
};
230244

231245
let send_message = Box::new(Request {
232246
id: RequestId(vec![1]),
233247
body: RequestBody::Ping { enr_seq: 1 },
234248
});
235249

236250
// sender to send the first message then await for the session to be established
237-
let _ = sender_handler.send(HandlerIn::Request(
251+
let _ = sender_send.send(HandlerIn::Request(
238252
receiver_enr.clone().into(),
239253
send_message.clone(),
240254
));
@@ -253,35 +267,46 @@ async fn multiple_messages() {
253267
let mut message_count = 0usize;
254268
let recv_send_message = send_message.clone();
255269

256-
let sender = async move {
270+
let sender_ops = async move {
271+
let mut response_count = 0usize;
257272
loop {
258-
match sender_handler_recv.recv().await {
273+
match sender_recv.recv().await {
259274
Some(HandlerOut::Established(_, _, _)) => {
260275
// now the session is established, send the rest of the messages
261276
for _ in 0..messages_to_send - 1 {
262-
let _ = sender_handler.send(HandlerIn::Request(
277+
let _ = sender_send.send(HandlerIn::Request(
263278
receiver_enr.clone().into(),
264279
send_message.clone(),
265280
));
266281
}
267282
}
283+
Some(HandlerOut::Response(_, _)) => {
284+
response_count += 1;
285+
if response_count == messages_to_send {
286+
// Notify the handlers that the message exchange has been completed.
287+
sender_exit.send(()).unwrap();
288+
receiver_exit.send(()).unwrap();
289+
return;
290+
}
291+
}
268292
_ => continue,
269293
};
270294
}
271295
};
272296

273-
let receiver = async move {
297+
let receiver_ops = async move {
274298
loop {
275-
match receiver_handler.recv().await {
299+
match receiver_recv.recv().await {
276300
Some(HandlerOut::WhoAreYou(wru_ref)) => {
277-
let _ = recv_send.send(HandlerIn::WhoAreYou(wru_ref, Some(sender_enr.clone())));
301+
let _ =
302+
receiver_send.send(HandlerIn::WhoAreYou(wru_ref, Some(sender_enr.clone())));
278303
}
279304
Some(HandlerOut::Request(addr, request)) => {
280305
assert_eq!(request, recv_send_message);
281306
message_count += 1;
282307
// required to send a pong response to establish the session
283-
let _ =
284-
recv_send.send(HandlerIn::Response(addr, Box::new(pong_response.clone())));
308+
let _ = receiver_send
309+
.send(HandlerIn::Response(addr, Box::new(pong_response.clone())));
285310
if message_count == messages_to_send {
286311
return;
287312
}
@@ -294,10 +319,12 @@ async fn multiple_messages() {
294319
};
295320

296321
let sleep_future = sleep(Duration::from_millis(100));
322+
let message_exchange = async move {
323+
let _ = tokio::join!(sender, sender_ops, receiver, receiver_ops);
324+
};
297325

298326
tokio::select! {
299-
_ = sender => {}
300-
_ = receiver => {}
327+
_ = message_exchange => {}
301328
_ = sleep_future => {
302329
panic!("Test timed out");
303330
}
@@ -419,7 +446,14 @@ async fn test_self_request_ipv6() {
419446

420447
#[tokio::test]
421448
async fn remove_one_time_session() {
422-
let mut handler = build_handler::<DefaultProtocolId>().await;
449+
let config = ConfigBuilder::new(ListenConfig::default()).build();
450+
let key = CombinedKey::generate_secp256k1();
451+
let enr = EnrBuilder::new("v4")
452+
.ip4(Ipv4Addr::LOCALHOST)
453+
.udp4(9000)
454+
.build(&key)
455+
.unwrap();
456+
let (_, _, _, mut handler) = build_handler::<DefaultProtocolId>(enr, key, config).await;
423457

424458
let enr = {
425459
let key = CombinedKey::generate_secp256k1();

0 commit comments

Comments
 (0)