Skip to content

Commit 2f44dbc

Browse files
committed
Adapt code to use the new ChannelRegistry
Note that we needed to include a hack for the `BatteryPool` because the hierarchy of `_Report`/`Report`/`BatteryPoolReport` is messed up and the types can only be forced to be compatible with a cast. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent 98ad550 commit 2f44dbc

File tree

12 files changed

+102
-43
lines changed

12 files changed

+102
-43
lines changed

benchmarks/timeseries/benchmark_datasourcing.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,9 @@ async def consume(channel: Receiver[Any]) -> None:
108108
"current_phase_requests", evc_id, component_metric_id, None
109109
)
110110

111-
recv_channel = channel_registry.new_receiver(request.get_channel_name())
111+
recv_channel = channel_registry.get_or_create(
112+
ComponentMetricRequest, request.get_channel_name()
113+
).new_receiver()
112114

113115
await request_sender.send(request)
114116
consume_tasks.append(asyncio.create_task(consume(recv_channel)))

src/frequenz/sdk/actor/_data_sourcing/microgrid_api_source.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,9 @@ def _get_metric_senders(
325325
(
326326
self._get_data_extraction_method(category, metric),
327327
[
328-
self._registry.new_sender(request.get_channel_name())
328+
self._registry.get_or_create(
329+
Sample[Quantity], request.get_channel_name()
330+
).new_sender()
329331
for request in req_list
330332
],
331333
)
@@ -379,8 +381,7 @@ def process_msg(data: Any) -> None:
379381

380382
await asyncio.gather(
381383
*[
382-
# pylint: disable=protected-access
383-
self._registry._close_channel(r.get_channel_name())
384+
self._registry.close_and_remove(r.get_channel_name())
384385
for requests in self._req_streaming_metrics[comp_id].values()
385386
for r in requests
386387
]

src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -211,14 +211,16 @@ async def _run(self) -> None:
211211

212212
if component_ids not in self._subscriptions:
213213
self._subscriptions[component_ids] = {
214-
priority: self._channel_registry.new_sender(
215-
sub.get_channel_name()
216-
)
214+
priority: self._channel_registry.get_or_create(
215+
_Report, sub.get_channel_name()
216+
).new_sender()
217217
}
218218
elif priority not in self._subscriptions[component_ids]:
219219
self._subscriptions[component_ids][
220220
priority
221-
] = self._channel_registry.new_sender(sub.get_channel_name())
221+
] = self._channel_registry.get_or_create(
222+
_Report, sub.get_channel_name()
223+
).new_sender()
222224

223225
if sub.component_ids not in self._bound_tracker_tasks:
224226
self._add_bounds_tracker(sub.component_ids)

src/frequenz/sdk/actor/_resampling.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,15 @@ async def _subscribe(self, request: ComponentMetricRequest) -> None:
7777
)
7878
data_source_channel_name = data_source_request.get_channel_name()
7979
await self._data_sourcing_request_sender.send(data_source_request)
80-
receiver = self._channel_registry.new_receiver(data_source_channel_name)
80+
receiver = self._channel_registry.get_or_create(
81+
Sample[Quantity], data_source_channel_name
82+
).new_receiver()
8183

8284
# This is a temporary hack until the Sender implementation uses
8385
# exceptions to report errors.
84-
sender = self._channel_registry.new_sender(request_channel_name)
86+
sender = self._channel_registry.get_or_create(
87+
Sample[Quantity], request_channel_name
88+
).new_sender()
8589

8690
async def sink_adapter(sample: Sample[Quantity]) -> None:
8791
await sender.send(sample)

src/frequenz/sdk/timeseries/_grid_frequency.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from ..microgrid import connection_manager
1616
from ..microgrid.component import Component, ComponentCategory, ComponentMetricId
1717
from ..timeseries._base_types import Sample
18-
from ..timeseries._quantities import Frequency
18+
from ..timeseries._quantities import Frequency, Quantity
1919

2020
if TYPE_CHECKING:
2121
# Imported here to avoid a circular import.
@@ -95,9 +95,9 @@ def new_receiver(self) -> Receiver[Sample[Frequency]]:
9595
Returns:
9696
A receiver that will receive grid frequency samples.
9797
"""
98-
receiver = self._channel_registry.new_receiver(
99-
self._component_metric_request.get_channel_name()
100-
)
98+
receiver = self._channel_registry.get_or_create(
99+
Sample[Quantity], self._component_metric_request.get_channel_name()
100+
).new_receiver()
101101

102102
if not self._task:
103103
self._task = asyncio.create_task(self._send_request())

src/frequenz/sdk/timeseries/_voltage_streamer.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,9 @@ def new_receiver(self) -> Receiver[Sample3Phase[Voltage]]:
118118
A receiver that will receive the phase-to-neutral voltage as a
119119
3-phase sample.
120120
"""
121-
receiver = self._channel_registry.new_receiver(self._channel_key)
121+
receiver = self._channel_registry.get_or_create(
122+
Sample3Phase[Voltage], self._channel_key
123+
).new_receiver()
122124

