Skip to content

Commit d070b79

Browse files
committed
Add a close method to the Receiver interface
Also implement the method in all classes implementing the `Receiver` interface. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent b6475b7 commit d070b79

File tree

7 files changed

+96
-1
lines changed

7 files changed

+96
-1
lines changed

src/frequenz/channels/_anycast.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,9 @@ def __init__(self, channel: Anycast[_T], /) -> None:
391391
self._channel: Anycast[_T] = channel
392392
"""The channel that this receiver belongs to."""
393393

394+
self._closed: bool = False
395+
"""Whether the receiver is closed."""
396+
394397
self._next: _T | type[_Empty] = _Empty
395398

396399
@override
@@ -409,6 +412,9 @@ async def ready(self) -> bool:
409412
if self._next is not _Empty:
410413
return True
411414

415+
if self._closed:
416+
return False
417+
412418
# pylint: disable=protected-access
413419
while len(self._channel._deque) == 0:
414420
if self._channel._closed:
@@ -436,6 +442,9 @@ def consume(self) -> _T:
436442
):
437443
raise ReceiverStoppedError(self) from ChannelClosedError(self._channel)
438444

445+
if self._next is _Empty and self._closed:
446+
raise ReceiverStoppedError(self)
447+
439448
assert (
440449
self._next is not _Empty
441450
), "`consume()` must be preceded by a call to `ready()`"
@@ -446,6 +455,14 @@ def consume(self) -> _T:
446455

447456
return next_val
448457

458+
@override
459+
def close(self) -> None:
460+
"""Close this receiver.
461+
462+
After closing, the receiver will not be able to receive any more messages.
463+
"""
464+
self._closed = True
465+
449466
def __str__(self) -> str:
450467
"""Return a string representation of this receiver."""
451468
return f"{self._channel}:{type(self).__name__}"

src/frequenz/channels/_broadcast.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,9 @@ def __init__(
417417
self._q: deque[_T] = deque(maxlen=limit)
418418
"""The receiver's internal message queue."""
419419

420+
self._closed: bool = False
421+
"""Whether the receiver is closed."""
422+
420423
def enqueue(self, message: _T, /) -> None:
421424
"""Put a message into this receiver's queue.
422425
@@ -466,7 +469,7 @@ async def ready(self) -> bool:
466469
# consumed, then we return immediately.
467470
# pylint: disable=protected-access
468471
while len(self._q) == 0:
469-
if self._channel._closed:
472+
if self._channel._closed or self._closed:
470473
return False
471474
async with self._channel._recv_cv:
472475
await self._channel._recv_cv.wait()
@@ -486,9 +489,25 @@ def consume(self) -> _T:
486489
if not self._q and self._channel._closed: # pylint: disable=protected-access
487490
raise ReceiverStoppedError(self) from ChannelClosedError(self._channel)
488491

492+
if self._closed:
493+
raise ReceiverStoppedError(self)
494+
489495
assert self._q, "`consume()` must be preceded by a call to `ready()`"
490496
return self._q.popleft()
491497

498+
@override
499+
def close(self) -> None:
500+
"""Close the receiver.
501+
502+
After calling this method, new messages will not be received. Once the
503+
receiver's buffer is drained, trying to receive a message will raise a
504+
[`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].
505+
"""
506+
self._closed = True
507+
self._channel._receivers.pop( # pylint: disable=protected-access
508+
hash(self), None
509+
)
510+
492511
def __str__(self) -> str:
493512
"""Return a string representation of this receiver."""
494513
return f"{self._channel}:{type(self).__name__}"

src/frequenz/channels/_merge.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,21 @@ def consume(self) -> ReceiverMessageT_co:
191191

192192
return self._results.popleft()
193193

194+
@override
195+
def close(self) -> None:
196+
"""Close the receiver.
197+
198+
After calling this method, new messages will not be received. Once the
199+
receiver's buffer is drained, trying to receive a message will raise a
200+
[`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].
201+
"""
202+
for task in self._pending:
203+
if not task.done() and task.get_loop().is_running():
204+
task.cancel()
205+
self._pending = set()
206+
for recv in self._receivers.values():
207+
recv.close()
208+
194209
def __str__(self) -> str:
195210
"""Return a string representation of this receiver."""
196211
if len(self._receivers) > 3:

src/frequenz/channels/_receiver.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,15 @@ def consume(self) -> ReceiverMessageT_co:
217217
ReceiverError: If there is some problem with the receiver.
218218
"""
219219

220+
def close(self) -> None:
221+
"""Close the receiver.
222+
223+
After calling this method, new messages will not be available from the receiver.
224+
Once the receiver's buffer is drained, trying to receive a message will raise a
225+
[`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].
226+
"""
227+
raise NotImplementedError("close() must be implemented by subclasses")
228+
220229
def __aiter__(self) -> Self:
221230
"""Get an async iterator over the received messages.
222231
@@ -464,6 +473,16 @@ def consume(self) -> MappedMessageT_co: # noqa: DOC502
464473
"""
465474
return self._mapping_function(self._receiver.consume())
466475

476+
@override
477+
def close(self) -> None:
478+
"""Close the receiver.
479+
480+
After calling this method, new messages will not be received. Once the
481+
receiver's buffer is drained, trying to receive a message will raise a
482+
[`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].
483+
"""
484+
self._receiver.close()
485+
467486
def __str__(self) -> str:
468487
"""Return a string representation of the mapper."""
469488
return f"{type(self).__name__}:{self._receiver}:{self._mapping_function}"
@@ -553,6 +572,16 @@ def consume(self) -> ReceiverMessageT_co:
553572
self._next_message = _SENTINEL
554573
return message
555574

575+
@override
576+
def close(self) -> None:
577+
"""Close the receiver.
578+
579+
After calling this method, new messages will not be received. Once the
580+
receiver's buffer is drained, trying to receive a message will raise a
581+
[`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].
582+
"""
583+
self._receiver.close()
584+
556585
def __str__(self) -> str:
557586
"""Return a string representation of the filter."""
558587
return f"{type(self).__name__}:{self._receiver}:{self._filter_function}"

src/frequenz/channels/event.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,11 @@ def consume(self) -> None:
172172
self._is_set = False
173173
self._event.clear()
174174

175+
@override
176+
def close(self) -> None:
177+
"""Close this receiver."""
178+
self.stop()
179+
175180
def __str__(self) -> str:
176181
"""Return a string representation of this event."""
177182
return f"{type(self).__name__}({self._name!r})"

src/frequenz/channels/file_watcher.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,11 @@ def consume(self) -> Event:
232232
change, path_str = self._changes.pop()
233233
return Event(type=EventType(change), path=pathlib.Path(path_str))
234234

235+
@override
236+
def close(self) -> None:
237+
"""Close this receiver."""
238+
self._stop_event.set()
239+
235240
def __str__(self) -> str:
236241
"""Return a string representation of this receiver."""
237242
if len(self._paths) > 3:

src/frequenz/channels/timer.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -745,6 +745,11 @@ def consume(self) -> timedelta:
745745
self._current_drift = None
746746
return drift
747747

748+
@override
749+
def close(self) -> None:
750+
"""Close the timer."""
751+
self.stop()
752+
748753
def _now(self) -> int:
749754
"""Return the current monotonic clock time in microseconds.
750755

0 commit comments

Comments
 (0)