Skip to content

Commit bccf6c7

Browse files
committed
Move all the multiprocessing related state from PythonLanguage to SharedMultiprocessingData
1 parent d5ff071 commit bccf6c7

File tree

4 files changed

+82
-107
lines changed

4 files changed

+82
-107
lines changed

graalpython/com.oracle.graal.python/src/com/oracle/graal/python/PythonLanguage.java

Lines changed: 0 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,7 @@
2727

2828
import java.io.IOException;
2929
import java.util.Arrays;
30-
import java.util.Map;
3130
import java.util.concurrent.ConcurrentHashMap;
32-
import java.util.concurrent.Semaphore;
3331
import java.util.logging.Level;
3432

3533
import org.graalvm.options.OptionDescriptors;
@@ -59,7 +57,6 @@
5957
import com.oracle.graal.python.parser.PythonParserImpl;
6058
import com.oracle.graal.python.runtime.GilNode;
6159
import com.oracle.graal.python.runtime.PythonContext;
62-
import com.oracle.graal.python.runtime.PythonContext.ChildContextData;
6360
import com.oracle.graal.python.runtime.PythonContext.PythonThreadState;
6461
import com.oracle.graal.python.runtime.PythonOptions;
6562
import com.oracle.graal.python.runtime.PythonParser.ParserMode;
@@ -196,14 +193,6 @@ public final class PythonLanguage extends TruffleLanguage<PythonContext> {
196193

197194
@CompilationFinal(dimensions = 1) private static final Object[] CONTEXT_INSENSITIVE_SINGLETONS = new Object[]{PNone.NONE, PNone.NO_VALUE, PEllipsis.INSTANCE, PNotImplemented.NOT_IMPLEMENTED};
198195

199-
/**
200-
* Named semaphores are shared between all processes in a system, and they persist until the
201-
* system is shut down, unless explicitly removed. We interpret this as meaning they all exist
202-
* globally per language instance, that is, they are shared between different Contexts in the
203-
* same engine.
204-
*/
205-
public final ConcurrentHashMap<String, Semaphore> namedSemaphores = new ConcurrentHashMap<>();
206-
207196
@CompilationFinal(dimensions = 1) private volatile Object[] engineOptionsStorage;
208197
@CompilationFinal private volatile OptionValues engineOptions;
209198

@@ -221,35 +210,6 @@ public final class PythonLanguage extends TruffleLanguage<PythonContext> {
221210

222211
private final MroShape mroShapeRoot = MroShape.createRoot();
223212

224-
private final Map<Long, Thread> childContextThreads = new ConcurrentHashMap<>();
225-
226-
private final Map<Long, ChildContextData> childContextData = new ConcurrentHashMap<>();
227-
228-
@TruffleBoundary
229-
public Thread getChildContextThread(long tid) {
230-
return childContextThreads.get(tid);
231-
}
232-
233-
@TruffleBoundary
234-
public void putChildContextThread(long id, Thread thread) {
235-
childContextThreads.put(id, thread);
236-
}
237-
238-
@TruffleBoundary
239-
public void removeChildContextThread(long id) {
240-
childContextThreads.remove(id);
241-
}
242-
243-
@TruffleBoundary
244-
public ChildContextData getChildContextData(long tid) {
245-
return childContextData.get(tid);
246-
}
247-
248-
@TruffleBoundary
249-
public void putChildContextData(long id, ChildContextData data) {
250-
childContextData.put(id, data);
251-
}
252-
253213
public static PythonLanguage get(Node node) {
254214
return REFERENCE.get(node);
255215
}

graalpython/com.oracle.graal.python/src/com/oracle/graal/python/builtins/modules/MultiprocessingModuleBuiltins.java

Lines changed: 10 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -141,26 +141,16 @@ PSemLock construct(Object cls, Object kindObj, Object valueObj, Object maxvalueO
141141
// have to explicitly link it, so we do that here if we
142142
// must. CPython always uses O_CREAT | O_EXCL for creating named
143143
// semaphores, so a conflict raises.
144-
PythonLanguage lang = getLanguage();
145-
if (semaphoreExists(lang, name)) {
144+
SharedMultiprocessingData multiprocessing = getContext().getSharedMultiprocessingData();
145+
if (multiprocessing.getNamedSemaphore(name) != null) {
146146
throw raise(PythonBuiltinClassType.FileExistsError, ErrorMessages.SEMAPHORE_NAME_TAKEN, name);
147147
} else {
148-
semaphorePut(lang, semaphore, name);
148+
multiprocessing.putNamedSemaphore(name, semaphore);
149149
}
150150
}
151151
return factory().createSemLock(cls, name, kind, semaphore);
152152
}
153153

154-
@TruffleBoundary
155-
private static Object semaphorePut(PythonLanguage lang, Semaphore semaphore, String name) {
156-
return lang.namedSemaphores.put(name, semaphore);
157-
}
158-
159-
@TruffleBoundary
160-
private static boolean semaphoreExists(PythonLanguage lang, String name) {
161-
return lang.namedSemaphores.containsKey(name);
162-
}
163-
164154
@TruffleBoundary
165155
private static Semaphore newSemaphore(int value) {
166156
return new Semaphore(value);
@@ -172,17 +162,12 @@ private static Semaphore newSemaphore(int value) {
172162
abstract static class SemUnlink extends PythonUnaryBuiltinNode {
173163
@Specialization
174164
PNone doit(String name) {
175-
Semaphore prev = semaphoreRemove(name, getLanguage());
165+
Semaphore prev = getContext().getSharedMultiprocessingData().removeNamedSemaphore(name);
176166
if (prev == null) {
177167
throw raise(PythonBuiltinClassType.FileNotFoundError, ErrorMessages.NO_SUCH_FILE_OR_DIR, "semaphores", name);
178168
}
179169
return PNone.NONE;
180170
}
181-
182-
@TruffleBoundary
183-
private static Semaphore semaphoreRemove(String name, PythonLanguage lang) {
184-
return lang.namedSemaphores.remove(name);
185-
}
186171
}
187172

188173
@Builtin(name = "_spawn_context", minNumOfPositionalArgs = 3, parameterNames = {"fd", "sentinel", "keepFds"})
@@ -220,15 +205,15 @@ long getTid() {
220205
abstract static class WaitTidNode extends PythonBinaryBuiltinNode {
221206
@Specialization
222207
PTuple waittid(long id, @SuppressWarnings("unused") int options) {
223-
PythonLanguage lang = getLanguage();
224208
long tid = convertTid(id);
225209
// TODO implement for options - WNOHANG and 0
226-
Thread thread = lang.getChildContextThread(tid);
210+
final SharedMultiprocessingData multiprocessing = getContext().getSharedMultiprocessingData();
211+
Thread thread = multiprocessing.getChildContextThread(tid);
227212
if (thread != null && thread.isAlive()) {
228213
return factory().createTuple(new Object[]{0, 0, 0});
229214
}
230215

231-
PythonContext.ChildContextData data = lang.getChildContextData(tid);
216+
PythonContext.ChildContextData data = multiprocessing.getChildContextData(tid);
232217
return factory().createTuple(new Object[]{id, data.wasSignaled() ? data.getExitCode() : 0, data.getExitCode()});
233218
}
234219
}
@@ -239,10 +224,10 @@ abstract static class TerminateThreadNode extends PythonBinaryBuiltinNode {
239224
@Specialization
240225
@TruffleBoundary
241226
Object terminate(long id, PInt sig) {
242-
PythonLanguage language = getLanguage();
243-
Thread thread = language.getChildContextThread(convertTid(id));
227+
final SharedMultiprocessingData multiprocessing = getContext().getSharedMultiprocessingData();
228+
Thread thread = multiprocessing.getChildContextThread(convertTid(id));
244229
if (thread != null && thread.isAlive()) {
245-
PythonContext.ChildContextData data = language.getChildContextData(convertTid(id));
230+
PythonContext.ChildContextData data = multiprocessing.getChildContextData(convertTid(id));
246231
try {
247232
data.awaitRunning();
248233
TruffleContext truffleCtx = data.getTruffleContext();

graalpython/com.oracle.graal.python/src/com/oracle/graal/python/builtins/objects/thread/SemLockBuiltins.java

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,16 @@
4040
*/
4141
package com.oracle.graal.python.builtins.objects.thread;
4242

43-
import com.oracle.graal.python.PythonLanguage;
44-
import com.oracle.graal.python.annotations.ArgumentClinic;
4543
import static com.oracle.graal.python.nodes.SpecialMethodNames.__ENTER__;
4644
import static com.oracle.graal.python.nodes.SpecialMethodNames.__EXIT__;
4745

4846
import java.util.List;
47+
import java.util.concurrent.Semaphore;
4948

49+
import com.oracle.graal.python.annotations.ArgumentClinic;
5050
import com.oracle.graal.python.builtins.Builtin;
5151
import com.oracle.graal.python.builtins.CoreFunctions;
52+
import com.oracle.graal.python.builtins.Python3Core;
5253
import com.oracle.graal.python.builtins.PythonBuiltinClassType;
5354
import com.oracle.graal.python.builtins.PythonBuiltins;
5455
import com.oracle.graal.python.builtins.objects.PNone;
@@ -58,19 +59,18 @@
5859
import com.oracle.graal.python.nodes.function.PythonBuiltinNode;
5960
import com.oracle.graal.python.nodes.function.builtins.PythonQuaternaryBuiltinNode;
6061
import com.oracle.graal.python.nodes.function.builtins.PythonTernaryBuiltinNode;
61-
import com.oracle.graal.python.nodes.function.builtins.PythonUnaryBuiltinNode;
62-
import com.oracle.graal.python.builtins.Python3Core;
6362
import com.oracle.graal.python.nodes.function.builtins.PythonTernaryClinicBuiltinNode;
63+
import com.oracle.graal.python.nodes.function.builtins.PythonUnaryBuiltinNode;
6464
import com.oracle.graal.python.nodes.function.builtins.clinic.ArgumentClinicProvider;
6565
import com.oracle.graal.python.nodes.util.CannotCastException;
6666
import com.oracle.graal.python.nodes.util.CastToJavaStringNode;
67+
import com.oracle.graal.python.runtime.PythonContext.SharedMultiprocessingData;
6768
import com.oracle.truffle.api.CompilerDirectives.TruffleBoundary;
6869
import com.oracle.truffle.api.dsl.Cached;
6970
import com.oracle.truffle.api.dsl.GenerateNodeFactory;
7071
import com.oracle.truffle.api.dsl.NodeFactory;
7172
import com.oracle.truffle.api.dsl.Specialization;
7273
import com.oracle.truffle.api.frame.VirtualFrame;
73-
import java.util.concurrent.Semaphore;
7474

7575
@CoreFunctions(extendClasses = {PythonBuiltinClassType.PSemLock})
7676
public class SemLockBuiltins extends PythonBuiltins {
@@ -214,28 +214,16 @@ Object doEnter(@SuppressWarnings("unused") Object handle, int kind, @SuppressWar
214214
throw raise(PythonBuiltinClassType.TypeError, ErrorMessages.ARG_D_MUST_BE_S_NOT_P, "_rebuild", 4, "str", nameObj);
215215
}
216216

217-
Semaphore semaphore;
218-
PythonLanguage lang = getLanguage();
219-
if (semaphoreExists(lang, name)) {
220-
semaphore = semaphoreGet(lang, name);
221-
} else {
217+
SharedMultiprocessingData multiprocessing = getContext().getSharedMultiprocessingData();
218+
Semaphore semaphore = multiprocessing.getNamedSemaphore(name);
219+
if (semaphore == null) {
222220
// TODO can this even happen? cpython simply creates a semlock object with the
223221
// provided handle
224222
semaphore = newSemaphore(0);
225223
}
226224
return factory().createSemLock(PythonBuiltinClassType.PSemLock, name, kind, semaphore);
227225
}
228226

229-
@TruffleBoundary
230-
private static Semaphore semaphoreGet(PythonLanguage lang, String name) {
231-
return lang.namedSemaphores.get(name);
232-
}
233-
234-
@TruffleBoundary
235-
private static boolean semaphoreExists(PythonLanguage lang, String name) {
236-
return lang.namedSemaphores.containsKey(name);
237-
}
238-
239227
@TruffleBoundary
240228
private static Semaphore newSemaphore(int value) {
241229
return new Semaphore(value);

graalpython/com.oracle.graal.python/src/com/oracle/graal/python/runtime/PythonContext.java

Lines changed: 64 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,14 @@
4646
import java.util.LinkedList;
4747
import java.util.List;
4848
import java.util.Map;
49+
import java.util.SortedMap;
50+
import java.util.TreeMap;
4951
import java.util.WeakHashMap;
5052
import java.util.concurrent.ConcurrentHashMap;
53+
import java.util.concurrent.CountDownLatch;
54+
import java.util.concurrent.LinkedBlockingQueue;
55+
import java.util.concurrent.Semaphore;
56+
import java.util.concurrent.atomic.AtomicBoolean;
5157
import java.util.concurrent.atomic.AtomicLong;
5258
import java.util.concurrent.locks.ReentrantLock;
5359
import java.util.logging.Level;
@@ -128,11 +134,6 @@
128134
import com.oracle.truffle.api.source.Source;
129135
import com.oracle.truffle.api.utilities.CyclicAssumption;
130136
import com.oracle.truffle.llvm.api.Toolchain;
131-
import java.util.SortedMap;
132-
import java.util.TreeMap;
133-
import java.util.concurrent.CountDownLatch;
134-
import java.util.concurrent.LinkedBlockingQueue;
135-
import java.util.concurrent.atomic.AtomicBoolean;
136137

137138
public final class PythonContext {
138139
private static final Source IMPORT_WARNINGS_SOURCE = Source.newBuilder(PythonLanguage.ID, "import warnings\n", "<internal>").internal(true).build();
@@ -603,22 +604,21 @@ public int[] pipe() {
603604
}
604605

605606
@TruffleBoundary
606-
public boolean addPipeData(int fd, byte[] bytes, Runnable noFDHandler, Runnable brokenPipeHandler) {
607+
public void addPipeData(int fd, byte[] bytes, Runnable noFDHandler, Runnable brokenPipeHandler) {
607608
LinkedBlockingQueue<Object> q = null;
608609
synchronized (pipeData) {
609610
q = pipeData.get(fd);
610611
if (q == null) {
611612
noFDHandler.run();
612613
throw CompilerDirectives.shouldNotReachHere();
613614
}
614-
Integer fd2 = getPairFd(fd);
615+
int fd2 = getPairFd(fd);
615616
if (isClosed(fd2)) {
616617
brokenPipeHandler.run();
617618
throw CompilerDirectives.shouldNotReachHere();
618619
}
619620
}
620621
q.add(bytes);
621-
return true;
622622
}
623623

624624
@TruffleBoundary
@@ -637,7 +637,7 @@ public Object takePipeData(Node node, int fd, Runnable noFDHandler) {
637637
noFDHandler.run();
638638
throw CompilerDirectives.shouldNotReachHere();
639639
}
640-
Integer fd2 = getPairFd(fd);
640+
int fd2 = getPairFd(fd);
641641
if (isClosed(fd2)) {
642642
if (q.isEmpty()) {
643643
return PythonUtils.EMPTY_BYTE_ARRAY;
@@ -659,30 +659,72 @@ public boolean isBlocking(int fd) {
659659
if (q == null) {
660660
return false;
661661
}
662-
Integer fd2 = getPairFd(fd);
662+
int fd2 = getPairFd(fd);
663663
if (isClosed(fd2)) {
664664
return false;
665665
}
666666
}
667667
return q.isEmpty();
668668
}
669669

670-
@TruffleBoundary
671-
public static void closeFDs(List<Integer> fds) {
672-
synchronized (fds) {
673-
for (Integer fd : fds) {
674-
fds.remove(fd);
675-
}
676-
}
677-
}
678-
679670
private static int getPairFd(int fd) {
680671
return fd % 2 == 0 ? fd + 1 : fd - 1;
681672
}
682673

683674
private boolean isClosed(int fd) {
684675
return pipeData.get(fd) == null && fd >= fdCounter;
685676
}
677+
678+
/**
679+
* Named semaphores are shared between all processes in a system, and they persist until the
680+
* system is shut down, unless explicitly removed. We interpret this as meaning they all
681+
* exist globally per the main context and all its children.
682+
*/
683+
private final ConcurrentHashMap<String, Semaphore> namedSemaphores = new ConcurrentHashMap<>();
684+
685+
@TruffleBoundary
686+
public void putNamedSemaphore(String name, Semaphore sem) {
687+
namedSemaphores.put(name, sem);
688+
}
689+
690+
@TruffleBoundary
691+
public Semaphore getNamedSemaphore(String name) {
692+
return namedSemaphores.get(name);
693+
}
694+
695+
@TruffleBoundary
696+
public Semaphore removeNamedSemaphore(String name) {
697+
return namedSemaphores.remove(name);
698+
}
699+
700+
private final Map<Long, Thread> childContextThreads = new ConcurrentHashMap<>();
701+
702+
private final Map<Long, ChildContextData> childContextData = new ConcurrentHashMap<>();
703+
704+
@TruffleBoundary
705+
public Thread getChildContextThread(long tid) {
706+
return childContextThreads.get(tid);
707+
}
708+
709+
@TruffleBoundary
710+
public void putChildContextThread(long id, Thread thread) {
711+
childContextThreads.put(id, thread);
712+
}
713+
714+
@TruffleBoundary
715+
public void removeChildContextThread(long id) {
716+
childContextThreads.remove(id);
717+
}
718+
719+
@TruffleBoundary
720+
public ChildContextData getChildContextData(long tid) {
721+
return childContextData.get(tid);
722+
}
723+
724+
@TruffleBoundary
725+
public void putChildContextData(long id, ChildContextData data) {
726+
childContextData.put(id, data);
727+
}
686728
}
687729

688730
public PythonContext(PythonLanguage language, TruffleLanguage.Env env, Python3Core core) {
@@ -737,8 +779,8 @@ public long spawnTruffleContext(int fd, int sentinel, int[] fdsToKeep) {
737779

738780
// TODO always force java posix in spawned
739781
long tid = thread.getId();
740-
language.putChildContextThread(tid, thread);
741-
language.putChildContextData(tid, data);
782+
getSharedMultiprocessingData().putChildContextThread(tid, thread);
783+
getSharedMultiprocessingData().putChildContextData(tid, data);
742784
for (int fdToKeep : fdsToKeep) {
743785
// prevent file descriptors from being closed when passed to another "process",
744786
// equivalent to fds_to_keep arg in posix fork_exec
@@ -1861,7 +1903,7 @@ public synchronized void disposeThread(Thread thread) {
18611903
threadStateMapping.remove(thread);
18621904
ts.dispose();
18631905
releaseSentinelLock(ts.sentinelLock);
1864-
language.removeChildContextThread(thread.getId());
1906+
getSharedMultiprocessingData().removeChildContextThread(thread.getId());
18651907
}
18661908

18671909
private static void releaseSentinelLock(WeakReference<PLock> sentinelLockWeakref) {

0 commit comments

Comments
 (0)