Skip to content

Commit 4cceb39

Browse files
Ensure deleted receivers get cleaned up (#45)
`Broadcast` receivers were not getting cleaned up when then go out of scope, because `Broadcast` instances were holding on to a strong reference to them. And because these receivers were not being read from anymore, but were still getting messages, their buffers would overflow, and the logs would get flooded with warnings. This PR fixes this issue by storing just weak references to the receivers in the `Broadcast` instances.
2 parents 57731d5 + 59ea963 commit 4cceb39

File tree

3 files changed

+36
-21
lines changed

3 files changed

+36
-21
lines changed

RELEASE_NOTES.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@ time.
2020

2121
## Bug Fixes
2222

23+
* [`Broadcast`] receivers now get cleaned up once they go out of scope.
24+
2325
* [`Timer`] now returns [timezone-aware] `datetime` objects using UTC as
2426
timezone.
2527

26-
28+
[`Broadcast`]: https://frequenz-floss.github.io/frequenz-channels-python/v0.11/reference/frequenz/channels/#frequenz.channels.Broadcast
2729
[`Timer`]: https://frequenz-floss.github.io/frequenz-channels-python/v0.11/reference/frequenz/channels/#frequenz.channels.Timer
2830
[timezone-aware]: https://docs.python.org/3/library/datetime.html#aware-and-naive-objects

src/frequenz/channels/broadcast.py

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from __future__ import annotations
77

88
import logging
9+
import weakref
910
from asyncio import Condition
1011
from collections import deque
1112
from typing import Deque, Dict, Generic, Optional
@@ -81,7 +82,7 @@ def __init__(self, name: str, resend_latest: bool = False) -> None:
8182
self._resend_latest = resend_latest
8283

8384
self.recv_cv: Condition = Condition()
84-
self.receivers: Dict[UUID, Receiver[T]] = {}
85+
self.receivers: Dict[UUID, weakref.ReferenceType[Receiver[T]]] = {}
8586
self.closed: bool = False
8687
self._latest: Optional[T] = None
8788

@@ -101,17 +102,6 @@ async def close(self) -> None:
101102
async with self.recv_cv:
102103
self.recv_cv.notify_all()
103104

104-
def _drop_receiver(self, uuid: UUID) -> None:
105-
"""Drop a specific receiver from the list of broadcast receivers.
106-
107-
Called from the destructors of receivers.
108-
109-
Args:
110-
uuid: a uuid identifying the receiver to be dropped.
111-
"""
112-
if uuid in self.receivers:
113-
del self.receivers[uuid]
114-
115105
def get_sender(self) -> Sender[T]:
116106
"""Create a new broadcast sender.
117107
@@ -140,7 +130,7 @@ def get_receiver(
140130
if name is None:
141131
name = str(uuid)
142132
recv: Receiver[T] = Receiver(uuid, name, maxsize, self)
143-
self.receivers[uuid] = recv
133+
self.receivers[uuid] = weakref.ref(recv)
144134
if self._resend_latest and self._latest is not None:
145135
recv.enqueue(self._latest)
146136
return recv
@@ -188,8 +178,15 @@ async def send(self, msg: T) -> bool:
188178
return False
189179
# pylint: disable=protected-access
190180
self._chan._latest = msg
191-
for recv in self._chan.receivers.values():
181+
stale_refs = []
182+
for name, recv_ref in self._chan.receivers.items():
183+
recv = recv_ref()
184+
if recv is None:
185+
stale_refs.append(name)
186+
continue
192187
recv.enqueue(msg)
188+
for name in stale_refs:
189+
del self._chan.receivers[name]
193190
async with self._chan.recv_cv:
194191
self._chan.recv_cv.notify_all()
195192
return True
@@ -225,11 +222,6 @@ def __init__(self, uuid: UUID, name: str, maxsize: int, chan: Broadcast[T]) -> N
225222

226223
self._active = True
227224

228-
def __del__(self) -> None:
229-
"""Drop this receiver from the list of Broadcast receivers."""
230-
if self._active:
231-
self._chan._drop_receiver(self._uuid)
232-
233225
def enqueue(self, msg: T) -> None:
234226
"""Put a message into this receiver's queue.
235227
@@ -295,7 +287,6 @@ def into_peekable(self) -> Peekable[T]:
295287
Returns:
296288
A `Peekable` instance.
297289
"""
298-
self._chan._drop_receiver(self._uuid) # pylint: disable=protected-access
299290
self._active = False
300291
return Peekable(self._chan)
301292

tests/test_broadcast.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,3 +217,25 @@ async def test_broadcast_map() -> None:
217217

218218
assert (await receiver.receive()) is False
219219
assert (await receiver.receive()) is True
220+
221+
222+
async def test_broadcast_receiver_drop() -> None:
223+
"""Ensure deleted receivers get cleaned up."""
224+
chan = Broadcast[int]("input-chan")
225+
sender = chan.get_sender()
226+
227+
receiver1 = chan.get_receiver()
228+
receiver2 = chan.get_receiver()
229+
230+
await sender.send(10)
231+
232+
assert 10 == await receiver1.receive()
233+
assert 10 == await receiver2.receive()
234+
235+
assert len(chan.receivers) == 2
236+
237+
del receiver2
238+
239+
await sender.send(20)
240+
241+
assert len(chan.receivers) == 1

0 commit comments

Comments
 (0)