@@ -45,103 +45,43 @@ class ComponentMetricsResamplingActor:
4545
4646 def __init__ ( # pylint: disable=too-many-arguments
4747 self ,
48+ * ,
4849 channel_registry : ChannelRegistry ,
49- subscription_sender : Sender [ComponentMetricRequest ],
50- subscription_receiver : Receiver [ComponentMetricRequest ],
50+ data_sourcing_request_sender : Sender [ComponentMetricRequest ],
51+ resampling_request_receiver : Receiver [ComponentMetricRequest ],
5152 resampling_period_s : float = 0.2 ,
5253 max_data_age_in_periods : float = 3.0 ,
5354 resampling_function : ResamplingFunction = average ,
5455 ) -> None :
5556 """Initialize the ComponentMetricsResamplingActor.
5657
5758 Args:
58- channel_registry: global channel registry used for receiving component
59- data from DataSource and for sending resampled samples downstream
60- subscription_sender: channel for sending component metric requests to the
61- DataSourcing actor
62- subscription_receiver: channel for receiving component metric requests
63- resampling_period_s: value describing how often resampling should be
64- performed, in seconds
65- max_data_age_in_periods: max age that samples shouldn't exceed in order
66- to be used in the resampling function
67- resampling_function: function to be applied to a sequence of samples within
68- a resampling period to produce a single output sample
69-
70- Example:
71- ```python
72- async def run() -> None:
73- await microgrid_api.initialize(HOST, PORT)
74-
75- channel_registry = ChannelRegistry(name="Microgrid Channel Registry")
76-
77- data_source_request_channel = Broadcast[ComponentMetricRequest](
78- "Data Source Request Channel"
79- )
80- data_source_request_sender = data_source_request_channel.new_sender()
81- data_source_request_receiver = data_source_request_channel.new_receiver()
82-
83- resampling_actor_request_channel = Broadcast[ComponentMetricRequest](
84- "Resampling Actor Request Channel"
85- )
86- resampling_actor_request_sender = resampling_actor_request_channel.new_sender()
87- resampling_actor_request_receiver = resampling_actor_request_channel.new_receiver()
88-
89- _data_sourcing_actor = DataSourcingActor(
90- request_receiver=data_source_request_receiver, registry=channel_registry
91- )
92-
93- _resampling_actor = ComponentMetricsResamplingActor(
94- channel_registry=channel_registry,
95- subscription_sender=data_source_request_sender,
96- subscription_receiver=resampling_actor_request_receiver,
97- resampling_period_s=1.0,
98- )
99-
100- components = await microgrid_api.get().microgrid_api_client.components()
101- battery_ids = [
102- comp.component_id
103- for comp in components
104- if comp.category == ComponentCategory.BATTERY
105- ]
106-
107- subscription_requests = [
108- ComponentMetricRequest(
109- namespace="Resampling",
110- component_id=component_id,
111- metric_id=ComponentMetricId.SOC,
112- start_time=None,
113- )
114- for component_id in battery_ids
115- ]
116-
117- await asyncio.gather(
118- *[
119- resampling_actor_request_sender.send(request)
120- for request in subscription_requests
121- ]
122- )
123-
124- sample_receiver = MergeNamed(
125- **{
126- channel_name: channel_registry.new_receiver(channel_name)
127- for channel_name in map(
128- lambda req: req.get_channel_name(), subscription_requests
129- )
130- }
131- )
132-
133- async for channel_name, msg in sample_receiver:
134- print(msg)
135-
136- asyncio.run(run())
137- ```
59+ channel_registry: The channel registry used to get senders and
60+ receivers for data sourcing subscriptions.
61+ data_sourcing_request_sender: The sender used to send requests to
62+ the [`DataSourcingActor`][frequenz.sdk.actor.DataSourcingActor]
63+ to subscribe to component metrics.
64+ resampling_request_receiver: The receiver to use to receive new
65+ resampmling subscription requests.
66+ resampling_period_s: The time it passes between resampled data
67+ should be calculated (in seconds).
68+ max_data_age_in_periods: The maximum age a sample can have to be
69+ considered *relevant* for resampling purposes, expressed in the
70+ number of resampling periods. For exapmle is
71+ `resampling_period_s` is 3 and `max_data_age_in_periods` is 2,
72+ then data older than `3*2 = 6` secods will be discarded when
73+ creating a new sample and never passed to the resampling
74+ function.
75+ resampling_function: The function to be applied to the sequence of
76+ *relevant* samples at a given time. The result of the function
77+ is what is sent as the resampled data.
13878 """
13979 self ._channel_registry = channel_registry
140- self ._subscription_sender = subscription_sender
141- self ._subscription_receiver = subscription_receiver
14280 self ._resampling_period_s = resampling_period_s
14381 self ._max_data_age_in_periods : float = max_data_age_in_periods
14482 self ._resampling_function : ResamplingFunction = resampling_function
83+ self ._data_sourcing_request_sender = data_sourcing_request_sender
84+ self ._resampling_request_receiver = resampling_request_receiver
14585 self ._resampler = Resampler (
14686 resampling_period_s = resampling_period_s ,
14787 max_data_age_in_periods = max_data_age_in_periods ,
@@ -158,7 +98,7 @@ async def _subscribe(self, request: ComponentMetricRequest) -> None:
15898 request , namespace = request .namespace + ":Source"
15999 )
160100 data_source_channel_name = data_source_request .get_channel_name ()
161- await self ._subscription_sender .send (data_source_request )
101+ await self ._data_sourcing_request_sender .send (data_source_request )
162102 receiver = self ._channel_registry .new_receiver (data_source_channel_name )
163103
164104 # This is a temporary hack until the Sender implementation uses
@@ -173,7 +113,7 @@ async def sink_adapter(sample: Sample) -> None:
173113
174114 async def _process_resampling_requests (self ) -> None :
175115 """Process resampling data requests."""
176- async for request in self ._subscription_receiver :
116+ async for request in self ._resampling_request_receiver :
177117 await self ._subscribe (request )
178118
179119 async def run (self ) -> None :
0 commit comments