Skip to content

Commit 728636c

Browse files
committed
Properly implement 'select.select' for 'os.pipe' pipes.
1 parent c86bd0d commit 728636c

File tree

1 file changed

+201
-7
lines changed

1 file changed

+201
-7
lines changed

graalpython/com.oracle.graal.python/src/com/oracle/graal/python/builtins/modules/SelectModuleBuiltins.java

Lines changed: 201 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,20 +40,43 @@
4040
*/
4141
package com.oracle.graal.python.builtins.modules;
4242

43-
import java.io.PrintStream;
43+
import java.io.IOException;
44+
import java.nio.channels.Channel;
45+
import java.nio.channels.ClosedChannelException;
46+
import java.nio.channels.SelectableChannel;
47+
import java.nio.channels.SelectionKey;
48+
import java.nio.channels.Selector;
4449
import java.util.List;
4550

4651
import com.oracle.graal.python.builtins.Builtin;
4752
import com.oracle.graal.python.builtins.CoreFunctions;
53+
import com.oracle.graal.python.builtins.PythonBuiltinClassType;
4854
import com.oracle.graal.python.builtins.PythonBuiltins;
55+
import com.oracle.graal.python.builtins.objects.PNone;
56+
import com.oracle.graal.python.builtins.objects.exception.OSErrorEnum;
57+
import com.oracle.graal.python.builtins.objects.function.PArguments;
58+
import com.oracle.graal.python.builtins.objects.list.PList;
59+
import com.oracle.graal.python.builtins.objects.object.PythonObjectLibrary;
4960
import com.oracle.graal.python.builtins.objects.tuple.PTuple;
61+
import com.oracle.graal.python.nodes.PGuards;
62+
import com.oracle.graal.python.nodes.SpecialMethodNames;
63+
import com.oracle.graal.python.nodes.builtins.ListNodes.FastConstructListNode;
64+
import com.oracle.graal.python.nodes.call.special.LookupAndCallBinaryNode;
5065
import com.oracle.graal.python.nodes.function.PythonBuiltinBaseNode;
5166
import com.oracle.graal.python.nodes.function.PythonBuiltinNode;
52-
import com.oracle.graal.python.runtime.PythonOptions;
67+
import com.oracle.graal.python.nodes.util.CoerceToDoubleNode;
68+
import com.oracle.graal.python.nodes.util.CoerceToFileDescriptorNode;
69+
import com.oracle.graal.python.runtime.sequence.PSequence;
70+
import com.oracle.graal.python.runtime.sequence.storage.IntSequenceStorage;
5371
import com.oracle.truffle.api.CompilerDirectives.TruffleBoundary;
72+
import com.oracle.truffle.api.CompilerDirectives.ValueType;
73+
import com.oracle.truffle.api.dsl.Cached;
5474
import com.oracle.truffle.api.dsl.GenerateNodeFactory;
5575
import com.oracle.truffle.api.dsl.NodeFactory;
5676
import com.oracle.truffle.api.dsl.Specialization;
77+
import com.oracle.truffle.api.frame.VirtualFrame;
78+
import com.oracle.truffle.api.library.CachedLibrary;
79+
import com.oracle.truffle.api.nodes.ControlFlowException;
5780

