Lib/asyncio/__init__.py (2 additions, 0 deletions)
@@ -9,6 +9,7 @@
 from .coroutines import *
 from .events import *
 from .exceptions import *
+from .executor import *
 from .futures import *
 from .graph import *
 from .locks import *
@@ -27,6 +28,7 @@
 coroutines.__all__ +
 events.__all__ +
 exceptions.__all__ +
+executor.__all__ +
 futures.__all__ +
 graph.__all__ +
 locks.__all__ +
Lib/asyncio/executor.py (new file, 226 additions)
@@ -0,0 +1,226 @@
import time
from inspect import isawaitable
from typing import (Any, AsyncIterable, Awaitable, Iterable, NamedTuple,
                    Optional, Protocol, cast)
Contributor:
AsyncIterable, Iterable, and Awaitable live in collections.abc now, and Optional[T] should be replaced with T | None.
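
For concreteness, a sketch of the module header rewritten along those lines (same names as the diff, just relocated/modernized):

    import time
    from collections.abc import AsyncIterable, Awaitable, Iterable
    from inspect import isawaitable
    from typing import Any, NamedTuple, Protocol, cast

    # Optional[X] annotations elsewhere in the module would become X | None.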


from .exceptions import CancelledError
from .futures import Future
from .locks import Event
from .queues import Queue, QueueShutDown
from .tasks import FIRST_COMPLETED, Task, create_task, gather, wait, wait_for

__all__ = (
    "Executor",
)


class _WorkFunction[R](Protocol):
    def __call__(self, *args, **kwargs) -> R | Awaitable[R]:
        ...
Contributor:
Suggested change:

    -class _WorkFunction[R](Protocol):
    -    def __call__(self, *args, **kwargs) -> R | Awaitable[R]:
    -        ...
    +class _WorkFunction[R, **P](Protocol):
    +    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R | Awaitable[R]:
    +        ...

If the module has type hints, let's use them consistently everywhere.



class _WorkItem[R](NamedTuple):
Contributor:
Please use a dataclass instead of a named tuple.
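
A sketch of the dataclass equivalent (frozen/slots are an assumption chosen for parity with the tuple, not something the review specifies):

    from dataclasses import dataclass

    @dataclass(frozen=True, slots=True)
    class _WorkItem[R]:
        fn: _WorkFunction[R]
        args: tuple[Any, ...]
        kwargs: dict[Any, Any]
        future: Future[R]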

    fn: _WorkFunction[R]
    args: tuple[Any, ...]
    kwargs: dict[Any, Any]
    future: Future[R]


async def _run_work_item[R](work_item: _WorkItem[R]) -> R:
    result = work_item.fn(*work_item.args, **work_item.kwargs)
    if isawaitable(result):
Contributor:
I'm sorry, I disagree with accepting two function colors. In my mind, an async executor should work with async functions only. Processing sync functions without blocking calls inside doesn't make sense; they could be called without an executor. Handling sync functions with blocking calls requires a to_thread() wrapper; that is not the subject of the proposed class.
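
Under that model, blocking work would be wrapped at the call site; a minimal sketch (blocking_io and the file path are hypothetical):

    import asyncio

    def blocking_io(path: str) -> bytes:
        with open(path, "rb") as f:
            return f.read()

    async def main() -> None:
        async with Executor(max_workers=4) as executor:
            # to_thread(...) returns an awaitable, so an async-only
            # executor can still service blocking functions this way
            future = await executor.submit(asyncio.to_thread, blocking_io, "data.bin")
            data = await future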

        result = cast(R, await result)
    return result


async def _worker[R](input_queue: Queue[Optional[_WorkItem[R]]]) -> None:
Contributor:
Please make the function a private method of Executor.

    while True:
        work_item = await input_queue.get()
        try:
            if work_item is None:
Contributor:
How could it be None?

Author:
None was used as a terminator; however, I see now that shutting the queue down works too. I've refactored it, thank you for pointing this out.

                break

            item_future = work_item.future
            if item_future.cancelled():
                continue

            try:
                task = create_task(_run_work_item(work_item))
Contributor:
I don't know if the worker should create a task per processed item. It looks like overhead now.

Author:
I understand; however, I think create_task is needed if I want to race fn against possible cancellation of the work item, and cancel fn's execution if that happens.

If I were to remove create_task and just await fn(...), it would prevent immediate cancellation of long-running work items, which could be undesirable for users, e.g. if it occurred while leaving an Executor's context due to a raised exception.

Do you know of a way to achieve this behavior without using create_task?

                await wait([task, item_future], return_when=FIRST_COMPLETED)
                if item_future.cancelled():
                    task.cancel()
                    continue
                result = task.result()
            except BaseException as exception:
                if not item_future.cancelled():
                    item_future.set_exception(exception)
                continue
            if not item_future.cancelled():
                item_future.set_result(result)
Contributor:
Suggested change:

    -                continue
    -            if not item_future.cancelled():
    -                item_future.set_result(result)
    +            else:
    +                if not item_future.cancelled():
    +                    item_future.set_result(result)

Author:
After some tinkering, I managed to make the code more compact, but the new version cannot use try-except-else if it is to remain compact.

Please review the new version and consider which way you would prefer.

For easier comparison, here's a try-except-else version:

    async def _worker(self) -> None:
        while True:
            try:
                work_item = await self._input_queue.get()
                item_future = work_item.future

                try:
                    if item_future.cancelled():
                        continue

                    task = create_task(work_item.fn(
                        *work_item.args,
                        **work_item.kwargs,
                    ))
                    await wait([task, item_future], return_when=FIRST_COMPLETED)
                    if item_future.cancelled():
                        task.cancel()
                        continue
                    result = task.result()
                except BaseException as exception:
                    if not item_future.cancelled():
                        item_future.set_exception(exception)
                else:
                    if not item_future.cancelled():
                        item_future.set_result(result)
                finally:
                    self._input_queue.task_done()
            except QueueShutDown:  # The executor has been shut down.
                break

        finally:
            input_queue.task_done()


async def _azip(*iterables: Iterable | AsyncIterable) -> AsyncIterable[tuple]:
    def _as_async_iterable[T](
        iterable: Iterable[T] | AsyncIterable[T],
    ) -> AsyncIterable[T]:
        async def _to_async_iterable(
            iterable: Iterable[T],
        ) -> AsyncIterable[T]:
            for item in iterable:
                yield item

        if isinstance(iterable, AsyncIterable):
            return iterable
        return _to_async_iterable(iterable)

    async_iterables = [_as_async_iterable(iterable) for iterable in iterables]
    iterators = [aiter(async_iterable) for async_iterable in async_iterables]
    while True:
        try:
            items = await gather(*[anext(iterator) for iterator in iterators])
Contributor:
Suggested change:

    -            items = await gather(*[anext(iterator) for iterator in iterators])
    +            items = [anext(iterator) for iterator in iterators]

await gather() is not free; it creates a task per item. Let's use a plain list comprehension here. If a user wants concurrency, they could create a task explicitly, providing arguments.

            yield tuple(items)
        except StopAsyncIteration:
            break


async def _consume_cancelled_future(future):
    try:
        await future
    except CancelledError:
        pass


class Executor[R]:
    _input_queue: Queue[Optional[_WorkItem[R]]]
    _workers: list[Task]
    _feeders: set[Task]
    _shutdown: bool = False

    def __init__(self, max_workers: int) -> None:
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        self._input_queue = Queue(max_workers)
        self._workers = [
            create_task(_worker(self._input_queue))
            for _ in range(max_workers)
        ]
        self._feeders = set()

    async def submit(
Contributor:
What do you think about context vars? Right now, workers run with the context implicitly copied during the Executor's creation. How could I submit a function with the context I have at the moment of the submit() call?

Author:
While I have no strong opinions about context vars, I do see that they can be useful.

I suggest adding a context: Context | None = None parameter to submit() and map() that will be set to contextvars.copy_context() if None, and propagating the context in _WorkItem to be used inside a worker when executing fn.
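
Roughly, a sketch of that idea (not final code; create_task() has accepted a context= argument since Python 3.11):

    from contextvars import Context, copy_context

    async def submit(self, fn, /, *args, context: Context | None = None, **kwargs):
        if context is None:
            context = copy_context()
        ...
        work_item = _WorkItem(fn, args, kwargs, future, context)

    # and the worker would run each item under its captured context:
    task = create_task(_run_work_item(work_item), context=work_item.context)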

        self,
        fn: _WorkFunction[R],
        /,
        *args,
        **kwargs,
    ) -> Future[R]:
        if self._shutdown:
            raise RuntimeError("Cannot schedule new tasks after shutdown")

        future = Future()
        work_item = _WorkItem(fn, args, kwargs, future)
        await self._input_queue.put(work_item)

        return future

    async def map(
        self,
        fn: _WorkFunction[R],
        *iterables: Iterable | AsyncIterable,
        timeout: Optional[float] = None,
        chunksize: int = 1,
Contributor:
This argument is never used; please drop it. asyncio.Executor is not 100% compatible with concurrent.futures.Executor anyway.

    ) -> AsyncIterable[R]:
        if self._shutdown:
            raise RuntimeError("Cannot schedule new tasks after shutdown")

        end_time = None if timeout is None else time.monotonic() + timeout

        inputs_stream = _azip(*iterables)
        submitted_tasks = Queue[Optional[Future]]()
        tasks_in_flight_limit = len(self._workers) + self._input_queue.maxsize
        resume_feeding = Event()
        async def _feeder() -> None:
Contributor:
I think the function could be extracted into a method.

            try:
                async for args in inputs_stream:
                    if self._shutdown:
                        break
                    future = await self.submit(fn, *args)
                    await submitted_tasks.put(future)

                    if submitted_tasks.qsize() >= tasks_in_flight_limit:
                        await resume_feeding.wait()
                        resume_feeding.clear()
            except QueueShutDown:
                # The executor was shut down while the feeder waited to
                # submit a task.
                pass
            finally:
                await submitted_tasks.put(None)
                submitted_tasks.shutdown()

        feeder_task = create_task(_feeder())
        self._feeders.add(feeder_task)
        feeder_task.add_done_callback(self._feeders.remove)

        try:
            while True:
                task = await submitted_tasks.get()
                if task is None:
                    break

                remaining_time = (
                    None if end_time is None else end_time - time.monotonic()
                )
                if remaining_time is not None and remaining_time <= 0:
                    raise TimeoutError()
                result = await wait_for(task, remaining_time)
Contributor:
Please replace wait_for() with async with timeout().
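
i.e., something along these lines (a sketch; asyncio.timeout() accepts None and raises TimeoutError on expiry, matching the wait_for() behaviour, and inside the asyncio package it would be imported from .timeouts):

    from .timeouts import timeout

    async with timeout(remaining_time):
        result = await task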

                yield result
                resume_feeding.set()
        finally:
            feeder_task.cancel()
            await _consume_cancelled_future(feeder_task)

            finalization_tasks = []
            while submitted_tasks.qsize() > 0:
                task = await submitted_tasks.get()
                if task is not None:
                    task.cancel()
                    finalization_tasks.append(create_task(
                        _consume_cancelled_future(task),
                    ))
            if finalization_tasks:
                await gather(*finalization_tasks)

    async def shutdown(self, wait=True, *, cancel_futures=False) -> None:
        if self._shutdown:
            return
        self._shutdown = True

        if cancel_futures:
            finalization_tasks = []
            while not self._input_queue.empty():
                work_item = self._input_queue.get_nowait()
                if work_item is not None:
                    work_item.future.cancel()
                    finalization_tasks.append(create_task(
                        _consume_cancelled_future(work_item.future),
                    ))
            if finalization_tasks:
                await gather(*finalization_tasks)
Contributor:
If the code waits for all future cancellations, there is no need to create a task per future. A for loop awaiting the futures one by one would be enough; it doesn't wait on the last future any longer than the current code does.
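
A sketch of that simplification (cancelled_futures is a hypothetical local):

    cancelled_futures = []
    while not self._input_queue.empty():
        work_item = self._input_queue.get_nowait()
        if work_item is not None:
            work_item.future.cancel()
            cancelled_futures.append(work_item.future)
    # no per-future tasks; the total wait time is the same
    for future in cancelled_futures:
        await _consume_cancelled_future(future)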


        for _ in self._workers:
            await self._input_queue.put(None)
        self._input_queue.shutdown()
Contributor:
Could these three lines be reduced to a bare self._input_queue.shutdown()?
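
Presumably yes, assuming the workers exit on QueueShutDown (as in the refactored _worker above):

    # non-immediate shutdown lets workers drain items already queued;
    # after that, each worker's get() raises QueueShutDown and it exits
    self._input_queue.shutdown()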


        if wait:
            await gather(*self._workers)
Contributor:
Could a worker raise an exception? Should .shutdown() propagate the exception to a caller?

Author:
Unless there is a bug in the implementation, a worker should never raise an exception.


    async def __aenter__(self) -> "Executor":
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> bool:
        await self.shutdown(wait=True)
        return False