Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions Lib/multiprocessing/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import itertools
import os
import queue
import sys
import threading
import time
import traceback
Expand Down Expand Up @@ -776,13 +777,20 @@ def get(self, timeout=None):
def _set(self, i, obj):
self._success, self._value = obj
if self._callback and self._success:
self._callback(self._value)
self._handle_exceptions(self._callback, self._value)
if self._error_callback and not self._success:
self._error_callback(self._value)
self._handle_exceptions(self._error_callback, self._value)
self._event.set()
del self._cache[self._job]
self._pool = None

@staticmethod
def _handle_exceptions(callback, args):
try:
return callback(args)
except Exception as e:
sys.excepthook(*sys.exc_info())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe threading.excepthook would be better to call?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, quite possibly. In fact I did use that in an earlier version of the code. I decided to change because the thread is internal to the pool code, which does not set its excepthook. So ISTM it will always end up using the system hook anyway, and the code is a little uglier because it needs to marshal the args then explicitly del them to avoid reference cycles:

    except Exception as e:
            args = threading.ExceptHookArgs([type(e), e, e.__traceback__, None])
            threading.excepthook(args)
            del args

However, it probably is more robust/future-proof/correct to use threading.excepthook, so I'm happy to switch to this if you think it is appropriate.


__class_getitem__ = classmethod(types.GenericAlias)

AsyncResult = ApplyResult # create alias -- see #17805
Expand Down Expand Up @@ -813,7 +821,7 @@ def _set(self, i, success_result):
self._value[i*self._chunksize:(i+1)*self._chunksize] = result
if self._number_left == 0:
if self._callback:
self._callback(self._value)
self._handle_exceptions(self._callback, self._value)
del self._cache[self._job]
self._event.set()
self._pool = None
Expand All @@ -825,7 +833,7 @@ def _set(self, i, success_result):
if self._number_left == 0:
# only consider the result ready once all jobs are done
if self._error_callback:
self._error_callback(self._value)
self._handle_exceptions(self._error_callback, self._value)
del self._cache[self._job]
self._event.set()
self._pool = None
Expand Down
Loading