Skip to content

Commit c13edc3

Browse files
committed
Rename Receiver methods _ready, _get to ready, consume
Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 359357c commit c13edc3

File tree

9 files changed

+60
-52
lines changed

9 files changed

+60
-52
lines changed

src/frequenz/channels/anycast.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ def __init__(self, chan: Anycast[T]) -> None:
164164
self._chan = chan
165165
self._next: Optional[T] = None
166166

167-
async def _ready(self) -> None:
167+
async def ready(self) -> None:
168168
"""Wait until the receiver is ready with a value.
169169
170170
Raises:
@@ -183,17 +183,19 @@ async def _ready(self) -> None:
183183
async with self._chan.send_cv:
184184
self._chan.send_cv.notify(1)
185185

186-
def _get(self) -> T:
187-
"""Return the latest value once `_ready()` is complete.
186+
def consume(self) -> T:
187+
"""Return the latest value once `ready()` is complete.
188188
189189
Raises:
190-
EOFError: When called before a call to `_ready()` finishes.
190+
EOFError: When called before a call to `ready()` finishes.
191191
192192
Returns:
193193
The next value that was received.
194194
"""
195195
if self._next is None:
196-
raise EOFError("_get was called before a call to _ready finished.")
196+
raise EOFError(
197+
"`consume()` was called before a call to `ready()` finished."
198+
)
197199
next_val = self._next
198200
self._next = None
199201
return next_val

src/frequenz/channels/base_classes.py

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -69,25 +69,25 @@ async def __anext__(self) -> T:
6969
Raises:
7070
StopAsyncIteration: if the underlying channel is closed.
7171
"""
72-
await self._ready()
73-
return self._get()
72+
await self.ready()
73+
return self.consume()
7474

7575
@abstractmethod
76-
async def _ready(self) -> None:
76+
async def ready(self) -> None:
7777
"""Wait until the receiver is ready with a value.
7878
79-
Once a call to `_ready` has finished, the value should be read with a call to
80-
`_get()`.
79+
Once a call to `ready()` has finished, the value should be read with a call to
80+
`consume()`.
8181
8282
Raises:
8383
StopAsyncIteration: if the underlying channel is closed.
8484
"""
8585

8686
@abstractmethod
87-
def _get(self) -> T:
88-
"""Return the latest value once `_ready` is complete.
87+
def consume(self) -> T:
88+
"""Return the latest value once `ready()` is complete.
8989
90-
`_ready()` must be called before each call to `_get()`.
90+
`ready()` must be called before each call to `consume()`.
9191
9292
Returns:
9393
The next value received.
@@ -193,14 +193,14 @@ def __init__(self, recv: Receiver[T], transform: Callable[[T], U]) -> None:
193193
self._recv = recv
194194
self._transform = transform
195195

196-
async def _ready(self) -> None:
196+
async def ready(self) -> None:
197197
"""Wait until the receiver is ready with a value."""
198-
await self._recv._ready() # pylint: disable=protected-access
198+
await self._recv.ready() # pylint: disable=protected-access
199199

200-
def _get(self) -> U:
201-
"""Return a transformed value once `_ready()` is complete.
200+
def consume(self) -> U:
201+
"""Return a transformed value once `ready()` is complete.
202202
203203
Returns:
204204
The next value that was received.
205205
"""
206-
return self._transform(self._recv._get()) # pylint: disable=protected-access
206+
return self._transform(self._recv.consume()) # pylint: disable=protected-access

src/frequenz/channels/bidirectional.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,14 +82,14 @@ async def send(self, msg: T) -> bool:
8282
"""
8383
return await self._sender.send(msg)
8484

85-
async def _ready(self) -> None:
85+
async def ready(self) -> None:
8686
"""Wait until the receiver is ready with a value."""
87-
await self._receiver._ready() # pylint: disable=protected-access
87+
await self._receiver.ready() # pylint: disable=protected-access
8888

89-
def _get(self) -> U:
89+
def consume(self) -> U:
9090
"""Return the latest value once `_ready` is complete.
9191
9292
Returns:
9393
The next value that was received.
9494
"""
95-
return self._receiver._get() # pylint: disable=protected-access
95+
return self._receiver.consume() # pylint: disable=protected-access

src/frequenz/channels/broadcast.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -249,11 +249,11 @@ def __len__(self) -> int:
249249
"""
250250
return len(self._q)
251251

252-
async def _ready(self) -> None:
252+
async def ready(self) -> None:
253253
"""Wait until the receiver is ready with a value.
254254
255255
Raises:
256-
EOFError: When called before a call to `_ready()` finishes.
256+
EOFError: When called before a call to `ready()` finishes.
257257
StopAsyncIteration: if the underlying channel is closed.
258258
"""
259259
if not self._active:
@@ -269,8 +269,8 @@ async def _ready(self) -> None:
269269
async with self._chan.recv_cv:
270270
await self._chan.recv_cv.wait()
271271

272-
def _get(self) -> T:
273-
"""Return the latest value once `_ready` is complete.
272+
def consume(self) -> T:
273+
"""Return the latest value once `ready` is complete.
274274
275275
Returns:
276276
The next value that was received.

src/frequenz/channels/merge.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ def __del__(self) -> None:
4444
for task in self._pending:
4545
task.cancel()
4646

