Skip to content

Commit 23ff30c

Browse files
authored
Make interface for BatteryPool metrics consistent with power methods (frequenz-floss#381)
The `soc`, `capacity`, `power_bounds` methods of the `BatteryPool` were directly returning receivers. They've now been converted to properties, from which `new_receiver()` has to be called, to get a receiver. New: ``` python soc_recv = battery_pool.soc.new_receiver() ``` Old: ``` python soc_recv = battery_pool.soc() ```
2 parents 16a677b + 7e77a18 commit 23ff30c

File tree

5 files changed

+50
-42
lines changed

5 files changed

+50
-42
lines changed

RELEASE_NOTES.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,14 @@ This release drops support for Python versions older than 3.11.
1616

1717
* Now `frequenz.sdk.timeseries.Sample` uses a more sensible comparison. Before this release `Sample`s were compared only based on the `timestamp`. This was due to a limitation in Python versions earlier than 3.10. Now that the minimum supported version is 3.11 this hack is not needed anymore and `Sample`s are compared using both `timestamp` and `value` as most people probably expects.
1818

19+
* `BatteryPool` metric streaming interfaces have changed for `soc`, `capacity` and `power_bounds`:
20+
21+
```python
22+
soc_rx = battery_pool.soc() # old
23+
24+
soc_rx = battery_pool.soc.new_receiver() # new
25+
```
26+
1927
## New Features
2028

2129
<!-- Here goes the main new features and examples or instructions on how to use them -->

examples/battery_pool.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ async def main() -> None:
3333

3434
battery_pool = microgrid.battery_pool()
3535
receivers: Dict[str, Receiver[Any]] = {
36-
"soc": await battery_pool.soc(maxsize=1),
37-
"capacity": await battery_pool.capacity(maxsize=1),
38-
"power_bounds": await battery_pool.power_bounds(maxsize=1),
36+
"soc": battery_pool.soc.new_receiver(maxsize=1),
37+
"capacity": battery_pool.capacity.new_receiver(maxsize=1),
38+
"power_bounds": battery_pool.power_bounds.new_receiver(maxsize=1),
3939
}
4040

4141
merged_channel = MergeNamed[Any](**receivers)

src/frequenz/sdk/timeseries/battery_pool/_methods.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from frequenz.channels import Broadcast, Receiver
1414

1515
from ..._internal._asyncio import cancel_and_await
16-
from ..._internal._constants import WAIT_FOR_COMPONENT_DATA_SEC
16+
from ..._internal._constants import RECEIVER_MAX_SIZE, WAIT_FOR_COMPONENT_DATA_SEC
1717
from ._component_metric_fetcher import (
1818
ComponentMetricFetcher,
1919
LatestBatteryMetricsFetcher,
@@ -25,7 +25,7 @@
2525
_logger = logging.getLogger(__name__)
2626

2727

28-
class AggregateMethod(Generic[T], ABC):
28+
class MetricAggregator(Generic[T], ABC):
2929
"""Interface to control how the component data should be aggregated and send."""
3030

3131
@abstractmethod
@@ -37,7 +37,9 @@ def update_working_batteries(self, new_working_batteries: set[int]) -> None:
3737
"""
3838

3939
@abstractmethod
40-
def new_receiver(self, maxsize: int | None) -> Receiver[T | None]:
40+
def new_receiver(
41+
self, maxsize: int | None = RECEIVER_MAX_SIZE
42+
) -> Receiver[T | None]:
4143
"""Return new receiver for the aggregated metric results.
4244
4345
Args:
@@ -61,7 +63,7 @@ def name(cls) -> str:
6163
"""
6264

6365

64-
class SendOnUpdate(AggregateMethod[T]):
66+
class SendOnUpdate(MetricAggregator[T]):
6567
"""Wait for the change of the components metrics and send updated result.
6668
6769
This method will cache the component metrics. When any metric change it will
@@ -110,7 +112,9 @@ def name(cls) -> str:
110112
"""
111113
return "SendOnUpdate"
112114

113-
def new_receiver(self, maxsize: int | None) -> Receiver[T | None]:
115+
def new_receiver(
116+
self, maxsize: int | None = RECEIVER_MAX_SIZE
117+
) -> Receiver[T | None]:
114118
"""Return new receiver for the aggregated metric results.
115119
116120
Args:

src/frequenz/sdk/timeseries/battery_pool/battery_pool.py

Lines changed: 27 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
from frequenz.channels import Receiver, Sender
1515

1616
from ..._internal._asyncio import cancel_and_await
17-
from ..._internal._constants import RECEIVER_MAX_SIZE
1817
from ...actor import ChannelRegistry, ComponentMetricRequest
1918
from ...actor.power_distributing._battery_pool_status import BatteryStatus
2019
from ...microgrid import connection_manager
@@ -25,7 +24,7 @@
2524
FormulaGeneratorConfig,
2625
FormulaType,
2726
)
28-
from ._methods import AggregateMethod, SendOnUpdate
27+
from ._methods import MetricAggregator, SendOnUpdate
2928
from ._metric_calculator import CapacityCalculator, PowerBoundsCalculator, SoCCalculator
3029
from ._result_types import CapacityMetrics, PowerMetrics, SoCMetrics
3130

@@ -85,7 +84,7 @@ def __init__( # pylint: disable=too-many-arguments
8584
)
8685

8786
self._min_update_interval = min_update_interval
88-
self._active_methods: dict[str, AggregateMethod[Any]] = {}
87+
self._active_methods: dict[str, MetricAggregator[Any]] = {}
8988

9089
self._namespace: str = f"battery-pool-{self._batteries}-{uuid.uuid4()}"
9190
self._formula_pool: FormulaEnginePool = FormulaEnginePool(
@@ -184,19 +183,19 @@ def consumption_power(self) -> FormulaEngine:
184183
assert isinstance(engine, FormulaEngine)
185184
return engine
186185

187-
async def soc(
188-
self, maxsize: int | None = RECEIVER_MAX_SIZE
189-
) -> Receiver[SoCMetrics | None]:
186+
@property
187+
def soc(self) -> MetricAggregator[SoCMetrics]:
190188
"""Get receiver to receive new soc metrics when they change.
191189
192-
Soc formulas are described in the receiver return type.
193-
None will be send if there is no component to calculate metric.
190+
Soc formulas are described in the receiver return type. None will be send if
191+
there is no component to calculate metric.
194192
195-
Args:
196-
maxsize: Maxsize of the receiver channel.
193+
A receiver from the MetricAggregator can be obtained by calling the
194+
`new_receiver` method.
197195
198196
Returns:
199-
Receiver for this metric.
197+
A MetricAggregator that will calculate and stream the aggregate soc of
198+
all batteries in the pool.
200199
"""
201200
method_name = SendOnUpdate.name() + "_" + SoCCalculator.name()
202201

@@ -208,22 +207,21 @@ async def soc(
208207
min_update_interval=self._min_update_interval,
209208
)
210209

211-
running_method = self._active_methods[method_name]
212-
return running_method.new_receiver(maxsize)
210+
return self._active_methods[method_name]
213211

214-
async def capacity(
215-
self, maxsize: int | None = RECEIVER_MAX_SIZE
216-
) -> Receiver[CapacityMetrics | None]:
212+
@property
213+
def capacity(self) -> MetricAggregator[CapacityMetrics]:
217214
"""Get receiver to receive new capacity metrics when they change.
218215
219-
Capacity formulas are described in the receiver return type.
220-
None will be send if there is no component to calculate metrics.
216+
Capacity formulas are described in the receiver return type. None will be send
217+
if there is no component to calculate metrics.
221218
222-
Args:
223-
maxsize: Maxsize of the receiver channel.
219+
A receiver from the MetricAggregator can be obtained by calling the
220+
`new_receiver` method.
224221
225222
Returns:
226-
Receiver for this metric.
223+
A MetricAggregator that will calculate and stream the capacity of all
224+
batteries in the pool.
227225
"""
228226
method_name = SendOnUpdate.name() + "_" + CapacityCalculator.name()
229227

@@ -235,22 +233,21 @@ async def capacity(
235233
min_update_interval=self._min_update_interval,
236234
)
237235

238-
running_method = self._active_methods[method_name]
239-
return running_method.new_receiver(maxsize)
236+
return self._active_methods[method_name]
240237

241-
async def power_bounds(
242-
self, maxsize: int | None = RECEIVER_MAX_SIZE
243-
) -> Receiver[PowerMetrics | None]:
238+
@property
239+
def power_bounds(self) -> MetricAggregator[PowerMetrics]:
244240
"""Get receiver to receive new power bounds when they change.
245241
246242
Power bounds formulas are described in the receiver return type.
247243
None will be send if there is no component to calculate metrics.
248244
249-
Args:
250-
maxsize: Maxsize of the receivers channel.
245+
A receiver from the MetricAggregator can be obtained by calling the
246+
`new_receiver` method.
251247
252248
Returns:
253-
Receiver for this metric.
249+
A MetricAggregator that will calculate and stream the power bounds
250+
of all batteries in the pool.
254251
"""
255252
method_name = SendOnUpdate.name() + "_" + PowerBoundsCalculator.name()
256253

@@ -262,8 +259,7 @@ async def power_bounds(
262259
min_update_interval=self._min_update_interval,
263260
)
264261

265-
running_method = self._active_methods[method_name]
266-
return running_method.new_receiver(maxsize)
262+
return self._active_methods[method_name]
267263

268264
async def stop(self) -> None:
269265
"""Stop all pending async tasks."""

tests/timeseries/_battery_pool/test_battery_pool.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -500,7 +500,7 @@ async def run_capacity_test(setup_args: SetupArgs) -> None:
500500
sampling_rate=0.05,
501501
)
502502

503-
capacity_receiver = await battery_pool.capacity(maxsize=50)
503+
capacity_receiver = battery_pool.capacity.new_receiver(maxsize=50)
504504

505505
# First metrics delivers slower because of the startup delay in the pool.
506506
msg = await asyncio.wait_for(
@@ -633,7 +633,7 @@ async def run_soc_test(setup_args: SetupArgs) -> None:
633633
sampling_rate=0.05,
634634
)
635635

636-
receiver = await battery_pool.soc(maxsize=50)
636+
receiver = battery_pool.soc.new_receiver(maxsize=50)
637637

638638
# First metrics delivers slower because of the startup delay in the pool.
639639
msg = await asyncio.wait_for(
@@ -775,7 +775,7 @@ async def run_power_bounds_test( # pylint: disable=too-many-locals
775775
sampling_rate=0.1,
776776
)
777777

778-
receiver = await battery_pool.power_bounds(maxsize=50)
778+
receiver = battery_pool.power_bounds.new_receiver(maxsize=50)
779779

780780
# First metrics delivers slower because of the startup delay in the pool.
781781
msg = await asyncio.wait_for(

0 commit comments

Comments
 (0)