@@ -45,12 +45,8 @@ def resolve_task(fn, args, kwargs):
4545 # XXX Circle back to this later.
4646 raise TypeError ('scripts not supported' )
4747 else :
48- # Functions defined in the __main__ module can't be pickled,
49- # so they can't be used here. In the future, we could possibly
50- # borrow from multiprocessing to work around this.
5148 task = (fn , args , kwargs )
52- data = pickle .dumps (task )
53- return data
49+ return task
5450
5551 if initializer is not None :
5652 try :
@@ -65,35 +61,6 @@ def create_context():
6561 return cls (initdata , shared )
6662 return create_context , resolve_task
6763
68- @classmethod
69- @contextlib .contextmanager
70- def _capture_exc (cls , resultsid ):
71- try :
72- yield
73- except BaseException as exc :
74- # Send the captured exception out on the results queue,
75- # but still leave it unhandled for the interpreter to handle.
76- _interpqueues .put (resultsid , (None , exc ))
77- raise # re-raise
78-
79- @classmethod
80- def _send_script_result (cls , resultsid ):
81- _interpqueues .put (resultsid , (None , None ))
82-
83- @classmethod
84- def _call (cls , func , args , kwargs , resultsid ):
85- with cls ._capture_exc (resultsid ):
86- res = func (* args or (), ** kwargs or {})
87- # Send the result back.
88- with cls ._capture_exc (resultsid ):
89- _interpqueues .put (resultsid , (res , None ))
90-
91- @classmethod
92- def _call_pickled (cls , pickled , resultsid ):
93- with cls ._capture_exc (resultsid ):
94- fn , args , kwargs = pickle .loads (pickled )
95- cls ._call (fn , args , kwargs , resultsid )
96-
9764 def __init__ (self , initdata , shared = None ):
9865 self .initdata = initdata
9966 self .shared = dict (shared ) if shared else None
@@ -104,11 +71,56 @@ def __del__(self):
10471 if self .interpid is not None :
10572 self .finalize ()
10673
107- def _exec (self , script ):
108- assert self .interpid is not None
109- excinfo = _interpreters .exec (self .interpid , script , restrict = True )
74+ def _call (self , fn , args , kwargs ):
75+ def do_call (resultsid , func , * args , ** kwargs ):
76+ try :
77+ return func (* args , ** kwargs )
78+ except BaseException as exc :
79+ # Avoid relying on globals.
80+ import _interpreters
81+ import _interpqueues
82+ # Send the captured exception out on the results queue,
83+ # but still leave it unhandled for the interpreter to handle.
84+ try :
85+ _interpqueues .put (resultsid , exc )
86+ except _interpreters .NotShareableError :
87+ # The exception is not shareable.
88+ import sys
89+ import traceback
90+ print ('exception is not shareable:' , file = sys .stderr )
91+ traceback .print_exception (exc )
92+ _interpqueues .put (resultsid , None )
93+ raise # re-raise
94+
95+ args = (self .resultsid , fn , * args )
96+ res , excinfo = _interpreters .call (self .interpid , do_call , args , kwargs )
11097 if excinfo is not None :
11198 raise ExecutionFailed (excinfo )
99+ return res
100+
101+ def _get_exception (self ):
102+ # Wait for the exception data to show up.
103+ while True :
104+ try :
105+ excdata = _interpqueues .get (self .resultsid )
106+ except _interpqueues .QueueNotFoundError :
107+ raise # re-raise
108+ except _interpqueues .QueueError as exc :
109+ if exc .__cause__ is not None or exc .__context__ is not None :
110+ raise # re-raise
111+ if str (exc ).endswith (' is empty' ):
112+ continue
113+ else :
114+ raise # re-raise
115+ except ModuleNotFoundError :
116+ # interpreters.queues doesn't exist, which means
117+ # QueueEmpty doesn't. Act as though it does.
118+ continue
119+ else :
120+ break
121+ exc , unboundop = excdata
122+ assert unboundop is None , unboundop
123+ return exc
112124
113125 def initialize (self ):
114126 assert self .interpid is None , self .interpid
@@ -119,8 +131,6 @@ def initialize(self):
119131 maxsize = 0
120132 self .resultsid = _interpqueues .create (maxsize )
121133
122- self ._exec (f'from { __name__ } import WorkerContext' )
123-
124134 if self .shared :
125135 _interpreters .set___main___attrs (
126136 self .interpid , self .shared , restrict = True )
@@ -148,37 +158,15 @@ def finalize(self):
148158 pass
149159
150160 def run (self , task ):
151- data = task
152- script = f'WorkerContext._call_pickled({ data !r} , { self .resultsid } )'
153-
161+ fn , args , kwargs = task
154162 try :
155- self ._exec (script )
156- except ExecutionFailed as exc :
157- exc_wrapper = exc
158- else :
159- exc_wrapper = None
160-
161- # Return the result, or raise the exception.
162- while True :
163- try :
164- obj = _interpqueues .get (self .resultsid )
165- except _interpqueues .QueueNotFoundError :
163+ return self ._call (fn , args , kwargs )
164+ except ExecutionFailed as wrapper :
165+ exc = self ._get_exception ()
166+ if exc is None :
167+ # The exception must have been not shareable.
166168 raise # re-raise
167- except _interpqueues .QueueError :
168- continue
169- except ModuleNotFoundError :
170- # interpreters.queues doesn't exist, which means
171- # QueueEmpty doesn't. Act as though it does.
172- continue
173- else :
174- break
175- (res , exc ), unboundop = obj
176- assert unboundop is None , unboundop
177- if exc is not None :
178- assert res is None , res
179- assert exc_wrapper is not None
180- raise exc from exc_wrapper
181- return res
169+ raise exc from wrapper
182170
183171
184172class BrokenInterpreterPool (_thread .BrokenThreadPool ):
0 commit comments