Skip to content

Commit d0f9c96

Browse files
authored
Merge pull request #1554 from carver/wait-iter
New cancellable asynchronous iterators
2 parents 2e80020 + 6de66c0 commit d0f9c96

File tree

2 files changed

+62
-6
lines changed

2 files changed

+62
-6
lines changed

p2p/cancellable.py

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,28 @@
11
from typing import (
2+
AsyncIterator,
23
Awaitable,
34
TypeVar,
45
)
56

67
from cancel_token import CancelToken
78

9+
TReturn = TypeVar('TReturn')
10+
811

912
class CancellableMixin:
1013
cancel_token: CancelToken = None
1114

12-
_TReturn = TypeVar('_TReturn')
13-
1415
async def wait(self,
15-
awaitable: Awaitable[_TReturn],
16+
awaitable: Awaitable[TReturn],
1617
token: CancelToken = None,
17-
timeout: float = None) -> _TReturn:
18+
timeout: float = None) -> TReturn:
1819
"""See wait_first()"""
1920
return await self.wait_first(awaitable, token=token, timeout=timeout)
2021

2122
async def wait_first(self,
22-
*awaitables: Awaitable[_TReturn],
23+
*awaitables: Awaitable[TReturn],
2324
token: CancelToken = None,
24-
timeout: float = None) -> _TReturn:
25+
timeout: float = None) -> TReturn:
2526
"""
2627
Wait for the first awaitable to complete, unless we timeout or the token chain is triggered.
2728
@@ -39,3 +40,30 @@ async def wait_first(self,
3940
else:
4041
token_chain = token.chain(self.cancel_token)
4142
return await token_chain.cancellable_wait(*awaitables, timeout=timeout)
43+
44+
async def wait_iter(
45+
self,
46+
aiterable: AsyncIterator[TReturn],
47+
token: CancelToken = None,
48+
timeout: float = None) -> AsyncIterator[TReturn]:
49+
"""
50+
Iterate through an async iterator, raising the OperationCancelled exception if the token is
51+
triggered. For example:
52+
53+
::
54+
55+
async for val in self.wait_iter(my_async_iterator()):
56+
do_stuff(val)
57+
58+
See :meth:`CancellableMixin.wait_first` for using arguments ``token`` and ``timeout``
59+
"""
60+
aiter = aiterable.__aiter__()
61+
while True:
62+
try:
63+
yield await self.wait(
64+
aiter.__anext__(),
65+
token=token,
66+
timeout=timeout,
67+
)
68+
except StopAsyncIteration:
69+
break

tests/p2p/test_service.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22

3+
from cancel_token import OperationCancelled
34
import pytest
45

56
from p2p.service import BaseService
@@ -41,6 +42,33 @@ async def test_daemon_exit_causes_parent_cancellation():
4142
await asyncio.wait_for(service.events.cleaned_up.wait(), timeout=1)
4243

4344

45+
@pytest.mark.asyncio
46+
async def test_cancel_exits_async_generator():
47+
service = WaitService()
48+
asyncio.ensure_future(service.run())
49+
50+
async def cancel_soon():
51+
await service.sleep(0.05)
52+
await service.cancel()
53+
54+
asyncio.ensure_future(cancel_soon())
55+
56+
async def async_iterator():
57+
yield 1
58+
await asyncio.sleep(0.5)
59+
assert False, "iterator should have been cancelled by now"
60+
61+
try:
62+
async for val in service.wait_iter(async_iterator()):
63+
assert val == 1
64+
except OperationCancelled:
65+
pass
66+
else:
67+
assert False, "iterator should have been cancelled during iteration"
68+
69+
await service.cancel()
70+
71+
4472
@pytest.mark.asyncio
4573
async def test_service_tasks_do_not_leak_memory():
4674
service = WaitService()

0 commit comments

Comments
 (0)