Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
from abc import ABC, abstractmethod
from contextlib import asynccontextmanager
from typing import Any, AsyncContextManager, AsyncGenerator, Literal, cast
from typing import Any, AsyncGenerator, Literal, cast

import async_lru
import structlog.stdlib
Expand All @@ -9,18 +10,24 @@

import chz
from alcatraz.clusters.local import BaseAlcatrazCluster, ClusterConfig, LocalConfig
from nanoeval_alcatraz.task_to_alcatraz_config import task_to_alcatraz_config
from alcatraz.utils.cmds import run_command_streaming
from nanoeval.solvers.computer_tasks.code_execution_interface import (
ComputerConfiguration,
ComputerInterface,
ComputerRuntime,
ExecutionResult,
JupyterComputerInterface,
JupyterExecutionResult,
)
from nanoeval_alcatraz.task_to_alcatraz_config import (
SUPPORTED_NETWORK_MODES,
task_to_alcatraz_config,
)

logger = structlog.stdlib.get_logger(component=__name__)

ALCATRAZ_TIMEOUT = int(os.getenv("ALCATRAZ_TIMEOUT", 120))


class Python3ExceptionDict(BaseModel):
"""A pydantic model for serializing a Python 3.x exception.
Expand All @@ -38,10 +45,14 @@ class Python3ExceptionDict(BaseModel):
notes: list[str]


class BaseAlcatrazComputerInterface(JupyterComputerInterface, ABC):
class BaseAlcatrazComputerInterfaceNoJupyter(ComputerInterface, ABC):
"""
Override this class to create a custom AlcatrazComputerInterface without Jupyter support.
"""

@property
@abstractmethod
def cluster(self) -> BaseAlcatrazCluster:
def _cluster(self) -> BaseAlcatrazCluster:
pass

async def disable_internet(self) -> None:
Expand All @@ -52,29 +63,44 @@ async def disable_internet(self) -> None:

# Verify
logger.info("Post-setup network access disabled")
logger.info("Verified network access successfully disabled")
try:
from alcatraz.utils.network import assert_internet_disabled # type: ignore

await assert_internet_disabled(self._cluster)
logger.info("Verified network access successfully disabled")
except ImportError:
pass

async def upload(self, file: bytes, destination: str) -> None:
return await self.cluster.upload(file, destination)
return await self._cluster.upload(file, destination)

async def download(self, file: str) -> bytes:
return await self.cluster.download(file)
return await self._cluster.download(file)

async def send_shell_command(self, cmd: str, idempotent: bool = False) -> ExecutionResult:
res = await run_command_streaming(self.cluster, cmd)
async def send_shell_command(self, cmd: str, *, idempotent: bool = False) -> ExecutionResult:
res = await run_command_streaming(self._cluster, cmd)
return ExecutionResult(output=res["result"], exit_code=res["exit_code"])

async def fetch_container_names(self) -> list[str]:
return await self.cluster.fetch_container_names()
return await self._cluster.fetch_container_names()

async def stop(self) -> None:
await self.cluster._stop()
await self._cluster._stop()


@chz.chz
class BaseAlcatrazComputerInterface(
BaseAlcatrazComputerInterfaceNoJupyter, JupyterComputerInterface
):
"""
Override this class to add Jupyter-specific functionality to the AlcatrazComputerInterface.
"""

@override
async def execute(self, code: str, timeout: int = 120) -> JupyterExecutionResult:
await self._start_cluster_once()
async def execute(self, code: str, timeout: int = ALCATRAZ_TIMEOUT) -> JupyterExecutionResult:
await self._start_jupyter_kernel_once()

messages = await self.cluster.send_kernel_command(code, timeout=timeout)
messages = await self._cluster.send_kernel_command(code, timeout=timeout)

# Parse the messages into a final execution result
# TODO(kevinliu) - this may not be a perfect parsing, but it should only really be used for setup and grade so hopefully it's good enough
Expand Down Expand Up @@ -108,42 +134,89 @@ async def execute(self, code: str, timeout: int = 120) -> JupyterExecutionResult
)

@async_lru.alru_cache(maxsize=1)
async def _start_cluster_once(self) -> None:
if not await self.cluster.is_kernel_started():
await self.cluster.create_kernel_on_machine()
async def _start_jupyter_kernel_once(self) -> None:
if not await self._cluster.is_kernel_started():
await self._cluster.create_kernel_on_machine()


@chz.chz
class AlcatrazComputerInterface(BaseAlcatrazComputerInterface):
class AlcatrazComputerInterfaceNoJupyter(BaseAlcatrazComputerInterfaceNoJupyter):
cluster_value: BaseAlcatrazCluster

@property
def cluster(self) -> BaseAlcatrazCluster:
def _cluster(self) -> BaseAlcatrazCluster:
return self.cluster_value


@chz.chz
class AlcatrazComputerRuntime(ComputerRuntime):
class AlcatrazComputerInterface(AlcatrazComputerInterfaceNoJupyter, BaseAlcatrazComputerInterface):
pass


@chz.chz
class AlcatrazComputerRuntimeNoJupyter(ComputerRuntime):
env: ClusterConfig = chz.field(default_factory=LocalConfig)

async def _do_runtime_setup(
self, task: ComputerConfiguration, computer: ComputerInterface
) -> None:
assert isinstance(computer, BaseAlcatrazComputerInterfaceNoJupyter)
# Alcatraz must do this dynamically since it doesn't have the ability to remove
# a container's internet access at creation time.
# Other runtimes should do this by configuring the container object itself.
if not task.allow_internet:
logger.info("Disabling internet (since allow_internet is False)")
await computer.disable_internet()

if task.network_mode not in SUPPORTED_NETWORK_MODES:
raise ValueError(
f"The {task.network_mode} network mode is not supported on Alcatraz, and will never be. Please change it, or stop using Alcatraz."
)

@asynccontextmanager
async def run(
async def _start_computer(
self, task: ComputerConfiguration
) -> AsyncGenerator[AlcatrazComputerInterface, None]:
async with task_to_alcatraz_config(task, self.env).build() as cluster:
computer = AlcatrazComputerInterface(cluster_value=cluster)
) -> AsyncGenerator[AlcatrazComputerInterfaceNoJupyter, None]:
async with task_to_alcatraz_config(task, self.env).build() as _cluster:
computer = AlcatrazComputerInterfaceNoJupyter(cluster_value=_cluster)
yield computer

@override

@chz.chz
class AlcatrazComputerRuntime(ComputerRuntime):
env: ClusterConfig = chz.field(default_factory=LocalConfig)

async def _do_runtime_setup(
self, task: ComputerConfiguration, computer: AlcatrazComputerInterface
self, task: ComputerConfiguration, computer: ComputerInterface
) -> None:
"""No-op, we don't use this but need to implement the abstract method."""
return
assert isinstance(computer, BaseAlcatrazComputerInterface)

