Skip to content

Commit e3df51b

Browse files
committed
Add datasourcing benchmark
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 37a8471 commit e3df51b

File tree

1 file changed

+190
-0
lines changed

1 file changed

+190
-0
lines changed
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
# License: MIT
2+
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Benchmark the data sourcing actor.
5+
6+
To be able to access the `tests` package we need to adjust the PYTHONPATH.
7+
8+
Usage:
9+
10+
PYTHONPATH=. python benchmark_datasourcing.py <num ev chargers> <num messages per battery>
11+
"""
12+
import argparse
13+
import asyncio
14+
import sys
15+
import tracemalloc
16+
from time import perf_counter
17+
from typing import Any, Tuple
18+
19+
from frequenz.channels import Broadcast, Receiver, ReceiverStoppedError
20+
21+
from frequenz.sdk import microgrid
22+
from frequenz.sdk.actor import (
23+
ChannelRegistry,
24+
ComponentMetricRequest,
25+
DataSourcingActor,
26+
)
27+
from frequenz.sdk.microgrid.component import ComponentMetricId
28+
29+
try:
30+
from tests.timeseries.mock_microgrid import MockMicrogrid
31+
from tests.utils import MockMicrogridClient
32+
except ImportError:
33+
print(
34+
"Error: Unable to import the `tests` package. "
35+
"Please make sure that the PYTHONPATH env variable is set to the root of the repository."
36+
)
37+
sys.exit(1)
38+
39+
COMPONENT_METRIC_IDS = [
40+
ComponentMetricId.CURRENT_PHASE_1,
41+
ComponentMetricId.CURRENT_PHASE_2,
42+
ComponentMetricId.CURRENT_PHASE_3,
43+
]
44+
45+
46+
def enable_mock_client(client: MockMicrogridClient) -> None:
47+
"""Enable the mock microgrid client.
48+
49+
Args:
50+
client: the mock microgrid client to enable.
51+
"""
52+
# pylint: disable=protected-access
53+
microgrid.connection_manager._CONNECTION_MANAGER = client.mock_microgrid
54+
55+
56+
# pylint: disable=too-many-locals
57+
async def benchmark_data_sourcing(
58+
num_ev_chargers: int, num_msgs_per_battery: int
59+
) -> None:
60+
"""Benchmark the data sourcing actor.
61+
62+
Benchmark the data sourcing actor by sending out a number of requests and
63+
printing out the number of samples sent and the time taken.
64+
65+
Args:
66+
num_ev_chargers: number of EV Chargers to create for the mock microgrid.
67+
num_msgs_per_battery: number of messages to send out for each battery.
68+
"""
69+
num_expected_messages = (
70+
num_ev_chargers * len(COMPONENT_METRIC_IDS) * num_msgs_per_battery
71+
)
72+
mock_grid = MockMicrogrid(
73+
grid_side_meter=False, num_values=num_msgs_per_battery, sample_rate_s=0.0
74+
)
75+
76+
mock_grid.add_ev_chargers(num_ev_chargers)
77+
mock_grid.start_mock_client(enable_mock_client)
78+
79+
request_channel = Broadcast[ComponentMetricRequest](
80+
"DataSourcingActor Request Channel"
81+
)
82+
83+
channel_registry = ChannelRegistry(name="Microgrid Channel Registry")
84+
request_receiver = request_channel.new_receiver(
85+
"datasourcing-benchmark", maxsize=(num_ev_chargers * len(COMPONENT_METRIC_IDS))
86+
)
87+
request_sender = request_channel.new_sender()
88+
89+
consume_tasks = []
90+
91+
start_time = perf_counter()
92+
samples_sent = 0
93+
94+
async def consume(channel: Receiver[Any]) -> None:
95+
while True:
96+
try:
97+
await channel.ready()
98+
channel.consume()
99+
except ReceiverStoppedError:
100+
return
101+
102+
nonlocal samples_sent
103+
samples_sent += 1
104+
105+
for evc_id in mock_grid.evc_ids:
106+
for component_metric_id in COMPONENT_METRIC_IDS:
107+
request = ComponentMetricRequest(
108+
"current_phase_requests", evc_id, component_metric_id, None
109+
)
110+
111+
recv_channel = channel_registry.new_receiver(request.get_channel_name())
112+
113+
await request_sender.send(request)
114+
consume_tasks.append(asyncio.create_task(consume(recv_channel)))
115+
116+
DataSourcingActor(request_receiver, channel_registry)
117+
118+
await asyncio.gather(*consume_tasks)
119+
120+
time_taken = perf_counter() - start_time
121+
122+
await mock_grid.cleanup()
123+
124+
print(f"Samples Sent: {samples_sent}, time taken: {time_taken}")
125+
print(f"Samples per second: {samples_sent / time_taken}")
126+
print(
127+
"Expected samples: "
128+
f"{num_expected_messages}, missing: {num_expected_messages - samples_sent}"
129+
)
130+
print(
131+
f"Missing per EVC: {(num_expected_messages - samples_sent) / num_ev_chargers}"
132+
)
133+
134+
135+
def parse_args() -> Tuple[int, int, bool]:
136+
"""Parse the command line arguments.
137+
138+
Returns:
139+
A tuple of (num ev chargers, num messages per battery, record allocations).
140+
"""
141+
parser = argparse.ArgumentParser(description="Benchmark the data sourcing actor.")
142+
parser.add_argument(
143+
"num_ev_chargers",
144+
type=int,
145+
help="Number of EV Chargers to create for the mock microgrid.",
146+
)
147+
parser.add_argument(
148+
"num_msgs_per_battery",
149+
type=int,
150+
help="Number of messages to send out for each battery.",
151+
)
152+
parser.add_argument(
153+
"--record-allocations",
154+
action="store_true",
155+
help="Record memory allocations.",
156+
)
157+
158+
args = parser.parse_args()
159+
160+
return args.num_ev_chargers, args.num_msgs_per_battery, args.record_allocations
161+
162+
163+
def main() -> None:
164+
"""Start everything."""
165+
(
166+
num_ev_chargers_pararm,
167+
num_msgs_per_battery_param,
168+
record_allocations,
169+
) = parse_args()
170+
171+
if record_allocations:
172+
tracemalloc.start()
173+
174+
asyncio.run(
175+
benchmark_data_sourcing(num_ev_chargers_pararm, num_msgs_per_battery_param)
176+
)
177+
178+
if not record_allocations:
179+
sys.exit(0)
180+
181+
snapshot = tracemalloc.take_snapshot()
182+
top_stats = snapshot.statistics("lineno")
183+
184+
print("\n[ Top 10 ]")
185+
for stat in top_stats[:10]:
186+
print(stat)
187+
188+
189+
if __name__ == "__main__":
190+
main()

0 commit comments

Comments
 (0)