Skip to content

Commit 9daab41

Browse files
Add temporary sdk interface to pytest utils module
It allows to create all tools in all places and close them when not needed. This should be replaced by true sdk interface in the future. Signed-off-by: ela-kotulska-frequenz <[email protected]>
1 parent cfb9991 commit 9daab41

File tree

1 file changed

+74
-0
lines changed

1 file changed

+74
-0
lines changed

tests/utils/sdk_interface.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
# License: MIT
2+
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Create all needed sdk tools. To be replaced by real sdk interface when ready."""
5+
import asyncio
6+
from typing import Any, List
7+
8+
from frequenz.channels import Broadcast
9+
10+
from frequenz.sdk.actor import (
11+
ChannelRegistry,
12+
ComponentMetricRequest,
13+
ComponentMetricsResamplingActor,
14+
DataSourcingActor,
15+
ResamplerConfig,
16+
)
17+
from frequenz.sdk.actor.power_distributing import BatteryStatus, PowerDistributingActor
18+
19+
20+
class SdkInterface:
21+
"""Sdk interface.
22+
23+
To be replaced by true sdk interface when ready.
24+
"""
25+
26+
def __init__(self, resampling_period_s: float) -> None:
27+
"""Create class instance.
28+
29+
Args:
30+
resampling_period_s: resampling period for the
31+
ComponentMetricsResamplingActor.
32+
"""
33+
# Any to be replaced with BaseActor when ready
34+
self._actors: List[Any] = []
35+
self.channel_registry = ChannelRegistry(name="Microgrid Channel Registry")
36+
37+
self.data_source_request_channel = Broadcast[ComponentMetricRequest](
38+
"Data Source Request Channel"
39+
)
40+
41+
self._actors.append(
42+
DataSourcingActor(
43+
request_receiver=self.data_source_request_channel.new_receiver(),
44+
registry=self.channel_registry,
45+
)
46+
)
47+
48+
self.resampling_actor_request_channel = Broadcast[ComponentMetricRequest](
49+
"Resampling Actor Request Channel"
50+
)
51+
52+
self._actors.append(
53+
ComponentMetricsResamplingActor(
54+
channel_registry=self.channel_registry,
55+
data_sourcing_request_sender=self.data_source_request_channel.new_sender(),
56+
resampling_request_receiver=self.resampling_actor_request_channel.new_receiver(),
57+
config=ResamplerConfig(resampling_period_s=resampling_period_s),
58+
)
59+
)
60+
61+
self.battery_status_channel = Broadcast[BatteryStatus]("batteries-status")
62+
self._power_distributing_actor = PowerDistributingActor(
63+
users_channels={},
64+
battery_status_sender=self.battery_status_channel.new_sender(),
65+
)
66+
self._actors.append(self._power_distributing_actor)
67+
68+
async def stop(self) -> None:
69+
# pylint: disable=protected-access
70+
"""Stop all async tasks and all actors."""
71+
await self._power_distributing_actor._stop_actor()
72+
await asyncio.gather(
73+
*[actor._stop() for actor in self._actors], return_exceptions=True
74+
)

0 commit comments

Comments
 (0)