@@ -7,6 +7,8 @@ use std::process::{Command, ExitStatus, Stdio};
7
7
use tokio:: {
8
8
io:: { AsyncBufReadExt , AsyncRead , BufReader } ,
9
9
process:: { Child , Command as AsyncCommand } ,
10
+ select,
11
+ sync:: watch:: { channel, Receiver } ,
10
12
task:: { self , JoinHandle } ,
11
13
} ;
12
14
@@ -31,19 +33,21 @@ impl Process {
31
33
// signal.
32
34
cmd. kill_on_drop ( true ) ;
33
35
34
- // println!("AsyncCommand: {:?}", cmd);
35
36
let mut child = cmd. spawn ( ) . expect ( "Spawning subprocess should succeed" ) ;
36
- // println!("Child: {:?}", child);
37
37
38
38
let stdout = child. stdout . take ( ) . unwrap ( ) ;
39
+ let ( kill_tx, kill_watch) = channel :: < bool > ( false ) ;
40
+
39
41
let stdout_jh = task:: spawn ( Self :: listen_on_channel (
42
+ kill_watch. clone ( ) ,
40
43
task_id. clone ( ) ,
41
44
log. clone ( ) ,
42
45
ChannelName :: StdOut ,
43
46
stdout,
44
47
) ) ;
45
48
let stderr = child. stderr . take ( ) . unwrap ( ) ;
46
49
let stderr_jh = task:: spawn ( Self :: listen_on_channel (
50
+ kill_watch,
47
51
task_id,
48
52
log. clone ( ) ,
49
53
ChannelName :: StdErr ,
@@ -60,6 +64,7 @@ impl Process {
60
64
let pid = Pid :: from_raw ( self_. child . id ( ) . unwrap ( ) as i32 ) ;
61
65
move || {
62
66
let _ = kill ( pid, Signal :: SIGKILL ) ;
67
+ let _ = kill_tx. send ( true ) ;
63
68
}
64
69
} ;
65
70
@@ -77,21 +82,34 @@ impl Process {
77
82
child. wait ( ) . await
78
83
}
79
84
80
- async fn listen_on_channel < R > ( task_id : TaskId , log : Logger , channel_tag : ChannelName , src : R )
81
- where
85
+ async fn listen_on_channel < R > (
86
+ mut kill_watch : Receiver < bool > ,
87
+ task_id : TaskId ,
88
+ log : Logger ,
89
+ channel_tag : ChannelName ,
90
+ src : R ,
91
+ ) where
82
92
R : AsyncRead + Unpin ,
83
93
{
84
94
let buffered_reader = BufReader :: new ( src) ;
85
95
let mut lines = buffered_reader. lines ( ) ;
86
96
loop {
87
- match lines. next_line ( ) . await {
88
- Ok ( Some ( line) ) => {
89
- let task_id: String = format ! ( "{}" , task_id) ;
90
- let output_channel: String = format ! ( "{:?}" , channel_tag) ;
91
- info ! ( log, "{}" , line; "task_id" => task_id, "output_channel" => output_channel)
97
+ select ! {
98
+ line_res = lines. next_line( ) => {
99
+ match line_res {
100
+ Ok ( Some ( line) ) => {
101
+ let task_id: String = format!( "{}" , task_id) ;
102
+ let output_channel: String = format!( "{:?}" , channel_tag) ;
103
+ info!( log, "{}" , line; "task_id" => task_id, "output_channel" => output_channel)
104
+ }
105
+ Ok ( None ) => break ,
106
+ Err ( e) => eprintln!( "listen_on_channel(): {:?}" , e) ,
107
+ }
108
+ }
109
+ _ = kill_watch. changed( ) => {
110
+ info!( log, "({}|{:?}): Kill received." , task_id, channel_tag) ;
111
+ return ;
92
112
}
93
- Ok ( None ) => break ,
94
- Err ( e) => eprintln ! ( "listen_on_channel(): {:?}" , e) ,
95
113
}
96
114
}
97
115
info ! ( log, "({}|{:?}): Channel has closed." , task_id, channel_tag) ;
0 commit comments