Skip to content

Commit db4d820

Browse files
committed
New cancellable asynchronous iterators
1 parent 2e80020 commit db4d820

File tree

1 file changed

+34
-6
lines changed

1 file changed

+34
-6
lines changed

p2p/cancellable.py

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,28 @@
11
from typing import (
2+
AsyncIterator,
23
Awaitable,
34
TypeVar,
45
)
56

67
from cancel_token import CancelToken
78

9+
TReturn = TypeVar('TReturn')
10+
811

912
class CancellableMixin:
1013
cancel_token: CancelToken = None
1114

12-
_TReturn = TypeVar('_TReturn')
13-
1415
async def wait(self,
15-
awaitable: Awaitable[_TReturn],
16+
awaitable: Awaitable[TReturn],
1617
token: CancelToken = None,
17-
timeout: float = None) -> _TReturn:
18+
timeout: float = None) -> TReturn:
1819
"""See wait_first()"""
1920
return await self.wait_first(awaitable, token=token, timeout=timeout)
2021

2122
async def wait_first(self,
22-
*awaitables: Awaitable[_TReturn],
23+
*awaitables: Awaitable[TReturn],
2324
token: CancelToken = None,
24-
timeout: float = None) -> _TReturn:
25+
timeout: float = None) -> TReturn:
2526
"""
2627
Wait for the first awaitable to complete, unless we timeout or the token chain is triggered.
2728
@@ -39,3 +40,30 @@ async def wait_first(self,
3940
else:
4041
token_chain = token.chain(self.cancel_token)
4142
return await token_chain.cancellable_wait(*awaitables, timeout=timeout)
43+
44+
async def wait_iter(
45+
self,
46+
aiterable: AsyncIterator[TReturn],
47+
token: CancelToken = None,
48+
timeout: float = None) -> AsyncIterator[TReturn]:
49+
"""
50+
Iterate through an async iterator, raising the OperationCancelled exception if the token is
51+
triggered. For example:
52+
53+
::
54+
55+
async for val in self.wait_iter(my_async_iterator()):
56+
do_stuff(val)
57+
58+
See :meth:`CancellableMixin.wait_first` for using arguments ``token`` and ``timeout``
59+
"""
60+
aiter = aiterable.__aiter__()
61+
while True:
62+
try:
63+
yield await self.wait(
64+
aiter.__anext__(),
65+
token=token,
66+
timeout=timeout,
67+
)
68+
except StopAsyncIteration:
69+
break

0 commit comments

Comments
 (0)