Skip to content

Commit 6138ee0

Browse files
committed
Update ready and consume methods to raise ChannelClosedError
... instead of `StopAsyncIteration` Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 6b67fe1 commit 6138ee0

File tree

8 files changed

+29
-29
lines changed

8 files changed

+29
-29
lines changed

src/frequenz/channels/anycast.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from collections import deque
1010
from typing import Deque, Generic, Optional
1111

12+
from frequenz.channels.base_classes import ChannelClosedError
1213
from frequenz.channels.base_classes import Receiver as BaseReceiver
1314
from frequenz.channels.base_classes import Sender as BaseSender
1415
from frequenz.channels.base_classes import T
@@ -168,15 +169,15 @@ async def ready(self) -> None:
168169
"""Wait until the receiver is ready with a value.
169170
170171
Raises:
171-
StopAsyncIteration: if the underlying channel is closed.
172+
ChannelClosedError: if the underlying channel is closed.
172173
"""
173174
# if a message is already ready, then return immediately.
174175
if self._next is not None:
175176
return
176177

177178
while len(self._chan.deque) == 0:
178179
if self._chan.closed:
179-
raise StopAsyncIteration()
180+
raise ChannelClosedError()
180181
async with self._chan.recv_cv:
181182
await self._chan.recv_cv.wait()
182183
self._next = self._chan.deque.popleft()
@@ -186,9 +187,6 @@ async def ready(self) -> None:
186187
def consume(self) -> T:
187188
"""Return the latest value once `ready()` is complete.
188189
189-
Raises:
190-
EOFError: When called before a call to `ready()` finishes.
191-
192190
Returns:
193191
The next value that was received.
194192
"""

src/frequenz/channels/base_classes.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,11 @@ async def __anext__(self) -> T:
6969
Raises:
7070
StopAsyncIteration: if the underlying channel is closed.
7171
"""
72-
await self.ready()
73-
return self.consume()
72+
try:
73+
await self.ready()
74+
return self.consume()
75+
except ChannelClosedError as exc:
76+
raise StopAsyncIteration() from exc
7477

7578
@abstractmethod
7679
async def ready(self) -> None:
@@ -80,7 +83,7 @@ async def ready(self) -> None:
8083
`consume()`.
8184
8285
Raises:
83-
StopAsyncIteration: if the underlying channel is closed.
86+
ChannelClosedError: if the underlying channel is closed.
8487
"""
8588

