11use std:: process:: Stdio ;
22
3+ use futures:: future:: Future ;
34use process_wrap:: tokio:: { TokioChildWrapper , TokioCommandWrap } ;
45use tokio:: {
56 io:: AsyncRead ,
67 process:: { ChildStderr , ChildStdin , ChildStdout } ,
78} ;
89
9- use super :: { IntoTransport , Transport } ;
10- use crate :: service :: ServiceRole ;
10+ use super :: { RxJsonRpcMessage , Transport , TxJsonRpcMessage , async_rw :: AsyncRwTransport } ;
11+ use crate :: RoleClient ;
1112
13+ const MAX_WAIT_ON_DROP_SECS : u64 = 3 ;
1214/// The parts of a child process.
1315type ChildProcessParts = (
1416 Box < dyn TokioChildWrapper > ,
@@ -36,18 +38,23 @@ fn child_process(mut child: Box<dyn TokioChildWrapper>) -> std::io::Result<Child
3638
3739pub struct TokioChildProcess {
3840 child : ChildWithCleanup ,
39- child_stdin : ChildStdin ,
40- child_stdout : ChildStdout ,
41+ transport : AsyncRwTransport < RoleClient , ChildStdout , ChildStdin > ,
4142}
4243
4344pub struct ChildWithCleanup {
44- inner : Box < dyn TokioChildWrapper > ,
45+ inner : Option < Box < dyn TokioChildWrapper > > ,
4546}
4647
4748impl Drop for ChildWithCleanup {
4849 fn drop ( & mut self ) {
49- if let Err ( e) = self . inner . start_kill ( ) {
50- tracing:: warn!( "Failed to kill child process: {e}" ) ;
50+ // We should not use start_kill(), instead we should use kill() to avoid zombies
51+ if let Some ( mut inner) = self . inner . take ( ) {
52+ // We don't care about the result, just try to kill it
53+ tokio:: spawn ( async move {
54+ if let Err ( e) = Box :: into_pin ( inner. kill ( ) ) . await {
55+ tracing:: warn!( "Error killing child process: {}" , e) ;
56+ }
57+ } ) ;
5158 }
5259 }
5360}
@@ -64,7 +71,7 @@ pin_project_lite::pin_project! {
6471impl TokioChildProcessOut {
6572 /// Get the process ID of the child process.
6673 pub fn id ( & self ) -> Option < u32 > {
67- self . child . inner . id ( )
74+ self . child . inner . as_ref ( ) ? . id ( )
6875 }
6976}
7077
@@ -92,23 +99,51 @@ impl TokioChildProcess {
9299
93100 /// Get the process ID of the child process.
94101 pub fn id ( & self ) -> Option < u32 > {
95- self . child . inner . id ( )
102+ self . child . inner . as_ref ( ) ?. id ( )
103+ }
104+
105+ /// Gracefully shutdown the child process
106+ ///
107+ /// This will first wait for the child process to exit normally with a timeout.
108+ /// If the child process doesn't exit within the timeout, it will be killed.
109+ pub async fn graceful_shutdown ( & mut self ) -> std:: io:: Result < ( ) > {
110+ if let Some ( mut child) = self . child . inner . take ( ) {
111+ let wait_fut = Box :: into_pin ( child. wait ( ) ) ;
112+ tokio:: select! {
113+ _ = tokio:: time:: sleep( std:: time:: Duration :: from_secs( MAX_WAIT_ON_DROP_SECS ) ) => {
114+ if let Err ( e) = Box :: into_pin( child. kill( ) ) . await {
115+ tracing:: warn!( "Error killing child: {e}" ) ;
116+ return Err ( e) ;
117+ }
118+ } ,
119+ res = wait_fut => {
120+ match res {
121+ Ok ( status) => {
122+ tracing:: info!( "Child exited gracefully {}" , status) ;
123+ }
124+ Err ( e) => {
125+ tracing:: warn!( "Error waiting for child: {e}" ) ;
126+ return Err ( e) ;
127+ }
128+ }
129+ }
130+ }
131+ }
132+ Ok ( ( ) )
133+ }
134+
135+ /// Take ownership of the inner child process
136+ pub fn into_inner ( mut self ) -> Option < Box < dyn TokioChildWrapper > > {
137+ self . child . inner . take ( )
96138 }
97139
98140 /// Split this helper into a reader (stdout) and writer (stdin).
141+ #[ deprecated(
142+ since = "0.5.0" ,
143+ note = "use the Transport trait implementation instead"
144+ ) ]
99145 pub fn split ( self ) -> ( TokioChildProcessOut , ChildStdin ) {
100- let TokioChildProcess {
101- child,
102- child_stdin,
103- child_stdout,
104- } = self ;
105- (
106- TokioChildProcessOut {
107- child,
108- child_stdout,
109- } ,
110- child_stdin,
111- )
146+ unimplemented ! ( "This method is deprecated, use the Transport trait implementation instead" ) ;
112147 }
113148}
114149
@@ -156,20 +191,31 @@ impl TokioChildProcessBuilder {
156191
157192 let ( child, stdout, stdin, stderr_opt) = child_process ( self . cmd . spawn ( ) ?) ?;
158193
194+ let transport = AsyncRwTransport :: new ( stdout, stdin) ;
159195 let proc = TokioChildProcess {
160- child : ChildWithCleanup { inner : child } ,
161- child_stdin : stdin,
162- child_stdout : stdout,
196+ child : ChildWithCleanup { inner : Some ( child) } ,
197+ transport,
163198 } ;
164199 Ok ( ( proc, stderr_opt) )
165200 }
166201}
167202
168- impl < R : ServiceRole > IntoTransport < R , std:: io:: Error , ( ) > for TokioChildProcess {
169- fn into_transport ( self ) -> impl Transport < R , Error = std:: io:: Error > + ' static {
170- IntoTransport :: < R , std:: io:: Error , super :: async_rw:: TransportAdapterAsyncRW > :: into_transport (
171- self . split ( ) ,
172- )
203+ impl Transport < RoleClient > for TokioChildProcess {
204+ type Error = std:: io:: Error ;
205+
206+ fn send (
207+ & mut self ,
208+ item : TxJsonRpcMessage < RoleClient > ,
209+ ) -> impl Future < Output = Result < ( ) , Self :: Error > > + Send + ' static {
210+ self . transport . send ( item)
211+ }
212+
213+ fn receive ( & mut self ) -> impl Future < Output = Option < RxJsonRpcMessage < RoleClient > > > + Send {
214+ self . transport . receive ( )
215+ }
216+
217+ fn close ( & mut self ) -> impl Future < Output = Result < ( ) , Self :: Error > > + Send {
218+ self . graceful_shutdown ( )
173219 }
174220}
175221
@@ -183,3 +229,78 @@ impl ConfigureCommandExt for tokio::process::Command {
183229 self
184230 }
185231}
232+
233+ #[ cfg( unix) ]
234+ #[ cfg( test) ]
235+ mod tests {
236+ use tokio:: process:: Command ;
237+
238+ use super :: * ;
239+
240+ #[ tokio:: test]
241+ async fn test_tokio_child_process_drop ( ) {
242+ let r = TokioChildProcess :: new ( Command :: new ( "sleep" ) . configure ( |cmd| {
243+ cmd. arg ( "30" ) ;
244+ } ) ) ;
245+ assert ! ( r. is_ok( ) ) ;
246+ let child_process = r. unwrap ( ) ;
247+ let id = child_process. id ( ) ;
248+ assert ! ( id. is_some( ) ) ;
249+ let id = id. unwrap ( ) ;
250+ // Drop the child process
251+ drop ( child_process) ;
252+ // Wait a moment to allow the cleanup task to run
253+ tokio:: time:: sleep ( std:: time:: Duration :: from_secs ( MAX_WAIT_ON_DROP_SECS + 1 ) ) . await ;
254+ // Check if the process is still running
255+ let status = Command :: new ( "ps" )
256+ . arg ( "-p" )
257+ . arg ( id. to_string ( ) )
258+ . status ( )
259+ . await ;
260+ match status {
261+ Ok ( status) => {
262+ assert ! (
263+ !status. success( ) ,
264+ "Process with PID {} is still running" ,
265+ id
266+ ) ;
267+ }
268+ Err ( e) => {
269+ panic ! ( "Failed to check process status: {}" , e) ;
270+ }
271+ }
272+ }
273+
274+ #[ tokio:: test]
275+ async fn test_tokio_child_process_graceful_shutdown ( ) {
276+ let r = TokioChildProcess :: new ( Command :: new ( "sleep" ) . configure ( |cmd| {
277+ cmd. arg ( "30" ) ;
278+ } ) ) ;
279+ assert ! ( r. is_ok( ) ) ;
280+ let mut child_process = r. unwrap ( ) ;
281+ let id = child_process. id ( ) ;
282+ assert ! ( id. is_some( ) ) ;
283+ let id = id. unwrap ( ) ;
284+ child_process. graceful_shutdown ( ) . await . unwrap ( ) ;
285+ // Wait a moment to allow the cleanup task to run
286+ tokio:: time:: sleep ( std:: time:: Duration :: from_secs ( MAX_WAIT_ON_DROP_SECS + 1 ) ) . await ;
287+ // Check if the process is still running
288+ let status = Command :: new ( "ps" )
289+ . arg ( "-p" )
290+ . arg ( id. to_string ( ) )
291+ . status ( )
292+ . await ;
293+ match status {
294+ Ok ( status) => {
295+ assert ! (
296+ !status. success( ) ,
297+ "Process with PID {} is still running" ,
298+ id
299+ ) ;
300+ }
301+ Err ( e) => {
302+ panic ! ( "Failed to check process status: {}" , e) ;
303+ }
304+ }
305+ }
306+ }
0 commit comments