Skip to content

Commit c5ddca9

Browse files
committed
Migrate PowerDistributingActor to the Actor class
Signed-off-by: Leandro Lucarella <[email protected]>
1 parent 3a266f8 commit c5ddca9

File tree

4 files changed

+326
-363
lines changed

4 files changed

+326
-363
lines changed

benchmarks/power_distribution/power_distributor.py

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -108,19 +108,16 @@ async def run_test( # pylint: disable=too-many-locals
108108
power_request_channel = Broadcast[Request]("power-request")
109109
battery_status_channel = Broadcast[BatteryStatus]("battery-status")
110110
channel_registry = ChannelRegistry(name="power_distributor")
111-
distributor = PowerDistributingActor(
111+
async with PowerDistributingActor(
112112
channel_registry=channel_registry,
113113
requests_receiver=power_request_channel.new_receiver(),
114114
battery_status_sender=battery_status_channel.new_sender(),
115-
)
116-
117-
tasks: List[Coroutine[Any, Any, List[Result]]] = []
118-
tasks.append(send_requests(batteries, num_requests))
119-
120-
result = await asyncio.gather(*tasks)
121-
exec_time = timeit.default_timer() - start
115+
):
116+
tasks: List[Coroutine[Any, Any, List[Result]]] = []
117+
tasks.append(send_requests(batteries, num_requests))
122118

123-
await distributor._stop() # type: ignore # pylint: disable=no-member, protected-access
119+
result = await asyncio.gather(*tasks)
120+
exec_time = timeit.default_timer() - start
124121

125122
summary = parse_result(result)
126123
summary["num_requests"] = num_requests

src/frequenz/sdk/actor/power_distributing/power_distributing.py

Lines changed: 38 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828

2929
from ..._internal._math import is_close_to_zero
3030
from ...actor import ChannelRegistry
31-
from ...actor._decorator import actor
31+
from ...actor._actor import Actor
3232
from ...microgrid import ComponentGraph, connection_manager
3333
from ...microgrid.client import MicrogridApiClient
3434
from ...microgrid.component import (
@@ -83,8 +83,7 @@ def has_expired(self) -> bool:
8383
return time.monotonic_ns() >= self.expiry_time
8484

8585

86-
@actor
87-
class PowerDistributingActor:
86+
class PowerDistributingActor(Actor):
8887
# pylint: disable=too-many-instance-attributes
8988
"""Actor to distribute the power between batteries in a microgrid.
9089
@@ -125,7 +124,6 @@ class PowerDistributingActor:
125124
)
126125
from frequenz.channels import Broadcast, Receiver, Sender
127126
from datetime import timedelta
128-
from frequenz.sdk import actor
129127
130128
HOST = "localhost"
131129
PORT = 50051
@@ -145,38 +143,37 @@ class PowerDistributingActor:
145143
146144
channel = Broadcast[Request]("power_distributor")
147145
channel_registry = ChannelRegistry(name="power_distributor")
148-
power_distributor = PowerDistributingActor(
146+
async with PowerDistributingActor(
149147
requests_receiver=channel.new_receiver(),
150148
channel_registry=channel_registry,
151149
battery_status_sender=battery_status_channel.new_sender(),
152-
)
153-
154-
sender = channel.new_sender()
155-
namespace: str = "namespace"
156-
# Set power 1200W to given batteries.
157-
request = Request(
158-
namespace=namespace,
159-
power=1200.0,
160-
batteries=batteries_ids,
161-
request_timeout_sec=10.0
162-
)
163-
await sender.send(request)
164-
result_rx = channel_registry.new_receiver(namespace)
165-
166-
# It is recommended to use timeout when waiting for the response!
167-
result: Result = await asyncio.wait_for(result_rx.receive(), timeout=10)
168-
169-
if isinstance(result, Success):
170-
print("Command succeed")
171-
elif isinstance(result, PartialFailure):
172-
print(
173-
f"Batteries {result.failed_batteries} failed, total failed power" \
174-
f"{result.failed_power}"
150+
):
151+
sender = channel.new_sender()
152+
namespace: str = "namespace"
153+
# Set power 1200W to given batteries.
154+
request = Request(
155+
namespace=namespace,
156+
power=1200.0,
157+
batteries=batteries_ids,
158+
request_timeout_sec=10.0
175159
)
176-
elif isinstance(result, Ignored):
177-
print("Request was ignored, because of newer request")
178-
elif isinstance(result, Error):
179-
print(f"Request failed with error: {result.msg}")
160+
await sender.send(request)
161+
result_rx = channel_registry.new_receiver(namespace)
162+
163+
# It is recommended to use timeout when waiting for the response!
164+
result: Result = await asyncio.wait_for(result_rx.receive(), timeout=10)
165+
166+
if isinstance(result, Success):
167+
print("Command succeed")
168+
elif isinstance(result, PartialFailure):
169+
print(
170+
f"Batteries {result.failed_batteries} failed, total failed power" \
171+
f"{result.failed_power}"
172+
)
173+
elif isinstance(result, Ignored):
174+
print("Request was ignored, because of newer request")
175+
elif isinstance(result, Error):
176+
print(f"Request failed with error: {result.msg}")
180177
```
181178
"""
182179

@@ -198,6 +195,7 @@ def __init__(
198195
wait_for_data_sec: How long actor should wait before processing first
199196
request. It is a time needed to collect first components data.
200197
"""
198+
super().__init__()
201199
self._requests_receiver = requests_receiver
202200
self._channel_registry = channel_registry
203201
self._wait_for_data_sec = wait_for_data_sec
@@ -288,7 +286,7 @@ async def _send_result(self, namespace: str, result: Result) -> None:
288286

289287
await self._result_senders[namespace].send(result)
290288

291-
async def run(self) -> None:
289+
async def _run(self) -> None:
292290
"""Run actor main function.
293291
294292
It waits for new requests in task_queue and process it, and send
@@ -758,7 +756,11 @@ async def _cancel_tasks(self, tasks: Iterable[asyncio.Task[Any]]) -> None:
758756

759757
await asyncio.gather(*tasks, return_exceptions=True)
760758

761-
async def _stop_actor(self) -> None:
762-
"""Stop all running async tasks."""
759+
async def stop(self, msg: str | None = None) -> None:
760+
"""Stop this actor.
761+
762+
Args:
763+
msg: The message to be passed to the tasks being cancelled.
764+
"""
763765
await self._all_battery_status.stop()
764-
await self._stop() # type: ignore # pylint: disable=no-member
766+
await super().stop(msg)

src/frequenz/sdk/microgrid/_data_pipeline.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,13 +270,17 @@ async def _start(self) -> None:
270270
await self._data_sourcing_actor.actor.start()
271271
if self._resampling_actor:
272272
await self._resampling_actor.actor.start()
273+
# The power distributing actor is started lazily when the first battery pool is
274+
# created.
273275

274276
async def _stop(self) -> None:
275277
"""Stop the data pipeline actors."""
276278
if self._data_sourcing_actor:
277279
await self._data_sourcing_actor.actor.stop()
278280
if self._resampling_actor:
279281
await self._resampling_actor.actor.stop()
282+
if self._power_distributing_actor:
283+
await self._power_distributing_actor.stop()
280284

281285

282286
_DATA_PIPELINE: _DataPipeline | None = None

0 commit comments

Comments
 (0)