Skip to content

Commit 37d499e

Browse files
Add run() method to simplify awaiting of actors completion
The manual join() of actors was causing issues with linting tools such as pylint and mypy due to the indirection created by the actor decorator. The new run() function simplifies and improves the synchronization of actors on the client side. Signed-off-by: Daniel Zullo <[email protected]>
1 parent 1f561d8 commit 37d499e

File tree

5 files changed

+36
-11
lines changed

5 files changed

+36
-11
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
## New Features
1212

1313
* A new class `SerializableRingbuffer` is now available, extending the `OrderedRingBuffer` class with the ability to load & dump the data to disk.
14+
* Add the `run(*actors)` function for running and synchronizing the execution of actors. This new function simplifies the way actors are managed on the client side, allowing for a cleaner and more streamlined approach. Users/apps can now run actors simply by calling run(actor1, actor2, actor3...) without the need to manually call join() and deal with linting errors.
1415

1516
## Bug Fixes
1617

examples/power_distribution.py

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,13 @@
1818

1919
from frequenz.channels import Bidirectional, Broadcast, Receiver, Sender
2020

21-
from frequenz.sdk import microgrid
21+
from frequenz.sdk import actor, microgrid
2222
from frequenz.sdk.actor import (
2323
ChannelRegistry,
2424
ComponentMetricRequest,
2525
ComponentMetricsResamplingActor,
2626
DataSourcingActor,
2727
ResamplerConfig,
28-
actor,
2928
)
3029
from frequenz.sdk.actor.power_distributing import (
3130
PowerDistributingActor,
@@ -42,7 +41,7 @@
4241
PORT = 61060
4342

4443

45-
@actor
44+
@actor.actor
4645
class DecisionMakingActor:
4746
"""Actor that receives set receives power for given batteries."""
4847

@@ -112,7 +111,7 @@ async def run(self) -> None:
112111
_logger.info("Set power with %d succeed.", power_to_set)
113112

114113

115-
@actor
114+
@actor.actor
116115
class DataCollectingActor:
117116
"""Actor that makes decisions about how much to charge/discharge batteries."""
118117

@@ -227,10 +226,7 @@ async def run() -> None:
227226
active_power_data=await logical_meter.grid_power(),
228227
)
229228

230-
# pylint: disable=no-member
231-
await service_actor.join() # type: ignore[attr-defined]
232-
await client_actor.join() # type: ignore[attr-defined]
233-
await power_distributor.join() # type: ignore[attr-defined]
229+
await actor.run(service_actor, client_actor, power_distributor)
234230

235231

236232
asyncio.run(run())

src/frequenz/sdk/actor/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from ._data_sourcing import ComponentMetricRequest, DataSourcingActor
1010
from ._decorator import actor
1111
from ._resampling import ComponentMetricsResamplingActor
12+
from ._run_utils import run
1213

1314
__all__ = [
1415
"ChannelRegistry",
@@ -18,4 +19,5 @@
1819
"DataSourcingActor",
1920
"ResamplerConfig",
2021
"actor",
22+
"run",
2123
]
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# License: MIT
2+
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Utility functions to run and synchronize the execution of actors."""
5+
6+
7+
import asyncio
8+
from typing import Any
9+
10+
from ._decorator import BaseActor
11+
12+
13+
async def run(*actors: Any) -> None:
14+
"""Await the completion of all actors.
15+
16+
Args:
17+
actors: the actors to be awaited.
18+
19+
Raises:
20+
AssertionError: if any of the actors is not an instance of BaseActor.
21+
"""
22+
# Check that each actor is an instance of BaseActor at runtime,
23+
# due to the indirection created by the actor decorator.
24+
for actor in actors:
25+
assert isinstance(actor, BaseActor), f"{actor} is not an instance of BaseActor"
26+
27+
await asyncio.gather(*(actor.join() for actor in actors))

tests/actor/test_decorator.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from frequenz.channels import Broadcast, Receiver, Sender
66
from frequenz.channels.util import Select
77

8-
from frequenz.sdk.actor import actor
8+
from frequenz.sdk.actor import actor, run
99

1010

1111
@actor
@@ -111,5 +111,4 @@ async def test_actor_does_not_restart() -> None:
111111
)
112112

113113
await channel.new_sender().send(1)
114-
# pylint: disable=no-member
115-
await _faulty_actor.join() # type: ignore
114+
await run(_faulty_actor)

0 commit comments

Comments
 (0)