Skip to content

Commit 267d4ee

Browse files
Add a DataPipeline implementation (#270)
... that abstracts away the creation of the data sourcing and resampling actors, and provides an easy way to access the logical meter or one of the pools. Currently only provides two methods: `logical_meter` and `ev_charger_pool`. `battery_pool` probably needs some consensus on how args should be pass and interaction between `PowerDistributingActor`, which can be done in a subsequent PR. This allows for the following interface: ``` python async def run(): await microgrid.initialize( host=HOST, port=PORT, resampler_config=ResamplerConfig(resampling_period_s=1.0) ) grid_power = microgrid.logical_meter().grid_power() ``` This PR also renames: 1. `microgrid._microgrid` to `microgrid.connection_manager` 2. `tests/utils/mock_microgrid.py` to `mock_microgrid_client.py`
2 parents c6e15fe + 47e78d3 commit 267d4ee

34 files changed

+384
-224
lines changed

RELEASE_NOTES.md

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,17 @@
1111

1212
## New Features
1313

14-
<!-- Here goes the main new features and examples or instructions on how to use them -->
14+
* Automatic creation of core data-pipeline actors, to eliminate a lot
15+
of boiler plate code. This makes it much simpler to deploy apps
16+
(#270). For example:
17+
18+
``` python
19+
async def run():
20+
await microgrid.initialize(
21+
host=HOST, port=PORT, resampler_config=ResamplerConfig(resampling_period_s=1.0)
22+
)
23+
grid_power = microgrid.logical_meter().grid_power()
24+
```
1525

1626
## Bug Fixes
1727

benchmarks/power_distribution/power_distributor.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from frequenz.channels import Bidirectional, Broadcast
1414

1515
from frequenz.sdk import microgrid
16+
from frequenz.sdk.actor import ResamplerConfig
1617
from frequenz.sdk.actor.power_distributing import (
1718
BatteryStatus,
1819
Error,
@@ -24,6 +25,7 @@
2425
Result,
2526
Success,
2627
)
28+
from frequenz.sdk.microgrid import connection_manager
2729
from frequenz.sdk.microgrid.component import Component, ComponentCategory
2830

2931
HOST = "microgrid.sandbox.api.frequenz.io"
@@ -152,9 +154,9 @@ async def run() -> None:
152154
"""Create microgrid api and run tests."""
153155
# pylint: disable=protected-access
154156

155-
await microgrid.initialize(HOST, PORT)
157+
await microgrid.initialize(HOST, PORT, ResamplerConfig(resampling_period_s=1.0))
156158

157-
all_batteries: Set[Component] = microgrid.get().component_graph.components(
159+
all_batteries: Set[Component] = connection_manager.get().component_graph.components(
158160
component_category={ComponentCategory.BATTERY}
159161
)
160162
batteries_ids = {c.component_id for c in all_batteries}

examples/battery_pool.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818
ChannelRegistry,
1919
ComponentMetricRequest,
2020
DataSourcingActor,
21+
ResamplerConfig,
2122
)
2223
from frequenz.sdk.actor.power_distributing import PowerDistributingActor
2324
from frequenz.sdk.actor.power_distributing._battery_pool_status import BatteryStatus
25+
from frequenz.sdk.microgrid import connection_manager
2426
from frequenz.sdk.microgrid.component import ComponentCategory
2527
from frequenz.sdk.timeseries.battery_pool import BatteryPool
2628

@@ -57,7 +59,7 @@ def create_battery_pool() -> BatteryPool:
5759
battery_status_sender=battery_status_channel.new_sender(),
5860
)
5961

60-
batteries = microgrid.get().component_graph.components(
62+
batteries = connection_manager.get().component_graph.components(
6163
component_category={ComponentCategory.BATTERY}
6264
)
6365

@@ -75,7 +77,9 @@ async def main() -> None:
7577
logging.basicConfig(
7678
level=logging.DEBUG, format="%(asctime)s %(name)s %(levelname)s:%(message)s"
7779
)
78-
await microgrid.initialize(host=HOST, port=PORT)
80+
await microgrid.initialize(
81+
host=HOST, port=PORT, resampler_config=ResamplerConfig(resampling_period_s=1.0)
82+
)
7983

8084
battery_pool = create_battery_pool()
8185
receivers: Dict[str, Receiver[Any]] = {

examples/battery_status.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@
1313
from frequenz.channels import Broadcast
1414

1515
from frequenz.sdk import microgrid
16+
from frequenz.sdk.actor import ResamplerConfig
1617
from frequenz.sdk.actor.power_distributing._battery_pool_status import (
1718
BatteryPoolStatus,
1819
BatteryStatus,
1920
)
21+
from frequenz.sdk.microgrid import connection_manager
2022
from frequenz.sdk.microgrid.component import ComponentCategory
2123

2224
_logger = logging.getLogger(__name__)
@@ -29,10 +31,10 @@ async def main() -> None:
2931
logging.basicConfig(
3032
level=logging.DEBUG, format="%(asctime)s %(name)s %(levelname)s:%(message)s"
3133
)
32-
await microgrid.initialize(HOST, PORT)
34+
await microgrid.initialize(HOST, PORT, ResamplerConfig(resampling_period_s=1.0))
3335
batteries = {
3436
bat.component_id
35-
for bat in microgrid.get().component_graph.components(
37+
for bat in connection_manager.get().component_graph.components(
3638
component_category={ComponentCategory.BATTERY}
3739
)
3840
}

examples/power_distribution.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
Result,
3434
Success,
3535
)
36+
from frequenz.sdk.microgrid import connection_manager
3637
from frequenz.sdk.microgrid.component import Component, ComponentCategory
3738
from frequenz.sdk.timeseries import Sample
3839
from frequenz.sdk.timeseries.logical_meter import LogicalMeter
@@ -158,7 +159,7 @@ async def run() -> None:
158159
logging.basicConfig(
159160
level=logging.DEBUG, format="%(asctime)s %(name)s %(levelname)s:%(message)s"
160161
)
161-
await microgrid.initialize(HOST, PORT)
162+
await microgrid.initialize(HOST, PORT, ResamplerConfig(resampling_period_s=1.0))
162163

163164
channel_registry = ChannelRegistry(name="Microgrid Channel Registry")
164165

@@ -185,7 +186,7 @@ async def run() -> None:
185186
logical_meter = LogicalMeter(
186187
channel_registry,
187188
resampling_actor_request_channel.new_sender(),
188-
microgrid.get().component_graph,
189+
connection_manager.get().component_graph,
189190
)
190191
sending_actor_id: str = "SendingActor"
191192
# Bidirectional channel is used for one sender - one receiver communication
@@ -212,7 +213,7 @@ async def run() -> None:
212213

213214
# You should get components from ComponentGraph, not from the api.
214215
# It is faster and and non blocking approach.
215-
batteries: Set[Component] = microgrid.get().component_graph.components(
216+
batteries: Set[Component] = connection_manager.get().component_graph.components(
216217
# component_type=set(ComponentType.BATTERY) in v0.8.0
217218
component_category={ComponentCategory.BATTERY}
218219
)

examples/resampling.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
ComponentMetricsResamplingActor,
1717
DataSourcingActor,
1818
)
19+
from frequenz.sdk.microgrid import connection_manager
1920
from frequenz.sdk.microgrid.component import ComponentCategory, ComponentMetricId
2021
from frequenz.sdk.timeseries import Sample
2122
from frequenz.sdk.timeseries._resampling import Resampler, ResamplerConfig, Sink, Source
@@ -42,7 +43,7 @@ async def _print_sample(sample: Sample) -> None:
4243

4344
async def run() -> None: # pylint: disable=too-many-locals
4445
"""Run main functions that initializes and creates everything."""
45-
await microgrid.initialize(HOST, PORT)
46+
await microgrid.initialize(HOST, PORT, ResamplerConfig(resampling_period_s=0.2))
4647

4748
channel_registry = ChannelRegistry(name="data-registry")
4849

@@ -68,7 +69,7 @@ async def run() -> None: # pylint: disable=too-many-locals
6869
config=ResamplerConfig(resampling_period_s=1),
6970
)
7071

71-
components = await microgrid.get().api_client.components()
72+
components = await connection_manager.get().api_client.components()
7273
battery_ids = [
7374
comp.component_id
7475
for comp in components

src/frequenz/sdk/actor/_data_sourcing/microgrid_api_source.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
from frequenz.channels import Receiver, Sender
1313

14-
from ... import microgrid
14+
from ...microgrid import connection_manager
1515
from ...microgrid.component import (
1616
BatteryData,
1717
ComponentCategory,
@@ -138,7 +138,7 @@ async def _get_component_category(
138138
if comp_id in self._comp_categories_cache:
139139
return self._comp_categories_cache[comp_id]
140140

141-
api = microgrid.get().api_client
141+
api = connection_manager.get().api_client
142142
for comp in await api.components():
143143
self._comp_categories_cache[comp.component_id] = comp.category
144144

@@ -168,7 +168,7 @@ async def _check_battery_request(
168168
if comp_id not in self.comp_data_receivers:
169169
self.comp_data_receivers[
170170
comp_id
171-
] = await microgrid.get().api_client.battery_data(comp_id)
171+
] = await connection_manager.get().api_client.battery_data(comp_id)
172172

173173
async def _check_ev_charger_request(
174174
self,
@@ -191,7 +191,7 @@ async def _check_ev_charger_request(
191191
if comp_id not in self.comp_data_receivers:
192192
self.comp_data_receivers[
193193
comp_id
194-
] = await microgrid.get().api_client.ev_charger_data(comp_id)
194+
] = await connection_manager.get().api_client.ev_charger_data(comp_id)
195195

196196
async def _check_inverter_request(
197197
self,
@@ -214,7 +214,7 @@ async def _check_inverter_request(
214214
if comp_id not in self.comp_data_receivers:
215215
self.comp_data_receivers[
216216
comp_id
217-
] = await microgrid.get().api_client.inverter_data(comp_id)
217+
] = await connection_manager.get().api_client.inverter_data(comp_id)
218218

219219
async def _check_meter_request(
220220
self,
@@ -237,7 +237,7 @@ async def _check_meter_request(
237237
if comp_id not in self.comp_data_receivers:
238238
self.comp_data_receivers[
239239
comp_id
240-
] = await microgrid.get().api_client.meter_data(comp_id)
240+
] = await connection_manager.get().api_client.meter_data(comp_id)
241241

242242
async def _check_requested_component_and_metrics(
243243
self,

src/frequenz/sdk/actor/power_distributing/_battery_status.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from frequenz.channels.util import Select, Timer
2121

2222
from frequenz.sdk._internal.asyncio import cancel_and_await
23-
from frequenz.sdk.microgrid import get as get_microgrid
23+
from frequenz.sdk.microgrid import connection_manager
2424
from frequenz.sdk.microgrid.component import (
2525
BatteryData,
2626
ComponentCategory,
@@ -240,7 +240,7 @@ async def _run(
240240
set_power_result_receiver: Channel to receive results of the requests to the
241241
components.
242242
"""
243-
api_client = get_microgrid().api_client
243+
api_client = connection_manager.get().api_client
244244

245245
battery_receiver = await api_client.battery_data(self._battery.component_id)
246246
inverter_receiver = await api_client.inverter_data(self._inverter.component_id)
@@ -494,7 +494,7 @@ def _find_adjacent_inverter_id(self, battery_id: int) -> Optional[int]:
494494
Returns:
495495
Id of the inverter. If battery hasn't adjacent inverter, then return None.
496496
"""
497-
graph = get_microgrid().component_graph
497+
graph = connection_manager.get().component_graph
498498
return next(
499499
(
500500
comp.component_id

src/frequenz/sdk/actor/power_distributing/power_distributing.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,9 @@
3232
from frequenz.channels import Bidirectional, Peekable, Receiver, Sender
3333
from google.protobuf.empty_pb2 import Empty # pylint: disable=no-name-in-module
3434

35-
from ... import microgrid
3635
from ..._internal.asyncio import cancel_and_await
3736
from ...actor._decorator import actor
38-
from ...microgrid import ComponentGraph
37+
from ...microgrid import ComponentGraph, connection_manager
3938
from ...microgrid.client import MicrogridApiClient
4039
from ...microgrid.component import (
4140
BatteryData,
@@ -168,7 +167,7 @@ def __init__(
168167
)
169168

170169
self._bat_inv_map, self._inv_bat_map = self._get_components_pairs(
171-
microgrid.get().component_graph
170+
connection_manager.get().component_graph
172171
)
173172
self._battery_receivers: Dict[int, Peekable[BatteryData]] = {}
174173
self._inverter_receivers: Dict[int, Peekable[InverterData]] = {}
@@ -258,7 +257,7 @@ async def run(self) -> None:
258257
"""
259258
await self._create_channels()
260259

261-
api = microgrid.get().api_client
260+
api = connection_manager.get().api_client
262261

263262
# Wait few seconds to get data from the channels created above.
264263
await asyncio.sleep(self._wait_for_data_sec)
@@ -677,7 +676,7 @@ def _get_battery_inverter_data(
677676

678677
async def _create_channels(self) -> None:
679678
"""Create channels to get data of components in microgrid."""
680-
api = microgrid.get().api_client
679+
api = connection_manager.get().api_client
681680
for battery_id, inverter_id in self._bat_inv_map.items():
682681
bat_recv: Receiver[BatteryData] = await api.battery_data(battery_id)
683682
self._battery_receivers[battery_id] = bat_recv.into_peekable()

src/frequenz/sdk/microgrid/__init__.py

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,29 @@
77
for monitoring and adjusting the state of a microgrid.
88
"""
99

10-
from . import client, component
10+
from ..actor import ResamplerConfig
11+
from . import _data_pipeline, client, component, connection_manager
12+
from ._data_pipeline import ev_charger_pool, logical_meter
1113
from ._graph import ComponentGraph
12-
from ._microgrid import ConnectionManager, get, initialize
14+
15+
16+
async def initialize(host: str, port: int, resampler_config: ResamplerConfig) -> None:
17+
"""Initialize the microgrid connection manager and the data pipeline.
18+
19+
Args:
20+
host: Host to connect to, to reach the microgrid API.
21+
port: port to connect to.
22+
resampler_config: Configuration for the resampling actor.
23+
"""
24+
await connection_manager.initialize(host, port)
25+
_data_pipeline.initialize(resampler_config)
26+
1327

1428
__all__ = [
1529
"ComponentGraph",
16-
"ConnectionManager",
17-
"get",
1830
"initialize",
1931
"client",
2032
"component",
33+
"ev_charger_pool",
34+
"logical_meter",
2135
]

0 commit comments

Comments
 (0)