Skip to content

Commit 0550126

Browse files
committed
Use the new public type variables
Signed-off-by: Leandro Lucarella <[email protected]>
1 parent 808440d commit 0550126

File tree

4 files changed

+55
-41
lines changed

4 files changed

+55
-41
lines changed

src/frequenz/channels/_anycast.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,14 @@
1111
from typing import Generic, TypeVar
1212

1313
from ._exceptions import ChannelClosedError
14+
from ._generic import ChannelMessageT
1415
from ._receiver import Receiver, ReceiverStoppedError
1516
from ._sender import Sender, SenderError
1617

1718
_logger = logging.getLogger(__name__)
1819

19-
_T = TypeVar("_T")
20-
2120

22-
class Anycast(Generic[_T]):
21+
class Anycast(Generic[ChannelMessageT]):
2322
"""A channel that delivers each message to exactly one receiver.
2423
2524
# Description
@@ -213,7 +212,7 @@ def __init__(self, *, name: str, limit: int = 10) -> None:
213212
of the channel.
214213
"""
215214

216-
self._deque: deque[_T] = deque(maxlen=limit)
215+
self._deque: deque[ChannelMessageT] = deque(maxlen=limit)
217216
"""The channel's buffer."""
218217

219218
self._send_cv: Condition = Condition()
@@ -282,11 +281,11 @@ async def close(self) -> None:
282281
async with self._recv_cv:
283282
self._recv_cv.notify_all()
284283

285-
def new_sender(self) -> Sender[_T]:
284+
def new_sender(self) -> Sender[ChannelMessageT]:
286285
"""Return a new sender attached to this channel."""
287286
return _Sender(self)
288287

289-
def new_receiver(self) -> Receiver[_T]:
288+
def new_receiver(self) -> Receiver[ChannelMessageT]:
290289
"""Return a new receiver attached to this channel."""
291290
return _Receiver(self)
292291

@@ -302,6 +301,9 @@ def __repr__(self) -> str:
302301
)
303302

304303

304+
_T = TypeVar("_T")
305+
306+
305307
class _Sender(Sender[_T]):
306308
"""A sender to send messages to an Anycast channel.
307309

src/frequenz/channels/_broadcast.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,14 @@
1212
from typing import Generic, TypeVar
1313

1414
from ._exceptions import ChannelClosedError
15+
from ._generic import ChannelMessageT
1516
from ._receiver import Receiver, ReceiverStoppedError
1617
from ._sender import Sender, SenderError
1718

1819
_logger = logging.Logger(__name__)
1920

20-
_T = TypeVar("_T")
21-
2221

23-
class Broadcast(Generic[_T]):
22+
class Broadcast(Generic[ChannelMessageT]):
2423
"""A channel that deliver all messages to all receivers.
2524
2625
# Description
@@ -206,13 +205,15 @@ def __init__(self, *, name: str, resend_latest: bool = False) -> None:
206205
self._recv_cv: Condition = Condition()
207206
"""The condition to wait for data in the channel's buffer."""
208207

209-
self._receivers: dict[int, weakref.ReferenceType[_Receiver[_T]]] = {}
208+
self._receivers: dict[
209+
int, weakref.ReferenceType[_Receiver[ChannelMessageT]]
210+
] = {}
210211
"""The receivers attached to the channel, indexed by their hash()."""
211212

212213
self._closed: bool = False
213214
"""Whether the channel is closed."""
214215

215-
self._latest: _T | None = None
216+
self._latest: ChannelMessageT | None = None
216217
"""The latest message sent to the channel."""
217218

218219
self.resend_latest: bool = resend_latest
@@ -261,11 +262,13 @@ async def close(self) -> None:
261262
async with self._recv_cv:
262263
self._recv_cv.notify_all()
263264

264-
def new_sender(self) -> Sender[_T]:
265+
def new_sender(self) -> Sender[ChannelMessageT]:
265266
"""Return a new sender attached to this channel."""
266267
return _Sender(self)
267268

