Skip to content

Commit 38de91a

Browse files
authored
feat: remove debug mode and remove auto parallelism cap calculation (#41)
1 parent 6e430bc commit 38de91a

File tree

4 files changed

+41
-182
lines changed

4 files changed

+41
-182
lines changed

src/cli.py

Lines changed: 26 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,6 @@ def create_parser(cls) -> argparse.ArgumentParser:
360360

361361
# Diagnostics group
362362
g_diag = parser.add_argument_group("Diagnostics")
363-
g_diag.add_argument("--debug", action="store_true", help="Enable debug logging")
364363
g_diag.add_argument(
365364
"--yes", action="store_true", help="Assume 'yes' for prompts"
366365
)
@@ -1034,8 +1033,7 @@ async def run_list_runs(self, args: argparse.Namespace) -> int:
10341033

10351034
except (OSError, json.JSONDecodeError) as e:
10361035
self.console.print(f"[red]Error listing runs: {e}[/red]")
1037-
if args.debug:
1038-
self.console.print_exception()
1036+
self.console.print_exception()
10391037
return 1
10401038

10411039
async def run_show_run(self, args: argparse.Namespace) -> int:
@@ -1140,8 +1138,7 @@ async def run_show_run(self, args: argparse.Namespace) -> int:
11401138

11411139
except (OSError, json.JSONDecodeError) as e:
11421140
self.console.print(f"[red]Error showing run: {e}[/red]")
1143-
if args.debug:
1144-
self.console.print_exception()
1141+
self.console.print_exception()
11451142
return 1
11461143

11471144
async def run_prune(self, args: argparse.Namespace) -> int:
@@ -1238,8 +1235,7 @@ async def run_prune(self, args: argparse.Namespace) -> int:
12381235
return 0
12391236
except Exception as e:
12401237
self.console.print(f"[red]Prune failed: {e}[/red]")
1241-
if args.debug:
1242-
self.console.print_exception()
1238+
self.console.print_exception()
12431239
return 1
12441240

12451241
def _display_detailed_results(
@@ -1481,7 +1477,7 @@ async def run_orchestrator(self, args: argparse.Namespace) -> int:
14811477
setup_structured_logging(
14821478
logs_dir=args.logs_dir,
14831479
run_id=run_id,
1484-
debug=args.debug,
1480+
debug=True,
14851481
quiet=args.no_tui and args.output == "quiet",
14861482
no_tui=args.no_tui,
14871483
)
@@ -1638,8 +1634,7 @@ async def run_orchestrator(self, args: argparse.Namespace) -> int:
16381634
if args.output:
16391635
cli_config["output"] = args.output
16401636
# Server extension support removed
1641-
if args.debug:
1642-
cli_config.setdefault("logging", {})["level"] = "DEBUG"
1637+
# Always use full logging; no debug toggle
16431638
# Add CLI auth args
16441639
if hasattr(args, "oauth_token") and args.oauth_token:
16451640
cli_config.setdefault("runner", {})["oauth_token"] = args.oauth_token
@@ -1829,35 +1824,30 @@ def _red(k, v):
18291824
"logs_dir"
18301825
) or full_config.get("logs_dir", Path("./logs"))
18311826

1832-
# Log configuration sources (only in debug mode)
1833-
if full_config.get("debug") or args.debug:
1834-
self.console.print("[dim]Configuration loaded from:[/dim]")
1835-
if cli_config:
1836-
self.console.print(" - Command line arguments")
1837-
# No Pitaya-specific environment variables are used
1838-
if dotenv_config:
1839-
self.console.print(" - .env file")
1840-
if config:
1841-
self.console.print(f" - Config file: {args.config or 'pitaya.yaml'}")
1842-
self.console.print(" - Built-in defaults")
1827+
# Log configuration sources unconditionally
1828+
self.console.print("[dim]Configuration loaded from:[/dim]")
1829+
if cli_config:
1830+
self.console.print(" - Command line arguments")
1831+
if dotenv_config:
1832+
self.console.print(" - .env file")
1833+
if config:
1834+
self.console.print(f" - Config file: {args.config or 'pitaya.yaml'}")
1835+
self.console.print(" - Built-in defaults")
18431836

18441837
# Respect global session volume consent by setting env for runner
18451838
allow_global_session = bool(getattr(args, "allow_global_session_volume", False))
18461839

1847-
# Resolve 'auto' for max_parallel per spec
1848-
if isinstance(max_parallel, str) and max_parallel.lower() == "auto":
1849-
try:
1850-
host_cpu = max(1, os.cpu_count() or 1)
1851-
per_container = max(1, int(container_limits.cpu_count))
1852-
computed = max(2, min(20, host_cpu // per_container))
1853-
max_parallel_val = computed
1854-
except Exception:
1855-
max_parallel_val = 20
1856-
else:
1857-
try:
1840+
# Resolve max_parallel without host resource calculations
1841+
try:
1842+
if isinstance(max_parallel, str):
1843+
# Accept numeric strings; treat non-numeric (e.g., 'auto') as default 5
18581844
max_parallel_val = int(max_parallel)
1859-
except Exception:
1860-
max_parallel_val = 20
1845+
elif isinstance(max_parallel, int):
1846+
max_parallel_val = max_parallel
1847+
else:
1848+
max_parallel_val = 5
1849+
except Exception:
1850+
max_parallel_val = 5
18611851

18621852
# Proxy automatic egress defaults removed
18631853

@@ -2221,8 +2211,7 @@ def emit_json(ev: dict):
22212211
return 2
22222212
except (OrchestratorError, DockerError, ValidationError) as e:
22232213
self.console.print(f"[red]Error: {e}[/red]")
2224-
if args.debug:
2225-
self.console.print_exception()
2214+
self.console.print_exception()
22262215
# Shutdown orchestrator
22272216
await self.orchestrator.shutdown()
22282217
return 1
@@ -2653,8 +2642,7 @@ async def monitor_shutdown():
26532642
return 2
26542643
except (OrchestratorError, DockerError, ValidationError) as e:
26552644
self.console.print(f"[red]Error: {e}[/red]")
2656-
if args.debug:
2657-
self.console.print_exception()
2645+
self.console.print_exception()
26582646
# Shutdown orchestrator
26592647
await self.orchestrator.shutdown()
26602648
return 1

src/config.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ def get_default_config() -> Dict[str, Any]:
245245
"output": "tui",
246246
"state_dir": Path("./pitaya_state"),
247247
"logs_dir": Path("./logs"),
248-
"debug": False,
248+
# Debug mode removed; logs are verbose by default
249249
# Normative defaults per spec §6.1.1 (subset applied where supported)
250250
"import_policy": "auto", # auto|never|always
251251
"import_conflict_policy": "fail", # fail|overwrite|suffix
@@ -262,8 +262,8 @@ def get_default_config() -> Dict[str, Any]:
262262
"tmpfs_size_mb": 512,
263263
},
264264
"orchestration": {
265-
# Spec default: auto => max(2, min(20, floor(host_cpu / runner.container_cpu)))
266-
"max_parallel_instances": "auto",
265+
# Default concurrency: 5; use --max-parallel to override
266+
"max_parallel_instances": 5,
267267
# Branch namespace is hierarchical by default
268268
# Format: orc/<strategy>/<run_id>/k<short8>
269269
"branch_namespace": "hierarchical",

src/orchestration/orchestrator.py

Lines changed: 10 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -207,68 +207,17 @@ async def initialize(self) -> None:
207207
# Clean up orphaned containers from previous runs
208208
await self.cleanup_orphaned_containers()
209209

210-
# Resolve max_parallel_instances (simple adaptive default)
211-
try:
212-
import os
213-
import math
214-
215-
host_cpu = max(1, os.cpu_count() or 1)
216-
# cgroup-aware effective CPUs (best-effort)
217-
try:
218-
import platform
219-
from pathlib import Path as _P
220-
221-
if platform.system() == "Linux":
222-
cpu_max = _P("/sys/fs/cgroup/cpu.max")
223-
if cpu_max.exists():
224-
parts = cpu_max.read_text().strip().split()
225-
if len(parts) >= 2 and parts[0] != "max":
226-
quota = float(parts[0])
227-
period = float(parts[1]) or 100000.0
228-
eff = int(math.ceil(quota / period))
229-
if eff > 0:
230-
host_cpu = eff
231-
else:
232-
q = _P("/sys/fs/cgroup/cpu/cpu.cfs_quota_us")
233-
p = _P("/sys/fs/cgroup/cpu/cpu.cfs_period_us")
234-
if q.exists() and p.exists():
235-
quota = float(q.read_text().strip() or "0")
236-
period = float(p.read_text().strip() or "100000")
237-
if quota > 0 and period > 0:
238-
eff = int(math.ceil(quota / period))
239-
if eff > 0:
240-
host_cpu = eff
241-
except Exception:
242-
pass
243-
per_container = max(1, int(self.container_limits.cpu_count))
244-
adaptive = max(2, min(20, host_cpu // per_container))
245-
if self.max_parallel_instances is None:
246-
self.max_parallel_instances = adaptive
247-
logger.info(
248-
f"Parallelism(auto): host_cpu={host_cpu}, per_container={per_container} -> max_parallel_instances={adaptive}"
249-
)
250-
# Oversubscription warning only when not explicit
251-
try:
252-
if (
253-
int(self.max_parallel_instances) * per_container
254-
) > host_cpu and not getattr(self, "_explicit_max_parallel", False):
255-
logger.warning(
256-
f"Configured parallelism may oversubscribe CPU: max_parallel_instances={self.max_parallel_instances}, per_container_cpu={per_container}, host_cpu={host_cpu}"
257-
)
258-
except Exception:
259-
pass
260-
except Exception as e:
261-
logger.debug(f"Adaptive parallelism calc failed: {e}")
210+
# Resolve max_parallel_instances without any host CPU/memory calculations.
211+
# Default to 5 if not provided by the caller/CLI.
212+
if self.max_parallel_instances is None:
213+
self.max_parallel_instances = 5
214+
logger.info("Parallelism(default): max_parallel_instances=5")
262215

263216
# Initialize resource pool semaphore now that parallelism is resolved
264217
self._resource_pool = asyncio.Semaphore(int(self.max_parallel_instances or 1))
265218

266-
# Start multiple background executors for true parallel execution
267-
# If explicit max-parallel set, honor fully; otherwise cap at 10
268-
if getattr(self, "_explicit_max_parallel", False):
269-
num_executors = int(self.max_parallel_instances or 1)
270-
else:
271-
num_executors = min(int(self.max_parallel_instances or 1), 10)
219+
# Start multiple background executors equal to max_parallel_instances
220+
num_executors = int(self.max_parallel_instances or 1)
272221
for i in range(num_executors):
273222
task = asyncio.create_task(self._instance_executor())
274223
self._executor_tasks.append(task)
@@ -1321,86 +1270,11 @@ async def _instance_executor(self) -> None:
13211270
break
13221271

13231272
async def _admission_wait(self, cpu_need: int, mem_need_gb: int) -> None:
1324-
"""Wait until CPU+memory tokens available and disk guard healthy."""
1325-
import shutil
1326-
import time as _time
1327-
1328-
repo_path = getattr(self, "repo_path", Path.cwd())
1329-
pack_dir = repo_path / ".git" / "objects" / "pack"
1330-
start = _time.monotonic()
1331-
while not self._shutdown:
1332-
# Disk free space guard
1333-
try:
1334-
stat = shutil.disk_usage(str(repo_path))
1335-
free_gb = stat.free / (1024**3)
1336-
except Exception:
1337-
free_gb = self._disk_min_free_gb # assume OK if unknown
1338-
# Pack growth slope (MiB/min), best-effort
1339-
try:
1340-
size_bytes = 0
1341-
if pack_dir.exists():
1342-
for p in pack_dir.iterdir():
1343-
if p.is_file() and (p.suffix in (".pack", ".idx")):
1344-
size_bytes += p.stat().st_size
1345-
now = _time.time()
1346-
self._pack_series.append((now, size_bytes))
1347-
slope = 0.0
1348-
# compute against oldest point >= 5min ago if available
1349-
oldest = None
1350-
for ts, sz in list(self._pack_series):
1351-
if now - ts >= 300:
1352-
oldest = (ts, sz)
1353-
break
1354-
if oldest:
1355-
dt_min = max(0.001, (now - oldest[0]) / 60.0)
1356-
slope = max(
1357-
0.0, (size_bytes - oldest[1]) / (1024.0 * 1024.0) / dt_min
1358-
)
1359-
except Exception:
1360-
slope = 0.0
1361-
1362-
async with self._admission_lock:
1363-
if getattr(self, "_explicit_max_parallel", False):
1364-
# Honor operator's explicit parallel setting: bypass CPU/memory guard; keep disk guard
1365-
disk_ok = (free_gb >= self._disk_min_free_gb) and (
1366-
slope <= self._pack_max_slope_mib_per_min
1367-
)
1368-
cpu_ok = True
1369-
mem_ok = True
1370-
else:
1371-
cpu_ok = (self._cpu_in_use + cpu_need) <= self._host_cpu
1372-
mem_ok = (self._mem_in_use_gb + mem_need_gb) <= int(
1373-
self._host_mem_gb * self._mem_guard_pct
1374-
)
1375-
disk_ok = (free_gb >= self._disk_min_free_gb) and (
1376-
slope <= self._pack_max_slope_mib_per_min
1377-
)
1378-
1379-
if cpu_ok and mem_ok and disk_ok:
1380-
self._cpu_in_use += cpu_need
1381-
self._mem_in_use_gb += mem_need_gb
1382-
return
1383-
1384-
# Log periodically while waiting
1385-
try:
1386-
if (int((_time.monotonic() - start)) % 10) == 0:
1387-
logger.info(
1388-
f"admission.wait: cpu_ok={cpu_ok} mem_ok={mem_ok} disk_ok={disk_ok} free_gb={free_gb:.1f} slope_mib_per_min={slope:.1f} in_use(cpu={self._cpu_in_use}/{self._host_cpu}, mem={self._mem_in_use_gb}/{int(self._host_mem_gb*self._mem_guard_pct)})"
1389-
)
1390-
except Exception:
1391-
pass
1392-
# Trigger on-run GC/backpressure attempt when disk is the bottleneck
1393-
if not disk_ok:
1394-
try:
1395-
await self._attempt_on_run_gc()
1396-
except Exception as e:
1397-
logger.debug(f"on-run GC attempt failed: {e}")
1398-
await asyncio.sleep(0.5)
1273+
"""No-op admission: do not gate on CPU/memory/disk; honor only max_parallel semaphore."""
1274+
return
13991275

14001276
async def _admission_release(self, cpu: int, mem_gb: int) -> None:
1401-
async with self._admission_lock:
1402-
self._cpu_in_use = max(0, self._cpu_in_use - cpu)
1403-
self._mem_in_use_gb = max(0, self._mem_in_use_gb - mem_gb)
1277+
return
14041278

14051279
async def _attempt_on_run_gc(self) -> None:
14061280
"""Best-effort GC to free disk by removing oldest failed workspaces for this run.

src/tui/cli.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,7 @@ def create_parser(cls) -> argparse.ArgumentParser:
9292
"--event-types", nargs="+", help="Filter by event type(s)"
9393
)
9494

95-
# Diagnostics
96-
g_diag = parser.add_argument_group("Diagnostics")
97-
g_diag.add_argument("--debug", action="store_true", help="Enable debug logging")
95+
# Diagnostics (debug mode removed; verbose by default)
9896

9997
return parser
10098

@@ -137,8 +135,7 @@ async def run(self, args: argparse.Namespace) -> int:
137135
return 2
138136
except (OSError, IOError) as e:
139137
self.console.print(f"[red]Error: {e}[/red]")
140-
if args.debug:
141-
self.console.print_exception()
138+
self.console.print_exception()
142139
return 1
143140

144141
# Connected modes removed

0 commit comments

Comments (0)