Skip to content

Commit 382ebef

Browse files
committed
Split __anext__ in receivers into two methods: _ready and _get
Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 18000b8 commit 382ebef

File tree

8 files changed

+161
-66
lines changed

8 files changed

+161
-66
lines changed

src/frequenz/channels/anycast.py

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
from asyncio import Condition
99
from collections import deque
10-
from typing import Deque, Generic
10+
from typing import Deque, Generic, Optional
1111

1212
from frequenz.channels.base_classes import Receiver as BaseReceiver
1313
from frequenz.channels.base_classes import Sender as BaseSender
@@ -162,26 +162,38 @@ def __init__(self, chan: Anycast[T]) -> None:
162162
chan: A reference to the channel that this receiver belongs to.
163163
"""
164164
self._chan = chan
165+
self._next: Optional[T] = None
165166

166-
async def __anext__(self) -> T:
167-
"""Receive a message from the channel.
168-
169-
Waits for an message to become available, and returns that message.
170-
When there are multiple receivers for the channel, only one receiver
171-
will receive each message.
167+
async def _ready(self) -> None:
168+
"""Wait until the receiver is ready with a value.
172169
173170
Raises:
174-
StopAsyncIteration: When the channel is closed.
175-
176-
Returns:
177-
The received message.
171+
StopAsyncIteration: if the underlying channel is closed.
178172
"""
173+
# if a message is already ready, then return immediately.
174+
if self._next is not None:
175+
return
176+
179177
while len(self._chan.deque) == 0:
180178
if self._chan.closed:
181179
raise StopAsyncIteration()
182180
async with self._chan.recv_cv:
183181
await self._chan.recv_cv.wait()
184-
ret = self._chan.deque.popleft()
182+
self._next = self._chan.deque.popleft()
185183
async with self._chan.send_cv:
186184
self._chan.send_cv.notify(1)
187-
return ret
185+
186+
def _get(self) -> T:
187+
"""Return the latest value once `_ready()` is complete.
188+
189+
Raises:
190+
EOFError: When called before a call to `_ready()` finishes.
191+
192+
Returns:
193+
The next value that was received.
194+
"""
195+
if self._next is None:
196+
raise EOFError("_get was called before a call to _ready finished.")
197+
next_val = self._next
198+
self._next = None
199+
return next_val

src/frequenz/channels/base_classes.py

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@ async def send(self, msg: T) -> bool:
6060
class Receiver(ABC, Generic[T]):
6161
"""A channel Receiver."""
6262

63-
@abstractmethod
6463
async def __anext__(self) -> T:
6564
"""Await the next value in the async iteration over received values.
6665
@@ -70,6 +69,32 @@ async def __anext__(self) -> T:
7069
Raises:
7170
StopAsyncIteration: if the underlying channel is closed.
7271
"""
72+
await self._ready()
73+
return self._get()
74+
75+
@abstractmethod
76+
async def _ready(self) -> None:
77+
"""Wait until the receiver is ready with a value.
78+
79+
Once a call to `_ready` has finished, the value should be read with a call to
80+
`_get()`.
81+
82+
Raises:
83+
StopAsyncIteration: if the underlying channel is closed.
84+
"""
85+
86+
@abstractmethod
87+
def _get(self) -> T:
88+
"""Return the latest value once `_ready` is complete.
89+
90+
`_ready()` must be called before each call to `_get()`.
91+
92+
Returns:
93+
The next value received.
94+
95+
Raises:
96+
StopAsyncIteration: if the underlying channel is closed.
97+
"""
7398

7499
def __aiter__(self) -> Receiver[T]:
75100
"""Initialize the async iterator over received values.
@@ -168,11 +193,14 @@ def __init__(self, recv: Receiver[T], transform: Callable[[T], U]) -> None:
168193
self._recv = recv
169194
self._transform = transform
170195

171-
async def __anext__(self) -> U:
172-
"""Return a transformed message received from the input channel.
196+
async def _ready(self) -> None:
197+
"""Wait until the receiver is ready with a value."""
198+
await self._recv._ready() # pylint: disable=protected-access
199+
200+
def _get(self) -> U:
201+
"""Return a transformed value once `_ready()` is complete.
173202
174203
Returns:
175-
A transformed message.
204+
The next value that was received.
176205
"""
177-
msg = await self._recv.__anext__()
178-
return self._transform(msg)
206+
return self._transform(self._recv._get()) # pylint: disable=protected-access

src/frequenz/channels/bidirectional.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,10 +82,14 @@ async def send(self, msg: T) -> bool:
8282
"""
8383
return await self._sender.send(msg)
8484

85-
async def __anext__(self) -> U:
86-
"""Receive a value from the other side.
85+
async def _ready(self) -> None:
86+
"""Wait until the receiver is ready with a value."""
87+
await self._receiver._ready() # pylint: disable=protected-access
88+
89+
def _get(self) -> U:
90+
"""Return the latest value once `_ready` is complete.
8791
8892
Returns:
89-
Received value, or `None` if the channels are closed.
93+
The next value that was received.
9094
"""
91-
return await self._receiver.__anext__()
95+
return self._receiver._get() # pylint: disable=protected-access

