Skip to content

Commit 9f554c9

Browse files
committed
Reverse call order of __anext__ and receive methods in Receiver
Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 4cceb39 commit 9f554c9

File tree

8 files changed

+59
-46
lines changed

8 files changed

+59
-46
lines changed

src/frequenz/channels/anycast.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
from asyncio import Condition
99
from collections import deque
10-
from typing import Deque, Generic, Optional
10+
from typing import Deque, Generic
1111

1212
from frequenz.channels.base_classes import Receiver as BaseReceiver
1313
from frequenz.channels.base_classes import Sender as BaseSender
@@ -163,19 +163,22 @@ def __init__(self, chan: Anycast[T]) -> None:
163163
"""
164164
self._chan = chan
165165

166-
async def receive(self) -> Optional[T]:
166+
async def __anext__(self) -> T:
167167
"""Receive a message from the channel.
168168
169169
Waits for an message to become available, and returns that message.
170170
When there are multiple receivers for the channel, only one receiver
171171
will receive each message.
172172
173+
Raises:
174+
StopAsyncIteration: When the channel is closed.
175+
173176
Returns:
174-
`None`, if the channel is closed, a message otherwise.
177+
The received message.
175178
"""
176179
while len(self._chan.deque) == 0:
177180
if self._chan.closed:
178-
return None
181+
raise StopAsyncIteration()
179182
async with self._chan.recv_cv:
180183
await self._chan.recv_cv.wait()
181184
ret = self._chan.deque.popleft()

src/frequenz/channels/base_classes.py

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,14 @@ class Receiver(ABC, Generic[T]):
3232
"""A channel Receiver."""
3333

3434
@abstractmethod
35-
async def receive(self) -> Optional[T]:
36-
"""Receive a message from the channel.
35+
async def __anext__(self) -> T:
36+
"""Await the next value in the async iteration over received values.
3737
3838
Returns:
39-
`None`, if the channel is closed, a message otherwise.
39+
The next value received.
40+
41+
Raises:
42+
StopAsyncIteration: if the underlying channel is closed.
4043
"""
4144

4245
def __aiter__(self) -> Receiver[T]:
@@ -47,19 +50,16 @@ def __aiter__(self) -> Receiver[T]:
4750
"""
4851
return self
4952

50-
async def __anext__(self) -> T:
51-
"""Await the next value in the async iteration over received values.
53+
async def receive(self) -> Optional[T]:
54+
"""Receive a message from the channel.
5255
5356
Returns:
54-
The next value received.
55-
56-
Raises:
57-
StopAsyncIteration: if we receive `None`, i.e. if the underlying
58-
channel is closed.
57+
The received message.
5958
"""
60-
received = await self.receive()
61-
if received is None:
62-
raise StopAsyncIteration
59+
try:
60+
received = await self.__anext__() # pylint: disable=unnecessary-dunder-call
61+
except StopAsyncIteration:
62+
return None
6363
return received
6464

6565
def map(self, call: Callable[[T], U]) -> Receiver[U]:
@@ -136,13 +136,11 @@ def __init__(self, recv: Receiver[T], transform: Callable[[T], U]) -> None:
136136
self._recv = recv
137137
self._transform = transform
138138

139-
async def receive(self) -> Optional[U]:
139+
async def __anext__(self) -> U:
140140
"""Return a transformed message received from the input channel.
141141
142142
Returns:
143-
`None`, if the channel is closed, a message otherwise.
143+
A transformed message.
144144
"""
145-
msg = await self._recv.receive()
146-
if msg is None:
147-
return None
145+
msg = await self._recv.__anext__()
148146
return self._transform(msg)

src/frequenz/channels/bidirectional.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
from __future__ import annotations
77

8-
from typing import Generic, Optional
8+
from typing import Generic
99

1010
from frequenz.channels.base_classes import Receiver, Sender, T, U
1111
from frequenz.channels.broadcast import Broadcast
@@ -82,10 +82,10 @@ async def send(self, msg: T) -> bool:
8282
"""
8383
return await self._sender.send(msg)
8484

85-
async def receive(self) -> Optional[U]:
85+
async def __anext__(self) -> U:
8686
"""Receive a value from the other side.
8787
8888
Returns:
8989
Received value, or `None` if the channels are closed.
9090
"""
91-
return await self._receiver.receive()
91+
return await self._receiver.__anext__()

src/frequenz/channels/broadcast.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ def __len__(self) -> int:
249249
"""
250250
return len(self._q)
251251

252-
async def receive(self) -> Optional[T]:
252+
async def __anext__(self) -> T:
253253
"""Receive a message from the Broadcast channel.
254254
255255
Waits until there are messages available in the channel and returns
@@ -261,6 +261,7 @@ async def receive(self) -> Optional[T]:
261261
`EOFError`.
262262
263263
Raises:
264+
StopAsyncIteration: When the channel is closed.
264265
EOFError: when the receiver has been converted into a `Peekable`.
265266
266267
Returns:
@@ -271,7 +272,7 @@ async def receive(self) -> Optional[T]:
271272

272273
while len(self._q) == 0:
273274
if self._chan.closed:
274-
return None
275+
raise StopAsyncIteration()
275276
async with self._chan.recv_cv:
276277
await self._chan.recv_cv.wait()
277278
ret = self._q.popleft()

src/frequenz/channels/merge.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
import asyncio
77
from collections import deque
8-
from typing import Any, Deque, Optional, Set
8+
from typing import Any, Deque, Set
99

1010
from frequenz.channels.base_classes import Receiver, T
1111

@@ -34,7 +34,7 @@ def __init__(self, *args: Receiver[T]) -> None:
3434
"""
3535
self._receivers = {str(id): recv for id, recv in enumerate(args)}
3636
self._pending: Set[asyncio.Task[Any]] = {
37-
asyncio.create_task(recv.receive(), name=name)
37+
asyncio.create_task(recv.__anext__(), name=name)
3838
for name, recv in self._receivers.items()
3939
}
4040
self._results: Deque[T] = deque(maxlen=len(self._receivers))
@@ -44,9 +44,12 @@ def __del__(self) -> None:
4444
for task in self._pending:
4545
task.cancel()
4646

