Skip to content

Commit 31e2a06

Browse files
committed
feat: Add connection id to the events emitted by a request-response
1 parent d0590a7 commit 31e2a06

File tree

7 files changed

+85
-23
lines changed

7 files changed

+85
-23
lines changed

protocols/autonat/src/v1/behaviour/as_client.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ impl HandleInnerEvent for AsClient<'_> {
112112
request_id,
113113
response,
114114
},
115+
..
115116
} => {
116117
tracing::debug!(?response, "Outbound dial-back request returned response");
117118

@@ -154,6 +155,7 @@ impl HandleInnerEvent for AsClient<'_> {
154155
peer,
155156
error,
156157
request_id,
158+
..
157159
} => {
158160
tracing::debug!(
159161
%peer,

protocols/autonat/src/v1/behaviour/as_server.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ impl HandleInnerEvent for AsServer<'_> {
107107
request,
108108
channel,
109109
},
110+
..
110111
} => {
111112
let probe_id = self.probe_id.next();
112113
if !self.connected.contains_key(&peer) {
@@ -183,6 +184,7 @@ impl HandleInnerEvent for AsServer<'_> {
183184
peer,
184185
error,
185186
request_id,
187+
..
186188
} => {
187189
tracing::debug!(
188190
%peer,

protocols/rendezvous/src/server.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ impl NetworkBehaviour for Behaviour {
183183
libp2p_request_response::Message::Request {
184184
request, channel, ..
185185
},
186+
..
186187
}) => {
187188
if let Some((event, response)) =
188189
handle_request(peer_id, request, &mut self.registrations)
@@ -202,6 +203,7 @@ impl NetworkBehaviour for Behaviour {
202203
peer,
203204
request_id,
204205
error,
206+
..
205207
}) => {
206208
tracing::warn!(
207209
%peer,
@@ -217,6 +219,7 @@ impl NetworkBehaviour for Behaviour {
217219
| ToSwarm::GenerateEvent(libp2p_request_response::Event::Message {
218220
peer: _,
219221
message: libp2p_request_response::Message::Response { .. },
222+
..
220223
})
221224
| ToSwarm::GenerateEvent(libp2p_request_response::Event::OutboundFailure {
222225
..

protocols/request-response/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
## 0.28.0
2+
3+
- Add connection id to the events emitted by a request-response `Behaviour`.
4+
See [PR 5676](TODO).
5+
16
## 0.27.1
27

38
- Deprecate `void` crate.

protocols/request-response/src/lib.rs

Lines changed: 61 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -131,13 +131,17 @@ pub enum Event<TRequest, TResponse, TChannelResponse = TResponse> {
131131
Message {
132132
/// The peer who sent the message.
133133
peer: PeerId,
134+
/// The connection used
135+
connection_id: ConnectionId,
134136
/// The incoming message.
135137
message: Message<TRequest, TResponse, TChannelResponse>,
136138
},
137139
/// An outbound request failed.
138140
OutboundFailure {
139141
/// The peer to whom the request was sent.
140142
peer: PeerId,
143+
/// The connection used
144+
connection_id: ConnectionId,
141145
/// The (local) ID of the failed request.
142146
request_id: OutboundRequestId,
143147
/// The error that occurred.
@@ -147,6 +151,8 @@ pub enum Event<TRequest, TResponse, TChannelResponse = TResponse> {
147151
InboundFailure {
148152
/// The peer from whom the request was received.
149153
peer: PeerId,
154+
/// The connection used
155+
connection_id: ConnectionId,
150156
/// The ID of the failed inbound request.
151157
request_id: InboundRequestId,
152158
/// The error that occurred.
@@ -159,6 +165,8 @@ pub enum Event<TRequest, TResponse, TChannelResponse = TResponse> {
159165
ResponseSent {
160166
/// The peer to whom the response was sent.
161167
peer: PeerId,
168+
/// The connection used
169+
connection_id: ConnectionId,
162170
/// The ID of the inbound request whose response was sent.
163171
request_id: InboundRequestId,
164172
},
@@ -569,10 +577,10 @@ where
569577
fn remove_pending_outbound_response(
570578
&mut self,
571579
peer: &PeerId,
572-
connection: ConnectionId,
580+
connection_id: ConnectionId,
573581
request: OutboundRequestId,
574582
) -> bool {
575-
self.get_connection_mut(peer, connection)
583+
self.get_connection_mut(peer, connection_id)
576584
.map(|c| c.pending_outbound_responses.remove(&request))
577585
.unwrap_or(false)
578586
}
@@ -585,10 +593,10 @@ where
585593
fn remove_pending_inbound_response(
586594
&mut self,
587595
peer: &PeerId,
588-
connection: ConnectionId,
596+
connection_id: ConnectionId,
589597
request: InboundRequestId,
590598
) -> bool {
591-
self.get_connection_mut(peer, connection)
599+
self.get_connection_mut(peer, connection_id)
592600
.map(|c| c.pending_inbound_responses.remove(&request))
593601
.unwrap_or(false)
594602
}
@@ -598,11 +606,11 @@ where
598606
fn get_connection_mut(
599607
&mut self,
600608
peer: &PeerId,
601-
connection: ConnectionId,
609+
connection_id: ConnectionId,
602610
) -> Option<&mut Connection> {
603611
self.connected
604612
.get_mut(peer)
605-
.and_then(|connections| connections.iter_mut().find(|c| c.id == connection))
613+
.and_then(|connections| connections.iter_mut().find(|c| c.id == connection_id))
606614
}
607615

608616
fn on_address_change(
@@ -659,6 +667,7 @@ where
659667
self.pending_events
660668
.push_back(ToSwarm::GenerateEvent(Event::InboundFailure {
661669
peer: peer_id,
670+
connection_id,
662671
request_id,
663672
error: InboundFailure::ConnectionClosed,
664673
}));
@@ -668,13 +677,21 @@ where
668677
self.pending_events
669678
.push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
670679
peer: peer_id,
680+
connection_id,
671681
request_id,
672682
error: OutboundFailure::ConnectionClosed,
673683
}));
674684
}
675685
}
676686

677-
fn on_dial_failure(&mut self, DialFailure { peer_id, .. }: DialFailure) {
687+
fn on_dial_failure(
688+
&mut self,
689+
DialFailure {
690+
peer_id,
691+
connection_id,
692+
..
693+
}: DialFailure,
694+
) {
678695
if let Some(peer) = peer_id {
679696
// If there are pending outgoing requests when a dial failure occurs,
680697
// it is implied that we are not connected to the peer, since pending
@@ -687,6 +704,7 @@ where
687704
self.pending_events
688705
.push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
689706
peer,
707+
connection_id,
690708
request_id: request.request_id,
691709
error: OutboundFailure::DialFailure,
692710
}));
@@ -811,15 +829,16 @@ where
811829
fn on_connection_handler_event(
812830
&mut self,
813831
peer: PeerId,
814-
connection: ConnectionId,
832+
connection_id: ConnectionId,
815833
event: THandlerOutEvent<Self>,
816834
) {
817835
match event {
818836
handler::Event::Response {
819837
request_id,
820838
response,
821839
} => {
822-
let removed = self.remove_pending_outbound_response(&peer, connection, request_id);
840+
let removed =
841+
self.remove_pending_outbound_response(&peer, connection_id, request_id);
823842
debug_assert!(
824843
removed,
825844
"Expect request_id to be pending before receiving response.",
@@ -830,13 +849,17 @@ where
830849
response,
831850
};
832851
self.pending_events
833-
.push_back(ToSwarm::GenerateEvent(Event::Message { peer, message }));
852+
.push_back(ToSwarm::GenerateEvent(Event::Message {
853+
peer,
854+
connection_id,
855+
message,
856+
}));
834857
}
835858
handler::Event::Request {
836859
request_id,
837860
request,
838861
sender,
839-
} => match self.get_connection_mut(&peer, connection) {
862+
} => match self.get_connection_mut(&peer, connection_id) {
840863
Some(connection) => {
841864
let inserted = connection.pending_inbound_responses.insert(request_id);
842865
debug_assert!(inserted, "Expect id of new request to be unknown.");
@@ -848,14 +871,19 @@ where
848871
channel,
849872
};
850873
self.pending_events
851-
.push_back(ToSwarm::GenerateEvent(Event::Message { peer, message }));
874+
.push_back(ToSwarm::GenerateEvent(Event::Message {
875+
peer,
876+
connection_id,
877+
message,
878+
}));
852879
}
853880
None => {
854-
tracing::debug!("Connection ({connection}) closed after `Event::Request` ({request_id}) has been emitted.");
881+
tracing::debug!("Connection ({connection_id}) closed after `Event::Request` ({request_id}) has been emitted.");
855882
}
856883
},
857884
handler::Event::ResponseSent(request_id) => {
858-
let removed = self.remove_pending_inbound_response(&peer, connection, request_id);
885+
let removed =
886+
self.remove_pending_inbound_response(&peer, connection_id, request_id);
859887
debug_assert!(
860888
removed,
861889
"Expect request_id to be pending before response is sent."
@@ -864,11 +892,13 @@ where
864892
self.pending_events
865893
.push_back(ToSwarm::GenerateEvent(Event::ResponseSent {
866894
peer,
895+
connection_id,
867896
request_id,
868897
}));
869898
}
870899
handler::Event::ResponseOmission(request_id) => {
871-
let removed = self.remove_pending_inbound_response(&peer, connection, request_id);
900+
let removed =
901+
self.remove_pending_inbound_response(&peer, connection_id, request_id);
872902
debug_assert!(
873903
removed,
874904
"Expect request_id to be pending before response is omitted.",
@@ -877,12 +907,14 @@ where
877907
self.pending_events
878908
.push_back(ToSwarm::GenerateEvent(Event::InboundFailure {
879909
peer,
910+
connection_id,
880911
request_id,
881912
error: InboundFailure::ResponseOmission,
882913
}));
883914
}
884915
handler::Event::OutboundTimeout(request_id) => {
885-
let removed = self.remove_pending_outbound_response(&peer, connection, request_id);
916+
let removed =
917+
self.remove_pending_outbound_response(&peer, connection_id, request_id);
886918
debug_assert!(
887919
removed,
888920
"Expect request_id to be pending before request times out."
@@ -891,12 +923,14 @@ where
891923
self.pending_events
892924
.push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
893925
peer,
926+
connection_id,
894927
request_id,
895928
error: OutboundFailure::Timeout,
896929
}));
897930
}
898931
handler::Event::OutboundUnsupportedProtocols(request_id) => {
899-
let removed = self.remove_pending_outbound_response(&peer, connection, request_id);
932+
let removed =
933+
self.remove_pending_outbound_response(&peer, connection_id, request_id);
900934
debug_assert!(
901935
removed,
902936
"Expect request_id to be pending before failing to connect.",
@@ -905,28 +939,33 @@ where
905939
self.pending_events
906940
.push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
907941
peer,
942+
connection_id,
908943
request_id,
909944
error: OutboundFailure::UnsupportedProtocols,
910945
}));
911946
}
912947
handler::Event::OutboundStreamFailed { request_id, error } => {
913-
let removed = self.remove_pending_outbound_response(&peer, connection, request_id);
948+
let removed =
949+
self.remove_pending_outbound_response(&peer, connection_id, request_id);
914950
debug_assert!(removed, "Expect request_id to be pending upon failure");
915951

916952
self.pending_events
917953
.push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
918954
peer,
955+
connection_id,
919956
request_id,
920957
error: OutboundFailure::Io(error),
921958
}))
922959
}
923960
handler::Event::InboundTimeout(request_id) => {
924-
let removed = self.remove_pending_inbound_response(&peer, connection, request_id);
961+
let removed =
962+
self.remove_pending_inbound_response(&peer, connection_id, request_id);
925963

926964
if removed {
927965
self.pending_events
928966
.push_back(ToSwarm::GenerateEvent(Event::InboundFailure {
929967
peer,
968+
connection_id,
930969
request_id,
931970
error: InboundFailure::Timeout,
932971
}));
@@ -938,12 +977,14 @@ where
938977
}
939978
}
940979
handler::Event::InboundStreamFailed { request_id, error } => {
941-
let removed = self.remove_pending_inbound_response(&peer, connection, request_id);
980+
let removed =
981+
self.remove_pending_inbound_response(&peer, connection_id, request_id);
942982

943983
if removed {
944984
self.pending_events
945985
.push_back(ToSwarm::GenerateEvent(Event::InboundFailure {
946986
peer,
987+
connection_id,
947988
request_id,
948989
error: InboundFailure::Io(error),
949990
}));

protocols/request-response/tests/error_reporting.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -566,6 +566,7 @@ async fn wait_request(
566566
request,
567567
channel,
568568
},
569+
..
569570
}) => {
570571
return Ok((peer, request_id, request, channel));
571572
}
@@ -600,6 +601,7 @@ async fn wait_inbound_failure(
600601
peer,
601602
request_id,
602603
error,
604+
..
603605
}) => {
604606
return Ok((peer, request_id, error));
605607
}
@@ -618,6 +620,7 @@ async fn wait_outbound_failure(
618620
peer,
619621
request_id,
620622
error,
623+
..
621624
}) => {
622625
return Ok((peer, request_id, error));
623626
}

0 commit comments

Comments
 (0)