Skip to content

Commit a96e11c

Browse files
committed
DataPipeline: Fix resampling/ds/power actors not started
If any function using the resampling actor or datasourcing actor was used after the call to `_DataPipeline._start()` the according actors weer never started as they were only created once we requested them through functions like `logical_meter()` and similar. Additionally, the PowerDistributingActor was never started at all. This commit fixes both those issues. Signed-off-by: Mathias L. Baumann <[email protected]> # Conflicts: # src/frequenz/sdk/microgrid/_data_pipeline.py
1 parent 40e80d7 commit a96e11c

File tree

2 files changed

+78
-10
lines changed

2 files changed

+78
-10
lines changed

src/frequenz/sdk/microgrid/_data_pipeline.py

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,7 @@ def _start_power_distributing_actor(self) -> None:
215215
channel_registry=self._channel_registry,
216216
battery_status_sender=self._battery_status_channel.new_sender(),
217217
)
218+
self._power_distributing_actor.start()
218219

219220
def _data_sourcing_request_sender(self) -> Sender[ComponentMetricRequest]:
220221
"""Return a Sender for sending requests to the data sourcing actor.
@@ -237,6 +238,7 @@ def _data_sourcing_request_sender(self) -> Sender[ComponentMetricRequest]:
237238
registry=self._channel_registry,
238239
)
239240
self._data_sourcing_actor = _ActorInfo(actor, channel)
241+
self._data_sourcing_actor.actor.start()
240242
return self._data_sourcing_actor.channel.new_sender()
241243

242244
def _resampling_request_sender(self) -> Sender[ComponentMetricRequest]:
@@ -262,17 +264,9 @@ def _resampling_request_sender(self) -> Sender[ComponentMetricRequest]:
262264
config=self._resampler_config,
263265
)
264266
self._resampling_actor = _ActorInfo(actor, channel)
267+
self._resampling_actor.actor.start()
265268
return self._resampling_actor.channel.new_sender()
266269

267-
async def _start(self) -> None:
268-
"""Start the data pipeline actors."""
269-
if self._data_sourcing_actor:
270-
await self._data_sourcing_actor.actor.start()
271-
if self._resampling_actor:
272-
await self._resampling_actor.actor.start()
273-
# The power distributing actor is started lazily when the first battery pool is
274-
# created.
275-
276270
async def _stop(self) -> None:
277271
"""Stop the data pipeline actors."""
278272
if self._data_sourcing_actor:
@@ -300,7 +294,6 @@ async def initialize(resampler_config: ResamplerConfig) -> None:
300294
if _DATA_PIPELINE is not None:
301295
raise RuntimeError("DataPipeline is already initialized.")
302296
_DATA_PIPELINE = _DataPipeline(resampler_config)
303-
await _DATA_PIPELINE._start() # pylint: disable=protected-access
304297

305298

306299
def logical_meter() -> LogicalMeter:
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
# License: MIT
2+
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Basic tests for the DataPipeline."""
5+
6+
import asyncio
7+
from datetime import timedelta
8+
from typing import Iterator
9+
10+
import async_solipsism
11+
import pytest
12+
from pytest_mock import MockerFixture
13+
14+
from frequenz.sdk.microgrid._data_pipeline import _DataPipeline
15+
from frequenz.sdk.microgrid.client import Connection
16+
from frequenz.sdk.microgrid.component import Component, ComponentCategory, InverterType
17+
from frequenz.sdk.timeseries._resampling import ResamplerConfig
18+
19+
from ..utils.mock_microgrid_client import MockMicrogridClient
20+
21+
22+
@pytest.fixture
23+
def event_loop() -> Iterator[async_solipsism.EventLoop]:
24+
"""Replace the loop with one that doesn't interact with the outside world."""
25+
loop = async_solipsism.EventLoop()
26+
asyncio.set_event_loop(loop) # Set the loop as default
27+
yield loop
28+
loop.close()
29+
30+
31+
async def test_actors_started(mocker: MockerFixture) -> None:
32+
"""Test that the datasourcing, resampling and power distributing actors are started."""
33+
34+
datapipeline = _DataPipeline(
35+
resampler_config=ResamplerConfig(resampling_period=timedelta(seconds=1))
36+
)
37+
await asyncio.sleep(1)
38+
39+
# pylint: disable=protected-access
40+
assert datapipeline._data_sourcing_actor is None
41+
assert datapipeline._resampling_actor is None
42+
assert datapipeline._power_distributing_actor is None
43+
44+
datapipeline.logical_meter()
45+
46+
assert datapipeline._data_sourcing_actor is not None
47+
assert datapipeline._data_sourcing_actor.actor is not None
48+
await asyncio.sleep(1)
49+
assert datapipeline._data_sourcing_actor.actor.is_running
50+
51+
assert datapipeline._resampling_actor is not None
52+
assert datapipeline._resampling_actor.actor is not None
53+
assert datapipeline._resampling_actor.actor.is_running
54+
55+
assert datapipeline._power_distributing_actor is None
56+
57+
mock_client = MockMicrogridClient(
58+
set(
59+
[
60+
Component(1, ComponentCategory.GRID),
61+
Component(4, ComponentCategory.INVERTER, InverterType.BATTERY),
62+
Component(15, ComponentCategory.BATTERY),
63+
]
64+
),
65+
connections=set([Connection(1, 4), Connection(4, 15)]),
66+
)
67+
mock_client.initialize(mocker)
68+
69+
datapipeline.battery_pool()
70+
71+
assert datapipeline._power_distributing_actor is not None
72+
await asyncio.sleep(1)
73+
assert datapipeline._power_distributing_actor.is_running
74+
75+
await datapipeline._stop()

0 commit comments

Comments
 (0)