diff --git a/project/paperbench/paperbench/nano/task.py b/project/paperbench/paperbench/nano/task.py
index 1ba48880..88faad8d 100644
--- a/project/paperbench/paperbench/nano/task.py
+++ b/project/paperbench/paperbench/nano/task.py
@@ -7,7 +7,7 @@
 from contextlib import asynccontextmanager, nullcontext
 from datetime import timedelta
 from pathlib import Path
-from typing import AsyncGenerator
+from typing import Any, AsyncGenerator
 
 import blobfile as bf
 from dotenv import load_dotenv
@@ -66,13 +66,15 @@ async def _start_computer(
         self,
         cluster_config: LocalConfig,
     ) -> AsyncGenerator[ComputerInterface, None]:
-        # This is a total hack to get around the fact that `ComputerTask` only supports a single
-        # Docker image. Should we run the agent and judge in separate `ComputerTask`s? Should we
-        # refactor `ComputerTask` to support multiple computer configurations? Or something
-        # else?
+        """
+        Helper function for starting an AlcatrazComputerInterface with a particular cluster_config.
+        """
+        # here we are ensuring task-level configuration overrides cluster configuration
        task = self.model_copy(
             update={
+                # one exception: want to keep the cluster_config'd image
                 "docker_image": cluster_config.image,
+                # override the env variables so it's the same for any computer in PBTask
                 "environment": {
                     "OPENAI_API_KEY": os.getenv("OPENAI_API_KEY", None),
                     "ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY", None),
@@ -85,7 +87,6 @@ async def _start_computer(
                 },
             }
         )
-
         alcatraz_env = task_to_alcatraz_config(task, cluster_config)
 
         async with alcatraz_env.build() as cluster:
@@ -93,6 +94,10 @@
     async def _setup(self, computer: ComputerInterface) -> None:
+        """
+        Called by ComputerTask.setup(), implements the setup logic specific to PBTask instances.
+        Specifically, sets up the files and folders necessary for attempting the PBTask.
+        """
         ctx_logger = logger.bind(
             run_group_id=self.run_group_id, run_id=self.run_id, runs_dir=self.runs_dir
         )
@@ -141,18 +146,10 @@ async def _setup(self, computer: ComputerInterface) -> None:
         for dir in [SUBMISSION_DIR, CODE_DIR]:
             await computer.check_shell_command(f"mkdir -p {dir}")
 
-    @override
-    async def grade(self, computer: ComputerInterface) -> PaperBenchGrade:
-        """
-        Runs the grading process for a single submission.
-        """
-
+    async def _ensure_files_available_for_grading(self, computer: ComputerInterface) -> None:
         ctx_logger = logger.bind(
-            run_group_id=self.run_group_id,
-            run_id=self.run_id,
-            runs_dir=self.runs_dir,
+            run_group_id=self.run_group_id, run_id=self.run_id, runs_dir=self.runs_dir
         )
 
-        # We need one final upload before grading, for solvers which do not take care of this step.
         if not self.skipped_rollout:
             ctx_logger.info("Attempting to upload logs before grading...", destinations=["run"])
@@ -170,107 +167,96 @@ async def grade(self, computer: ComputerInterface) -> PaperBenchGrade:
                     run_dir=self.run_dir,
                     status="done",
                 )
+                ctx_logger.info("Successfully uploaded logs before grading!", destinations=["run"])
             except Exception as e:
                 ctx_logger.exception(
                     f"Exception uploading final logs before grading: {e}", destinations=["run"]
                 )
-        checkpoint = await self._select_checkpoint()
-        if not checkpoint:
-            ctx_logger.exception("No checkpoint exists, skipping grading!", destinations=["run"])
+    def _early_exit_grade(self, grader_log: str) -> PaperBenchGrade:
+        """Helper function to create a PaperBenchGrade when early exiting for some reason."""
+        return PaperBenchGrade(
+            paperbench_result=PaperBenchResult(
+                paper_id=self.paper_id,
+                run_id=self.run_id,
+                submission_exists=False,
+                skipped_reproduction=self.reproduction.skip_reproduction,
+                azure_checkpoint_path=None,
+                code_only=self.judge.code_only,
+                resources_provided=self.judge.resources_provided,
+                agent_output=None,
+                judge_output=None,
+                reproduction_output=None,
+            ),
+            score=0.0,
+            grader_log=grader_log,
+        )
-            return PaperBenchGrade(
-                paperbench_result=PaperBenchResult(
-                    paper_id=self.paper_id,
-                    run_id=self.run_id,
-                    submission_exists=False,
-                    skipped_reproduction=self.reproduction.skip_reproduction,
-                    azure_checkpoint_path=None,
-                    code_only=self.judge.code_only,
-                    resources_provided=self.judge.resources_provided,
-                    agent_output=None,
-                    judge_output=None,
-                    reproduction_output=None,
-                ),
-                score=0.0,
-                grader_log="No checkpoint exists, skipping grading!",
-            )
+    def _should_grade(self, reproduction_output: ReproductionOutput | None) -> bool:
+        """
+        We can proceed with grading if reproduction was successful
+        OR we are in a reproduction-free setup
+        """
+        return (reproduction_output and reproduction_output.success) or (
+            self.reproduction.skip_reproduction or self.judge.code_only
+        )
-        ctx_logger.info("Successfully uploaded logs before grading!", destinations=["run"])
+    def _save_grade(self, grade: PaperBenchGrade) -> None:
+        ctx_logger = logger.bind(
+            run_group_id=self.run_group_id, run_id=self.run_id, runs_dir=self.runs_dir
+        )
-        path_to_submission, _ = checkpoint
-        path_to_executed_submission = path_to_submission.replace(".tar.gz", "_executed.tar.gz")
-        reproduction_output = None
+        path_to_grade = bf.join(self.run_dir, "grade.json")
+        bf.write_bytes(path_to_grade, json.dumps(grade.to_dict(), indent=2).encode("utf-8"))
+        ctx_logger.info(
+            purple(f"Grades saved to {path_to_grade}"), destinations=["group", "run"], _print=True
+        )
-        if self.reproduction.skip_reproduction or self.judge.code_only:
-            submission_to_grade_path = path_to_submission
-        else:
-            submission_to_grade_path = path_to_executed_submission
+    def _record_extra(self, extra: dict[str, Any]) -> None:
+        get_recorder().record_extra(
+            {
+                "run_group_id": self.run_group_id,
+                "run_id": self.run_id,
+                **extra,
+            }
+        )
-        ctx_logger.info(
-            f"Starting the reproduction process for `{self.question_id}.{self.attempt_id}`...",
-            destinations=["group", "run"],
-            _print=True,
-        )
+    def _should_reproduce(self) -> bool:
+        """Check if we should run the reproduction step"""
+        return not self.reproduction.skip_reproduction and not self.judge.code_only
-        async with self._start_computer(
-            self.reproduction.cluster_config
-        ) as reproduction_computer:
-            reproduction_output = await self._reproduce(reproduction_computer)
+    @override
+    async def grade(self, computer: ComputerInterface) -> PaperBenchGrade:
+        """Defines the grading process for a PBTask."""
+        ctx_logger = logger.bind(
+            run_group_id=self.run_group_id, run_id=self.run_id, runs_dir=self.runs_dir
+        )
-        ctx_logger.info(
-            f"The reproduction process for {self.question_id}.{self.attempt_id} has finished!",
-            destinations=["group", "run"],
-            _print=True,
-        )
+        # 1. setup
+        await self._ensure_files_available_for_grading(computer)
+        checkpoint = await self._select_checkpoint()
+        if not checkpoint:
+            ctx_logger.exception("No checkpoint exists, skipping grading!", destinations=["run"])
+            return self._early_exit_grade("No checkpoint exists, skipping grading!")
+        path_to_submission, _ = checkpoint
+        path_to_executed_submission = path_to_submission.replace(".tar.gz", "_executed.tar.gz")
-        get_recorder().record_extra(
-            {
-                "run_group_id": self.run_group_id,
-                "run_id": self.run_id,
-                "repro_metadata": (
-                    reproduction_output.metadata.to_dict()
-                    if reproduction_output.metadata
-                    else {}
-                ),
-            }
-        )
+        # 2. run reproduction
+        repro_output = None
+        submission_to_grade_path = path_to_submission
+        if self._should_reproduce():
+            repro_output = await self._run_reproduce(path_to_submission)
+            repro_metadata = repro_output.metadata.to_dict() if repro_output.metadata else {}
+            submission_to_grade_path = path_to_executed_submission
+            self._record_extra({"repro_metadata": repro_metadata})
-        # Grade reproduction
+        # 3. run judge
         judge_output = None
+        if self._should_grade(repro_output):
+            judge_output = await self._run_judge(submission_to_grade_path, self.paper_id)
+        self._record_extra({"judge_output": judge_output.to_dict() if judge_output else None})
-        if (
-            reproduction_output
-            and reproduction_output.success
-            or self.reproduction.skip_reproduction
-            or self.judge.code_only
-        ):
-            ctx_logger.info(
-                f"Grading the submission for {self.question_id}.{self.attempt_id}...",
-                destinations=["group", "run"],
-                _print=True,
-            )
-
-            ctx_logger.info(
-                f"Grading submission {submission_to_grade_path}...", destinations=["run"]
-            )
-
-            judge_output = await self.grade_submission(submission_to_grade_path, self.paper_id)
-
-            ctx_logger.info(
-                f"Grading for {self.question_id}.{self.attempt_id} finished!",
-                destinations=["group", "run"],
-                _print=True,
-            )
-
-        get_recorder().record_extra(
-            {
-                "run_group_id": self.run_group_id,
-                "run_id": self.run_id,
-                "judge_output": judge_output.to_dict() if judge_output else None,
-            }
-        )
-
+        # 4. wrap up
         grade = PaperBenchGrade(
             paperbench_result=PaperBenchResult(
                 paper_id=self.paper_id,
@@ -281,48 +267,29 @@ async def grade(self, computer: ComputerInterface) -> PaperBenchGrade:
                 code_only=self.judge.code_only,
                 resources_provided=self.judge.resources_provided,
                 judge_output=judge_output,
-                reproduction_output=reproduction_output,
+                reproduction_output=repro_output,
             ),
             score=judge_output.score if judge_output else 0.0,
             grader_log="Grading completed successfully" if judge_output else "Log file not found!",
         )
-
-        get_recorder().record_extra(
-            {
-                "run_group_id": self.run_group_id,
-                "run_id": self.run_id,
-                "pb_result": grade.to_dict(),
-            }
-        )
-
-        path_to_grade = bf.join(self.run_dir, "grade.json")
-        bf.write_bytes(path_to_grade, json.dumps(grade.to_dict(), indent=2).encode("utf-8"))
-        ctx_logger.info(
-            purple(f"Grades saved to {path_to_grade}"), destinations=["group", "run"], _print=True
-        )
+        self._record_extra({"pb_result": grade.to_dict()})
+        self._save_grade(grade)
 
         return grade
 
-    async def _reproduce(self, computer: ComputerInterface) -> ReproductionOutput:
+    async def _run_reproduce(self, submission: str) -> ReproductionOutput:
+        """Runs the reproduction process for the submission associated with the PBTask."""
         ctx_logger = logger.bind(
             run_group_id=self.run_group_id,
             run_id=self.run_id,
             runs_dir=self.runs_dir,
         )
+        ctx_logger.info(
+            f"Starting the reproduction process for `{self.question_id}.{self.attempt_id}`...",
+            destinations=["group", "run"],
+            _print=True,
+        )
-        checkpoint = await self._select_checkpoint()
-
-        if not checkpoint:
-            ctx_logger.exception(
-                "No checkpoint exists, skipping reproduction!", destinations=["run"]
-            )
-
-            return ReproductionOutput(
-                executed_submission=None,
-                metadata=None,
-            )
-
-        submission, _ = checkpoint
         reproduce_output_path = submission.replace(".tar.gz", "_executed.tar.gz")
         repro_metadata_path = submission.replace(".tar.gz", "_executed_metadata.json")
@@ -346,14 +313,15 @@ async def _reproduce(self, computer: ComputerInterface) -> ReproductionOutput:
         )
 
         # Reproduce on alcatraz
-        await reproduce_on_computer(
-            computer=computer,
-            submission_path=submission,
-            logger=ctx_logger.bind(destinations=["run"]),
-            run_dir=self.run_dir,
-            timeout=self.reproduction.timeout,
-            retry_threshold=self.reproduction.retry_threshold,
-        )
+        async with self._start_computer(self.reproduction.cluster_config) as computer:
+            await reproduce_on_computer(
+                computer=computer,
+                submission_path=submission,
+                logger=ctx_logger.bind(destinations=["run"]),
+                run_dir=self.run_dir,
+                timeout=self.reproduction.timeout,
+                retry_threshold=self.reproduction.retry_threshold,
+            )
 
         # Now the result should exist
         repro_output_exists = bf.exists(reproduce_output_path)
@@ -383,13 +351,19 @@ async def _reproduce(self, computer: ComputerInterface) -> ReproductionOutput:
             data = json.loads(f.read())
         metadata = ReproductionMetadata.from_dict(data)
 
+        ctx_logger.info(
+            f"The reproduction process for {self.question_id}.{self.attempt_id} has finished!",
+            destinations=["group", "run"],
+            _print=True,
+        )
+
         return ReproductionOutput(
             executed_submission=reproduce_output_path,
             metadata=metadata,
         )
 
     async def _select_checkpoint(self) -> tuple[str, timedelta] | None:
-        """Identify the submission tarball to use for reproduction/grading."""
+        """Identifies the submission tarball to use for reproduction/grading."""
         ctx_logger = logger.bind(
             run_group_id=self.run_group_id,
             run_id=self.run_id,
@@ -419,16 +393,23 @@ async def _select_checkpoint(self) -> tuple[str, timedelta] | None:
         return submission_path, submission_duration
 
-    async def grade_submission(
+    async def _run_judge(
         self,
         submission_path: str,
         paper_id: str,
     ) -> JudgeOutput | None:
+        """Runs the judge on the given submission for the PBTask."""
         ctx_logger = logger.bind(
             run_group_id=self.run_group_id,
             run_id=self.run_id,
             runs_dir=self.runs_dir,
        )
+        ctx_logger.info(
+            f"Grading the submission for {self.question_id}.{self.attempt_id}"
+            f" from {submission_path}...",
+            destinations=["group", "run"],
+            _print=True,
+        )
 
         grader_upload_path = submission_path.replace(
             ".tar.gz", f"_grader_output_{self.judge.grade_id}.json"
@@ -479,4 +460,10 @@ async def grade_submission(
             resources_provided=self.judge.resources_provided,
         )
 
+        ctx_logger.info(
+            f"Grading for {self.question_id}.{self.attempt_id} finished!",
+            destinations=["group", "run"],
+            _print=True,
+        )
+
        return judge_output
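
For orientation, below is a minimal, self-contained sketch of the control flow that the refactored grade() follows after this change. The step ordering and helper names (_should_reproduce, _should_grade, _early_exit_grade, _select_checkpoint, _run_reproduce, _run_judge) mirror the diff above; the simplified classes, types, and bodies are illustrative assumptions, not the actual paperbench implementation.

# sketch.py — simplified stand-in for the refactored grading flow (assumptions, not the real code)
from __future__ import annotations

import asyncio
from dataclasses import dataclass


@dataclass
class ReproductionOutput:
    executed_submission: str | None
    success: bool = False


@dataclass
class Grade:
    score: float
    grader_log: str


class TaskSketch:
    def __init__(self, skip_reproduction: bool, code_only: bool) -> None:
        self.skip_reproduction = skip_reproduction
        self.code_only = code_only

    def _should_reproduce(self) -> bool:
        # Reproduction runs only when neither "skip reproduction" nor "code only" is set.
        return not self.skip_reproduction and not self.code_only

    def _should_grade(self, repro: ReproductionOutput | None) -> bool:
        # Grade if reproduction succeeded, or if the setup does not require reproduction.
        return bool(repro and repro.success) or self.skip_reproduction or self.code_only

    def _early_exit_grade(self, grader_log: str) -> Grade:
        # Zero-score grade used when grading is aborted early.
        return Grade(score=0.0, grader_log=grader_log)

    async def _select_checkpoint(self) -> str | None:
        return "submission.tar.gz"  # placeholder for checkpoint selection

    async def _run_reproduce(self, submission: str) -> ReproductionOutput:
        # Placeholder: the real method spins up a computer and executes the submission.
        return ReproductionOutput(
            executed_submission=submission.replace(".tar.gz", "_executed.tar.gz"),
            success=True,
        )

    async def _run_judge(self, submission_path: str) -> float:
        return 1.0  # placeholder judge score

    async def grade(self) -> Grade:
        # 1. setup: pick the checkpoint to grade, or exit early if none exists
        checkpoint = await self._select_checkpoint()
        if not checkpoint:
            return self._early_exit_grade("No checkpoint exists, skipping grading!")
        submission_to_grade = checkpoint

        # 2. reproduction (optional): execute the submission and grade the executed tarball
        repro = None
        if self._should_reproduce():
            repro = await self._run_reproduce(checkpoint)
            submission_to_grade = repro.executed_submission or submission_to_grade

        # 3. judge: only when allowed by the reproduction outcome or the configuration
        score = None
        if self._should_grade(repro):
            score = await self._run_judge(submission_to_grade)

        # 4. wrap up: assemble the final grade
        return Grade(
            score=score if score is not None else 0.0,
            grader_log="Grading completed successfully" if score is not None else "Log file not found!",
        )


if __name__ == "__main__":
    print(asyncio.run(TaskSketch(skip_reproduction=False, code_only=False).grade()))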