Skip to content

Commit e984cae

Browse files
authored
Implemented find_node_designated_peer to call a designated peer with distance (#173)
1 parent 0b84ac4 commit e984cae

File tree

2 files changed

+72
-34
lines changed

2 files changed

+72
-34
lines changed

src/discv5.rs

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -495,14 +495,33 @@ impl<P: ProtocolIdentity> Discv5<P> {
495495

496496
let (callback_send, callback_recv) = oneshot::channel();
497497

498-
let event = ServiceRequest::FindEnr(node_contact, callback_send);
498+
let event =
499+
ServiceRequest::FindNodeDesignated(node_contact.clone(), vec![0], callback_send);
500+
501+
// send the request
499502
channel
500503
.send(event)
501504
.await
502505
.map_err(|_| RequestError::ChannelFailed("Service channel closed".into()))?;
503-
callback_recv
506+
// await the response
507+
match callback_recv
504508
.await
505509
.map_err(|e| RequestError::ChannelFailed(e.to_string()))?
510+
{
511+
Ok(mut nodes) => {
512+
// This must be for asking for an ENR
513+
if nodes.len() > 1 {
514+
warn!(
515+
"Peer returned more than one ENR for itself. {}",
516+
node_contact
517+
);
518+
}
519+
nodes
520+
.pop()
521+
.ok_or(RequestError::InvalidEnr("Peer did not return an ENR"))
522+
}
523+
Err(err) => Err(err),
524+
}
506525
}
507526
}
508527

@@ -537,6 +556,35 @@ impl<P: ProtocolIdentity> Discv5<P> {
537556
}
538557
}
539558

559+
/// Send a FINDNODE request for nodes that fall within the given set of distances,
560+
/// to the designated peer and wait for a response.
561+
pub fn find_node_designated_peer(
562+
&self,
563+
enr: Enr,
564+
distances: Vec<u64>,
565+
) -> impl Future<Output = Result<Vec<Enr>, RequestError>> + 'static {
566+
let (callback_send, callback_recv) = oneshot::channel();
567+
let channel = self.clone_channel();
568+
let ip_mode = self.config.ip_mode;
569+
570+
async move {
571+
let node_contact = NodeContact::try_from_enr(enr, ip_mode)?;
572+
let channel = channel.map_err(|_| RequestError::ServiceNotStarted)?;
573+
574+
let event = ServiceRequest::FindNodeDesignated(node_contact, distances, callback_send);
575+
576+
// send the request
577+
channel
578+
.send(event)
579+
.await
580+
.map_err(|_| RequestError::ChannelFailed("Service channel closed".into()))?;
581+
// await the response
582+
callback_recv
583+
.await
584+
.map_err(|e| RequestError::ChannelFailed(e.to_string()))?
585+
}
586+
}
587+
540588
/// Runs an iterative `FIND_NODE` request.
541589
///
542590
/// This will return peers containing contactable nodes of the DHT closest to the

src/service.rs

