Skip to content

Commit 7d7a361

Browse files
committed
enable multithreading for PythonLanguage
- add support for starting threads using the builtin _thread module
1 parent 8a9918a commit 7d7a361

File tree

9 files changed

+435
-7
lines changed

9 files changed

+435
-7
lines changed
Lines changed: 307 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,307 @@
1+
# Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
2+
# DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3+
#
4+
# The Universal Permissive License (UPL), Version 1.0
5+
#
6+
# Subject to the condition set forth below, permission is hereby granted to any
7+
# person obtaining a copy of this software, associated documentation and/or
8+
# data (collectively the "Software"), free of charge and under any and all
9+
# copyright rights in the Software, and any and all patent rights owned or
10+
# freely licensable by each licensor hereunder covering either (i) the
11+
# unmodified Software as contributed to or provided by such licensor, or (ii)
12+
# the Larger Works (as defined below), to deal in both
13+
#
14+
# (a) the Software, and
15+
#
16+
# (b) any piece of software and/or hardware listed in the lrgrwrks.txt file if
17+
# one is included with the Software each a "Larger Work" to which the Software
18+
# is contributed by such licensors),
19+
#
20+
# without restriction, including without limitation the rights to copy, create
21+
# derivative works of, display, perform, and distribute the Software and make,
22+
# use, sell, offer for sale, import, export, have made, and have sold the
23+
# Software and the Larger Work(s), and to sublicense the foregoing rights on
24+
# either these or other terms.
25+
#
26+
# This license is subject to the following condition:
27+
#
28+
# The above copyright notice and either this complete permission notice or at a
29+
# minimum a reference to the UPL must be included in all copies or substantial
30+
# portions of the Software.
31+
#
32+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
33+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
34+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
35+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
36+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
37+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
38+
# SOFTWARE.
39+
import os
40+
import random
41+
import unittest
42+
from test import support
43+
44+
thread = support.import_module('_thread')
45+
import time
46+
import weakref
47+
48+
from test import lock_tests
49+
50+
NUMTASKS = 10
51+
NUMTRIPS = 3
52+
POLL_SLEEP = 0.010 # seconds = 10 ms
53+
54+
_print_mutex = thread.allocate_lock()
55+
56+
def verbose_print(arg):
57+
"""Helper function for printing out debugging output."""
58+
if support.verbose:
59+
with _print_mutex:
60+
print(arg)
61+
62+
63+
class BasicThreadTest(unittest.TestCase):
64+
65+
def setUp(self):
66+
self.done_mutex = thread.allocate_lock()
67+
self.done_mutex.acquire()
68+
self.running_mutex = thread.allocate_lock()
69+
self.random_mutex = thread.allocate_lock()
70+
self.created = 0
71+
self.running = 0
72+
self.next_ident = 0
73+
74+
key = support.threading_setup()
75+
self.addCleanup(support.threading_cleanup, *key)
76+
77+
78+
class ThreadRunningTests(BasicThreadTest):
79+
80+
def newtask(self):
81+
with self.running_mutex:
82+
self.next_ident += 1
83+
verbose_print("creating task %s" % self.next_ident)
84+
thread.start_new_thread(self.task, (self.next_ident,))
85+
self.created += 1
86+
self.running += 1
87+
88+
def task(self, ident):
89+
with self.random_mutex:
90+
delay = random.random() / 10000.0
91+
verbose_print("task %s will run for %sus" % (ident, round(delay*1e6)))
92+
time.sleep(delay)
93+
verbose_print("task %s done" % ident)
94+
with self.running_mutex:
95+
self.running -= 1
96+
if self.created == NUMTASKS and self.running == 0:
97+
self.done_mutex.release()
98+
99+
def test_starting_threads(self):
100+
# Basic test for thread creation.
101+
for i in range(NUMTASKS):
102+
self.newtask()
103+
verbose_print("waiting for tasks to complete...")
104+
self.done_mutex.acquire()
105+
verbose_print("all tasks done")
106+
107+
def test_stack_size(self):
108+
# Various stack size tests.
109+
self.assertEqual(thread.stack_size(), 0, "initial stack size is not 0")
110+
111+
thread.stack_size(0)
112+
self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default")
113+
114+
@unittest.skipIf(os.name not in ("nt", "posix"), 'test meant for nt and posix')
115+
def test_nt_and_posix_stack_size(self):
116+
try:
117+
thread.stack_size(4096)
118+
except ValueError:
119+
verbose_print("caught expected ValueError setting "
120+
"stack_size(4096)")
121+
except thread.error:
122+
self.skipTest("platform does not support changing thread stack "
123+
"size")
124+
125+
fail_msg = "stack_size(%d) failed - should succeed"
126+
for tss in (262144, 0x100000, 0):
127+
thread.stack_size(tss)
128+
self.assertEqual(thread.stack_size(), tss, fail_msg % tss)
129+
verbose_print("successfully set stack_size(%d)" % tss)
130+
131+
for tss in (262144, 0x100000):
132+
verbose_print("trying stack_size = (%d)" % tss)
133+
self.next_ident = 0
134+
self.created = 0
135+
for i in range(NUMTASKS):
136+
self.newtask()
137+
138+
verbose_print("waiting for all tasks to complete")
139+
self.done_mutex.acquire()
140+
verbose_print("all tasks done")
141+
142+
thread.stack_size(0)
143+
144+
def test__count(self):
145+
# Test the _count() function.
146+
orig = thread._count()
147+
mut = thread.allocate_lock()
148+
mut.acquire()
149+
started = []
150+
def task():
151+
started.append(None)
152+
mut.acquire()
153+
mut.release()
154+
thread.start_new_thread(task, ())
155+
while not started:
156+
time.sleep(POLL_SLEEP)
157+
self.assertEqual(thread._count(), orig + 1)
158+
# Allow the task to finish.
159+
mut.release()
160+
# The only reliable way to be sure that the thread ended from the
161+
# interpreter's point of view is to wait for the function object to be
162+
# destroyed.
163+
done = []
164+
wr = weakref.ref(task, lambda _: done.append(None))
165+
del task
166+
while not done:
167+
time.sleep(POLL_SLEEP)
168+
self.assertEqual(thread._count(), orig)
169+
170+
def test_save_exception_state_on_error(self):
171+
# See issue #14474
172+
def task():
173+
started.release()
174+
raise SyntaxError
175+
def mywrite(self, *args):
176+
try:
177+
raise ValueError
178+
except ValueError:
179+
pass
180+
real_write(self, *args)
181+
c = thread._count()
182+
started = thread.allocate_lock()
183+
with support.captured_output("stderr") as stderr:
184+
real_write = stderr.write
185+
stderr.write = mywrite
186+
started.acquire()
187+
thread.start_new_thread(task, ())
188+
started.acquire()
189+
while thread._count() > c:
190+
time.sleep(POLL_SLEEP)
191+
self.assertIn("Traceback", stderr.getvalue())
192+
193+
194+
class Barrier:
195+
def __init__(self, num_threads):
196+
self.num_threads = num_threads
197+
self.waiting = 0
198+
self.checkin_mutex = thread.allocate_lock()
199+
self.checkout_mutex = thread.allocate_lock()
200+
self.checkout_mutex.acquire()
201+
202+
def enter(self):
203+
self.checkin_mutex.acquire()
204+
self.waiting = self.waiting + 1
205+
if self.waiting == self.num_threads:
206+
self.waiting = self.num_threads - 1
207+
self.checkout_mutex.release()
208+
return
209+
self.checkin_mutex.release()
210+
211+
self.checkout_mutex.acquire()
212+
self.waiting = self.waiting - 1
213+
if self.waiting == 0:
214+
self.checkin_mutex.release()
215+
return
216+
self.checkout_mutex.release()
217+
218+
219+
class BarrierTest(BasicThreadTest):
220+
221+
def test_barrier(self):
222+
self.bar = Barrier(NUMTASKS)
223+
self.running = NUMTASKS
224+
for i in range(NUMTASKS):
225+
thread.start_new_thread(self.task2, (i,))
226+
verbose_print("waiting for tasks to end")
227+
self.done_mutex.acquire()
228+
verbose_print("tasks done")
229+
230+
def task2(self, ident):
231+
for i in range(NUMTRIPS):
232+
if ident == 0:
233+
# give it a good chance to enter the next
234+
# barrier before the others are all out
235+
# of the current one
236+
delay = 0
237+
else:
238+
with self.random_mutex:
239+
delay = random.random() / 10000.0
240+
verbose_print("task %s will run for %sus" %
241+
(ident, round(delay * 1e6)))
242+
time.sleep(delay)
243+
verbose_print("task %s entering %s" % (ident, i))
244+
self.bar.enter()
245+
verbose_print("task %s leaving barrier" % ident)
246+
with self.running_mutex:
247+
self.running -= 1
248+
# Must release mutex before releasing done, else the main thread can
249+
# exit and set mutex to None as part of global teardown; then
250+
# mutex.release() raises AttributeError.
251+
finished = self.running == 0
252+
if finished:
253+
self.done_mutex.release()
254+
255+
class LockTests(lock_tests.LockTests):
256+
locktype = thread.allocate_lock
257+
258+
259+
class TestForkInThread(unittest.TestCase):
260+
def setUp(self):
261+
self.read_fd, self.write_fd = os.pipe()
262+
263+
@unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork')
264+
@support.reap_threads
265+
def test_forkinthread(self):
266+
running = True
267+
status = "not set"
268+
269+
def thread1():
270+
nonlocal running, status
271+
272+
# fork in a thread
273+
pid = os.fork()
274+
if pid == 0:
275+
# child
276+
try:
277+
os.close(self.read_fd)
278+
os.write(self.write_fd, b"OK")
279+
finally:
280+
os._exit(0)
281+
else:
282+
# parent
283+
os.close(self.write_fd)
284+
pid, status = os.waitpid(pid, 0)
285+
running = False
286+
287+
thread.start_new_thread(thread1, ())
288+
self.assertEqual(os.read(self.read_fd, 2), b"OK",
289+
"Unable to fork() in thread")
290+
while running:
291+
time.sleep(POLL_SLEEP)
292+
self.assertEqual(status, 0)
293+
294+
def tearDown(self):
295+
try:
296+
os.close(self.read_fd)
297+
except OSError:
298+
pass
299+
300+
try:
301+
os.close(self.write_fd)
302+
except OSError:
303+
pass
304+
305+
306+
if __name__ == "__main__":
307+
unittest.main()

