Skip to content

Commit cc3271f

Browse files
authored
feat(request-response): add Behaviour::send_request_with_addresses()
This is agreed alternative to #5692, which resolves #5634 Pull-Request: #5938.
1 parent 92d48a5 commit cc3271f

File tree

3 files changed

+141
-5
lines changed

3 files changed

+141
-5
lines changed

protocols/request-response/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
- fix: public cbor/json codec module
44
See [PR 5830](https://github.com/libp2p/rust-libp2p/pull/5830).
55

6+
- feat: add `Behaviour::send_request_with_addresses()`
7+
See [PR 5938](https://github.com/libp2p/rust-libp2p/issues/5938).
8+
69
## 0.28.0
710

811
- Deprecate `void` crate.

protocols/request-response/src/lib.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,20 @@ where
433433
/// > managed via [`libp2p_swarm::Swarm::add_peer_address`].
434434
/// > Addresses are automatically removed when dial attempts
435435
/// > to them fail.
436+
/// > Alternatively, [`Behaviour::send_request_with_addresses`]
437+
/// > can be used.
436438
pub fn send_request(&mut self, peer: &PeerId, request: TCodec::Request) -> OutboundRequestId {
439+
self.send_request_with_addresses(peer, request, Vec::new())
440+
}
441+
442+
/// Like [`Behaviour::send_request`], but additionally using the provided addresses
443+
/// if a connection needs to be established.
444+
pub fn send_request_with_addresses(
445+
&mut self,
446+
peer: &PeerId,
447+
request: TCodec::Request,
448+
addresses: Vec<Multiaddr>,
449+
) -> OutboundRequestId {
437450
let request_id = self.next_outbound_request_id();
438451
let request = OutboundMessage {
439452
request_id,
@@ -443,7 +456,10 @@ where
443456

444457
if let Some(request) = self.try_send_request(peer, request) {
445458
self.pending_events.push_back(ToSwarm::Dial {
446-
opts: DialOpts::peer_id(*peer).build(),
459+
opts: DialOpts::peer_id(*peer)
460+
.addresses(addresses)
461+
.extend_addresses_through_behaviour()
462+
.build(),
447463
});
448464
self.pending_outbound_requests
449465
.entry(*peer)

protocols/request-response/tests/ping.rs

Lines changed: 121 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,7 @@ async fn is_response_outbound() {
4343

4444
let mut swarm1 = Swarm::new_ephemeral(|_| {
4545
request_response::cbor::Behaviour::<Ping, Pong>::new(
46-
[(
47-
StreamProtocol::new("/ping/1"),
48-
request_response::ProtocolSupport::Full,
49-
)],
46+
[(StreamProtocol::new("/ping/1"), ProtocolSupport::Full)],
5047
request_response::Config::default(),
5148
)
5249
});
@@ -180,6 +177,126 @@ async fn ping_protocol() {
180177
peer2.await;
181178
}
182179

180+
/// Exercises a simple ping protocol where peers are not connected prior to request sending.
181+
#[async_std::test]
182+
#[cfg(feature = "cbor")]
183+
async fn ping_protocol_explicit_address() {
184+
let ping = Ping("ping".to_string().into_bytes());
185+
let pong = Pong("pong".to_string().into_bytes());
186+
187+
let protocols = iter::once((StreamProtocol::new("/ping/1"), ProtocolSupport::Full));
188+
let cfg = request_response::Config::default();
189+
190+
let mut swarm1 = Swarm::new_ephemeral(|_| {
191+
request_response::cbor::Behaviour::<Ping, Pong>::new(protocols.clone(), cfg.clone())
192+
});
193+
let peer1_id = *swarm1.local_peer_id();
194+
let mut swarm2 = Swarm::new_ephemeral(|_| {
195+
request_response::cbor::Behaviour::<Ping, Pong>::new(protocols, cfg)
196+
});
197+
let peer2_id = *swarm2.local_peer_id();
198+
199+
let (peer1_listen_addr, _) = swarm1.listen().with_memory_addr_external().await;
200+
201+
let expected_ping = ping.clone();
202+
let expected_pong = pong.clone();
203+
204+
let peer1 = async move {
205+
loop {
206+
match swarm1.next_swarm_event().await.try_into_behaviour_event() {
207+
Ok(request_response::Event::Message {
208+
peer,
209+
message:
210+
request_response::Message::Request {
211+
request, channel, ..
212+
},
213+
..
214+
}) => {
215+
assert_eq!(&request, &expected_ping);
216+
assert_eq!(&peer, &peer2_id);
217+
swarm1
218+
.behaviour_mut()
219+
.send_response(channel, pong.clone())
220+
.unwrap();
221+
}
222+
Ok(request_response::Event::ResponseSent { peer, .. }) => {
223+
assert_eq!(&peer, &peer2_id);
224+
}
225+
Ok(e) => {
226+
panic!("Peer1: Unexpected event: {e:?}")
227+
}
228+
Err(..) => {}
229+
}
230+
}
231+
};
232+
233+
let peer2 = async {
234+
let req_id = swarm2.behaviour_mut().send_request(&peer1_id, ping.clone());
235+
assert!(swarm2.behaviour().is_pending_outbound(&peer1_id, &req_id));
236+
237+
// Can't dial to unknown peer
238+
match swarm2
239+
.next_swarm_event()
240+
.await
241+
.try_into_behaviour_event()
242+
.unwrap()
243+
{
244+
request_response::Event::OutboundFailure {
245+
peer, request_id, ..
246+
} => {
247+
assert_eq!(&peer, &peer1_id);
248+
assert_eq!(req_id, request_id);
249+
}
250+
e => panic!("Peer2: Unexpected event: {e:?}"),
251+
}
252+
253+
let req_id = swarm2.behaviour_mut().send_request_with_addresses(
254+
&peer1_id,
255+
ping.clone(),
256+
vec![peer1_listen_addr],
257+
);
258+
assert!(swarm2.behaviour().is_pending_outbound(&peer1_id, &req_id));
259+
260+
// Dial to peer with explicit address succeeds
261+
match swarm2.select_next_some().await {
262+
SwarmEvent::Dialing { peer_id, .. } => {
263+
assert_eq!(&peer_id, &Some(peer1_id));
264+
}
265+
e => panic!("Peer2: Unexpected event: {e:?}"),
266+
}
267+
match swarm2.select_next_some().await {
268+
SwarmEvent::ConnectionEstablished { peer_id, .. } => {
269+
assert_eq!(&peer_id, &peer1_id);
270+
}
271+
e => panic!("Peer2: Unexpected event: {e:?}"),
272+
}
273+
match swarm2
274+
.next_swarm_event()
275+
.await
276+
.try_into_behaviour_event()
277+
.unwrap()
278+
{
279+
request_response::Event::Message {
280+
peer,
281+
message:
282+
request_response::Message::Response {
283+
request_id,
284+
response,
285+
},
286+
..
287+
} => {
288+
assert_eq!(&response, &expected_pong);
289+
assert_eq!(&peer, &peer1_id);
290+
assert_eq!(req_id, request_id);
291+
}
292+
e => panic!("Peer2: Unexpected event: {e:?}"),
293+
}
294+
};
295+
296+
async_std::task::spawn(Box::pin(peer1));
297+
peer2.await;
298+
}
299+
183300
#[async_std::test]
184301
#[cfg(feature = "cbor")]
185302
async fn emits_inbound_connection_closed_failure() {

0 commit comments

Comments
 (0)