Skip to content

Commit 2d9e123

Browse files
committed
fix race condition where pipeline exhausts with unraised errors
1 parent 01e4a0a commit 2d9e123

File tree

2 files changed

+9
-3
lines changed

2 files changed

+9
-3
lines changed

src/pyper/_core/sync_helper/output.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,11 @@ def __call__(self, *args, **kwargs):
3535
with ThreadPool() as tp:
3636
q_out = self._get_q_out(tp, *args, **kwargs)
3737
while True:
38-
if tp.has_error:
39-
raise tp.get_error()
38+
tp.raise_error_if_exists()
4039
try:
4140
# Use the timeout strategy for unblocking main thread without busy waiting
4241
if (data := q_out.get(timeout=1)) is StopSentinel:
42+
tp.raise_error_if_exists()
4343
break
4444
yield data
4545
except queue.Empty:

src/pyper/_core/util/thread_pool.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,19 @@ def get_error(self) -> Exception:
3535

3636
def put_error(self, e: Exception):
3737
self._error_queue.put(e)
38+
39+
def raise_error_if_exists(self):
40+
if self.has_error:
41+
raise self.get_error()
3842

3943
def submit(self, func, /, args=(), kwargs=None, daemon=False):
4044
@functools.wraps(func)
4145
def wrapper(*args, **kwargs):
4246
try:
4347
return func(*args, **kwargs)
44-
except Exception as e:
48+
# In the pipeline, each worker will handle errors internally, so this will never be reached in tests.
49+
# This wrapper is still implemented for safety; it never makes sense not to capture errors here
50+
except Exception as e: # pragma: no cover
4551
self._error_queue.put(e)
4652
t = threading.Thread(target=wrapper, args=args, kwargs=kwargs, daemon=daemon)
4753
t.start()

0 commit comments

Comments
 (0)