@@ -74,6 +74,7 @@ def __init__(
7474 default_docker_image : Optional [str ] = None ,
7575 default_agent_cli_args : Optional [List [str ]] = None ,
7676 force_commit : bool = False ,
77+ explicit_max_parallel : bool = False ,
7778 ):
7879 """
7980 Initialize orchestrator.
@@ -107,6 +108,8 @@ def __init__(
107108 self .default_agent_cli_args = []
108109 # Runner behavior: force a commit if workspace has changes
109110 self .force_commit : bool = bool (force_commit )
111+ # Whether operator explicitly set max_parallel (override guards/policies)
112+ self ._explicit_max_parallel = bool (explicit_max_parallel )
110113 # Load model mapping checksum for handshake (single-process still validates equality)
111114 try :
112115 from ..utils .model_mapping import load_model_mapping
@@ -251,9 +254,11 @@ async def initialize(self) -> None:
251254 logger .info (
252255 f"Parallelism(auto): host_cpu={ host_cpu } , per_container={ per_container } -> max_parallel_instances={ adaptive } "
253256 )
254- # Oversubscription warning
257+ # Oversubscription warning only when not explicit
255258 try :
256- if (int (self .max_parallel_instances ) * per_container ) > host_cpu :
259+ if (
260+ int (self .max_parallel_instances ) * per_container
261+ ) > host_cpu and not getattr (self , "_explicit_max_parallel" , False ):
257262 logger .warning (
258263 f"Configured parallelism may oversubscribe CPU: max_parallel_instances={ self .max_parallel_instances } , per_container_cpu={ per_container } , host_cpu={ host_cpu } "
259264 )
@@ -266,8 +271,11 @@ async def initialize(self) -> None:
266271 self ._resource_pool = asyncio .Semaphore (int (self .max_parallel_instances or 1 ))
267272
268273 # Start multiple background executors for true parallel execution
269- # Start a reasonable number of executors (min of max_parallel and 10)
270- num_executors = min (int (self .max_parallel_instances or 1 ), 10 )
274+ # If explicit max-parallel set, honor fully; otherwise cap at 10
275+ if getattr (self , "_explicit_max_parallel" , False ):
276+ num_executors = int (self .max_parallel_instances or 1 )
277+ else :
278+ num_executors = min (int (self .max_parallel_instances or 1 ), 10 )
271279 for i in range (num_executors ):
272280 task = asyncio .create_task (self ._instance_executor ())
273281 self ._executor_tasks .append (task )
@@ -1359,13 +1367,21 @@ async def _admission_wait(self, cpu_need: int, mem_need_gb: int) -> None:
13591367 slope = 0.0
13601368
13611369 async with self ._admission_lock :
1362- cpu_ok = (self ._cpu_in_use + cpu_need ) <= self ._host_cpu
1363- mem_ok = (self ._mem_in_use_gb + mem_need_gb ) <= int (
1364- self ._host_mem_gb * self ._mem_guard_pct
1365- )
1366- disk_ok = (free_gb >= self ._disk_min_free_gb ) and (
1367- slope <= self ._pack_max_slope_mib_per_min
1368- )
1370+ if getattr (self , "_explicit_max_parallel" , False ):
1371+ # Honor operator's explicit parallel setting: bypass CPU/memory guard; keep disk guard
1372+ disk_ok = (free_gb >= self ._disk_min_free_gb ) and (
1373+ slope <= self ._pack_max_slope_mib_per_min
1374+ )
1375+ cpu_ok = True
1376+ mem_ok = True
1377+ else :
1378+ cpu_ok = (self ._cpu_in_use + cpu_need ) <= self ._host_cpu
1379+ mem_ok = (self ._mem_in_use_gb + mem_need_gb ) <= int (
1380+ self ._host_mem_gb * self ._mem_guard_pct
1381+ )
1382+ disk_ok = (free_gb >= self ._disk_min_free_gb ) and (
1383+ slope <= self ._pack_max_slope_mib_per_min
1384+ )
13691385
13701386 if cpu_ok and mem_ok and disk_ok :
13711387 self ._cpu_in_use += cpu_need
0 commit comments