Skip to content

Commit a46d4a2

Browse files
committed
wip
1 parent 33773ca commit a46d4a2

1 file changed

Lines changed: 19 additions & 9 deletions

File tree

Lib/concurrent/futures/_base.py

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
55

66
import collections
7+
from concurrent import futures
78
import logging
89
import threading
910
import time
@@ -599,12 +600,21 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None, as_com
599600
end_time = timeout + time.monotonic()
600601

601602
zipped_iterables = zip(*iterables)
602-
if buffersize:
603-
fs = collections.deque(
604-
self.submit(fn, *args) for args in islice(zipped_iterables, buffersize)
605-
)
603+
futures_gen = (self.submit(fn, *args) for args in islice(zipped_iterables, buffersize))
604+
605+
if as_completed:
606+
fs = set(futures_gen)
607+
add_to_buffer = fs.add
608+
remove_from_buffer = fs.remove
606609
else:
607-
fs = [self.submit(fn, *args) for args in zipped_iterables]
610+
if buffersize:
611+
fs = collections.deque(futures_gen)
612+
else:
613+
fs = list(futures_gen)
614+
# reverse so that the next (FIFO) future is on the right
615+
fs.reverse()
616+
add_to_buffer = fs.append
617+
remove_from_buffer = fs.pop
608618

609619
# Use a weak reference to ensure that the executor can be garbage
610620
# collected independently of the result_iterator closure.
@@ -614,12 +624,12 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None, as_com
614624
# before the first iterator value is required.
615625
def result_iterator():
616626
try:
617-
# reverse so that the next (FIFO) future is on the right
618-
fs.reverse()
619627
# careful not to keep references to futures or results
620628
while fs:
621629
# wait for the next result
622-
if timeout is None:
630+
if as_completed:
631+
next(futures.as_completed(fs, end_time - time.monotonic() if timeout else None))
632+
elif timeout is None:
623633
fs[-1].result()
624634
else:
625635
fs[-1].result(end_time - time.monotonic())
@@ -630,7 +640,7 @@ def result_iterator():
630640
and (executor := executor_weakref())
631641
and (args := next(zipped_iterables, None))
632642
):
633-
fs.appendleft(executor.submit(fn, *args))
643+
add_to_buffer(executor.submit(fn, *args))
634644

635645
# yield the awaited result
636646
yield fs.pop()._result

0 commit comments

Comments
 (0)