Skip to content

Commit 05fe824

Browse files
committed
feat(request-response): Add support for custom dial options when making request to disconnected peer
1 parent b057f91 commit 05fe824

File tree

10 files changed

+249
-62
lines changed

10 files changed

+249
-62
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ libp2p-pnet = { version = "0.25.0", path = "transports/pnet" }
9696
libp2p-quic = { version = "0.11.2", path = "transports/quic" }
9797
libp2p-relay = { version = "0.18.1", path = "protocols/relay" }
9898
libp2p-rendezvous = { version = "0.15.0", path = "protocols/rendezvous" }
99-
libp2p-request-response = { version = "0.27.1", path = "protocols/request-response" }
99+
libp2p-request-response = { version = "0.28.0", path = "protocols/request-response" }
100100
libp2p-server = { version = "0.12.8", path = "misc/server" }
101101
libp2p-stream = { version = "0.2.0-alpha.1", path = "protocols/stream" }
102102
libp2p-swarm = { version = "0.45.2", path = "swarm" }

protocols/autonat/src/v1/behaviour/as_client.rs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -154,11 +154,20 @@ impl HandleInnerEvent for AsClient<'_> {
154154
error,
155155
request_id,
156156
} => {
157-
tracing::debug!(
158-
%peer,
159-
"Outbound Failure {} when on dial-back request to peer.",
160-
error,
161-
);
157+
if let Some(peer) = peer {
158+
tracing::debug!(
159+
%peer,
160+
%request_id,
161+
"Outbound Failure {} when on dial-back request to peer.",
162+
error,
163+
);
164+
} else {
165+
tracing::debug!(
166+
%request_id,
167+
"Outbound Failure {} when on dial-back request to peer.",
168+
error,
169+
);
170+
}
162171
let probe_id = self
163172
.ongoing_outbound
164173
.remove(&request_id)
@@ -169,7 +178,7 @@ impl HandleInnerEvent for AsClient<'_> {
169178
VecDeque::from([ToSwarm::GenerateEvent(Event::OutboundProbe(
170179
OutboundProbeEvent::Error {
171180
probe_id,
172-
peer: Some(peer),
181+
peer,
173182
error: OutboundProbeError::OutboundRequest(error),
174183
},
175184
))])

protocols/request-response/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
## 0.28.0
2+
3+
- `send_request` Add support for custom dial options when making request to disconnected peer.
4+
See [PR 5692](https://github.com/libp2p/rust-libp2p/pull/5692).
5+
16
## 0.27.1
27

38
- Deprecate `void` crate.

protocols/request-response/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name = "libp2p-request-response"
33
edition = "2021"
44
rust-version = { workspace = true }
55
description = "Generic Request/Response Protocols"
6-
version = "0.27.1"
6+
version = "0.28.0"
77
authors = ["Parity Technologies <[email protected]>"]
88
license = "MIT"
99
repository = "https://github.com/libp2p/rust-libp2p"
@@ -19,7 +19,7 @@ libp2p-core = { workspace = true }
1919
libp2p-swarm = { workspace = true }
2020
libp2p-identity = { workspace = true }
2121
rand = "0.8"
22-
serde = { version = "1.0", optional = true}
22+
serde = { version = "1.0", optional = true }
2323
serde_json = { version = "1.0.117", optional = true }
2424
smallvec = "1.13.2"
2525
tracing = { workspace = true }

protocols/request-response/src/lib.rs

Lines changed: 86 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ pub enum Event<TRequest, TResponse, TChannelResponse = TResponse> {
136136
/// An outbound request failed.
137137
OutboundFailure {
138138
/// The peer to whom the request was sent.
139-
peer: PeerId,
139+
peer: Option<PeerId>,
140140
/// The (local) ID of the failed request.
141141
request_id: OutboundRequestId,
142142
/// The error that occurred.
@@ -333,6 +333,24 @@ impl Config {
333333
}
334334
}
335335

336+
#[derive(Debug, Eq, PartialEq, Hash)]
337+
enum PendingOutgoingRequest {
338+
PeerId(PeerId),
339+
ConnectionId(ConnectionId),
340+
}
341+
342+
impl From<PeerId> for PendingOutgoingRequest {
343+
fn from(peer_id: PeerId) -> Self {
344+
Self::PeerId(peer_id)
345+
}
346+
}
347+
348+
impl From<ConnectionId> for PendingOutgoingRequest {
349+
fn from(connection_id: ConnectionId) -> Self {
350+
Self::ConnectionId(connection_id)
351+
}
352+
}
353+
336354
/// A request/response protocol for some message codec.
337355
pub struct Behaviour<TCodec>
338356
where
@@ -360,7 +378,8 @@ where
360378
addresses: PeerAddresses,
361379
/// Requests that have not yet been sent and are waiting for a connection
362380
/// to be established.
363-
pending_outbound_requests: HashMap<PeerId, SmallVec<[OutboundMessage<TCodec>; 10]>>,
381+
pending_outbound_requests:
382+
HashMap<PendingOutgoingRequest, SmallVec<[OutboundMessage<TCodec>; 10]>>,
364383
}
365384

366385
impl<TCodec> Behaviour<TCodec>
@@ -417,28 +436,45 @@ where
417436
/// connection is established.
418437
///
419438
/// > **Note**: In order for such a dialing attempt to succeed,
420-
/// > the `RequestResonse` protocol must either be embedded
421-
/// > in another `NetworkBehaviour` that provides peer and
422-
/// > address discovery, or known addresses of peers must be
423-
/// > managed via [`Behaviour::add_address`] and
439+
/// > the `peer` must be [`DialOpts`] with multiaddresses or
440+
/// > in case of simple [`PeerId`] `RequestResponse` protocol
441+
/// > must either be embedded in another `NetworkBehaviour`
442+
/// > that provides peer and address discovery, or known addresses of
443+
/// > peers must be managed via [`Behaviour::add_address`] and
424444
/// > [`Behaviour::remove_address`].
425-
pub fn send_request(&mut self, peer: &PeerId, request: TCodec::Request) -> OutboundRequestId {
445+
pub fn send_request<Peer>(&mut self, peer: Peer, request: TCodec::Request) -> OutboundRequestId
446+
where
447+
DialOpts: From<Peer>,
448+
{
426449
let request_id = self.next_outbound_request_id();
427450
let request = OutboundMessage {
428451
request_id,
429452
request,
430453
protocols: self.outbound_protocols.clone(),
431454
};
432455

433-
if let Some(request) = self.try_send_request(peer, request) {
434-
self.pending_events.push_back(ToSwarm::Dial {
435-
opts: DialOpts::peer_id(*peer).build(),
436-
});
437-
self.pending_outbound_requests
438-
.entry(*peer)
439-
.or_default()
440-
.push(request);
441-
}
456+
let opts = DialOpts::from(peer);
457+
let maybe_peer_id = opts.get_peer_id();
458+
let request = if let Some(peer_id) = &maybe_peer_id {
459+
if let Some(request) = self.try_send_request(peer_id, request) {
460+
request
461+
} else {
462+
// Sent successfully
463+
return request_id;
464+
}
465+
} else {
466+
request
467+
};
468+
469+
self.pending_outbound_requests
470+
.entry(if let Some(peer_id) = maybe_peer_id {
471+
peer_id.into()
472+
} else {
473+
opts.connection_id().into()
474+
})
475+
.or_default()
476+
.push(request);
477+
self.pending_events.push_back(ToSwarm::Dial { opts });
442478

443479
request_id
444480
}
@@ -506,7 +542,7 @@ where
506542
// Check if request is still pending to be sent.
507543
let pen_conn = self
508544
.pending_outbound_requests
509-
.get(peer)
545+
.get(&PendingOutgoingRequest::from(*peer))
510546
.map(|rps| rps.iter().any(|rp| rp.request_id == *request_id))
511547
.unwrap_or(false);
512548

@@ -665,30 +701,41 @@ where
665701
for request_id in connection.pending_outbound_responses {
666702
self.pending_events
667703
.push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
668-
peer: peer_id,
704+
peer: Some(peer_id),
669705
request_id,
670706
error: OutboundFailure::ConnectionClosed,
671707
}));
672708
}
673709
}
674710

