Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions newsfragments/3331.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Fixed a bug where iterating over an ``@as_safe_channel``-derived ``ReceiveChannel``
would raise `~trio.BrokenResourceError` if the channel was closed by another task.
It now shuts down cleanly.
11 changes: 9 additions & 2 deletions src/trio/_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import trio

from ._abc import ReceiveChannel, ReceiveType, SendChannel, SendType, T
from ._core import Abort, RaiseCancelT, Task, enable_ki_protection
from ._core import Abort, BrokenResourceError, RaiseCancelT, Task, enable_ki_protection
from ._util import (
MultipleExceptionError,
NoPublicConstructor,
Expand Down Expand Up @@ -577,12 +577,19 @@ async def _move_elems_to_channel(
while True:
# wait for receiver to call next on the aiter
await send_semaphore.acquire()
if not send_chan._state.open_receive_channels:
# skip the possibly-expensive computation in the generator,
# if we know it will be impossible to send the result.
break
try:
value = await agen.__anext__()
except StopAsyncIteration:
return
# Send the value to the channel
await send_chan.send(value)
try:
await send_chan.send(value)
except BrokenResourceError:
break # closed since we checked above
finally:
# work around `.aclose()` not suppressing GeneratorExit in an
# ExceptionGroup:
Expand Down
46 changes: 45 additions & 1 deletion src/trio/_tests/test_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ async def test_as_safe_channel_broken_resource() -> None:
@as_safe_channel
async def agen() -> AsyncGenerator[int]:
yield 1
yield 2
yield 2 # pragma: no cover

async with agen() as recv_chan:
assert await recv_chan.__anext__() == 1
Expand Down Expand Up @@ -695,3 +695,47 @@ async def agen(ex: type[BaseException]) -> AsyncGenerator[None]:
async with agen(ValueError) as g:
async for _ in g:
break


async def test_as_safe_channel_close_between_iteration() -> None:
@as_safe_channel
async def agen() -> AsyncGenerator[None]:
while True:
yield

async with agen() as chan, trio.open_nursery() as nursery:

async def close_channel() -> None:
await trio.lowlevel.checkpoint()
await chan.aclose()

nursery.start_soon(close_channel)
with pytest.raises(trio.ClosedResourceError):
async for _ in chan:
pass


async def test_as_safe_channel_close_before_iteration() -> None:
@as_safe_channel
async def agen() -> AsyncGenerator[None]:
raise AssertionError("should be unreachable") # pragma: no cover
yield # pragma: no cover

async with agen() as chan:
await chan.aclose()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh, it's a bit unintuitive to me that it's possible to not run any code within the async generator. But that can happen in sync generators too so I guess it's fine.

with pytest.raises(trio.ClosedResourceError):
await chan.receive()


async def test_as_safe_channel_close_during_iteration() -> None:
@as_safe_channel
async def agen() -> AsyncGenerator[None]:
await chan.aclose()
while True:
yield

for _ in range(10): # 20% missed-alarm rate, so run ten times
async with agen() as chan:
with pytest.raises(trio.ClosedResourceError):
async for _ in chan:
pass
Loading