@@ -2,9 +2,8 @@ use std::{net::SocketAddr, sync::Arc, time::Duration};
22
33use anyhow:: { anyhow, Result } ;
44use bytes:: Bytes ;
5- use futures_util:: StreamExt ;
65use lunatic_process:: { env:: Environment , state:: ProcessState } ;
7- use quinn:: { ClientConfig , Connecting , Endpoint , Incoming , NewConnection , ServerConfig } ;
6+ use quinn:: { ClientConfig , Connecting , ConnectionError , Endpoint , ServerConfig } ;
87use rustls_pemfile:: Item ;
98use wasmtime:: ResourceLimiter ;
109
@@ -53,10 +52,7 @@ impl Client {
5352 retry : u32 ,
5453 ) -> Result < ( SendStream , RecvStream ) > {
5554 for _ in 0 ..retry {
56- let new_conn = self . inner . connect ( addr, name) ?. await ?;
57- let NewConnection {
58- connection : conn, ..
59- } = new_conn;
55+ let conn = self . inner . connect ( addr, name) ?. await ?;
6056 if let Ok ( ( send, recv) ) = conn. open_bi ( ) . await {
6157 return Ok ( ( SendStream { stream : send } , RecvStream { stream : recv } ) ) ;
6258 }
@@ -85,7 +81,7 @@ pub fn new_quic_client(ca_cert: &str) -> Result<Client> {
8581 Ok ( Client { inner : endpoint } )
8682}
8783
88- pub fn new_quic_server ( addr : SocketAddr , cert : & str , key : & str ) -> Result < ( Endpoint , Incoming ) > {
84+ pub fn new_quic_server ( addr : SocketAddr , cert : & str , key : & str ) -> Result < Endpoint > {
8985 let mut cert = cert. as_bytes ( ) ;
9086 let mut key = key. as_bytes ( ) ;
9187 let pk = rustls_pemfile:: read_one ( & mut key) ?. unwrap ( ) ;
@@ -112,10 +108,10 @@ pub fn new_quic_server(addr: SocketAddr, cert: &str, key: &str) -> Result<(Endpo
112108}
113109
114110pub async fn handle_accept_control (
115- quic_server : & mut ( Endpoint , Incoming ) ,
111+ quic_server : & mut Endpoint ,
116112 control_server : control:: server:: Server ,
117113) -> Result < ( ) > {
118- while let Some ( conn) = quic_server. 1 . next ( ) . await {
114+ while let Some ( conn) = quic_server. accept ( ) . await {
119115 tokio:: spawn ( handle_quic_stream ( conn, control_server. clone ( ) ) ) ;
120116 }
121117 Ok ( ( ) )
@@ -125,12 +121,19 @@ async fn handle_quic_stream(
125121 conn : Connecting ,
126122 control_server : control:: server:: Server ,
127123) -> Result < ( ) > {
128- let NewConnection { mut bi_streams, .. } = conn. await ?;
129- while let Some ( stream) = bi_streams. next ( ) . await {
130- if let Ok ( ( s, r) ) = stream {
131- let send = SendStream { stream : s } ;
132- let recv = RecvStream { stream : r } ;
133- tokio:: spawn ( handle_quic_connection ( send, recv, control_server. clone ( ) ) ) ;
124+ let conn = conn. await ?;
125+ loop {
126+ let stream = conn. accept_bi ( ) . await ;
127+ match stream {
128+ Ok ( ( s, r) ) => {
129+ let send = SendStream { stream : s } ;
130+ let recv = RecvStream { stream : r } ;
131+ tokio:: spawn ( handle_quic_connection ( send, recv, control_server. clone ( ) ) ) ;
132+ }
133+ Err ( ConnectionError :: LocallyClosed ) => {
134+ break ;
135+ }
136+ Err ( _) => { }
134137 }
135138 }
136139 Ok ( ( ) )
@@ -153,14 +156,14 @@ async fn handle_quic_connection(
153156}
154157
155158pub async fn handle_node_server < T , E > (
156- quic_server : & mut ( Endpoint , Incoming ) ,
159+ quic_server : & mut Endpoint ,
157160 ctx : distributed:: server:: ServerCtx < T , E > ,
158161) -> Result < ( ) >
159162where
160163 T : ProcessState + ResourceLimiter + DistributedCtx < E > + Send + ' static ,
161164 E : Environment + ' static ,
162165{
163- while let Some ( conn) = quic_server. 1 . next ( ) . await {
166+ while let Some ( conn) = quic_server. accept ( ) . await {
164167 tokio:: spawn ( handle_quic_connection_node ( ctx. clone ( ) , conn) ) ;
165168 }
166169 Ok ( ( ) )
@@ -174,12 +177,17 @@ where
174177 T : ProcessState + ResourceLimiter + DistributedCtx < E > + Send + ' static ,
175178 E : Environment + ' static ,
176179{
177- let NewConnection { mut bi_streams, .. } = conn. await ?;
178- while let Some ( stream) = bi_streams. next ( ) . await {
179- if let Ok ( ( s, r) ) = stream {
180- let send = SendStream { stream : s } ;
181- let recv = RecvStream { stream : r } ;
182- tokio:: spawn ( handle_quic_stream_node ( ctx. clone ( ) , send, recv) ) ;
180+ let conn = conn. await ?;
181+ loop {
182+ let stream = conn. accept_bi ( ) . await ;
183+ match stream {
184+ Ok ( ( s, r) ) => {
185+ let send = SendStream { stream : s } ;
186+ let recv = RecvStream { stream : r } ;
187+ tokio:: spawn ( handle_quic_stream_node ( ctx. clone ( ) , send, recv) ) ;
188+ }
189+ Err ( ConnectionError :: LocallyClosed ) => break ,
190+ Err ( _) => { }
183191 }
184192 }
185193 Ok ( ( ) )
0 commit comments