47-
async def receive(self) -> Optional[T]:
47+
async def __anext__(self) -> T:
4848
"""Wait until there's a message in any of the channels.
4949
50+
Raises:
51+
StopAsyncIteration: When the channel is closed.
52+
5053
Returns:
5154
The next message that was received, or `None`, if all channels have
5255
closed.
@@ -58,17 +61,17 @@ async def receive(self) -> Optional[T]:
5861
return self._results.popleft()
5962

6063
if len(self._pending) == 0:
61-
return None
64+
raise StopAsyncIteration()
6265
done, self._pending = await asyncio.wait(
6366
self._pending, return_when=asyncio.FIRST_COMPLETED
6467
)
6568
for item in done:
6669
name = item.get_name()
67-
result = item.result()
6870
# if channel is closed, don't add a task for it again.
69-
if result is None:
71+
if isinstance(item.exception(), StopAsyncIteration):
7072
continue
73+
result = item.result()
7174
self._results.append(result)
7275
self._pending.add(
73-
asyncio.create_task(self._receivers[name].receive(), name=name)
76+
asyncio.create_task(self._receivers[name].__anext__(), name=name)
7477
)

src/frequenz/channels/merge_named.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
import asyncio
77
from collections import deque
8-
from typing import Any, Deque, Optional, Set, Tuple
8+
from typing import Any, Deque, Set, Tuple
99

1010
from frequenz.channels.base_classes import Receiver, T
1111

@@ -21,7 +21,7 @@ def __init__(self, **kwargs: Receiver[T]) -> None:
2121
"""
2222
self._receivers = kwargs
2323
self._pending: Set[asyncio.Task[Any]] = {
24-
asyncio.create_task(recv.receive(), name=name)
24+
asyncio.create_task(recv.__anext__(), name=name)
2525
for name, recv in self._receivers.items()
2626
}
2727
self._results: Deque[Tuple[str, T]] = deque(maxlen=len(self._receivers))
@@ -31,9 +31,12 @@ def __del__(self) -> None:
3131
for task in self._pending:
3232
task.cancel()
3333

34-
async def receive(self) -> Optional[Tuple[str, T]]:
34+
async def __anext__(self) -> Tuple[str, T]:
3535
"""Wait until there's a message in any of the channels.
3636
37+
Raises:
38+
StopAsyncIteration: When the channel is closed.
39+
3740
Returns:
3841
The next message that was received, or `None`, if all channels have
3942
closed.
@@ -45,17 +48,17 @@ async def receive(self) -> Optional[Tuple[str, T]]:
4548
return self._results.popleft()
4649

4750
if len(self._pending) == 0:
48-
return None
51+
raise StopAsyncIteration()
4952
done, self._pending = await asyncio.wait(
5053
self._pending, return_when=asyncio.FIRST_COMPLETED
5154
)
5255
for item in done:
5356
name = item.get_name()
54-
result = item.result()
5557
# if channel is closed, don't add a task for it again.
56-
if result is None:
58+
if isinstance(item.exception(), StopAsyncIteration):
5759
continue
60+
result = item.result()
5861
self._results.append((name, result))
5962
self._pending.add(
60-
asyncio.create_task(self._receivers[name].receive(), name=name)
63+
asyncio.create_task(self._receivers[name].__anext__(), name=name)
6164
)

src/frequenz/channels/utils/file_watcher.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,12 @@ def __del__(self) -> None:
6262
"""
6363
self._stop_event.set()
6464

65-
async def receive(self) -> Optional[pathlib.Path]:
65+
async def __anext__(self) -> pathlib.Path:
6666
"""Wait for the next file event and return its path.
6767
68+
Raises:
69+
StopAsyncIteration: When the channel is closed.
70+
6871
Returns:
6972
Path of next file.
7073
"""
@@ -73,7 +76,7 @@ async def receive(self) -> Optional[pathlib.Path]:
7376
for change in changes:
7477
# Tuple of (Change, path) returned by watchfiles
7578
if change is None or len(change) != 2:
76-
return None
79+
raise StopAsyncIteration()
7780

7881
_, path_str = change
7982
path = pathlib.Path(path_str)

src/frequenz/channels/utils/timer.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
import asyncio
77
from datetime import datetime, timedelta, timezone
8-
from typing import Optional
98

109
from frequenz.channels.base_classes import Receiver
1110

@@ -76,9 +75,12 @@ def stop(self) -> None:
7675
"""
7776
self._stopped = True
7877

79-
async def receive(self) -> Optional[datetime]:
78+
async def __anext__(self) -> datetime:
8079
"""Return the current time (in UTC) once the next tick is due.
8180
81+
Raises:
82+
StopAsyncIteration: When the channel is closed.
83+
8284
Returns:
8385
The time of the next tick in UTC or `None` if
8486
[stop()][frequenz.channels.Timer.stop] has been called on the
@@ -89,7 +91,7 @@ async def receive(self) -> Optional[datetime]:
8991
instead of a native datetime object.
9092
"""
9193
if self._stopped:
92-
return None
94+
raise StopAsyncIteration()
9395
now = datetime.now(timezone.utc)
9496
diff = self._next_msg_time - now
9597
while diff.total_seconds() > 0:

0 commit comments

Comments
 (0)