11/*******************************************************************************
2- * Copyright (c) 2017 Microsoft Corporation and others.
2+ * Copyright (c) 2017-2019 Microsoft Corporation and others.
33* All rights reserved. This program and the accompanying materials
44* are made available under the terms of the Eclipse Public License v1.0
55* which accompanies this distribution, and is available at
1717import java .io .InputStreamReader ;
1818import java .nio .charset .Charset ;
1919import java .nio .charset .StandardCharsets ;
20+ import java .util .stream .Stream ;
2021
21- import io .reactivex .functions .Consumer ;
22+ import com .microsoft .java .debug .core .protocol .Events .OutputEvent .Category ;
23+
24+ import io .reactivex .Observable ;
25+ import io .reactivex .schedulers .Schedulers ;
2226import io .reactivex .subjects .PublishSubject ;
2327
2428public class ProcessConsole {
25- private Process process ;
26- private String name ;
27- private Charset encoding ;
28- private PublishSubject <String > stdoutSubject = PublishSubject .<String >create ();
29- private PublishSubject <String > stderrSubject = PublishSubject .<String >create ();
30- private Thread stdoutThread = null ;
31- private Thread stderrThread = null ;
29+ private InputStreamObservable stdoutStream ;
30+ private InputStreamObservable stderrStream ;
31+ private Observable <ConsoleMessage > observable = null ;
3232
3333 public ProcessConsole (Process process ) {
3434 this (process , "Process" , StandardCharsets .UTF_8 );
@@ -44,76 +44,126 @@ public ProcessConsole(Process process) {
4444 * the process encoding format
4545 */
4646 public ProcessConsole (Process process , String name , Charset encoding ) {
47- this .process = process ;
48- this .name = name ;
49- this .encoding = encoding ;
47+ this .stdoutStream = new InputStreamObservable (name + " Stdout Handler" , process .getInputStream (), encoding );
48+ this .stderrStream = new InputStreamObservable (name + " Stderr Handler" , process .getErrorStream (), encoding );
49+ Observable <ConsoleMessage > stdout = this .stdoutStream .messages ().map ((message ) -> new ConsoleMessage (message , Category .stdout ));
50+ Observable <ConsoleMessage > stderr = this .stderrStream .messages ().map ((message ) -> new ConsoleMessage (message , Category .stderr ));
51+ this .observable = Observable .mergeArrayDelayError (stdout , stderr ).observeOn (Schedulers .newThread ());
5052 }
5153
5254 /**
53- * Start two separate threads to monitor the messages from stdout and stderr streams of the target process.
55+ * Start monitoring the stdout/ stderr streams of the target process.
5456 */
5557 public void start () {
56- this .stdoutThread = new Thread (this .name + " Stdout Handler" ) {
57- public void run () {
58- monitor (process .getInputStream (), stdoutSubject );
59- }
60- };
61- stdoutThread .setDaemon (true );
62- stdoutThread .start ();
63-
64- this .stderrThread = new Thread (this .name + " Stderr Handler" ) {
65- public void run () {
66- monitor (process .getErrorStream (), stderrSubject );
67- }
68- };
69- stderrThread .setDaemon (true );
70- stderrThread .start ();
58+ stdoutStream .start ();
59+ stderrStream .start ();
7160 }
7261
7362 /**
74- * Stop the process console handlers .
63+ * Stop monitoring the process console.
7564 */
7665 public void stop () {
77- if (this .stdoutThread != null ) {
78- this .stdoutThread .interrupt ();
79- this .stdoutThread = null ;
80- }
66+ stdoutStream .stop ();
67+ stderrStream .stop ();
68+ }
8169
82- if (this .stderrThread != null ) {
83- this .stderrThread .interrupt ();
84- this .stderrThread = null ;
85- }
70+ public Observable <ConsoleMessage > messages () {
71+ return observable ;
8672 }
8773
88- public void onStdout ( Consumer < String > callback ) {
89- stdoutSubject . subscribe ( callback );
74+ public Observable < ConsoleMessage > stdoutMessages ( ) {
75+ return this . messages (). filter (( message ) -> message . category == Category . stdout );
9076 }
9177
92- public void onStderr ( Consumer < String > callback ) {
93- stderrSubject . subscribe ( callback );
78+ public Observable < ConsoleMessage > stderrMessages ( ) {
79+ return this . messages (). filter (( message ) -> message . category == Category . stderr );
9480 }
9581
96- private void monitor (InputStream input , PublishSubject <String > subject ) {
97- BufferedReader reader = new BufferedReader (new InputStreamReader (input , encoding ));
98- final int BUFFERSIZE = 4096 ;
99- char [] buffer = new char [BUFFERSIZE ];
100- while (true ) {
101- try {
102- if (Thread .interrupted ()) {
103- subject .onComplete ();
104- return ;
82+ /**
83+ * Split the stdio message to lines, and return them as a new Observable.
84+ */
85+ public Observable <ConsoleMessage > lineMessages () {
86+ return this .messages ().map ((message ) -> {
87+ String [] lines = message .output .split ("(?<=\n )" );
88+ return Stream .of (lines ).map ((line ) -> new ConsoleMessage (line , message .category )).toArray (ConsoleMessage []::new );
89+ }).concatMap ((lines ) -> Observable .fromArray (lines ));
90+ }
91+
92+ public static class InputStreamObservable {
93+ private PublishSubject <String > rxSubject = PublishSubject .<String >create ();
94+ private String name ;
95+ private InputStream inputStream ;
96+ private Charset encoding ;
97+ private Thread loopingThread ;
98+
99+ /**
100+ * Constructor.
101+ */
102+ public InputStreamObservable (String name , InputStream inputStream , Charset encoding ) {
103+ this .name = name ;
104+ this .inputStream = inputStream ;
105+ this .encoding = encoding ;
106+ }
107+
108+ /**
109+ * Starts the stream.
110+ */
111+ public void start () {
112+ loopingThread = new Thread (name ) {
113+ public void run () {
114+ monitor (inputStream , rxSubject );
105115 }
106- int read = reader .read (buffer , 0 , BUFFERSIZE );
107- if (read == -1 ) {
108- subject .onComplete ();
116+ };
117+ loopingThread .setDaemon (true );
118+ loopingThread .start ();
119+ }
120+
121+ /**
122+ * Stops the stream.
123+ */
124+ public void stop () {
125+ if (loopingThread != null ) {
126+ loopingThread .interrupt ();
127+ loopingThread = null ;
128+ }
129+ }
130+
131+ private void monitor (InputStream input , PublishSubject <String > subject ) {
132+ BufferedReader reader = new BufferedReader (new InputStreamReader (input , encoding ));
133+ final int BUFFERSIZE = 4096 ;
134+ char [] buffer = new char [BUFFERSIZE ];
135+ while (true ) {
136+ try {
137+ if (Thread .interrupted ()) {
138+ subject .onComplete ();
139+ return ;
140+ }
141+ int read = reader .read (buffer , 0 , BUFFERSIZE );
142+ if (read == -1 ) {
143+ subject .onComplete ();
144+ return ;
145+ }
146+
147+ subject .onNext (new String (buffer , 0 , read ));
148+ } catch (IOException e ) {
149+ subject .onError (e );
109150 return ;
110151 }
111-
112- subject .onNext (new String (buffer , 0 , read ));
113- } catch (IOException e ) {
114- subject .onError (e );
115- return ;
116152 }
117153 }
154+
155+ public Observable <String > messages () {
156+ return rxSubject ;
157+ }
158+ }
159+
160+ public static class ConsoleMessage {
161+ public String output ;
162+ public Category category ;
163+
164+ public ConsoleMessage (String message , Category category ) {
165+ this .output = message ;
166+ this .category = category ;
167+ }
118168 }
119169}
0 commit comments