@@ -32,6 +32,7 @@ use crate::{
3232use concurrent_dial:: ConcurrentDial ;
3333use fnv:: FnvHashMap ;
3434use futures:: prelude:: * ;
35+ use futures:: stream:: SelectAll ;
3536use futures:: {
3637 channel:: { mpsc, oneshot} ,
3738 future:: { poll_fn, BoxFuture , Either } ,
@@ -41,6 +42,7 @@ use futures::{
4142use instant:: Instant ;
4243use libp2p_core:: connection:: Endpoint ;
4344use libp2p_core:: muxing:: { StreamMuxerBox , StreamMuxerExt } ;
45+ use std:: task:: Waker ;
4446use std:: {
4547 collections:: { hash_map, HashMap } ,
4648 convert:: TryFrom as _,
@@ -117,6 +119,9 @@ where
117119 /// See [`Connection::max_negotiating_inbound_streams`].
118120 max_negotiating_inbound_streams : usize ,
119121
122+ /// How many [`task::EstablishedConnectionEvent`]s can be buffered before the connection is back-pressured.
123+ per_connection_event_buffer_size : usize ,
124+
120125 /// The executor to use for running connection tasks. Can either be a global executor
121126 /// or a local queue.
122127 executor : ExecSwitch ,
@@ -128,14 +133,12 @@ where
128133 /// Receiver for events reported from pending tasks.
129134 pending_connection_events_rx : mpsc:: Receiver < task:: PendingConnectionEvent > ,
130135
131- /// Sender distributed to established tasks for reporting events back
132- /// to the pool.
133- established_connection_events_tx :
134- mpsc:: Sender < task:: EstablishedConnectionEvent < THandler :: Handler > > ,
136+ /// Waker in case we haven't established any connections yet.
137+ no_established_connections_waker : Option < Waker > ,
135138
136- /// Receiver for events reported from established tasks .
137- established_connection_events_rx :
138- mpsc:: Receiver < task:: EstablishedConnectionEvent < THandler :: Handler > > ,
139+ /// Receivers for events reported from established connections .
140+ established_connection_events :
141+ SelectAll < mpsc:: Receiver < task:: EstablishedConnectionEvent < THandler :: Handler > > > ,
139142}
140143
141144#[ derive( Debug ) ]
@@ -315,8 +318,6 @@ where
315318 /// Creates a new empty `Pool`.
316319 pub fn new ( local_id : PeerId , config : PoolConfig , limits : ConnectionLimits ) -> Self {
317320 let ( pending_connection_events_tx, pending_connection_events_rx) = mpsc:: channel ( 0 ) ;
318- let ( established_connection_events_tx, established_connection_events_rx) =
319- mpsc:: channel ( config. task_event_buffer_size ) ;
320321 let executor = match config. executor {
321322 Some ( exec) => ExecSwitch :: Executor ( exec) ,
322323 None => ExecSwitch :: LocalSpawn ( Default :: default ( ) ) ,
@@ -331,11 +332,12 @@ where
331332 dial_concurrency_factor : config. dial_concurrency_factor ,
332333 substream_upgrade_protocol_override : config. substream_upgrade_protocol_override ,
333334 max_negotiating_inbound_streams : config. max_negotiating_inbound_streams ,
335+ per_connection_event_buffer_size : config. per_connection_event_buffer_size ,
334336 executor,
335337 pending_connection_events_tx,
336338 pending_connection_events_rx,
337- established_connection_events_tx ,
338- established_connection_events_rx ,
339+ no_established_connections_waker : None ,
340+ established_connection_events : Default :: default ( ) ,
339341 }
340342 }
341343
@@ -547,9 +549,11 @@ where
547549 //
548550 // Note that established connections are polled before pending connections, thus
549551 // prioritizing established connections over pending connections.
550- match self . established_connection_events_rx . poll_next_unpin ( cx) {
552+ match self . established_connection_events . poll_next_unpin ( cx) {
551553 Poll :: Pending => { }
552- Poll :: Ready ( None ) => unreachable ! ( "Pool holds both sender and receiver." ) ,
554+ Poll :: Ready ( None ) => {
555+ self . no_established_connections_waker = Some ( cx. waker ( ) . clone ( ) ) ;
556+ }
553557
554558 Poll :: Ready ( Some ( task:: EstablishedConnectionEvent :: Notify { id, peer_id, event } ) ) => {
555559 return Poll :: Ready ( PoolEvent :: ConnectionEvent { peer_id, id, event } ) ;
@@ -750,27 +754,35 @@ where
750754
751755 let ( command_sender, command_receiver) =
752756 mpsc:: channel ( self . task_command_buffer_size ) ;
757+ let ( event_sender, event_receiver) =
758+ mpsc:: channel ( self . per_connection_event_buffer_size ) ;
759+
753760 conns. insert (
754761 id,
755762 EstablishedConnection {
756763 endpoint : endpoint. clone ( ) ,
757764 sender : command_sender,
758765 } ,
759766 ) ;
767+ self . established_connection_events . push ( event_receiver) ;
768+ if let Some ( waker) = self . no_established_connections_waker . take ( ) {
769+ waker. wake ( ) ;
770+ }
760771
761772 let connection = Connection :: new (
762773 muxer,
763774 handler. into_handler ( & obtained_peer_id, & endpoint) ,
764775 self . substream_upgrade_protocol_override ,
765776 self . max_negotiating_inbound_streams ,
766777 ) ;
778+
767779 self . spawn (
768780 task:: new_for_established_connection (
769781 id,
770782 obtained_peer_id,
771783 connection,
772784 command_receiver,
773- self . established_connection_events_tx . clone ( ) ,
785+ event_sender ,
774786 )
775787 . boxed ( ) ,
776788 ) ;
@@ -1069,7 +1081,7 @@ pub struct PoolConfig {
10691081
10701082 /// Size of the pending connection task event buffer and the established connection task event
10711083 /// buffer.
1072- pub task_event_buffer_size : usize ,
1084+ pub per_connection_event_buffer_size : usize ,
10731085
10741086 /// Number of addresses concurrently dialed for a single outbound connection attempt.
10751087 pub dial_concurrency_factor : NonZeroU8 ,
@@ -1088,7 +1100,7 @@ impl PoolConfig {
10881100 Self {
10891101 executor,
10901102 task_command_buffer_size : 32 ,
1091- task_event_buffer_size : 7 ,
1103+ per_connection_event_buffer_size : 7 ,
10921104 dial_concurrency_factor : NonZeroU8 :: new ( 8 ) . expect ( "8 > 0" ) ,
10931105 substream_upgrade_protocol_override : None ,
10941106 max_negotiating_inbound_streams : 128 ,
@@ -1113,8 +1125,8 @@ impl PoolConfig {
11131125 /// When the buffer is full, the background tasks of all connections will stall.
11141126 /// In this way, the consumers of network events exert back-pressure on
11151127 /// the network connection I/O.
1116- pub fn with_connection_event_buffer_size ( mut self , n : usize ) -> Self {
1117- self . task_event_buffer_size = n;
1128+ pub fn with_per_connection_event_buffer_size ( mut self , n : usize ) -> Self {
1129+ self . per_connection_event_buffer_size = n;
11181130 self
11191131 }
11201132
0 commit comments