Skip to content

Commit b9ee4ae

Browse files
committed
[GR-38924] Improve multiprocessing.connection.wait.
PullRequest: graalpython/2286
2 parents de882cf + 4ba48ab commit b9ee4ae

File tree

6 files changed

+232
-89
lines changed

6 files changed

+232
-89
lines changed
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
# Copyright (c) 2022, 2022, Oracle and/or its affiliates. All rights reserved.
2+
# DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3+
#
4+
# The Universal Permissive License (UPL), Version 1.0
5+
#
6+
# Subject to the condition set forth below, permission is hereby granted to any
7+
# person obtaining a copy of this software, associated documentation and/or
8+
# data (collectively the "Software"), free of charge and under any and all
9+
# copyright rights in the Software, and any and all patent rights owned or
10+
# freely licensable by each licensor hereunder covering either (i) the
11+
# unmodified Software as contributed to or provided by such licensor, or (ii)
12+
# the Larger Works (as defined below), to deal in both
13+
#
14+
# (a) the Software, and
15+
#
16+
# (b) any piece of software and/or hardware listed in the lrgrwrks.txt file if
17+
# one is included with the Software each a "Larger Work" to which the Software
18+
# is contributed by such licensors),
19+
#
20+
# without restriction, including without limitation the rights to copy, create
21+
# derivative works of, display, perform, and distribute the Software and make,
22+
# use, sell, offer for sale, import, export, have made, and have sold the
23+
# Software and the Larger Work(s), and to sublicense the foregoing rights on
24+
# either these or other terms.
25+
#
26+
# This license is subject to the following condition:
27+
#
28+
# The above copyright notice and either this complete permission notice or at a
29+
# minimum a reference to the UPL must be included in all copies or substantial
30+
# portions of the Software.
31+
#
32+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
33+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
34+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
35+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
36+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
37+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
38+
# SOFTWARE.
39+
import multiprocessing
40+
import time
41+
from multiprocessing.connection import wait
42+
43+
44+
def test_wait_timeout():
45+
timeout = 3
46+
a, b = multiprocessing.Pipe()
47+
x, y = multiprocessing.connection.Pipe(False) # Truffle multiprocessing pipe
48+
for fds in [[a, b], [x, y], [a, b, x, y]]:
49+
start = time.monotonic()
50+
res = wait(fds, timeout)
51+
delta = time.monotonic() - start
52+
assert not res
53+
assert delta < timeout * 2
54+
assert delta > timeout / 2
55+
56+
57+
def test_wait():
58+
a, b = multiprocessing.Pipe()
59+
x, y = multiprocessing.connection.Pipe(False) # Truffle multiprocessing pipe
60+
a.send(42)
61+
res = wait([b, y], 3)
62+
assert res == [b], "res1"
63+
assert b.recv() == 42, "res2"
64+
y.send(33)
65+
res = wait([b, x], 3)
66+
assert res == [x], "res3"
67+
assert x.recv() == 33, "res4"
68+
a.send(1)
69+
y.send(2)
70+
res = wait([b, x], 3)
71+
assert set(res) == set([b, x])
72+
assert b.recv() == 1
73+
assert x.recv() == 2

graalpython/com.oracle.graal.python.test/src/tests/unittest_tags/test_multiprocessing_spawn.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
*graalpython.lib-python.3.test.test_multiprocessing_spawn.TestWait.test_wait_slow
4848
*graalpython.lib-python.3.test.test_multiprocessing_spawn.TestWait.test_wait_socket
4949
*graalpython.lib-python.3.test.test_multiprocessing_spawn.TestWait.test_wait_socket_slow
50+
*graalpython.lib-python.3.test.test_multiprocessing_spawn.TestWait.test_wait_timeout
5051
*graalpython.lib-python.3.test.test_multiprocessing_spawn.WithProcessesTestBarrier.test_abort
5152
*graalpython.lib-python.3.test.test_multiprocessing_spawn.WithProcessesTestBarrier.test_abort_and_reset
5253
*graalpython.lib-python.3.test.test_multiprocessing_spawn.WithProcessesTestBarrier.test_action

graalpython/com.oracle.graal.python/src/com/oracle/graal/python/builtins/modules/MultiprocessingModuleBuiltins.java