# Run a jupyter command; this will force Jupyter to start up and be installed.
# This is an Alcatraz only feature. It must be done before Internet access is disabled, because
# Alcatraz supports live-installing Jupyter.
# TODO(kevinliu) we should catalog which evals rely on this and remove it.
logger.info("Running initial Jupyter command to ensure Jupyter is installed")
await computer.execute("print('hi')")
logger.info("Jupyter working!")

# Alcatraz must do this dynamically since it doesn't have the ability to remove
# a container's internet access at creation time.
# Other runtimes should do this by configuring the container object itself.
if not task.allow_internet:
logger.info("Disabling internet (since allow_internet is False)")
await computer.disable_internet()

if task.network_mode not in SUPPORTED_NETWORK_MODES:
raise ValueError(
f"The {task.network_mode} network mode is not supported on Alcatraz, and will never be. Please change it, or stop using Alcatraz."
)

@override
def _start_computer(
@asynccontextmanager
async def _start_computer(
self, task: ComputerConfiguration
) -> AsyncContextManager[AlcatrazComputerInterface]:
"""No-op, we don't use this but need to implement the abstract method."""
return
) -> AsyncGenerator[AlcatrazComputerInterface, None]:
async with task_to_alcatraz_config(task, self.env).build() as _cluster:
computer = AlcatrazComputerInterface(cluster_value=_cluster)
yield computer


Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,24 @@
import logging

from alcatraz.clusters.local import ClusterConfig
from nanoeval.solvers.computer_tasks.code_execution_interface import ComputerConfiguration
from nanoeval.solvers.computer_tasks.code_execution_interface import (
ComputerConfiguration,
NetworkMode,
)

logger = logging.getLogger(__name__)

SUPPORTED_NETWORK_MODES = [NetworkMode.NONE, NetworkMode.UNPROXIED]


def task_to_alcatraz_config(task: ComputerConfiguration, config: ClusterConfig) -> ClusterConfig:
# TODO, we should really just have a ClusterConfig as part of the task itself

if task.network_mode not in SUPPORTED_NETWORK_MODES:
raise ValueError(
f"The {task.network_mode} network mode is not supported on Alcatraz, and will never be. Please change it, or stop using Alcatraz."
)

if task.azure_vm_sku:
logger.info("Using custom azure_vm_sku: %s", task.azure_vm_sku)
config = config.model_copy(update={"azure_vm_sku": task.azure_vm_sku})
Expand All @@ -35,11 +45,22 @@ def task_to_alcatraz_config(task: ComputerConfiguration, config: ClusterConfig)
if task.alcatraz_limits:
logger.info("Using custom limits: %s", task.alcatraz_limits)
config = config.model_copy(update={"limits": task.alcatraz_limits})

merged_volumes_config = {}
if task.volumes_config:
logger.info("Using custom volumes_config: %s", task.volumes_config)
config = config.model_copy(update={"volumes_config": task.volumes_config})

