Skip to content

Commit b4285eb

Browse files
committed
Support the Actor class in actor.run()
When actors deriving from `Actor` are passed to `actor.run()`, they will be started and awaited to finish too. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent 573a71a commit b4285eb

File tree

2 files changed

+20
-15
lines changed

2 files changed

+20
-15
lines changed

src/frequenz/sdk/actor/_run_utils.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import logging
99
from typing import Any
1010

11+
from ._actor import Actor
1112
from ._decorator import BaseActor
1213

1314
_logger = logging.getLogger(__name__)
@@ -22,14 +23,20 @@ async def run(*actors: Any) -> None:
2223
Raises:
2324
AssertionError: if any of the actors is not an instance of BaseActor.
2425
"""
25-
# Check that each actor is an instance of BaseActor at runtime,
26-
# due to the indirection created by the actor decorator.
27-
for actor in actors:
28-
assert isinstance(actor, BaseActor), f"{actor} is not an instance of BaseActor"
26+
pending_tasks: set[asyncio.Task[Any]] = set()
2927

30-
pending_tasks = set()
28+
# Check that each actor is an instance of BaseActor or Actor at runtime,
29+
# due to the indirection created by the actor decorator.
3130
for actor in actors:
32-
pending_tasks.add(asyncio.create_task(actor.join(), name=str(actor)))
31+
if isinstance(actor, Actor):
32+
await actor.start()
33+
awaitable = actor.wait()
34+
else:
35+
assert isinstance(
36+
actor, BaseActor
37+
), f"{actor} is not an instance of BaseActor or Actor"
38+
awaitable = actor.join() # type: ignore
39+
pending_tasks.add(asyncio.create_task(awaitable, name=str(actor)))
3340

3441
# Currently the actor decorator manages the life-cycle of the actor tasks
3542
while pending_tasks:

tests/actor/test_run_utils.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import pytest
1212
import time_machine
1313

14-
from frequenz.sdk.actor import actor, run
14+
from frequenz.sdk.actor import Actor, run
1515

1616

1717
# Setting 'autouse' has no effect as this method replaces the event loop for all tests in the file.
@@ -30,8 +30,7 @@ def fake_time() -> Iterator[time_machine.Coordinates]:
3030
yield traveller
3131

3232

33-
@actor
34-
class FaultyActor:
33+
class FaultyActor(Actor):
3534
"""A test faulty actor."""
3635

3736
def __init__(self, name: str) -> None:
@@ -40,10 +39,10 @@ def __init__(self, name: str) -> None:
4039
Args:
4140
name: the name of the faulty actor.
4241
"""
43-
self.name = name
42+
super().__init__(name=name)
4443
self.is_cancelled = False
4544

46-
async def run(self) -> None:
45+
async def _run(self) -> None:
4746
"""Run the faulty actor.
4847
4948
Raises:
@@ -53,8 +52,7 @@ async def run(self) -> None:
5352
raise asyncio.CancelledError(f"Faulty Actor {self.name} failed")
5453

5554

56-
@actor
57-
class SleepyActor:
55+
class SleepyActor(Actor):
5856
"""A test actor that sleeps a short time."""
5957

6058
def __init__(self, name: str, sleep_duration: float) -> None:
@@ -64,11 +62,11 @@ def __init__(self, name: str, sleep_duration: float) -> None:
6462
name: the name of the sleepy actor.
6563
sleep_duration: the virtual duration to sleep while running.
6664
"""
67-
self.name = name
65+
super().__init__(name=name)
6866
self.sleep_duration = sleep_duration
6967
self.is_joined = False
7068

71-
async def run(self) -> None:
69+
async def _run(self) -> None:
7270
"""Run the sleepy actor."""
7371
while time.time() < self.sleep_duration:
7472
await asyncio.sleep(0.1)

0 commit comments

Comments
 (0)