Lines changed: 109 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import com.oracle.graal.python.builtins.objects.PNone;
5555
import com.oracle.graal.python.builtins.objects.buffer.PythonBufferAccessLibrary;
5656
import com.oracle.graal.python.builtins.objects.bytes.PBytes;
57+
import com.oracle.graal.python.builtins.objects.common.SequenceNodes;
5758
import com.oracle.graal.python.builtins.objects.common.SequenceStorageNodes;
5859
import com.oracle.graal.python.builtins.objects.exception.OSErrorEnum;
5960
import com.oracle.graal.python.builtins.objects.ints.PInt;
@@ -71,10 +72,13 @@
7172
import com.oracle.graal.python.nodes.function.builtins.PythonBinaryBuiltinNode;
7273
import com.oracle.graal.python.nodes.function.builtins.PythonUnaryBuiltinNode;
7374
import com.oracle.graal.python.nodes.util.CannotCastException;
75+
import com.oracle.graal.python.nodes.util.CastToJavaDoubleNode;
7476
import com.oracle.graal.python.nodes.util.CastToJavaIntExactNode;
7577
import com.oracle.graal.python.nodes.util.CastToJavaIntLossyNode;
7678
import com.oracle.graal.python.nodes.util.CastToTruffleStringNode;
7779
import com.oracle.graal.python.runtime.GilNode;
80+
import com.oracle.graal.python.runtime.PosixSupportLibrary;
81+
import com.oracle.graal.python.runtime.PosixSupportLibrary.Timeval;
7882
import com.oracle.graal.python.runtime.PythonContext;
7983
import com.oracle.graal.python.runtime.PythonContext.SharedMultiprocessingData;
8084
import com.oracle.graal.python.runtime.sequence.PSequence;
@@ -356,32 +360,81 @@ PNone close(@SuppressWarnings("unused") long fd) {
356360
}
357361
}
358362

