Skip to content

Commit 2e9d419

Browse files
Initialize grid in data pipeline
The grid is initialized in the data pipeline, so that it can instantiate the formula engine pool to generate grid metrics. This caused circular imports, which is resolved by enclosing the type hints in a `TYPE_CHECKING` block. Also the component metadata is now retrieved from the component graph, instead of the microgrid API client. This is because asynchronous calls to the API are not suitable in the data pipeline as this is a high level user facing interface. The grid initialize() function needed to be changed to process the metadata as a dictionary rather than as a ComponentMetadata, and this is due to the fact that the component graph does not effectively retrieve the metadata as a ComponentMetadata object. Signed-off-by: Daniel Zullo <[email protected]>
1 parent 4f8a27c commit 2e9d419

File tree

6 files changed

+42
-20
lines changed

6 files changed

+42
-20
lines changed

src/frequenz/sdk/microgrid/__init__.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,6 @@
123123
""" # noqa: D205, D400
124124

125125
from ..actor import ResamplerConfig
126-
from ..timeseries.grid import initialize as initialize_grid
127126
from . import _data_pipeline, client, component, connection_manager, metadata
128127
from ._data_pipeline import (
129128
battery_pool,
@@ -143,11 +142,6 @@ async def initialize(host: str, port: int, resampler_config: ResamplerConfig) ->
143142
resampler_config: Configuration for the resampling actor.
144143
"""
145144
await connection_manager.initialize(host, port)
146-
147-
api_client = connection_manager.get().api_client
148-
components = await api_client.components()
149-
initialize_grid(components)
150-
151145
await _data_pipeline.initialize(resampler_config)
152146

153147

src/frequenz/sdk/microgrid/_data_pipeline.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from ..timeseries._grid_frequency import GridFrequency
2525
from ..timeseries.grid import Grid
2626
from ..timeseries.grid import get as get_grid
27+
from ..timeseries.grid import initialize as initialize_grid
2728
from . import connection_manager
2829
from .component import ComponentCategory
2930

