Skip to content

Commit ac472c8

Browse files
Add tool to stream mock component data
In our current MockMicrogrid is component data are hardcoded. It is impossible to control what component data are send and to change it for new tests. Signed-off-by: ela-kotulska-frequenz <[email protected]>
1 parent 4d9b491 commit ac472c8

File tree

1 file changed

+108
-0
lines changed

1 file changed

+108
-0
lines changed
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
# License: MIT
2+
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Tool for mocking streams of component data."""
5+
from __future__ import annotations
6+
7+
import asyncio
8+
from dataclasses import replace
9+
from datetime import datetime, timezone
10+
11+
from frequenz.sdk._internal.asyncio import cancel_and_await
12+
from frequenz.sdk.microgrid.component import ComponentData
13+
14+
from .mock_microgrid import MockMicrogridClient
15+
16+
17+
class MockComponentDataStreamer:
18+
"""Mock streams of component data.
19+
20+
This tool was create to:
21+
* specify what data should be send with the stream.
22+
* specify different sampling for each component data.
23+
* stop and start any stream any time.
24+
* modify the data that comes with the stream in runtime.
25+
"""
26+
27+
def __init__(self, mock_microgrid: MockMicrogridClient) -> None:
28+
"""Create class instance.
29+
30+
Args:
31+
mock_microgrid: Mock microgrid.
32+
"""
33+
self._mock_microgrid = mock_microgrid
34+
self._component_data: dict[int, ComponentData] = {}
35+
self._streaming_tasks: dict[int, asyncio.Task[None]] = {}
36+
37+
def start_streaming(
38+
self, component_data: ComponentData, sampling_rate: float
39+
) -> None:
40+
"""Start streaming this component data with given sampling rate.
41+
42+
Args:
43+
component_data: component data to be streamed.
44+
sampling_rate: sampling rate
45+
46+
Raises:
47+
RuntimeError: If component is already streaming data.
48+
"""
49+
component_id = component_data.component_id
50+
if component_id in self._streaming_tasks:
51+
raise RuntimeError("Component is already streaming")
52+
53+
self._component_data[component_id] = component_data
54+
self._streaming_tasks[component_id] = asyncio.create_task(
55+
self._stream_data(component_id, sampling_rate)
56+
)
57+
58+
def get_current_component_data(self, component_id: int) -> ComponentData:
59+
"""Get component data that are currently streamed or was streaming recently.
60+
61+
Args:
62+
component_id: component id
63+
64+
Raises:
65+
KeyError: If component never sent any data.
66+
67+
Returns:
68+
Component data.
69+
"""
70+
if component_id not in self._component_data:
71+
raise KeyError(f"Component {component_id} was never streaming data.")
72+
73+
return self._component_data[component_id]
74+
75+
def update_stream(self, new_component_data: ComponentData) -> None:
76+
"""Update component stream to send new data.
77+
78+
Component id is taken from the given component data/
79+
80+
Args:
81+
new_component_data: new component data
82+
83+
Raises:
84+
KeyError: If this component is not sending data.
85+
"""
86+
cid = new_component_data.component_id
87+
if cid not in self._streaming_tasks:
88+
raise KeyError(f"Component {cid} is not streaming data")
89+
90+
self._component_data[cid] = new_component_data
91+
92+
async def stop_streaming(self, component_id: int) -> None:
93+
"""Stop sending data from this component."""
94+
if task := self._streaming_tasks.pop(component_id, None):
95+
await cancel_and_await(task)
96+
97+
async def stop(self) -> None:
98+
"""Stop sending any data. This will close any pending async tasks."""
99+
await asyncio.gather(
100+
*[self.stop_streaming(cid) for cid in self._streaming_tasks]
101+
)
102+
103+
async def _stream_data(self, component_id: int, sampling_rate: float) -> None:
104+
while component_id in self._component_data:
105+
data = self._component_data[component_id]
106+
new_data = replace(data, timestamp=datetime.now(tz=timezone.utc))
107+
await self._mock_microgrid.send(new_data)
108+
await asyncio.sleep(sampling_rate)

0 commit comments

Comments
 (0)