"""Benchmark Iris controller performance under realistic load.

Two benchmark modes:

1. ``multi-tpu``: Simulates a cluster with 25 TPU slices of varying sizes
   and 100 training jobs, measuring scheduler performance, job scheduling
   latency, and resource utilization.

2. ``single-worker``: Submits many jobs to a single CPU worker as fast as
   possible while streaming logs from every job. This exercises the controller
   hot-path that was overwhelmed in #3062 (125+ simultaneous task pods on one
   worker caused DEADLINE_EXCEEDED RPC timeouts).

Usage:
    uv run python lib/iris/tests/e2e/benchmark_controller.py multi-tpu
    uv run python lib/iris/tests/e2e/benchmark_controller.py multi-tpu --num-jobs 200 --num-slices 50
    uv run python lib/iris/tests/e2e/benchmark_controller.py multi-tpu --profile --profile-output ./profiles
    uv run python lib/iris/tests/e2e/benchmark_controller.py single-worker --num-jobs 100

This benchmark helps detect performance regressions like #2802 (SSL context overhead)
and #3062 (single-worker burst overwhelms controller).
"""
1727
1828import json
2737from pathlib import Path
2838
2939import click
40+ import humanfriendly
3041import psutil
3142from iris .client .client import IrisClient , Job , ResourceSpec
3243from iris .cluster .config import load_config , make_local_config
3344from iris .cluster .manager import connect_cluster
34- from iris .cluster .types import get_tpu_topology , tpu_device
45+ from iris .cluster .types import Entrypoint , EnvironmentSpec , get_tpu_topology , tpu_device
3546from iris .rpc import cluster_pb2 , config_pb2
3647from iris .rpc .cluster_connect import ControllerServiceClientSync
3748
@@ -132,8 +143,6 @@ def _make_benchmark_config(num_slices: int) -> config_pb2.IrisClusterConfig:
132143
def _parse_size(size_str: str) -> int:
    """Convert a human-readable size string (e.g. "64m", "1GB") to bytes."""
    byte_count = humanfriendly.parse_size(size_str)
    return byte_count
