Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 156 additions & 3 deletions dream-server/bin/dream-host-agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
})

INSTALL_DIR: Path = Path()
DATA_DIR: Path = Path()
AGENT_API_KEY: str = ""
GPU_BACKEND: str = "nvidia"
TIER: str = "1"
Expand Down Expand Up @@ -137,6 +138,40 @@ def _precreate_data_dirs(service_id: str):
logger.warning("Failed to pre-create %s: %s", dir_path, e)


def _resolve_setup_hook(ext_dir: Path) -> Path | None:
"""Read manifest to find setup_hook path. Returns None if no hook defined."""
manifest_path = None
for name in ("manifest.yaml", "manifest.yml"):
candidate = ext_dir / name
if candidate.exists():
manifest_path = candidate
break
if manifest_path is None:
return None
try:
import yaml
manifest = yaml.safe_load(manifest_path.read_text(encoding="utf-8"))
except (ImportError, OSError):
return None
if not isinstance(manifest, dict):
return None
service_def = manifest.get("service", {})
if not isinstance(service_def, dict):
return None
setup_hook = service_def.get("setup_hook", "")
if not isinstance(setup_hook, str) or not setup_hook:
return None
hook_path = (ext_dir / setup_hook).resolve()
try:
hook_path.relative_to(ext_dir.resolve())
except ValueError:
logger.warning("Path traversal attempt in setup_hook for %s: %s", ext_dir.name, setup_hook)
return None
if not hook_path.is_file():
return None
return hook_path


def docker_compose_action(service_id: str, action: str) -> tuple:
flags = resolve_compose_flags()
if action == "start":
Expand Down Expand Up @@ -175,6 +210,40 @@ def _iso_now() -> str:
return datetime.now(timezone.utc).isoformat()


_BEARER_RE = re.compile(r"Bearer\s+[A-Za-z0-9._\-=+/]+", re.IGNORECASE)


def _write_progress(service_id: str, status: str, phase_label: str = "",
error: str | None = None) -> None:
"""Atomically write install progress file."""
progress_dir = DATA_DIR / "extension-progress"
progress_dir.mkdir(parents=True, exist_ok=True)
progress_file = progress_dir / f"{service_id}.json"
tmp_file = progress_file.with_suffix(".json.tmp")

# Preserve started_at from existing file
started_at = _iso_now()
if progress_file.exists():
try:
existing = json.loads(progress_file.read_text(encoding="utf-8"))
started_at = existing.get("started_at", started_at)
except (json.JSONDecodeError, OSError):
pass

sanitized_error = _BEARER_RE.sub("Bearer [REDACTED]", error) if error else None

data = {
"service_id": service_id,
"status": status,
"phase_label": phase_label,
"error": sanitized_error,
"started_at": started_at,
"updated_at": _iso_now(),
}
tmp_file.write_text(json.dumps(data), encoding="utf-8")
os.rename(str(tmp_file), str(progress_file))


def json_response(handler, code: int, body: dict):
payload = json.dumps(body).encode("utf-8")
handler.send_response(code)
Expand Down Expand Up @@ -336,6 +405,8 @@ def do_POST(self):
self._handle_extension(action)
elif self.path == "/v1/extension/logs":
self._handle_logs()
elif self.path == "/v1/extension/install":
self._handle_install()
elif self.path == "/v1/extension/setup-hook":
self._handle_setup_hook()
elif self.path == "/v1/service/logs":
Expand Down Expand Up @@ -545,13 +616,95 @@ def _handle_setup_hook(self):
logger.info("setup_hook completed for %s", service_id)
json_response(self, 200, {"status": "ok", "service_id": service_id})

def _handle_install(self):
"""Combined install: setup_hook → pull → start with progress tracking."""
if not check_auth(self):
return
body = read_json_body(self)
if body is None:
return
service_id = validate_service_id(self, body)
if service_id is None:
return
run_setup_hook = body.get("run_setup_hook", False)

lock = _service_locks[service_id]
if not lock.acquire(blocking=False):
json_response(self, 409, {"error": f"Operation in progress for {service_id}"})
return

