Skip to content

Commit 30379fd

Browse files
Add InterpreterPoolExecutor.
1 parent 63dd89b commit 30379fd

File tree

5 files changed

+270
-1
lines changed

5 files changed

+270
-1
lines changed

Lib/concurrent/futures/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
'Executor',
3030
'wait',
3131
'as_completed',
32+
'InterpreterPoolExecutor',
3233
'ProcessPoolExecutor',
3334
'ThreadPoolExecutor',
3435
)
@@ -51,4 +52,9 @@ def __getattr__(name):
5152
ThreadPoolExecutor = te
5253
return te
5354

55+
if name == 'InterpreterPoolExecutor':
56+
from .interpreter import InterpreterPoolExecutor as ie
57+
InterpreterPoolExecutor = ie
58+
return ie
59+
5460
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
Lib/concurrent/futures/interpreter.py

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
"""Implements InterpreterPoolExecutor."""
2+
3+
import pickle
4+
from . import thread as _thread
5+
import _interpreters
6+
import _interpqueues
7+
8+
9+
LINESEP = '''
10+
'''
11+
12+
13+
_EXEC_FAILURE_STR = """
14+
{superstr}
15+
16+
Uncaught in the interpreter:
17+
18+
{formatted}
19+
""".strip()
20+
21+
22+
class ExecutionFailed(_interpreters.InterpreterError):
    """An unhandled exception happened during execution.

    Wraps the exception-info snapshot captured in the worker interpreter
    so the failure can be reported in the submitting interpreter.
    """

    def __init__(self, excinfo):
        # Prefer the pre-rendered message from the snapshot; otherwise
        # build one from the captured exception type and message.
        msg = excinfo.formatted
        if not msg:
            exctype = excinfo.type
            if exctype and excinfo.msg:
                msg = f'{exctype.__name__}: {excinfo.msg}'
            else:
                msg = exctype.__name__ or excinfo.msg
        super().__init__(msg)
        self.excinfo = excinfo

    def __str__(self):
        # Fall back to the plain message if the rendered traceback
        # display is unavailable for any reason.
        try:
            formatted = self.excinfo.errdisplay
        except Exception:
            return super().__str__()
        return _EXEC_FAILURE_STR.format(
            superstr=super().__str__(),
            formatted=formatted,
        )
