@@ -11,7 +11,8 @@ use async_std::prelude::*;
1111use async_std:: { io, task} ;
1212
1313use futures_util:: future:: Either ;
14- use futures_util:: stream:: FuturesUnordered ;
14+
15+ use waitgroup:: { WaitGroup , Worker } ;
1516
1617/// This represents a tide [Listener](crate::listener::Listener) that
1718/// wraps an [async_std::os::unix::net::UnixListener]. It is implemented as an
@@ -26,7 +27,6 @@ pub struct UnixListener<State> {
2627 listener : Option < net:: UnixListener > ,
2728 server : Option < Server < State > > ,
2829 info : Option < ListenInfo > ,
29- join_handles : Vec < task:: JoinHandle < ( ) > > ,
3030}
3131
3232impl < State > UnixListener < State > {
@@ -36,7 +36,6 @@ impl<State> UnixListener<State> {
3636 listener : None ,
3737 server : None ,
3838 info : None ,
39- join_handles : Vec :: new ( ) ,
4039 }
4140 }
4241
@@ -46,16 +45,18 @@ impl<State> UnixListener<State> {
4645 listener : Some ( unix_listener. into ( ) ) ,
4746 server : None ,
4847 info : None ,
49- join_handles : Vec :: new ( ) ,
5048 }
5149 }
5250}
5351
5452fn handle_unix < State : Clone + Send + Sync + ' static > (
5553 app : Server < State > ,
5654 stream : UnixStream ,
57- ) -> task:: JoinHandle < ( ) > {
55+ wait_group_worker : Worker ,
56+ ) {
5857 task:: spawn ( async move {
58+ let _wait_group_worker = wait_group_worker;
59+
5960 let local_addr = unix_socket_addr_to_string ( stream. local_addr ( ) ) ;
6061 let peer_addr = unix_socket_addr_to_string ( stream. peer_addr ( ) ) ;
6162
@@ -76,7 +77,7 @@ fn handle_unix<State: Clone + Send + Sync + 'static>(
7677 if let Err ( error) = fut. await {
7778 log:: error!( "async-h1 error" , { error: error. to_string( ) } ) ;
7879 }
79- } )
80+ } ) ;
8081}
8182
8283#[ async_trait:: async_trait]
@@ -119,6 +120,7 @@ where
119120 } else {
120121 Either :: Right ( incoming)
121122 } ;
123+ let wait_group = WaitGroup :: new ( ) ;
122124
123125 while let Some ( stream) = incoming. next ( ) . await {
124126 match stream {
@@ -131,18 +133,12 @@ where
131133 }
132134
133135 Ok ( stream) => {
134- let handle = handle_unix ( server. clone ( ) , stream) ;
135- self . join_handles . push ( handle) ;
136+ handle_unix ( server. clone ( ) , stream, wait_group. worker ( ) ) ;
136137 }
137138 } ;
138139 }
139140
140- let join_handles = std:: mem:: take ( & mut self . join_handles ) ;
141- join_handles
142- . into_iter ( )
143- . collect :: < FuturesUnordered < task:: JoinHandle < ( ) > > > ( )
144- . collect :: < ( ) > ( )
145- . await ;
141+ wait_group. wait ( ) . await ;
146142
147143 Ok ( ( ) )
148144 }
0 commit comments