11use std:: { process:: Stdio , str:: from_utf8} ;
2- use tokio:: io:: AsyncReadExt ;
2+ use tokio:: io:: { AsyncReadExt , Stdout } ;
33
4+ use tokio:: process:: { ChildStderr , ChildStdout } ;
45use tokio:: select;
56use tokio:: sync:: mpsc:: Receiver ;
67use tokio:: { io:: BufReader , process:: Command , sync:: mpsc:: Sender , task} ;
@@ -13,22 +14,16 @@ pub enum ProcessEvent {
1314 Stop ,
1415}
1516
16- pub fn process_manager_start (
17- receiver : Receiver < ProcessEvent > ,
18- parent_sender : Sender < AppEvent > ,
19- ) {
17+ pub fn process_manager_start ( receiver : Receiver < ProcessEvent > , parent_sender : Sender < AppEvent > ) {
2018 task:: spawn ( async move {
2119 match process_manager_loop ( receiver, parent_sender. clone ( ) ) . await {
2220 Ok ( _) => ( ) ,
2321 Err ( e) => {
2422 parent_sender
25- . send ( AppEvent :: ChannelLog (
26- "notice" . to_string ( ) ,
27- e. to_string ( ) ,
28- ) )
23+ . send ( AppEvent :: ChannelLog ( "notice" . to_string ( ) , e. to_string ( ) ) )
2924 . await
3025 . unwrap_or_default ( ) ;
31- } ,
26+ }
3227 } ;
3328 } ) ;
3429}
@@ -39,10 +34,12 @@ async fn process_manager_loop(
3934) -> Result < ( ) , anyhow:: Error > {
4035 loop {
4136 let cmd = receiver. recv ( ) . await ;
37+
4238 let event = match cmd {
4339 Some ( event) => event,
4440 None => continue ,
4541 } ;
42+
4643 let args = match event {
4744 ProcessEvent :: Start ( args) => args,
4845 ProcessEvent :: Stop => continue ,
@@ -61,58 +58,35 @@ async fn process_manager_loop(
6158 . args ( & args[ 1 ..] )
6259 . spawn ( ) ?;
6360
64- let mut stdoutreader = BufReader :: new ( process. stdout . take ( ) . unwrap ( ) ) ;
65- let mut stderrreader = BufReader :: new ( process. stderr . take ( ) . unwrap ( ) ) ;
6661
6762 let sender = parent_sender. clone ( ) ;
6863
69- let io_task = task:: spawn ( async move {
70- loop {
71- let mut stdout_buff = [ 0 ; 255 ] ;
72- let mut stderr_buff = [ 0 ; 255 ] ;
73-
74- select ! {
75- read = stdoutreader. read( & mut stdout_buff) => {
76- if let Ok ( s) = from_utf8( & stdout_buff[ ..read. unwrap( ) ] ) {
77- if s. is_empty( ) {
78- return ;
79- }
80- sender
81- . send( AppEvent :: ChannelLog ( "stdout" . to_string( ) , s. to_string( ) ) )
82- . await . unwrap_or_default( ) ;
83- } ;
84- } ,
85- read = stderrreader. read( & mut stderr_buff) => {
86- if let Ok ( s) = from_utf8( & stderr_buff[ ..read. unwrap( ) ] ) {
87- if s. is_empty( ) {
88- return ;
89- }
90- sender
91- . send(
92- AppEvent :: ChannelLog ( "stderr" . to_string( ) , s. to_string( ) )
93- ) . await . unwrap_or_default( ) ;
94- } ;
95- } ,
96- } ;
97- }
98- } ) ;
64+ let io_task = task:: spawn ( io_loop (
65+ process. stdout . take ( ) . unwrap ( ) ,
66+ process. stderr . take ( ) . unwrap ( ) ,
67+ sender
68+ ) ) ;
9969
10070 let sender = parent_sender. clone ( ) ;
71+
10172 loop {
10273 select ! {
10374 exit_code = process. wait( ) => {
104- if let Ok ( exit_code) = exit_code {
105- if exit_code. code( ) . unwrap_or_default( ) != 0 {
106- let _ = sender. send(
107- AppEvent :: NotifyError (
108- format!(
109- "Process '{:?}' exited with code {}" ,
110- args,
111- exit_code. code( ) . unwrap_or_default( )
112- )
75+ let exit_code = match exit_code {
76+ Ok ( e) => e,
77+ // if we
78+ Err ( _) => break ,
79+ } ;
80+ if exit_code. code( ) . unwrap_or_default( ) != 0 {
81+ let _ = sender. send(
82+ AppEvent :: NotifyError (
83+ format!(
84+ "Process '{:?}' exited with code {}" ,
85+ args,
86+ exit_code. code( ) . unwrap_or_default( )
11387 )
114- ) . await ;
115- }
88+ )
89+ ) . await ;
11690 }
11791 break ;
11892 } ,
@@ -134,3 +108,41 @@ async fn process_manager_loop(
134108 }
135109 }
136110}
111+
112+ async fn io_loop (
113+ stdout : ChildStdout ,
114+ stderr : ChildStderr ,
115+ sender : Sender < AppEvent > ,
116+ ) {
117+
118+ let mut stdoutreader = BufReader :: new ( stdout) ;
119+ let mut stderrreader = BufReader :: new ( stderr) ;
120+ loop {
121+ let mut stdout_buff = [ 0 ; 255 ] ;
122+ let mut stderr_buff = [ 0 ; 255 ] ;
123+
124+ select ! {
125+ read = stdoutreader. read( & mut stdout_buff) => {
126+ if let Ok ( s) = from_utf8( & stdout_buff[ ..read. unwrap( ) ] ) {
127+ if s. is_empty( ) {
128+ return ;
129+ }
130+ sender
131+ . send( AppEvent :: ChannelLog ( "stdout" . to_string( ) , s. to_string( ) ) )
132+ . await . unwrap_or_default( ) ;
133+ } ;
134+ } ,
135+ read = stderrreader. read( & mut stderr_buff) => {
136+ if let Ok ( s) = from_utf8( & stderr_buff[ ..read. unwrap( ) ] ) {
137+ if s. is_empty( ) {
138+ return ;
139+ }
140+ sender
141+ . send(
142+ AppEvent :: ChannelLog ( "stderr" . to_string( ) , s. to_string( ) )
143+ ) . await . unwrap_or_default( ) ;
144+ } ;
145+ } ,
146+ } ;
147+ }
148+ }
0 commit comments