268-
def new_receiver(self, *, name: str | None = None, limit: int = 50) -> Receiver[_T]:
269+
def new_receiver(
270+
self, *, name: str | None = None, limit: int = 50
271+
) -> Receiver[ChannelMessageT]:
269272
"""Return a new receiver attached to this channel.
270273
271274
Broadcast receivers have their own buffer, and when messages are not
@@ -279,7 +282,7 @@ def new_receiver(self, *, name: str | None = None, limit: int = 50) -> Receiver[
279282
Returns:
280283
A new receiver attached to this channel.
281284
"""
282-
recv: _Receiver[_T] = _Receiver(self, name=name, limit=limit)
285+
recv: _Receiver[ChannelMessageT] = _Receiver(self, name=name, limit=limit)
283286
self._receivers[hash(recv)] = weakref.ref(recv)
284287
if self.resend_latest and self._latest is not None:
285288
recv.enqueue(self._latest)
@@ -300,6 +303,9 @@ def __repr__(self) -> str:
300303
)
301304

302305

306+
_T = TypeVar("_T")
307+
308+
303309
class _Sender(Sender[_T]):
304310
"""A sender to send messages to the broadcast channel.
305311

src/frequenz/channels/_receiver.py

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -136,18 +136,16 @@
136136

137137
from abc import ABC, abstractmethod
138138
from collections.abc import Callable
139-
from typing import Generic, Self, TypeVar
139+
from typing import Generic, Self
140140

141141
from ._exceptions import Error
142+
from ._generic import MappedMessageT_co, ReceiverMessageT_co
142143

143-
_T_co = TypeVar("_T_co", covariant=True)
144-
_U_co = TypeVar("_U_co", covariant=True)
145144

146-
147-
class Receiver(ABC, Generic[_T_co]):
145+
class Receiver(ABC, Generic[ReceiverMessageT_co]):
148146
"""An endpoint to receive messages."""
149147

150-
async def __anext__(self) -> _T_co:
148+
async def __anext__(self) -> ReceiverMessageT_co:
151149
"""Await the next message in the async iteration over received messages.
152150
153151
Returns:
@@ -177,7 +175,7 @@ async def ready(self) -> bool:
177175
"""
178176

179177
@abstractmethod
180-
def consume(self) -> _T_co:
178+
def consume(self) -> ReceiverMessageT_co:
181179
"""Return the latest message once `ready()` is complete.
182180
183181
`ready()` must be called before each call to `consume()`.
@@ -198,7 +196,7 @@ def __aiter__(self) -> Self:
198196
"""
199197
return self
200198

201-
async def receive(self) -> _T_co:
199+
async def receive(self) -> ReceiverMessageT_co:
202200
"""Receive a message.
203201
204202
Returns:
@@ -225,7 +223,9 @@ async def receive(self) -> _T_co:
225223
raise ReceiverStoppedError(self) from exc
226224
return received
227225

228-
def map(self, mapping_function: Callable[[_T_co], _U_co], /) -> Receiver[_U_co]:
226+
def map(
227+
self, mapping_function: Callable[[ReceiverMessageT_co], MappedMessageT_co], /
228+
) -> Receiver[MappedMessageT_co]:
229229
"""Apply a mapping function on the received message.
230230
231231
Tip:
@@ -243,13 +243,13 @@ def map(self, mapping_function: Callable[[_T_co], _U_co], /) -> Receiver[_U_co]:
243243
return _Mapper(receiver=self, mapping_function=mapping_function)
244244

245245

246-
class ReceiverError(Error, Generic[_T_co]):
246+
class ReceiverError(Error, Generic[ReceiverMessageT_co]):
247247
"""An error that originated in a [Receiver][frequenz.channels.Receiver].
248248
249249
All exceptions generated by receivers inherit from this exception.
250250
"""
251251

252-
def __init__(self, message: str, receiver: Receiver[_T_co]):
252+
def __init__(self, message: str, receiver: Receiver[ReceiverMessageT_co]):
253253
"""Initialize this error.
254254
255255
Args:
@@ -258,14 +258,14 @@ def __init__(self, message: str, receiver: Receiver[_T_co]):
258258
error happened.
259259
"""
260260
super().__init__(message)
261-
self.receiver: Receiver[_T_co] = receiver
261+
self.receiver: Receiver[ReceiverMessageT_co] = receiver
262262
"""The receiver where the error happened."""
263263

264264

265-
class ReceiverStoppedError(ReceiverError[_T_co]):
265+
class ReceiverStoppedError(ReceiverError[ReceiverMessageT_co]):
266266
"""A stopped [`Receiver`][frequenz.channels.Receiver] was used."""
267267

268-
def __init__(self, receiver: Receiver[_T_co]):
268+
def __init__(self, receiver: Receiver[ReceiverMessageT_co]):
269269
"""Initialize this error.
270270
271271
Args:
@@ -275,7 +275,9 @@ def __init__(self, receiver: Receiver[_T_co]):
275275
super().__init__(f"Receiver {receiver} was stopped", receiver)
276276

