Skip to content

Commit 1adc692

Browse files
committed
[GR-22298] Properly implement 'select.select' for 'os.pipe' pipes.
PullRequest: graalpython/880
2 parents 00a086b + 39b7d4b commit 1adc692

File tree

6 files changed

+469
-10
lines changed

6 files changed

+469
-10
lines changed

graalpython/com.oracle.graal.python.test/src/com/oracle/graal/python/test/PythonTests.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
import java.nio.file.Files;
4646
import java.nio.file.Path;
4747
import java.nio.file.Paths;
48+
import java.util.Collections;
49+
import java.util.Map;
4850

4951
import org.graalvm.polyglot.Context;
5052
import org.graalvm.polyglot.Engine;
@@ -97,10 +99,14 @@ public class PythonTests {
9799
}
98100

99101
public static void enterContext(String... newArgs) {
102+
enterContext(Collections.emptyMap(), newArgs);
103+
}
104+
105+
public static void enterContext(Map<String, String> options, String[] args) {
100106
PythonTests.outArray.reset();
101107
PythonTests.errArray.reset();
102108
Context prevContext = context;
103-
context = Context.newBuilder().engine(engine).allowExperimentalOptions(true).allowAllAccess(true).arguments("python", newArgs).option("python.Executable", executable).build();
109+
context = Context.newBuilder().engine(engine).allowExperimentalOptions(true).allowAllAccess(true).options(options).arguments("python", args).option("python.Executable", executable).build();
104110
context.initialize("python");
105111
if (prevContext != null) {
106112
closeContext(prevContext);
@@ -333,6 +339,15 @@ public static Value runScript(String[] args, File path, OutputStream out, Output
333339
}
334340
}
335341

342+
public static Value runScript(Map<String, String> options, String[] args, String source, OutputStream out, OutputStream err) {
343+
try {
344+
enterContext(options, args);
345+
return context.eval(org.graalvm.polyglot.Source.create("python", source));
346+
} finally {
347+
flush(out, err);
348+
}
349+
}
350+
336351
public static Value runScript(String[] args, String source, OutputStream out, OutputStream err) {
337352
try {
338353
enterContext(args);
@@ -352,8 +367,12 @@ public static Value runScript(String[] args, org.graalvm.polyglot.Source source,
352367
}
353368

354369
public static Value runScript(String[] args, String source, OutputStream out, OutputStream err, Runnable cb) {
370+
return runScript(Collections.emptyMap(), args, source, out, err, cb);
371+
}
372+
373+
public static Value runScript(Map<String, String> options, String[] args, String source, OutputStream out, OutputStream err, Runnable cb) {
355374
try {
356-
enterContext(args);
375+
enterContext(options, args);
357376
return context.eval(org.graalvm.polyglot.Source.create("python", source));
358377
} finally {
359378
cb.run();
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.
3+
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4+
*
5+
* The Universal Permissive License (UPL), Version 1.0
6+
*
7+
* Subject to the condition set forth below, permission is hereby granted to any
8+
* person obtaining a copy of this software, associated documentation and/or
9+
* data (collectively the "Software"), free of charge and under any and all
10+
* copyright rights in the Software, and any and all patent rights owned or
11+
* freely licensable by each licensor hereunder covering either (i) the
12+
* unmodified Software as contributed to or provided by such licensor, or (ii)
13+
* the Larger Works (as defined below), to deal in both
14+
*
15+
* (a) the Software, and
16+
*
17+
* (b) any piece of software and/or hardware listed in the lrgrwrks.txt file if
18+
* one is included with the Software each a "Larger Work" to which the Software
19+
* is contributed by such licensors),
20+
*
21+
* without restriction, including without limitation the rights to copy, create
22+
* derivative works of, display, perform, and distribute the Software and make,
23+
* use, sell, offer for sale, import, export, have made, and have sold the
24+
* Software and the Larger Work(s), and to sublicense the foregoing rights on
25+
* either these or other terms.
26+
*
27+
* This license is subject to the following condition:
28+
*
29+
* The above copyright notice and either this complete permission notice or at a
30+
* minimum a reference to the UPL must be included in all copies or substantial
31+
* portions of the Software.
32+
*
33+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
34+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
35+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
36+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
37+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
38+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
39+
* SOFTWARE.
40+
*/
41+
package com.oracle.graal.python.test.module;
42+
43+
import static org.junit.Assert.assertEquals;
44+
45+
import java.io.ByteArrayOutputStream;
46+
import java.io.PrintStream;
47+
import java.util.Collections;
48+
49+
import org.junit.Test;
50+
51+
import com.oracle.graal.python.test.PythonTests;
52+
53+
public class ThreadPoolTests {
54+
55+
@Test
56+
public void threadPool() {
57+
String source = "import _sysconfig\n" +
58+
"assert _sysconfig.get_config_vars().get('WITH_THREAD'), 'context was not started for threading'\n" +
59+
"from multiprocessing.pool import ThreadPool\n" +
60+
"\n" +
61+
"def fun(item):\n" +
62+
" return item != None\n" +
63+
"\n" +
64+
"items = list(range(0, 10))\n" +
65+
"\n" +
66+
"pool = ThreadPool(2)\n" +
67+
"res = list(pool.imap(fun, items))\n" +
68+
"pool.close()\n" +
69+
"\n" +
70+
"print(res)\n";
71+
final ByteArrayOutputStream byteArray = new ByteArrayOutputStream();
72+
final PrintStream printStream = new PrintStream(byteArray);
73+
PythonTests.runScript(Collections.singletonMap("python.WithThread", "true"), new String[0], source, printStream, System.err, () -> PythonTests.closeContext());
74+
String result = byteArray.toString().replaceAll("\r\n", "\n");
75+
assertEquals("[True, True, True, True, True, True, True, True, True, True]\n", result);
76+
}
77+
}

graalpython/com.oracle.graal.python/src/com/oracle/graal/python/builtins/modules/SelectModuleBuiltins.java

Lines changed: 201 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,20 +40,43 @@
4040
*/
4141
package com.oracle.graal.python.builtins.modules;
4242

43-
import java.io.PrintStream;
43+
import java.io.IOException;
44+
import java.nio.channels.Channel;
45+
import java.nio.channels.ClosedChannelException;
46+
import java.nio.channels.SelectableChannel;
47+
import java.nio.channels.SelectionKey;
48+
import java.nio.channels.Selector;
4449
import java.util.List;
4550

4651
import com.oracle.graal.python.builtins.Builtin;
4752
import com.oracle.graal.python.builtins.CoreFunctions;
53+
import com.oracle.graal.python.builtins.PythonBuiltinClassType;
4854
import com.oracle.graal.python.builtins.PythonBuiltins;
55+
import com.oracle.graal.python.builtins.objects.PNone;
56+
import com.oracle.graal.python.builtins.objects.exception.OSErrorEnum;
57+
import com.oracle.graal.python.builtins.objects.function.PArguments;
58+
import com.oracle.graal.python.builtins.objects.list.PList;
59+
import com.oracle.graal.python.builtins.objects.object.PythonObjectLibrary;
4960
import com.oracle.graal.python.builtins.objects.tuple.PTuple;
61+
import com.oracle.graal.python.nodes.PGuards;
62+
import com.oracle.graal.python.nodes.SpecialMethodNames;
63+
import com.oracle.graal.python.nodes.builtins.ListNodes.FastConstructListNode;
64+
import com.oracle.graal.python.nodes.call.special.LookupAndCallBinaryNode;
5065
import com.oracle.graal.python.nodes.function.PythonBuiltinBaseNode;
5166
import com.oracle.graal.python.nodes.function.PythonBuiltinNode;
52-
import com.oracle.graal.python.runtime.PythonOptions;
67+
import com.oracle.graal.python.nodes.util.CoerceToDoubleNode;
68+
import com.oracle.graal.python.nodes.util.CoerceToFileDescriptorNode;
69+
import com.oracle.graal.python.runtime.sequence.PSequence;
70+
import com.oracle.graal.python.runtime.sequence.storage.IntSequenceStorage;
5371
import com.oracle.truffle.api.CompilerDirectives.TruffleBoundary;
72+
import com.oracle.truffle.api.CompilerDirectives.ValueType;
73+
import com.oracle.truffle.api.dsl.Cached;
5474
import com.oracle.truffle.api.dsl.GenerateNodeFactory;
5575
import com.oracle.truffle.api.dsl.NodeFactory;
5676
import com.oracle.truffle.api.dsl.Specialization;
77+
import com.oracle.truffle.api.frame.VirtualFrame;
78+
import com.oracle.truffle.api.library.CachedLibrary;
79+
import com.oracle.truffle.api.nodes.ControlFlowException;
5780

5881
@CoreFunctions(defineModule = "select")
5982
public class SelectModuleBuiltins extends PythonBuiltins {
@@ -65,13 +88,184 @@ protected List<? extends NodeFactory<? extends PythonBuiltinBaseNode>> getNodeFa
6588
@Builtin(name = "select", minNumOfPositionalArgs = 3, parameterNames = {"rlist", "wlist", "xlist", "timeout"})
6689
@GenerateNodeFactory
6790
abstract static class SelectNode extends PythonBuiltinNode {
68-
@Specialization
91+
92+
@Specialization(limit = "3")
93+
PTuple doWithoutTimeout(VirtualFrame frame, Object rlist, Object wlist, Object xlist, @SuppressWarnings("unused") PNone timeout,
94+
@CachedLibrary("rlist") PythonObjectLibrary rlistLibrary,
95+
@CachedLibrary("wlist") PythonObjectLibrary wlistLibrary,
96+
@CachedLibrary("xlist") PythonObjectLibrary xlistLibrary,
97+
@Cached CoerceToDoubleNode coerceToDoubleNode,
98+
@Cached("createGetItem()") LookupAndCallBinaryNode callGetItemNode,
99+
@Cached FastConstructListNode constructListNode,
100+
@Cached CoerceToFileDescriptorNode coerceToFDNode) {
101+
return doGeneric(frame, rlist, wlist, xlist, PNone.NONE, rlistLibrary, wlistLibrary, xlistLibrary, coerceToDoubleNode, callGetItemNode, constructListNode, coerceToFDNode);
102+
}
103+
104+
@Specialization(replaces = "doWithoutTimeout", limit = "3")
105+
PTuple doGeneric(VirtualFrame frame, Object rlist, Object wlist, Object xlist, Object timeout,
106+
@CachedLibrary("rlist") PythonObjectLibrary rlistLibrary,
107+
@CachedLibrary("wlist") PythonObjectLibrary wlistLibrary,
108+
@CachedLibrary("xlist") PythonObjectLibrary xlistLibrary,
109+
@Cached CoerceToDoubleNode coerceToDoubleNode,
110+
@Cached("createGetItem()") LookupAndCallBinaryNode callGetItemNode,
111+
@Cached FastConstructListNode constructListNode,
112+
@Cached CoerceToFileDescriptorNode coerceToFDNode) {
113+
114+
ChannelFD[] readFDs;
115+
ChannelFD[] writeFDs;
116+
ChannelFD[] xFDs;
117+
try {
118+
readFDs = seq2set(frame, rlist, rlistLibrary, coerceToFDNode, callGetItemNode, constructListNode);
119+
writeFDs = seq2set(frame, wlist, wlistLibrary, coerceToFDNode, callGetItemNode, constructListNode);
120+
xFDs = seq2set(frame, xlist, xlistLibrary, coerceToFDNode, callGetItemNode, constructListNode);
121+
} catch (NonSelectableChannel e) {
122+
// If one of the channels is not selectable, we do what we did before: just return
123+
// everything.
124+
return factory().createTuple(new Object[]{rlist, wlist, xlist});
125+
}
126+
127+
// IMPORTANT: The meaning of the timeout value is slightly different:
128+
// 'timeout == 0.0' means we should not block and return immediately. However, the Java
129+
// API does not allow a non-blocking select. So we set the timeout to 1 ms.
130+
//
131+
// 'timeout == None' means we should wait indefinitely, i.e., we need to pass 0 to the
132+
// Java API.
133+
long timeoutMillis;
134+
if (!PGuards.isPNone(timeout)) {
135+
double timeoutSecs = coerceToDoubleNode.execute(frame, timeout);
136+
timeoutMillis = timeoutSecs != 0.0 ? (long) (timeoutSecs * 1000.0) : 1L;
137+
} else {
138+
timeoutMillis = 0;
139+
}
140+
141+
if (timeoutMillis < 0) {
142+
throw raise(PythonBuiltinClassType.ValueError, "timeout must be non-negative");
143+
}
144+
145+
try {
146+
doSelect(readFDs, writeFDs, xFDs, timeoutMillis);
147+
} catch (ClosedChannelException e) {
148+
// If the channel was closed (this can only happen concurrently between resolving
149+
// the FD to the channel and registration), we provided an incorrect file
150+
// descriptor. The errno code for that is EBADF.
151+
throw raiseOSError(frame, OSErrorEnum.EBADF);
152+
} catch (IOException e) {
153+
throw raiseOSError(frame, e);
154+
} catch (RuntimeException e) {
155+
throw raise(PythonBuiltinClassType.SystemError, e);
156+
}
157+
158+
return factory().createTuple(new PList[]{toList(readFDs), toList(writeFDs), toList(xFDs)});
159+
}
160+
69161
@TruffleBoundary
70-
PTuple select(Object rlist, Object wlist, Object xlist, @SuppressWarnings("unused") Object timeout) {
71-
if (getContext().getOption(PythonOptions.VerboseFlag)) {
72-
new PrintStream(getContext().getEnv().err()).println("select() will always return immediately, we only support blocking I/O for now");
162+
private static void doSelect(ChannelFD[] readFDs, ChannelFD[] writeFDs, ChannelFD[] xFDs, long timeoutMillis) throws IOException {
163+
Selector selector = Selector.open();
164+
165+
for (ChannelFD readFD : readFDs) {
166+
readFD.channel.configureBlocking(false);
167+
readFD.channel.register(selector, SelectionKey.OP_READ);
73168
}
74-
return factory().createTuple(new Object[]{rlist, wlist, xlist});
169+
170+
for (ChannelFD writeFD : writeFDs) {
171+
writeFD.channel.configureBlocking(false);
172+
writeFD.channel.register(selector, SelectionKey.OP_WRITE);
173+
}
174+
175+
for (ChannelFD xFD : xFDs) {
176+
// TODO(fa): not sure if these ops are representing
177+
// "exceptional condition pending"
178+
xFD.channel.configureBlocking(false);
179+
xFD.channel.register(selector, SelectionKey.OP_ACCEPT | SelectionKey.OP_CONNECT);
180+
}
181+
182+
int selected = selector.select(timeoutMillis);
183+
184+
// remove non-selected channels from given lists
185+
int deleted = 0;
186+
for (int i = 0; i < readFDs.length; i++) {
187+
ChannelFD readFD = readFDs[i];
188+
SelectionKey selectionKey = readFD.channel.keyFor(selector);
189+
if (!selectionKey.isReadable()) {
190+
readFDs[i] = null;
191+
deleted++;
192+
}
193+
}
194+
195+
for (int i = 0; i < writeFDs.length; i++) {
196+
ChannelFD writeFD = writeFDs[i];
197+
SelectionKey selectionKey = writeFD.channel.keyFor(selector);
198+
if (!selectionKey.isWritable()) {
199+
writeFDs[i] = null;
200+
deleted++;
201+
}
202+
}
203+
204+
for (int i = 0; i < xFDs.length; i++) {
205+
ChannelFD xFD = xFDs[i];
206+
SelectionKey selectionKey = xFD.channel.keyFor(selector);
207+
if (!(selectionKey.isAcceptable() || selectionKey.isConnectable())) {
208+
xFDs[i] = null;
209+
deleted++;
210+
}
211+
}
212+
assert selected == (readFDs.length + writeFDs.length + xFDs.length) - deleted;
213+
}
214+
215+
private ChannelFD[] seq2set(VirtualFrame frame, Object sequence, PythonObjectLibrary lib, CoerceToFileDescriptorNode coerceToFDNode, LookupAndCallBinaryNode callGetItemNode,
216+
FastConstructListNode constructListNode) {
217+
int len = lib.lengthWithState(sequence, PArguments.getThreadState(frame));
218+
ChannelFD[] result = new ChannelFD[len];
219+
220+
PSequence pSequence = constructListNode.execute(sequence);
221+
222+
for (int i = 0; i < len; i++) {
223+
int fd = coerceToFDNode.execute(frame, callGetItemNode.executeObject(frame, pSequence, i));
224+
Channel fileChannel = getContext().getResources().getFileChannel(fd);
225+
if (!(fileChannel instanceof SelectableChannel)) {
226+
throw NonSelectableChannel.INSTANCE;
227+
}
228+
result[i] = new ChannelFD(fd, (SelectableChannel) fileChannel);
229+
}
230+
return result;
231+
}
232+
233+
private PList toList(ChannelFD[] arr) {
234+
int cnt = 0;
235+
for (ChannelFD channelFD : arr) {
236+
if (channelFD != null) {
237+
cnt++;
238+
}
239+
}
240+
int[] fds = new int[cnt];
241+
for (ChannelFD channelFD : arr) {
242+
if (channelFD != null) {
243+
fds[fds.length - (cnt--)] = channelFD.fd;
244+
}
245+
}
246+
return factory().createList(new IntSequenceStorage(fds));
247+
}
248+
249+
static LookupAndCallBinaryNode createGetItem() {
250+
return LookupAndCallBinaryNode.create(SpecialMethodNames.__GETITEM__);
75251
}
252+
253+
@ValueType
254+
private static final class ChannelFD {
255+
private final int fd;
256+
private final SelectableChannel channel;
257+
258+
private ChannelFD(int fd, SelectableChannel channel) {
259+
this.fd = fd;
260+
this.channel = channel;
261+
}
262+
}
263+
264+
private static final class NonSelectableChannel extends ControlFlowException {
265+
private static final long serialVersionUID = 1L;
266+
267+
static final NonSelectableChannel INSTANCE = new NonSelectableChannel();
268+
}
269+
76270
}
77271
}

graalpython/com.oracle.graal.python/src/com/oracle/graal/python/nodes/SpecialMethodNames.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2018, 2020, Oracle and/or its affiliates. All rights reserved.
33
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44
*
55
* The Universal Permissive License (UPL), Version 1.0
@@ -164,6 +164,7 @@ public abstract class SpecialMethodNames {
164164
public static final String DECODE = "decode";
165165
public static final String __SIZEOF__ = "__sizeof__";
166166
public static final String __CLASS_GETITEM__ = "__class_getitem__";
167+
public static final String FILENO = "fileno";
167168

168169
public static final String RICHCMP = "__truffle_richcompare__";
169170
public static final String TRUFFLE_SOURCE = "__truffle_source__";

0 commit comments

Comments
 (0)