|
52 | 52 | import asyncio |
53 | 53 | import itertools |
54 | 54 | from collections import deque |
55 | | -from typing import Any, TypeVar |
| 55 | +from typing import Any |
56 | 56 |
|
| 57 | +from ._generic import ReceiverMessageT_co |
57 | 58 | from ._receiver import Receiver, ReceiverStoppedError |
58 | 59 |
|
59 | | -_T = TypeVar("_T") |
60 | 60 |
|
61 | | - |
62 | | -def merge(*receivers: Receiver[_T]) -> Merger[_T]: |
| 61 | +def merge(*receivers: Receiver[ReceiverMessageT_co]) -> Merger[ReceiverMessageT_co]: |
63 | 62 | """Merge messages coming from multiple receivers into a single stream. |
64 | 63 |
|
65 | 64 | Example: |
@@ -95,31 +94,33 @@ def merge(*receivers: Receiver[_T]) -> Merger[_T]: |
95 | 94 | return Merger(*receivers, name="merge") |
96 | 95 |
|
97 | 96 |
|
98 | | -class Merger(Receiver[_T]): |
| 97 | +class Merger(Receiver[ReceiverMessageT_co]): |
99 | 98 | """A receiver that merges messages coming from multiple receivers into a single stream. |
100 | 99 |
|
101 | 100 | Tip: |
102 | 101 | Please consider using the more idiomatic [`merge()`][frequenz.channels.merge] |
103 | 102 | function instead of creating a `Merger` instance directly. |
104 | 103 | """ |
105 | 104 |
|
106 | | - def __init__(self, *receivers: Receiver[_T], name: str | None) -> None: |
| 105 | + def __init__( |
| 106 | + self, *receivers: Receiver[ReceiverMessageT_co], name: str | None |
| 107 | + ) -> None: |
107 | 108 | """Initialize this merger. |
108 | 109 |
|
109 | 110 | Args: |
110 | 111 | *receivers: The receivers to merge. |
111 | 112 | name: The name of the receiver. Used to create the string representation |
112 | 113 | of the receiver. |
113 | 114 | """ |
114 | | - self._receivers: dict[str, Receiver[_T]] = { |
| 115 | + self._receivers: dict[str, Receiver[ReceiverMessageT_co]] = { |
115 | 116 | str(id): recv for id, recv in enumerate(receivers) |
116 | 117 | } |
117 | 118 | self._name: str = name if name is not None else type(self).__name__ |
118 | 119 | self._pending: set[asyncio.Task[Any]] = { |
119 | 120 | asyncio.create_task(anext(recv), name=name) |
120 | 121 | for name, recv in self._receivers.items() |
121 | 122 | } |
122 | | - self._results: deque[_T] = deque(maxlen=len(self._receivers)) |
| 123 | + self._results: deque[ReceiverMessageT_co] = deque(maxlen=len(self._receivers)) |
123 | 124 |
|
124 | 125 | def __del__(self) -> None: |
125 | 126 | """Finalize this merger.""" |
@@ -170,7 +171,7 @@ async def ready(self) -> bool: |
170 | 171 | asyncio.create_task(anext(self._receivers[name]), name=name) |
171 | 172 | ) |
172 | 173 |
|
173 | | - def consume(self) -> _T: |
| 174 | + def consume(self) -> ReceiverMessageT_co: |
174 | 175 | """Return the latest message once `ready` is complete. |
175 | 176 |
|
176 | 177 | Returns: |
|
0 commit comments