675-
fn on_dial_failure(&mut self, DialFailure { peer_id, .. }: DialFailure) {
676-
if let Some(peer) = peer_id {
677-
// If there are pending outgoing requests when a dial failure occurs,
678-
// it is implied that we are not connected to the peer, since pending
679-
// outgoing requests are drained when a connection is established and
680-
// only created when a peer is not connected when a request is made.
681-
// Thus these requests must be considered failed, even if there is
682-
// another, concurrent dialing attempt ongoing.
683-
if let Some(pending) = self.pending_outbound_requests.remove(&peer) {
684-
for request in pending {
685-
self.pending_events
686-
.push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
687-
peer,
688-
request_id: request.request_id,
689-
error: OutboundFailure::DialFailure,
690-
}));
691-
}
711+
fn on_dial_failure(
712+
&mut self,
713+
DialFailure {
714+
peer_id,
715+
connection_id,
716+
..
717+
}: DialFailure,
718+
) {
719+
let key = if let Some(peer_id) = peer_id {
720+
peer_id.into()
721+
} else {
722+
connection_id.into()
723+
};
724+
725+
// If there are pending outgoing requests when a dial failure occurs,
726+
// it is implied that we are not connected to the peer, since pending
727+
// outgoing requests are drained when a connection is established and
728+
// only created when a peer is not connected when a request is made.
729+
// Thus, these requests must be considered failed, even if there is
730+
// another, concurrent dialing attempt ongoing.
731+
if let Some(pending) = self.pending_outbound_requests.remove(&key) {
732+
for request in pending {
733+
self.pending_events
734+
.push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
735+
peer: peer_id,
736+
request_id: request.request_id,
737+
error: OutboundFailure::DialFailure,
738+
}));
692739
}
693740
}
694741
}
@@ -703,7 +750,7 @@ where
703750
) {
704751
let mut connection = Connection::new(connection_id, remote_address);
705752

706-
if let Some(pending_requests) = self.pending_outbound_requests.remove(&peer) {
753+
if let Some(pending_requests) = self.pending_outbound_requests.remove(&peer.into()) {
707754
for request in pending_requests {
708755
connection
709756
.pending_outbound_responses
@@ -887,7 +934,7 @@ where
887934

888935
self.pending_events
889936
.push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
890-
peer,
937+
peer: Some(peer),
891938
request_id,
892939
error: OutboundFailure::Timeout,
893940
}));
@@ -901,7 +948,7 @@ where
901948

902949
self.pending_events
903950
.push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
904-
peer,
951+
peer: Some(peer),
905952
request_id,
906953
error: OutboundFailure::UnsupportedProtocols,
907954
}));
@@ -912,7 +959,7 @@ where
912959

