Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions protocols/request-response/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
- fix: public cbor/json codec module
See [PR 5830](https://github.com/libp2p/rust-libp2p/pull/5830).

- feat: add `Behaviour::send_request_with_addresses()`
See [PR 5938](https://github.com/libp2p/rust-libp2p/issues/5938).

## 0.28.0

- Deprecate `void` crate.
Expand Down
18 changes: 17 additions & 1 deletion protocols/request-response/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,20 @@ where
/// > managed via [`libp2p_swarm::Swarm::add_peer_address`].
/// > Addresses are automatically removed when dial attempts
/// > to them fail.
/// > Alternatively, [`Behaviour::send_request_with_addresses`]
/// > can be used.
pub fn send_request(&mut self, peer: &PeerId, request: TCodec::Request) -> OutboundRequestId {
self.send_request_with_addresses(peer, request, Vec::new())
}

/// Like [`Behaviour::send_request`], but additionally using the provided addresses
/// if a connection needs to be established.
pub fn send_request_with_addresses(
&mut self,
peer: &PeerId,
request: TCodec::Request,
addresses: Vec<Multiaddr>,
) -> OutboundRequestId {
let request_id = self.next_outbound_request_id();
let request = OutboundMessage {
request_id,
Expand All @@ -443,7 +456,10 @@ where

if let Some(request) = self.try_send_request(peer, request) {
self.pending_events.push_back(ToSwarm::Dial {
opts: DialOpts::peer_id(*peer).build(),
opts: DialOpts::peer_id(*peer)
.addresses(addresses)
.extend_addresses_through_behaviour()
.build(),
});
self.pending_outbound_requests
.entry(*peer)
Expand Down
125 changes: 121 additions & 4 deletions protocols/request-response/tests/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,7 @@ async fn is_response_outbound() {

let mut swarm1 = Swarm::new_ephemeral(|_| {
request_response::cbor::Behaviour::<Ping, Pong>::new(
[(
StreamProtocol::new("/ping/1"),
request_response::ProtocolSupport::Full,
)],
[(StreamProtocol::new("/ping/1"), ProtocolSupport::Full)],
request_response::Config::default(),
)
});
Expand Down Expand Up @@ -180,6 +177,126 @@ async fn ping_protocol() {
peer2.await;
}

/// Exercises a simple ping protocol where peers are not connected prior to request sending.
#[async_std::test]
#[cfg(feature = "cbor")]
async fn ping_protocol_explicit_address() {
let ping = Ping("ping".to_string().into_bytes());
let pong = Pong("pong".to_string().into_bytes());

let protocols = iter::once((StreamProtocol::new("/ping/1"), ProtocolSupport::Full));
let cfg = request_response::Config::default();

let mut swarm1 = Swarm::new_ephemeral(|_| {
request_response::cbor::Behaviour::<Ping, Pong>::new(protocols.clone(), cfg.clone())
});
let peer1_id = *swarm1.local_peer_id();
let mut swarm2 = Swarm::new_ephemeral(|_| {
request_response::cbor::Behaviour::<Ping, Pong>::new(protocols, cfg)
});
let peer2_id = *swarm2.local_peer_id();

let (peer1_listen_addr, _) = swarm1.listen().with_memory_addr_external().await;

let expected_ping = ping.clone();
let expected_pong = pong.clone();

let peer1 = async move {
loop {
match swarm1.next_swarm_event().await.try_into_behaviour_event() {
Ok(request_response::Event::Message {
peer,
message:
request_response::Message::Request {
request, channel, ..
},
..
}) => {
assert_eq!(&request, &expected_ping);
assert_eq!(&peer, &peer2_id);
swarm1
.behaviour_mut()
.send_response(channel, pong.clone())
.unwrap();
}
Ok(request_response::Event::ResponseSent { peer, .. }) => {
assert_eq!(&peer, &peer2_id);
}
Ok(e) => {
panic!("Peer1: Unexpected event: {e:?}")
}
Err(..) => {}
}
}
};

let peer2 = async {
let req_id = swarm2.behaviour_mut().send_request(&peer1_id, ping.clone());
assert!(swarm2.behaviour().is_pending_outbound(&peer1_id, &req_id));

// Can't dial to unknown peer
match swarm2
.next_swarm_event()
.await
.try_into_behaviour_event()
.unwrap()
{
request_response::Event::OutboundFailure {
peer, request_id, ..
} => {
assert_eq!(&peer, &peer1_id);
assert_eq!(req_id, request_id);
}
e => panic!("Peer2: Unexpected event: {e:?}"),
}

let req_id = swarm2.behaviour_mut().send_request_with_addresses(
&peer1_id,
ping.clone(),
vec![peer1_listen_addr],
);
assert!(swarm2.behaviour().is_pending_outbound(&peer1_id, &req_id));

// Dial to peer with explicit address succeeds
match swarm2.select_next_some().await {
SwarmEvent::Dialing { peer_id, .. } => {
assert_eq!(&peer_id, &Some(peer1_id));
}
e => panic!("Peer2: Unexpected event: {e:?}"),
}
match swarm2.select_next_some().await {
SwarmEvent::ConnectionEstablished { peer_id, .. } => {
assert_eq!(&peer_id, &peer1_id);
}
e => panic!("Peer2: Unexpected event: {e:?}"),
}
match swarm2
.next_swarm_event()
.await
.try_into_behaviour_event()
.unwrap()
{
request_response::Event::Message {
peer,
message:
request_response::Message::Response {
request_id,
response,
},
..
} => {
assert_eq!(&response, &expected_pong);
assert_eq!(&peer, &peer1_id);
assert_eq!(req_id, request_id);
}
e => panic!("Peer2: Unexpected event: {e:?}"),
}
};

async_std::task::spawn(Box::pin(peer1));
peer2.await;
}

#[async_std::test]
#[cfg(feature = "cbor")]
async fn emits_inbound_connection_closed_failure() {
Expand Down
Loading