Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,7 @@ assets/wheels/vllm*.whl
# DCP artifacts
forge_dcp_tmp/
demo_top_down.md


# enroot / sqsh
*.sqsh
13 changes: 12 additions & 1 deletion src/forge/actors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,14 @@
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

__all__ = ["Policy", "PolicyRouter", "RLTrainer", "ReplayBuffer", "TitanRefModel"]
__all__ = [
"Policy",
"PolicyRouter",
"RLTrainer",
"ReplayBuffer",
"TitanRefModel",
"SandboxedPythonCoder",
]


def __getattr__(name):
Expand All @@ -28,5 +35,9 @@ def __getattr__(name):
from .reference_model import ReferenceModel

return ReferenceModel
elif name == "SandboxedPythonCoder":
from .coder import SandboxedPythonCoder

return SandboxedPythonCoder
else:
raise AttributeError(f"module {__name__} has no attribute {name}")
150 changes: 150 additions & 0 deletions src/forge/actors/coder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

import logging
import os
import subprocess
import tempfile
from pathlib import Path

from monarch.actor import endpoint

from forge.controller import ForgeActor

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


class SandboxedPythonCoder(ForgeActor):
"""A sandboxed code execution environment using enroot containers.
This is a proof of concept of using enroot to provided a sandboxed
environment for executing Python code using NVIDIA's enroot technology.
It automatically manages the entire container lifecycle including image
import, container creation, and cleanup.
The actor follows a three-stage workflow:
1. Image Management: Automatically imports Docker images to enroot .sqsh format
2. Container Lifecycle: Creates fresh container instances for isolated execution
3. Code Execution: Safely runs Python code with proper error handling and output capture
Dependencies:
- enroot: NVIDIA's container runtime (must be installed on host)
- Docker images: Accessible via docker:// URLs or local paths
Args:
docker_image: Docker image URL to import (e.g., "docker://python:3.10").
Can be any Docker Hub image or custom registry URL.
sqsh_image_path: Local filesystem path where the enroot .sqsh image will be stored.
If the file doesn't exist, it will be created via enroot import.
container_name: Unique name for the enroot container instance. Used for
container lifecycle management (create/remove operations).
"""

def __init__(
self,
docker_image: str = "docker://python:3.10",
sqsh_image_path: str = "python-image.sqsh",
container_name: str = "sandbox",
):
self.docker_image = docker_image
self.sqsh_image_path = sqsh_image_path
self.container_name = container_name
self._initialized = False

@endpoint
async def setup(self):
logging.debug("Setting up sandboxed actor")
await self._maybe_create_image()
self.recreate()

@endpoint
async def reset(self):
"""Resets the container instance from the base image."""
self.recreate()

async def _maybe_create_image(self):
"""Ensure the enroot image exists, import it if necessary."""
if not os.path.exists(self.sqsh_image_path):
logging.debug(
f"Image {self.sqsh_image_path} not found, importing from {self.docker_image}"
)
result = subprocess.run(
["enroot", "import", "-o", self.sqsh_image_path, self.docker_image],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
if result.returncode != 0:
raise RuntimeError(f"Failed to import image: {result.stderr}")
logging.debug(
f"Successfully imported {self.docker_image} to {self.sqsh_image_path}"
)
else:
logging.info(f"Using existing image: {self.sqsh_image_path}")

def recreate(self):
"""(Re)create a clean container instance from the base image."""
# Remove any old container
logging.debug(f"Removing container {self.container_name}")
subprocess.run(
["enroot", "remove", "-f", self.container_name],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
# Create new container from image
result = subprocess.run(
["enroot", "create", "--name", self.container_name, self.sqsh_image_path],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
logging.debug(f"Container creation result: {result}")
if result.returncode != 0:
raise RuntimeError(f"Failed to reset container: {result.stderr}")
self._initialized = True
logging.debug("Successfully initialized container")

@endpoint
async def execute(self, code: str) -> tuple[str, str]:
"""Executes Python code inside the container and returns the output.
Args:
code: Python source code string to execute.
Returns:
The captured stdout and stderr from the execution.
"""
logging.debug(f"Executing {code}")
if not self._initialized:
raise RuntimeError("Container not initialized. Call reset() first.")

# Write code to a temporary file that we can mount
with tempfile.TemporaryDirectory() as tmpdir:
code_path = Path(tmpdir) / "script.py"
code_path.write_text(code)

# Run the code inside the container, mounting tmpdir
cmd = [
"enroot",
"start",
"--mount",
f"{tmpdir}:/work",
self.container_name,
"python3",
"/work/script.py",
]
result = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
output = result.stdout
error = result.stderr
return output, error
6 changes: 6 additions & 0 deletions src/forge/controller/provisioner.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,12 @@ async def host_mesh_from_proc(self, proc_mesh: ProcMesh):

async def stop_proc_mesh(self, proc_mesh: ProcMesh):
"""Stops a proc mesh."""
if proc_mesh not in self._proc_host_map:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this in here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for some reason a bunch of unit tests were failing due to wrong cleanup. Not sure why I saw this in this PR specifically, but we should have this line anyways. If we want to be really clean I can add this in a separate PR

logger.warning(
f"proc mesh {proc_mesh} was requested to be stopped, but was either already stopped or "
"was never registered with the provisioner."
)
return
async with self._lock:
# Deregister local logger from global logger
if hasattr(proc_mesh, "_local_fetcher"):
Expand Down
89 changes: 89 additions & 0 deletions tests/integration_tests/test_coder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""
Integration tests for forge.actors.coder.SandboxedPythonCoder.

Requires enroot to be installed.

"""

import os
import uuid

import pytest

from forge.actors.coder import SandboxedPythonCoder


@pytest.mark.timeout(30)
@pytest.mark.asyncio
async def test_coder_runs_python():
"""Integration test for SandboxedPythonCoder with real container execution."""
# Create unique names to avoid test conflicts
unique_id = str(uuid.uuid1())
container_name = f"test_sandbox_{unique_id}"
image_path = f"/tmp/python_test_{unique_id}.sqsh"

coder = None
try:
coder = await SandboxedPythonCoder.as_actor(
docker_image="docker://python:3.10",
sqsh_image_path=image_path,
container_name=container_name,
)

# Execute code
results, _ = await coder.execute.call_one(
code="print('hello world')",
)
print("Got results", results)
assert results == "hello world\n"

finally:
# Clean up resources
if coder:
await SandboxedPythonCoder.shutdown(coder)

# Clean up the image file
if os.path.exists(image_path):
os.unlink(image_path)


@pytest.mark.timeout(30)
@pytest.mark.asyncio
async def test_coder_catches_error():
"""Integration test for SandboxedPythonCoder with real container execution."""
# Create unique names to avoid test conflicts
unique_id = str(uuid.uuid1())
container_name = f"test_sandbox_{unique_id}"
image_path = f"/tmp/python_test_{unique_id}.sqsh"

coder = None
try:
print("starting test")
coder = await SandboxedPythonCoder.as_actor(
docker_image="docker://python:3.10",
sqsh_image_path=image_path,
container_name=container_name,
)
print("Got coder")

# Execute code
_, stderr = await coder.execute.call_one(
code="hello world",
)
print("got stderr", stderr)
assert "SyntaxError" in stderr

finally:
# Clean up resources
if coder:
await SandboxedPythonCoder.shutdown(coder)

# Clean up the image file
if os.path.exists(image_path):
os.unlink(image_path)
Loading
Loading