Skip to content

Commit a1957a9

Browse files
authored
Make internal variable names private (#213)
The `Anycast` and `Broadcast` implementations were exposing internal variables to users. This shouldn't be the case and is fixed in this PR. Closes #186
2 parents 26f5285 + 68a0a6b commit a1957a9

File tree

5 files changed

+63
-50
lines changed

5 files changed

+63
-50
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ The `Timer` now can be started with a delay.
66

77
## Upgrading
88

9-
<!-- Here goes notes on how to upgrade from previous versions, including deprecations and what they should be replaced with -->
9+
* Internal variable names in the `Anycast` and `Broadcast` implementations are now private.
1010

1111
## New Features
1212

benchmarks/benchmark_broadcast.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ async def fast_sender(num_messages: int, chan: Sender[int]) -> None:
4444

4545

4646
async def benchmark_broadcast(
47-
send_msg: Callable[[int, Sender[int]], Coroutine[Any, Any, int]],
47+
send_msg: Callable[[int, Sender[int]], Coroutine[Any, Any, None]],
4848
num_channels: int,
4949
num_messages: int,
5050
num_receivers: int,

src/frequenz/channels/_anycast.py

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -67,34 +67,34 @@ def __init__(self, maxsize: int = 10) -> None:
6767
Args:
6868
maxsize: Size of the channel's buffer.
6969
"""
70-
self.limit: int = maxsize
70+
self._limit: int = maxsize
7171
"""The maximum number of values that can be stored in the channel's buffer.
7272
7373
If the length of channel's buffer reaches the limit, then the sender
7474
blocks at the [send()][frequenz.channels.Sender.send] method until
7575
a value is consumed.
7676
"""
7777

78-
self.deque: Deque[T] = deque(maxlen=maxsize)
78+
self._deque: Deque[T] = deque(maxlen=maxsize)
7979
"""The channel's buffer."""
8080

81-
self.send_cv: Condition = Condition()
81+
self._send_cv: Condition = Condition()
8282
"""The condition to wait for free space in the channel's buffer.
8383
8484
If the channel's buffer is full, then the sender waits for values to
8585
get consumed using this condition until there's some free space
8686
available in the channel's buffer.
8787
"""
8888

89-
self.recv_cv: Condition = Condition()
89+
self._recv_cv: Condition = Condition()
9090
"""The condition to wait for values in the channel's buffer.
9191
9292
If the channel's buffer is empty, then the receiver waits for values
9393
using this condition until there's a value available in the channel's
9494
buffer.
9595
"""
9696

97-
self.closed: bool = False
97+
self._closed: bool = False
9898
"""Whether the channel is closed."""
9999

100100
async def close(self) -> None:
@@ -109,11 +109,11 @@ async def close(self) -> None:
109109
immediately.
110110
111111
"""
112-
self.closed = True
113-
async with self.send_cv:
114-
self.send_cv.notify_all()
115-
async with self.recv_cv:
116-
self.recv_cv.notify_all()
112+
self._closed = True
113+
async with self._send_cv:
114+
self._send_cv.notify_all()
115+
async with self._recv_cv:
116+
self._recv_cv.notify_all()
117117

118118
def new_sender(self) -> Sender[T]:
119119
"""Create a new sender.
@@ -164,16 +164,18 @@ async def send(self, msg: T) -> None:
164164
A [ChannelClosedError][frequenz.channels.ChannelClosedError] is
165165
set as the cause.
166166
"""
167-
if self._chan.closed:
167+
# pylint: disable=protected-access
168+
if self._chan._closed:
168169
raise SenderError("The channel was closed", self) from ChannelClosedError(
169170
self._chan
170171
)
171-
while len(self._chan.deque) == self._chan.deque.maxlen:
172-
async with self._chan.send_cv:
173-
await self._chan.send_cv.wait()
174-
self._chan.deque.append(msg)
175-
async with self._chan.recv_cv:
176-
self._chan.recv_cv.notify(1)
172+
while len(self._chan._deque) == self._chan._deque.maxlen:
173+
async with self._chan._send_cv:
174+
await self._chan._send_cv.wait()
175+
self._chan._deque.append(msg)
176+
async with self._chan._recv_cv:
177+
self._chan._recv_cv.notify(1)
178+
# pylint: enable=protected-access
177179

178180

179181
class _Empty:
@@ -213,14 +215,16 @@ async def ready(self) -> bool:
213215
if self._next is not _Empty:
214216
return True
215217

216-
while len(self._chan.deque) == 0:
217-
if self._chan.closed:
218+
# pylint: disable=protected-access
219+
while len(self._chan._deque) == 0:
220+
if self._chan._closed:
218221
return False
219-
async with self._chan.recv_cv:
220-
await self._chan.recv_cv.wait()
221-
self._next = self._chan.deque.popleft()
222-
async with self._chan.send_cv:
223-
self._chan.send_cv.notify(1)
222+
async with self._chan._recv_cv:
223+
await self._chan._recv_cv.wait()
224+
self._next = self._chan._deque.popleft()
225+
async with self._chan._send_cv:
226+
self._chan._send_cv.notify(1)
227+
# pylint: enable=protected-access
224228
return True
225229

226230
def consume(self) -> T:
@@ -233,7 +237,9 @@ def consume(self) -> T:
233237
ReceiverStoppedError: if the receiver stopped producing messages.
234238
ReceiverError: if there is some problem with the receiver.
235239
"""
236-
if self._next is _Empty and self._chan.closed:
240+
if ( # pylint: disable=protected-access
241+
self._next is _Empty and self._chan._closed
242+
):
237243
raise ReceiverStoppedError(self) from ChannelClosedError(self._chan)
238244

239245
assert (

src/frequenz/channels/_broadcast.py

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ def __init__(self, name: str, resend_latest: bool = False) -> None:
8484
get the latest value as soon as they are created, without having
8585
to wait for the next message on the channel to arrive.
8686
"""
87-
self.name: str = name
87+
self._name: str = name
8888
"""The name of the broadcast channel.
8989
9090
Only used for debugging purposes.
@@ -93,13 +93,13 @@ def __init__(self, name: str, resend_latest: bool = False) -> None:
9393
self._resend_latest = resend_latest
9494
"""Whether to resend the latest value to new receivers."""
9595

96-
self.recv_cv: Condition = Condition()
96+
self._recv_cv: Condition = Condition()
9797
"""The condition to wait for data in the channel's buffer."""
9898

99-
self.receivers: dict[UUID, weakref.ReferenceType[Receiver[T]]] = {}
99+
self._receivers: dict[UUID, weakref.ReferenceType[Receiver[T]]] = {}
100100
"""The receivers attached to the channel, indexed by their UUID."""
101101

102-
self.closed: bool = False
102+
self._closed: bool = False
103103
"""Whether the channel is closed."""
104104

105105
self._latest: T | None = None
@@ -117,9 +117,9 @@ async def close(self) -> None:
117117
immediately.
118118
"""
119119
self._latest = None
120-
self.closed = True
121-
async with self.recv_cv:
122-
self.recv_cv.notify_all()
120+
self._closed = True
121+
async with self._recv_cv:
122+
self._recv_cv.notify_all()
123123

124124
def new_sender(self) -> Sender[T]:
125125
"""Create a new broadcast sender.
@@ -147,7 +147,7 @@ def new_receiver(self, name: str | None = None, maxsize: int = 50) -> Receiver[T
147147
if name is None:
148148
name = str(uuid)
149149
recv: Receiver[T] = Receiver(uuid, name, maxsize, self)
150-
self.receivers[uuid] = weakref.ref(recv)
150+
self._receivers[uuid] = weakref.ref(recv)
151151
if self._resend_latest and self._latest is not None:
152152
recv.enqueue(self._latest)
153153
return recv
@@ -193,23 +193,24 @@ async def send(self, msg: T) -> None:
193193
A [ChannelClosedError][frequenz.channels.ChannelClosedError] is
194194
set as the cause.
195195
"""
196-
if self._chan.closed:
196+
# pylint: disable=protected-access
197+
if self._chan._closed:
197198
raise SenderError("The channel was closed", self) from ChannelClosedError(
198199
self._chan
199200
)
200-
# pylint: disable=protected-access
201201
self._chan._latest = msg
202202
stale_refs = []
203-
for name, recv_ref in self._chan.receivers.items():
203+
for name, recv_ref in self._chan._receivers.items():
204204
recv = recv_ref()
205205
if recv is None:
206206
stale_refs.append(name)
207207
continue
208208
recv.enqueue(msg)
209209
for name in stale_refs:
210-
del self._chan.receivers[name]
211-
async with self._chan.recv_cv:
212-
self._chan.recv_cv.notify_all()
210+
del self._chan._receivers[name]
211+
async with self._chan._recv_cv:
212+
self._chan._recv_cv.notify_all()
213+
# pylint: enable=protected-access
213214

214215

215216
class Receiver(BaseReceiver[T]):
@@ -271,7 +272,7 @@ def enqueue(self, msg: T) -> None:
271272
self._q.popleft()
272273
logger.warning(
273274
"Broadcast receiver [%s:%s] is full. Oldest message was dropped.",
274-
self._chan.name,
275+
self._chan._name, # pylint: disable=protected-access
275276
self._name,
276277
)
277278
self._q.append(msg)
@@ -307,18 +308,22 @@ async def ready(self) -> bool:
307308
#
308309
# The condition also makes sure that if there are already messages ready to be
309310
# consumed, then we return immediately.
311+
# pylint: disable=protected-access
310312
while len(self._q) == 0:
311-
if self._chan.closed:
313+
if self._chan._closed:
312314
return False
313-
async with self._chan.recv_cv:
314-
await self._chan.recv_cv.wait()
315+
async with self._chan._recv_cv:
316+
await self._chan._recv_cv.wait()
315317
return True
318+
# pylint: enable=protected-access
316319

317320
def _deactivate(self) -> None:
318321
"""Set the receiver as inactive and remove it from the channel."""
319322
self._active = False
320-
if self._uuid in self._chan.receivers:
321-
del self._chan.receivers[self._uuid]
323+
# pylint: disable=protected-access
324+
if self._uuid in self._chan._receivers:
325+
del self._chan._receivers[self._uuid]
326+
# pylint: enable=protected-access
322327

323328
def consume(self) -> T:
324329
"""Return the latest value once `ready` is complete.
@@ -337,7 +342,7 @@ def consume(self) -> T:
337342
self,
338343
)
339344

340-
if not self._q and self._chan.closed:
345+
if not self._q and self._chan._closed: # pylint: disable=protected-access
341346
raise ReceiverStoppedError(self) from ChannelClosedError(self._chan)
342347

343348
assert self._q, "`consume()` must be preceded by a call to `ready()`"

tests/test_broadcast.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,10 +265,12 @@ async def test_broadcast_receiver_drop() -> None:
265265
assert 10 == await receiver1.receive()
266266
assert 10 == await receiver2.receive()
267267

268-
assert len(chan.receivers) == 2
268+
# pylint: disable=protected-access
269+
assert len(chan._receivers) == 2
269270

270271
del receiver2
271272

272273
await sender.send(20)
273274

274-
assert len(chan.receivers) == 1
275+
assert len(chan._receivers) == 1
276+
# pylint: enable=protected-access

0 commit comments

Comments
 (0)