|
1 | | -# License: MIT |
2 | | -# Copyright © 2022 Frequenz Energy-as-a-Service GmbH |
3 | | - |
4 | | -"""A class that would dynamically create, own and provide access to channels.""" |
5 | | - |
6 | | -from __future__ import annotations |
7 | | - |
8 | | -import typing |
9 | | - |
10 | | -from frequenz.channels import Broadcast, Receiver, Sender |
11 | | - |
12 | | -from .._internal._channels import ReceiverFetcher |
13 | | - |
14 | | - |
15 | | -class ChannelRegistry: |
16 | | - """Dynamically creates, own and provide access to channels. |
17 | | -
|
18 | | - It can be used by actors to dynamically establish a communication channel |
19 | | - between each other. Channels are identified by string names. |
20 | | - """ |
21 | | - |
22 | | - def __init__(self, *, name: str) -> None: |
23 | | - """Create a `ChannelRegistry` instance. |
24 | | -
|
25 | | - Args: |
26 | | - name: A unique name for the registry. |
27 | | - """ |
28 | | - self._name = name |
29 | | - self._channels: dict[str, Broadcast[typing.Any]] = {} |
30 | | - |
31 | | - def set_resend_latest(self, key: str, resend_latest: bool) -> None: |
32 | | - """Set the `resend_latest` flag for a given channel. |
33 | | -
|
34 | | - This flag controls whether the latest value of the channel should be resent to |
35 | | - new receivers, in slow streams. |
36 | | -
|
37 | | - `resend_latest` is `False` by default. It is safe to be set in data/reporting |
38 | | - channels, but is not recommended for use in channels that stream control |
39 | | - instructions. |
40 | | -
|
41 | | - Args: |
42 | | - key: The key to identify the channel. |
43 | | - resend_latest: Whether to resend the latest value to new receivers, for the |
44 | | - given channel. |
45 | | - """ |
46 | | - if key not in self._channels: |
47 | | - self._channels[key] = Broadcast(f"{self._name}-{key}") |
48 | | - # This attribute is protected in the current version of the channels library, |
49 | | - # but that will change in the future. |
50 | | - self._channels[key].resend_latest = resend_latest |
51 | | - |
52 | | - def new_sender(self, key: str) -> Sender[typing.Any]: |
53 | | - """Get a sender to a dynamically created channel with the given key. |
54 | | -
|
55 | | - Args: |
56 | | - key: A key to identify the channel. |
57 | | -
|
58 | | - Returns: |
59 | | - A sender to a dynamically created channel with the given key. |
60 | | - """ |
61 | | - if key not in self._channels: |
62 | | - self._channels[key] = Broadcast(f"{self._name}-{key}") |
63 | | - return self._channels[key].new_sender() |
64 | | - |
65 | | - def new_receiver(self, key: str, maxsize: int = 50) -> Receiver[typing.Any]: |
66 | | - """Get a receiver to a dynamically created channel with the given key. |
67 | | -
|
68 | | - Args: |
69 | | - key: A key to identify the channel. |
70 | | - maxsize: The maximum size of the receiver. |
71 | | -
|
72 | | - Returns: |
73 | | - A receiver for a dynamically created channel with the given key. |
74 | | - """ |
75 | | - if key not in self._channels: |
76 | | - self._channels[key] = Broadcast(f"{self._name}-{key}") |
77 | | - return self._channels[key].new_receiver(maxsize=maxsize) |
78 | | - |
79 | | - def new_receiver_fetcher(self, key: str) -> ReceiverFetcher[typing.Any]: |
80 | | - """Get a receiver fetcher to a dynamically created channel with the given key. |
81 | | -
|
82 | | - Args: |
83 | | - key: A key to identify the channel. |
84 | | -
|
85 | | - Returns: |
86 | | - A receiver fetcher for a dynamically created channel with the given key. |
87 | | - """ |
88 | | - if key not in self._channels: |
89 | | - self._channels[key] = Broadcast(f"{self._name}-{key}") |
90 | | - return _RegistryReceiverFetcher(self, key) |
91 | | - |
92 | | - async def _close_channel(self, key: str) -> None: |
93 | | - """Close a channel with the given key. |
94 | | -
|
95 | | - This method is private and should only be used in special cases. |
96 | | -
|
97 | | - Args: |
98 | | - key: A key to identify the channel. |
99 | | - """ |
100 | | - if key in self._channels: |
101 | | - if channel := self._channels.pop(key, None): |
102 | | - await channel.close() |
103 | | - |
104 | | - |
105 | | -T = typing.TypeVar("T") |
106 | | - |
107 | | - |
108 | | -class _RegistryReceiverFetcher(typing.Generic[T]): |
109 | | - """A receiver fetcher that is bound to a channel registry and a key.""" |
110 | | - |
111 | | - def __init__( |
112 | | - self, |
113 | | - registry: ChannelRegistry, |
114 | | - key: str, |
115 | | - ) -> None: |
116 | | - """Create a new instance of a receiver fetcher. |
117 | | -
|
118 | | - Args: |
119 | | - registry: The channel registry. |
120 | | - key: A key to identify the channel. |
121 | | - """ |
122 | | - self._registry = registry |
123 | | - self._key = key |
124 | | - |
125 | | - def new_receiver(self, maxsize: int = 50) -> Receiver[T]: |
126 | | - """Get a receiver from the channel. |
127 | | -
|
128 | | - Args: |
129 | | - maxsize: The maximum size of the receiver. |
130 | | -
|
131 | | - Returns: |
132 | | - A receiver instance. |
133 | | - """ |
134 | | - return self._registry.new_receiver(self._key, maxsize) |
0 commit comments