138147
139148
@@ -155,8 +164,6 @@ def _submit_job_mix(client: IrisClient, num_jobs: int, workspace: Path) -> tuple
155164 Returns:
156165 (schedulable_jobs, unschedulable_jobs) tuple
157166 """
158- from iris .cluster .types import Entrypoint , EnvironmentSpec
159-
160167 num_small = int (num_jobs * 0.60 )
161168 num_medium = int (num_jobs * 0.25 )
162169 num_large = int (num_jobs * 0.10 )
@@ -345,12 +352,191 @@ def run_benchmark(num_jobs: int, num_slices: int) -> BenchmarkMetrics:
345352 controller_client .close ()
346353
347354
348- @click .group (invoke_without_command = True )
349- @click .pass_context
350- def cli (ctx : click .Context ) -> None :
def _make_single_worker_config() -> config_pb2.IrisClusterConfig:
    """Produce a local-mode cluster config whose only worker is one CPU box.

    Routing every job through a single worker reproduces the burst scenario
    from #3062: task creation, scheduling, log streaming, and completion RPCs
    all arrive from one source, fully exercising the controller hot-path.
    """
    base = load_config(TEST_ROOT / "examples" / "demo.yaml")

    # Drop the demo scale groups and define exactly one CPU-only group.
    base.scale_groups.clear()
    group = base.scale_groups["local-cpu"]
    group.name = "local-cpu"
    group.accelerator_type = config_pb2.ACCELERATOR_TYPE_CPU
    group.num_vms = 1
    group.min_slices = 1
    group.max_slices = 1

    # Large resource caps on the lone worker: 128 cores, 256 GiB RAM, 500 GiB disk.
    group.resources.cpu_millicores = 128 * 1000
    group.resources.memory_bytes = 256 * 1024**3
    group.resources.disk_bytes = 500 * 1024**3
    group.slice_template.local.SetInParent()

    return make_local_config(base)
377+
378+
379+ def _cpu_burn_task (seconds : float = 1.0 ):
380+ """CPU-bound task that burns cycles in a tight loop.
381+
382+ Generates enough work to keep the subprocess busy for approximately
383+ *seconds*, giving the controller realistic scheduling and log-streaming
384+ pressure.
385+ """
386+ print (f"burning cpu for { seconds } s" )
387+ deadline = time .monotonic () + seconds
388+ total = 0
389+ while time .monotonic () < deadline :
390+ for _ in range (10_000 ):
391+ total += 1
392+ print (f"done ({ total } iterations)" )
393+ return total
394+
395+
def run_single_worker_benchmark(num_jobs: int) -> BenchmarkMetrics:
    """Submit *num_jobs* to one worker as fast as possible, streaming logs.

    Every job is waited on concurrently with ``stream_logs=True`` and
    ``include_children=True``, mirroring how Marin's ferry driver monitors a
    batch of download tasks. The goal is to stress the controller's RPC
    handling and task-dispatch path when a single worker is hit with many
    concurrent requests.

    Args:
        num_jobs: Number of jobs to burst-submit to the single worker.

    Returns:
        BenchmarkMetrics summarizing submission time, completion time,
        memory delta, and final job-state counts.
    """
    print("\n" + "=" * 70)
    print("Iris Controller Benchmark — single-worker burst")
    print("=" * 70)
    print("Configuration:")
    print(f"  Jobs: {num_jobs}")
    print("  Workers: 1 (all jobs target the same worker)")
    print("  Log streaming: ON")
    print("=" * 70 + "\n")

    config = _make_single_worker_config()

    print("Starting local cluster with 1 CPU worker...")
    with connect_cluster(config) as url:
        client = IrisClient.remote(url, workspace=TEST_ROOT)
        # 30s per-RPC timeout for direct controller queries.
        controller_client = ControllerServiceClientSync(address=url, timeout_ms=30000)

        try:
            print("Waiting for worker to register...")
            _wait_for_workers(controller_client, 1, timeout=60.0)

            # NOTE(review): this measures the benchmark process's own RSS —
            # presumably the local controller runs in-process under
            # connect_cluster, so the delta reflects controller growth; confirm.
            controller_proc = psutil.Process(os.getpid())
            mem_before = controller_proc.memory_info().rss

            # Submit all jobs as fast as possible — this is the burst.
            print(f"Submitting {num_jobs} jobs...")
            submit_start = time.time()
            jobs: list[Job] = []
            for i in range(num_jobs):
                # Each job burns ~1s of CPU; cpu=0 so all jobs can pack onto
                # the single worker without exhausting its CPU reservation.
                job = client.submit(
                    entrypoint=Entrypoint.from_callable(_cpu_burn_task, 1.0),
                    name=f"burst-{i:04d}",
                    resources=ResourceSpec(cpu=0, memory="64m"),
                    environment=EnvironmentSpec(),
                )
                jobs.append(job)
            submission_time = time.time() - submit_start
            logger.info("Submitted %d jobs in %.3fs", num_jobs, submission_time)

            # Wait for every job with log streaming enabled, concurrently.
            print(f"Waiting for {num_jobs} jobs (streaming logs)...")
            wait_start = time.monotonic()
            results = _wait_all_jobs_threaded(jobs, timeout=300.0)
            time_to_complete = time.monotonic() - wait_start

            # Surface (but do not fail on) per-job wait errors.
            for r in results:
                if r.error is not None:
                    logger.warning("Job %s errored during wait: %s", r.job.job_id, r.error)

            mem_after = controller_proc.memory_info().rss
            memory_delta_mb = (mem_after - mem_before) / (1024 * 1024)

            # Tally final states; convert to a plain dict for BenchmarkMetrics.
            final_counts: dict[str, int] = defaultdict(int)
            for r in results:
                final_counts[r.state_name] += 1
            final_counts = dict(final_counts)

            metrics = BenchmarkMetrics(
                num_jobs=num_jobs,
                num_slices=1,
                submission_time_seconds=submission_time,
                time_to_complete=time_to_complete,
                controller_memory_mb=memory_delta_mb,
                jobs_by_state=final_counts,
            )

            print("\n" + "=" * 70)
            print("Benchmark Results (single-worker burst):")
            print("-" * 70)
            print(f"  Job submission time:     {metrics.submission_time_seconds:>10.2f}s")
            print(f"  Time to complete:        {metrics.time_to_complete:>10.2f}s")
            print(f"  Controller memory delta: {metrics.controller_memory_mb:>10.1f}MB")
            print(f"  Throughput:              {num_jobs / metrics.time_to_complete:>10.1f} jobs/s")
            print("\nFinal job states:")
            for state, count in sorted(metrics.jobs_by_state.items()):
                print(f"  {state:<30} {count:>5}")
            print("=" * 70 + "\n")

            return metrics

        finally:
            controller_client.close()
486+
487+
@click.group()
def cli() -> None:
    """Benchmark Iris controller performance."""
    # Group body intentionally empty: subcommands do all the work.
    # (Removed the redundant `pass` — the docstring already forms the body.)
492+
493+
def _run_with_pyspy(
    subcommand: str,
    cli_args: list[str],
    profile_output: Path,
    speedscope_name: str,
) -> None:
    """Re-exec this script under py-spy and report where the profile landed.

    Builds a ``sudo py-spy record`` invocation that runs
    ``__file__ <subcommand> <cli_args>`` and stores a speedscope-format
    profile at ``profile_output / speedscope_name``.
    """
    print(f"\nProfiling enabled: output will be saved to {profile_output}")
    print("Note: py-spy requires sudo permissions\n")

    profile_output.mkdir(parents=True, exist_ok=True)
    out_path = profile_output / speedscope_name

    # sudo py-spy record --format speedscope --output <out> --rate 100
    #   --subprocesses --gil --idle -- <python> <this file> <subcommand> <args>
    command = ["sudo", "py-spy", "record"]
    command += ["--format", "speedscope", "--output", str(out_path)]
    command += ["--rate", "100", "--subprocesses", "--gil", "--idle"]
    command += ["--", sys.executable, __file__, subcommand, *cli_args]

    print(f"Running: {' '.join(command)}\n")
    proc = subprocess.run(command)

    if proc.returncode != 0:
        print(f"\npy-spy failed with return code {proc.returncode}")
        return

    _print_profile_table(out_path)
    print(f"Speedscope profile saved to {out_path}")
    print("To view: https://www.speedscope.app/")
354540
355541
356542def _print_profile_table (speedscope_path : Path , top_n : int = 30 ) -> None :
@@ -404,59 +590,56 @@ def _print_profile_table(speedscope_path: Path, top_n: int = 30) -> None:
404590 default = Path ("/tmp/profiles" ),
405591 help = "Directory for profile output (default: /tmp/profiles/)" ,
406592)
def multi_tpu(
    num_jobs: int,
    num_slices: int,
    profile: bool = False,
    profile_output: Path | None = None,
) -> None:
    """Run controller benchmark."""
    if profile:
        # Re-exec under py-spy. NOTE(review): click (>=7) derives command
        # names by replacing underscores with dashes, so this function is
        # exposed as "multi-tpu"; re-launching with "multi_tpu" would fail
        # with "No such command". Pass the dashed name (matching how
        # single_worker passes "single-worker") — confirm against the
        # @cli.command(...) decorator above.
        _run_with_pyspy(
            "multi-tpu",
            ["--num-jobs", str(num_jobs), "--num-slices", str(num_slices)],
            profile_output,
            "controller_benchmark.speedscope",
        )
        return

    run_benchmark(num_jobs=num_jobs, num_slices=num_slices)
459611
@cli.command("single-worker")
@click.option("--num-jobs", type=int, default=100, help="Number of jobs to burst-submit to a single worker")
@click.option("--profile", is_flag=True, help="Profile with py-spy (requires sudo)")
@click.option(
    "--profile-output",
    type=click.Path(path_type=Path),
    default=Path("/tmp/profiles"),
    help="Directory for profile output (default: /tmp/profiles/)",
)
def single_worker(
    num_jobs: int,
    profile: bool = False,
    profile_output: Path | None = None,
) -> None:
    """Burst-submit jobs to a single worker with log streaming.

    Exercises the controller hot-path from #3062: many concurrent task
    creations, log-stream RPCs, and completions funneled through one worker.
    """
    # Unprofiled path: run the benchmark directly in this process.
    if not profile:
        run_single_worker_benchmark(num_jobs=num_jobs)
        return

    # Profiled path: re-exec this same subcommand under py-spy.
    _run_with_pyspy(
        "single-worker",
        ["--num-jobs", str(num_jobs)],
        profile_output,
        "single_worker_benchmark.speedscope",
    )
641+
642+
460643if __name__ == "__main__" :
461644 from iris .logging import configure_logging
462645
0 commit comments