123125
if not self._task:
124126
self._task = asyncio.create_task(self._send_request())
@@ -148,10 +150,14 @@ async def _send_request(self) -> None:
148150
await self._resampler_subscription_sender.send(req)
149151

150152
phases_rx.append(
151-
self._channel_registry.new_receiver(req.get_channel_name())
153+
self._channel_registry.get_or_create(
154+
Sample[Quantity], req.get_channel_name()
155+
).new_receiver()
152156
)
153157

154-
sender = self._channel_registry.new_sender(self._channel_key)
158+
sender = self._channel_registry.get_or_create(
159+
Sample3Phase[Voltage], self._channel_key
160+
).new_sender()
155161

156162
_logger.debug(
157163
"Sent request for fetching voltage from: %s", self._source_component

src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import uuid
1313
from collections import abc
1414
from datetime import timedelta
15+
from typing import cast
1516

1617
from ... import timeseries
1718
from ..._internal._channels import ReceiverFetcher
@@ -386,12 +387,14 @@ def power_status(self) -> ReceiverFetcher[BatteryPoolReport]:
386387
] = asyncio.create_task(
387388
self._battery_pool._power_manager_bounds_subscription_sender.send(sub)
388389
)
389-
self._battery_pool._channel_registry.set_resend_latest(
390-
sub.get_channel_name(), True
391-
)
392-
return self._battery_pool._channel_registry.new_receiver_fetcher(
393-
sub.get_channel_name()
390+
channel = self._battery_pool._channel_registry.get_or_create(
391+
_power_managing._Report, sub.get_channel_name()
394392
)
393+
channel.resend_latest = True
394+
395+
# More details on why the cast is needed here:
396+
# https://github.com/frequenz-floss/frequenz-sdk-python/issues/823
397+
return cast(ReceiverFetcher[BatteryPoolReport], channel)
395398

396399
@property
397400
def _system_power_bounds(self) -> ReceiverFetcher[SystemBounds]:

src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,9 @@ async def resampler_subscribe(
279279
start_time=None,
280280
)
281281
await self._resampler_subscription_sender.send(request)
282-
return self._channel_registry.new_receiver(request.get_channel_name())
282+
return self._channel_registry.get_or_create(
283+
Sample[Quantity], request.get_channel_name()
284+
).new_receiver()
283285

284286
return (
285287
await resampler_subscribe(ComponentMetricId.CURRENT_PHASE_1),

src/frequenz/sdk/timeseries/formula_engine/_resampled_formula_builder.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
from ...microgrid.component import ComponentMetricId
1414
from .. import Sample
15-
from .._quantities import QuantityT
15+
from .._quantities import Quantity, QuantityT
1616
from ._formula_engine import FormulaBuilder, FormulaEngine
1717
from ._tokenizer import Tokenizer, TokenType
1818

@@ -74,7 +74,9 @@ def _get_resampled_receiver(
7474

7575
request = ComponentMetricRequest(self._namespace, component_id, metric_id, None)
7676
self._resampler_requests.append(request)
77-
return self._channel_registry.new_receiver(request.get_channel_name())
77+
return self._channel_registry.get_or_create(
78+
Sample[QuantityT], request.get_channel_name()
79+
).new_receiver()
7880

7981
async def subscribe(self) -> None:
8082
"""Subscribe to all resampled component metric streams."""

tests/actor/test_data_sourcing.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
)
1414
from frequenz.sdk.microgrid import connection_manager
1515
from frequenz.sdk.microgrid.component import ComponentMetricId
16+
from frequenz.sdk.timeseries import Quantity, Sample
1617
from tests.microgrid import mock_api
1718

1819
# pylint: disable=no-member
@@ -59,21 +60,25 @@ async def test_data_sourcing_actor(self) -> None:
5960
active_power_request = ComponentMetricRequest(
6061
"test-namespace", 4, ComponentMetricId.ACTIVE_POWER, None
6162
)
62-
active_power_recv = registry.new_receiver(
63-
active_power_request.get_channel_name()
64-
)
63+
active_power_recv = registry.get_or_create(
64+
Sample[Quantity], active_power_request.get_channel_name()
65+
).new_receiver()
6566
await req_sender.send(active_power_request)
6667

6768
soc_request = ComponentMetricRequest(
6869
"test-namespace", 9, ComponentMetricId.SOC, None
6970
)
70-
soc_recv = registry.new_receiver(soc_request.get_channel_name())
71+
soc_recv = registry.get_or_create(
72+
Sample[Quantity], soc_request.get_channel_name()
73+
).new_receiver()
7174
await req_sender.send(soc_request)
7275

7376
soc2_request = ComponentMetricRequest(
7477
"test-namespace", 9, ComponentMetricId.SOC, None
7578
)
76-
soc2_recv = registry.new_receiver(soc2_request.get_channel_name())
79+
soc2_recv = registry.get_or_create(
80+
Sample[Quantity], soc2_request.get_channel_name()
81+
).new_receiver()
7782
await req_sender.send(soc2_request)
7883

7984
for _ in range(3):

0 commit comments

Comments
 (0)