merged_volumes_config.update(task.volumes_config)
if task.volume_mounts:
volumes_config = {
str(i): {
"bind_source": volume_mount.host_path,
"bind_dest": volume_mount.container_path,
}
for i, volume_mount in enumerate(task.volume_mounts)
}
logger.info("Using custom volumes_mount: %s", volumes_config)
merged_volumes_config.update(volumes_config)
if merged_volumes_config:
config = config.model_copy(update={"volumes_config": merged_volumes_config})
if task.shm_size:
logger.info("Using custom shm size: %s", task.shm_size)
config = config.model_copy(update={"shm_size": task.shm_size})
Expand Down
101 changes: 55 additions & 46 deletions project/swelancer/swelancer/eval.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,54 +122,63 @@ async def _setup(self, computer: ComputerInterface, runtime_config: RuntimeConfi
ctx_logger = logger.bind(
run_group_id=self.run_group_id, runs_dir=self.runs_dir, run_id=self.run_id
)
async with asyncio.timeout(2400):
try:
# Assert various things about the environment
ctx_logger.info("SETUP", destinations=["run"])
if isinstance(computer, JupyterComputerInterface):
await computer.check_execute(
"""import os; assert os.environ.get('CONDA_DEFAULT_ENV') == 'testbed', os.environ.get('CONDA_DEFAULT_ENV')""",
)
ctx_logger.info("Correct Jupyter environment is loaded.", destinations=["run"])

# Assert various things about the environment
ctx_logger.info("SETUP", destinations=["run"])
if isinstance(computer, JupyterComputerInterface):
await computer.check_execute(
"""import os; assert os.environ.get('CONDA_DEFAULT_ENV') == 'testbed', os.environ.get('CONDA_DEFAULT_ENV')""",
)
ctx_logger.info("Correct Jupyter environment is loaded.", destinations=["run"])

if self.variant == "swe_manager":
# reset to the broken commit
await computer.check_shell_command(f"bash -c 'git reset --hard {self.manager_commit}^'")
else:
# instead of a time-based check, poll until /setup_done.txt contains "done"
ctx_logger.info("Waiting for setup to be done", destinations=["run"])
while True:
res = await computer.send_shell_command("cat /setup_done.txt")
if res.output.decode("utf-8").strip() == "done":
ctx_logger.info("Setup is done", destinations=["run"])
break
await asyncio.sleep(15)

# power user flag: if using a stale image where zip and unzip are not installed
if os.getenv("INSTALL_ZIP_UNZIP", "false").lower() == "true":
await computer.check_shell_command("apt-get update && apt-get install -y zip unzip")

# Zip the tests directory with a password
ctx_logger.info("Zipping tests", destinations=["run"])
await computer.check_shell_command(
"cd /app/tests && zip -P 'secretpasswordhere' -r /app/tmp.zip . && mv /app/tmp.zip /app/tests.zip"
)
# Clear tests directory
await computer.send_shell_command("rm -rf /app/tests")

# Remove existing git repo and create a new one
await computer.check_shell_command("rm -rf .git")
await computer.check_shell_command("git init")

# Create a temp commit to mark where the model's changes begin
if self.variant == "ic_swe":
await computer.check_shell_command("bash -c 'git add .'")
await computer.send_shell_command(
'bash -c \'git -c user.name="temp" -c user.email="temp@example.com" commit -m "temp"\''
)
ctx_logger.info("Temp commit created", destinations=["run"])
if self.variant == "swe_manager":
# reset to the broken commit
await computer.check_shell_command(
f"bash -c 'git reset --hard {self.manager_commit}^'"
)
else:
# instead of a time-based check, poll until /setup_done.txt contains "done"
ctx_logger.info("Waiting for setup to be done", destinations=["run"])
while True:
res = await computer.send_shell_command("cat /setup_done.txt")
if res.output.decode("utf-8").strip() == "done":
ctx_logger.info("Setup is done", destinations=["run"])
break
await asyncio.sleep(15)

# power user flag: if using a stale image where zip and unzip are not installed
if os.getenv("INSTALL_ZIP_UNZIP", "false").lower() == "true":
await computer.check_shell_command(
"apt-get update && apt-get install -y zip unzip"
)

if self.disable_internet:
await computer.disable_internet()
# Zip the tests directory with a password
ctx_logger.info("Zipping tests", destinations=["run"])
await computer.check_shell_command(
"cd /app/tests && zip -P 'secretpasswordhere' -r /app/tmp.zip . && mv /app/tmp.zip /app/tests.zip"
)
# Clear tests directory
await computer.send_shell_command("rm -rf /app/tests")

# Remove existing git repo and create a new one
await computer.check_shell_command("rm -rf .git")
await computer.check_shell_command("git init")

# Create a temp commit to mark where the model's changes begin
if self.variant == "ic_swe":
await computer.check_shell_command("bash -c 'git add .'")
await computer.send_shell_command(
'bash -c \'git -c user.name="temp" -c user.email="temp@example.com" commit -m "temp"\''
)
ctx_logger.info("Temp commit created", destinations=["run"])

if self.disable_internet:
await computer.disable_internet()
except Exception as e:
ctx_logger.exception("An error occurred during setup", destinations=["run"])
raise RolloutSystemError(f"An error occurred during setup: {e}") from e
ctx_logger.info("Setup complete", destinations=["run"])

@override
async def grade(
Expand Down
Loading