47-
async def _ready(self) -> None:
47+
async def ready(self) -> None:
4848
"""Wait until the receiver is ready with a value.
4949
5050
Raises:
@@ -74,16 +74,16 @@ async def _ready(self) -> None:
7474
asyncio.create_task(self._receivers[name].__anext__(), name=name)
7575
)
7676

77-
def _get(self) -> T:
78-
"""Return the latest value once `_ready` is complete.
77+
def consume(self) -> T:
78+
"""Return the latest value once `ready` is complete.
7979
8080
Raises:
81-
EOFError: When called before a call to `_ready()` finishes.
81+
EOFError: When called before a call to `ready()` finishes.
8282
8383
Returns:
8484
The next value that was received.
8585
"""
8686
if not self._results:
87-
raise EOFError("_get called before _ready finished.")
87+
raise EOFError("`consume` called before `ready` finished.")
8888

8989
return self._results.popleft()

src/frequenz/channels/merge_named.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ def __del__(self) -> None:
3131
for task in self._pending:
3232
task.cancel()
3333

34-
async def _ready(self) -> None:
34+
async def ready(self) -> None:
3535
"""Wait until there's a message in any of the channels.
3636
3737
Raises:
@@ -61,11 +61,11 @@ async def _ready(self) -> None:
6161
asyncio.create_task(self._receivers[name].__anext__(), name=name)
6262
)
6363

64-
def _get(self) -> Tuple[str, T]:
65-
"""Return the latest value once `_ready` is complete.
64+
def consume(self) -> Tuple[str, T]:
65+
"""Return the latest value once `ready` is complete.
6666
6767
Raises:
68-
EOFError: When called before a call to `_ready()` finishes.
68+
EOFError: When called before a call to `ready()` finishes.
6969
7070
Returns:
7171
The next value that was received, along with its name.

src/frequenz/channels/select.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ def get(self) -> _Selected:
5050
"""
5151
if self.recv is None:
5252
return _Selected(None)
53-
return _Selected(self.recv._get()) # pylint: disable=protected-access
53+
return _Selected(self.recv.consume()) # pylint: disable=protected-access
5454

5555

5656
class Select:
@@ -93,9 +93,7 @@ def __init__(self, **kwargs: Receiver[Any]) -> None:
9393
self._pending: Set[asyncio.Task[None]] = set()
9494

9595
for name, recv in self._receivers.items():
96-
# can replace __anext__() to anext() (Only Python 3.10>=)
97-
ready = recv._ready() # pylint: disable=unnecessary-dunder-call
98-
self._pending.add(asyncio.create_task(ready, name=name))
96+
self._pending.add(asyncio.create_task(recv.ready(), name=name))
9997

10098
self._ready_count = 0
10199
self._prev_ready_count = 0
@@ -124,7 +122,7 @@ async def ready(self) -> bool:
124122
if value is not None:
125123
dropped_names.append(name)
126124
if value.recv is not None:
127-
value.recv._get() # pylint: disable=protected-access
125+
value.recv.consume()
128126
self._result[name] = None
129127
self._ready_count = 0
130128
self._prev_ready_count = 0
@@ -159,7 +157,7 @@ async def ready(self) -> bool:
159157
# don't add a task for it again.
160158
if result is None:
161159
continue
162-
ready = recv._ready() # pylint: disable=protected-access
160+
ready = recv.ready()
163161
self._pending.add(asyncio.create_task(ready, name=name))
164162
return True
165163

src/frequenz/channels/utils/file_watcher.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ def __del__(self) -> None:
6464
"""
6565
self._stop_event.set()
6666

67-
async def _ready(self) -> None:
67+
async def ready(self) -> None:
6868
"""Wait for the next file event and return its path.
6969
7070
Raises:
@@ -79,7 +79,15 @@ async def _ready(self) -> None:
7979

8080
self._changes = await self._awatch.__anext__()
8181

82-
def _get(self) -> pathlib.Path:
82+
def consume(self) -> pathlib.Path:
83+
"""Return the latest change once `ready` is complete.
84+
85+
Raises:
86+
StopAsyncIteration: When the channel is closed.
87+
88+
Returns:
89+
The next change that was received.
90+
"""
8391
change = self._changes.pop()
8492
# Tuple of (Change, path) returned by watchfiles
8593
if change is None or len(change) != 2:

src/frequenz/channels/utils/timer.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ def stop(self) -> None:
7777
"""
7878
self._stopped = True
7979

80-
async def _ready(self) -> None:
80+
async def ready(self) -> None:
8181
"""Return the current time (in UTC) once the next tick is due.
8282
8383
Raises:
@@ -88,10 +88,6 @@ async def _ready(self) -> None:
8888
The time of the next tick in UTC or `None` if
8989
[stop()][frequenz.channels.Timer.stop] has been called on the
9090
timer.
91-
92-
Changelog:
93-
* **v0.11.0:** Returns a timezone-aware datetime with UTC timezone
94-
instead of a native datetime object.
9591
"""
9692
# if there are messages waiting to be consumed, return immediately.
9793
if self._now is not None:
@@ -109,17 +105,21 @@ async def _ready(self) -> None:
109105

110106
self._next_msg_time = self._now + self._interval
111107

112-
def _get(self) -> datetime:
113-
"""Return the latest value once `_ready` is complete.
108+
def consume(self) -> datetime:
109+
"""Return the latest value once `ready` is complete.
114110
115111
Raises:
116-
EOFError: When called before a call to `_ready()` finishes.
112+
EOFError: When called before a call to `ready()` finishes.
117113
118114
Returns:
119115
The timestamp for the next tick.
116+
117+
Changelog:
118+
* **v0.11.0:** Returns a timezone-aware datetime with UTC timezone
119+
instead of a native datetime object.
120120
"""
121121
if self._now is None:
122-
raise EOFError("_get called before _ready finished")
122+
raise EOFError("`consume` called before `ready` finished")
123123
now = self._now
124124
self._now = None
125125
return now

0 commit comments

Comments
 (0)