Skip to content

Commit 37ae22c

Browse files
authored
DataPipeline: Fix resampling/ds/power actors not started (#603)
2 parents bf8998c + a96e11c commit 37ae22c

File tree

9 files changed

+117
-65
lines changed

9 files changed

+117
-65
lines changed

RELEASE_NOTES.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ This release replaces the `@actor` decorator with a new `Actor` class.
7171
await run(actor, other_actor) # Start and await for all the actors
7272
```
7373

74-
- The `MovingWindow` is now a `BackgroundService`, so it needs to be started manually with `await window.start()`. It is recommended to use it as an `async` context manager if possible though:
74+
- The `MovingWindow` is now a `BackgroundService`, so it needs to be started manually with `window.start()`. It is recommended to use it as an `async` context manager if possible though:
7575

7676
```python
7777
async with MovingWindow(...) as window:
@@ -80,7 +80,7 @@ This release replaces the `@actor` decorator with a new `Actor` class.
8080
# The moving window is stopped here
8181
```
8282

83-
- The base actors (`ConfigManagingActor`, `ComponentMetricsResamplingActor`, `DataSourcingActor`, `PowerDistributingActor`) now inherit from the new `Actor` class, if you are using them directly, you need to start them manually with `await actor.start()` and you might need to do some other adjustments.
83+
- The base actors (`ConfigManagingActor`, `ComponentMetricsResamplingActor`, `DataSourcingActor`, `PowerDistributingActor`) now inherit from the new `Actor` class, if you are using them directly, you need to start them manually with `actor.start()` and you might need to do some other adjustments.
8484

8585
- The `BatteryPool.power_distribution_results` method has been enhanced to provide power distribution results in the form of `Power` objects, replacing the previous use of `float` values.
8686

src/frequenz/sdk/actor/_actor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ async def _run(self) -> None:
149149
This is mostly used for testing purposes and shouldn't be set in production.
150150
"""
151151

152-
async def start(self) -> None:
152+
def start(self) -> None:
153153
"""Start this actor.
154154
155155
If this actor is already running, this method does nothing.
@@ -172,7 +172,7 @@ async def _run_loop(self) -> None:
172172
the maximum number of restarts.
173173
BaseException: If this actor's `_run()` raises any other `BaseException`.
174174
"""
175-
_logger.info("Actor %s: Starting...", self)
175+
_logger.info("Actor %s: Started.", self)
176176
n_restarts = 0
177177
while True:
178178
try:

src/frequenz/sdk/actor/_background_service.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def __init__(self, resolution_s: float, *, name: str | None = None) -> None:
4646
super().__init__(name=name)
4747
self._resolution_s = resolution_s
4848
49-
async def start(self) -> None:
49+
def start(self) -> None:
5050
self._tasks.add(asyncio.create_task(self._tick()))
5151
5252
async def _tick(self) -> None:
@@ -61,7 +61,7 @@ async def main() -> None:
6161
6262
# Manual start/stop (only use if necessary, as cleanup is more complicated)
6363
clock = Clock(resolution_s=1)
64-
await clock.start()
64+
clock.start()
6565
await asyncio.sleep(5)
6666
await clock.stop()
6767
@@ -80,7 +80,7 @@ def __init__(self, *, name: str | None = None) -> None:
8080
self._tasks: set[asyncio.Task[Any]] = set()
8181

8282
@abc.abstractmethod
83-
async def start(self) -> None:
83+
def start(self) -> None:
8484
"""Start this background service."""
8585

8686
@property
@@ -166,7 +166,7 @@ async def __aenter__(self) -> Self:
166166
Returns:
167167
This background service.
168168
"""
169-
await self.start()
169+
self.start()
170170
return self
171171

172172
async def __aexit__(

src/frequenz/sdk/actor/_run_utils.py

Lines changed: 13 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -19,26 +19,16 @@ async def run(*actors: Actor) -> None:
1919
actors: the actors to be awaited.
2020
"""
2121
_logger.info("Starting %s actor(s)...", len(actors))
22-
await _wait_tasks(
23-
set(asyncio.create_task(a.start(), name=str(a)) for a in actors),
24-
"starting",
25-
"started",
26-
)
2722

28-
# Wait until all actors are done
29-
await _wait_tasks(
30-
set(asyncio.create_task(a.wait(), name=str(a)) for a in actors),
31-
"running",
32-
"finished",
33-
)
34-
35-
_logger.info("All %s actor(s) finished.", len(actors))
23+
for actor in actors:
24+
if actor.is_running:
25+
_logger.info("Actor %s: Already running, skipping start.", actor)
26+
else:
27+
_logger.info("Actor %s: Starting...", actor)
28+
actor.start()
3629

37-
38-
async def _wait_tasks(
39-
tasks: set[asyncio.Task[None]], error_str: str, success_str: str
40-
) -> None:
41-
pending_tasks = tasks
30+
# Wait until all actors are done
31+
pending_tasks = set(asyncio.create_task(a.wait(), name=str(a)) for a in actors)
4232
while pending_tasks:
4333
done_tasks, pending_tasks = await asyncio.wait(
4434
pending_tasks, return_when=asyncio.FIRST_COMPLETED
@@ -49,19 +39,14 @@ async def _wait_tasks(
4939
# Cancellation needs to be checked first, otherwise the other methods
5040
# could raise a CancelledError
5141
if task.cancelled():
52-
_logger.info(
53-
"Actor %s: Cancelled while %s.",
54-
task.get_name(),
55-
error_str,
56-
)
42+
_logger.info("Actor %s: Cancelled while running.", task.get_name())
5743
elif exception := task.exception():
5844
_logger.error(
59-
"Actor %s: Raised an exception while %s.",
45+
"Actor %s: Raised an exception while running.",
6046
task.get_name(),
61-
error_str,
6247
exc_info=exception,
6348
)
6449
else:
65-
_logger.info(
66-
"Actor %s: %s normally.", task.get_name(), success_str.capitalize()
67-
)
50+
_logger.info("Actor %s: Finished normally.", task.get_name())
51+
52+
_logger.info("All %s actor(s) finished.", len(actors))

src/frequenz/sdk/microgrid/_data_pipeline.py

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,7 @@ def _start_power_distributing_actor(self) -> None:
215215
channel_registry=self._channel_registry,
216216
battery_status_sender=self._battery_status_channel.new_sender(),
217217
)
218+
self._power_distributing_actor.start()
218219

219220
def _data_sourcing_request_sender(self) -> Sender[ComponentMetricRequest]:
220221
"""Return a Sender for sending requests to the data sourcing actor.
@@ -237,6 +238,7 @@ def _data_sourcing_request_sender(self) -> Sender[ComponentMetricRequest]:
237238
registry=self._channel_registry,
238239
)
239240
self._data_sourcing_actor = _ActorInfo(actor, channel)
241+
self._data_sourcing_actor.actor.start()
240242
return self._data_sourcing_actor.channel.new_sender()
241243

242244
def _resampling_request_sender(self) -> Sender[ComponentMetricRequest]:
@@ -262,17 +264,9 @@ def _resampling_request_sender(self) -> Sender[ComponentMetricRequest]:
262264
config=self._resampler_config,
263265
)
264266
self._resampling_actor = _ActorInfo(actor, channel)
267+
self._resampling_actor.actor.start()
265268
return self._resampling_actor.channel.new_sender()
266269

267-
async def _start(self) -> None:
268-
"""Start the data pipeline actors."""
269-
if self._data_sourcing_actor:
270-
await self._data_sourcing_actor.actor.start()
271-
if self._resampling_actor:
272-
await self._resampling_actor.actor.start()
273-
# The power distributing actor is started lazily when the first battery pool is
274-
# created.
275-
276270
async def _stop(self) -> None:
277271
"""Stop the data pipeline actors."""
278272
if self._data_sourcing_actor:
@@ -300,7 +294,6 @@ async def initialize(resampler_config: ResamplerConfig) -> None:
300294
if _DATA_PIPELINE is not None:
301295
raise RuntimeError("DataPipeline is already initialized.")
302296
_DATA_PIPELINE = _DataPipeline(resampler_config)
303-
await _DATA_PIPELINE._start() # pylint: disable=protected-access
304297

305298

306299
def logical_meter() -> LogicalMeter:

src/frequenz/sdk/timeseries/_moving_window.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ def __init__( # pylint: disable=too-many-arguments
190190
align_to=align_to,
191191
)
192192

193-
async def start(self) -> None:
193+
def start(self) -> None:
194194
"""Start the MovingWindow.
195195
196196
This method starts the MovingWindow tasks.

tests/actor/test_actor.py

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ async def test_basic_actor(caplog: pytest.LogCaptureFixture) -> None:
175175
original_tasks = set(actor.tasks)
176176

177177
# Start is a no-op if already started
178-
await actor.start()
178+
actor.start()
179179
assert actor.is_running is True
180180
assert original_tasks == set(actor.tasks)
181181

@@ -192,7 +192,7 @@ async def test_basic_actor(caplog: pytest.LogCaptureFixture) -> None:
192192
assert actor.is_running is False
193193
assert BaseTestActor.restart_count == 0
194194
assert caplog.record_tuples == [
195-
(*ACTOR_INFO, "Actor EchoActor[EchoActor]: Starting..."),
195+
(*ACTOR_INFO, "Actor EchoActor[EchoActor]: Started."),
196196
(*ACTOR_INFO, "Actor EchoActor[EchoActor]: Cancelled."),
197197
]
198198

@@ -222,12 +222,14 @@ async def test_restart_on_unhandled_exception(
222222
await channel.new_sender().send(i)
223223

224224
await run(actor)
225+
await actor.wait()
225226

226227
assert actor.is_running is False
227228
assert BaseTestActor.restart_count == restart_limit
228229
expected_log = [
229230
(*RUN_INFO, "Starting 1 actor(s)..."),
230-
(*ACTOR_INFO, "Actor RaiseExceptionActor[test]: Starting..."),
231+
(*RUN_INFO, "Actor RaiseExceptionActor[test]: Starting..."),
232+
(*ACTOR_INFO, "Actor RaiseExceptionActor[test]: Started."),
231233
]
232234
for i in range(restart_limit):
233235
expected_log.extend(
@@ -253,10 +255,6 @@ async def test_restart_on_unhandled_exception(
253255
"Actor RaiseExceptionActor[test]: Maximum restarts attempted "
254256
f"({restart_limit}/{restart_limit}), bailing out...",
255257
),
256-
(
257-
*RUN_INFO,
258-
"Actor RaiseExceptionActor[test]: Started normally.",
259-
),
260258
(
261259
*RUN_ERROR,
262260
"Actor RaiseExceptionActor[test]: Raised an exception while running.",
@@ -286,10 +284,10 @@ async def test_does_not_restart_on_normal_exit(
286284
assert BaseTestActor.restart_count == 0
287285
assert caplog.record_tuples == [
288286
(*RUN_INFO, "Starting 1 actor(s)..."),
289-
(*ACTOR_INFO, "Actor NopActor[test]: Starting..."),
287+
(*RUN_INFO, "Actor NopActor[test]: Starting..."),
288+
(*ACTOR_INFO, "Actor NopActor[test]: Started."),
290289
(*ACTOR_INFO, "Actor NopActor[test]: _run() returned without error."),
291290
(*ACTOR_INFO, "Actor NopActor[test]: Stopped."),
292-
(*RUN_INFO, "Actor NopActor[test]: Started normally."),
293291
(*RUN_INFO, "Actor NopActor[test]: Finished normally."),
294292
(*RUN_INFO, "All 1 actor(s) finished."),
295293
]
@@ -319,9 +317,9 @@ async def test_does_not_restart_on_base_exception(
319317
assert BaseTestActor.restart_count == 0
320318
assert caplog.record_tuples == [
321319
(*RUN_INFO, "Starting 1 actor(s)..."),
322-
(*ACTOR_INFO, "Actor RaiseBaseExceptionActor[test]: Starting..."),
320+
(*RUN_INFO, "Actor RaiseBaseExceptionActor[test]: Starting..."),
321+
(*ACTOR_INFO, "Actor RaiseBaseExceptionActor[test]: Started."),
323322
(*ACTOR_ERROR, "Actor RaiseBaseExceptionActor[test]: Raised a BaseException."),
324-
(*RUN_INFO, "Actor RaiseBaseExceptionActor[test]: Started normally."),
325323
(
326324
*RUN_ERROR,
327325
"Actor RaiseBaseExceptionActor[test]: Raised an exception while running.",
@@ -373,8 +371,8 @@ async def cancel_actor() -> None:
373371
assert BaseTestActor.restart_count == 0
374372
assert caplog.record_tuples == [
375373
(*RUN_INFO, "Starting 1 actor(s)..."),
376-
(*ACTOR_INFO, "Actor EchoActor[EchoActor]: Starting..."),
377-
(*RUN_INFO, "Actor EchoActor[EchoActor]: Started normally."),
374+
(*RUN_INFO, "Actor EchoActor[EchoActor]: Starting..."),
375+
(*ACTOR_INFO, "Actor EchoActor[EchoActor]: Started."),
378376
(*ACTOR_INFO, "Actor EchoActor[EchoActor]: Cancelled."),
379377
(*RUN_ERROR, "Actor EchoActor[EchoActor]: Raised an exception while running."),
380378
(*RUN_INFO, "All 1 actor(s) finished."),

tests/actor/test_background_service.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def __init__(
3636
self._sleep = sleep
3737
self._exc = exc
3838

39-
async def start(self) -> None:
39+
def start(self) -> None:
4040
"""Start this service."""
4141

4242
async def nop() -> None:
@@ -76,7 +76,7 @@ async def test_start_await() -> None:
7676
await fake_service.stop()
7777
assert fake_service.is_running is False
7878

79-
await fake_service.start()
79+
fake_service.start()
8080
assert fake_service.is_running is True
8181

8282
# Should stop immediately
@@ -96,7 +96,7 @@ async def test_start_stop() -> None:
9696
await fake_service.stop()
9797
assert fake_service.is_running is False
9898

99-
await fake_service.start()
99+
fake_service.start()
100100
assert fake_service.is_running is True
101101

102102
await asyncio.sleep(1.0)
@@ -119,7 +119,7 @@ async def test_start_and_crash(
119119
assert fake_service.name == "test"
120120
assert fake_service.is_running is False
121121

122-
await fake_service.start()
122+
fake_service.start()
123123
with pytest.raises(BaseExceptionGroup) as exc_info:
124124
match method:
125125
case "await":
@@ -146,7 +146,8 @@ async def test_async_context_manager() -> None:
146146
async with FakeService(name="test", sleep=1.0) as fake_service:
147147
assert fake_service.is_running is True
148148
# Is a no-op if the service is running
149-
await fake_service.start()
149+
fake_service.start()
150+
await asyncio.sleep(0)
150151
assert fake_service.is_running is True
151152

152153
assert fake_service.is_running is False
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
# License: MIT
2+
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Basic tests for the DataPipeline."""
5+
6+
import asyncio
7+
from datetime import timedelta
8+
from typing import Iterator
9+
10+
import async_solipsism
11+
import pytest
12+
from pytest_mock import MockerFixture
13+
14+
from frequenz.sdk.microgrid._data_pipeline import _DataPipeline
15+
from frequenz.sdk.microgrid.client import Connection
16+
from frequenz.sdk.microgrid.component import Component, ComponentCategory, InverterType
17+
from frequenz.sdk.timeseries._resampling import ResamplerConfig
18+
19+
from ..utils.mock_microgrid_client import MockMicrogridClient
20+
21+
22+
@pytest.fixture
23+
def event_loop() -> Iterator[async_solipsism.EventLoop]:
24+
"""Replace the loop with one that doesn't interact with the outside world."""
25+
loop = async_solipsism.EventLoop()
26+
asyncio.set_event_loop(loop) # Set the loop as default
27+
yield loop
28+
loop.close()
29+
30+
31+
async def test_actors_started(mocker: MockerFixture) -> None:
32+
"""Test that the datasourcing, resampling and power distributing actors are started."""
33+
34+
datapipeline = _DataPipeline(
35+
resampler_config=ResamplerConfig(resampling_period=timedelta(seconds=1))
36+
)
37+
await asyncio.sleep(1)
38+
39+
# pylint: disable=protected-access
40+
assert datapipeline._data_sourcing_actor is None
41+
assert datapipeline._resampling_actor is None
42+
assert datapipeline._power_distributing_actor is None
43+
44+
datapipeline.logical_meter()
45+
46+
assert datapipeline._data_sourcing_actor is not None
47+
assert datapipeline._data_sourcing_actor.actor is not None
48+
await asyncio.sleep(1)
49+
assert datapipeline._data_sourcing_actor.actor.is_running
50+
51+
assert datapipeline._resampling_actor is not None
52+
assert datapipeline._resampling_actor.actor is not None
53+
assert datapipeline._resampling_actor.actor.is_running
54+
55+
assert datapipeline._power_distributing_actor is None
56+
57+
mock_client = MockMicrogridClient(
58+
set(
59+
[
60+
Component(1, ComponentCategory.GRID),
61+
Component(4, ComponentCategory.INVERTER, InverterType.BATTERY),
62+
Component(15, ComponentCategory.BATTERY),
63+
]
64+
),
65+
connections=set([Connection(1, 4), Connection(4, 15)]),
66+
)
67+
mock_client.initialize(mocker)
68+
69+
datapipeline.battery_pool()
70+
71+
assert datapipeline._power_distributing_actor is not None
72+
await asyncio.sleep(1)
73+
assert datapipeline._power_distributing_actor.is_running
74+
75+
await datapipeline._stop()

0 commit comments

Comments
 (0)