Skip to content

Commit 00ab2bf

Browse files
committed
Add a MappingReceiverFetcher implementation
This is a `ReceiverFetcher` which can be used to modify newly created receivers, for example by applying a `map` or a `filter` on them, before returning them. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent fa4dc36 commit 00ab2bf

File tree

1 file changed

+31
-0
lines changed

1 file changed

+31
-0
lines changed

src/frequenz/sdk/_internal/_channels.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from ._asyncio import cancel_and_await
1313

1414
T_co = typing.TypeVar("T_co", covariant=True)
15+
U_co = typing.TypeVar("U_co", covariant=True)
1516

1617

1718
class ReceiverFetcher(typing.Generic[T_co], typing.Protocol):
@@ -29,6 +30,36 @@ def new_receiver(self, *, limit: int = 50) -> Receiver[T_co]:
2930
"""
3031

3132

33+
class MappingReceiverFetcher(typing.Generic[T_co, U_co]):
34+
"""A receiver fetcher that can manipulate receivers before returning them."""
35+
36+
def __init__(
37+
self,
38+
fetcher: ReceiverFetcher[T_co],
39+
mapping_function: typing.Callable[[Receiver[T_co]], Receiver[U_co]],
40+
) -> None:
41+
"""Initialize this instance.
42+
43+
Args:
44+
fetcher: The underlying fetcher to get receivers from.
45+
mapping_function: The method to be applied on new receivers before returning
46+
them.
47+
"""
48+
self._fetcher = fetcher
49+
self._mapping_function = mapping_function
50+
51+
def new_receiver(self, *, limit: int = 50) -> Receiver[U_co]:
52+
"""Get a receiver from the channel.
53+
54+
Args:
55+
limit: The maximum size of the receiver.
56+
57+
Returns:
58+
A receiver instance.
59+
"""
60+
return self._mapping_function(self._fetcher.new_receiver(limit=limit))
61+
62+
3263
class _Sentinel:
3364
"""A sentinel to denote that no value has been received yet."""
3465

0 commit comments

Comments
 (0)