|
| 1 | +#!/usr/bin/env python3 |
| 2 | +# Copyright (c) Meta Platforms, Inc. and affiliates. |
| 3 | +# All rights reserved. |
| 4 | +# |
| 5 | +# This source code is licensed under the BSD-style license found in the |
| 6 | +# LICENSE file in the root directory of this source tree. |
| 7 | + |
| 8 | +# pyre-strict |
| 9 | + |
| 10 | +""" |
| 11 | +Benchmark for measuring message throughput in Monarch actor mesh. |
| 12 | +""" |
| 13 | + |
| 14 | +import asyncio |
| 15 | +import time |
| 16 | +from typing import Any, Dict |
| 17 | + |
| 18 | +from monarch.actor import Actor, endpoint, proc_mesh |
| 19 | + |
| 20 | +from windtunnel.benchmarks.python_benchmark_runner.benchmark import ( |
| 21 | + main, |
| 22 | + register_benchmark, |
| 23 | + UserCounters, |
| 24 | + UserMetric, |
| 25 | +) |
| 26 | + |
| 27 | +FILE_PATH: str = "monarch/python/benches/actor_mesh_benchmark.py" |
| 28 | + |
| 29 | + |
| 30 | +class SleepActor(Actor): |
| 31 | + @endpoint |
| 32 | + async def sleep(self, sleep_secs: float, _: bytes) -> None: |
| 33 | + await asyncio.sleep(sleep_secs) |
| 34 | + |
| 35 | + |
| 36 | +async def run_actor_scaling_benchmark( |
| 37 | + actor_mesh: Any, |
| 38 | + message_size: int, |
| 39 | + duration_seconds: int = 10, |
| 40 | + sleep_secs: float = 0.1, |
| 41 | +) -> Dict[str, float]: |
| 42 | + """ |
| 43 | + Run a benchmark with a specific number of actors and message size. |
| 44 | + Returns statistics about the benchmark run including: |
| 45 | + - avg_time_ms: average time per iteration in milliseconds |
| 46 | + - median_time_ms: median time per iteration in milliseconds |
| 47 | + - min_time_ms: minimum time per iteration in milliseconds |
| 48 | + - max_time_ms: maximum time per iteration in milliseconds |
| 49 | + - throughput_mbps: throughput in megabits per second |
| 50 | + - iterations: number of iterations completed |
| 51 | + """ |
| 52 | + payload = bytes(message_size) |
| 53 | + times = [] |
| 54 | + |
| 55 | + start_benchmark = time.time() |
| 56 | + iteration_count = 0 |
| 57 | + |
| 58 | + while time.time() - start_benchmark < duration_seconds: |
| 59 | + start_time = time.time() |
| 60 | + await actor_mesh.sleep.call(sleep_secs, payload) |
| 61 | + elapsed_time = time.time() - start_time |
| 62 | + times.append(elapsed_time) |
| 63 | + iteration_count += 1 |
| 64 | + |
| 65 | + if iteration_count == 0: |
| 66 | + raise ValueError("No iterations completed") |
| 67 | + |
| 68 | + times_ms = [t * 1000 for t in times] |
| 69 | + avg_time_ms = sum(times_ms) / iteration_count |
| 70 | + sorted_times = sorted(times_ms) |
| 71 | + median_time_ms = ( |
| 72 | + sorted_times[iteration_count // 2] |
| 73 | + if iteration_count % 2 == 1 |
| 74 | + else ( |
| 75 | + sorted_times[iteration_count // 2 - 1] + sorted_times[iteration_count // 2] |
| 76 | + ) |
| 77 | + / 2 |
| 78 | + ) |
| 79 | + |
| 80 | + return { |
| 81 | + "avg_time_ms": avg_time_ms, |
| 82 | + "median_time_ms": median_time_ms, |
| 83 | + "min_time_ms": min(times_ms), |
| 84 | + "max_time_ms": max(times_ms), |
| 85 | + "throughput_mbps": (message_size * 8) / (avg_time_ms / 1000) / 1_000_000, |
| 86 | + "iterations": iteration_count, |
| 87 | + } |
| 88 | + |
| 89 | + |
| 90 | +@register_benchmark(FILE_PATH, use_counters=True) |
| 91 | +async def bench_actor_scaling(counters: UserCounters) -> None: |
| 92 | + """ |
| 93 | + Benchmark how long it takes to process 1KB message on different numbers of actors. |
| 94 | + Reports average, median, min, and max times. |
| 95 | + """ |
| 96 | + host_counts = [1, 10, 100] |
| 97 | + message_sizes = [1024] |
| 98 | + duration_seconds = 10 |
| 99 | + |
| 100 | + for host_count in host_counts: |
| 101 | + for message_size in message_sizes: |
| 102 | + mesh = await proc_mesh(hosts=host_count) |
| 103 | + await mesh.logging_option(stream_to_client=False, aggregate_window_sec=None) |
| 104 | + actor_mesh = await mesh.spawn("actor", SleepActor) |
| 105 | + # Allow Actor init to finish |
| 106 | + await asyncio.sleep(1) |
| 107 | + |
| 108 | + stats = await run_actor_scaling_benchmark( |
| 109 | + actor_mesh, message_size, duration_seconds, sleep_secs=0.1 |
| 110 | + ) |
| 111 | + await mesh.stop() |
| 112 | + |
| 113 | + counters[f"actor_count_{host_count}_median_ms"] = UserMetric( |
| 114 | + value=int(stats["median_time_ms"]) |
| 115 | + ) |
| 116 | + counters[f"actor_count_{host_count}_min_ms"] = UserMetric( |
| 117 | + value=int(stats["min_time_ms"]) |
| 118 | + ) |
| 119 | + counters[f"actor_count_{host_count}_max_ms"] = UserMetric( |
| 120 | + value=int(stats["max_time_ms"]) |
| 121 | + ) |
| 122 | + |
| 123 | + |
| 124 | +if __name__ == "__main__": |
| 125 | + asyncio.run(main()) |
0 commit comments