Lines changed: 22 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,13 @@ pub enum ServiceRequest {
143143
/// - A Predicate Query - Searches for peers closest to a random target that match a specified
144144
/// predicate.
145145
StartQuery(QueryKind, oneshot::Sender<Vec<Enr>>),
146-
/// Find the ENR of a node given its multiaddr.
147-
FindEnr(NodeContact, oneshot::Sender<Result<Enr, RequestError>>),
146+
/// Send a FINDNODE request for nodes that fall within the given set of distances,
147+
/// to the designated peer and wait for a response.
148+
FindNodeDesignated(
149+
NodeContact,
150+
Vec<u64>,
151+
oneshot::Sender<Result<Vec<Enr>, RequestError>>,
152+
),
148153
/// The TALK discv5 RPC function.
149154
Talk(
150155
NodeContact,
@@ -233,8 +238,8 @@ pub struct Pong {
233238

234239
/// The kinds of responses we can send back to the discv5 layer.
235240
pub enum CallbackResponse {
236-
/// A response to a requested ENR.
237-
Enr(oneshot::Sender<Result<Enr, RequestError>>),
241+
/// A response to a requested Nodes.
242+
Nodes(oneshot::Sender<Result<Vec<Enr>, RequestError>>),
238243
/// A response from a TALK request
239244
Talk(oneshot::Sender<Result<Vec<u8>, RequestError>>),
240245
/// A response from a Pong request
@@ -349,8 +354,8 @@ impl Service {
349354
}
350355
}
351356
}
352-
ServiceRequest::FindEnr(node_contact, callback) => {
353-
self.request_enr(node_contact, Some(callback));
357+
ServiceRequest::FindNodeDesignated(node_contact, distance, callback) => {
358+
self.request_find_node_designated_peer(node_contact, distance, Some(callback));
354359
}
355360
ServiceRequest::Talk(node_contact, protocol, request, callback) => {
356361
self.talk_request(node_contact, protocol, request, callback);
@@ -587,7 +592,7 @@ impl Service {
587592
if let Some(enr) = to_request_enr {
588593
match NodeContact::try_from_enr(enr, self.config.ip_mode) {
589594
Ok(contact) => {
590-
self.request_enr(contact, None);
595+
self.request_find_node_designated_peer(contact, vec![0], None);
591596
}
592597
Err(NonContactable { enr }) => {
593598
debug_unreachable!("Stored ENR is not contactable. {}", enr);
@@ -681,25 +686,9 @@ impl Service {
681686
_ => unreachable!(),
682687
};
683688

684-
// This could be an ENR request from the outer service. If so respond to the
685-
// callback and End.
686-
if let Some(CallbackResponse::Enr(callback)) = active_request.callback.take() {
687-
// Currently only support requesting for ENR's. Verify this is the case.
688-
if !distances_requested.is_empty() && distances_requested[0] != 0 {
689-
error!("Retrieved a callback request that wasn't for a peer's ENR");
690-
return;
691-
}
692-
// This must be for asking for an ENR
693-
if nodes.len() > 1 {
694-
warn!(
695-
"Peer returned more than one ENR for itself. {}",
696-
active_request.contact
697-
);
698-
}
699-
let response = nodes
700-
.pop()
701-
.ok_or(RequestError::InvalidEnr("Peer did not return an ENR"));
702-
if let Err(e) = callback.send(response) {
689+
if let Some(CallbackResponse::Nodes(callback)) = active_request.callback.take()
690+
{
691+
if let Err(e) = callback.send(Ok(nodes)) {
703692
warn!("Failed to send response in callback {:?}", e)
704693
}
705694
return;
@@ -980,17 +969,18 @@ impl Service {
980969
}
981970

982971
/// Request an external node's ENR.
983-
fn request_enr(
972+
fn request_find_node_designated_peer(
984973
&mut self,
985974
contact: NodeContact,
986-
callback: Option<oneshot::Sender<Result<Enr, RequestError>>>,
975+
distances: Vec<u64>,
976+
callback: Option<oneshot::Sender<Result<Vec<Enr>, RequestError>>>,
987977
) {
988-
let request_body = RequestBody::FindNode { distances: vec![0] };
978+
let request_body = RequestBody::FindNode { distances };
989979
let active_request = ActiveRequest {
990980
contact,
991981
request_body,
992982
query_id: None,
993-
callback: callback.map(CallbackResponse::Enr),
983+
callback: callback.map(CallbackResponse::Nodes),
994984
};
995985
self.send_rpc_request(active_request);
996986
}
@@ -1412,10 +1402,10 @@ impl Service {
14121402
// If this is initiated by the user, return an error on the callback. All callbacks
14131403
// support a request error.
14141404
match active_request.callback {
1415-
Some(CallbackResponse::Enr(callback)) => {
1405+
Some(CallbackResponse::Nodes(callback)) => {
14161406
callback
14171407
.send(Err(error))
1418-
.unwrap_or_else(|_| debug!("Couldn't send TALK error response to user"));
1408+
.unwrap_or_else(|_| debug!("Couldn't send Nodes error response to user"));
14191409
return;
14201410
}
14211411
Some(CallbackResponse::Talk(callback)) => {

0 commit comments

Comments
 (0)