@@ -11,6 +11,7 @@ use async_std::prelude::*;
11
11
use async_std:: { io, task} ;
12
12
13
13
use futures_util:: future:: Either ;
14
+ use futures_util:: stream:: FuturesUnordered ;
14
15
15
16
/// This represents a tide [Listener](crate::listener::Listener) that
16
17
/// wraps an [async_std::os::unix::net::UnixListener]. It is implemented as an
@@ -25,6 +26,7 @@ pub struct UnixListener<State> {
25
26
listener : Option < net:: UnixListener > ,
26
27
server : Option < Server < State > > ,
27
28
info : Option < ListenInfo > ,
29
+ join_handles : Vec < task:: JoinHandle < ( ) > > ,
28
30
}
29
31
30
32
impl < State > UnixListener < State > {
@@ -34,6 +36,7 @@ impl<State> UnixListener<State> {
34
36
listener : None ,
35
37
server : None ,
36
38
info : None ,
39
+ join_handles : Vec :: new ( ) ,
37
40
}
38
41
}
39
42
@@ -43,11 +46,15 @@ impl<State> UnixListener<State> {
43
46
listener : Some ( unix_listener. into ( ) ) ,
44
47
server : None ,
45
48
info : None ,
49
+ join_handles : Vec :: new ( ) ,
46
50
}
47
51
}
48
52
}
49
53
50
- fn handle_unix < State : Clone + Send + Sync + ' static > ( app : Server < State > , stream : UnixStream ) {
54
+ fn handle_unix < State : Clone + Send + Sync + ' static > (
55
+ app : Server < State > ,
56
+ stream : UnixStream ,
57
+ ) -> task:: JoinHandle < ( ) > {
51
58
task:: spawn ( async move {
52
59
let local_addr = unix_socket_addr_to_string ( stream. local_addr ( ) ) ;
53
60
let peer_addr = unix_socket_addr_to_string ( stream. peer_addr ( ) ) ;
@@ -61,7 +68,7 @@ fn handle_unix<State: Clone + Send + Sync + 'static>(app: Server<State>, stream:
61
68
if let Err ( error) = fut. await {
62
69
log:: error!( "async-h1 error" , { error: error. to_string( ) } ) ;
63
70
}
64
- } ) ;
71
+ } )
65
72
}
66
73
67
74
#[ async_trait:: async_trait]
@@ -116,10 +123,19 @@ where
116
123
}
117
124
118
125
Ok ( stream) => {
119
- handle_unix ( server. clone ( ) , stream) ;
126
+ let handle = handle_unix ( server. clone ( ) , stream) ;
127
+ self . join_handles . push ( handle) ;
120
128
}
121
129
} ;
122
130
}
131
+
132
+ let join_handles = std:: mem:: take ( & mut self . join_handles ) ;
133
+ join_handles
134
+ . into_iter ( )
135
+ . collect :: < FuturesUnordered < task:: JoinHandle < ( ) > > > ( )
136
+ . collect :: < ( ) > ( )
137
+ . await ;
138
+
123
139
Ok ( ( ) )
124
140
}
125
141
0 commit comments