Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 71 additions & 7 deletions src/octopal/runtime/housekeeping.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import os
import shutil
import stat
import subprocess
import time
import uuid
from contextlib import suppress
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from pathlib import Path
Expand Down Expand Up @@ -39,16 +41,15 @@ def remove_tree_with_retries(
*,
retries: int = 6,
base_delay_seconds: float = 0.25,
docker_cleanup_image: str | None = None,
) -> bool:
"""Remove a directory tree with Windows-friendly retry behavior."""
if not path.exists():
return True

def _onerror(func, value, exc_info) -> None:
try:
with suppress(OSError):
os.chmod(value, stat.S_IWRITE | stat.S_IREAD | stat.S_IEXEC)
except OSError:
pass
func(value)

for attempt in range(1, max(1, retries) + 1):
Expand All @@ -59,20 +60,78 @@ def _onerror(func, value, exc_info) -> None:
return True
except PermissionError:
if attempt == retries:
return False
break
time.sleep(base_delay_seconds * attempt)
except OSError:
if attempt == retries:
return False
break
time.sleep(base_delay_seconds * attempt)

if docker_cleanup_image and _remove_tree_contents_with_docker(
path,
image=docker_cleanup_image,
):
try:
shutil.rmtree(path, onerror=_onerror)
return True
except FileNotFoundError:
return True
except OSError:
pass

return not path.exists()


def _remove_tree_contents_with_docker(path: Path, *, image: str) -> bool:
"""Best-effort cleanup for root-owned files Docker created in a bind mount."""
if os.name == "nt":
return False
image = str(image or "").strip()
if not image or not path.exists():
return False
docker = shutil.which("docker")
if not docker:
return False

try:
resolved = path.resolve(strict=True)
except OSError:
return False

command = (
"import pathlib, shutil; "
"p=pathlib.Path('/cleanup'); "
"[shutil.rmtree(x) if x.is_dir() and not x.is_symlink() else x.unlink() "
"for x in p.iterdir()]"
)
try:
completed = subprocess.run(
[
docker,
"run",
"--rm",
"-v",
f"{resolved}:/cleanup",
image,
"python",
"-c",
command,
],
check=False,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
timeout=60,
)
except Exception:
return False
return completed.returncode == 0


