diff --git a/protocols/request-response/CHANGELOG.md b/protocols/request-response/CHANGELOG.md index 54ed5eb3c50..8d978e88104 100644 --- a/protocols/request-response/CHANGELOG.md +++ b/protocols/request-response/CHANGELOG.md @@ -3,6 +3,9 @@ - fix: public cbor/json codec module See [PR 5830](https://github.com/libp2p/rust-libp2p/pull/5830). +- feat: add `Behaviour::send_request_with_addresses()` + See [PR 5938](https://github.com/libp2p/rust-libp2p/issues/5938). + ## 0.28.0 - Deprecate `void` crate. diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 7fba089b04e..0e9afdc7ae0 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -433,7 +433,20 @@ where /// > managed via [`libp2p_swarm::Swarm::add_peer_address`]. /// > Addresses are automatically removed when dial attempts /// > to them fail. + /// > Alternatively, [`Behaviour::send_request_with_addresses`] + /// > can be used. pub fn send_request(&mut self, peer: &PeerId, request: TCodec::Request) -> OutboundRequestId { + self.send_request_with_addresses(peer, request, Vec::new()) + } + + /// Like [`Behaviour::send_request`], but additionally using the provided addresses + /// if a connection needs to be established. + pub fn send_request_with_addresses( + &mut self, + peer: &PeerId, + request: TCodec::Request, + addresses: Vec, + ) -> OutboundRequestId { let request_id = self.next_outbound_request_id(); let request = OutboundMessage { request_id, @@ -443,7 +456,10 @@ where if let Some(request) = self.try_send_request(peer, request) { self.pending_events.push_back(ToSwarm::Dial { - opts: DialOpts::peer_id(*peer).build(), + opts: DialOpts::peer_id(*peer) + .addresses(addresses) + .extend_addresses_through_behaviour() + .build(), }); self.pending_outbound_requests .entry(*peer) diff --git a/protocols/request-response/tests/ping.rs b/protocols/request-response/tests/ping.rs index 94adedac2d7..1abd08023b3 100644 --- a/protocols/request-response/tests/ping.rs +++ b/protocols/request-response/tests/ping.rs @@ -43,10 +43,7 @@ async fn is_response_outbound() { let mut swarm1 = Swarm::new_ephemeral(|_| { request_response::cbor::Behaviour::::new( - [( - StreamProtocol::new("/ping/1"), - request_response::ProtocolSupport::Full, - )], + [(StreamProtocol::new("/ping/1"), ProtocolSupport::Full)], request_response::Config::default(), ) }); @@ -180,6 +177,126 @@ async fn ping_protocol() { peer2.await; } +/// Exercises a simple ping protocol where peers are not connected prior to request sending. +#[async_std::test] +#[cfg(feature = "cbor")] +async fn ping_protocol_explicit_address() { + let ping = Ping("ping".to_string().into_bytes()); + let pong = Pong("pong".to_string().into_bytes()); + + let protocols = iter::once((StreamProtocol::new("/ping/1"), ProtocolSupport::Full)); + let cfg = request_response::Config::default(); + + let mut swarm1 = Swarm::new_ephemeral(|_| { + request_response::cbor::Behaviour::::new(protocols.clone(), cfg.clone()) + }); + let peer1_id = *swarm1.local_peer_id(); + let mut swarm2 = Swarm::new_ephemeral(|_| { + request_response::cbor::Behaviour::::new(protocols, cfg) + }); + let peer2_id = *swarm2.local_peer_id(); + + let (peer1_listen_addr, _) = swarm1.listen().with_memory_addr_external().await; + + let expected_ping = ping.clone(); + let expected_pong = pong.clone(); + + let peer1 = async move { + loop { + match swarm1.next_swarm_event().await.try_into_behaviour_event() { + Ok(request_response::Event::Message { + peer, + message: + request_response::Message::Request { + request, channel, .. + }, + .. + }) => { + assert_eq!(&request, &expected_ping); + assert_eq!(&peer, &peer2_id); + swarm1 + .behaviour_mut() + .send_response(channel, pong.clone()) + .unwrap(); + } + Ok(request_response::Event::ResponseSent { peer, .. }) => { + assert_eq!(&peer, &peer2_id); + } + Ok(e) => { + panic!("Peer1: Unexpected event: {e:?}") + } + Err(..) => {} + } + } + }; + + let peer2 = async { + let req_id = swarm2.behaviour_mut().send_request(&peer1_id, ping.clone()); + assert!(swarm2.behaviour().is_pending_outbound(&peer1_id, &req_id)); + + // Can't dial to unknown peer + match swarm2 + .next_swarm_event() + .await + .try_into_behaviour_event() + .unwrap() + { + request_response::Event::OutboundFailure { + peer, request_id, .. + } => { + assert_eq!(&peer, &peer1_id); + assert_eq!(req_id, request_id); + } + e => panic!("Peer2: Unexpected event: {e:?}"), + } + + let req_id = swarm2.behaviour_mut().send_request_with_addresses( + &peer1_id, + ping.clone(), + vec![peer1_listen_addr], + ); + assert!(swarm2.behaviour().is_pending_outbound(&peer1_id, &req_id)); + + // Dial to peer with explicit address succeeds + match swarm2.select_next_some().await { + SwarmEvent::Dialing { peer_id, .. } => { + assert_eq!(&peer_id, &Some(peer1_id)); + } + e => panic!("Peer2: Unexpected event: {e:?}"), + } + match swarm2.select_next_some().await { + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + assert_eq!(&peer_id, &peer1_id); + } + e => panic!("Peer2: Unexpected event: {e:?}"), + } + match swarm2 + .next_swarm_event() + .await + .try_into_behaviour_event() + .unwrap() + { + request_response::Event::Message { + peer, + message: + request_response::Message::Response { + request_id, + response, + }, + .. + } => { + assert_eq!(&response, &expected_pong); + assert_eq!(&peer, &peer1_id); + assert_eq!(req_id, request_id); + } + e => panic!("Peer2: Unexpected event: {e:?}"), + } + }; + + async_std::task::spawn(Box::pin(peer1)); + peer2.await; +} + #[async_std::test] #[cfg(feature = "cbor")] async fn emits_inbound_connection_closed_failure() {