8689
@abstractmethod
@@ -93,7 +96,7 @@ def consume(self) -> T:
9396
The next value received.
9497
9598
Raises:
96-
StopAsyncIteration: if the underlying channel is closed.
99+
ChannelClosedError: if the underlying channel is closed.
97100
"""
98101

99102
def __aiter__(self) -> Receiver[T]:

src/frequenz/channels/broadcast.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from typing import Deque, Dict, Generic, Optional
1313
from uuid import UUID, uuid4
1414

15-
from frequenz.channels.base_classes import BufferedReceiver
15+
from frequenz.channels.base_classes import BufferedReceiver, ChannelClosedError
1616
from frequenz.channels.base_classes import Peekable as BasePeekable
1717
from frequenz.channels.base_classes import Sender as BaseSender
1818
from frequenz.channels.base_classes import T
@@ -253,8 +253,8 @@ async def ready(self) -> None:
253253
"""Wait until the receiver is ready with a value.
254254
255255
Raises:
256-
EOFError: When called before a call to `ready()` finishes.
257-
StopAsyncIteration: if the underlying channel is closed.
256+
EOFError: if this receiver is no longer active.
257+
ChannelClosedError: if the underlying channel is closed.
258258
"""
259259
if not self._active:
260260
raise EOFError("This receiver is no longer active.")
@@ -265,7 +265,7 @@ async def ready(self) -> None:
265265
# consumed, then we return immediately.
266266
while len(self._q) == 0:
267267
if self._chan.closed:
268-
raise StopAsyncIteration()
268+
raise ChannelClosedError()
269269
async with self._chan.recv_cv:
270270
await self._chan.recv_cv.wait()
271271

src/frequenz/channels/merge.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from collections import deque
88
from typing import Any, Deque, Set
99

10-
from frequenz.channels.base_classes import Receiver, T
10+
from frequenz.channels.base_classes import ChannelClosedError, Receiver, T
1111

1212

1313
class Merge(Receiver[T]):
@@ -48,7 +48,7 @@ async def ready(self) -> None:
4848
"""Wait until the receiver is ready with a value.
4949
5050
Raises:
51-
StopAsyncIteration: if the underlying channel is closed.
51+
ChannelClosedError: if the underlying channel is closed.
5252
"""
5353
# we use a while loop to continue to wait for new data, in case the
5454
# previous `wait` completed because a channel was closed.
@@ -58,7 +58,7 @@ async def ready(self) -> None:
5858
return
5959

6060
if len(self._pending) == 0:
61-
raise StopAsyncIteration()
61+
raise ChannelClosedError()
6262
done, self._pending = await asyncio.wait(
6363
self._pending, return_when=asyncio.FIRST_COMPLETED
6464
)

src/frequenz/channels/merge_named.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from collections import deque
88
from typing import Any, Deque, Set, Tuple
99

10-
from frequenz.channels.base_classes import Receiver, T
10+
from frequenz.channels.base_classes import ChannelClosedError, Receiver, T
1111

1212

1313
class MergeNamed(Receiver[Tuple[str, T]]):
@@ -35,7 +35,7 @@ async def ready(self) -> None:
3535
"""Wait until there's a message in any of the channels.
3636
3737
Raises:
38-
StopAsyncIteration: When the channel is closed.
38+
ChannelClosedError: when all the channels are closed.
3939
"""
4040
# we use a while loop to continue to wait for new data, in case the
4141
# previous `wait` completed because a channel was closed.
@@ -45,7 +45,7 @@ async def ready(self) -> None:
4545
return
4646

4747
if len(self._pending) == 0:
48-
raise StopAsyncIteration()
48+
raise ChannelClosedError()
4949
done, self._pending = await asyncio.wait(
5050
self._pending, return_when=asyncio.FIRST_COMPLETED
5151
)

src/frequenz/channels/select.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from dataclasses import dataclass
1414
from typing import Any, Dict, List, Optional, Set, TypeVar
1515

16-
from frequenz.channels.base_classes import Receiver
16+
from frequenz.channels.base_classes import ChannelClosedError, Receiver
1717

1818
logger = logging.Logger(__name__)
1919
T = TypeVar("T")
@@ -147,7 +147,7 @@ async def ready(self) -> bool:
147147
for item in done:
148148
name = item.get_name()
149149
recv = self._receivers[name]
150-
if isinstance(item.exception(), StopAsyncIteration):
150+
if isinstance(item.exception(), ChannelClosedError):
151151
result = None
152152
else:
153153
result = recv
@@ -157,8 +157,7 @@ async def ready(self) -> bool:
157157
# don't add a task for it again.
158158
if result is None:
159159
continue
160-
ready = recv.ready()
161-
self._pending.add(asyncio.create_task(ready, name=name))
160+
self._pending.add(asyncio.create_task(recv.ready(), name=name))
162161
return True
163162

164163
def __getattr__(self, name: str) -> Optional[Any]:

src/frequenz/channels/utils/file_watcher.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from watchfiles import Change, awatch
1111
from watchfiles.main import FileChange
1212

13-
from frequenz.channels.base_classes import Receiver
13+
from frequenz.channels.base_classes import ChannelClosedError, Receiver
1414

1515

1616
class EventType(Enum):
@@ -83,7 +83,7 @@ def consume(self) -> pathlib.Path:
8383
"""Return the latest change once `ready` is complete.
8484
8585
Raises:
86-
StopAsyncIteration: When the channel is closed.
86+
ChannelClosedError: When the channel is closed.
8787
8888
Returns:
8989
The next change that was received.
@@ -92,7 +92,7 @@ def consume(self) -> pathlib.Path:
9292
change = self._changes.pop()
9393
# Tuple of (Change, path) returned by watchfiles
9494
if change is None or len(change) != 2:
95-
raise StopAsyncIteration()
95+
raise ChannelClosedError()
9696
_, path_str = change
9797
path = pathlib.Path(path_str)
9898
return path

src/frequenz/channels/utils/timer.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from datetime import datetime, timedelta, timezone
88
from typing import Optional
99

10-
from frequenz.channels.base_classes import Receiver
10+
from frequenz.channels.base_classes import ChannelClosedError, Receiver
1111

1212

1313
class Timer(Receiver[datetime]):
@@ -81,7 +81,7 @@ async def ready(self) -> None:
8181
"""Return the current time (in UTC) once the next tick is due.
8282
8383
Raises:
84-
StopAsyncIteration: if [stop()][frequenz.channels.Timer.stop] has been
84+
ChannelClosedError: if [stop()][frequenz.channels.Timer.stop] has been
8585
called on the timer.
8686
8787
Returns:
@@ -94,7 +94,7 @@ async def ready(self) -> None:
9494
return
9595

9696
if self._stopped:
97-
raise StopAsyncIteration()
97+
raise ChannelClosedError()
9898
now = datetime.now(timezone.utc)
9999
diff = self._next_msg_time - now
100100
while diff.total_seconds() > 0:

0 commit comments

Comments
 (0)