def cleanup_ephemeral_worker_dirs(
workspace_dir: Path,
*,
retention_minutes: int,
docker_cleanup_image: str | None = None,
) -> WorkerDirCleanupResult:
result = WorkerDirCleanupResult()
workers_dir = workspace_dir / "workers"
Expand All @@ -97,7 +156,10 @@ def cleanup_ephemeral_worker_dirs(
if modified >= cutoff:
continue

if remove_tree_with_retries(worker_dir):
if remove_tree_with_retries(
worker_dir,
docker_cleanup_image=docker_cleanup_image,
):
result.deleted_dirs += 1
else:
result.errors += 1
Expand All @@ -123,7 +185,9 @@ def cleanup_workspace_tmp(workspace_dir: Path, *, retention_hours: int) -> TmpCl
result.errors += 1

# Remove empty directories deepest-first.
for directory in sorted([p for p in tmp_dir.rglob("*") if p.is_dir()], key=lambda p: len(p.parts), reverse=True):
for directory in sorted(
[p for p in tmp_dir.rglob("*") if p.is_dir()], key=lambda p: len(p.parts), reverse=True
):
try:
next(directory.iterdir())
except StopIteration:
Expand Down
40 changes: 21 additions & 19 deletions src/octopal/runtime/octo/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,19 +65,19 @@
_complete_text,
build_forced_worker_followup,
normalize_plain_text,
route_internal_maintenance,
route_heartbeat,
route_internal_maintenance,
route_or_reply,
route_scheduler_tick,
route_scheduled_octo_control,
route_scheduler_tick,
route_worker_results_back_to_octo,
should_force_worker_followup,
)
from octopal.runtime.policy.engine import PolicyEngine
from octopal.runtime.scheduler.service import (
SCHEDULED_TASK_BLOCKED_REASON_KEY,
SCHEDULED_TASK_SUGGESTED_EXECUTION_MODE_KEY,
SCHEDULED_TASK_BLOCKED_UNTIL_KEY,
SCHEDULED_TASK_SUGGESTED_EXECUTION_MODE_KEY,
SchedulerService,
normalize_notify_user_policy,
parse_scheduled_task_blocked_until,
Expand Down Expand Up @@ -1655,10 +1655,11 @@ async def _periodic_cleanup(self, interval_seconds: int):

cfg = self._housekeeping_cfg or {}
worker_result = await asyncio.to_thread(
cleanup_ephemeral_worker_dirs,
self.canon.workspace_dir,
retention_minutes=int(cfg.get("worker_dir_retention_minutes", 15)),
)
cleanup_ephemeral_worker_dirs,
self.canon.workspace_dir,
retention_minutes=int(cfg.get("worker_dir_retention_minutes", 15)),
docker_cleanup_image=getattr(self.runtime.launcher, "image", None),
)
if worker_result.deleted_dirs or worker_result.errors:
logger.info(
"Ephemeral worker dir cleanup complete",
Expand Down Expand Up @@ -1753,9 +1754,7 @@ def _publish_scheduler_metrics(
dispatch_summary.get("rejected_by_policy") or 0
)
payload["last_dispatch_errors"] = int(dispatch_summary.get("errors") or 0)
payload["last_policy_reasons"] = dict(
dispatch_summary.get("policy_reasons") or {}
)
payload["last_policy_reasons"] = dict(dispatch_summary.get("policy_reasons") or {})
update_component_gauges("scheduler", payload)

async def _run_scheduler_tick_once(self, *, chat_id: int = 0, max_tasks: int = 10) -> None:
Expand Down Expand Up @@ -1797,9 +1796,7 @@ async def _run_scheduler_tick_once(self, *, chat_id: int = 0, max_tasks: int = 1
"dispatch_rejected_by_policy": int(
dispatch_summary.get("rejected_by_policy") or 0
),
"dispatch_policy_reasons": dict(
dispatch_summary.get("policy_reasons") or {}
),
"dispatch_policy_reasons": dict(dispatch_summary.get("policy_reasons") or {}),
"dispatch_errors": int(dispatch_summary.get("errors") or 0),
}
)
Expand All @@ -1811,9 +1808,9 @@ async def _run_scheduler_tick_once(self, *, chat_id: int = 0, max_tasks: int = 1
counters["completed_total"] = int(counters.get("completed_total", 0) or 0) + int(
dispatch_summary.get("completed") or 0
)
counters["duplicates_total"] = int(
counters.get("duplicates_total", 0) or 0
) + int(dispatch_summary.get("duplicates") or 0)
counters["duplicates_total"] = int(counters.get("duplicates_total", 0) or 0) + int(
dispatch_summary.get("duplicates") or 0
)
counters["rejected_by_policy_total"] = int(
counters.get("rejected_by_policy_total", 0) or 0
) + int(dispatch_summary.get("rejected_by_policy") or 0)
Expand Down Expand Up @@ -1940,7 +1937,9 @@ def _get_persisted_scheduled_octo_control_backoff(
reason=None,
)
return None
reason = str(metadata.get(SCHEDULED_TASK_BLOCKED_REASON_KEY) or "").strip() or "blocked_by_route"
reason = (
str(metadata.get(SCHEDULED_TASK_BLOCKED_REASON_KEY) or "").strip() or "blocked_by_route"
)
return remaining, reason

def _update_scheduled_octo_control_backoff_metadata(
Expand All @@ -1958,7 +1957,9 @@ def _update_scheduled_octo_control_backoff_metadata(
task_id = str(task.get("id") or "").strip()
if not task_id:
return
metadata = dict(task.get("metadata") or {}) if isinstance(task.get("metadata"), dict) else {}
metadata = (
dict(task.get("metadata") or {}) if isinstance(task.get("metadata"), dict) else {}
)
if blocked_until is None:
metadata.pop(SCHEDULED_TASK_BLOCKED_UNTIL_KEY, None)
else:
Expand Down Expand Up @@ -2123,7 +2124,8 @@ async def _run_scheduled_octo_control_task_once(
self._set_scheduled_octo_control_backoff(task_id, reason="blocked_by_route")
self._update_scheduled_octo_control_backoff_metadata(
task,
blocked_until=utc_now() + timedelta(seconds=_SCHEDULED_OCTO_CONTROL_BACKOFF_SECONDS),
blocked_until=utc_now()
+ timedelta(seconds=_SCHEDULED_OCTO_CONTROL_BACKOFF_SECONDS),
reason="blocked_by_route",
)
logger.warning(
Expand Down
57 changes: 45 additions & 12 deletions src/octopal/runtime/workers/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Protocol


Expand Down Expand Up @@ -56,7 +57,6 @@ async def launch(
env: dict[str, str],
) -> asyncio.subprocess.Process:
import json
from pathlib import Path

worker_id = os.path.basename(cwd.rstrip(os.sep))
container_ws = self.container_workspace
Expand Down Expand Up @@ -88,6 +88,11 @@ async def launch(
seen_mounts: set[tuple[str, str]] = set()
host_skills_dir = host_ws_path / "skills"
host_skills_dir.mkdir(parents=True, exist_ok=True)
_prepare_worker_mount_target(
host_worker_dir,
rel_path="skills",
source_path=host_skills_dir,
)
cmd_args.extend(["-v", f"{host_skills_dir}:{container_worker_dir}/skills"])
seen_mounts.add((str(host_skills_dir), f"{container_worker_dir}/skills"))
for rel_path in allowed_paths or []:
Expand All @@ -100,6 +105,11 @@ async def launch(
continue
if not host_path.exists():
continue
_prepare_worker_mount_target(
host_worker_dir,
rel_path=rel_path,
source_path=host_path,
)
mount_targets = (
f"{container_ws}/{rel_path}",
f"{container_worker_dir}/{rel_path}",
Expand All @@ -112,17 +122,19 @@ async def launch(
seen_mounts.add(mount_key)

spec_in_container = f"{container_ws}/workers/{worker_id}/spec.json"
cmd_args.extend([
"-w",
f"{container_ws}/workers/{worker_id}",
"-e",
f"OCTOPAL_WORKER_SPEC={spec_in_container}",
self.image,
"python",
"-m",
self.entrypoint_module,
spec_in_container,
])
cmd_args.extend(
[
"-w",
f"{container_ws}/workers/{worker_id}",
"-e",
f"OCTOPAL_WORKER_SPEC={spec_in_container}",
self.image,
"python",
"-m",
self.entrypoint_module,
spec_in_container,
]
)

popen_kwargs = _worker_subprocess_kwargs()
return await asyncio.create_subprocess_exec(
Expand Down Expand Up @@ -160,6 +172,27 @@ def _filter_container_env(
return filtered


def _prepare_worker_mount_target(
host_worker_dir: Path,
*,
rel_path: str,
source_path: Path,
) -> None:
try:
target = (host_worker_dir / rel_path).resolve()
target.relative_to(host_worker_dir)
except (OSError, ValueError):
return

if source_path.is_dir():
target.mkdir(parents=True, exist_ok=True)
return

target.parent.mkdir(parents=True, exist_ok=True)
if not target.exists():
target.touch()


def _host_user_spec() -> str | None:
if os.name == "nt":
return None
Expand Down
13 changes: 6 additions & 7 deletions src/octopal/runtime/workers/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,7 @@ async def run_task(
# behavior only when we cannot map requested MCP tools to a
# concrete server set.
ensure_server_ids = resolved_server_ids or None
await self.mcp_manager.ensure_configured_servers_connected(
ensure_server_ids
)
await self.mcp_manager.ensure_configured_servers_connected(ensure_server_ids)
except Exception:
logger.warning(
"Failed to ensure configured MCP servers before worker launch",
Expand Down Expand Up @@ -1310,10 +1308,11 @@ async def _cleanup_worker_dir(self, worker_dir: Path) -> None:

removed = await asyncio.to_thread(
remove_tree_with_retries,
worker_dir,
retries=8,
base_delay_seconds=0.25,
)
worker_dir,
retries=8,
base_delay_seconds=0.25,
docker_cleanup_image=getattr(self.launcher, "image", None),
)
if removed:
logger.info("WorkerRuntime cleaned up worker dir: %s", worker_dir)
return
Expand Down
Loading