Skip to content

Commit 18a52b2

Browse files
committed
Adapt formula engine
1 parent 318f236 commit 18a52b2

File tree

6 files changed

+79
-94
lines changed

6 files changed

+79
-94
lines changed

src/frequenz/client/reporting/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
This package provides a low-level interface for interacting with the reporting API.
77
"""
88

9-
9+
from ._base_types import *
1010
from ._client import ReportingApiClient
1111

1212
__all__ = ["ReportingApiClient"]

src/frequenz/client/reporting/formula_engine/_formula_engine.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616

1717
from frequenz.channels import Broadcast, Receiver
1818

19-
from ..._internal._asyncio import cancel_and_await
20-
from .. import Sample, Sample3Phase
19+
#from ..._internal._asyncio import cancel_and_await
20+
from .._base_types import Sample, Sample3Phase
2121
from .._quantities import Quantity, QuantityT
2222
from ._formula_evaluator import FormulaEvaluator
2323
from ._formula_formatter import format_formula
@@ -316,6 +316,7 @@ async def _run(self) -> None:
316316
_logger.warning(
317317
"Formula application failed: %s. Error: %s", self._name, err
318318
)
319+
raise
319320
else:
320321
await sender.send(msg)
321322

@@ -429,7 +430,8 @@ async def _stop(self) -> None:
429430
"""Stop a running formula engine."""
430431
if self._task is None:
431432
return
432-
await cancel_and_await(self._task)
433+
# FIXME
434+
#await cancel_and_await(self._task)
433435

434436
def __add__(
435437
self,

src/frequenz/client/reporting/formula_engine/_formula_generators/__init__.py

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,21 @@
33

44
"""Generators for formulas from component graphs."""
55

6-
from ._battery_power_formula import BatteryPowerFormula
7-
from ._chp_power_formula import CHPPowerFormula
8-
from ._consumer_power_formula import ConsumerPowerFormula
9-
from ._ev_charger_current_formula import EVChargerCurrentFormula
10-
from ._ev_charger_power_formula import EVChargerPowerFormula
6+
#from ._battery_power_formula import BatteryPowerFormula
7+
#from ._chp_power_formula import CHPPowerFormula
8+
#from ._consumer_power_formula import ConsumerPowerFormula
9+
#from ._ev_charger_current_formula import EVChargerCurrentFormula
10+
#from ._ev_charger_power_formula import EVChargerPowerFormula
1111
from ._formula_generator import (
1212
ComponentNotFound,
1313
FormulaGenerationError,
1414
FormulaGenerator,
1515
FormulaGeneratorConfig,
1616
)
17-
from ._grid_current_formula import GridCurrentFormula
18-
from ._grid_power_3_phase_formula import GridPower3PhaseFormula
19-
from ._grid_power_formula import GridPowerFormula
20-
from ._producer_power_formula import ProducerPowerFormula
17+
#from ._grid_current_formula import GridCurrentFormula
18+
#from ._grid_power_3_phase_formula import GridPower3PhaseFormula
19+
#from ._grid_power_formula import GridPowerFormula
20+
#from ._producer_power_formula import ProducerPowerFormula
2121
from ._pv_power_formula import PVPowerFormula
2222

2323
__all__ = [
@@ -29,19 +29,19 @@
2929
#
3030
# Power Formula generators
3131
#
32-
"CHPPowerFormula",
33-
"ConsumerPowerFormula",
34-
"GridPower3PhaseFormula",
35-
"GridPowerFormula",
36-
"BatteryPowerFormula",
37-
"EVChargerPowerFormula",
32+
# "CHPPowerFormula",
33+
# "ConsumerPowerFormula",
34+
# "GridPower3PhaseFormula",
35+
# "GridPowerFormula",
36+
# "BatteryPowerFormula",
37+
# "EVChargerPowerFormula",
3838
"PVPowerFormula",
39-
"ProducerPowerFormula",
39+
# "ProducerPowerFormula",
4040
#
4141
# Current formula generators
4242
#
43-
"GridCurrentFormula",
44-
"EVChargerCurrentFormula",
43+
# "GridCurrentFormula",
44+
# "EVChargerCurrentFormula",
4545
#
4646
# Exceptions
4747
#

src/frequenz/client/reporting/formula_engine/_formula_generators/_formula_generator.py

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,11 @@
1313
from typing import Generic
1414

1515
from frequenz.channels import Sender
16-
from frequenz.client.microgrid import Component, ComponentCategory, ComponentMetricId
16+
from frequenz.client.reporting.component_graph import Component, ComponentCategory, ComponentMetricId
1717

18-
from ...._internal._channels import ChannelRegistry
19-
from ....microgrid import connection_manager
20-
from ....microgrid._data_sourcing import ComponentMetricRequest
18+
#from ...._internal._channels import ChannelRegistry
19+
#from ....microgrid import connection_manager
20+
#from ....microgrid._data_sourcing import ComponentMetricRequest
2121
from ..._quantities import QuantityT
2222
from .._formula_engine import FormulaEngine, FormulaEngine3Phase
2323
from .._resampled_formula_builder import ResampledFormulaBuilder
@@ -57,10 +57,12 @@ class FormulaGenerator(ABC, Generic[QuantityT]):
5757

5858
def __init__(
5959
self,
60-
namespace: str,
61-
channel_registry: ChannelRegistry,
62-
resampler_subscription_sender: Sender[ComponentMetricRequest],
60+
get_receiver: Callable[[int, ComponentMetricId], Receiver[Sample[Quantity]]],
61+
#namespace: str,
62+
#channel_registry: ChannelRegistry,
63+
#resampler_subscription_sender: Sender[ComponentMetricRequest],
6364
config: FormulaGeneratorConfig,
65+
component_graph: ComponentGraph,
6466
) -> None:
6567
"""Create a `FormulaGenerator` instance.
6668
@@ -72,17 +74,19 @@ def __init__(
7274
resampling actor.
7375
config: configs for the formula generator.
7476
"""
75-
self._channel_registry: ChannelRegistry = channel_registry
76-
self._resampler_subscription_sender: Sender[ComponentMetricRequest] = (
77-
resampler_subscription_sender
78-
)
79-
self._namespace: str = namespace
77+
#self._channel_registry: ChannelRegistry = channel_registry
78+
#self._resampler_subscription_sender: Sender[ComponentMetricRequest] = (
79+
# resampler_subscription_sender
80+
#)
81+
#self._namespace: str = namespace
82+
self._get_receiver = get_receiver
8083
self._config: FormulaGeneratorConfig = config
84+
self._component_graph: ComponentGraph = component_graph
8185

8286
@property
8387
def namespace(self) -> str:
8488
"""Get the namespace for the formula generator."""
85-
return self._namespace
89+
return "bla" #self._namespace
8690

8791
def _get_builder(
8892
self,
@@ -91,10 +95,11 @@ def _get_builder(
9195
create_method: Callable[[float], QuantityT],
9296
) -> ResampledFormulaBuilder[QuantityT]:
9397
builder = ResampledFormulaBuilder(
94-
self._namespace,
98+
#self._namespace,
9599
name,
96-
self._channel_registry,
97-
self._resampler_subscription_sender,
100+
#self._channel_registry,
101+
#self._resampler_subscription_sender,
102+
self._get_receiver,
98103
component_metric_id,
99104
create_method,
100105
)
@@ -110,7 +115,8 @@ def _get_grid_component(self) -> Component:
110115
Raises:
111116
ComponentNotFound: If the grid component is not found in the component graph.
112117
"""
113-
component_graph = connection_manager.get().component_graph
118+
#component_graph = connection_manager.get().component_graph
119+
component_graph = self._component_graph
114120
grid_component = next(
115121
iter(
116122
component_graph.components(
@@ -134,7 +140,8 @@ def _get_grid_component_successors(self) -> set[Component]:
134140
ComponentNotFound: If no successor components are found in the component graph.
135141
"""
136142
grid_component = self._get_grid_component()
137-
component_graph = connection_manager.get().component_graph
143+
#component_graph = connection_manager.get().component_graph
144+
component_graph = self._component_graph
138145
grid_successors = component_graph.successors(grid_component.component_id)
139146

140147
if not grid_successors:
@@ -178,7 +185,8 @@ def _get_metric_fallback_components(
178185
* The keys are primary components.
179186
* The values are sets of fallback components.
180187
"""
181-
graph = connection_manager.get().component_graph
188+
#graph = connection_manager.get().component_graph
189+
graph = self._component_graph
182190
fallbacks: dict[Component, set[Component]] = {}
183191

184192
for component in components:
@@ -210,7 +218,8 @@ def _get_meter_fallback_components(self, meter: Component) -> set[Component]:
210218
"""
211219
assert meter.category == ComponentCategory.METER
212220

213-
graph = connection_manager.get().component_graph
221+
#graph = connection_manager.get().component_graph
222+
graph = self._component_graph
214223
successors = graph.successors(meter.component_id)
215224

216225
# All fallbacks has to be of the same type and category.
@@ -241,7 +250,8 @@ def _is_primary_fallback_pair(
241250
Returns:
242251
bool: True if the provided components are a primary-fallback pair, False otherwise.
243252
"""
244-
graph = connection_manager.get().component_graph
253+
#graph = connection_manager.get().component_graph
254+
graph = self._component_graph
245255

246256
# reassign to decrease the length of the line and make code readable
247257
fallback = fallback_candidate

src/frequenz/client/reporting/formula_engine/_formula_generators/_pv_power_formula.py

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@
55

66
import logging
77

8-
from frequenz.client.microgrid import Component, ComponentCategory, ComponentMetricId
8+
from frequenz.client.reporting.component_graph import Component, ComponentCategory, ComponentMetricId
99

10-
from ....microgrid import connection_manager
10+
#from ....microgrid import connection_manager
1111
from ..._quantities import Power
1212
from .._formula_engine import FormulaEngine
1313
from ._fallback_formula_metric_fetcher import FallbackFormulaMetricFetcher
@@ -27,6 +27,7 @@ def generate( # noqa: DOC502
2727
# * ComponentNotFound is raised indirectly by _get_pv_power_components
2828
# * RuntimeError is also raised indirectly by _get_pv_power_components
2929
self,
30+
#component_graph: "ComponentGraph",
3031
) -> FormulaEngine[Power]:
3132
"""Make a formula for the PV power production of a microgrid.
3233
@@ -42,7 +43,8 @@ def generate( # noqa: DOC502
4243
"pv-power", ComponentMetricId.ACTIVE_POWER, Power.from_watts
4344
)
4445

45-
component_graph = connection_manager.get().component_graph
46+
#component_graph = connection_manager.get().component_graph
47+
component_graph = self._component_graph
4648
component_ids = self._config.component_ids
4749
if component_ids:
4850
pv_components = component_graph.components(set(component_ids))
@@ -125,13 +127,9 @@ def _get_fallback_formulas(
125127
fallback_ids = [c.component_id for c in fallback_components]
126128

127129
generator = PVPowerFormula(
128-
f"{self._namespace}_fallback_{fallback_ids}",
129-
self._channel_registry,
130-
self._resampler_subscription_sender,
131-
FormulaGeneratorConfig(
132-
component_ids=set(fallback_ids),
133-
allow_fallback=False,
134-
),
130+
self._get_receiver,
131+
self._config,
132+
self._component_graph,
135133
)
136134

137135
fallback_formulas[primary_component] = FallbackFormulaMetricFetcher(

src/frequenz/client/reporting/formula_engine/_resampled_formula_builder.py

Lines changed: 16 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@
88
from collections.abc import Callable
99

1010
from frequenz.channels import Receiver, Sender
11-
from frequenz.client.microgrid import ComponentMetricId
11+
from frequenz.client.reporting.component_graph import ComponentMetricId
1212

13-
from ..._internal._channels import ChannelRegistry
14-
from ...microgrid._data_sourcing import ComponentMetricRequest
13+
#from ..._internal._channels import ChannelRegistry
14+
#from ...microgrid._data_sourcing import ComponentMetricRequest
1515
from .. import Sample
1616
from .._quantities import Quantity, QuantityT
1717
from ._formula_engine import FormulaBuilder, FormulaEngine
@@ -24,10 +24,11 @@ class ResampledFormulaBuilder(FormulaBuilder[QuantityT]):
2424

2525
def __init__( # pylint: disable=too-many-arguments
2626
self,
27-
namespace: str,
27+
#namespace: str,
2828
formula_name: str,
29-
channel_registry: ChannelRegistry,
30-
resampler_subscription_sender: Sender[ComponentMetricRequest],
29+
#channel_registry: ChannelRegistry,
30+
#resampler_subscription_sender: Sender[ComponentMetricRequest],
31+
get_receiver: Callable[[int, ComponentMetricId], Receiver[Sample[Quantity]]],
3132
metric_id: ComponentMetricId,
3233
create_method: Callable[[float], QuantityT],
3334
) -> None:
@@ -46,44 +47,18 @@ def __init__( # pylint: disable=too-many-arguments
4647
formula is for generating power values, this would be
4748
`Power.from_watts`, for example.
4849
"""
49-
self._channel_registry: ChannelRegistry = channel_registry
50-
self._resampler_subscription_sender: Sender[ComponentMetricRequest] = (
51-
resampler_subscription_sender
52-
)
53-
self._namespace: str = namespace
50+
#self._channel_registry: ChannelRegistry = channel_registry
51+
#self._resampler_subscription_sender: Sender[ComponentMetricRequest] = (
52+
# resampler_subscription_sender
53+
#)
54+
#self._namespace: str = namespace
55+
self._get_resampled_receiver: Callable[
56+
[int, ComponentMetricId], Receiver[Sample[Quantity]]
57+
] = get_receiver
5458
self._metric_id: ComponentMetricId = metric_id
55-
self._resampler_requests: list[ComponentMetricRequest] = []
59+
self._resampler_requests: list = [] # list[ComponentMetricRequest] = []
5660
super().__init__(formula_name, create_method) # type: ignore[arg-type]
5761

58-
def _get_resampled_receiver(
59-
self, component_id: int, metric_id: ComponentMetricId
60-
) -> Receiver[Sample[QuantityT]]:
61-
"""Get a receiver with the resampled data for the given component id.
62-
63-
Args:
64-
component_id: The component id for which to get a resampled data receiver.
65-
metric_id: A metric ID to fetch for all components in this formula.
66-
67-
Returns:
68-
A receiver to stream resampled data for the given component id.
69-
"""
70-
request = ComponentMetricRequest(self._namespace, component_id, metric_id, None)
71-
self._resampler_requests.append(request)
72-
resampled_channel = self._channel_registry.get_or_create(
73-
Sample[Quantity], request.get_channel_name()
74-
)
75-
resampled_receiver = resampled_channel.new_receiver().map(
76-
lambda sample: Sample(
77-
sample.timestamp,
78-
(
79-
self._create_method(sample.value.base_value)
80-
if sample.value is not None
81-
else None
82-
),
83-
)
84-
)
85-
return resampled_receiver
86-
8762
async def subscribe(self) -> None:
8863
"""Subscribe to all resampled component metric streams."""
8964
for request in self._resampler_requests:

0 commit comments

Comments
 (0)