Skip to content

Commit b847abe

Browse files
authored
Miscelaneous improvements and fixes (#953)
- **tests: Remove unnecessary class wrapper** - **Update sandbox port to the newest sandbox** - **Fix invalid `Power` constructor** - **Log unhandled exceptions in the Microgrid API source**
2 parents ca89486 + e541ad3 commit b847abe

File tree

4 files changed

+128
-123
lines changed

4 files changed

+128
-123
lines changed

benchmarks/power_distribution/power_distributor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
from frequenz.sdk.timeseries._quantities import Power
3131

3232
HOST = "microgrid.sandbox.api.frequenz.io"
33-
PORT = 61060
33+
PORT = 62060
3434

3535

3636
# TODO: this send_requests function uses the battery pool to # pylint: disable=fixme
@@ -55,7 +55,7 @@ async def send_requests(batteries: set[int], request_num: int) -> list[Result]:
5555
result: list[Any] = []
5656
for _ in range(request_num):
5757
await battery_pool.propose_power(
58-
Power(float(random.randrange(100000, 1000000)))
58+
Power.from_watts(float(random.randrange(100000, 1000000)))
5959
)
6060
try:
6161
output = await asyncio.wait_for(results_rx.receive(), timeout=3)

examples/battery_pool.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from frequenz.sdk.actor import ResamplerConfig
1515

1616
HOST = "microgrid.sandbox.api.frequenz.io" # it should be the host name.
17-
PORT = 61060
17+
PORT = 62060
1818

1919

2020
async def main() -> None:

src/frequenz/sdk/actor/_data_sourcing/microgrid_api_source.py

Lines changed: 53 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -383,48 +383,62 @@ async def _handle_data_stream(
383383
Args:
384384
comp_id: Id of the requested component.
385385
category: The category of the component.
386+
387+
Raises:
388+
Exception: if an error occurs while handling the data stream.
386389
"""
387-
stream_senders = []
388-
if comp_id in self._req_streaming_metrics:
389-
await self._check_requested_component_and_metrics(
390-
comp_id, category, self._req_streaming_metrics[comp_id]
390+
try:
391+
stream_senders = []
392+
if comp_id in self._req_streaming_metrics:
393+
await self._check_requested_component_and_metrics(
394+
comp_id, category, self._req_streaming_metrics[comp_id]
395+
)
396+
stream_senders = self._get_metric_senders(
397+
category, self._req_streaming_metrics[comp_id]
398+
)
399+
api_data_receiver: Receiver[Any] = self.comp_data_receivers[comp_id]
400+
401+
senders_done: asyncio.Event = asyncio.Event()
402+
pending_messages = 0
403+
404+
def process_msg(data: Any) -> None:
405+
tasks = []
406+
for extractor, senders in stream_senders:
407+
for sender in senders:
408+
tasks.append(
409+
sender.send(
410+
Sample(data.timestamp, Quantity(extractor(data)))
411+
)
412+
)
413+
asyncio.gather(*tasks)
414+
nonlocal pending_messages
415+
pending_messages -= 1
416+
if pending_messages == 0:
417+
senders_done.set()
418+
419+
async for data in api_data_receiver:
420+
pending_messages += 1
421+
senders_done.clear()
422+
process_msg(data)
423+
424+
while pending_messages > 0:
425+
await senders_done.wait()
426+
427+
await asyncio.gather(
428+
*[
429+
self._registry.close_and_remove(r.get_channel_name())
430+
for requests in self._req_streaming_metrics[comp_id].values()
431+
for r in requests
432+
]
391433
)
392-
stream_senders = self._get_metric_senders(
393-
category, self._req_streaming_metrics[comp_id]
434+
except Exception:
435+
_logger.exception(
436+
"Unexpected error while handling data stream for component %d (%s), "
437+
"component data is not being streamed anymore",
438+
comp_id,
439+
category.name,
394440
)
395-
api_data_receiver: Receiver[Any] = self.comp_data_receivers[comp_id]
396-
397-
senders_done: asyncio.Event = asyncio.Event()
398-
pending_messages = 0
399-
400-
def process_msg(data: Any) -> None:
401-
tasks = []
402-
for extractor, senders in stream_senders:
403-
for sender in senders:
404-
tasks.append(
405-
sender.send(Sample(data.timestamp, Quantity(extractor(data))))
406-
)
407-
asyncio.gather(*tasks)
408-
nonlocal pending_messages
409-
pending_messages -= 1
410-
if pending_messages == 0:
411-
senders_done.set()
412-
413-
async for data in api_data_receiver:
414-
pending_messages += 1
415-
senders_done.clear()
416-
process_msg(data)
417-
418-
while pending_messages > 0:
419-
await senders_done.wait()
420-
421-
await asyncio.gather(
422-
*[
423-
self._registry.close_and_remove(r.get_channel_name())
424-
for requests in self._req_streaming_metrics[comp_id].values()
425-
for r in requests
426-
]
427-
)
441+
raise
428442

429443
async def _update_streams(
430444
self,

tests/actor/test_data_sourcing.py

Lines changed: 72 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -19,90 +19,81 @@
1919
# pylint: disable=no-member
2020

2121

22-
class TestDataSourcingActor:
22+
async def test_data_sourcing_actor() -> None:
2323
"""Tests for the DataSourcingActor."""
24-
25-
async def test_data_sourcing_actor(self) -> None:
26-
"""Tests for the DataSourcingActor."""
27-
servicer = mock_api.MockMicrogridServicer()
28-
server = mock_api.MockGrpcServer(servicer, port=57899)
29-
await server.start()
30-
31-
servicer.add_component(
32-
1, components_pb.ComponentCategory.COMPONENT_CATEGORY_GRID
24+
servicer = mock_api.MockMicrogridServicer()
25+
server = mock_api.MockGrpcServer(servicer, port=57899)
26+
await server.start()
27+
28+
servicer.add_component(1, components_pb.ComponentCategory.COMPONENT_CATEGORY_GRID)
29+
servicer.add_component(4, components_pb.ComponentCategory.COMPONENT_CATEGORY_METER)
30+
servicer.add_component(7, components_pb.ComponentCategory.COMPONENT_CATEGORY_METER)
31+
servicer.add_component(
32+
8, components_pb.ComponentCategory.COMPONENT_CATEGORY_INVERTER
33+
)
34+
servicer.add_component(
35+
9, components_pb.ComponentCategory.COMPONENT_CATEGORY_BATTERY
36+
)
37+
38+
servicer.add_connection(1, 4)
39+
servicer.add_connection(1, 7)
40+
servicer.add_connection(7, 8)
41+
servicer.add_connection(8, 9)
42+
43+
await connection_manager.initialize("[::1]", 57899)
44+
45+
req_chan = Broadcast[ComponentMetricRequest](name="data_sourcing_requests")
46+
req_sender = req_chan.new_sender()
47+
48+
registry = ChannelRegistry(name="test-registry")
49+
50+
async with DataSourcingActor(req_chan.new_receiver(), registry):
51+
active_power_request = ComponentMetricRequest(
52+
"test-namespace", 4, ComponentMetricId.ACTIVE_POWER, None
3353
)
34-
servicer.add_component(
35-
4, components_pb.ComponentCategory.COMPONENT_CATEGORY_METER
54+
active_power_recv = registry.get_or_create(
55+
Sample[Quantity], active_power_request.get_channel_name()
56+
).new_receiver()
57+
await req_sender.send(active_power_request)
58+
59+
reactive_power_request = ComponentMetricRequest(
60+
"test-namespace", 4, ComponentMetricId.REACTIVE_POWER, None
3661
)
37-
servicer.add_component(
38-
7, components_pb.ComponentCategory.COMPONENT_CATEGORY_METER
62+
_ = registry.get_or_create(
63+
Sample[Quantity], reactive_power_request.get_channel_name()
64+
).new_receiver()
65+
await req_sender.send(reactive_power_request)
66+
67+
soc_request = ComponentMetricRequest(
68+
"test-namespace", 9, ComponentMetricId.SOC, None
3969
)
40-
servicer.add_component(
41-
8, components_pb.ComponentCategory.COMPONENT_CATEGORY_INVERTER
70+
soc_recv = registry.get_or_create(
71+
Sample[Quantity], soc_request.get_channel_name()
72+
).new_receiver()
73+
await req_sender.send(soc_request)
74+
75+
soc2_request = ComponentMetricRequest(
76+
"test-namespace", 9, ComponentMetricId.SOC, None
4277
)
43-
servicer.add_component(
44-
9, components_pb.ComponentCategory.COMPONENT_CATEGORY_BATTERY
78+
soc2_recv = registry.get_or_create(
79+
Sample[Quantity], soc2_request.get_channel_name()
80+
).new_receiver()
81+
await req_sender.send(soc2_request)
82+
83+
for _ in range(3):
84+
sample = await soc_recv.receive()
85+
assert sample.value is not None
86+
assert 9.0 == sample.value.base_value
87+
88+
sample = await soc2_recv.receive()
89+
assert sample.value is not None
90+
assert 9.0 == sample.value.base_value
91+
92+
sample = await active_power_recv.receive()
93+
assert sample.value is not None
94+
assert 100.0 == sample.value.base_value
95+
96+
assert await server.graceful_shutdown()
97+
connection_manager._CONNECTION_MANAGER = ( # pylint: disable=protected-access
98+
None
4599
)
46-
47-
servicer.add_connection(1, 4)
48-
servicer.add_connection(1, 7)
49-
servicer.add_connection(7, 8)
50-
servicer.add_connection(8, 9)
51-
52-
await connection_manager.initialize("[::1]", 57899)
53-
54-
req_chan = Broadcast[ComponentMetricRequest](name="data_sourcing_requests")
55-
req_sender = req_chan.new_sender()
56-
57-
registry = ChannelRegistry(name="test-registry")
58-
59-
async with DataSourcingActor(req_chan.new_receiver(), registry):
60-
active_power_request = ComponentMetricRequest(
61-
"test-namespace", 4, ComponentMetricId.ACTIVE_POWER, None
62-
)
63-
active_power_recv = registry.get_or_create(
64-
Sample[Quantity], active_power_request.get_channel_name()
65-
).new_receiver()
66-
await req_sender.send(active_power_request)
67-
68-
reactive_power_request = ComponentMetricRequest(
69-
"test-namespace", 4, ComponentMetricId.REACTIVE_POWER, None
70-
)
71-
_ = registry.get_or_create(
72-
Sample[Quantity], reactive_power_request.get_channel_name()
73-
).new_receiver()
74-
await req_sender.send(reactive_power_request)
75-
76-
soc_request = ComponentMetricRequest(
77-
"test-namespace", 9, ComponentMetricId.SOC, None
78-
)
79-
soc_recv = registry.get_or_create(
80-
Sample[Quantity], soc_request.get_channel_name()
81-
).new_receiver()
82-
await req_sender.send(soc_request)
83-
84-
soc2_request = ComponentMetricRequest(
85-
"test-namespace", 9, ComponentMetricId.SOC, None
86-
)
87-
soc2_recv = registry.get_or_create(
88-
Sample[Quantity], soc2_request.get_channel_name()
89-
).new_receiver()
90-
await req_sender.send(soc2_request)
91-
92-
for _ in range(3):
93-
sample = await soc_recv.receive()
94-
assert sample.value is not None
95-
assert 9.0 == sample.value.base_value
96-
97-
sample = await soc2_recv.receive()
98-
assert sample.value is not None
99-
assert 9.0 == sample.value.base_value
100-
101-
sample = await active_power_recv.receive()
102-
assert sample.value is not None
103-
assert 100.0 == sample.value.base_value
104-
105-
assert await server.graceful_shutdown()
106-
connection_manager._CONNECTION_MANAGER = ( # pylint: disable=protected-access
107-
None
108-
)

0 commit comments

Comments
 (0)