11"""Implements InterpreterPoolExecutor."""
22
3- import contextlib
4- import pickle
3+ from concurrent import interpreters
4+ import sys
55import textwrap
66from . import thread as _thread
7- import _interpreters
8- import _interpqueues
7+ import traceback
98
109
11- class ExecutionFailed (_interpreters .InterpreterError ):
12- """An unhandled exception happened during execution."""
13-
14- def __init__ (self , excinfo ):
15- msg = excinfo .formatted
16- if not msg :
17- if excinfo .type and excinfo .msg :
18- msg = f'{ excinfo .type .__name__ } : { excinfo .msg } '
19- else :
20- msg = excinfo .type .__name__ or excinfo .msg
21- super ().__init__ (msg )
22- self .excinfo = excinfo
23-
24- def __str__ (self ):
10+ def do_call (results , func , args , kwargs ):
11+ try :
12+ return func (* args , ** kwargs )
13+ except BaseException as exc :
14+ # Send the captured exception out on the results queue,
15+ # but still leave it unhandled for the interpreter to handle.
2516 try :
26- formatted = self .excinfo .errdisplay
27- except Exception :
28- return super ().__str__ ()
29- else :
30- return textwrap .dedent (f"""
31- { super ().__str__ ()}
32-
33- Uncaught in the interpreter:
34-
35- { formatted }
36- """ .strip ())
17+ results .put (exc )
18+ except interpreters .NotShareableError :
19+ # The exception is not shareable.
20+ print ('exception is not shareable:' , file = sys .stderr )
21+ traceback .print_exception (exc )
22+ results .put (None )
23+ raise # re-raise
3724
3825
3926class WorkerContext (_thread .WorkerContext ):
4027
4128 @classmethod
42- def prepare (cls , initializer , initargs , shared ):
29+ def prepare (cls , initializer , initargs ):
4330 def resolve_task (fn , args , kwargs ):
4431 if isinstance (fn , str ):
4532 # XXX Circle back to this later.
4633 raise TypeError ('scripts not supported' )
4734 else :
48- # Functions defined in the __main__ module can't be pickled,
49- # so they can't be used here. In the future, we could possibly
50- # borrow from multiprocessing to work around this.
5135 task = (fn , args , kwargs )
52- data = pickle .dumps (task )
53- return data
36+ return task
5437
5538 if initializer is not None :
5639 try :
@@ -62,68 +45,24 @@ def resolve_task(fn, args, kwargs):
6245 else :
6346 initdata = None
6447 def create_context ():
65- return cls (initdata , shared )
48+ return cls (initdata )
6649 return create_context , resolve_task
6750
68- @classmethod
69- @contextlib .contextmanager
70- def _capture_exc (cls , resultsid ):
71- try :
72- yield
73- except BaseException as exc :
74- # Send the captured exception out on the results queue,
75- # but still leave it unhandled for the interpreter to handle.
76- _interpqueues .put (resultsid , (None , exc ))
77- raise # re-raise
78-
79- @classmethod
80- def _send_script_result (cls , resultsid ):
81- _interpqueues .put (resultsid , (None , None ))
82-
83- @classmethod
84- def _call (cls , func , args , kwargs , resultsid ):
85- with cls ._capture_exc (resultsid ):
86- res = func (* args or (), ** kwargs or {})
87- # Send the result back.
88- with cls ._capture_exc (resultsid ):
89- _interpqueues .put (resultsid , (res , None ))
90-
91- @classmethod
92- def _call_pickled (cls , pickled , resultsid ):
93- with cls ._capture_exc (resultsid ):
94- fn , args , kwargs = pickle .loads (pickled )
95- cls ._call (fn , args , kwargs , resultsid )
96-
97- def __init__ (self , initdata , shared = None ):
51+ def __init__ (self , initdata ):
9852 self .initdata = initdata
99- self .shared = dict (shared ) if shared else None
100- self .interpid = None
101- self .resultsid = None
53+ self .interp = None
54+ self .results = None
10255
10356 def __del__ (self ):
104- if self .interpid is not None :
57+ if self .interp is not None :
10558 self .finalize ()
10659
107- def _exec (self , script ):
108- assert self .interpid is not None
109- excinfo = _interpreters .exec (self .interpid , script , restrict = True )
110- if excinfo is not None :
111- raise ExecutionFailed (excinfo )
112-
11360 def initialize (self ):
114- assert self .interpid is None , self .interpid
115- self .interpid = _interpreters .create (reqrefs = True )
61+ assert self .interp is None , self .interp
62+ self .interp = interpreters .create ()
11663 try :
117- _interpreters .incref (self .interpid )
118-
11964 maxsize = 0
120- self .resultsid = _interpqueues .create (maxsize )
121-
122- self ._exec (f'from { __name__ } import WorkerContext' )
123-
124- if self .shared :
125- _interpreters .set___main___attrs (
126- self .interpid , self .shared , restrict = True )
65+ self .results = interpreters .create_queue (maxsize )
12766
12867 if self .initdata :
12968 self .run (self .initdata )
@@ -132,53 +71,25 @@ def initialize(self):
13271 raise # re-raise
13372
13473 def finalize (self ):
135- interpid = self .interpid
136- resultsid = self .resultsid
137- self .resultsid = None
138- self .interpid = None
139- if resultsid is not None :
140- try :
141- _interpqueues .destroy (resultsid )
142- except _interpqueues .QueueNotFoundError :
143- pass
144- if interpid is not None :
145- try :
146- _interpreters .decref (interpid )
147- except _interpreters .InterpreterNotFoundError :
148- pass
74+ interp = self .interp
75+ results = self .results
76+ self .results = None
77+ self .interp = None
78+ if results is not None :
79+ del results
80+ if interp is not None :
81+ interp .close ()
14982
15083 def run (self , task ):
151- data = task
152- script = f'WorkerContext._call_pickled({ data !r} , { self .resultsid } )'
153-
15484 try :
155- self ._exec (script )
156- except ExecutionFailed as exc :
157- exc_wrapper = exc
158- else :
159- exc_wrapper = None
160-
161- # Return the result, or raise the exception.
162- while True :
163- try :
164- obj = _interpqueues .get (self .resultsid )
165- except _interpqueues .QueueNotFoundError :
85+ return self .interp .call (do_call , self .results , * task )
86+ except interpreters .ExecutionFailed as wrapper :
87+ # Wait for the exception data to show up.
88+ exc = self .results .get ()
89+ if exc is None :
90+ # The exception must have been not shareable.
16691 raise # re-raise
167- except _interpqueues .QueueError :
168- continue
169- except ModuleNotFoundError :
170- # interpreters._queues doesn't exist, which means
171- # QueueEmpty doesn't. Act as though it does.
172- continue
173- else :
174- break
175- (res , exc ), unboundop = obj
176- assert unboundop is None , unboundop
177- if exc is not None :
178- assert res is None , res
179- assert exc_wrapper is not None
180- raise exc from exc_wrapper
181- return res
92+ raise exc from wrapper
18293
18394
18495class BrokenInterpreterPool (_thread .BrokenThreadPool ):
@@ -192,11 +103,11 @@ class InterpreterPoolExecutor(_thread.ThreadPoolExecutor):
192103 BROKEN = BrokenInterpreterPool
193104
194105 @classmethod
195- def prepare_context (cls , initializer , initargs , shared ):
196- return WorkerContext .prepare (initializer , initargs , shared )
106+ def prepare_context (cls , initializer , initargs ):
107+ return WorkerContext .prepare (initializer , initargs )
197108
198109 def __init__ (self , max_workers = None , thread_name_prefix = '' ,
199- initializer = None , initargs = (), shared = None ):
110+ initializer = None , initargs = ()):
200111 """Initializes a new InterpreterPoolExecutor instance.
201112
202113 Args:
@@ -206,8 +117,6 @@ def __init__(self, max_workers=None, thread_name_prefix='',
206117 initializer: A callable or script used to initialize
207118 each worker interpreter.
208119 initargs: A tuple of arguments to pass to the initializer.
209- shared: A mapping of shareabled objects to be inserted into
210- each worker interpreter.
211120 """
212121 super ().__init__ (max_workers , thread_name_prefix ,
213- initializer , initargs , shared = shared )
122+ initializer , initargs )
0 commit comments