@@ -115,6 +116,7 @@ def __init__(
115116
self._power_managing_actor: _power_managing.PowerManagingActor | None = None
116117

117118
self._logical_meter: LogicalMeter | None = None
119+
self._grid: Grid | None = None
118120
self._ev_charger_pools: dict[frozenset[int], EVChargerPool] = {}
119121
self._battery_pools: dict[frozenset[int], BatteryPoolReferenceStore] = {}
120122
self._frequency_pool: dict[int, GridFrequency] = {}
@@ -199,7 +201,14 @@ def grid(
199201
Returns:
200202
A Grid instance.
201203
"""
202-
return get_grid()
204+
if self._grid is None:
205+
initialize_grid(
206+
channel_registry=self._channel_registry,
207+
resampler_subscription_sender=self._resampling_request_sender(),
208+
)
209+
self._grid = get_grid()
210+
211+
return self._grid
203212

204213
def battery_pool(
205214
self,

src/frequenz/sdk/timeseries/formula_engine/_formula_engine_pool.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
from frequenz.channels import Sender
1111

12-
from ...actor import ChannelRegistry, ComponentMetricRequest
1312
from ...microgrid.component import ComponentMetricId
1413
from .._quantities import Current, Power, Quantity
1514
from ._formula_generators._formula_generator import (
@@ -20,6 +19,7 @@
2019

2120
if TYPE_CHECKING:
2221
# Break circular import
22+
from ...actor import ChannelRegistry, ComponentMetricRequest
2323
from ..formula_engine import FormulaEngine, FormulaEngine3Phase
2424

2525

src/frequenz/sdk/timeseries/formula_engine/_formula_generators/_formula_generator.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,27 @@
33

44
"""Base class for formula generators that use the component graphs."""
55

6+
from __future__ import annotations
67

78
import sys
89
from abc import ABC, abstractmethod
910
from collections import abc
1011
from collections.abc import Callable
1112
from dataclasses import dataclass
12-
from typing import Generic
13+
from typing import TYPE_CHECKING, Generic
1314

1415
from frequenz.channels import Sender
1516

16-
from ....actor import ChannelRegistry, ComponentMetricRequest
1717
from ....microgrid import component, connection_manager
1818
from ....microgrid.component import ComponentMetricId
1919
from ..._quantities import QuantityT
2020
from .._formula_engine import FormulaEngine, FormulaEngine3Phase
2121
from .._resampled_formula_builder import ResampledFormulaBuilder
2222

23+
if TYPE_CHECKING:
24+
# Break circular import
25+
from ....actor import ChannelRegistry, ComponentMetricRequest
26+
2327

2428
class FormulaGenerationError(Exception):
2529
"""An error encountered during formula generation from the component graph."""

src/frequenz/sdk/timeseries/formula_engine/_resampled_formula_builder.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,23 @@
33

44
"""A builder for creating formula engines that operate on resampled component metrics."""
55

6+
from __future__ import annotations
67

78
from collections.abc import Callable
8-
from typing import Generic
9+
from typing import TYPE_CHECKING, Generic
910

1011
from frequenz.channels import Receiver, Sender
1112

12-
from ...actor import ChannelRegistry, ComponentMetricRequest
1313
from ...microgrid.component import ComponentMetricId
1414
from .. import Sample
1515
from .._quantities import QuantityT
1616
from ._formula_engine import FormulaBuilder, FormulaEngine
1717
from ._tokenizer import Tokenizer, TokenType
1818

19+
if TYPE_CHECKING:
20+
# Break circular import
21+
from ...actor import ChannelRegistry, ComponentMetricRequest
22+
1923

2024
class ResampledFormulaBuilder(Generic[QuantityT], FormulaBuilder[QuantityT]):
2125
"""Provides a way to build a FormulaEngine from resampled data streams."""
@@ -63,6 +67,9 @@ def _get_resampled_receiver(
6367
Returns:
6468
A receiver to stream resampled data for the given component id.
6569
"""
70+
# pylint: disable=import-outside-toplevel
71+
from frequenz.sdk.actor import ComponentMetricRequest
72+
6673
request = ComponentMetricRequest(self._namespace, component_id, metric_id, None)
6774
self._resampler_requests.append(request)
6875
return self._channel_registry.new_receiver(request.get_channel_name())

src/frequenz/sdk/timeseries/grid.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,16 @@
66
This module provides the `Grid` type, which represents a grid connection point
77
in a microgrid.
88
"""
9+
from __future__ import annotations
910

1011
import logging
1112
import uuid
12-
from collections.abc import Iterable
1313
from dataclasses import dataclass
1414
from typing import TYPE_CHECKING
1515

1616
from frequenz.channels import Sender
1717

18-
from ..microgrid.component import Component
18+
from ..microgrid import connection_manager
1919
from ..microgrid.component._component import ComponentCategory
2020
from . import Fuse
2121
from .formula_engine._formula_engine_pool import FormulaEnginePool
@@ -42,14 +42,12 @@ class Grid:
4242

4343

4444
def initialize(
45-
components: Iterable[Component],
4645
channel_registry: ChannelRegistry,
4746
resampler_subscription_sender: Sender[ComponentMetricRequest],
4847
) -> None:
4948
"""Initialize the grid connection.
5049
5150
Args:
52-
components: The components in the microgrid.
5351
channel_registry: The channel registry instance shared with the
5452
resampling actor.
5553
resampler_subscription_sender: The sender for sending metric requests
@@ -63,9 +61,9 @@ def initialize(
6361
global _GRID # pylint: disable=global-statement
6462

6563
grid_connections = list(
66-
component
67-
for component in components
68-
if component.category == ComponentCategory.GRID
64+
connection_manager.get().component_graph.components(
65+
component_category={ComponentCategory.GRID},
66+
)
6967
)
7068

7169
grid_connections_count = len(grid_connections)
@@ -82,7 +80,17 @@ def initialize(
8280
if grid_connections[0].metadata is None:
8381
raise RuntimeError("Grid metadata is None")
8482

85-
fuse = grid_connections[0].metadata.fuse
83+
fuse: Fuse | None = None
84+
85+
# The current implementation of the Component Graph fails to
86+
# effectively convert components from a dictionary representation to
87+
# the expected Component object.
88+
# Specifically for the component metadata, it hands back a dictionary
89+
# instead of the expected ComponentMetadata type.
90+
metadata = grid_connections[0].metadata
91+
if isinstance(metadata, dict):
92+
fuse_dict = metadata.get("fuse", None)
93+
fuse = Fuse(**fuse_dict) if fuse_dict else None
8694

8795
if fuse is None:
8896
raise RuntimeError("Grid fuse is None")

0 commit comments

Comments
 (0)