src/frequenz/channels/broadcast.py

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -249,32 +249,32 @@ def __len__(self) -> int:
249249
"""
250250
return len(self._q)
251251

252-
async def __anext__(self) -> T:
253-
"""Receive a message from the Broadcast channel.
254-
255-
Waits until there are messages available in the channel and returns
256-
them. If there are no remaining messages in the buffer and the channel
257-
is closed, returns `None` immediately.
258-
259-
If [into_peekable()][frequenz.channels.Receiver.into_peekable] is called
260-
on a broadcast `Receiver`, further calls to `receive`, will raise an
261-
`EOFError`.
252+
async def _ready(self) -> None:
253+
"""Wait until the receiver is ready with a value.
262254
263255
Raises:
264-
StopAsyncIteration: When the channel is closed.
265-
EOFError: when the receiver has been converted into a `Peekable`.
266-
267-
Returns:
268-
`None`, if the channel is closed, a message otherwise.
256+
EOFError: When called before a call to `_ready()` finishes.
257+
StopAsyncIteration: if the underlying channel is closed.
269258
"""
270259
if not self._active:
271260
raise EOFError("This receiver is no longer active.")
272261

262+
# Use a while loop here, to handle spurious wakeups of condition variables.
263+
#
264+
# The condition also makes sure that if there are already messages ready to be
265+
# consumed, then we return immediately.
273266
while len(self._q) == 0:
274267
if self._chan.closed:
275268
raise StopAsyncIteration()
276269
async with self._chan.recv_cv:
277270
await self._chan.recv_cv.wait()
271+
272+
def _get(self) -> T:
273+
"""Return the latest value once `_ready` is complete.
274+
275+
Returns:
276+
The next value that was received.
277+
"""
278278
ret = self._q.popleft()
279279
return ret
280280

src/frequenz/channels/merge.py

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,21 +44,18 @@ def __del__(self) -> None:
4444
for task in self._pending:
4545
task.cancel()
4646

