Skip to content

Commit cd54c44

Browse files
authored
Add a Receiver.close() method (#348)
This method would allow individual receivers to be closed, without affecting the underlying channel, if there is one.
2 parents 154cc55 + 6e5d635 commit cd54c44

File tree

15 files changed

+407
-2
lines changed

15 files changed

+407
-2
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
## New Features
1212

13-
<!-- Here goes the main new features and examples or instructions on how to use them -->
13+
- Added a `Receiver.close()` method for closing just a receiver. Also implemented it for all the `Receiver` implementations in this library.
1414

1515
## Bug Fixes
1616

src/frequenz/channels/_anycast.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
from collections import deque
1111
from typing import Generic, TypeVar
1212

13+
from typing_extensions import override
14+
1315
from ._exceptions import ChannelClosedError
1416
from ._generic import ChannelMessageT
1517
from ._receiver import Receiver, ReceiverStoppedError
@@ -320,6 +322,7 @@ def __init__(self, channel: Anycast[_T], /) -> None:
320322
self._channel: Anycast[_T] = channel
321323
"""The channel that this sender belongs to."""
322324

325+
@override
323326
async def send(self, message: _T, /) -> None:
324327
"""Send a message across the channel.
325328
@@ -388,8 +391,12 @@ def __init__(self, channel: Anycast[_T], /) -> None:
388391
self._channel: Anycast[_T] = channel
389392
"""The channel that this receiver belongs to."""
390393

394+
self._closed: bool = False
395+
"""Whether the receiver is closed."""
396+
391397
self._next: _T | type[_Empty] = _Empty
392398

399+
@override
393400
async def ready(self) -> bool:
394401
"""Wait until the receiver is ready with a message or an error.
395402
@@ -405,6 +412,9 @@ async def ready(self) -> bool:
405412
if self._next is not _Empty:
406413
return True
407414

415+
if self._closed:
416+
return False
417+
408418
# pylint: disable=protected-access
409419
while len(self._channel._deque) == 0:
410420
if self._channel._closed:
@@ -417,6 +427,7 @@ async def ready(self) -> bool:
417427
# pylint: enable=protected-access
418428
return True
419429

430+
@override
420431
def consume(self) -> _T:
421432
"""Return the latest message once `ready()` is complete.
422433
@@ -431,6 +442,9 @@ def consume(self) -> _T:
431442
):
432443
raise ReceiverStoppedError(self) from ChannelClosedError(self._channel)
433444

445+
if self._next is _Empty and self._closed:
446+
raise ReceiverStoppedError(self)
447+
434448
assert (
435449
self._next is not _Empty
436450
), "`consume()` must be preceded by a call to `ready()`"
@@ -441,6 +455,14 @@ def consume(self) -> _T:
441455

442456
return next_val
443457

458+
@override
459+
def close(self) -> None:
460+
"""Close this receiver.
461+
462+
After closing, the receiver will not be able to receive any more messages.
463+
"""
464+
self._closed = True
465+
444466
def __str__(self) -> str:
445467
"""Return a string representation of this receiver."""
446468
return f"{self._channel}:{type(self).__name__}"

src/frequenz/channels/_broadcast.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
from collections import deque
1212
from typing import Generic, TypeVar
1313

14+
from typing_extensions import override
15+
1416
from ._exceptions import ChannelClosedError
1517
from ._generic import ChannelMessageT
1618
from ._receiver import Receiver, ReceiverStoppedError
@@ -327,6 +329,7 @@ def __init__(self, channel: Broadcast[_T], /) -> None:
327329
self._channel: Broadcast[_T] = channel
328330
"""The broadcast channel this sender belongs to."""
329331

332+
@override
330333
async def send(self, message: _T, /) -> None:
331334
"""Send a message to all broadcast receivers.
332335
@@ -414,6 +417,9 @@ def __init__(
414417
self._q: deque[_T] = deque(maxlen=limit)
415418
"""The receiver's internal message queue."""
416419

420+
self._closed: bool = False
421+
"""Whether the receiver is closed."""
422+
417423
def enqueue(self, message: _T, /) -> None:
418424
"""Put a message into this receiver's queue.
419425
@@ -441,6 +447,7 @@ def __len__(self) -> int:
441447
"""
442448
return len(self._q)
443449

450+
@override
444451
async def ready(self) -> bool:
445452
"""Wait until the receiver is ready with a message or an error.
446453
@@ -462,13 +469,14 @@ async def ready(self) -> bool:
462469
# consumed, then we return immediately.
463470
# pylint: disable=protected-access
464471
while len(self._q) == 0:
465-
if self._channel._closed:
472+
if self._channel._closed or self._closed:
466473
return False
467474
async with self._channel._recv_cv:
468475
await self._channel._recv_cv.wait()
469476
return True
470477
# pylint: enable=protected-access
471478

479+
@override
472480
def consume(self) -> _T:
473481
"""Return the latest message once `ready` is complete.
474482
@@ -481,9 +489,25 @@ def consume(self) -> _T:
481489
if not self._q and self._channel._closed: # pylint: disable=protected-access
482490
raise ReceiverStoppedError(self) from ChannelClosedError(self._channel)
483491

492+
if self._closed:
493+
raise ReceiverStoppedError(self)
494+
484495
assert self._q, "`consume()` must be preceded by a call to `ready()`"
485496
return self._q.popleft()
486497

498+
@override
499+
def close(self) -> None:
500+
"""Close the receiver.
501+
502+
After calling this method, new messages will not be received. Once the
503+
receiver's buffer is drained, trying to receive a message will raise a
504+
[`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].
505+
"""
506+
self._closed = True
507+
self._channel._receivers.pop( # pylint: disable=protected-access
508+
hash(self), None
509+
)
510+
487511
def __str__(self) -> str:
488512
"""Return a string representation of this receiver."""
489513
return f"{self._channel}:{type(self).__name__}"

src/frequenz/channels/_merge.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@
5454
from collections import deque
5555
from typing import Any
5656

57+
from typing_extensions import override
58+
5759
from ._generic import ReceiverMessageT_co
5860
from ._receiver import Receiver, ReceiverStoppedError
5961

@@ -135,6 +137,7 @@ async def stop(self) -> None:
135137
await asyncio.gather(*self._pending, return_exceptions=True)
136138
self._pending = set()
137139

140+
@override
138141
async def ready(self) -> bool:
139142
"""Wait until the receiver is ready with a message or an error.
140143
@@ -171,6 +174,7 @@ async def ready(self) -> bool:
171174
asyncio.create_task(anext(self._receivers[name]), name=name)
172175
)
173176

177+
@override
174178
def consume(self) -> ReceiverMessageT_co:
175179
"""Return the latest message once `ready` is complete.
176180
@@ -187,6 +191,21 @@ def consume(self) -> ReceiverMessageT_co:
187191

188192
return self._results.popleft()
189193

194+
@override
195+
def close(self) -> None:
196+
"""Close the receiver.
197+
198+
After calling this method, new messages will not be received. Once the
199+
receiver's buffer is drained, trying to receive a message will raise a
200+
[`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].
201+
"""
202+
for task in self._pending:
203+
if not task.done() and task.get_loop().is_running():
204+
task.cancel()
205+
self._pending = set()
206+
for recv in self._receivers.values():
207+
recv.close()
208+
190209
def __str__(self) -> str:
191210
"""Return a string representation of this receiver."""
192211
if len(self._receivers) > 3:

src/frequenz/channels/_receiver.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,8 @@
157157
from collections.abc import Callable
158158
from typing import TYPE_CHECKING, Any, Generic, Self, TypeGuard, TypeVar, overload
159159

160+
from typing_extensions import override
161+
160162
from ._exceptions import Error
161163
from ._generic import MappedMessageT_co, ReceiverMessageT_co
162164

@@ -215,6 +217,15 @@ def consume(self) -> ReceiverMessageT_co:
215217
ReceiverError: If there is some problem with the receiver.
216218
"""
217219

220+
def close(self) -> None:
221+
"""Close the receiver.
222+
223+
After calling this method, new messages will not be available from the receiver.
224+
Once the receiver's buffer is drained, trying to receive a message will raise a
225+
[`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].
226+
"""
227+
raise NotImplementedError("close() must be implemented by subclasses")
228+
218229
def __aiter__(self) -> Self:
219230
"""Get an async iterator over the received messages.
220231
@@ -433,6 +444,7 @@ def __init__(
433444
)
434445
"""The function to apply on the input data."""
435446

447+
@override
436448
async def ready(self) -> bool:
437449
"""Wait until the receiver is ready with a message or an error.
438450
@@ -448,6 +460,7 @@ async def ready(self) -> bool:
448460

449461
# We need a noqa here because the docs have a Raises section but the code doesn't
450462
# explicitly raise anything.
463+
@override
451464
def consume(self) -> MappedMessageT_co: # noqa: DOC502
452465
"""Return a transformed message once `ready()` is complete.
453466
@@ -460,6 +473,16 @@ def consume(self) -> MappedMessageT_co: # noqa: DOC502
460473
"""
461474
return self._mapping_function(self._receiver.consume())
462475

476+
@override
477+
def close(self) -> None:
478+
"""Close the receiver.
479+
480+
After calling this method, new messages will not be received. Once the
481+
receiver's buffer is drained, trying to receive a message will raise a
482+
[`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].
483+
"""
484+
self._receiver.close()
485+
463486
def __str__(self) -> str:
464487
"""Return a string representation of the mapper."""
465488
return f"{type(self).__name__}:{self._receiver}:{self._mapping_function}"
@@ -509,6 +532,7 @@ def __init__(
509532

510533
self._recv_closed = False
511534

535+
@override
512536
async def ready(self) -> bool:
513537
"""Wait until the receiver is ready with a message or an error.
514538
@@ -528,6 +552,7 @@ async def ready(self) -> bool:
528552
self._recv_closed = True
529553
return False
530554

555+
@override
531556
def consume(self) -> ReceiverMessageT_co:
532557
"""Return a transformed message once `ready()` is complete.
533558
@@ -547,6 +572,16 @@ def consume(self) -> ReceiverMessageT_co:
547572
self._next_message = _SENTINEL
548573
return message
549574

575+
@override
576+
def close(self) -> None:
577+
"""Close the receiver.
578+
579+
After calling this method, new messages will not be received. Once the
580+
receiver's buffer is drained, trying to receive a message will raise a
581+
[`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].
582+
"""
583+
self._receiver.close()
584+
550585
def __str__(self) -> str:
551586
"""Return a string representation of the filter."""
552587
return f"{type(self).__name__}:{self._receiver}:{self._filter_function}"

src/frequenz/channels/event.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
import asyncio as _asyncio
1818

19+
from typing_extensions import override
20+
1921
from frequenz.channels._receiver import Receiver, ReceiverStoppedError
2022

2123

@@ -141,6 +143,7 @@ def set(self) -> None:
141143
self._is_set = True
142144
self._event.set()
143145

146+
@override
144147
async def ready(self) -> bool:
145148
"""Wait until this receiver is ready.
146149
@@ -152,6 +155,7 @@ async def ready(self) -> bool:
152155
await self._event.wait()
153156
return not self._is_stopped
154157

158+
@override
155159
def consume(self) -> None:
156160
"""Consume the event.
157161
@@ -168,6 +172,11 @@ def consume(self) -> None:
168172
self._is_set = False
169173
self._event.clear()
170174

175+
@override
176+
def close(self) -> None:
177+
"""Close this receiver."""
178+
self.stop()
179+
171180
def __str__(self) -> str:
172181
"""Return a string representation of this event."""
173182
return f"{type(self).__name__}({self._name!r})"

src/frequenz/channels/experimental/_relay_sender.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99

1010
import typing
1111

12+
from typing_extensions import override
13+
1214
from .._generic import SenderMessageT_contra
1315
from .._sender import Sender
1416

@@ -46,6 +48,7 @@ def __init__(self, *senders: Sender[SenderMessageT_contra]) -> None:
4648
"""
4749
self._senders = senders
4850

51+
@override
4952
async def send(self, message: SenderMessageT_contra, /) -> None:
5053
"""Send a message.
5154

src/frequenz/channels/file_watcher.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from datetime import timedelta
2626
from enum import Enum
2727

28+
from typing_extensions import override
2829
from watchfiles import Change, awatch
2930
from watchfiles.main import FileChange
3031

@@ -185,6 +186,7 @@ def __del__(self) -> None:
185186
# is stopped.
186187
self._stop_event.set()
187188

189+
@override
188190
async def ready(self) -> bool:
189191
"""Wait until the receiver is ready with a message or an error.
190192
@@ -212,6 +214,7 @@ async def ready(self) -> bool:
212214

213215
return True
214216

217+
@override
215218
def consume(self) -> Event:
216219
"""Return the latest event once `ready` is complete.
217220
@@ -229,6 +232,11 @@ def consume(self) -> Event:
229232
change, path_str = self._changes.pop()
230233
return Event(type=EventType(change), path=pathlib.Path(path_str))
231234

235+
@override
236+
def close(self) -> None:
237+
"""Close this receiver."""
238+
self._stop_event.set()
239+
232240
def __str__(self) -> str:
233241
"""Return a string representation of this receiver."""
234242
if len(self._paths) > 3:

0 commit comments

Comments
 (0)