Skip to content

Commit 41fdf5f

Browse files
committed
add support for 3.10
1 parent 5e53d31 commit 41fdf5f

File tree

5 files changed

+253
-6
lines changed

5 files changed

+253
-6
lines changed

.coveragerc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,7 @@ exclude_also =
33
# pragma: no cover
44
if TYPE_CHECKING:
55
if t.TYPE_CHECKING:
6-
raise NotImplementedError
6+
raise NotImplementedError
7+
8+
[run]
9+
omit = **/pyper/_core/util/task_group.py

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ name = "python-pyper"
99
dynamic = ["version"]
1010
description = "Concurrent Python made simple"
1111
readme = "README.md"
12-
requires-python = ">=3.11"
12+
requires-python = ">=3.10"
1313
authors = [
1414
{ name = "Richard Zhu", email = "[email protected]" },
1515
]
@@ -23,6 +23,7 @@ classifiers = [
2323
"Framework :: AsyncIO",
2424
"License :: OSI Approved :: MIT License",
2525
"Programming Language :: Python :: 3 :: Only",
26+
"Programming Language :: Python :: 3.10",
2627
"Programming Language :: Python :: 3.11",
2728
"Programming Language :: Python :: 3.12",
2829
"Programming Language :: Python :: 3.13"

src/pyper/_core/async_helper/output.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,18 @@
11
from __future__ import annotations
22

33
import asyncio
4+
import sys
45
from typing import TYPE_CHECKING
56

67
from .stage import AsyncProducer, AsyncProducerConsumer
78
from ..util.sentinel import StopSentinel
89
from ..util.thread_pool import ThreadPool
910

11+
if sys.version_info < (3, 11): # pragma: no cover
12+
from ..util.task_group import TaskGroup, ExceptionGroup
13+
else:
14+
from asyncio import TaskGroup
15+
1016
if TYPE_CHECKING:
1117
from ..pipeline import AsyncPipeline
1218

@@ -15,7 +21,7 @@ class AsyncPipelineOutput:
1521
def __init__(self, pipeline: AsyncPipeline):
1622
self.pipeline = pipeline
1723

18-
def _get_q_out(self, tg: asyncio.TaskGroup, tp: ThreadPool ,*args, **kwargs) -> asyncio.Queue:
24+
def _get_q_out(self, tg: TaskGroup, tp: ThreadPool ,*args, **kwargs) -> asyncio.Queue:
1925
"""Feed forward each stage to the next, returning the output queue of the final stage."""
2026
q_out = None
2127
for task, next_task in zip(self.pipeline.tasks, self.pipeline.tasks[1:] + [None]):
@@ -36,7 +42,7 @@ async def __call__(self, *args, **kwargs):
3642
# We unify async and thread-based concurrency by
3743
# 1. using TaskGroup to spin up asynchronous tasks
3844
# 2. using ThreadPool to spin up synchronous tasks
39-
async with asyncio.TaskGroup() as tg, ThreadPool() as tp:
45+
async with TaskGroup() as tg, ThreadPool() as tp:
4046
q_out = self._get_q_out(tg, tp, *args, **kwargs)
4147
while (data := await q_out.get()) is not StopSentinel:
4248
yield data

src/pyper/_core/async_helper/stage.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,25 @@
11
from __future__ import annotations
22

33
import asyncio
4+
import sys
45
from typing import TYPE_CHECKING
56

67
from .queue_io import AsyncDequeue, AsyncEnqueue
78
from ..util.asynchronize import ascynchronize
89
from ..util.sentinel import StopSentinel
910

11+
if sys.version_info < (3, 11): # pragma: no cover
12+
from ..util.task_group import TaskGroup
13+
else:
14+
from asyncio import TaskGroup
15+
1016
if TYPE_CHECKING:
1117
from ..util.thread_pool import ThreadPool
1218
from ..task import Task
1319

1420

1521
class AsyncProducer:
16-
def __init__(self, task: Task, tg: asyncio.TaskGroup, tp: ThreadPool, n_consumers: int):
22+
def __init__(self, task: Task, tg: TaskGroup, tp: ThreadPool, n_consumers: int):
1723
self.task = ascynchronize(task, tp)
1824
if task.concurrency > 1:
1925
raise RuntimeError(f"The first task in a pipeline ({task.func.__qualname__}) cannot have concurrency greater than 1")
@@ -36,7 +42,7 @@ def start(self, *args, **kwargs):
3642

3743

3844
class AsyncProducerConsumer:
39-
def __init__(self, q_in: asyncio.Queue, task: Task, tg: asyncio.TaskGroup, tp: ThreadPool, n_consumers: int):
45+
def __init__(self, q_in: asyncio.Queue, task: Task, tg: TaskGroup, tp: ThreadPool, n_consumers: int):
4046
self.q_in = q_in
4147
self.task = ascynchronize(task, tp)
4248
self.tg = tg

src/pyper/_core/util/task_group.py

Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
"""Fallback implementation of TaskGroup and ExceptionGroup for < 3.11"""
2+
from __future__ import annotations
3+
4+
import asyncio
5+
from typing import List
6+
7+
8+
class TaskGroup:
    """Asynchronous context manager for managing groups of tasks.

    Example use:

        async with asyncio.TaskGroup() as group:
            task1 = group.create_task(some_coroutine(...))
            task2 = group.create_task(other_coroutine(...))
        print("Both tasks have completed now.")

    All tasks are awaited when the context manager exits.

    Any exceptions other than `asyncio.CancelledError` raised within
    a task will cancel all remaining tasks and wait for them to exit.
    The exceptions are then combined and raised as an `ExceptionGroup`.

    NOTE(review): this appears to be a close backport of CPython 3.11's
    ``asyncio.TaskGroup`` for use on Python 3.10; keeping it line-for-line
    near upstream makes it easy to diff — confirm against CPython's
    ``Lib/asyncio/taskgroups.py`` before modifying.
    """
    def __init__(self):
        self._entered = False                   # __aenter__ has completed
        self._exiting = False                   # __aexit__ has started
        self._aborting = False                  # group-wide cancellation in progress
        self._loop = None                       # running loop, captured on enter
        self._parent_task = None                # task that entered this group
        self._parent_cancel_requested = False   # we cancelled the parent ourselves
        self._tasks = set()                     # tasks still running
        self._errors = []                       # exceptions gathered from tasks
        self._base_error = None                 # first SystemExit/KeyboardInterrupt
        self._on_completed_fut = None           # resolved when the last task finishes

    def __repr__(self):
        # Compact state summary for debugging, e.g. "<TaskGroup tasks=3 cancelling>".
        info = ['']
        if self._tasks:
            info.append(f"tasks={len(self._tasks)}")
        if self._errors:
            info.append(f"errors={len(self._errors)}")
        if self._aborting:
            info.append("cancelling")
        elif self._entered:
            info.append("entered")

        info_str = ' '.join(info)
        return f"<TaskGroup{info_str}>"

    async def __aenter__(self):
        # A group is single-use: re-entering the same instance is an error.
        if self._entered:
            raise RuntimeError(
                f"TaskGroup {self!r} has already been entered")
        if self._loop is None:
            self._loop = asyncio.get_running_loop()
        # The entering task "owns" the group; we need it so task failures
        # can cancel whatever the parent is currently awaiting.
        self._parent_task = asyncio.tasks.current_task(self._loop)
        if self._parent_task is None:
            raise RuntimeError(f"TaskGroup {self!r} cannot determine the parent task")
        self._entered = True

        return self

    async def __aexit__(self, et, exc, tb):
        self._exiting = True

        # SystemExit/KeyboardInterrupt are re-raised as-is (never wrapped
        # in an ExceptionGroup); remember the first one seen.
        if (exc is not None and
                self._is_base_error(exc) and
                self._base_error is None):
            self._base_error = exc

        propagate_cancellation_error = \
            exc if et is asyncio.CancelledError else None
        # NOTE(review): upstream 3.11 calls self._parent_task.uncancel()
        # here; Task.uncancel() does not exist before 3.11, so this
        # bookkeeping is disabled in the backport.
        # if self._parent_cancel_requested:
        #     if self._parent_task.uncancel() == 0:
        #         propagate_cancellation_error = None

        if et is not None:
            if not self._aborting:
                # Our parent task is being cancelled:
                #
                #    async with TaskGroup() as g:
                #        g.create_task(...)
                #        await ...  # <- CancelledError
                #
                # or there's an exception in "async with":
                #
                #    async with TaskGroup() as g:
                #        g.create_task(...)
                #        1 / 0
                #
                self._abort()

        # We use while-loop here because "self._on_completed_fut"
        # can be cancelled multiple times if our parent task
        # is being cancelled repeatedly (or even once, when
        # our own cancellation is already in progress)
        while self._tasks:
            if self._on_completed_fut is None:
                self._on_completed_fut = self._loop.create_future()

            try:
                await self._on_completed_fut
            except asyncio.CancelledError as ex:
                if not self._aborting:
                    # Our parent task is being cancelled:
                    #
                    #    async def wrapper():
                    #        async with TaskGroup() as g:
                    #            g.create_task(foo)
                    #
                    # "wrapper" is being cancelled while "foo" is
                    # still running.
                    propagate_cancellation_error = ex
                    self._abort()

            self._on_completed_fut = None

        assert not self._tasks

        if self._base_error is not None:
            raise self._base_error

        # Propagate CancelledError if there is one, except if there
        # are other errors -- those have priority.
        if propagate_cancellation_error and not self._errors:
            raise propagate_cancellation_error

        if et is not None and et is not asyncio.CancelledError:
            self._errors.append(exc)

        if self._errors:
            # Exceptions are heavy objects that can have object
            # cycles (bad for GC); let's not keep a reference to
            # a bunch of them.
            try:
                me = ExceptionGroup("unhandled errors in a TaskGroup", self._errors)
                raise me from None
            finally:
                self._errors = None

    def create_task(self, coro, *, name=None, context=None):
        """Create a new task in this group and return it.

        Similar to `asyncio.create_task`.
        """
        if not self._entered:
            raise RuntimeError(f"TaskGroup {self!r} has not been entered")
        if self._exiting and not self._tasks:
            raise RuntimeError(f"TaskGroup {self!r} is finished")
        if self._aborting:
            raise RuntimeError(f"TaskGroup {self!r} is shutting down")
        if context is None:
            task = self._loop.create_task(coro)
        else:
            # NOTE(review): loop.create_task() only accepts a `context`
            # kwarg on 3.11+, but this fallback runs on < 3.11 — this
            # branch would raise TypeError there; confirm no caller
            # passes `context`.
            task = self._loop.create_task(coro, context=context)
        # NOTE(review): private asyncio helper (exists on 3.8-3.11);
        # sets the task name when the Task implementation supports it.
        asyncio.tasks._set_task_name(task, name)
        if task.done():
            # Task finished synchronously (e.g. eager completion);
            # record its outcome immediately instead of via callback.
            self._on_task_done(task)
        else:
            self._tasks.add(task)
            task.add_done_callback(self._on_task_done)
        return task

    def _is_base_error(self, exc: BaseException) -> bool:
        # These must escape the group unwrapped (process-control exceptions).
        assert isinstance(exc, BaseException)
        return isinstance(exc, (SystemExit, KeyboardInterrupt))

    def _abort(self):
        # Cancel every task still running; __aexit__ then waits for them.
        self._aborting = True

        for t in self._tasks:
            if not t.done():
                t.cancel()

    def _on_task_done(self, task):
        # Done-callback for every task in the group: collect its result,
        # wake __aexit__ when the group drains, and trigger an abort on
        # the first failure.
        self._tasks.discard(task)

        if self._on_completed_fut is not None and not self._tasks:
            if not self._on_completed_fut.done():
                self._on_completed_fut.set_result(True)

        if task.cancelled():
            return

        exc = task.exception()
        if exc is None:
            return

        self._errors.append(exc)
        if self._is_base_error(exc) and self._base_error is None:
            self._base_error = exc

        if self._parent_task.done():
            # Not sure if this case is possible, but we want to handle
            # it anyways.
            self._loop.call_exception_handler({
                "message": f"Task {task!r} has errored out but its parent "
                           f"task {self._parent_task} is already complete",
                "exception": exc,
                "task": task,
            })
            return

        if not self._aborting and not self._parent_cancel_requested:
            # If parent task *is not* being cancelled, it means that we want
            # to manually cancel it to abort whatever is being run right now
            # in the TaskGroup.  But we want to mark parent task as
            # "not cancelled" later in __aexit__.  Example situation that
            # we need to handle:
            #
            #    async def foo():
            #        try:
            #            async with TaskGroup() as g:
            #                g.create_task(crash_soon())
            #                await something  # <- this needs to be canceled
            #                                 #    by the TaskGroup, e.g.
            #                                 #    foo() needs to be cancelled
            #        except Exception:
            #            # Ignore any exceptions raised in the TaskGroup
            #            pass
            #        await something_else     # this line has to be called
            #                                 # after TaskGroup is finished.
            self._abort()
            self._parent_cancel_requested = True
            self._parent_task.cancel()
226+
227+
228+
class ExceptionGroup(Exception):
    """Fallback for the built-in `ExceptionGroup` (Python 3.11+, PEP 654).

    Carries a human-readable *message* plus the list of *exceptions*
    collected from a `TaskGroup`'s failed tasks.
    """
    def __init__(self, message: str, exceptions: List[Exception]):
        # Initialize the base Exception with the message so that str(),
        # repr() and .args are populated — the original left the base
        # class uninitialized, making unhandled groups print with no text.
        super().__init__(message)
        self.message = message
        self.exceptions = exceptions

0 commit comments

Comments
 (0)