5881
@CoreFunctions(defineModule = "select")
5982
public class SelectModuleBuiltins extends PythonBuiltins {
@@ -65,13 +88,184 @@ protected List<? extends NodeFactory<? extends PythonBuiltinBaseNode>> getNodeFa
6588
@Builtin(name = "select", minNumOfPositionalArgs = 3, parameterNames = {"rlist", "wlist", "xlist", "timeout"})
6689
@GenerateNodeFactory
6790
abstract static class SelectNode extends PythonBuiltinNode {
68-
@Specialization
91+
92+
@Specialization(limit = "3")
93+
PTuple doWithoutTimeout(VirtualFrame frame, Object rlist, Object wlist, Object xlist, @SuppressWarnings("unused") PNone timeout,
94+
@CachedLibrary("rlist") PythonObjectLibrary rlistLibrary,
95+
@CachedLibrary("wlist") PythonObjectLibrary wlistLibrary,
96+
@CachedLibrary("xlist") PythonObjectLibrary xlistLibrary,
97+
@Cached CoerceToDoubleNode coerceToDoubleNode,
98+
@Cached("createGetItem()") LookupAndCallBinaryNode callGetItemNode,
99+
@Cached FastConstructListNode constructListNode,
100+
@Cached CoerceToFileDescriptorNode coerceToFDNode) {
101+
return doGeneric(frame, rlist, wlist, xlist, PNone.NONE, rlistLibrary, wlistLibrary, xlistLibrary, coerceToDoubleNode, callGetItemNode, constructListNode, coerceToFDNode);
102+
}
103+
104+
@Specialization(replaces = "doWithoutTimeout", limit = "3")
105+
PTuple doGeneric(VirtualFrame frame, Object rlist, Object wlist, Object xlist, Object timeout,
106+
@CachedLibrary("rlist") PythonObjectLibrary rlistLibrary,
107+
@CachedLibrary("wlist") PythonObjectLibrary wlistLibrary,
108+
@CachedLibrary("xlist") PythonObjectLibrary xlistLibrary,
109+
@Cached CoerceToDoubleNode coerceToDoubleNode,
110+
@Cached("createGetItem()") LookupAndCallBinaryNode callGetItemNode,
111+
@Cached FastConstructListNode constructListNode,
112+
@Cached CoerceToFileDescriptorNode coerceToFDNode) {
113+
114+
ChannelFD[] readFDs;
115+
ChannelFD[] writeFDs;
116+
ChannelFD[] xFDs;
117+
try {
118+
readFDs = seq2set(frame, rlist, rlistLibrary, coerceToFDNode, callGetItemNode, constructListNode);
119+
writeFDs = seq2set(frame, wlist, wlistLibrary, coerceToFDNode, callGetItemNode, constructListNode);
120+
xFDs = seq2set(frame, xlist, xlistLibrary, coerceToFDNode, callGetItemNode, constructListNode);
121+
} catch (NonSelectableChannel e) {
122+
// If one of the channels is not selectable, we do what we did before: just return
123+
// everything.
124+
return factory().createTuple(new Object[]{rlist, wlist, xlist});
125+
}
126+
127+
// IMPORTANT: The meaning of the timeout value is slightly different:
128+
// 'timeout == 0.0' means we should not block and return immediately. However, the Java
129+
// API does not allow a non-blocking select. So we set the timeout to 1 ms.
130+
//
131+
// 'timeout == None' means we should wait indefinitely, i.e., we need to pass 0 to the
132+
// Java API.
133+
long timeoutMillis;
134+
if (!PGuards.isPNone(timeout)) {
135+
double timeoutSecs = coerceToDoubleNode.execute(frame, timeout);
136+
timeoutMillis = timeoutSecs != 0.0 ? (long) (timeoutSecs * 1000.0) : 1L;
137+
} else {
138+
timeoutMillis = 0;
139+
}
140+
141+
if (timeoutMillis < 0) {
142+
throw raise(PythonBuiltinClassType.ValueError, "timeout must be non-negative");
143+
}
144+
145+
try {
146+
doSelect(readFDs, writeFDs, xFDs, timeoutMillis);
147+
} catch (ClosedChannelException e) {
148+
// If the channel was closed (this can only happen concurrently between resolving
149+
// the FD to the channel and registration), we provided an incorrect file
150+
// descriptor. The errno code for that is EBADF.
151+
throw raiseOSError(frame, OSErrorEnum.EBADF);
152+
} catch (IOException e) {
153+
throw raiseOSError(frame, e);
154+
} catch (RuntimeException e) {
155+
throw raise(PythonBuiltinClassType.SystemError, e);
156+
}
157+
158+
return factory().createTuple(new PList[]{toList(readFDs), toList(writeFDs), toList(xFDs)});
159+
}
160+
69161
@TruffleBoundary
70-
PTuple select(Object rlist, Object wlist, Object xlist, @SuppressWarnings("unused") Object timeout) {
71-
if (getContext().getOption(PythonOptions.VerboseFlag)) {
72-
new PrintStream(getContext().getEnv().err()).println("select() will always return immediately, we only support blocking I/O for now");
162+
private static void doSelect(ChannelFD[] readFDs, ChannelFD[] writeFDs, ChannelFD[] xFDs, long timeoutMillis) throws IOException {
163+
Selector selector = Selector.open();
164+
165+
for (ChannelFD readFD : readFDs) {
166+
readFD.channel.configureBlocking(false);
167+
readFD.channel.register(selector, SelectionKey.OP_READ);
73168
}
74-
return factory().createTuple(new Object[]{rlist, wlist, xlist});
169+
170+
for (ChannelFD writeFD : writeFDs) {
171+
writeFD.channel.configureBlocking(false);
172+
writeFD.channel.register(selector, SelectionKey.OP_WRITE);
173+
}
174+
175+
for (ChannelFD xFD : xFDs) {
176+
// TODO(fa): not sure if these ops are representing
177+
// "exceptional condition pending"
178+
xFD.channel.configureBlocking(false);
179+
xFD.channel.register(selector, SelectionKey.OP_ACCEPT | SelectionKey.OP_CONNECT);
180+
}
181+
182+
int selected = selector.select(timeoutMillis);
183+
184+
// remove non-selected channels from given lists
185+
int deleted = 0;
186+
for (int i = 0; i < readFDs.length; i++) {
187+
ChannelFD readFD = readFDs[i];
188+
SelectionKey selectionKey = readFD.channel.keyFor(selector);
189+
if (!selectionKey.isReadable()) {
190+
readFDs[i] = null;
191+
deleted++;
192+
}
193+
}
194+
195+
for (int i = 0; i < writeFDs.length; i++) {
196+
ChannelFD writeFD = writeFDs[i];
197+
SelectionKey selectionKey = writeFD.channel.keyFor(selector);
198+
if (!selectionKey.isWritable()) {
199+
writeFDs[i] = null;
200+
deleted++;
201+
}
202+
}
203+
204+
for (int i = 0; i < xFDs.length; i++) {
205+
ChannelFD xFD = xFDs[i];
206+
SelectionKey selectionKey = xFD.channel.keyFor(selector);
207+
if (!(selectionKey.isAcceptable() || selectionKey.isConnectable())) {
208+
xFDs[i] = null;
209+
deleted++;
210+
}
211+
}
212+
assert selected == (readFDs.length + writeFDs.length + xFDs.length) - deleted;
213+
}
214+
215+
private ChannelFD[] seq2set(VirtualFrame frame, Object sequence, PythonObjectLibrary lib, CoerceToFileDescriptorNode coerceToFDNode, LookupAndCallBinaryNode callGetItemNode,
216+
FastConstructListNode constructListNode) {
217+
int len = lib.lengthWithState(sequence, PArguments.getThreadState(frame));
218+
ChannelFD[] result = new ChannelFD[len];
219+
220+
PSequence pSequence = constructListNode.execute(sequence);
221+
222+
for (int i = 0; i < len; i++) {
223+
int fd = coerceToFDNode.execute(frame, callGetItemNode.executeObject(frame, pSequence, i));
224+
Channel fileChannel = getContext().getResources().getFileChannel(fd);
225+
if (!(fileChannel instanceof SelectableChannel)) {
226+
throw NonSelectableChannel.INSTANCE;
227+
}
228+
result[i] = new ChannelFD(fd, (SelectableChannel) fileChannel);
229+
}
230+
return result;
231+
}
232+
233+
private PList toList(ChannelFD[] arr) {
234+
int cnt = 0;
235+
for (ChannelFD channelFD : arr) {
236+
if (channelFD != null) {
237+
cnt++;
238+
}
239+
}
240+
int[] fds = new int[cnt];
241+
for (ChannelFD channelFD : arr) {
242+
if (channelFD != null) {
243+
fds[fds.length - (cnt--)] = channelFD.fd;
244+
}
245+
}
246+
return factory().createList(new IntSequenceStorage(fds));
247+
}
248+
249+
static LookupAndCallBinaryNode createGetItem() {
250+
return LookupAndCallBinaryNode.create(SpecialMethodNames.__GETITEM__);
75251
}
252+
253+
@ValueType
254+
private static final class ChannelFD {
255+
private final int fd;
256+
private final SelectableChannel channel;
257+
258+
private ChannelFD(int fd, SelectableChannel channel) {
259+
this.fd = fd;
260+
this.channel = channel;
261+
}
262+
}
263+
264+
private static final class NonSelectableChannel extends ControlFlowException {
265+
private static final long serialVersionUID = 1L;
266+
267+
static final NonSelectableChannel INSTANCE = new NonSelectableChannel();
268+
}
269+
76270
}
77271
}

0 commit comments

Comments
 (0)