45+
46+
47+
# "Unbound item" policy constant passed to the cross-interpreter queue
# APIs; the executor never expects items to become unbound, so this value
# marks that case as an error.
UNBOUND = 2  # error; this should not happen.
class WorkerContext(_thread.WorkerContext):
    """Per-worker state for InterpreterPoolExecutor.

    Each worker thread owns one subinterpreter plus a cross-interpreter
    queue used to send task results back from that subinterpreter.
    """

    @classmethod
    def prepare(cls, initializer, initargs, shared):
        """Return (create_context, resolve_task) callables for the executor.

        initializer may be a script (str) or a picklable callable; shared
        is a mapping of shareable objects bound into each worker's __main__.
        """
        if isinstance(initializer, str):
            # The initializer is a script to run in each worker interpreter;
            # it takes no arguments.
            if initargs:
                # Bug fix: this previously interpolated an undefined name
                # ("args"), which would have raised NameError here.
                raise ValueError(
                    f'an initializer script does not take args, '
                    f'got {initargs!r}')
            initscript = initializer
            # Make sure the script compiles.
            # Bug fix: previously compiled an undefined name ("script").
            # XXX Keep the compiled code object?
            compile(initscript, '<string>', 'exec')
        elif initializer is not None:
            # Wrap a callable initializer in a script that unpickles and
            # calls it inside the worker interpreter.
            pickled = pickle.dumps((initializer, initargs))
            initscript = f'''if True:
                initializer, initargs = pickle.loads({pickled!r})
                initializer(*initargs)
                '''
        else:
            initscript = None

        def create_context():
            # One WorkerContext per worker thread.
            return cls(initscript, shared)

        def resolve_task(fn, args, kwargs):
            # Bug fix: dropped the spurious leading "cls" parameter (it was
            # never used) so the executor can call this closure as
            # resolve_task(fn, args, kwargs), matching the thread-pool
            # WorkerContext task-resolution convention.
            if isinstance(fn, str):
                # A bare script; scripts take no arguments.
                if args or kwargs:
                    raise ValueError(
                        f'a script does not take args or kwargs, '
                        f'got {args!r} and {kwargs!r}')
                data = fn
                kind = 'script'
            else:
                data = pickle.dumps((fn, args, kwargs))
                kind = 'function'
            return (data, kind)

        return create_context, resolve_task

    @classmethod
    def _run_pickled_func(cls, data, resultsid):
        """Run a pickled (fn, args, kwargs) task.

        Executed *inside* the worker interpreter; sends the result back
        through the cross-interpreter queue identified by resultsid.
        """
        fn, args, kwargs = pickle.loads(data)
        res = fn(*args, **kwargs)
        # Send the result back.
        try:
            _interpqueues.put(resultsid, res, 0, UNBOUND)
        except _interpreters.NotShareableError:
            # Not directly shareable across interpreters; pickle it and
            # flag it (second field = 1) so the receiver unpickles.
            res = pickle.dumps(res)
            _interpqueues.put(resultsid, res, 1, UNBOUND)

    def __init__(self, initscript, shared=None):
        self.initscript = initscript or ''
        self.shared = dict(shared) if shared else None
        # Set lazily by initialize(); None means "not initialized".
        self.interpid = None
        self.resultsid = None

    def __del__(self):
        # Best-effort cleanup if the executor never called finalize().
        if self.interpid is not None:
            self.finalize()

    def _exec(self, script):
        """Run script in this worker's interpreter; raise on failure."""
        assert self.interpid is not None
        excinfo = _interpreters.exec(self.interpid, script, restrict=True)
        if excinfo is not None:
            raise ExecutionFailed(excinfo)

    def initialize(self):
        """Create the interpreter and results queue; run the initializer."""
        assert self.interpid is None, self.interpid
        self.interpid = _interpreters.create(reqrefs=True)
        # This may raise InterpreterNotFoundError:
        _interpreters.incref(self.interpid)

        maxsize = 0
        fmt = 0
        self.resultsid = _interpqueues.create(maxsize, fmt, UNBOUND)

        # Make WorkerContext importable inside the worker so
        # _run_pickled_func can be invoked there, then append the
        # user-provided initialization script.
        initscript = f"""if True:
            from {__name__} import WorkerContext
            """
        initscript += LINESEP + self.initscript
        # for i, line in enumerate(initscript.splitlines(), 1):
        #     print(f'{i:>3} {line}')
        self._exec(initscript)
        if self.shared:
            _interpreters.set___main___attrs(
                self.interpid, self.shared, restrict=True)

    def finalize(self):
        """Destroy the results queue and release the interpreter."""
        interpid = self.interpid
        resultsid = self.resultsid
        self.resultsid = None
        self.interpid = None
        assert interpid is not None
        assert resultsid is not None
        try:
            _interpqueues.destroy(resultsid)
        except _interpqueues.QueueNotFoundError:
            pass
        try:
            _interpreters.decref(interpid)
        except _interpreters.InterpreterNotFoundError:
            pass

    def run(self, task):
        """Execute one resolved task (data, kind) in the worker interpreter."""
        data, kind = task
        if kind == 'script':
            # Bug fix: previously executed an undefined name ("script");
            # the script text is carried in "data".
            self._exec(data)
            return None
        elif kind == 'function':
            self._exec(
                f'WorkerContext._run_pickled_func({data}, {self.resultsid})')
            obj, pickled, unboundop = _interpqueues.get(self.resultsid)
            assert unboundop is None, unboundop
            return pickle.loads(obj) if pickled else obj
        else:
            raise NotImplementedError(kind)
160+
161+
162+
class InterpreterPoolExecutor(_thread.ThreadPoolExecutor):
    """A thread pool whose workers each run tasks in their own subinterpreter."""

    @classmethod
    def prepare_context(cls, initializer, initargs, shared):
        # Delegate worker setup/task resolution to the interpreter-aware
        # WorkerContext instead of the plain thread-pool one.
        return WorkerContext.prepare(initializer, initargs, shared)

    def __init__(self, max_workers=None, thread_name_prefix='',
                 initializer=None, initargs=(), shared=None):
        """Initializes a new InterpreterPoolExecutor instance.

        Args:
            max_workers: The maximum number of interpreters that can be
                used to execute the given calls.
            thread_name_prefix: An optional name prefix to give our threads.
            initializer: A callable or script used to initialize
                each worker interpreter.
            initargs: A tuple of arguments to pass to the initializer.
            shared: A mapping of shareable objects to be inserted into
                each worker interpreter.
        """
        super().__init__(max_workers, thread_name_prefix,
                         initializer, initargs, shared=shared)

Lib/test/test_concurrent_futures/executor.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ def make_dummy_object(_):
2323

2424

2525
class ExecutorTest:
26+
27+
def assertTaskRaises(self, exctype):
    """Return a context manager asserting a task fails with exctype.

    Subclasses may override this when their executor wraps task
    exceptions (e.g. the interpreter pool wraps them in ExecutionFailed).
    """
    return self.assertRaises(exctype)
