Skip to content

Commit e3572dd

Browse files
committed
DOP-2696: Pass exceptions through WorkerLauncher rather than deadlock
1 parent 8079563 commit e3572dd

File tree

3 files changed

+28
-13
lines changed

3 files changed

+28
-13
lines changed

snooty/parser.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1229,15 +1229,16 @@ def merge_diagnostics(
12291229

12301230
def flush(
12311231
self,
1232-
) -> queue.Queue[Union[PostprocessorResult, util.CancelledException]]:
1232+
) -> queue.Queue[Tuple[Optional[PostprocessorResult], Optional[Exception]]]:
12331233
"""Run the postprocessor if and only if any pages have changed, and return postprocessing results."""
12341234
postprocessor = self.postprocessor_factory()
12351235
return self.worker.run(postprocessor)
12361236

12371237
def flush_and_wait(self) -> PostprocessorResult:
1238-
result = self.flush().get()
1239-
if isinstance(result, util.CancelledException):
1240-
raise result
1238+
result, exception = self.flush().get()
1239+
if exception:
1240+
raise exception
1241+
assert result is not None
12411242
return result
12421243

12431244
def cancel(self) -> None:

snooty/test_util.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,3 +171,15 @@ def run_worker() -> None:
171171

172172
finally:
173173
sys.setswitchinterval(initial_switch_interval)
174+
175+
176+
def test_worker_that_fails() -> None:
177+
class SomeException(Exception):
178+
pass
179+
180+
def start(event: threading.Event, arg: object) -> None:
181+
raise SomeException()
182+
183+
worker = util.WorkerLauncher("worker-test", start)
184+
with pytest.raises(SomeException):
185+
worker.run_and_wait(None)

snooty/util.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -483,11 +483,13 @@ def __init__(
483483
self.__thread: Optional[threading.Thread] = None
484484
self.__cancel = threading.Event()
485485

486-
def run(self, arg: _T) -> queue.Queue[Union[_R, CancelledException]]:
486+
def run(self, arg: _T) -> queue.Queue[Tuple[Optional[_R], Optional[Exception]]]:
487487
"""Cancel any current thread of execution; block until it is cancelled; and re-launch the worker."""
488488
self.cancel()
489489

490-
result_queue: queue.Queue[Union[_R, CancelledException]] = queue.Queue(1)
490+
result_queue: queue.Queue[
491+
Tuple[Optional[_R], Optional[Exception]]
492+
] = queue.Queue(1)
491493

492494
def inner() -> None:
493495
try:
@@ -497,9 +499,9 @@ def inner() -> None:
497499
if self.__cancel.is_set():
498500
raise CancelledException()
499501

500-
result_queue.put(result)
501-
except CancelledException as cancelled:
502-
result_queue.put(cancelled)
502+
result_queue.put((result, None))
503+
except Exception as exception:
504+
result_queue.put((None, exception))
503505

504506
thread = threading.Thread(name=self.name, target=inner, daemon=True)
505507
with self._lock:
@@ -509,11 +511,11 @@ def inner() -> None:
509511
return result_queue
510512

511513
def run_and_wait(self, arg: _T) -> _R:
512-
result = self.run(arg).get()
513-
if isinstance(result, CancelledException):
514-
raise result
514+
result, exception = self.run(arg).get()
515+
if exception:
516+
raise exception
515517

516-
return result
518+
return cast(_R, result)
517519

518520
def cancel(self) -> None:
519521
"""Instruct the worker to raise WorkerCanceled and abort execution."""

0 commit comments

Comments
 (0)