|
36 | 36 |
|
37 | 37 | use crate::{types::ProtocolName, ReputationChange}; |
38 | 38 |
|
39 | | -use futures::{ |
40 | | - channel::{mpsc, oneshot}, |
41 | | - prelude::*, |
42 | | -}; |
| 39 | +use futures::{channel::oneshot, prelude::*}; |
43 | 40 | use libp2p::{ |
44 | 41 | core::{Endpoint, Multiaddr}, |
45 | 42 | request_response::{self, Behaviour, Codec, Message, ProtocolSupport, ResponseChannel}, |
@@ -126,7 +123,7 @@ pub struct ProtocolConfig { |
126 | 123 | /// other peers. If this is `Some` but the channel is closed, then the local node will |
127 | 124 | /// advertise support for this protocol, but any incoming request will lead to an error being |
128 | 125 | /// sent back. |
129 | | - pub inbound_queue: Option<mpsc::Sender<IncomingRequest>>, |
| 126 | + pub inbound_queue: Option<async_channel::Sender<IncomingRequest>>, |
130 | 127 | } |
131 | 128 |
|
132 | 129 | /// A single request received by a peer on a request-response protocol. |
@@ -259,8 +256,10 @@ pub struct RequestResponsesBehaviour { |
259 | 256 | /// |
260 | 257 | /// Contains the underlying libp2p request-response [`Behaviour`], plus an optional |
261 | 258 | /// "response builder" used to build responses for incoming requests. |
262 | | - protocols: |
263 | | - HashMap<ProtocolName, (Behaviour<GenericCodec>, Option<mpsc::Sender<IncomingRequest>>)>, |
| 259 | + protocols: HashMap< |
| 260 | + ProtocolName, |
| 261 | + (Behaviour<GenericCodec>, Option<async_channel::Sender<IncomingRequest>>), |
| 262 | + >, |
264 | 263 |
|
265 | 264 | /// Pending requests, passed down to a request-response [`Behaviour`], awaiting a reply. |
266 | 265 | pending_requests: |
@@ -295,7 +294,10 @@ struct MessageRequest { |
295 | 294 | request: Vec<u8>, |
296 | 295 | channel: ResponseChannel<Result<Vec<u8>, ()>>, |
297 | 296 | protocol: ProtocolName, |
298 | | - resp_builder: Option<futures::channel::mpsc::Sender<IncomingRequest>>, |
| 297 | + // A builder used for building responses for incoming requests. Note that we use |
| 298 | + // `async_channel` and not `mpsc` on purpose, because `mpsc::channel` allocates an extra |
| 299 | + // message slot for every cloned `Sender` and this breaks a back-pressure mechanism. |
| 300 | + resp_builder: Option<async_channel::Sender<IncomingRequest>>, |
299 | 301 | // Once we get incoming request we save all params, create an async call to Peerset |
300 | 302 | // to get the reputation of the peer. |
301 | 303 | get_peer_reputation: Pin<Box<dyn Future<Output = Result<i32, ()>> + Send>>, |
@@ -618,10 +620,12 @@ impl NetworkBehaviour for RequestResponsesBehaviour { |
618 | 620 |
|
619 | 621 | // Submit the request to the "response builder" passed by the user at |
620 | 622 | // initialization. |
621 | | - if let Some(mut resp_builder) = resp_builder { |
| 623 | + if let Some(resp_builder) = resp_builder { |
622 | 624 | // If the response builder is too busy, silently drop `tx`. This |
623 | 625 | // will be reported by the corresponding request-response [`Behaviour`] |
624 | 626 | // through an `InboundFailure::Omission` event. |
| 627 | + // Note that we use `async_channel::bounded` and not `mpsc::channel` |
| 628 | + // because the latter allocates an extra slot for every cloned sender. |
625 | 629 | let _ = resp_builder.try_send(IncomingRequest { |
626 | 630 | peer, |
627 | 631 | payload: request, |
@@ -1036,11 +1040,7 @@ impl Codec for GenericCodec { |
1036 | 1040 | mod tests { |
1037 | 1041 | use super::*; |
1038 | 1042 |
|
1039 | | - use futures::{ |
1040 | | - channel::{mpsc, oneshot}, |
1041 | | - executor::LocalPool, |
1042 | | - task::Spawn, |
1043 | | - }; |
| 1043 | + use futures::{channel::oneshot, executor::LocalPool, task::Spawn}; |
1044 | 1044 | use libp2p::{ |
1045 | 1045 | core::{ |
1046 | 1046 | transport::{MemoryTransport, Transport}, |
@@ -1112,7 +1112,7 @@ mod tests { |
1112 | 1112 | // Build swarms whose behaviour is [`RequestResponsesBehaviour`]. |
1113 | 1113 | let mut swarms = (0..2) |
1114 | 1114 | .map(|_| { |
1115 | | - let (tx, mut rx) = mpsc::channel::<IncomingRequest>(64); |
| 1115 | + let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64); |
1116 | 1116 |
|
1117 | 1117 | pool.spawner() |
1118 | 1118 | .spawn_obj( |
@@ -1215,7 +1215,7 @@ mod tests { |
1215 | 1215 | // Build swarms whose behaviour is [`RequestResponsesBehaviour`]. |
1216 | 1216 | let mut swarms = (0..2) |
1217 | 1217 | .map(|_| { |
1218 | | - let (tx, mut rx) = mpsc::channel::<IncomingRequest>(64); |
| 1218 | + let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64); |
1219 | 1219 |
|
1220 | 1220 | pool.spawner() |
1221 | 1221 | .spawn_obj( |
@@ -1353,8 +1353,8 @@ mod tests { |
1353 | 1353 | }; |
1354 | 1354 |
|
1355 | 1355 | let (mut swarm_2, mut swarm_2_handler_1, mut swarm_2_handler_2, listen_add_2, peerset) = { |
1356 | | - let (tx_1, rx_1) = mpsc::channel(64); |
1357 | | - let (tx_2, rx_2) = mpsc::channel(64); |
| 1356 | + let (tx_1, rx_1) = async_channel::bounded(64); |
| 1357 | + let (tx_2, rx_2) = async_channel::bounded(64); |
1358 | 1358 |
|
1359 | 1359 | let protocol_configs = vec![ |
1360 | 1360 | ProtocolConfig { |
|
0 commit comments