Skip to content

Commit 338a1b2

Browse files
committed
Migrate ComponentMetricsResamplingActor to the Actor class
Signed-off-by: Leandro Lucarella <[email protected]>
1 parent 3c2768f commit 338a1b2

File tree

4 files changed

+86
-85
lines changed

4 files changed

+86
-85
lines changed

src/frequenz/sdk/actor/_resampling.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,14 @@
1515
from ..timeseries import Sample
1616
from ..timeseries._quantities import Quantity
1717
from ..timeseries._resampling import Resampler, ResamplerConfig, ResamplingError
18+
from ._actor import Actor
1819
from ._channel_registry import ChannelRegistry
1920
from ._data_sourcing import ComponentMetricRequest
20-
from ._decorator import actor
2121

2222
_logger = logging.getLogger(__name__)
2323

2424

25-
@actor
26-
class ComponentMetricsResamplingActor:
25+
class ComponentMetricsResamplingActor(Actor):
2726
"""An actor to resample microgrid component metrics."""
2827

2928
def __init__( # pylint: disable=too-many-arguments
@@ -46,6 +45,7 @@ def __init__( # pylint: disable=too-many-arguments
4645
resampmling subscription requests.
4746
config: The configuration for the resampler.
4847
"""
48+
super().__init__()
4949
self._channel_registry: ChannelRegistry = channel_registry
5050
self._data_sourcing_request_sender: Sender[
5151
ComponentMetricRequest
@@ -91,7 +91,7 @@ async def _process_resampling_requests(self) -> None:
9191
async for request in self._resampling_request_receiver:
9292
await self._subscribe(request)
9393

94-
async def run(self) -> None:
94+
async def _run(self) -> None:
9595
"""Resample known component metrics and process resampling requests.
9696
9797
If there is a resampling error while resampling some component metric,

src/frequenz/sdk/microgrid/_data_pipeline.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,11 @@ def _resampling_request_sender(self) -> Sender[ComponentMetricRequest]:
264264
self._resampling_actor = _ActorInfo(actor, channel)
265265
return self._resampling_actor.channel.new_sender()
266266

267+
async def _start(self) -> None:
268+
"""Start the data pipeline actors."""
269+
if self._resampling_actor:
270+
await self._resampling_actor.actor.start()
271+
267272
async def _stop(self) -> None:
268273
"""Stop the data pipeline actors."""
269274
# pylint: disable=protected-access
@@ -291,6 +296,7 @@ async def initialize(resampler_config: ResamplerConfig) -> None:
291296
if _DATA_PIPELINE is not None:
292297
raise RuntimeError("DataPipeline is already initialized.")
293298
_DATA_PIPELINE = _DataPipeline(resampler_config)
299+
await _DATA_PIPELINE._start() # pylint: disable=protected-access
294300

295301

296302
def logical_meter() -> LogicalMeter:

src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py

Lines changed: 27 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -57,41 +57,40 @@ class LogicalMeter:
5757
)
5858
5959
# Instantiate a resampling actor
60-
_resampling_actor = ComponentMetricsResamplingActor(
60+
async with ComponentMetricsResamplingActor(
6161
channel_registry=channel_registry,
6262
data_sourcing_request_sender=data_source_request_sender,
6363
resampling_request_receiver=resampling_request_receiver,
6464
config=ResamplerConfig(resampling_period=timedelta(seconds=1)),
65-
)
66-
67-
await initialize(
68-
"127.0.0.1",
69-
50051,
70-
ResamplerConfig(resampling_period=timedelta(seconds=1))
71-
)
72-
73-
# Create a logical meter instance
74-
logical_meter = LogicalMeter(
75-
channel_registry,
76-
resampling_request_sender,
77-
)
65+
):
66+
await initialize(
67+
"127.0.0.1",
68+
50051,
69+
ResamplerConfig(resampling_period=timedelta(seconds=1))
70+
)
7871
79-
# Get a receiver for a builtin formula
80-
grid_power_recv = logical_meter.grid_power.new_receiver()
81-
for grid_power_sample in grid_power_recv:
82-
print(grid_power_sample)
72+
# Create a logical meter instance
73+
logical_meter = LogicalMeter(
74+
channel_registry,
75+
resampling_request_sender,
76+
)
8377
84-
# or compose formula receivers to create a new formula
85-
net_power_recv = (
86-
(
87-
logical_meter.grid_power
88-
- logical_meter.pv_power
78+
# Get a receiver for a builtin formula
79+
grid_power_recv = logical_meter.grid_power.new_receiver()
80+
for grid_power_sample in grid_power_recv:
81+
print(grid_power_sample)
82+
83+
# or compose formula receivers to create a new formula
84+
net_power_recv = (
85+
(
86+
logical_meter.grid_power
87+
- logical_meter.pv_power
88+
)
89+
.build("net_power")
90+
.new_receiver()
8991
)
90-
.build("net_power")
91-
.new_receiver()
92-
)
93-
for net_power_sample in net_power_recv:
94-
print(net_power_sample)
92+
for net_power_sample in net_power_recv:
93+
print(net_power_sample)
9594
```
9695
"""
9796

tests/actor/test_resampling.py

Lines changed: 49 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -122,39 +122,37 @@ async def test_single_request(
122122
resampling_req_chan = Broadcast[ComponentMetricRequest]("resample-req")
123123
resampling_req_sender = resampling_req_chan.new_sender()
124124

125-
resampling_actor = ComponentMetricsResamplingActor(
125+
async with ComponentMetricsResamplingActor(
126126
channel_registry=channel_registry,
127127
data_sourcing_request_sender=data_source_req_chan.new_sender(),
128128
resampling_request_receiver=resampling_req_chan.new_receiver(),
129129
config=ResamplerConfig(
130130
resampling_period=timedelta(seconds=0.2),
131131
max_data_age_in_periods=2,
132132
),
133-
)
134-
135-
subs_req = ComponentMetricRequest(
136-
namespace="Resampling",
137-
component_id=9,
138-
metric_id=ComponentMetricId.SOC,
139-
start_time=None,
140-
)
141-
142-
await resampling_req_sender.send(subs_req)
143-
data_source_req = await data_source_req_recv.receive()
144-
assert data_source_req is not None
145-
assert data_source_req == dataclasses.replace(
146-
subs_req, namespace="Resampling:Source"
147-
)
148-
149-
await _assert_resampling_works(
150-
channel_registry,
151-
fake_time,
152-
resampling_chan_name=subs_req.get_channel_name(),
153-
data_source_chan_name=data_source_req.get_channel_name(),
154-
)
155-
156-
await resampling_actor._stop() # type: ignore # pylint: disable=no-member,protected-access
157-
await resampling_actor._resampler.stop() # pylint: disable=protected-access
133+
) as resampling_actor:
134+
subs_req = ComponentMetricRequest(
135+
namespace="Resampling",
136+
component_id=9,
137+
metric_id=ComponentMetricId.SOC,
138+
start_time=None,
139+
)
140+
141+
await resampling_req_sender.send(subs_req)
142+
data_source_req = await data_source_req_recv.receive()
143+
assert data_source_req is not None
144+
assert data_source_req == dataclasses.replace(
145+
subs_req, namespace="Resampling:Source"
146+
)
147+
148+
await _assert_resampling_works(
149+
channel_registry,
150+
fake_time,
151+
resampling_chan_name=subs_req.get_channel_name(),
152+
data_source_chan_name=data_source_req.get_channel_name(),
153+
)
154+
155+
await resampling_actor._resampler.stop() # pylint: disable=protected-access
158156

159157

160158
async def test_duplicate_request(
@@ -168,37 +166,35 @@ async def test_duplicate_request(
168166
resampling_req_chan = Broadcast[ComponentMetricRequest]("resample-req")
169167
resampling_req_sender = resampling_req_chan.new_sender()
170168

171-
resampling_actor = ComponentMetricsResamplingActor(
169+
async with ComponentMetricsResamplingActor(
172170
channel_registry=channel_registry,
173171
data_sourcing_request_sender=data_source_req_chan.new_sender(),
174172
resampling_request_receiver=resampling_req_chan.new_receiver(),
175173
config=ResamplerConfig(
176174
resampling_period=timedelta(seconds=0.2),
177175
max_data_age_in_periods=2,
178176
),
179-
)
180-
181-
subs_req = ComponentMetricRequest(
182-
namespace="Resampling",
183-
component_id=9,
184-
metric_id=ComponentMetricId.SOC,
185-
start_time=None,
186-
)
187-
188-
await resampling_req_sender.send(subs_req)
189-
data_source_req = await data_source_req_recv.receive()
190-
191-
# Send duplicate request
192-
await resampling_req_sender.send(subs_req)
193-
with pytest.raises(asyncio.TimeoutError):
194-
await asyncio.wait_for(data_source_req_recv.receive(), timeout=0.1)
195-
196-
await _assert_resampling_works(
197-
channel_registry,
198-
fake_time,
199-
resampling_chan_name=subs_req.get_channel_name(),
200-
data_source_chan_name=data_source_req.get_channel_name(),
201-
)
202-
203-
await resampling_actor._stop() # type: ignore # pylint: disable=no-member,protected-access
204-
await resampling_actor._resampler.stop() # pylint: disable=protected-access
177+
) as resampling_actor:
178+
subs_req = ComponentMetricRequest(
179+
namespace="Resampling",
180+
component_id=9,
181+
metric_id=ComponentMetricId.SOC,
182+
start_time=None,
183+
)
184+
185+
await resampling_req_sender.send(subs_req)
186+
data_source_req = await data_source_req_recv.receive()
187+
188+
# Send duplicate request
189+
await resampling_req_sender.send(subs_req)
190+
with pytest.raises(asyncio.TimeoutError):
191+
await asyncio.wait_for(data_source_req_recv.receive(), timeout=0.1)
192+
193+
await _assert_resampling_works(
194+
channel_registry,
195+
fake_time,
196+
resampling_chan_name=subs_req.get_channel_name(),
197+
data_source_chan_name=data_source_req.get_channel_name(),
198+
)
199+
200+
await resampling_actor._resampler.stop() # pylint: disable=protected-access

0 commit comments

Comments
 (0)