Skip to content

Commit 715fc82

Browse files
authored
Expose send_ping from service.rs for discv5.rs (#172)
1 parent d86707d commit 715fc82

File tree

2 files changed

+165
-87
lines changed

2 files changed

+165
-87
lines changed

src/discv5.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,11 @@ use tracing::{debug, warn};
3939
use libp2p_core::Multiaddr;
4040

4141
// Create lazy static variable for the global permit/ban list
42-
use crate::metrics::{Metrics, METRICS};
42+
use crate::{
43+
metrics::{Metrics, METRICS},
44+
service::Pong,
45+
};
46+
4347
lazy_static! {
4448
pub static ref PERMIT_BAN_LIST: RwLock<crate::PermitBanList> =
4549
RwLock::new(crate::PermitBanList::default());
@@ -308,6 +312,31 @@ impl<P: ProtocolIdentity> Discv5<P> {
308312
None
309313
}
310314

315+
/// Sends a PING request to a node.
316+
pub fn send_ping(
317+
&mut self,
318+
enr: Enr,
319+
) -> impl Future<Output = Result<Pong, RequestError>> + 'static {
320+
let (callback_send, callback_recv) = oneshot::channel();
321+
let channel = self.clone_channel();
322+
323+
async move {
324+
let channel = channel.map_err(|_| RequestError::ServiceNotStarted)?;
325+
326+
let event = ServiceRequest::Ping(enr, Some(callback_send));
327+
328+
// send the request
329+
channel
330+
.send(event)
331+
.await
332+
.map_err(|_| RequestError::ChannelFailed("Service channel closed".into()))?;
333+
// await the response
334+
callback_recv
335+
.await
336+
.map_err(|e| RequestError::ChannelFailed(e.to_string()))?
337+
}
338+
}
339+
311340
/// Bans a node from the server. This will remove the node from the routing table if it exists
312341
/// and block all incoming packets from the node until the timeout specified. Setting the
313342
/// timeout to `None` creates a permanent ban.

src/service.rs

Lines changed: 135 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,13 @@ use futures::prelude::*;
3838
use more_asserts::debug_unreachable;
3939
use parking_lot::RwLock;
4040
use rpc::*;
41-
use std::{collections::HashMap, net::SocketAddr, sync::Arc, task::Poll, time::Instant};
41+
use std::{
42+
collections::HashMap,
43+
net::{IpAddr, SocketAddr},
44+
sync::Arc,
45+
task::Poll,
46+
time::Instant,
47+
};
4248
use tokio::sync::{mpsc, oneshot};
4349
use tracing::{debug, error, info, trace, warn};
4450

@@ -146,6 +152,8 @@ pub enum ServiceRequest {
146152
Vec<u8>,
147153
oneshot::Sender<Result<Vec<u8>, RequestError>>,
148154
),
155+
/// The PING discv5 RPC function.
156+
Ping(Enr, Option<oneshot::Sender<Result<Pong, RequestError>>>),
149157
/// Sets up an event stream where the discv5 server will return various events such as
150158
/// discovered nodes as it traverses the DHT.
151159
RequestEventStream(oneshot::Sender<mpsc::Receiver<Discv5Event>>),
@@ -213,12 +221,24 @@ struct ActiveRequest {
213221
pub callback: Option<CallbackResponse>,
214222
}
215223

224+
#[derive(Debug)]
225+
pub struct Pong {
226+
/// The current ENR sequence number of the responder.
227+
pub enr_seq: u64,
228+
/// Our external IP address as observed by the responder.
229+
pub ip: IpAddr,
230+
/// Our external UDP port as observed by the responder.
231+
pub port: u16,
232+
}
233+
216234
/// The kinds of responses we can send back to the discv5 layer.
217235
pub enum CallbackResponse {
218236
/// A response to a requested ENR.
219237
Enr(oneshot::Sender<Result<Enr, RequestError>>),
220238
/// A response from a TALK request
221239
Talk(oneshot::Sender<Result<Vec<u8>, RequestError>>),
240+
/// A response from a Pong request
241+
Pong(oneshot::Sender<Result<Pong, RequestError>>),
222242
}
223243

224244
/// For multiple responses to a FindNodes request, this keeps track of the request count
@@ -335,6 +355,9 @@ impl Service {
335355
ServiceRequest::Talk(node_contact, protocol, request, callback) => {
336356
self.talk_request(node_contact, protocol, request, callback);
337357
}
358+
ServiceRequest::Ping(enr, callback) => {
359+
self.send_ping(enr, callback);
360+
}
338361
ServiceRequest::RequestEventStream(callback) => {
339362
// the channel size needs to be large to handle many discovered peers
340363
// if we are reporting them on the event stream.
@@ -428,7 +451,7 @@ impl Service {
428451
};
429452

430453
if let Some(enr) = enr {
431-
self.send_ping(enr);
454+
self.send_ping(enr, None);
432455
}
433456
}
434457
}
@@ -770,102 +793,117 @@ impl Service {
770793
self.discovered(&node_id, nodes, active_request.query_id);
771794
}
772795
ResponseBody::Pong { enr_seq, ip, port } => {
773-
let socket = SocketAddr::new(ip, port);
774-
// perform ENR majority-based update if required.
796+
// Send the response to the user, if they are who asked
797+
if let Some(CallbackResponse::Pong(callback)) = active_request.callback {
798+
let response = Pong { enr_seq, ip, port };
799+
if let Err(e) = callback.send(Ok(response)) {
800+
warn!("Failed to send callback response {:?}", e)
801+
};
802+
} else {
803+
let socket = SocketAddr::new(ip, port);
804+
// perform ENR majority-based update if required.
775805

776-
// Only count votes that from peers we have contacted.
777-
let key: kbucket::Key<NodeId> = node_id.into();
778-
let should_count = matches!(
806+
// Only count votes that from peers we have contacted.
807+
let key: kbucket::Key<NodeId> = node_id.into();
808+
let should_count = matches!(
779809
self.kbuckets.write().entry(&key),
780810
kbucket::Entry::Present(_, status)
781811
if status.is_connected() && !status.is_incoming());
782812

783-
if should_count {
784-
// get the advertised local addresses
785-
let (local_ip4_socket, local_ip6_socket) = {
786-
let local_enr = self.local_enr.read();
787-
(local_enr.udp4_socket(), local_enr.udp6_socket())
788-
};
789-
790-
if let Some(ref mut ip_votes) = self.ip_votes {
791-
ip_votes.insert(node_id, socket);
792-
let (maybe_ip4_majority, maybe_ip6_majority) = ip_votes.majority();
813+
if should_count {
814+
// get the advertised local addresses
815+
let (local_ip4_socket, local_ip6_socket) = {
816+
let local_enr = self.local_enr.read();
817+
(local_enr.udp4_socket(), local_enr.udp6_socket())
818+
};
793819

794-
let new_ip4 = maybe_ip4_majority.and_then(|majority| {
795-
if Some(majority) != local_ip4_socket {
796-
Some(majority)
797-
} else {
798-
None
799-
}
800-
});
801-
let new_ip6 = maybe_ip6_majority.and_then(|majority| {
802-
if Some(majority) != local_ip6_socket {
803-
Some(majority)
804-
} else {
805-
None
806-
}
807-
});
820+
if let Some(ref mut ip_votes) = self.ip_votes {
821+
ip_votes.insert(node_id, socket);
822+
let (maybe_ip4_majority, maybe_ip6_majority) = ip_votes.majority();
808823

809-
if new_ip4.is_some() || new_ip6.is_some() {
810-
let mut updated = false;
811-
812-
// Check if our advertised IPV6 address needs to be updated.
813-
if let Some(new_ip6) = new_ip6 {
814-
let new_ip6: SocketAddr = new_ip6.into();
815-
let result = self
816-
.local_enr
817-
.write()
818-
.set_udp_socket(new_ip6, &self.enr_key.read());
819-
match result {
820-
Ok(_) => {
821-
updated = true;
822-
info!("Local UDP ip6 socket updated to: {}", new_ip6);
823-
self.send_event(Discv5Event::SocketUpdated(new_ip6));
824-
}
825-
Err(e) => {
826-
warn!("Failed to update local UDP ip6 socket. ip6: {}, error: {:?}", new_ip6, e);
827-
}
824+
let new_ip4 = maybe_ip4_majority.and_then(|majority| {
825+
if Some(majority) != local_ip4_socket {
826+
Some(majority)
827+
} else {
828+
None
828829
}
829-
}
830-
if let Some(new_ip4) = new_ip4 {
831-
let new_ip4: SocketAddr = new_ip4.into();
832-
let result = self
833-
.local_enr
834-
.write()
835-
.set_udp_socket(new_ip4, &self.enr_key.read());
836-
match result {
837-
Ok(_) => {
838-
updated = true;
839-
info!("Local UDP socket updated to: {}", new_ip4);
840-
self.send_event(Discv5Event::SocketUpdated(new_ip4));
830+
});
831+
let new_ip6 = maybe_ip6_majority.and_then(|majority| {
832+
if Some(majority) != local_ip6_socket {
833+
Some(majority)
834+
} else {
835+
None
836+
}
837+
});
838+
839+
if new_ip4.is_some() || new_ip6.is_some() {
840+
let mut updated = false;
841+
842+
// Check if our advertised IPV6 address needs to be updated.
843+
if let Some(new_ip6) = new_ip6 {
844+
let new_ip6: SocketAddr = new_ip6.into();
845+
let result = self
846+
.local_enr
847+
.write()
848+
.set_udp_socket(new_ip6, &self.enr_key.read());
849+
match result {
850+
Ok(_) => {
851+
updated = true;
852+
info!(
853+
"Local UDP ip6 socket updated to: {}",
854+
new_ip6,
855+
);
856+
self.send_event(Discv5Event::SocketUpdated(
857+
new_ip6,
858+
));
859+
}
860+
Err(e) => {
861+
warn!("Failed to update local UDP ip6 socket. ip6: {}, error: {:?}", new_ip6, e);
862+
}
841863
}
842-
Err(e) => {
843-
warn!("Failed to update local UDP socket. ip: {}, error: {:?}", new_ip4, e);
864+
}
865+
if let Some(new_ip4) = new_ip4 {
866+
let new_ip4: SocketAddr = new_ip4.into();
867+
let result = self
868+
.local_enr
869+
.write()
870+
.set_udp_socket(new_ip4, &self.enr_key.read());
871+
match result {
872+
Ok(_) => {
873+
updated = true;
874+
info!("Local UDP socket updated to: {}", new_ip4);
875+
self.send_event(Discv5Event::SocketUpdated(
876+
new_ip4,
877+
));
878+
}
879+
Err(e) => {
880+
warn!("Failed to update local UDP socket. ip: {}, error: {:?}", new_ip4, e);
881+
}
844882
}
845883
}
846-
}
847-
if updated {
848-
self.ping_connected_peers();
884+
if updated {
885+
self.ping_connected_peers();
886+
}
849887
}
850888
}
851889
}
852-
}
853890

854-
// check if we need to request a new ENR
855-
if let Some(enr) = self.find_enr(&node_id) {
856-
if enr.seq() < enr_seq {
857-
// request an ENR update
858-
debug!("Requesting an ENR update from: {}", active_request.contact);
859-
let request_body = RequestBody::FindNode { distances: vec![0] };
860-
let active_request = ActiveRequest {
861-
contact: active_request.contact,
862-
request_body,
863-
query_id: None,
864-
callback: None,
865-
};
866-
self.send_rpc_request(active_request);
891+
// check if we need to request a new ENR
892+
if let Some(enr) = self.find_enr(&node_id) {
893+
if enr.seq() < enr_seq {
894+
// request an ENR update
895+
debug!("Requesting an ENR update from: {}", active_request.contact);
896+
let request_body = RequestBody::FindNode { distances: vec![0] };
897+
let active_request = ActiveRequest {
898+
contact: active_request.contact,
899+
request_body,
900+
query_id: None,
901+
callback: None,
902+
};
903+
self.send_rpc_request(active_request);
904+
}
905+
self.connection_updated(node_id, ConnectionStatus::PongReceived(enr));
867906
}
868-
self.connection_updated(node_id, ConnectionStatus::PongReceived(enr));
869907
}
870908
}
871909
ResponseBody::Talk { response } => {
@@ -897,7 +935,11 @@ impl Service {
897935
// Send RPC Requests //
898936

899937
/// Sends a PING request to a node.
900-
fn send_ping(&mut self, enr: Enr) {
938+
fn send_ping(
939+
&mut self,
940+
enr: Enr,
941+
callback: Option<oneshot::Sender<Result<Pong, RequestError>>>,
942+
) {
901943
match NodeContact::try_from_enr(enr, self.config.ip_mode) {
902944
Ok(contact) => {
903945
let request_body = RequestBody::Ping {
@@ -907,7 +949,7 @@ impl Service {
907949
contact,
908950
request_body,
909951
query_id: None,
910-
callback: None,
952+
callback: callback.map(CallbackResponse::Pong),
911953
};
912954
self.send_rpc_request(active_request);
913955
}
@@ -933,7 +975,7 @@ impl Service {
933975
};
934976

935977
for enr in connected_peers {
936-
self.send_ping(enr.clone());
978+
self.send_ping(enr.clone(), None);
937979
}
938980
}
939981

@@ -1341,7 +1383,7 @@ impl Service {
13411383
}
13421384
};
13431385
if let Some(enr) = optional_enr {
1344-
self.send_ping(enr)
1386+
self.send_ping(enr, None)
13451387
}
13461388
}
13471389
}
@@ -1383,6 +1425,13 @@ impl Service {
13831425
.unwrap_or_else(|_| debug!("Couldn't send TALK error response to user"));
13841426
return;
13851427
}
1428+
Some(CallbackResponse::Pong(callback)) => {
1429+
// return the error
1430+
callback
1431+
.send(Err(error))
1432+
.unwrap_or_else(|_| debug!("Couldn't send Pong error response to user"));
1433+
return;
1434+
}
13861435
None => {
13871436
// no callback to send too
13881437
}

0 commit comments

Comments
 (0)