15
15
import time
16
16
from typing import Any , Dict
17
17
18
+ import humanfriendly
19
+
18
20
from monarch .actor import Actor , endpoint , proc_mesh
19
21
20
22
from windtunnel .benchmarks .python_benchmark_runner .benchmark import (
29
31
30
32
class SleepActor(Actor):
    """Actor whose only job is to sleep on demand.

    Used by the scaling benchmarks: the ``bytes`` payload exists purely so
    callers can measure message-transport cost — it is never inspected.
    """

    @endpoint
    async def sleep(self, sleep_secs: float, _: bytes) -> int:
        # Yield to the event loop for the requested duration instead of
        # burning CPU; the payload argument is deliberately ignored.
        await asyncio.sleep(sleep_secs)
        # Acknowledge with 1 so the caller can tally how many actors replied.
        return 1
35
39
36
40
async def run_actor_scaling_benchmark (
37
41
actor_mesh : Any ,
42
+ actor_count : int ,
38
43
message_size : int ,
39
- duration_seconds : int = 10 ,
40
- sleep_secs : float = 0.1 ,
44
+ duration_seconds : int ,
45
+ sleep_secs : float ,
41
46
) -> Dict [str , float ]:
42
47
"""
43
48
Run a benchmark with a specific number of actors and message size.
@@ -57,16 +62,19 @@ async def run_actor_scaling_benchmark(
57
62
58
63
while time .time () - start_benchmark < duration_seconds :
59
64
start_time = time .time ()
60
- await actor_mesh .sleep .call (sleep_secs , payload )
65
+ val_mesh = await actor_mesh .sleep .call (sleep_secs , payload )
61
66
elapsed_time = time .time () - start_time
62
67
times .append (elapsed_time )
68
+
69
+ val = sum ([val [1 ] for val in val_mesh .items ()])
70
+ assert val == actor_count , f"Expected { actor_count } responses, got { val } "
63
71
iteration_count += 1
64
72
65
73
if iteration_count == 0 :
66
74
raise ValueError ("No iterations completed" )
67
75
68
76
times_ms = [t * 1000 for t in times ]
69
- avg_time_ms = sum (times_ms ) / iteration_count
77
+ avg_time_ms = sum (times_ms ) / ( iteration_count * 1.0 )
70
78
sorted_times = sorted (times_ms )
71
79
median_time_ms = (
72
80
sorted_times [iteration_count // 2 ]
@@ -82,7 +90,8 @@ async def run_actor_scaling_benchmark(
82
90
"median_time_ms" : median_time_ms ,
83
91
"min_time_ms" : min (times_ms ),
84
92
"max_time_ms" : max (times_ms ),
85
- "throughput_mbps" : (message_size * 8 ) / (avg_time_ms / 1000 ) / 1_000_000 ,
93
+ "throughput_mBps" : (message_size * actor_count * (1000.0 / avg_time_ms ))
94
+ / 1_000_000 ,
86
95
"iterations" : iteration_count ,
87
96
}
88
97
@@ -106,7 +115,11 @@ async def bench_actor_scaling(counters: UserCounters) -> None:
106
115
await asyncio .sleep (1 )
107
116
108
117
stats = await run_actor_scaling_benchmark (
109
- actor_mesh , message_size , duration_seconds , sleep_secs = 0.1
118
+ actor_mesh ,
119
+ host_count * 8 ,
120
+ message_size ,
121
+ duration_seconds ,
122
+ sleep_secs = 0.1 ,
110
123
)
111
124
await mesh .stop ()
112
125
@@ -121,5 +134,52 @@ async def bench_actor_scaling(counters: UserCounters) -> None:
121
134
)
122
135
123
136
137
@register_benchmark(FILE_PATH, use_counters=True)
async def bench_message_scaling(counters: UserCounters) -> None:
    """
    Benchmark message-processing latency across actor counts and payload sizes.

    For each (gpu count, message size) pair: spin up a proc mesh, spawn a
    SleepActor mesh, run the scaling benchmark with no artificial sleep, and
    record median/min/max round-trip time (ms) plus throughput in MB/s
    (the ``throughput_mBps`` stat) into ``counters``.
    """
    gpu_counts = [1, 10]
    KB = 1000
    MB = 1000 * KB
    message_sizes = [10 * KB, 100 * KB, 1 * MB, 10 * MB, 100 * MB]
    duration_seconds = 5

    for gpus in gpu_counts:
        for message_size in message_sizes:
            # Guard against pathological combinations if gpu_counts is ever
            # extended; with the current [1, 10] list this never triggers.
            if gpus >= 20 and message_size >= 100 * MB:
                continue
            # Fixed log text: the value printed is the per-mesh GPU count,
            # not a host count.
            print(f"Testing gpus: {gpus}, message_size: {message_size}")
            mesh = await proc_mesh(gpus=gpus)
            await mesh.logging_option(
                stream_to_client=True, aggregate_window_sec=None
            )
            actor_mesh = await mesh.spawn("actor", SleepActor)
            # Allow Actor init to finish before timing anything.
            await asyncio.sleep(1)

            stats = await run_actor_scaling_benchmark(
                actor_mesh,
                gpus,
                message_size,
                duration_seconds,
                sleep_secs=0.0,
            )
            await mesh.stop()

            # Human-readable size (e.g. "10 KB") keeps counter keys compact.
            size = humanfriendly.format_size(message_size)
            counters[f"hosts_{gpus}_size_{size}_median_ms"] = UserMetric(
                value=int(stats["median_time_ms"])
            )
            counters[f"hosts_{gpus}_size_{size}_min_ms"] = UserMetric(
                value=int(stats["min_time_ms"])
            )
            counters[f"hosts_{gpus}_size_{size}_max_ms"] = UserMetric(
                value=int(stats["max_time_ms"])
            )
            counters[f"hosts_{gpus}_size_{size}_throughput_mBps"] = UserMetric(
                value=int(stats["throughput_mBps"])
            )
124
184
if __name__ == "__main__":
    # Entry point: run the benchmark suite's async main() on a fresh event loop.
    asyncio.run(main())
0 commit comments