Skip to content

Commit 4dd556a

Browse files
Use _interpreters.call().
1 parent 4152f17 commit 4dd556a

File tree

2 files changed

+52
-69
lines changed

2 files changed

+52
-69
lines changed

Lib/concurrent/futures/interpreter.py

Lines changed: 46 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,8 @@ def resolve_task(fn, args, kwargs):
4545
# XXX Circle back to this later.
4646
raise TypeError('scripts not supported')
4747
else:
48-
# Functions defined in the __main__ module can't be pickled,
49-
# so they can't be used here. In the future, we could possibly
50-
# borrow from multiprocessing to work around this.
5148
task = (fn, args, kwargs)
52-
data = pickle.dumps(task)
53-
return data
49+
return task
5450

5551
if initializer is not None:
5652
try:
@@ -65,35 +61,6 @@ def create_context():
6561
return cls(initdata, shared)
6662
return create_context, resolve_task
6763

68-
@classmethod
69-
@contextlib.contextmanager
70-
def _capture_exc(cls, resultsid):
71-
try:
72-
yield
73-
except BaseException as exc:
74-
# Send the captured exception out on the results queue,
75-
# but still leave it unhandled for the interpreter to handle.
76-
_interpqueues.put(resultsid, (None, exc))
77-
raise # re-raise
78-
79-
@classmethod
80-
def _send_script_result(cls, resultsid):
81-
_interpqueues.put(resultsid, (None, None))
82-
83-
@classmethod
84-
def _call(cls, func, args, kwargs, resultsid):
85-
with cls._capture_exc(resultsid):
86-
res = func(*args or (), **kwargs or {})
87-
# Send the result back.
88-
with cls._capture_exc(resultsid):
89-
_interpqueues.put(resultsid, (res, None))
90-
91-
@classmethod
92-
def _call_pickled(cls, pickled, resultsid):
93-
with cls._capture_exc(resultsid):
94-
fn, args, kwargs = pickle.loads(pickled)
95-
cls._call(fn, args, kwargs, resultsid)
96-
9764
def __init__(self, initdata, shared=None):
9865
self.initdata = initdata
9966
self.shared = dict(shared) if shared else None
@@ -104,11 +71,46 @@ def __del__(self):
10471
if self.interpid is not None:
10572
self.finalize()
10673

107-
def _exec(self, script):
108-
assert self.interpid is not None
109-
excinfo = _interpreters.exec(self.interpid, script, restrict=True)
74+
def _call(self, fn, args, kwargs):
75+
def do_call(resultsid, func, *args, **kwargs):
76+
try:
77+
return func(*args, **kwargs)
78+
except BaseException as exc:
79+
# Avoid relying on globals.
80+
import _interpqueues
81+
# Send the captured exception out on the results queue,
82+
# but still leave it unhandled for the interpreter to handle.
83+
try:
84+
_interpqueues.put(resultsid, exc)
85+
except TypeError:
86+
# The exception is not shareable.
87+
_interpqueues.put(resultsid, None)
88+
raise # re-raise
89+
90+
args = (self.resultsid, fn, *args)
91+
res, excinfo = _interpreters.call(self.interpid, do_call, args, kwargs)
11092
if excinfo is not None:
11193
raise ExecutionFailed(excinfo)
94+
return res
95+
96+
def _get_exception(self):
97+
# Wait for the exception data to show up.
98+
while True:
99+
try:
100+
excdata = _interpqueues.get(self.resultsid)
101+
except _interpqueues.QueueNotFoundError:
102+
raise # re-raise
103+
except _interpqueues.QueueError:
104+
continue
105+
except ModuleNotFoundError:
106+
# interpreters.queues doesn't exist, which means
107+
# QueueEmpty doesn't. Act as though it does.
108+
continue
109+
else:
110+
break
111+
exc, unboundop = excdata
112+
assert unboundop is None, unboundop
113+
return exc
112114

113115
def initialize(self):
114116
assert self.interpid is None, self.interpid
@@ -119,8 +121,6 @@ def initialize(self):
119121
maxsize = 0
120122
self.resultsid = _interpqueues.create(maxsize)
121123

122-
self._exec(f'from {__name__} import WorkerContext')
123-
124124
if self.shared:
125125
_interpreters.set___main___attrs(
126126
self.interpid, self.shared, restrict=True)
@@ -148,37 +148,15 @@ def finalize(self):
148148
pass
149149

150150
def run(self, task):
151-
data = task
152-
script = f'WorkerContext._call_pickled({data!r}, {self.resultsid})'
153-
151+
fn, args, kwargs = task
154152
try:
155-
self._exec(script)
156-
except ExecutionFailed as exc:
157-
exc_wrapper = exc
158-
else:
159-
exc_wrapper = None
160-
161-
# Return the result, or raise the exception.
162-
while True:
163-
try:
164-
obj = _interpqueues.get(self.resultsid)
165-
except _interpqueues.QueueNotFoundError:
153+
return self._call(fn, args, kwargs)
154+
except ExecutionFailed as wrapper:
155+
exc = self._get_exception()
156+
if exc is None:
157+
# The exception must be not shareable.
166158
raise # re-raise
167-
except _interpqueues.QueueError:
168-
continue
169-
except ModuleNotFoundError:
170-
# interpreters.queues doesn't exist, which means
171-
# QueueEmpty doesn't. Act as though it does.
172-
continue
173-
else:
174-
break
175-
(res, exc), unboundop = obj
176-
assert unboundop is None, unboundop
177-
if exc is not None:
178-
assert res is None, res
179-
assert exc_wrapper is not None
180-
raise exc from exc_wrapper
181-
return res
159+
raise exc from wrapper
182160

183161

184162
class BrokenInterpreterPool(_thread.BrokenThreadPool):

Lib/test/test_concurrent_futures/test_interpreter_pool.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import io
44
import os
55
import pickle
6+
import select
67
import time
78
import unittest
89
from concurrent.futures.interpreter import (
@@ -22,10 +23,14 @@ def noop():
2223

2324

2425
def write_msg(fd, msg):
26+
import os
2527
os.write(fd, msg + b'\0')
2628

2729

28-
def read_msg(fd):
30+
def read_msg(fd, timeout=10.0):
31+
r, _, _ = select.select([fd], [], [], timeout)
32+
if fd not in r:
33+
raise TimeoutError('nothing to read')
2934
msg = b''
3035
while ch := os.read(fd, 1):
3136
if ch == b'\0':

0 commit comments

Comments
 (0)