def _run_install():
try:
flags = resolve_compose_flags()

# Step 1: Setup hook (if requested)
if run_setup_hook:
_write_progress(service_id, "setup_hook", "Running setup...")
ext_dir = USER_EXTENSIONS_DIR / service_id
hook_path = _resolve_setup_hook(ext_dir)
if hook_path:
result = subprocess.run(
["bash", str(hook_path), str(INSTALL_DIR), GPU_BACKEND],
cwd=str(ext_dir),
capture_output=True, text=True,
timeout=SUBPROCESS_TIMEOUT_START,
)
if result.returncode != 0:
_write_progress(service_id, "error", "Setup failed",
error=result.stderr[:500])
return

# Step 2: Pull (best-effort — failure is non-fatal if cached image exists)
_write_progress(service_id, "pulling", "Downloading image...")
pull_result = subprocess.run(
["docker", "compose"] + flags + ["pull", service_id],
cwd=str(INSTALL_DIR), capture_output=True, text=True,
timeout=SUBPROCESS_TIMEOUT_START,
)
if pull_result.returncode != 0:
logger.warning("Pull failed for %s (rc=%d), proceeding to start: %s",
service_id, pull_result.returncode, pull_result.stderr[:200])

# Step 3: Start
_write_progress(service_id, "starting", "Starting container...")
_precreate_data_dirs(service_id)
start_result = subprocess.run(
["docker", "compose"] + flags + ["up", "-d", "--no-deps", service_id],
cwd=str(INSTALL_DIR), capture_output=True, text=True,
timeout=SUBPROCESS_TIMEOUT_START,
)
if start_result.returncode != 0:
_write_progress(service_id, "error", "Installation failed",
error=start_result.stderr[:500])
return

# Step 4: Success
_write_progress(service_id, "started", "Service started")

except subprocess.TimeoutExpired:
_write_progress(service_id, "error", "Installation failed",
error=f"timed out ({SUBPROCESS_TIMEOUT_START}s)")
except (RuntimeError, OSError, subprocess.SubprocessError) as exc:
logger.exception("Install failed for %s", service_id)
_write_progress(service_id, "error", "Installation failed",
error=str(exc)[:500])
finally:
lock.release()

try:
json_response(self, 202, {"status": "accepted", "service_id": service_id, "action": "install"})
threading.Thread(target=_run_install, daemon=True).start()
except Exception:
lock.release()
raise


class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
daemon_threads = True


def main():
global INSTALL_DIR, AGENT_API_KEY, GPU_BACKEND, TIER, CORE_SERVICE_IDS, USER_EXTENSIONS_DIR
global INSTALL_DIR, DATA_DIR, AGENT_API_KEY, GPU_BACKEND, TIER, CORE_SERVICE_IDS, USER_EXTENSIONS_DIR

parser = argparse.ArgumentParser(description="DreamServer Host Agent")
parser.add_argument("--port", type=int, default=7710, help="Listen port (default: 7710)")
Expand Down Expand Up @@ -583,10 +736,10 @@ def main():
GPU_BACKEND = env.get("GPU_BACKEND", "nvidia")
TIER = env.get("TIER", "1")

data_dir = Path(env.get("DREAM_DATA_DIR", str(INSTALL_DIR / "data")))
DATA_DIR = Path(env.get("DREAM_DATA_DIR", str(INSTALL_DIR / "data")))
USER_EXTENSIONS_DIR = Path(env.get(
"DREAM_USER_EXTENSIONS_DIR",
str(data_dir / "user-extensions"),
str(DATA_DIR / "user-extensions"),
))

port = args.port
Expand Down
2 changes: 1 addition & 1 deletion dream-server/extensions/services/dashboard-api/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY main.py config.py models.py security.py gpu.py helpers.py agent_monitor.py ./
COPY main.py config.py models.py security.py gpu.py helpers.py agent_monitor.py user_extensions.py ./
COPY routers/ routers/

# Non-root user
Expand Down
Loading
Loading