- 
          
- 
                Notifications
    You must be signed in to change notification settings 
- Fork 33.2k
          gh-74028: concurrent.futures.Executor.map: introduce buffersize param for lazier behavior
          #125663
        
          New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| Most changes to Python require a NEWS entry. Add one using the blurb_it web app or the blurb command-line tool. If this change has little impact on Python users, wait for a maintainer to apply the  | 
| Most changes to Python require a NEWS entry. Add one using the blurb_it web app or the blurb command-line tool. If this change has little impact on Python users, wait for a maintainer to apply the  | 
| Most changes to Python require a NEWS entry. Add one using the blurb_it web app or the blurb command-line tool. If this change has little impact on Python users, wait for a maintainer to apply the  | 
    
      
        1 similar comment
      
    
  
    | Most changes to Python require a NEWS entry. Add one using the blurb_it web app or the blurb command-line tool. If this change has little impact on Python users, wait for a maintainer to apply the  | 
6a58c7d    to
    21f7b8d      
    Compare
  
    | Most changes to Python require a NEWS entry. Add one using the blurb_it web app or the blurb command-line tool. If this change has little impact on Python users, wait for a maintainer to apply the  | 
| Most changes to Python require a NEWS entry. Add one using the blurb_it web app or the blurb command-line tool. If this change has little impact on Python users, wait for a maintainer to apply the  | 
| Thanks for the PR. First, I think this is a big behavior change for Executor. I think we need to discuss it in the https://discuss.python.org/ first. In my personal opinion, I think this is not a good choice to add the  
 | 
