Skip to content

Commit afdb24e

Browse files
committed
Provide selectable channels for subprocess std i/o
1 parent ca3b864 commit afdb24e

File tree

2 files changed

+218
-5
lines changed

2 files changed

+218
-5
lines changed

graalpython/com.oracle.graal.python/src/com/oracle/graal/python/builtins/modules/PosixSubprocessModuleBuiltins.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import java.lang.ProcessBuilder.Redirect;
4646
import java.nio.ByteBuffer;
4747
import java.nio.channels.Channel;
48-
import java.nio.channels.Channels;
4948
import java.nio.channels.WritableByteChannel;
5049
import java.util.ArrayList;
5150
import java.util.HashMap;
@@ -76,6 +75,7 @@
7675
import com.oracle.graal.python.runtime.PythonContext;
7776
import com.oracle.graal.python.runtime.PythonOptions;
7877
import com.oracle.graal.python.runtime.sequence.storage.SequenceStorage;
78+
import com.oracle.graal.python.runtime.ProcessWrapper;
7979
import com.oracle.truffle.api.CompilerDirectives.TruffleBoundary;
8080
import com.oracle.truffle.api.TruffleFile;
8181
import com.oracle.truffle.api.TruffleLanguage.Env;
@@ -262,20 +262,20 @@ private int exec(ArrayList<String> argStrings, File cwd, Map<String, String> env
262262
pb.directory(cwd);
263263
pb.environment().putAll(env);
264264

265-
Process process = pb.start();
265+
ProcessWrapper process = new ProcessWrapper(pb.start(), p2cwrite != -1, c2pread != 1, errread != -1);
266266
try {
267267
if (p2cwrite != -1) {
268268
// user code is expected to close the unused ends of the pipes
269269
resources.getFileChannel(p2cwrite).close();
270-
resources.fdopen(p2cwrite, Channels.newChannel(process.getOutputStream()));
270+
resources.fdopen(p2cwrite, process.getOutputChannel());
271271
}
272272
if (c2pread != -1) {
273273
resources.getFileChannel(c2pread).close();
274-
resources.fdopen(c2pread, Channels.newChannel(process.getInputStream()));
274+
resources.fdopen(c2pread, process.getInputChannel());
275275
}
276276
if (errread != -1) {
277277
resources.getFileChannel(errread).close();
278-
resources.fdopen(errread, Channels.newChannel(process.getErrorStream()));
278+
resources.fdopen(errread, process.getErrorChannel());
279279
}
280280
} catch (IOException ex) {
281281
// We only want to rethrow the IOException that may come out of pb.start()
Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
/*
2+
* Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.
3+
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4+
*
5+
* The Universal Permissive License (UPL), Version 1.0
6+
*
7+
* Subject to the condition set forth below, permission is hereby granted to any
8+
* person obtaining a copy of this software, associated documentation and/or
9+
* data (collectively the "Software"), free of charge and under any and all
10+
* copyright rights in the Software, and any and all patent rights owned or
11+
* freely licensable by each licensor hereunder covering either (i) the
12+
* unmodified Software as contributed to or provided by such licensor, or (ii)
13+
* the Larger Works (as defined below), to deal in both
14+
*
15+
* (a) the Software, and
16+
*
17+
* (b) any piece of software and/or hardware listed in the lrgrwrks.txt file if
18+
* one is included with the Software each a "Larger Work" to which the Software
19+
* is contributed by such licensors),
20+
*
21+
* without restriction, including without limitation the rights to copy, create
22+
* derivative works of, display, perform, and distribute the Software and make,
23+
* use, sell, offer for sale, import, export, have made, and have sold the
24+
* Software and the Larger Work(s), and to sublicense the foregoing rights on
25+
* either these or other terms.
26+
*
27+
* This license is subject to the following condition:
28+
*
29+
* The above copyright notice and either this complete permission notice or at a
30+
* minimum a reference to the UPL must be included in all copies or substantial
31+
* portions of the Software.
32+
*
33+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
34+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
35+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
36+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
37+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
38+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
39+
* SOFTWARE.
40+
*/
41+
package com.oracle.graal.python.runtime;
42+
43+
import java.io.IOException;
44+
import java.io.InputStream;
45+
import java.io.OutputStream;
46+
import java.nio.ByteBuffer;
47+
import java.nio.channels.Channel;
48+
import java.nio.channels.Channels;
49+
import java.nio.channels.Pipe;
50+
import java.nio.channels.ReadableByteChannel;
51+
import java.nio.channels.WritableByteChannel;
52+
53+
/**
54+
* Extends a Java {@link Process} by providing selectable channels for reading the child's stdout
55+
* and stderr and writing to child's stdin. This is accomplished by creating (for each stream) a
56+
* {@link Pipe} and a thread that 'pumps' the data between the pipe and the stdio/stdout/stderr of
57+
* the child process. This has several limitations and drawbacks:
58+
* <ul>
59+
* <li>needs extra resources (pipes, threads)</li>
60+
* <li>pumping data between channels introduces buffering</li>
61+
* <li>errors detected in the pumping threads need to be reported by the other end of the pipe,
62+
* which obscures the stacktrace and makes debugging more difficult</li>
63+
* <li>closing one of the channels provided by this class might not result in immediate closing of
64+
* the child's stream - this means that the child might not detect EPIPE in the first write
65+
* operation it attempts</li>
66+
* </ul>
67+
*/
68+
public final class ProcessWrapper extends Process {
69+
70+
private final Process process;
71+
private final Pipe inPipe;
72+
private final Pipe outPipe;
73+
private final Pipe errPipe;
74+
private final ChannelPump inThread;
75+
private final ChannelPump outThread;
76+
private final ChannelPump errThread;
77+
78+
public ProcessWrapper(Process process, boolean pipeStdin, boolean pipeStdout, boolean pipeStderr) throws IOException {
79+
this.process = process;
80+
if (pipeStdin) {
81+
inPipe = Pipe.open();
82+
inThread = new ChannelPump(inPipe.source(), Channels.newChannel(process.getOutputStream()));
83+
inThread.start();
84+
} else {
85+
inPipe = null;
86+
inThread = null;
87+
}
88+
if (pipeStdout) {
89+
outPipe = Pipe.open();
90+
outThread = new ChannelPump(Channels.newChannel(process.getInputStream()), outPipe.sink());
91+
outThread.start();
92+
} else {
93+
outPipe = null;
94+
outThread = null;
95+
}
96+
if (pipeStderr) {
97+
errPipe = Pipe.open();
98+
errThread = new ChannelPump(Channels.newChannel(process.getErrorStream()), errPipe.sink());
99+
errThread.start();
100+
} else {
101+
errPipe = null;
102+
errThread = null;
103+
}
104+
}
105+
106+
public Channel getOutputChannel() {
107+
assert inPipe != null;
108+
return inPipe.sink();
109+
}
110+
111+
public Channel getInputChannel() {
112+
assert outPipe != null;
113+
return outPipe.source();
114+
}
115+
116+
public Channel getErrorChannel() {
117+
assert errPipe != null;
118+
return errPipe.source();
119+
}
120+
121+
@Override
122+
public OutputStream getOutputStream() {
123+
throw new UnsupportedOperationException();
124+
}
125+
126+
@Override
127+
public InputStream getInputStream() {
128+
throw new UnsupportedOperationException();
129+
}
130+
131+
@Override
132+
public InputStream getErrorStream() {
133+
throw new UnsupportedOperationException();
134+
}
135+
136+
@Override
137+
public int waitFor() throws InterruptedException {
138+
int retVal = process.waitFor();
139+
if (inThread != null) {
140+
inThread.join();
141+
}
142+
if (outThread != null) {
143+
outThread.join();
144+
}
145+
if (errThread != null) {
146+
errThread.join();
147+
}
148+
return retVal;
149+
}
150+
151+
@Override
152+
public int exitValue() {
153+
return process.exitValue();
154+
}
155+
156+
@Override
157+
public void destroy() {
158+
process.destroy();
159+
}
160+
161+
@Override
162+
public Process destroyForcibly() {
163+
return process.destroyForcibly();
164+
}
165+
166+
@Override
167+
public boolean isAlive() {
168+
return process.isAlive();
169+
}
170+
171+
static class ChannelPump extends Thread {
172+
173+
private static final int BUF_SIZE = 8192;
174+
175+
private final ReadableByteChannel source;
176+
private final WritableByteChannel sink;
177+
178+
ChannelPump(ReadableByteChannel source, WritableByteChannel sink) {
179+
this.source = source;
180+
this.sink = sink;
181+
}
182+
183+
@Override
184+
public void run() {
185+
ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE);
186+
try {
187+
while (true) {
188+
buf.clear();
189+
int r = source.read(buf);
190+
if (r == -1) {
191+
return;
192+
}
193+
buf.flip();
194+
while (buf.hasRemaining()) {
195+
sink.write(buf);
196+
}
197+
}
198+
} catch (IOException e) {
199+
// TODO report the error to the other end of the pipe
200+
// (for now just close both channels)
201+
} finally {
202+
try {
203+
sink.close();
204+
} catch (IOException ignored) {
205+
}
206+
try {
207+
source.close();
208+
} catch (IOException ignored) {
209+
}
210+
}
211+
}
212+
}
213+
}

0 commit comments

Comments
 (0)