Skip to content

Commit 43af70f

Browse files
committed
Add a ReceiverFetcherWith implementation
This is a `ReceiverFetcher` which can be used to modify newly created receivers, for example by applying a `map` or a `filter` on them, before returning them. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent fa4dc36 commit 43af70f

File tree

1 file changed

+33
-0
lines changed

1 file changed

+33
-0
lines changed

src/frequenz/sdk/_internal/_channels.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@
88
import typing
99

1010
from frequenz.channels import Receiver
11+
from typing_extensions import override
1112

1213
from ._asyncio import cancel_and_await
1314

1415
T_co = typing.TypeVar("T_co", covariant=True)
16+
U_co = typing.TypeVar("U_co", covariant=True)
1517

1618

1719
class ReceiverFetcher(typing.Generic[T_co], typing.Protocol):
@@ -29,6 +31,37 @@ def new_receiver(self, *, limit: int = 50) -> Receiver[T_co]:
2931
"""
3032

3133

34+
class ReceiverFetcherWith(ReceiverFetcher[U_co], typing.Generic[T_co, U_co]):
35+
"""A receiver fetcher that can manipulate receivers before returning them."""
36+
37+
def __init__(
38+
self,
39+
fetcher: ReceiverFetcher[T_co],
40+
manipulator: typing.Callable[[Receiver[T_co]], Receiver[U_co]],
41+
) -> None:
42+
"""Initialize this instance.
43+
44+
Args:
45+
fetcher: The underlying fetcher to get receivers from.
46+
manipulator: The method to be applied on new receivers before returning
47+
them.
48+
"""
49+
self._fetcher = fetcher
50+
self._manipulator = manipulator
51+
52+
@override
53+
def new_receiver(self, *, limit: int = 50) -> Receiver[U_co]:
54+
"""Get a receiver from the channel.
55+
56+
Args:
57+
limit: The maximum size of the receiver.
58+
59+
Returns:
60+
A receiver instance.
61+
"""
62+
return self._manipulator(self._fetcher.new_receiver(limit=limit))
63+
64+
3265
class _Sentinel:
3366
"""A sentinel to denote that no value has been received yet."""
3467

0 commit comments

Comments
 (0)