Skip to content

Commit 35076d9

Browse files
Use interpreters instead of _interpreters.
1 parent cfc8566 commit 35076d9

File tree

2 files changed

+37
-111
lines changed

2 files changed

+37
-111
lines changed

Lib/concurrent/futures/interpreter.py

Lines changed: 34 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,26 @@
11
"""Implements InterpreterPoolExecutor."""
22

3-
import contextlib
4-
import pickle
3+
from concurrent import interpreters
4+
import sys
55
import textwrap
66
from . import thread as _thread
7-
import _interpreters
8-
import _interpqueues
7+
import traceback
98

109

11-
class ExecutionFailed(_interpreters.InterpreterError):
12-
"""An unhandled exception happened during execution."""
13-
14-
def __init__(self, excinfo):
15-
msg = excinfo.formatted
16-
if not msg:
17-
if excinfo.type and excinfo.msg:
18-
msg = f'{excinfo.type.__name__}: {excinfo.msg}'
19-
else:
20-
msg = excinfo.type.__name__ or excinfo.msg
21-
super().__init__(msg)
22-
self.excinfo = excinfo
23-
24-
def __str__(self):
10+
def do_call(results, func, args, kwargs):
11+
try:
12+
return func(*args, **kwargs)
13+
except BaseException as exc:
14+
# Send the captured exception out on the results queue,
15+
# but still leave it unhandled for the interpreter to handle.
2516
try:
26-
formatted = self.excinfo.errdisplay
27-
except Exception:
28-
return super().__str__()
29-
else:
30-
return textwrap.dedent(f"""
31-
{super().__str__()}
32-
33-
Uncaught in the interpreter:
34-
35-
{formatted}
36-
""".strip())
17+
results.put(exc)
18+
except interpreters.NotShareableError:
19+
# The exception is not shareable.
20+
print('exception is not shareable:', file=sys.stderr)
21+
traceback.print_exception(exc)
22+
results.put(None)
23+
raise # re-raise
3724

3825

3926
class WorkerContext(_thread.WorkerContext):
@@ -63,72 +50,19 @@ def create_context():
6350

6451
def __init__(self, initdata):
6552
self.initdata = initdata
66-
self.interpid = None
67-
self.resultsid = None
53+
self.interp = None
54+
self.results = None
6855

6956
def __del__(self):
70-
if self.interpid is not None:
57+
if self.interp is not None:
7158
self.finalize()
7259

73-
def _call(self, fn, args, kwargs):
74-
def do_call(resultsid, func, *args, **kwargs):
75-
try:
76-
return func(*args, **kwargs)
77-
except BaseException as exc:
78-
# Avoid relying on globals.
79-
import _interpreters
80-
import _interpqueues
81-
# Send the captured exception out on the results queue,
82-
# but still leave it unhandled for the interpreter to handle.
83-
try:
84-
_interpqueues.put(resultsid, exc)
85-
except _interpreters.NotShareableError:
86-
# The exception is not shareable.
87-
import sys
88-
import traceback
89-
print('exception is not shareable:', file=sys.stderr)
90-
traceback.print_exception(exc)
91-
_interpqueues.put(resultsid, None)
92-
raise # re-raise
93-
94-
args = (self.resultsid, fn, *args)
95-
res, excinfo = _interpreters.call(self.interpid, do_call, args, kwargs)
96-
if excinfo is not None:
97-
raise ExecutionFailed(excinfo)
98-
return res
99-
100-
def _get_exception(self):
101-
# Wait for the exception data to show up.
102-
while True:
103-
try:
104-
excdata = _interpqueues.get(self.resultsid)
105-
except _interpqueues.QueueNotFoundError:
106-
raise # re-raise
107-
except _interpqueues.QueueError as exc:
108-
if exc.__cause__ is not None or exc.__context__ is not None:
109-
raise # re-raise
110-
if str(exc).endswith(' is empty'):
111-
continue
112-
else:
113-
raise # re-raise
114-
except ModuleNotFoundError:
115-
# interpreters.queues doesn't exist, which means
116-
# QueueEmpty doesn't. Act as though it does.
117-
continue
118-
else:
119-
break
120-
exc, unboundop = excdata
121-
assert unboundop is None, unboundop
122-
return exc
123-
12460
def initialize(self):
125-
assert self.interpid is None, self.interpid
126-
self.interpid = _interpreters.create(reqrefs=True)
61+
assert self.interp is None, self.interp
62+
self.interp = interpreters.create()
12763
try:
128-
_interpreters.incref(self.interpid)
129-
13064
maxsize = 0
131-
self.resultsid = _interpqueues.create(maxsize)
65+
self.results = interpreters.create_queue(maxsize)
13266

13367
if self.initdata:
13468
self.run(self.initdata)
@@ -137,27 +71,21 @@ def initialize(self):
13771
raise # re-raise
13872

13973
def finalize(self):
140-
interpid = self.interpid
141-
resultsid = self.resultsid
142-
self.resultsid = None
143-
self.interpid = None
144-
if resultsid is not None:
145-
try:
146-
_interpqueues.destroy(resultsid)
147-
except _interpqueues.QueueNotFoundError:
148-
pass
149-
if interpid is not None:
150-
try:
151-
_interpreters.decref(interpid)
152-
except _interpreters.InterpreterNotFoundError:
153-
pass
74+
interp = self.interp
75+
results = self.results
76+
self.results = None
77+
self.interp = None
78+
if results is not None:
79+
del results
80+
if interp is not None:
81+
interp.close()
15482

15583
def run(self, task):
156-
fn, args, kwargs = task
15784
try:
158-
return self._call(fn, args, kwargs)
159-
except ExecutionFailed as wrapper:
160-
exc = self._get_exception()
85+
return self.interp.call(do_call, self.results, *task)
86+
except interpreters.ExecutionFailed as wrapper:
87+
# Wait for the exception data to show up.
88+
exc = self.results.get()
16189
if exc is None:
16290
# The exception must have been not shareable.
16391
raise # re-raise

Lib/test/test_concurrent_futures/test_interpreter_pool.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,7 @@
55
import sys
66
import time
77
import unittest
8-
from concurrent.futures.interpreter import (
9-
ExecutionFailed, BrokenInterpreterPool,
10-
)
8+
from concurrent.futures.interpreter import BrokenInterpreterPool
119
from concurrent import interpreters
1210
from concurrent.interpreters import _queues as queues
1311
import _interpreters
@@ -325,7 +323,7 @@ def test_submit_exception_in_script(self):
325323
self.assertIs(type(captured.exception), Exception)
326324
self.assertEqual(str(captured.exception), 'spam')
327325
cause = captured.exception.__cause__
328-
self.assertIs(type(cause), ExecutionFailed)
326+
self.assertIs(type(cause), interpreters.ExecutionFailed)
329327
for attr in ('__name__', '__qualname__', '__module__'):
330328
self.assertEqual(getattr(cause.excinfo.type, attr),
331329
getattr(Exception, attr))
@@ -338,7 +336,7 @@ def test_submit_exception_in_func(self):
338336
self.assertIs(type(captured.exception), Exception)
339337
self.assertEqual(str(captured.exception), 'spam')
340338
cause = captured.exception.__cause__
341-
self.assertIs(type(cause), ExecutionFailed)
339+
self.assertIs(type(cause), interpreters.ExecutionFailed)
342340
for attr in ('__name__', '__qualname__', '__module__'):
343341
self.assertEqual(getattr(cause.excinfo.type, attr),
344342
getattr(Exception, attr))

0 commit comments

Comments
 (0)