913960
self.pending_events
914961
.push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
915-
peer,
962+
peer: Some(peer),
916963
request_id,
917964
error: OutboundFailure::Io(error),
918965
}))

protocols/request-response/tests/error_reporting.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ async fn report_outbound_failure_on_read_response() {
5151
.send_request(&peer1_id, Action::FailOnReadResponse);
5252

5353
let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap();
54-
assert_eq!(peer, peer1_id);
54+
assert_eq!(peer, Some(peer1_id));
5555
assert_eq!(req_id_done, req_id);
5656

5757
let error = match error {
@@ -94,7 +94,7 @@ async fn report_outbound_failure_on_write_request() {
9494
.send_request(&peer1_id, Action::FailOnWriteRequest);
9595

9696
let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap();
97-
assert_eq!(peer, peer1_id);
97+
assert_eq!(peer, Some(peer1_id));
9898
assert_eq!(req_id_done, req_id);
9999

100100
let error = match error {
@@ -151,7 +151,7 @@ async fn report_outbound_timeout_on_read_response() {
151151
.send_request(&peer1_id, Action::TimeoutOnReadResponse);
152152

153153
let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap();
154-
assert_eq!(peer, peer1_id);
154+
assert_eq!(peer, Some(peer1_id));
155155
assert_eq!(req_id_done, req_id);
156156
assert!(matches!(error, OutboundFailure::Timeout));
157157
};
@@ -203,7 +203,7 @@ async fn report_outbound_failure_on_max_streams() {
203203
.send_request(&peer1_id, Action::FailOnMaxStreams);
204204

205205
let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap();
206-
assert_eq!(peer, peer1_id);
206+
assert_eq!(peer, Some(peer1_id));
207207
assert_eq!(req_id_done, outbound_req_id);
208208
assert!(matches!(error, OutboundFailure::Io(_)));
209209
};
@@ -236,7 +236,7 @@ async fn report_inbound_failure_on_read_request() {
236236
.send_request(&peer1_id, Action::FailOnReadRequest);
237237

238238
let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap();
239-
assert_eq!(peer, peer1_id);
239+
assert_eq!(peer, Some(peer1_id));
240240
assert_eq!(req_id_done, req_id);
241241

242242
match error {
@@ -295,7 +295,7 @@ async fn report_inbound_failure_on_write_response() {
295295
.send_request(&peer1_id, Action::FailOnWriteResponse);
296296

297297
let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap();
298-
assert_eq!(peer, peer1_id);
298+
assert_eq!(peer, Some(peer1_id));
299299
assert_eq!(req_id_done, req_id);
300300

301301
match error {
@@ -352,7 +352,7 @@ async fn report_inbound_timeout_on_write_response() {
352352
.send_request(&peer1_id, Action::TimeoutOnWriteResponse);
353353

354354
let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap();
355-
assert_eq!(peer, peer1_id);
355+
assert_eq!(peer, Some(peer1_id));
356356
assert_eq!(req_id_done, req_id);
357357

358358
match error {
@@ -612,7 +612,7 @@ async fn wait_inbound_failure(
612612

613613
async fn wait_outbound_failure(
614614
swarm: &mut Swarm<request_response::Behaviour<TestCodec>>,
615-
) -> Result<(PeerId, OutboundRequestId, OutboundFailure)> {
615+
) -> Result<(Option<PeerId>, OutboundRequestId, OutboundFailure)> {
616616
loop {
617617
match swarm.select_next_some().await.try_into_behaviour_event() {
618618
Ok(request_response::Event::OutboundFailure {

0 commit comments

Comments
 (0)