@@ -10,6 +10,7 @@ use futures::TryFutureExt;
1010use rustls:: pki_types:: CertificateDer ;
1111use rustls_pemfile:: certs;
1212use tokio:: sync:: RwLock ;
13+ use tokio:: time:: { timeout, Duration } ;
1314use tracing:: { debug, trace} ;
1415
1516use crate :: { app:: SyncDnsClient , proxy:: * , session:: Session } ;
@@ -87,6 +88,7 @@ impl Manager {
8788 transport_config. max_idle_timeout ( Some ( quinn:: IdleTimeout :: from ( quinn:: VarInt :: from_u32 (
8889 300_000 ,
8990 ) ) ) ) ;
91+ transport_config. keep_alive_interval ( Some ( Duration :: from_secs ( 10 ) ) ) ;
9092 transport_config
9193 . congestion_controller_factory ( Arc :: new ( quinn:: congestion:: BbrConfig :: default ( ) ) ) ;
9294 client_config. transport_config ( Arc :: new ( transport_config) ) ;
@@ -106,19 +108,30 @@ impl Manager {
106108 pub async fn new_stream (
107109 & self ,
108110 ) -> Result < QuicProxyStream < quinn:: RecvStream , quinn:: SendStream > > {
111+ let dial_timeout = Duration :: from_secs ( * crate :: option:: OUTBOUND_DIAL_TIMEOUT ) ;
109112 let start = std:: time:: Instant :: now ( ) ;
110- for conn in self . connections . read ( ) . await . iter ( ) {
111- match conn. open_bi ( ) . await {
112- Ok ( ( send, recv) ) => {
113- trace ! (
114- "opened QUIC stream on existing connection (rtt {} ms) in {} ms" ,
115- conn. rtt( ) . as_millis( ) ,
116- start. elapsed( ) . as_millis( ) ,
117- ) ;
118- return Ok ( QuicProxyStream { recv, send } ) ;
119- }
120- Err ( e) => {
121- debug ! ( "open QUIC stream failed: {}" , e) ;
113+ {
114+ let mut conns = self . connections . write ( ) . await ;
115+ let idx = 0usize ;
116+ while idx < conns. len ( ) {
117+ let conn = & conns[ idx] ;
118+ match timeout ( dial_timeout, conn. open_bi ( ) ) . await {
119+ Ok ( Ok ( ( send, recv) ) ) => {
120+ trace ! (
121+ "opened QUIC stream on existing connection (rtt {} ms) in {} ms" ,
122+ conn. rtt( ) . as_millis( ) ,
123+ start. elapsed( ) . as_millis( ) ,
124+ ) ;
125+ return Ok ( QuicProxyStream { recv, send } ) ;
126+ }
127+ Ok ( Err ( e) ) => {
128+ debug ! ( "open QUIC stream failed: {}" , e) ;
129+ conns. swap_remove ( idx) ;
130+ }
131+ Err ( _) => {
132+ debug ! ( "open QUIC stream timed out" ) ;
133+ conns. swap_remove ( idx) ;
134+ }
122135 }
123136 }
124137 }
@@ -145,16 +158,50 @@ impl Manager {
145158 if ips. is_empty ( ) {
146159 return Err ( anyhow ! ( "could not resolve to any address" , ) ) ;
147160 }
148- let connect_addr = SocketAddr :: new ( ips[ 0 ] , self . port ) ;
149161 let server_name = self . server_name . as_ref ( ) . unwrap_or ( & self . address ) ;
150- let conn = endpoint. connect ( connect_addr, server_name) ?. await ?;
151- let ( send, recv) = conn. open_bi ( ) . await ?;
162+ let mut last_err: Option < anyhow:: Error > = None ;
163+ for ip in ips {
164+ let connect_addr = SocketAddr :: new ( ip, self . port ) ;
165+ let connecting = match endpoint. connect ( connect_addr, server_name) {
166+ Ok ( c) => c,
167+ Err ( e) => {
168+ last_err = Some ( e. into ( ) ) ;
169+ continue ;
170+ }
171+ } ;
172+ let conn = match timeout ( dial_timeout, connecting) . await {
173+ Ok ( Ok ( c) ) => c,
174+ Ok ( Err ( e) ) => {
175+ last_err = Some ( e. into ( ) ) ;
176+ continue ;
177+ }
178+ Err ( _) => {
179+ last_err = Some ( anyhow ! ( "connect QUIC timed out" ) ) ;
180+ continue ;
181+ }
182+ } ;
183+ let ( send, recv) = match timeout ( dial_timeout, conn. open_bi ( ) ) . await {
184+ Ok ( Ok ( x) ) => x,
185+ Ok ( Err ( e) ) => {
186+ last_err = Some ( e. into ( ) ) ;
187+ continue ;
188+ }
189+ Err ( _) => {
190+ last_err = Some ( anyhow ! ( "open QUIC stream timed out" ) ) ;
191+ continue ;
192+ }
193+ } ;
194+
195+ let mut conns = self . connections . write ( ) . await ;
196+ conns. push ( conn) ;
197+ conns. truncate ( 4 ) ;
152198
153- self . connections . write ( ) . await . push ( conn ) ;
199+ trace ! ( "opened QUIC stream on new connection" , ) ;
154200
155- trace ! ( "opened QUIC stream on new connection" , ) ;
201+ return Ok ( QuicProxyStream { recv, send } ) ;
202+ }
156203
157- Ok ( QuicProxyStream { recv , send } )
204+ Err ( last_err . unwrap_or_else ( || anyhow ! ( "connect QUIC failed" ) ) )
158205 }
159206}
160207
0 commit comments