Skip to content

Commit 705b7d2

Browse files
committed
feat: --max-startup-parallel
1 parent a1f3a46 commit 705b7d2

File tree

5 files changed

+145
-30
lines changed

5 files changed

+145
-30
lines changed

src/cli.py

Lines changed: 65 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,12 @@ def create_parser(cls) -> argparse.ArgumentParser:
264264
default=None,
265265
help="Max parallel instances (auto if omitted)",
266266
)
267+
g_limits.add_argument(
268+
"--max-startup-parallel",
269+
type=int,
270+
default=None,
271+
help="Max parallel startup initializations (default: min(10,max-parallel))",
272+
)
267273
g_limits.add_argument(
268274
"--timeout", type=int, default=3600, help="Timeout per instance (seconds)"
269275
)
@@ -844,6 +850,7 @@ def _add(field: str, reason: str, example: str = ""):
844850
_add("import_conflict_policy", "must be fail|overwrite|suffix", "fail")
845851
except Exception:
846852
pass
853+
# Allow 'auto' for both
847854
try:
848855
mpi = full_config.get("orchestration", {}).get(
849856
"max_parallel_instances", "auto"
@@ -864,6 +871,26 @@ def _add(field: str, reason: str, example: str = ""):
864871
"must be integer or 'auto'",
865872
"auto",
866873
)
874+
try:
875+
mps = full_config.get("orchestration", {}).get(
876+
"max_parallel_startup", "auto"
877+
)
878+
if isinstance(mps, str) and mps.lower() == "auto":
879+
pass
880+
else:
881+
v2 = int(mps)
882+
if v2 <= 0:
883+
_add(
884+
"orchestration.max_parallel_startup",
885+
"must be > 0 or 'auto'",
886+
"auto",
887+
)
888+
except Exception:
889+
_add(
890+
"orchestration.max_parallel_startup",
891+
"must be integer or 'auto'",
892+
"auto",
893+
)
867894
# Strategy exists
868895
try:
869896
strategy = full_config.get("strategy", args.strategy)
@@ -1598,6 +1625,10 @@ async def run_orchestrator(self, args: argparse.Namespace) -> int:
15981625
cli_config.setdefault("orchestration", {})[
15991626
"max_parallel_instances"
16001627
] = args.max_parallel
1628+
if getattr(args, "max_startup_parallel", None):
1629+
cli_config.setdefault("orchestration", {})[
1630+
"max_parallel_startup"
1631+
] = args.max_startup_parallel
16011632
if hasattr(args, "timeout") and args.timeout:
16021633
cli_config.setdefault("runner", {})["timeout"] = args.timeout
16031634
if hasattr(args, "force_commit") and args.force_commit:
@@ -1800,6 +1831,9 @@ def _red(k, v):
18001831

18011832
# Get orchestration settings from merged config
18021833
max_parallel = full_config["orchestration"]["max_parallel_instances"]
1834+
max_startup_parallel = full_config["orchestration"].get(
1835+
"max_parallel_startup", "auto"
1836+
)
18031837
state_dir = full_config.get("orchestration", {}).get(
18041838
"state_dir"
18051839
) or full_config.get("state_dir", Path("./pitaya_state"))
@@ -1820,17 +1854,38 @@ def _red(k, v):
18201854
# Respect global session volume consent by setting env for runner
18211855
allow_global_session = bool(getattr(args, "allow_global_session_volume", False))
18221856