29+
2630
# Executor.shutdown() and context manager usage is tested by
2731
# ExecutorShutdownTest.
2832
def test_submit(self):
@@ -52,7 +56,8 @@ def test_map_exception(self):
5256
i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
5357
self.assertEqual(i.__next__(), (0, 1))
5458
self.assertEqual(i.__next__(), (0, 1))
55-
self.assertRaises(ZeroDivisionError, i.__next__)
59+
with self.assertTaskRaises(ZeroDivisionError):
60+
i.__next__()
5661

5762
@support.requires_resource('walltime')
5863
def test_map_timeout(self):
Lib/test/test_concurrent_futures/test_interpreter_pool.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import unittest
2+
from concurrent.futures.interpreter import ExecutionFailed
3+
from test import support
4+
from test.support.interpreters import queues
5+
6+
from .executor import ExecutorTest, mul
7+
from .util import BaseTestCase, InterpreterPoolMixin, setup_module
8+
9+
10+
class InterpreterPoolExecutorTest(
        InterpreterPoolMixin, ExecutorTest, BaseTestCase):
    """Runs the shared executor tests against InterpreterPoolExecutor."""

    def assertTaskRaises(self, exctype):
        # Worker exceptions arrive wrapped in ExecutionFailed; match the
        # original exception type by name in the wrapped message.
        return self.assertRaisesRegex(ExecutionFailed, exctype.__name__)

    def test_saturation(self):
        blocker = queues.create()
        executor = self.executor_type(4, shared=dict(blocker=blocker))

        # Queue up far more blocking tasks than workers, then verify the
        # pool spawned exactly its maximum number of threads.
        ntasks = 15 * executor._max_workers
        for _ in range(ntasks):
            executor.submit('blocker.get()')
        self.assertEqual(len(executor._threads), executor._max_workers)
        # Unblock every task so shutdown can complete.
        for _ in range(ntasks):
            blocker.put_nowait(None)
        executor.shutdown(wait=True)

    @support.requires_gil_enabled("gh-117344: test is flaky without the GIL")
    def test_idle_thread_reuse(self):
        # Sequential submissions should be served by a single idle worker.
        executor = self.executor_type()
        executor.submit(mul, 21, 2).result()
        executor.submit(mul, 6, 7).result()
        executor.submit(mul, 3, 14).result()
        self.assertEqual(len(executor._threads), 1)
        executor.shutdown(wait=True)

    # def test_executor_map_current_future_cancel(self):
    #     blocker = queues.create()
    #     log = queues.create()
    #
    #     script = """if True:
    #         def log_n_wait({ident}):
    #             blocker(f"ident {ident} started")
    #             try:
    #                 stop_event.wait()
    #             finally:
    #                 log.append(f"ident {ident} stopped")
    #         """
    #
    #     with self.executor_type(max_workers=1) as pool:
    #         # submit work to saturate the pool
    #         fut = pool.submit(script.format(ident="first"))
    #         gen = pool.map(log_n_wait, ["second", "third"], timeout=0)
    #         try:
    #             with self.assertRaises(TimeoutError):
    #                 next(gen)
    #         finally:
    #             gen.close()
    #         blocker.put
    #         stop_event.set()
    #         fut.result()
    #         # ident='second' is cancelled as a result of raising a TimeoutError
    #         # ident='third' is cancelled because it remained in the collection of futures
    #         self.assertListEqual(log, ["ident='first' started", "ident='first' stopped"])
63+
64+
65+
def setUpModule():
    """Apply the shared test-module setup (resource checks and cleanups)."""
    setup_module()


if __name__ == "__main__":
    unittest.main()

Lib/test/test_concurrent_futures/util.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,10 @@ class ThreadPoolMixin(ExecutorMixin):
7474
executor_type = futures.ThreadPoolExecutor
7575

7676

77+
class InterpreterPoolMixin(ExecutorMixin):
    # Parametrizes the shared executor test suite over
    # InterpreterPoolExecutor.
    executor_type = futures.InterpreterPoolExecutor
79+
80+
7781
class ProcessPoolForkMixin(ExecutorMixin):
7882
executor_type = futures.ProcessPoolExecutor
7983
ctx = "fork"
@@ -120,6 +124,7 @@ def get_context(self):
120124

121125
def create_executor_tests(remote_globals, mixin, bases=(BaseTestCase,),
122126
executor_mixins=(ThreadPoolMixin,
127+
InterpreterPoolMixin,
123128
ProcessPoolForkMixin,
124129
ProcessPoolForkserverMixin,
125130
ProcessPoolSpawnMixin)):

0 commit comments

Comments (0)