Skip to content

Commit 886a8a2

Browse files
committed
concurrent.futures.Executor.map: raise after executor shutdown
1 parent da3fea3 commit 886a8a2

2 files changed

Lines changed: 29 additions & 16 deletions

File tree

Lib/concurrent/futures/_base.py

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -305,10 +305,12 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED):
305305
return DoneAndNotDoneFutures(done, fs - done)
306306

307307

308-
def _result_or_cancel(fut, timeout=None):
308+
def _result_or_cancel(fut, end_time=None):
309309
try:
310310
try:
311-
return fut.result(timeout)
311+
if end_time is None:
312+
return fut.result()
313+
return fut.result(end_time - time.monotonic())
312314
finally:
313315
fut.cancel()
314316
finally:
@@ -606,20 +608,20 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None):
606608
if buffersize is not None and buffersize < 1:
607609
raise ValueError("buffersize must be None or > 0")
608610

609-
if timeout is not None:
611+
if timeout is None:
612+
end_time = None
613+
else:
610614
end_time = timeout + time.monotonic()
611615

612616
zipped_iterables = zip(*iterables)
613617
if buffersize:
614618
fs = collections.deque(
615619
self.submit(fn, *args) for args in islice(zipped_iterables, buffersize)
616620
)
621+
executor = self
617622
else:
618623
fs = [self.submit(fn, *args) for args in zipped_iterables]
619-
620-
# Use a weak reference to ensure that the executor can be garbage
621-
# collected independently of the result_iterator closure.
622-
executor_weakref = weakref.ref(self)
624+
executor = None
623625

624626
# Yield must be hidden in closure so that the futures are submitted
625627
# before the first iterator value is required.
@@ -628,17 +630,10 @@ def result_iterator():
628630
# reverse to keep finishing order
629631
fs.reverse()
630632
while fs:
631-
if (
632-
buffersize
633-
and (executor := executor_weakref())
634-
and (args := next(zipped_iterables, None))
635-
):
633+
if buffersize and (args := next(zipped_iterables, None)):
636634
fs.appendleft(executor.submit(fn, *args))
637635
# Careful not to keep a reference to the popped future
638-
if timeout is None:
639-
yield _result_or_cancel(fs.pop())
640-
else:
641-
yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
636+
yield _result_or_cancel(fs.pop(), end_time)
642637
finally:
643638
for future in fs:
644639
future.cancel()

Lib/test/test_concurrent_futures/executor.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,24 @@ def test_map_buffersize_on_multiple_infinite_iterables(self):
139139
self.assertEqual(next(res, None), 2)
140140
self.assertEqual(next(res, None), 4)
141141

142+
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
143+
def test_map_buffersize_raise_on_shutdown(self):
144+
ints = range(4)
145+
fn = str
146+
buffersize = 2
147+
expected_results = map(fn, ints)
148+
for nexts in [0, 1]:
149+
with self.subTest(nexts=nexts):
150+
with self.executor_type(max_workers=1) as executor:
151+
results = executor.map(fn, ints, buffersize=buffersize)
152+
for _ in range(nexts):
153+
self.assertEqual(next(results), next(expected_results))
154+
with self.assertRaisesRegex(
155+
RuntimeError,
156+
"cannot schedule new futures after shutdown",
157+
):
158+
next(results)
159+
142160
def test_map_buffersize_on_empty_iterable(self):
143161
res = self.executor.map(str, [], buffersize=2)
144162
self.assertIsNone(next(res, None))

0 commit comments

Comments
 (0)