Skip to content

Commit 97f6b17

Browse files
committed
concurrent.futures.Executor.map: yield buffered results and raise after an executor shutdown
1 parent a003923 commit 97f6b17

2 files changed

Lines changed: 61 additions & 17 deletions

File tree

Lib/concurrent/futures/_base.py

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -305,10 +305,12 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED):
305305
return DoneAndNotDoneFutures(done, fs - done)
306306

307307

308-
def _result_or_cancel(fut, timeout=None):
308+
def _result_or_cancel(fut, end_time=None):
309309
try:
310310
try:
311-
return fut.result(timeout)
311+
if end_time is None:
312+
return fut.result()
313+
return fut.result(end_time - time.monotonic())
312314
finally:
313315
fut.cancel()
314316
finally:
@@ -633,20 +635,20 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None):
633635
if buffersize is not None and buffersize < 1:
634636
raise ValueError("buffersize must be None or > 0")
635637

636-
if timeout is not None:
638+
if timeout is None:
639+
end_time = None
640+
else:
637641
end_time = timeout + time.monotonic()
638642

639643
zipped_iterables = zip(*iterables)
640644
if buffersize:
641645
fs = collections.deque(
642646
self.submit(fn, *args) for args in islice(zipped_iterables, buffersize)
643647
)
648+
executor = self
644649
else:
645650
fs = [self.submit(fn, *args) for args in zipped_iterables]
646-
647-
# Use a weak reference to ensure that the executor can be garbage
648-
# collected independently of the result_iterator closure.
649-
executor_weakref = weakref.ref(self)
651+
executor = None
650652

651653
# Yield must be hidden in closure so that the futures are submitted
652654
# before the first iterator value is required.
@@ -655,17 +657,17 @@ def result_iterator():
655657
# reverse to keep finishing order
656658
fs.reverse()
657659
while fs:
658-
if (
659-
buffersize
660-
and (executor := executor_weakref())
661-
and (args := next(zipped_iterables, None))
662-
):
663-
fs.appendleft(executor.submit(fn, *args))
660+
if buffersize and (args := next(zipped_iterables, None)):
661+
try:
662+
fs.appendleft(executor.submit(fn, *args))
663+
except RuntimeError:
664+
# 'cannot schedule new futures after shutdown' error
665+
# yield from buffer:
666+
while fs:
667+
yield _result_or_cancel(fs.pop(), end_time)
668+
raise
664669
# Careful not to keep a reference to the popped future
665-
if timeout is None:
666-
yield _result_or_cancel(fs.pop())
667-
else:
668-
yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
670+
yield _result_or_cancel(fs.pop(), end_time)
669671
finally:
670672
for future in fs:
671673
future.cancel()

Lib/test/test_concurrent_futures/executor.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,48 @@ def test_map_buffersize_on_multiple_infinite_iterables(self):
148148
self.assertEqual(next(res, None), 2)
149149
self.assertEqual(next(res, None), 4)
150150

151+
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
152+
def test_map_buffersize_shutdown_yield_buffer_and_raise(self):
153+
ints = range(4)
154+
fn = str
155+
156+
def test(buffersize, nexts_before_shutdown, wait):
157+
expected_results = map(fn, ints)
158+
159+
executor = self.executor_type(max_workers=1)
160+
results = executor.map(fn, ints, buffersize=buffersize)
161+
# iterate before shutdown
162+
for _ in range(nexts_before_shutdown):
163+
self.assertEqual(next(results), next(expected_results))
164+
time.sleep(0.1)
165+
executor.shutdown(wait=wait)
166+
# iterate over buffered results after shutdown
167+
for _ in range(min(buffersize, len(ints) - nexts_before_shutdown)):
168+
self.assertEqual(next(results), next(expected_results))
169+
170+
if buffersize + nexts_before_shutdown >= len(ints):
171+
# case where all tasks were submitted before shutdown
172+
with self.assertRaises(StopIteration):
173+
next(results)
174+
else:
175+
# buffer exhausted, the error is raised next
176+
with self.assertRaisesRegex(
177+
RuntimeError,
178+
"cannot schedule new futures after shutdown",
179+
):
180+
next(results)
181+
with self.assertRaises(StopIteration):
182+
next(results)
183+
184+
for buffersize in [3]:
185+
for nexts_before_shutdown in range(buffersize):
186+
for wait in (True, False):
187+
with self.subTest(
188+
buffersize=buffersize,
189+
nexts_before_shutdown=nexts_before_shutdown,
190+
):
191+
test(buffersize, nexts_before_shutdown, wait)
192+
151193
def test_map_buffersize_on_empty_iterable(self):
152194
res = self.executor.map(str, [], buffersize=2)
153195
self.assertIsNone(next(res, None))

0 commit comments

Comments
 (0)