47-
async def __anext__(self) -> T:
48-
"""Wait until there's a message in any of the channels.
47+
async def _ready(self) -> None:
48+
"""Wait until the receiver is ready with a value.
4949
5050
Raises:
51-
StopAsyncIteration: When the channel is closed.
52-
53-
Returns:
54-
The next message that was received, or `None`, if all channels have
55-
closed.
51+
StopAsyncIteration: if the underlying channel is closed.
5652
"""
5753
# we use a while loop to continue to wait for new data, in case the
5854
# previous `wait` completed because a channel was closed.
5955
while True:
56+
# if there are messages waiting to be consumed, return immediately.
6057
if len(self._results) > 0:
61-
return self._results.popleft()
58+
return
6259

6360
if len(self._pending) == 0:
6461
raise StopAsyncIteration()
@@ -73,5 +70,20 @@ async def __anext__(self) -> T:
7370
result = item.result()
7471
self._results.append(result)
7572
self._pending.add(
73+
# pylint: disable=unnecessary-dunder-call
7674
asyncio.create_task(self._receivers[name].__anext__(), name=name)
7775
)
76+
77+
def _get(self) -> T:
78+
"""Return the latest value once `_ready` is complete.
79+
80+
Raises:
81+
EOFError: When called before a call to `_ready()` finishes.
82+
83+
Returns:
84+
The next value that was received.
85+
"""
86+
if not self._results:
87+
raise EOFError("_get called before _ready finished.")
88+
89+
return self._results.popleft()

src/frequenz/channels/merge_named.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,21 +31,18 @@ def __del__(self) -> None:
3131
for task in self._pending:
3232
task.cancel()
3333

34-
async def __anext__(self) -> Tuple[str, T]:
34+
async def _ready(self) -> None:
3535
"""Wait until there's a message in any of the channels.
3636
3737
Raises:
3838
StopAsyncIteration: When the channel is closed.
39-
40-
Returns:
41-
The next message that was received, or `None`, if all channels have
42-
closed.
4339
"""
4440
# we use a while loop to continue to wait for new data, in case the
4541
# previous `wait` completed because a channel was closed.
4642
while True:
43+
# if there are messages waiting to be consumed, return immediately.
4744
if len(self._results) > 0:
48-
return self._results.popleft()
45+
return
4946

5047
if len(self._pending) == 0:
5148
raise StopAsyncIteration()
@@ -60,5 +57,20 @@ async def __anext__(self) -> Tuple[str, T]:
6057
result = item.result()
6158
self._results.append((name, result))
6259
self._pending.add(
60+
# pylint: disable=unnecessary-dunder-call
6361
asyncio.create_task(self._receivers[name].__anext__(), name=name)
6462
)
63+
64+
def _get(self) -> Tuple[str, T]:
65+
"""Return the latest value once `_ready` is complete.
66+
67+
Raises:
68+
EOFError: When called before a call to `_ready()` finishes.
69+
70+
Returns:
71+
The next value that was received, along with its name.
72+
"""
73+
if not self._results:
74+
raise EOFError("_get called before _ready finished.")
75+
76+
return self._results.popleft()

src/frequenz/channels/utils/file_watcher.py

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from typing import List, Optional, Set, Union
99

1010
from watchfiles import Change, awatch
11+
from watchfiles.main import FileChange
1112

1213
from frequenz.channels.base_classes import Receiver
1314

@@ -52,6 +53,7 @@ def __init__(
5253
and pathlib.Path(path_str).is_file()
5354
),
5455
)
56+
self._changes: Set[FileChange] = set()
5557

5658
def __del__(self) -> None:
5759
"""Cleanup registered watches.
@@ -62,7 +64,7 @@ def __del__(self) -> None:
6264
"""
6365
self._stop_event.set()
6466

65-
async def __anext__(self) -> pathlib.Path:
67+
async def _ready(self) -> None:
6668
"""Wait for the next file event and return its path.
6769
6870
Raises:
@@ -71,13 +73,17 @@ async def __anext__(self) -> pathlib.Path:
7173
Returns:
7274
Path of next file.
7375
"""
74-
while True:
75-
changes = await self._awatch.__anext__()
76-
for change in changes:
77-
# Tuple of (Change, path) returned by watchfiles
78-
if change is None or len(change) != 2:
79-
raise StopAsyncIteration()
80-
81-
_, path_str = change
82-
path = pathlib.Path(path_str)
83-
return path
76+
# if there are messages waiting to be consumed, return immediately.
77+
if self._changes:
78+
return
79+
80+
self._changes = await self._awatch.__anext__()
81+
82+
def _get(self) -> pathlib.Path:
83+
change = self._changes.pop()
84+
# Tuple of (Change, path) returned by watchfiles
85+
if change is None or len(change) != 2:
86+
raise StopAsyncIteration()
87+
_, path_str = change
88+
path = pathlib.Path(path_str)
89+
return path

src/frequenz/channels/utils/timer.py

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import asyncio
77
from datetime import datetime, timedelta, timezone
8+
from typing import Optional
89

910
from frequenz.channels.base_classes import Receiver
1011

@@ -61,6 +62,7 @@ def __init__(self, interval: float) -> None:
6162
self._stopped = False
6263
self._interval = timedelta(seconds=interval)
6364
self._next_msg_time = datetime.now(timezone.utc) + self._interval
65+
self._now: Optional[datetime] = None
6466

6567
def reset(self) -> None:
6668
"""Reset the timer to start timing from `now`."""
@@ -75,11 +77,12 @@ def stop(self) -> None:
7577
"""
7678
self._stopped = True
7779

78-
async def __anext__(self) -> datetime:
80+
async def _ready(self) -> None:
7981
"""Return the current time (in UTC) once the next tick is due.
8082
8183
Raises:
82-
StopAsyncIteration: When the channel is closed.
84+
StopAsyncIteration: if [stop()][frequenz.channels.Timer.stop] has been
85+
called on the timer.
8386
8487
Returns:
8588
The time of the next tick in UTC or `None` if
@@ -90,6 +93,10 @@ async def __anext__(self) -> datetime:
9093
* **v0.11.0:** Returns a timezone-aware datetime with UTC timezone
9194
instead of a native datetime object.
9295
"""
96+
# if there are messages waiting to be consumed, return immediately.
97+
if self._now is not None:
98+
return
99+
93100
if self._stopped:
94101
raise StopAsyncIteration()
95102
now = datetime.now(timezone.utc)
@@ -98,7 +105,21 @@ async def __anext__(self) -> datetime:
98105
await asyncio.sleep(diff.total_seconds())
99106
now = datetime.now(timezone.utc)
100107
diff = self._next_msg_time - now
108+
self._now = now
109+
110+
self._next_msg_time = self._now + self._interval
111+
112+
def _get(self) -> datetime:
113+
"""Return the latest value once `_ready` is complete.
101114
102-
self._next_msg_time = now + self._interval
115+
Raises:
116+
EOFError: When called before a call to `_ready()` finishes.
103117
118+
Returns:
119+
The timestamp for the next tick.
120+
"""
121+
if self._now is None:
122+
raise EOFError("_get called before _ready finished")
123+
now = self._now
124+
self._now = None
104125
return now

0 commit comments

Comments
 (0)