277277

278-
class _Mapper(Receiver[_U_co], Generic[_T_co, _U_co]):
278+
class _Mapper(
279+
Receiver[MappedMessageT_co], Generic[ReceiverMessageT_co, MappedMessageT_co]
280+
):
279281
"""Apply a transform function on a channel receiver.
280282
281283
Has two generic types:
@@ -285,18 +287,23 @@ class _Mapper(Receiver[_U_co], Generic[_T_co, _U_co]):
285287
"""
286288

287289
def __init__(
288-
self, *, receiver: Receiver[_T_co], mapping_function: Callable[[_T_co], _U_co]
290+
self,
291+
*,
292+
receiver: Receiver[ReceiverMessageT_co],
293+
mapping_function: Callable[[ReceiverMessageT_co], MappedMessageT_co],
289294
) -> None:
290295
"""Initialize this receiver mapper.
291296
292297
Args:
293298
receiver: The input receiver.
294299
mapping_function: The function to apply on the input data.
295300
"""
296-
self._receiver: Receiver[_T_co] = receiver
301+
self._receiver: Receiver[ReceiverMessageT_co] = receiver
297302
"""The input receiver."""
298303

299-
self._mapping_function: Callable[[_T_co], _U_co] = mapping_function
304+
self._mapping_function: Callable[[ReceiverMessageT_co], MappedMessageT_co] = (
305+
mapping_function
306+
)
300307
"""The function to apply on the input data."""
301308

302309
async def ready(self) -> bool:
@@ -314,7 +321,7 @@ async def ready(self) -> bool:
314321

315322
# We need a noqa here because the docs have a Raises section but the code doesn't
316323
# explicitly raise anything.
317-
def consume(self) -> _U_co: # noqa: DOC502
324+
def consume(self) -> MappedMessageT_co: # noqa: DOC502
318325
"""Return a transformed message once `ready()` is complete.
319326
320327
Returns:

src/frequenz/channels/_sender.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,18 +50,17 @@
5050
"""
5151

5252
from abc import ABC, abstractmethod
53-
from typing import Generic, TypeVar
53+
from typing import Generic
5454

5555
from ._exceptions import Error
56+
from ._generic import SenderMessageT_contra
5657

57-
_T_contra = TypeVar("_T_contra", contravariant=True)
5858

59-
60-
class Sender(ABC, Generic[_T_contra]):
59+
class Sender(ABC, Generic[SenderMessageT_contra]):
6160
"""An endpoint to sends messages."""
6261

6362
@abstractmethod
64-
async def send(self, message: _T_contra, /) -> None:
63+
async def send(self, message: SenderMessageT_contra, /) -> None:
6564
"""Send a message.
6665
6766
Args:
@@ -72,13 +71,13 @@ async def send(self, message: _T_contra, /) -> None:
7271
"""
7372

7473

75-
class SenderError(Error, Generic[_T_contra]):
74+
class SenderError(Error, Generic[SenderMessageT_contra]):
7675
"""An error that originated in a [Sender][frequenz.channels.Sender].
7776
7877
All exceptions generated by senders inherit from this exception.
7978
"""
8079

81-
def __init__(self, message: str, sender: Sender[_T_contra]):
80+
def __init__(self, message: str, sender: Sender[SenderMessageT_contra]):
8281
"""Initialize this error.
8382
8483
Args:
@@ -87,5 +86,5 @@ def __init__(self, message: str, sender: Sender[_T_contra]):
8786
happened.
8887
"""
8988
super().__init__(message)
90-
self.sender: Sender[_T_contra] = sender
89+
self.sender: Sender[SenderMessageT_contra] = sender
9190
"""The sender where the error happened."""

0 commit comments

Comments
 (0)