graalpython/com.oracle.graal.python/src/com/oracle/graal/python/PythonLanguage.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,4 +470,9 @@ public PCode cacheCode(String filename, Supplier<PCode> createCode) {
470470
public static Shape freshShape() {
471471
return freshShape;
472472
}
473+
474+
@Override
475+
protected boolean isThreadAccessAllowed(Thread thread, boolean singleThreaded) {
476+
return true;
477+
}
473478
}

graalpython/com.oracle.graal.python/src/com/oracle/graal/python/builtins/Python3Core.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,8 @@
136136
import com.oracle.graal.python.builtins.objects.slice.SliceBuiltins;
137137
import com.oracle.graal.python.builtins.objects.str.StringBuiltins;
138138
import com.oracle.graal.python.builtins.objects.superobject.SuperBuiltins;
139+
import com.oracle.graal.python.builtins.objects.thread.LockBuiltins;
140+
import com.oracle.graal.python.builtins.objects.thread.ThreadBuiltins;
139141
import com.oracle.graal.python.builtins.objects.traceback.TracebackBuiltins;
140142
import com.oracle.graal.python.builtins.objects.tuple.TupleBuiltins;
141143
import com.oracle.graal.python.builtins.objects.type.PythonBuiltinClass;
@@ -287,6 +289,8 @@ private static final PythonBuiltins[] initializeBuiltins() {
287289
new BufferBuiltins(),
288290
new MemoryviewBuiltins(),
289291
new ThreadModuleBuiltins(),
292+
new ThreadBuiltins(),
293+
new LockBuiltins(),
290294
new SuperBuiltins(),
291295
new BinasciiModuleBuiltins(),
292296
new PosixSubprocessModuleBuiltins(),

0 commit comments

Comments
 (0)