359-
@Builtin(name = "_select", minNumOfPositionalArgs = 1, parameterNames = {"rlist"})
363+
@Builtin(name = "_select", minNumOfPositionalArgs = 4)
360364
@GenerateNodeFactory
361365
abstract static class SelectNode extends PythonBuiltinNode {
366+
/*
367+
* We would like to poll two different things with a timeout: the actual file descriptors
368+
* and the Java managed LinkedBlockingQueues.
369+
*
370+
* The LinkedBlockingQueue does not expose anything that would allow us to wait on multiple
371+
* LinkedBlockingQueues at once, so we'd have to spawn a thread for each or roll out our own
372+
* synchronization of take/offer to allow that.
373+
*
374+
* The actual file descriptors could be backed by Java POSIX emulation layer, or by the
375+
* native POSIX implementation -- the `select` can run actual native select, which we cannot
376+
* easily interrupt from Java if one of the LinkedBlockingQueue is unblocked earlier than
377+
* the native select returns.
378+
*
379+
* Given all these complexities, for the time being, we do active waiting here, but at least
380+
* without holding the GIL, and we also yield in every iteration.
381+
*/
382+
362383
@Specialization
363-
Object doGeneric(VirtualFrame frame, Object rlist,
384+
Object doGeneric(VirtualFrame frame, Object multiprocessingFdsList, Object multiprocessingObjsList, Object posixFileObjsList, Object timeoutObj,
385+
@Cached PosixModuleBuiltins.FileDescriptorConversionNode fdConvertor,
364386
@Cached PyObjectSizeNode sizeNode,
365387
@Cached PyObjectGetItem getItem,
388+
@Cached SequenceNodes.GetObjectArrayNode getObjectArrayNode,
366389
@Cached ListNodes.FastConstructListNode constructListNode,
367390
@Cached CastToJavaIntLossyNode castToJava,
391+
@Cached CastToJavaDoubleNode castToDouble,
368392
@Cached GilNode gil) {
369-
ArrayBuilder<Integer> notEmpty = new ArrayBuilder<>();
370-
SharedMultiprocessingData sharedData = getContext().getSharedMultiprocessingData();
371-
PSequence pSequence = constructListNode.execute(frame, rlist);
372-
for (int i = 0; i < sizeNode.execute(frame, pSequence); i++) {
393+
PythonContext context = getContext();
394+
SharedMultiprocessingData sharedData = context.getSharedMultiprocessingData();
395+
396+
PSequence pSequence = constructListNode.execute(frame, multiprocessingFdsList);
397+
int size = sizeNode.execute(frame, pSequence);
398+
int[] multiprocessingFds = new int[size];
399+
for (int i = 0; i < size; i++) {
373400
Object pythonObject = getItem.execute(frame, pSequence, i);
374-
int fd = toInt(castToJava, pythonObject);
375-
gil.release(true);
376-
try {
377-
if (!sharedData.isBlocking(fd)) {
378-
notEmpty.add(fd);
401+
multiprocessingFds[i] = toInt(castToJava, pythonObject);
402+
}
403+
404+
Object[] posixFileObjs = getObjectArrayNode.execute(posixFileObjsList);
405+
int[] posixFds = new int[posixFileObjs.length];
406+
for (int i = 0; i < posixFileObjs.length; i++) {
407+
posixFds[i] = toInt(castToJava, fdConvertor.execute(frame, posixFileObjs[i]));
408+
}
409+
410+
double timeout = castToDouble.execute(timeoutObj);
411+
412+
Object[] multiprocessingObjs = getObjectArrayNode.execute(multiprocessingObjsList);
413+
gil.release(true);
414+
try {
415+
boolean[] selectedMultiprocessingFds = new boolean[multiprocessingFds.length];
416+
boolean[] selectedPosixFds = new boolean[posixFds.length];
417+
418+
doSelect(context.getPosixSupport(), sharedData, posixFds, selectedPosixFds, multiprocessingFds, selectedMultiprocessingFds, timeout);
419+
420+
ArrayBuilder<Object> result = new ArrayBuilder<>(4);
421+
for (int i = 0; i < selectedMultiprocessingFds.length; i++) {
422+
if (selectedMultiprocessingFds[i]) {
423+
result.add(multiprocessingObjs[i]);
424+
}
425+
}
426+
for (int i = 0; i < selectedPosixFds.length; i++) {
427+
if (selectedPosixFds[i]) {
428+
result.add(posixFileObjs[i]);
379429
}
380-
} finally {
381-
gil.acquire();
382430
}
431+
432+
return factory().createList(result.toArray(new Object[0]));
433+
} catch (PosixSupportLibrary.PosixException e) {
434+
throw raiseOSErrorFromPosixException(frame, e);
435+
} finally {
436+
gil.acquire();
383437
}
384-
return factory().createList(notEmpty.toObjectArray(new Object[0]));
385438
}
386439

387440
private static int toInt(CastToJavaIntLossyNode castToJava, Object pythonObject) {
@@ -391,6 +444,48 @@ private static int toInt(CastToJavaIntLossyNode castToJava, Object pythonObject)
391444
throw CompilerDirectives.shouldNotReachHere();
392445
}
393446
}
447+
448+
@TruffleBoundary
449+
private static void doSelect(Object posix, SharedMultiprocessingData sharedData,
450+
int[] posixFds, boolean[] selectedPosixFds,
451+
int[] multiprocessingFds, boolean[] selectedMultiprocessingFds,
452+
double timeoutInS) throws PosixSupportLibrary.PosixException {
453+
PosixSupportLibrary posixLib = PosixSupportLibrary.getUncached();
454+
boolean blocking = timeoutInS >= 0;
455+
boolean untilReady = timeoutInS == 0;
456+
long deadline = 0;
457+
if (blocking && !untilReady) {
458+
long timeout = (long) (timeoutInS * 1000_000_000.0);
459+
deadline = System.nanoTime() + timeout;
460+
}
461+
while (true) {
462+
boolean selected = false;
463+
if (posixFds.length > 0) {
464+
PosixSupportLibrary.SelectResult selectResult = posixLib.select(posix, posixFds,
465+
PythonUtils.EMPTY_INT_ARRAY, PythonUtils.EMPTY_INT_ARRAY, Timeval.SELECT_TIMEOUT_NOW);
466+
System.arraycopy(selectResult.getReadFds(), 0, selectedPosixFds, 0, selectedPosixFds.length);
467+
if (blocking) {
468+
for (boolean b : selectedPosixFds) {
469+
selected |= b;
470+
}
471+
}
472+
}
473+
for (int i = 0; i < multiprocessingFds.length; i++) {
474+
int fd = multiprocessingFds[i];
475+
selectedMultiprocessingFds[i] = !sharedData.isBlocking(fd);
476+
if (selectedMultiprocessingFds[i]) {
477+
selected = true;
478+
}
479+
}
480+
if (!blocking || selected) {
481+
return;
482+
}
483+
if (deadline != 0 && deadline - System.nanoTime() < 0) {
484+
return;
485+
}
486+
Thread.yield();
487+
}
488+
}
394489
}
395490

396491
}

graalpython/com.oracle.graal.python/src/com/oracle/graal/python/builtins/modules/SelectModuleBuiltins.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,11 @@
8686
@CoreFunctions(defineModule = "select")
8787
public class SelectModuleBuiltins extends PythonBuiltins {
8888

89+
/*
90+
* ATTENTION: if we ever add "poll" support, update the code in
91+
* MultiprocessingModuleBuilins#SelectNode to use it if available
92+
*/
93+
8994
public SelectModuleBuiltins() {
9095
addBuiltinConstant("error", PythonErrorType.OSError);
9196
}

graalpython/com.oracle.graal.python/src/com/oracle/graal/python/runtime/PosixSupportLibrary.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -983,6 +983,8 @@ public ByteBuffer getByteBuffer() {
983983
*/
984984
@ValueType
985985
public static final class Timeval {
986+
public static final Timeval SELECT_TIMEOUT_NOW = new Timeval(0, 0);
987+
986988
private final long seconds;
987989
private final long microseconds;
988990

0 commit comments

Comments
 (0)