9eef605    to
    e5c867a      
    Compare
  
    | Hi @Zheaoli, thank you for your comment! 
 You mean big alternative behavior, right? (the default behavior when ommitting  
 Fair, I will start a thread there and ping you. 
 I'm not sure to get it, could you detail that point? 🙏🏻 
 You are completely right, makes more sense! I have fixed that (commit) | 
8bf7be7    to
    769060e      
    Compare
  
    | 
 For me, the basic  | 
| Hi @Zheaoli 
 There may be a misunderstanding here, the goal of this PR is precisely to make  I will recap the behaviors so that everybody is on the same page: built-in  | 
769060e    to
    be419ed      
    Compare
  
    | hey @rruuaanng, fyi I have applied your requested changes regarding the integration of unit tests into existing class 🙏🏻 | 
        
          
                Lib/concurrent/futures/_base.py
              
                Outdated
          
        
      | args_iter = iter(zip(*iterables)) | ||
| if buffersize: | ||
| fs = collections.deque( | ||
| self.submit(fn, *args) for args in islice(args_iter, buffersize) | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't buffersize empty? Can you introduce it? (Forgive me for not understanding it).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
absolutely np, thank you for taking the time to review my proposal. To be sure to understand the question well, what do you mean by "Isn't buffersize empty?"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @rruuaanng , I have reworked the PR's description, I hope it makes things clearer!
e28a0f0    to
    bb0e747      
    Compare
  
    | Hey @NewUserHa @AA-Turner @serhiy-storchaka, this may interest you given your recent activity on #14221 🙏🏻 | 
        
          
                Misc/NEWS.d/next/Library/2024-10-18-10-27-54.gh-issue-74028.4d4vVD.rst
              
                Outdated
          
            Show resolved
            Hide resolved
        
              
          
                Lib/concurrent/futures/_base.py
              
                Outdated
          
        
      | if ( | ||
| buffersize | ||
| and (executor := executor_weakref()) | ||
| and (args := next(args_iter, None)) | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
args may be empty, so you need to check for args is not None
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you refering to the case where one call executor.map(func) without any input iterable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. You can't always assume that func needs an input (or do you?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you are right! But in such a case we don't enter the while fs: (fs being empty in that case), right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@picnixz I have added unit tests checking the behavior with multiple input iterables and without any input iterables.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But in such a case we don't enter the while fs.
Not necessarily. What I meant is that you call executor.map with an input iterable that yields args = () everytime.
Note that it also doesn't hurt to check is not None because it's probably slightly faster since otherwise you need to call __bool__ on the args being yielded.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So for example a call like executor.map(func, [()])? In such a call we get iterables = ([()],) and args_iter = iter(zip(*([()],))) and next(args_iter,) will be ((),) (not ()). You may have missed the ziping in your reasoning?
In term of pure readability of the code I struggle to have an opinion, do you feel that (args := next(args_iter, None)) is not None is more natural?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You may have missed the ziping in your reasoning?
I did :) Sorry, my bad!
do you feel that (args := next(args_iter, None)) is not None is more natural?
I feel it would at least help avoiding questions like mine! (and it would still be probably slightly better performance wise but this claim is just my gut feeling).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@picnixz oh yes I see... I have renamed args_iter into a more self-explanatory zipped_iterables, do you think it would be enough to avoid the confusion?
(Because I am scared that the addition of is not None may misslead some of our fellow pythonistas wondering "wait, why is this not None check necessary here, what am I missing here 🤔?")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Personally, I like having the is not None just so that I don't have to wonder what's args_iter is precisely yielding. I can assume that it's yielding a tuple-like object, but I don't necessarily know the shape of that tuple. So is not None discriminates probable items and the sentinel value. So I'd say it's still pythonic.
Performance-wise it should be roughly the same (one checks that the tuple's size != 0 and the other just compares if it's the None singleton but both are essentially a single comparison).
Now up to you. If others didn't observe (like me) that args_iter never yields an empty tuple, then it's probably better to keep the is not None check for clarity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Many thanks for your review @picnixz 🙏🏻 !
        
          
                Lib/concurrent/futures/_base.py
              
                Outdated
          
        
      | if ( | ||
| buffersize | ||
| and (executor := executor_weakref()) | ||
| and (args := next(args_iter, None)) | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you refering to the case where one call executor.map(func) without any input iterable?
1ccc07f    to
    5d63a05      
    Compare
  
    Co-authored-by: Bénédikt Tran <[email protected]>
| I'll rerun the CI tomorrow if it still fails, don't worry | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the new line I wanted you to add has been gobbled by another commit :(
Co-authored-by: Bénédikt Tran <[email protected]>
| I'll merge this tomorrow or on Friday (today is review's day!) | 
| Thank you for the contribution! | 
| Thank you @picnixz @gpshead @serhiy-storchaka for making this change go through! 🙏🏻 | 
…cutor.map` for lazier behavior (python#125663) `concurrent.futures.Executor.map` now supports limiting the number of submitted tasks whose results have not yet been yielded via the new `buffersize` parameter. --------- Co-authored-by: Bénédikt Tran <[email protected]>
…cutor.map` for lazier behavior (python#125663) `concurrent.futures.Executor.map` now supports limiting the number of submitted tasks whose results have not yet been yielded via the new `buffersize` parameter. --------- Co-authored-by: Bénédikt Tran <[email protected]>
| # reverse to keep finishing order | ||
| fs.reverse() | ||
| while fs: | ||
| if ( | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ebonnal I believe you got this part slightly wrong, "off-by-one". IIUC, the number of pending futures cannot be larger than buffsize. However, after the initial submission of buffsize tasks before, here in this branch you are appending an EXTRA task to the queue, and now you have buffsize + 1 tasks that have potentially not yielded yet.
Fortunately, looks like the fix is trivial: you simply have to yield first, next append to the queue:
diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py
index d98b1ebdd58..3b9ccf4d651 100644
--- a/Lib/concurrent/futures/_base.py
+++ b/Lib/concurrent/futures/_base.py
@@ -628,17 +628,17 @@ def result_iterator():
                 # reverse to keep finishing order
                 fs.reverse()
                 while fs:
+                    # Careful not to keep a reference to the popped future
+                    if timeout is None:
+                        yield _result_or_cancel(fs.pop())
+                    else:
+                        yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
                     if (
                         buffersize
                         and (executor := executor_weakref())
                         and (args := next(zipped_iterables, None))
                     ):
                         fs.appendleft(executor.submit(fn, *args))
-                    # Careful not to keep a reference to the popped future
-                    if timeout is None:
-                        yield _result_or_cancel(fs.pop())
-                    else:
-                        yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
             finally:
                 for future in fs:
                     future.cancel()There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @dalcinl,
TL;DR: fyi we have #131467 that is open and tackling this "off-by-one" situation. Would be great to get your review there! 🙏🏻
I have not proposed this variation at first because I think it makes sense as an optional follow up PR given that it integrates slightly less smoothly into existing logic.
You will notice that it is not "just" moving the yield before the enqueue, let's see why on a simple scenario, explaining the 3 behaviors:
Scenario:
it = executor.map(fn, iterable, buffersize=buffersize)
# point A
next(it)
# point B
next(it)
# point C"enqueue -> wait -> yield" (current, introduced by this PR) behavior
- buffers buffersizetasks
- at # point Athere isbuffersizebuffered tasks
- 1st call to next:- enqueue a task, jumping to buffersize+1tasks in buffer
- wait for the next result to be available
- yield the next result, finally going down to buffersizetasks in buffer
 
- enqueue a task, jumping to 
- at # point Bthere is stillbuffersizebuffered tasks
- 2nd call to next: same
pro: buffersize tasks in buffer between two calls to next
con: while waiting for the next result we have buffersize+1 tasks in buffer
"wait -> yield -> enqueue" (your proposal) behavior
- buffers buffersizetasks
- at # point Athere isbuffersizebuffered tasks
- call to next:- wait for the next result to be available
- yield this result, going down to buffersize - 1tasks in buffer
 
- at # point Bthere isbuffersize - 1buffered tasks
- call to next:- enqueue a task, jumping back to buffersizetasks in buffer
- wait for the next result to be available
- yield next result, going down to buffersize - 1tasks in buffer
 
- enqueue a task, jumping back to 
- at # point Cthere is stillbuffersize-1buffered tasks
pro: never exceed buffersize
con: between two calls to next we have only buffersize - 1 tasks in buffer
"wait -> enqueue -> yield" (#131467) behavior
- buffers buffersizetasks
- at # point Athere isbuffersizebuffered tasks
- 1st call to next:- wait for next result to be available
- enqueue a task, jumping to buffersize + 1tasks in buffer
- yield already available result without needing to wait, going back to buffersizetasks in buffer
 
- at # point Bthere is stillbuffersizebuffered tasks
- 2nd call to next: same
pros:
- buffersizetasks in buffer between two calls to- next
- jumps to buffersize + 1during a call tonextbut is back instantly tobuffersizebecause the next result is already available when we enqueued the next task (the closest we can get to a yield-and-enqueue-at-the-same-time).
Let me know if it makes sense 🙏🏻
Context recap (#74028)
Let's consider that we have an input
iterableandN = len(iterable).Current$O(N)$  in space (unecessarily expensive on large iterables, completely impossible to use on infinite iterables):$N$  tasks to the $N$ ). Following calls to 
concurrent.futures.Executor.mapisThe call
results: Iterator = executor.map(func, iterable)iterates over all the elements of theiterable, submittingexecutor(futures collected into a list of sizenext(results)take the oldest future from the list (FIFO), then wait for its result and return it.Proposal: add an optional
buffersizeparamWith this proposal, the call$b$  elements of $b$  tasks to the 
results: Iterator = executor.map(func, iterable, buffersize=b)will iterate only over the firstiterable, submittingexecutor(futures stored in the bufferdeque) and then will return the results iterator.Calls to
next(results)will get the next input element fromiterableand submit a task to theexecutorfor it (enqueuing another future), then wait for the oldest future in the buffer queue to complete (FIFO), then return the result.Benefits:
buffersizethe client code takes back the control over the speed of iteration over the inputiterable: after an initial spike offuncto fill the buffer, the iteration over inputiterablewill follow the rate of the iteration over theresults(controlled by the client), which is critical whenfuncinvolves talking to services that you don't want to overload.Why a new PR
It turns out it is very similar to the initial work of @MojoVampire in #707 back in 2017 (followed up by @graingert in #18566 and @Jason-Y-Z in #114975): use a queue of fixed size to hold the not-yet-yielded future results.
In addition this PR:
buffersize=None(default)📚 Documentation preview 📚: https://cpython-previews--125663.org.readthedocs.build/