Skip to content

Commit 3dc9ba3

Browse files
committed
Ensure deleted broadcast receivers get cleaned up
... by storing just a weakref to a receiver in the channel object. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 57731d5 commit 3dc9ba3

File tree

2 files changed

+33
-20
lines changed

2 files changed

+33
-20
lines changed

src/frequenz/channels/broadcast.py

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from __future__ import annotations
77

88
import logging
9+
import weakref
910
from asyncio import Condition
1011
from collections import deque
1112
from typing import Deque, Dict, Generic, Optional
@@ -81,7 +82,7 @@ def __init__(self, name: str, resend_latest: bool = False) -> None:
8182
self._resend_latest = resend_latest
8283

8384
self.recv_cv: Condition = Condition()
84-
self.receivers: Dict[UUID, Receiver[T]] = {}
85+
self.receivers: Dict[UUID, weakref.ReferenceType[Receiver[T]]] = {}
8586
self.closed: bool = False
8687
self._latest: Optional[T] = None
8788

@@ -101,17 +102,6 @@ async def close(self) -> None:
101102
async with self.recv_cv:
102103
self.recv_cv.notify_all()
103104

104-
def _drop_receiver(self, uuid: UUID) -> None:
105-
"""Drop a specific receiver from the list of broadcast receivers.
106-
107-
Called from the destructors of receivers.
108-
109-
Args:
110-
uuid: a uuid identifying the receiver to be dropped.
111-
"""
112-
if uuid in self.receivers:
113-
del self.receivers[uuid]
114-
115105
def get_sender(self) -> Sender[T]:
116106
"""Create a new broadcast sender.
117107
@@ -140,7 +130,7 @@ def get_receiver(
140130
if name is None:
141131
name = str(uuid)
142132
recv: Receiver[T] = Receiver(uuid, name, maxsize, self)
143-
self.receivers[uuid] = recv
133+
self.receivers[uuid] = weakref.ref(recv)
144134
if self._resend_latest and self._latest is not None:
145135
recv.enqueue(self._latest)
146136
return recv
@@ -188,8 +178,15 @@ async def send(self, msg: T) -> bool:
188178
return False
189179
# pylint: disable=protected-access
190180
self._chan._latest = msg
191-
for recv in self._chan.receivers.values():
181+
stale_refs = []
182+
for name, recv_ref in self._chan.receivers.items():
183+
recv = recv_ref()
184+
if recv is None:
185+
stale_refs.append(name)
186+
continue
192187
recv.enqueue(msg)
188+
for name in stale_refs:
189+
del self._chan.receivers[name]
193190
async with self._chan.recv_cv:
194191
self._chan.recv_cv.notify_all()
195192
return True
@@ -225,11 +222,6 @@ def __init__(self, uuid: UUID, name: str, maxsize: int, chan: Broadcast[T]) -> N
225222

226223
self._active = True
227224

228-
def __del__(self) -> None:
229-
"""Drop this receiver from the list of Broadcast receivers."""
230-
if self._active:
231-
self._chan._drop_receiver(self._uuid)
232-
233225
def enqueue(self, msg: T) -> None:
234226
"""Put a message into this receiver's queue.
235227
@@ -295,7 +287,6 @@ def into_peekable(self) -> Peekable[T]:
295287
Returns:
296288
A `Peekable` instance.
297289
"""
298-
self._chan._drop_receiver(self._uuid) # pylint: disable=protected-access
299290
self._active = False
300291
return Peekable(self._chan)
301292

tests/test_broadcast.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,3 +217,25 @@ async def test_broadcast_map() -> None:
217217

218218
assert (await receiver.receive()) is False
219219
assert (await receiver.receive()) is True
220+
221+
222+
async def test_broadcast_receiver_drop() -> None:
223+
"""Ensure deleted receivers get cleaned up."""
224+
chan = Broadcast[int]("input-chan")
225+
sender = chan.get_sender()
226+
227+
receiver1 = chan.get_receiver()
228+
receiver2 = chan.get_receiver()
229+
230+
await sender.send(10)
231+
232+
assert 10 == await receiver1.receive()
233+
assert 10 == await receiver2.receive()
234+
235+
assert len(chan.receivers) == 2
236+
237+
del receiver2
238+
239+
await sender.send(20)
240+
241+
assert len(chan.receivers) == 1

0 commit comments

Comments
 (0)