1823-
# Resolve max_parallel without host resource calculations
1824-
try:
1825-
if isinstance(max_parallel, str):
1826-
# Accept numeric strings; treat non-numeric (e.g., 'auto') as default 5
1857+
# Resolve parallelism
1858+
import os as _os
1859+
1860+
def _cpu_default() -> int:
1861+
try:
1862+
return max(2, int(_os.cpu_count() or 2))
1863+
except Exception:
1864+
return 2
1865+
1866+
# Total parallel: auto -> cpu-based
1867+
if isinstance(max_parallel, str):
1868+
if max_parallel.lower() == "auto":
1869+
max_parallel_val = _cpu_default()
1870+
else:
18271871
max_parallel_val = int(max_parallel)
1828-
elif isinstance(max_parallel, int):
1829-
max_parallel_val = max_parallel
1872+
elif isinstance(max_parallel, int):
1873+
max_parallel_val = max(1, max_parallel)
1874+
else:
1875+
max_parallel_val = _cpu_default()
1876+
1877+
# Startup parallel: auto -> min(10, total)
1878+
if isinstance(max_startup_parallel, str):
1879+
if max_startup_parallel.lower() == "auto":
1880+
max_startup_parallel_val = min(10, max_parallel_val)
18301881
else:
1831-
max_parallel_val = 5
1832-
except Exception:
1833-
max_parallel_val = 5
1882+
max_startup_parallel_val = int(max_startup_parallel)
1883+
elif isinstance(max_startup_parallel, int):
1884+
max_startup_parallel_val = max(1, max_startup_parallel)
1885+
else:
1886+
max_startup_parallel_val = min(10, max_parallel_val)
1887+
# Clamp startup to not exceed total
1888+
max_startup_parallel_val = min(max_startup_parallel_val, max_parallel_val)
18341889

18351890
# Proxy automatic egress defaults removed
18361891

