|
6 | 6 | License |
7 | 7 | MIT |
8 | 8 | """ |
9 | | -import time |
10 | | -from typing import Iterator |
11 | | - |
12 | | -import grpc |
13 | | -from frequenz.api.microgrid import microgrid_pb2 |
14 | | -from frequenz.api.microgrid.battery_pb2 import Battery |
15 | | -from frequenz.api.microgrid.battery_pb2 import Data as BatteryData |
16 | | -from frequenz.api.microgrid.common_pb2 import MetricAggregation |
17 | | -from frequenz.api.microgrid.microgrid_pb2 import ComponentData, ComponentIdParam |
| 9 | +import dataclasses |
| 10 | +from datetime import datetime, timedelta, timezone |
| 11 | + |
| 12 | +import time_machine |
18 | 13 | from frequenz.channels import Broadcast |
19 | | -from google.protobuf.timestamp_pb2 import Timestamp # pylint: disable=no-name-in-module |
20 | | -from pytest_mock import MockerFixture |
21 | 14 |
|
22 | 15 | from frequenz.sdk.actor import ChannelRegistry |
23 | | -from frequenz.sdk.actor.data_sourcing import DataSourcingActor |
24 | 16 | from frequenz.sdk.actor.resampling import ComponentMetricsResamplingActor |
25 | 17 | from frequenz.sdk.data_pipeline import ComponentMetricId, ComponentMetricRequest |
26 | | -from frequenz.sdk.microgrid import microgrid_api |
27 | | -from tests.test_microgrid import mock_api |
| 18 | +from frequenz.sdk.timeseries import Sample |
| 19 | + |
| 20 | + |
| 21 | +def _now(*, shift: float = 0.0) -> datetime: |
| 22 | + return datetime.now(timezone.utc) + timedelta(seconds=shift) |
28 | 23 |
|
29 | 24 |
|
30 | | -async def test_component_metrics_resampling_actor(mocker: MockerFixture) -> None: |
| 25 | +@time_machine.travel(0) |
| 26 | +async def test_component_metrics_resampling_actor() -> None: |
31 | 27 | """Run main functions that initializes and creates everything.""" |
32 | 28 |
|
33 | | - servicer = mock_api.MockMicrogridServicer() |
34 | | - |
35 | | - # pylint: disable=unused-argument |
36 | | - def get_component_data( |
37 | | - request: ComponentIdParam, context: grpc.ServicerContext |
38 | | - ) -> Iterator[ComponentData]: |
39 | | - """Return an iterator for mock ComponentData.""" |
40 | | - # pylint: disable=stop-iteration-return |
41 | | - |
42 | | - def next_msg(value: float) -> ComponentData: |
43 | | - timestamp = Timestamp() |
44 | | - timestamp.GetCurrentTime() |
45 | | - return ComponentData( |
46 | | - id=request.id, |
47 | | - ts=timestamp, |
48 | | - battery=Battery( |
49 | | - data=BatteryData( |
50 | | - soc=MetricAggregation(avg=value), |
51 | | - ) |
52 | | - ), |
53 | | - ) |
54 | | - |
55 | | - for value in [3, 6, 9]: |
56 | | - yield next_msg(value=value) |
57 | | - time.sleep(0.1) |
58 | | - |
59 | | - mocker.patch.object(servicer, "GetComponentData", get_component_data) |
60 | | - |
61 | | - server = mock_api.MockGrpcServer(servicer, port=57899) |
62 | | - await server.start() |
63 | | - |
64 | | - try: |
65 | | - |
66 | | - servicer.add_component(1, microgrid_pb2.ComponentCategory.COMPONENT_CATEGORY_GRID) |
67 | | - servicer.add_component( |
68 | | - 3, microgrid_pb2.ComponentCategory.COMPONENT_CATEGORY_JUNCTION |
69 | | - ) |
70 | | - servicer.add_component(4, microgrid_pb2.ComponentCategory.COMPONENT_CATEGORY_METER) |
71 | | - servicer.add_component(7, microgrid_pb2.ComponentCategory.COMPONENT_CATEGORY_METER) |
72 | | - servicer.add_component( |
73 | | - 8, microgrid_pb2.ComponentCategory.COMPONENT_CATEGORY_INVERTER |
74 | | - ) |
75 | | - servicer.add_component( |
76 | | - 9, microgrid_pb2.ComponentCategory.COMPONENT_CATEGORY_BATTERY |
77 | | - ) |
78 | | - |
79 | | - servicer.add_connection(1, 3) |
80 | | - servicer.add_connection(3, 4) |
81 | | - servicer.add_connection(3, 7) |
82 | | - servicer.add_connection(7, 8) |
83 | | - servicer.add_connection(8, 9) |
84 | | - |
85 | | - await microgrid_api.initialize("[::1]", 57899) |
86 | | - |
87 | | - channel_registry = ChannelRegistry(name="Microgrid Channel Registry") |
88 | | - |
89 | | - data_source_request_channel = Broadcast[ComponentMetricRequest]( |
90 | | - "Data Source Request Channel" |
91 | | - ) |
92 | | - data_source_request_sender = data_source_request_channel.get_sender() |
93 | | - data_source_request_receiver = data_source_request_channel.get_receiver() |
94 | | - |
95 | | - resampling_actor_request_channel = Broadcast[ComponentMetricRequest]( |
96 | | - "Resampling Actor Request Channel" |
97 | | - ) |
98 | | - resampling_actor_request_sender = resampling_actor_request_channel.get_sender() |
99 | | - resampling_actor_request_receiver = resampling_actor_request_channel.get_receiver() |
100 | | - |
101 | | - DataSourcingActor( |
102 | | - request_receiver=data_source_request_receiver, registry=channel_registry |
103 | | - ) |
104 | | - |
105 | | - ComponentMetricsResamplingActor( |
106 | | - channel_registry=channel_registry, |
107 | | - subscription_sender=data_source_request_sender, |
108 | | - subscription_receiver=resampling_actor_request_receiver, |
109 | | - resampling_period_s=0.1, |
110 | | - ) |
111 | | - |
112 | | - subscription_request = ComponentMetricRequest( |
113 | | - namespace="Resampling", |
114 | | - component_id=9, |
115 | | - metric_id=ComponentMetricId.SOC, |
116 | | - start_time=None, |
117 | | - ) |
118 | | - |
119 | | - await resampling_actor_request_sender.send(subscription_request) |
120 | | - |
121 | | - index = 0 |
122 | | - expected_sample_values = [ |
123 | | - 3.0, |
124 | | - 4.5, |
125 | | - 6.0, |
126 | | - 7.5, |
127 | | - 9.0, |
128 | | - None, |
129 | | - None, |
130 | | - None, |
131 | | - ] |
132 | | - |
133 | | - async for sample in channel_registry.get_receiver( |
134 | | - subscription_request.get_channel_name() |
135 | | - ): |
136 | | - assert sample.value == expected_sample_values[index] |
137 | | - index += 1 |
138 | | - if index >= len(expected_sample_values): |
139 | | - break |
140 | | - |
141 | | - finally: |
142 | | - await server.stop(0.1) |
143 | | - microgrid_api._MICROGRID_API = None # pylint: disable=protected-access |
| 29 | + channel_registry = ChannelRegistry(name="test") |
| 30 | + data_source_req_chan = Broadcast[ComponentMetricRequest]("data-source-req") |
| 31 | + data_source_req_recv = data_source_req_chan.get_receiver() |
| 32 | + resampling_req_chan = Broadcast[ComponentMetricRequest]("resample-req") |
| 33 | + resampling_req_sender = resampling_req_chan.get_sender() |
| 34 | + |
| 35 | + resampling_actor = ComponentMetricsResamplingActor( |
| 36 | + channel_registry=channel_registry, |
| 37 | + subscription_sender=data_source_req_chan.get_sender(), |
| 38 | + subscription_receiver=resampling_req_chan.get_receiver(), |
| 39 | + resampling_period_s=0.2, |
| 40 | + max_data_age_in_periods=2, |
| 41 | + ) |
| 42 | + |
| 43 | + subs_req = ComponentMetricRequest( |
| 44 | + namespace="Resampling", |
| 45 | + component_id=9, |
| 46 | + metric_id=ComponentMetricId.SOC, |
| 47 | + start_time=None, |
| 48 | + ) |
| 49 | + |
| 50 | + await resampling_req_sender.send(subs_req) |
| 51 | + data_source_req = await data_source_req_recv.receive() |
| 52 | + assert data_source_req is not None |
| 53 | + assert data_source_req == dataclasses.replace(subs_req, namespace="Source") |
| 54 | + |
| 55 | + timeseries_receiver = channel_registry.get_receiver(subs_req.get_channel_name()) |
| 56 | + timeseries_sender = channel_registry.get_sender(data_source_req.get_channel_name()) |
| 57 | + |
| 58 | + new_sample = await timeseries_receiver.receive() # At ~0.2s (timer) |
| 59 | + assert new_sample is not None |
| 60 | + assert new_sample.value is None |
| 61 | + |
| 62 | + sample = Sample(_now(shift=0.1), 3) # ts = ~0.3s |
| 63 | + await timeseries_sender.send(sample) |
| 64 | + new_sample = await timeseries_receiver.receive() # At ~0.4s (timer) |
| 65 | + assert new_sample is not None |
| 66 | + assert new_sample.value == 3 |
| 67 | + assert new_sample.timestamp >= sample.timestamp |
| 68 | + |
| 69 | + sample = Sample(_now(shift=0.05), 4) # ts = ~0.45s |
| 70 | + await timeseries_sender.send(sample) |
| 71 | + new_sample = await timeseries_receiver.receive() # At ~0.6s (timer) |
| 72 | + assert new_sample is not None |
| 73 | + assert new_sample.value == 3.5 # avg(3, 4) |
| 74 | + assert new_sample.timestamp >= sample.timestamp |
| 75 | + |
| 76 | + await timeseries_sender.send(Sample(_now(shift=0.05), 8)) # ts = ~0.65s |
| 77 | + await timeseries_sender.send(Sample(_now(shift=0.1), 1)) # ts = ~0.7s |
| 78 | + sample = Sample(_now(shift=0.15), 9) # ts = ~0.75s |
| 79 | + await timeseries_sender.send(sample) |
| 80 | + new_sample = await timeseries_receiver.receive() # At ~0.8s (timer) |
| 81 | + assert new_sample is not None |
| 82 | + assert new_sample.value == 5.5 # avg(4, 8, 1, 9) |
| 83 | + assert new_sample.timestamp >= sample.timestamp |
| 84 | + |
| 85 | + # No more samples sent |
| 86 | + new_sample = await timeseries_receiver.receive() # At ~1.0s (timer) |
| 87 | + assert new_sample is not None |
| 88 | + assert new_sample.value == 6 # avg(8, 1, 9) |
| 89 | + assert new_sample.timestamp >= sample.timestamp |
| 90 | + |
| 91 | + # No more samples sent |
| 92 | + new_sample = await timeseries_receiver.receive() # At ~1.2s (timer) |
| 93 | + assert new_sample is not None |
| 94 | + assert new_sample.value is None |
| 95 | + |
| 96 | + await resampling_actor._stop() # type: ignore # pylint: disable=no-member, protected-access |
0 commit comments