267 changes: 127 additions & 140 deletions project/paperbench/paperbench/nano/task.py
@@ -7,7 +7,7 @@
from contextlib import asynccontextmanager, nullcontext
from datetime import timedelta
from pathlib import Path
from typing import AsyncGenerator
from typing import Any, AsyncGenerator

import blobfile as bf
from dotenv import load_dotenv
@@ -66,13 +66,15 @@ async def _start_computer(
self,
cluster_config: LocalConfig,
) -> AsyncGenerator[ComputerInterface, None]:
# This is a total hack to get around the fact that `ComputerTask` only supports a single
# Docker image. Should we run the agent and judge in separate `ComputerTask`s? Should we
# refactor `ComputerTask` to support multiple computer configurations? Or something
# else?
"""
Helper function for starting an AlcatrazComputerInterface with a particular cluster_config.
"""
# Here we ensure that task-level configuration overrides the cluster configuration.
task = self.model_copy(
update={
# One exception: keep the image from the cluster_config.
"docker_image": cluster_config.image,
# Override the env variables so they are the same for any computer in PBTask.
"environment": {
"OPENAI_API_KEY": os.getenv("OPENAI_API_KEY", None),
"ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY", None),
@@ -85,14 +87,17 @@ async def _start_computer(
},
}
)

alcatraz_env = task_to_alcatraz_config(task, cluster_config)

async with alcatraz_env.build() as cluster:
yield AlcatrazComputerInterface(cluster_value=cluster)

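Since `_start_computer` is declared as an `asynccontextmanager`-style generator, callers consume it with `async with`. A minimal usage sketch, mirroring the call `_run_reproduce` makes further down:

```python
# Sketch: acquire a computer, run a command, and let the context manager
# handle teardown. `check_shell_command` is used the same way in _setup().
async with self._start_computer(self.reproduction.cluster_config) as computer:
    await computer.check_shell_command("echo ready")
```
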
@override
async def _setup(self, computer: ComputerInterface) -> None:
"""
Called by ComputerTask.setup(), implements the setup logic specific to PBTask instances.
Specifically, sets up the files and folders necessary for attempting the PBTask.
"""
ctx_logger = logger.bind(
run_group_id=self.run_group_id, run_id=self.run_id, runs_dir=self.runs_dir
)
@@ -141,18 +146,10 @@ async def _setup(self, computer: ComputerInterface) -> None:
for dir in [SUBMISSION_DIR, CODE_DIR]:
await computer.check_shell_command(f"mkdir -p {dir}")

@override
async def grade(self, computer: ComputerInterface) -> PaperBenchGrade:
"""
Runs the grading process for a single submission.
"""

async def _ensure_files_available_for_grading(self, computer: ComputerInterface) -> None:
ctx_logger = logger.bind(
run_group_id=self.run_group_id,
run_id=self.run_id,
runs_dir=self.runs_dir,
run_group_id=self.run_group_id, run_id=self.run_id, runs_dir=self.runs_dir
)

# We need one final upload before grading, for solvers that do not take care of this step.
if not self.skipped_rollout:
ctx_logger.info("Attempting to upload logs before grading...", destinations=["run"])
@@ -170,107 +167,96 @@ async def grade(self, computer: ComputerInterface) -> PaperBenchGrade:
run_dir=self.run_dir,
status="done",
)
ctx_logger.info("Successfully uploaded logs before grading!", destinations=["run"])
except Exception as e:
ctx_logger.exception(
f"Exception uploading final logs before grading: {e}", destinations=["run"]
)

checkpoint = await self._select_checkpoint()
if not checkpoint:
ctx_logger.exception("No checkpoint exists, skipping grading!", destinations=["run"])
def _early_exit_grade(self, grader_log: str) -> PaperBenchGrade:
"""Helper function to create a PaperBenchGrade when early exiting for some reason."""
return PaperBenchGrade(
paperbench_result=PaperBenchResult(
paper_id=self.paper_id,
run_id=self.run_id,
submission_exists=False,
skipped_reproduction=self.reproduction.skip_reproduction,
azure_checkpoint_path=None,
code_only=self.judge.code_only,
resources_provided=self.judge.resources_provided,
agent_output=None,
judge_output=None,
reproduction_output=None,
),
score=0.0,
grader_log=grader_log,
)

return PaperBenchGrade(
paperbench_result=PaperBenchResult(
paper_id=self.paper_id,
run_id=self.run_id,
submission_exists=False,
skipped_reproduction=self.reproduction.skip_reproduction,
azure_checkpoint_path=None,
code_only=self.judge.code_only,
resources_provided=self.judge.resources_provided,
agent_output=None,
judge_output=None,
reproduction_output=None,
),
score=0.0,
grader_log="No checkpoint exists, skipping grading!",
)
def _should_grade(self, reproduction_output: ReproductionOutput | None) -> bool:
"""
We can proceed with grading if reproduction was successful
OR we are in a reproduction-free setup
"""
return (reproduction_output and reproduction_output.success) or (
self.reproduction.skip_reproduction or self.judge.code_only
)
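For illustration, the predicate restated as a standalone function with stand-in objects (a sketch, not part of the PR):

```python
from types import SimpleNamespace

def should_grade(output, skip_reproduction=False, code_only=False) -> bool:
    # Same shape as PBTask._should_grade: grade after a successful
    # reproduction, or whenever the setup runs no reproduction at all.
    return bool((output and output.success) or (skip_reproduction or code_only))

assert should_grade(SimpleNamespace(success=True)) is True
assert should_grade(SimpleNamespace(success=False)) is False
assert should_grade(None, code_only=True) is True
assert should_grade(None) is False
```
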

ctx_logger.info("Successfully uploaded logs before grading!", destinations=["run"])
def _save_grade(self, grade: PaperBenchGrade) -> None:
ctx_logger = logger.bind(
run_group_id=self.run_group_id, run_id=self.run_id, runs_dir=self.runs_dir
)

path_to_submission, _ = checkpoint
path_to_executed_submission = path_to_submission.replace(".tar.gz", "_executed.tar.gz")
reproduction_output = None
path_to_grade = bf.join(self.run_dir, "grade.json")
bf.write_bytes(path_to_grade, json.dumps(grade.to_dict(), indent=2).encode("utf-8"))
ctx_logger.info(
purple(f"Grades saved to {path_to_grade}"), destinations=["group", "run"], _print=True
)
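The saved grade can be read back with the same blobfile helpers this module already uses; a sketch:

```python
# Sketch: load the persisted grade as a plain dict.
with bf.BlobFile(bf.join(self.run_dir, "grade.json"), "r") as f:
    grade_dict = json.loads(f.read())
```
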

if self.reproduction.skip_reproduction or self.judge.code_only:
submission_to_grade_path = path_to_submission
else:
submission_to_grade_path = path_to_executed_submission
def _record_extra(self, extra: dict[str, Any]) -> None:
get_recorder().record_extra(
{
"run_group_id": self.run_group_id,
"run_id": self.run_id,
**extra,
}
)
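Note that the `**extra` spread follows standard dict-literal merge semantics: later keys win, so a colliding key in `extra` would shadow the run identifiers. For illustration:

```python
# Later keys win in a dict-literal merge.
payload = {"run_group_id": "g1", "run_id": "r1", **{"run_id": "r2"}}
assert payload == {"run_group_id": "g1", "run_id": "r2"}
```
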

ctx_logger.info(
f"Starting the reproduction process for `{self.question_id}.{self.attempt_id}`...",
destinations=["group", "run"],
_print=True,
)
def _should_reproduce(self) -> bool:
"""Check if we should run the reproduction step"""
return not self.reproduction.skip_reproduction and not self.judge.code_only

async with self._start_computer(
self.reproduction.cluster_config
) as reproduction_computer:
reproduction_output = await self._reproduce(reproduction_computer)
@override
async def grade(self, computer: ComputerInterface) -> PaperBenchGrade:
"""Defines the grading process for a PBTask."""
ctx_logger = logger.bind(
run_group_id=self.run_group_id, run_id=self.run_id, runs_dir=self.runs_dir
)

ctx_logger.info(
f"The reproduction process for {self.question_id}.{self.attempt_id} has finished!",
destinations=["group", "run"],
_print=True,
)
# 1. setup
await self._ensure_files_available_for_grading(computer)
checkpoint = await self._select_checkpoint()
if not checkpoint:
ctx_logger.exception("No checkpoint exists, skipping grading!", destinations=["run"])
return self._early_exit_grade("No checkpoint exists, skipping grading!")
path_to_submission, _ = checkpoint
path_to_executed_submission = path_to_submission.replace(".tar.gz", "_executed.tar.gz")

get_recorder().record_extra(
{
"run_group_id": self.run_group_id,
"run_id": self.run_id,
"repro_metadata": (
reproduction_output.metadata.to_dict()
if reproduction_output.metadata
else {}
),
}
)
# 2. run reproduction
repro_output = None
submission_to_grade_path = path_to_submission
if self._should_reproduce():
repro_output = await self._run_reproduce(path_to_submission)
repro_metadata = repro_output.metadata.to_dict() if repro_output.metadata else {}
submission_to_grade_path = path_to_executed_submission
self._record_extra({"repro_metadata": repro_metadata})

# Grade reproduction
# 3. run judge
judge_output = None
if self._should_grade(repro_output):
judge_output = await self._run_judge(submission_to_grade_path, self.paper_id)
self._record_extra({"judge_output": judge_output.to_dict() if judge_output else None})

if (
reproduction_output
and reproduction_output.success
or self.reproduction.skip_reproduction
or self.judge.code_only
):
ctx_logger.info(
f"Grading the submission for {self.question_id}.{self.attempt_id}...",
destinations=["group", "run"],
_print=True,
)

ctx_logger.info(
f"Grading submission {submission_to_grade_path}...", destinations=["run"]
)

judge_output = await self.grade_submission(submission_to_grade_path, self.paper_id)

ctx_logger.info(
f"Grading for {self.question_id}.{self.attempt_id} finished!",
destinations=["group", "run"],
_print=True,
)

get_recorder().record_extra(
{
"run_group_id": self.run_group_id,
"run_id": self.run_id,
"judge_output": judge_output.to_dict() if judge_output else None,
}
)

# 4. wrap up
grade = PaperBenchGrade(
paperbench_result=PaperBenchResult(
paper_id=self.paper_id,
@@ -281,48 +267,29 @@ async def grade(self, computer: ComputerInterface) -> PaperBenchGrade:
code_only=self.judge.code_only,
resources_provided=self.judge.resources_provided,
judge_output=judge_output,
reproduction_output=reproduction_output,
reproduction_output=repro_output,
),
score=judge_output.score if judge_output else 0.0,
grader_log="Grading completed successfully" if judge_output else "Log file not found!",
)

get_recorder().record_extra(
{
"run_group_id": self.run_group_id,
"run_id": self.run_id,
"pb_result": grade.to_dict(),
}
)

path_to_grade = bf.join(self.run_dir, "grade.json")
bf.write_bytes(path_to_grade, json.dumps(grade.to_dict(), indent=2).encode("utf-8"))
ctx_logger.info(
purple(f"Grades saved to {path_to_grade}"), destinations=["group", "run"], _print=True
)
self._record_extra({"pb_result": grade.to_dict()})
self._save_grade(grade)

return grade
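Putting the helpers together, the refactored `grade()` is a four-step pipeline; an informal recap of the code above:

```python
# Control-flow sketch of grade(); names mirror the PR.
#   1. _ensure_files_available_for_grading(); _select_checkpoint()
#      -> no checkpoint: return _early_exit_grade(...)
#   2. if _should_reproduce(): _run_reproduce(...) and grade the
#      *_executed.tar.gz tarball; otherwise grade the raw checkpoint.
#   3. if _should_grade(repro_output): _run_judge(...)
#   4. Build PaperBenchGrade, _record_extra(...), _save_grade(...), return.
```
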

async def _reproduce(self, computer: ComputerInterface) -> ReproductionOutput:
async def _run_reproduce(self, submission: str) -> ReproductionOutput:
"""Runs the reproduction process for the submission associated with the PBTask."""
ctx_logger = logger.bind(
run_group_id=self.run_group_id,
run_id=self.run_id,
runs_dir=self.runs_dir,
)
ctx_logger.info(
f"Starting the reproduction process for `{self.question_id}.{self.attempt_id}`...",
destinations=["group", "run"],
_print=True,
)

checkpoint = await self._select_checkpoint()

if not checkpoint:
ctx_logger.exception(
"No checkpoint exists, skipping reproduction!", destinations=["run"]
)

return ReproductionOutput(
executed_submission=None,
metadata=None,
)

submission, _ = checkpoint
reproduce_output_path = submission.replace(".tar.gz", "_executed.tar.gz")
repro_metadata_path = submission.replace(".tar.gz", "_executed_metadata.json")

@@ -346,14 +313,15 @@ async def _reproduce(self, computer: ComputerInterface) -> ReproductionOutput:
)

# Reproduce on alcatraz
await reproduce_on_computer(
computer=computer,
submission_path=submission,
logger=ctx_logger.bind(destinations=["run"]),
run_dir=self.run_dir,
timeout=self.reproduction.timeout,
retry_threshold=self.reproduction.retry_threshold,
)
async with self._start_computer(self.reproduction.cluster_config) as computer:
await reproduce_on_computer(
computer=computer,
submission_path=submission,
logger=ctx_logger.bind(destinations=["run"]),
run_dir=self.run_dir,
timeout=self.reproduction.timeout,
retry_threshold=self.reproduction.retry_threshold,
)

# Now the result should exist
repro_output_exists = bf.exists(reproduce_output_path)
@@ -383,13 +351,19 @@
data = json.loads(f.read())
metadata = ReproductionMetadata.from_dict(data)

ctx_logger.info(
f"The reproduction process for {self.question_id}.{self.attempt_id} has finished!",
destinations=["group", "run"],
_print=True,
)

return ReproductionOutput(
executed_submission=reproduce_output_path,
metadata=metadata,
)

async def _select_checkpoint(self) -> tuple[str, timedelta] | None:
"""Identify the submission tarball to use for reproduction/grading."""
"""Identifies the submission tarball to use for reproduction/grading."""
ctx_logger = logger.bind(
run_group_id=self.run_group_id,
run_id=self.run_id,
@@ -419,16 +393,23 @@ async def _select_checkpoint(self) -> tuple[str, timedelta] | None:

return submission_path, submission_duration

async def grade_submission(
async def _run_judge(
self,
submission_path: str,
paper_id: str,
) -> JudgeOutput | None:
"""Runs the judge on the given submission for the PBTask."""
ctx_logger = logger.bind(
run_group_id=self.run_group_id,
run_id=self.run_id,
runs_dir=self.runs_dir,
)
ctx_logger.info(
f"Grading the submission for {self.question_id}.{self.attempt_id}"
f" from {submission_path}...",
destinations=["group", "run"],
_print=True,
)

grader_upload_path = submission_path.replace(
".tar.gz", f"_grader_output_{self.judge.grade_id}.json"
@@ -479,4 +460,10 @@ async def grade_submission(
resources_provided=self.judge.resources_provided,
)

ctx_logger.info(
f"Grading for {self.question_id}.{self.attempt_id} finished!",
destinations=["group", "run"],
_print=True,
)

return judge_output