@@ -1851,6 +1906,7 @@ def _red(k, v):
18511906
# Create orchestrator
18521907
self.orchestrator = Orchestrator(
18531908
max_parallel_instances=max_parallel_val,
1909+
max_parallel_startup=max_startup_parallel_val,
18541910
state_dir=Path(state_dir),
18551911
logs_dir=Path(logs_dir),
18561912
container_limits=container_limits,

src/config.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -262,8 +262,10 @@ def get_default_config() -> Dict[str, Any]:
262262
"tmpfs_size_mb": 512,
263263
},
264264
"orchestration": {
265-
# Default concurrency: 5; use --max-parallel to override
266-
"max_parallel_instances": 5,
265+
# Default concurrency: auto-calculated from CPU count
266+
"max_parallel_instances": "auto",
267+
# Startup cap: auto -> min(10, max_parallel_instances)
268+
"max_parallel_startup": "auto",
267269
# Branch namespace is hierarchical by default
268270
# Format: orc/<strategy>/<run_id>/k<short8>
269271
"branch_namespace": "hierarchical",

src/instance_runner/api.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
"""
88

99
from pathlib import Path
10+
import asyncio
1011
from typing import Any, Callable, Dict, Optional
1112

1213
from ..shared import (
@@ -34,6 +35,7 @@ async def run_instance(
3435
session_id: Optional[str] = None,
3536
operator_resume: bool = False,
3637
event_callback: Optional[Callable[[Dict[str, Any]], None]] = None,
38+
startup_semaphore: Optional[asyncio.Semaphore] = None,
3739
timeout_seconds: int = 3600,
3840
container_limits: Optional[ContainerLimits] = None,
3941
auth_config: Optional[AuthConfig] = None,
@@ -114,6 +116,7 @@ async def run_instance(
114116
operator_resume=operator_resume,
115117
session_group_key=session_group_key,
116118
event_callback=event_callback,
119+
startup_semaphore=startup_semaphore,
117120
timeout_seconds=timeout_seconds,
118121
container_limits=container_limits,
119122
auth_config=auth_config,

src/instance_runner/runner.py

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ async def run_instance(
8484
operator_resume: bool = False,
8585
session_group_key: Optional[str] = None,
8686
event_callback: Optional[Callable[[Dict[str, Any]], None]] = None,
87+
startup_semaphore: Optional[asyncio.Semaphore] = None,
8788
timeout_seconds: int = 3600,
8889
container_limits: Optional[ContainerLimits] = None,
8990
auth_config: Optional[AuthConfig] = None,
@@ -210,6 +211,7 @@ async def run_instance(
210211
operator_resume=operator_resume,
211212
session_group_key=session_group_key,
212213
event_callback=event_callback,
214+
startup_semaphore=startup_semaphore,
213215
timeout_seconds=timeout_seconds,
214216
container_limits=container_limits,
215217
auth_config=auth_config,
@@ -317,6 +319,7 @@ async def _run_instance_attempt(
317319
operator_resume: bool,
318320
session_group_key: Optional[str],
319321
event_callback: Optional[Callable[[Dict[str, Any]], None]],
322+
startup_semaphore: Optional[asyncio.Semaphore],
320323
timeout_seconds: int,
321324
container_limits: ContainerLimits,
322325
auth_config: Optional[AuthConfig],
@@ -419,20 +422,43 @@ def emit_event(event_type: str, data: Dict[str, Any]) -> None:
419422

420423
emit_event("instance.phase_completed", {"phase": "validation"})
421424

422-
# Phase 2: Workspace Preparation (NEW - happens BEFORE container)
425+
# Phase 2: Workspace Preparation (happens BEFORE container)
423426
logger.info(f"Preparing isolated workspace for branch {base_branch}")
424-
emit_event("instance.workspace_preparing", {"base_branch": base_branch})
425-
426-
logger.info("Calling git_ops.prepare_workspace...")
427-
workspace_dir = await git_ops.prepare_workspace(
428-
repo_path=repo_path,
429-
base_branch=base_branch,
430-
instance_id=instance_id,
431-
run_id=run_id,
432-
strategy_execution_id=strategy_execution_id,
433-
container_name=container_name,
434-
reuse_if_exists=operator_resume,
435-
)
427+
if startup_semaphore is not None:
428+
# Signal that we are waiting on startup slot
429+
try:
430+
emit_event(
431+
"instance.startup_waiting",
432+
{"base_branch": base_branch},
433+
)
434+
except Exception:
435+
pass
436+
async with startup_semaphore:
437+
emit_event(
438+
"instance.workspace_preparing", {"base_branch": base_branch}
439+
)
440+
logger.info("Calling git_ops.prepare_workspace (startup slot acquired)...")
441+
workspace_dir = await git_ops.prepare_workspace(
442+
repo_path=repo_path,
443+
base_branch=base_branch,
444+
instance_id=instance_id,
445+
run_id=run_id,
446+
strategy_execution_id=strategy_execution_id,
447+
container_name=container_name,
448+
reuse_if_exists=operator_resume,
449+
)
450+
else:
451+
emit_event("instance.workspace_preparing", {"base_branch": base_branch})
452+
logger.info("Calling git_ops.prepare_workspace...")
453+
workspace_dir = await git_ops.prepare_workspace(
454+
repo_path=repo_path,
455+
base_branch=base_branch,
456+
instance_id=instance_id,
457+
run_id=run_id,
458+
strategy_execution_id=strategy_execution_id,
459+
container_name=container_name,
460+
reuse_if_exists=operator_resume,
461+
)
436462
logger.info(f"Workspace prepared at: {workspace_dir}")
437463

438464
emit_event(

src/orchestration/orchestrator.py

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ class Orchestrator:
5454
def __init__(
5555
self,
5656
max_parallel_instances: Optional[int] = None,
57+
max_parallel_startup: Optional[int] = None,
5758
state_dir: Path = Path("./pitaya_state"),
5859
logs_dir: Path = Path("./logs"),
5960
container_limits: Optional[ContainerLimits] = None,
@@ -87,6 +88,7 @@ def __init__(
8788
auth_config: Authentication configuration for AI tools
8889
"""
8990
self.max_parallel_instances: Optional[int] = max_parallel_instances
91+
self.max_parallel_startup: Optional[int] = max_parallel_startup
9092
self.state_dir = state_dir
9193
self.logs_dir = logs_dir
9294
self.container_limits = container_limits or ContainerLimits()
@@ -131,6 +133,8 @@ def __init__(
131133

132134
# Resource pool semaphore (initialized after adaptive calc in initialize())
133135
self._resource_pool: asyncio.Semaphore = asyncio.Semaphore(1)
136+
# Separate startup semaphore to throttle expensive workspace prep
137+
self._startup_pool: asyncio.Semaphore = asyncio.Semaphore(1)
134138
# Branch namespace strategy: 'flat' (default) or 'hierarchical'
135139
self.branch_namespace = str(branch_namespace or "flat").lower()
136140
self.allow_overwrite_protected_refs = bool(allow_overwrite_protected_refs)
@@ -206,14 +210,30 @@ async def initialize(self) -> None:
206210
# Clean up orphaned containers from previous runs
207211
await self.cleanup_orphaned_containers()
208212

209-
# Resolve max_parallel_instances without any host CPU/memory calculations.
210-
# Default to 5 if not provided by the caller/CLI.
211-
if self.max_parallel_instances is None:
212-
self.max_parallel_instances = 5
213-
logger.info("Parallelism(default): max_parallel_instances=5")
213+
# Resolve parallelism defaults: CPU-based for total; startup=min(10, total)
214+
if self.max_parallel_instances is None or int(self.max_parallel_instances) <= 0:
215+
try:
216+
import os as _os
217+
218+
cpu = int(_os.cpu_count() or 2)
219+
except Exception:
220+
cpu = 2
221+
self.max_parallel_instances = max(1, cpu)
222+
logger.info(
223+
f"Parallelism(default): max_parallel_instances={self.max_parallel_instances} (cpu-based)"
224+
)
225+
if self.max_parallel_startup is None or int(self.max_parallel_startup) <= 0:
226+
self.max_parallel_startup = max(
227+
1, min(10, int(self.max_parallel_instances or 1))
228+
)
229+
logger.info(
230+
f"Parallelism(default): max_parallel_startup={self.max_parallel_startup} (min(10,total))"
231+
)
214232

215233
# Initialize resource pool semaphore now that parallelism is resolved
216234
self._resource_pool = asyncio.Semaphore(int(self.max_parallel_instances or 1))
235+
# Initialize startup pool (caps concurrent workspace preparations)
236+
self._startup_pool = asyncio.Semaphore(int(self.max_parallel_startup or self.max_parallel_instances or 1))
217237

218238
# Start multiple background executors equal to max_parallel_instances
219239
num_executors = int(self.max_parallel_instances or 1)
@@ -222,7 +242,8 @@ async def initialize(self) -> None:
222242
self._executor_tasks.append(task)
223243

224244
logger.info(
225-
f"Pitaya initialized with {self.max_parallel_instances} max parallel instances and {num_executors} executor tasks"
245+
f"Pitaya initialized with max_parallel_instances={self.max_parallel_instances} "
246+
f"max_parallel_startup={self.max_parallel_startup} and {num_executors} executor tasks"
226247
)
227248
self._initialized = True
228249

@@ -1499,6 +1520,11 @@ def _write_last_active():
14991520
"container_env_preparing",
15001521
"Preparing container env...",
15011522
)
1523+
elif et == "instance.startup_waiting":
1524+
phase, activity = (
1525+
"startup_waiting",
1526+
"Waiting for startup slot...",
1527+
)
15021528
elif et == "instance.container_env_prepared":
15031529
phase, activity = "container_env_prepared", "Container env ready"
15041530
elif et == "instance.container_created":
@@ -1608,6 +1634,7 @@ def _write_last_active():
16081634
),
16091635
operator_resume=op_resume,
16101636
event_callback=event_callback,
1637+
startup_semaphore=self._startup_pool,
16111638
timeout_seconds=self.runner_timeout_seconds,
16121639
container_limits=ContainerLimits(
16131640
cpu_count=eff_cpu, memory_gb=eff_mem, memory_swap_gb=eff_mem
@@ -2555,6 +2582,7 @@ async def _run_instance_from_saved_state(self, instance_info) -> InstanceResult:
25552582
instance_id=instance_info.instance_id,
25562583
),
25572584
container_name=instance_info.container_name,
2585+
startup_semaphore=self._startup_pool,
25582586
container_limits=ContainerLimits(
25592587
cpu_count=eff_cpu, memory_gb=eff_mem, memory_swap_gb=eff_mem
25602588
),

0 commit comments

Comments
 (0)