- 
          
- 
                Notifications
    You must be signed in to change notification settings 
- Fork 33.2k
          gh-124309: Modernize the staggered_race implementation to support eager task factories
          #124390
        
          New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
  
    gh-124309: Modernize the staggered_race implementation to support eager task factories
  
  #124390
              Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're using private methods of TaskGroup and starting tasks on the loop rather than the TaskGroup
| I think I'm just going to refactor this to not use  | 
| I think it's worth persevering with TaskGroup, you just need to write it without using add_done_callback or private attributes | 
| I'll try it, but I'm worried that it isn't possible when considering an eager task factory. The previous implementation used a variation of a task group (a list containing tasks, since it predated  While we're here,  | 
| A demo of what I mean wrt TaskGroup: """Support for running coroutines in parallel with staggered start times."""
__all__ = 'staggered_race',
from . import locks
from . import tasks
from . import taskgroups
async def staggered_race(coro_fns, delay, *, loop=None):
    """Run coroutines with staggered start times and take the first to finish.
    This method takes an iterable of coroutine functions. The first one is
    started immediately. From then on, whenever the immediately preceding one
    fails (raises an exception), or when *delay* seconds has passed, the next
    coroutine is started. This continues until one of the coroutines complete
    successfully, in which case all others are cancelled, or until all
    coroutines fail.
    The coroutines provided should be well-behaved in the following way:
    * They should only ``return`` if completed successfully.
    * They should always raise an exception if they did not complete
      successfully. In particular, if they handle cancellation, they should
      probably reraise, like this::
        try:
            # do work
        except asyncio.CancelledError:
            # undo partially completed work
            raise
    Args:
        coro_fns: an iterable of coroutine functions, i.e. callables that
            return a coroutine object when called. Use ``functools.partial`` or
            lambdas to pass arguments.
        delay: amount of time, in seconds, between starting coroutines. If
            ``None``, the coroutines will run sequentially.
        loop: the event loop to use.
    Returns:
        tuple *(winner_result, winner_index, exceptions)* where
        - *winner_result*: the result of the winning coroutine, or ``None``
          if no coroutines won.
        - *winner_index*: the index of the winning coroutine in
          ``coro_fns``, or ``None`` if no coroutines won. If the winning
          coroutine may return None on success, *winner_index* can be used
          to definitively determine whether any coroutine won.
        - *exceptions*: list of exceptions returned by the coroutines.
          ``len(exceptions)`` is equal to the number of coroutines actually
          started, and the order is the same as in ``coro_fns``. The winning
          coroutine's entry is ``None``.
    """
    # TODO: when we have aiter() and anext(), allow async iterables in coro_fns.
    winner_result = None
    winner_index = None
    exceptions = []
    class _Done(Exception):
        pass
    async def run_one_coro(this_index, coro_fn, this_failed):
        try:
            result = await coro_fn()
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as e:
            exceptions[this_index] = e
            this_failed.set()  # Kickstart the next coroutine
        else:
            # Store winner's results
            nonlocal winner_index, winner_result
            # There could be more than one winner
            winner_index = this_index
            winner_result = result
            raise _Done
    try:
        async with taskgroups.TaskGroup() as tg:
            for this_index, coro_fn in enumerate(coro_fns):
                this_failed = locks.Event()
                exceptions.append(None)
                tg.create_task(run_one_coro(this_index, coro_fn, this_failed))
                try:
                    await tasks.wait_for(this_failed.wait(), delay)
                except TimeoutError:
                    pass
    except* _Done:
        pass
    return winner_result, winner_index, exceptions | 
Co-authored-by: Thomas Grainger <[email protected]>
        
          
                Misc/NEWS.d/next/Library/2024-09-23-18-18-23.gh-issue-124309.iFcarA.rst
              
                Outdated
          
            Show resolved
            Hide resolved
        
      Co-authored-by: Jelle Zijlstra <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @ZeroIntensity.
Co-authored-by: Carol Willing <[email protected]>
| Thanks @ZeroIntensity for the PR, and @kumaraditya303 for merging it 🌮🎉.. I'm working now to backport this PR to: 3.12, 3.13. | 
…port eager task factories (pythonGH-124390) (cherry picked from commit de929f3) Co-authored-by: Peter Bierma <[email protected]> Co-authored-by: Thomas Grainger <[email protected]> Co-authored-by: Jelle Zijlstra <[email protected]> Co-authored-by: Carol Willing <[email protected]> Co-authored-by: Kumar Aditya <[email protected]>
| Sorry, @ZeroIntensity and @kumaraditya303, I could not cleanly backport this to   | 
| GH-124573 is a backport of this pull request to the 3.13 branch. | 
| GH-124574 is a backport of this pull request to the 3.12 branch. | 
…pport e… (#124574) gh-124309: Modernize the `staggered_race` implementation to support eager task factories (#124390) Co-authored-by: Thomas Grainger <[email protected]> Co-authored-by: Jelle Zijlstra <[email protected]> Co-authored-by: Carol Willing <[email protected]> Co-authored-by: Kumar Aditya <[email protected]> (cherry picked from commit de929f3) Co-authored-by: Peter Bierma <[email protected]>
…n to support eager task factories (python#124390)" This reverts commit de929f3.
…wnstream (pythonGH-124810) * Revert "pythonGH-124639: add back loop param to staggered_race (pythonGH-124700)" This reverts commit e0a41a5. * Revert "pythongh-124309: Modernize the `staggered_race` implementation to support eager task factories (pythonGH-124390)" This reverts commit de929f3. (cherry picked from commit 133e929) Co-authored-by: Peter Bierma <[email protected]>
…ownstream (GH-124810) (#124817) gh-124309: Revert eager task factory fix to prevent breaking downstream (GH-124810) * Revert "GH-124639: add back loop param to staggered_race (GH-124700)" This reverts commit e0a41a5. * Revert "gh-124309: Modernize the `staggered_race` implementation to support eager task factories (GH-124390)" This reverts commit de929f3. (cherry picked from commit 133e929) Co-authored-by: Peter Bierma <[email protected]>
Uh oh!
There was an error while loading. Please reload this page.