Skip to content

Commit 37b4062

Browse files
committed
Move reusable parts of EVChargerPool to EVChargerPoolReferenceStore
This allows the efficient creation of multiple `EVChargerPool` instances for the same set of EV chargers, but with different priorities, while reusing the formulas and bounds by having only a single instance of the `EVChargerPoolReferenceStore` class for any given set of EV chargers. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 11999c2 commit 37b4062

File tree

4 files changed

+147
-71
lines changed

4 files changed

+147
-71
lines changed

src/frequenz/sdk/microgrid/_data_pipeline.py

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@
3838
)
3939
from ..timeseries.consumer import Consumer
4040
from ..timeseries.ev_charger_pool import EVChargerPool
41+
from ..timeseries.ev_charger_pool._ev_charger_pool_reference_store import (
42+
EVChargerPoolReferenceStore,
43+
)
4144
from ..timeseries.logical_meter import LogicalMeter
4245
from ..timeseries.producer import Producer
4346

@@ -100,7 +103,7 @@ def __init__(
100103
self._consumer: Consumer | None = None
101104
self._producer: Producer | None = None
102105
self._grid: Grid | None = None
103-
self._ev_charger_pools: dict[frozenset[int], EVChargerPool] = {}
106+
self._ev_charger_pools: dict[frozenset[int], EVChargerPoolReferenceStore] = {}
104107
self._battery_pools: dict[frozenset[int], BatteryPoolReferenceStore] = {}
105108
self._frequency_instance: GridFrequency | None = None
106109
self._voltage_instance: VoltageStreamer | None = None
@@ -161,6 +164,8 @@ def producer(self) -> Producer:
161164
def ev_charger_pool(
162165
self,
163166
ev_charger_ids: abc.Set[int] | None = None,
167+
name: str | None = None,
168+
priority: int = -sys.maxsize - 1,
164169
) -> EVChargerPool:
165170
"""Return the corresponding EVChargerPool instance for the given ids.
166171
@@ -170,11 +175,17 @@ def ev_charger_pool(
170175
Args:
171176
ev_charger_ids: Optional set of IDs of EV Chargers to be managed by the
172177
EVChargerPool.
178+
name: An optional name used to identify this instance of the pool or a
179+
corresponding actor in the logs.
180+
priority: The priority of the actor making the call.
173181
174182
Returns:
175183
An EVChargerPool instance.
176184
"""
177185
from ..timeseries.ev_charger_pool import EVChargerPool
186+
from ..timeseries.ev_charger_pool._ev_charger_pool_reference_store import (
187+
EVChargerPoolReferenceStore,
188+
)
178189

179190
if not self._ev_power_wrapper.started:
180191
self._ev_power_wrapper.start()
@@ -185,15 +196,15 @@ def ev_charger_pool(
185196
key = frozenset(ev_charger_ids)
186197

187198
if key not in self._ev_charger_pools:
188-
self._ev_charger_pools[key] = EVChargerPool(
199+
self._ev_charger_pools[key] = EVChargerPoolReferenceStore(
189200
channel_registry=self._channel_registry,
190201
resampler_subscription_sender=self._resampling_request_sender(),
191202
status_receiver=self._ev_power_wrapper.status_channel.new_receiver(
192203
maxsize=1
193204
),
194205
component_ids=ev_charger_ids,
195206
)
196-
return self._ev_charger_pools[key]
207+
return EVChargerPool(self._ev_charger_pools[key], name, priority)
197208

198209
def grid(self) -> Grid:
199210
"""Return the grid measuring point."""
@@ -364,21 +375,32 @@ def producer() -> Producer:
364375
return _get().producer()
365376

366377

367-
def ev_charger_pool(ev_charger_ids: abc.Set[int] | None = None) -> EVChargerPool:
368-
"""Return the corresponding EVChargerPool instance for the given ids.
378+
def ev_charger_pool(
379+
ev_charger_ids: abc.Set[int] | None = None,
380+
name: str | None = None,
381+
priority: int = -sys.maxsize - 1,
382+
) -> EVChargerPool:
383+
"""Return a new `EVChargerPool` instance for the given parameters.
369384
370-
If an EVChargerPool instance for the given ids doesn't exist, a new one is
371-
created and returned.
385+
The priority value is used to resolve conflicts when multiple actors are trying to
386+
propose different power values for the same set of EV chargers.
387+
388+
!!! note
389+
When specifying priority, bigger values indicate higher priority. The default
390+
priority is the lowest possible value.
372391
373392
Args:
374393
ev_charger_ids: Optional set of IDs of EV Chargers to be managed by the
375394
EVChargerPool. If not specified, all EV Chargers available in the
376395
component graph are used.
396+
name: An optional name used to identify this instance of the pool or a
397+
corresponding actor in the logs.
398+
priority: The priority of the actor making the call.
377399
378400
Returns:
379-
An EVChargerPool instance.
401+
An `EVChargerPool` instance.
380402
"""
381-
return _get().ev_charger_pool(ev_charger_ids)
403+
return _get().ev_charger_pool(ev_charger_ids, name, priority)
382404

383405

384406
def battery_pool(

src/frequenz/sdk/timeseries/ev_charger_pool/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,8 @@
44
"""Interactions with EV Chargers."""
55

66
from ._ev_charger_pool import EVChargerPool, EVChargerPoolError
7-
from ._set_current_bounds import ComponentCurrentLimit
87

98
__all__ = [
10-
"ComponentCurrentLimit",
119
"EVChargerPool",
1210
"EVChargerPoolError",
1311
]

src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py

Lines changed: 25 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -7,23 +7,16 @@
77
import uuid
88
from collections import abc
99

10-
from frequenz.channels import Broadcast, Receiver, Sender
11-
1210
from ..._internal._channels import ReceiverFetcher
13-
from ...actor import ChannelRegistry, ComponentMetricRequest
14-
from ...actor.power_distributing import ComponentPoolStatus
15-
from ...microgrid import connection_manager
16-
from ...microgrid.component import ComponentCategory
1711
from .._base_types import SystemBounds
1812
from .._quantities import Current, Power
1913
from ..formula_engine import FormulaEngine, FormulaEngine3Phase
20-
from ..formula_engine._formula_engine_pool import FormulaEnginePool
2114
from ..formula_engine._formula_generators import (
2215
EVChargerCurrentFormula,
2316
EVChargerPowerFormula,
2417
FormulaGeneratorConfig,
2518
)
26-
from ._system_bounds_tracker import EVCSystemBoundsTracker
19+
from ._ev_charger_pool_reference_store import EVChargerPoolReferenceStore
2720

2821

2922
class EVChargerPoolError(Exception):
@@ -47,10 +40,9 @@ class EVChargerPool:
4740

4841
def __init__( # pylint: disable=too-many-arguments
4942
self,
50-
channel_registry: ChannelRegistry,
51-
resampler_subscription_sender: Sender[ComponentMetricRequest],
52-
status_receiver: Receiver[ComponentPoolStatus],
53-
component_ids: abc.Set[int] | None = None,
43+
ev_charger_pool_ref: EVChargerPoolReferenceStore,
44+
name: str | None,
45+
priority: int,
5446
) -> None:
5547
"""Create an `EVChargerPool` instance.
5648
@@ -60,46 +52,15 @@ def __init__( # pylint: disable=too-many-arguments
6052
method for creating `EVChargerPool` instances.
6153
6254
Args:
63-
channel_registry: A channel registry instance shared with the resampling
64-
actor.
65-
resampler_subscription_sender: A sender for sending metric requests to the
66-
resampling actor.
67-
status_receiver: A receiver that streams the status of the EV Chargers in
68-
the pool.
69-
component_ids: An optional list of component_ids belonging to this pool. If
70-
not specified, IDs of all EV Chargers in the microgrid will be fetched
71-
from the component graph.
55+
ev_charger_pool_ref: The EV charger pool reference store instance.
56+
name: An optional name used to identify this instance of the pool or a
57+
corresponding actor in the logs.
58+
priority: The priority of the actor using this wrapper.
7259
"""
73-
self._channel_registry: ChannelRegistry = channel_registry
74-
self._resampler_subscription_sender: Sender[ComponentMetricRequest] = (
75-
resampler_subscription_sender
76-
)
77-
self._status_receiver: Receiver[ComponentPoolStatus] = status_receiver
78-
self._component_ids: abc.Set[int] = set()
79-
if component_ids is not None:
80-
self._component_ids = component_ids
81-
else:
82-
graph = connection_manager.get().component_graph
83-
self._component_ids = {
84-
evc.component_id
85-
for evc in graph.components(
86-
component_categories={ComponentCategory.EV_CHARGER}
87-
)
88-
}
89-
self._namespace: str = f"ev-charger-pool-{uuid.uuid4()}"
90-
self._formula_pool: FormulaEnginePool = FormulaEnginePool(
91-
self._namespace,
92-
self._channel_registry,
93-
self._resampler_subscription_sender,
94-
)
95-
96-
self._bounds_channel: Broadcast[SystemBounds] = Broadcast(
97-
name=f"System Bounds for EV Chargers: {component_ids}"
98-
)
99-
self._bounds_tracker: EVCSystemBoundsTracker = EVCSystemBoundsTracker(
100-
self.component_ids, self._status_receiver, self._bounds_channel.new_sender()
101-
)
102-
self._bounds_tracker.start()
60+
self._ev_charger_pool = ev_charger_pool_ref
61+
unique_id = uuid.uuid4()
62+
self._source_id = unique_id if name is None else f"{name}-{unique_id}"
63+
self._priority = priority
10364

10465
@property
10566
def component_ids(self) -> abc.Set[int]:
@@ -108,7 +69,7 @@ def component_ids(self) -> abc.Set[int]:
10869
Returns:
10970
Set of managed component IDs.
11071
"""
111-
return self._component_ids
72+
return self._ev_charger_pool.component_ids
11273

11374
@property
11475
def current(self) -> FormulaEngine3Phase[Current]:
@@ -126,10 +87,14 @@ def current(self) -> FormulaEngine3Phase[Current]:
12687
A FormulaEngine that will calculate and stream the total current of all EV
12788
Chargers.
12889
"""
129-
engine = self._formula_pool.from_3_phase_current_formula_generator(
130-
"ev_charger_total_current",
131-
EVChargerCurrentFormula,
132-
FormulaGeneratorConfig(component_ids=self._component_ids),
90+
engine = (
91+
self._ev_charger_pool.formula_pool.from_3_phase_current_formula_generator(
92+
"ev_charger_total_current",
93+
EVChargerCurrentFormula,
94+
FormulaGeneratorConfig(
95+
component_ids=self._ev_charger_pool.component_ids
96+
),
97+
)
13398
)
13499
assert isinstance(engine, FormulaEngine3Phase)
135100
return engine
@@ -150,21 +115,21 @@ def power(self) -> FormulaEngine[Power]:
150115
A FormulaEngine that will calculate and stream the total power of all EV
151116
Chargers.
152117
"""
153-
engine = self._formula_pool.from_power_formula_generator(
118+
engine = self._ev_charger_pool.formula_pool.from_power_formula_generator(
154119
"ev_charger_power",
155120
EVChargerPowerFormula,
156121
FormulaGeneratorConfig(
157-
component_ids=self._component_ids,
122+
component_ids=self._ev_charger_pool.component_ids,
158123
),
159124
)
160125
assert isinstance(engine, FormulaEngine)
161126
return engine
162127

163128
async def stop(self) -> None:
164129
"""Stop all tasks and channels owned by the EVChargerPool."""
165-
await self._formula_pool.stop()
130+
await self._ev_charger_pool.stop()
166131

167132
@property
168133
def _system_power_bounds(self) -> ReceiverFetcher[SystemBounds]:
169134
"""Return a receiver for the system power bounds."""
170-
return self._bounds_channel
135+
return self._ev_charger_pool.bounds_channel
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Manages shared state/tasks for a set of EV chargers."""
5+
6+
7+
import uuid
8+
from collections import abc
9+
10+
from frequenz.channels import Broadcast, Receiver, Sender
11+
12+
from ...actor import ChannelRegistry, ComponentMetricRequest
13+
from ...actor.power_distributing import ComponentPoolStatus
14+
from ...microgrid import connection_manager
15+
from ...microgrid.component import ComponentCategory
16+
from .._base_types import SystemBounds
17+
from ..formula_engine._formula_engine_pool import FormulaEnginePool
18+
from ._system_bounds_tracker import EVCSystemBoundsTracker
19+
20+
21+
class EVChargerPoolReferenceStore:
22+
"""A class for maintaining the shared state/tasks for a set of pool of EV chargers.
23+
24+
This includes ownership of
25+
- the formula engine pool and metric calculators.
26+
- the tasks for calculating system bounds for the EV chargers.
27+
28+
These are independent of the priority of the actors and can be shared between
29+
multiple users of the same set of EV chargers.
30+
31+
They are exposed through the EVChargerPool class.
32+
"""
33+
34+
def __init__(
35+
self,
36+
channel_registry: ChannelRegistry,
37+
resampler_subscription_sender: Sender[ComponentMetricRequest],
38+
status_receiver: Receiver[ComponentPoolStatus],
39+
component_ids: abc.Set[int] | None = None,
40+
):
41+
"""Create an instance of the class.
42+
43+
Args:
44+
channel_registry: A channel registry instance shared with the resampling
45+
actor.
46+
resampler_subscription_sender: A sender for sending metric requests to the
47+
resampling actor.
48+
status_receiver: A receiver that streams the status of the EV Chargers in
49+
the pool.
50+
component_ids: An optional list of component_ids belonging to this pool. If
51+
not specified, IDs of all EV Chargers in the microgrid will be fetched
52+
from the component graph.
53+
"""
54+
self.channel_registry = channel_registry
55+
self.resampler_subscription_sender = resampler_subscription_sender
56+
self.status_receiver = status_receiver
57+
58+
if component_ids is not None:
59+
self.component_ids: frozenset[int] = frozenset(component_ids)
60+
else:
61+
graph = connection_manager.get().component_graph
62+
self.component_ids = frozenset(
63+
{
64+
evc.component_id
65+
for evc in graph.components(
66+
component_categories={ComponentCategory.EV_CHARGER}
67+
)
68+
}
69+
)
70+
71+
self.namespace: str = f"ev-charger-pool-{uuid.uuid4()}"
72+
self.formula_pool = FormulaEnginePool(
73+
self.namespace,
74+
self.channel_registry,
75+
self.resampler_subscription_sender,
76+
)
77+
78+
self.bounds_channel: Broadcast[SystemBounds] = Broadcast(
79+
name=f"System Bounds for EV Chargers: {component_ids}"
80+
)
81+
self.bounds_tracker: EVCSystemBoundsTracker = EVCSystemBoundsTracker(
82+
self.component_ids,
83+
self.status_receiver,
84+
self.bounds_channel.new_sender(),
85+
)
86+
self.bounds_tracker.start()
87+
88+
async def stop(self) -> None:
89+
"""Stop all tasks and channels owned by the EVChargerPool."""
90+
await self.formula_pool.stop()
91+
await self.bounds_tracker.stop()

0 commit comments

Comments
 (0)