diff --git a/src/fhda/config.py b/src/fhda/config.py index 746e647..7f3151e 100644 --- a/src/fhda/config.py +++ b/src/fhda/config.py @@ -19,3 +19,5 @@ DATA_STORAGE_PATH = Path("storage") else: DATA_STORAGE_PATH = Path("/storage") + +EVAL = bool(os.getenv("EVAL", "false").lower() == "true") diff --git a/src/fhda/data_analysis_env.py b/src/fhda/data_analysis_env.py index 8e649d9..cc75cfb 100644 --- a/src/fhda/data_analysis_env.py +++ b/src/fhda/data_analysis_env.py @@ -1,5 +1,4 @@ import hashlib -import json import logging import shutil from typing import Any, cast @@ -10,7 +9,6 @@ Message, Messages, Tool, - eval_answer, ) from .notebook_env import NBEnvironment @@ -33,7 +31,7 @@ def __init__( answer: str | int | float | None = None, # noqa: PYI041 system_prompt: str | None = None, correct_reward: float = 1.0, - eval_mode: EvalAnswerMode, + eval_mode: EvalAnswerMode | None = None, metadata: dict[str, Any] | None = None, # used for NBEvalExpt mcqs: list[MultipleChoiceQuestion] | None = None, **kwargs, @@ -66,7 +64,7 @@ async def reset(self) -> tuple[Messages, list[Tool]]: return init_obs, tools - async def submit_answer(self, answer: str | float | dict[str, Any] | None) -> str: # type: ignore[override] + async def submit_answer(self, answer: str) -> str: # type: ignore[override] """Submit an answer to the problem. Note that this tool may only be called once and ends the episode. @@ -79,75 +77,50 @@ async def submit_answer(self, answer: str | float | dict[str, Any] | None) -> st self.state.done = True logger.info("Submitting answer and closing environment") await self.close() - correct = False logger.info("Answer: %s", answer) + return answer - if self.eval_mode is None: - return CORRECT_MSG - - if isinstance(self.answer, int): - try: - answer = int(answer) # type: ignore[arg-type] - except ValueError: - pass - else: - correct = answer == self.answer + @classmethod + def eval_from_task(cls, task: str, gcs_artifact_path: str) -> "DataAnalysisEnv": + """ + Used for evaluations via crow jobs. - elif isinstance(self.answer, float): - try: - answer = float(answer) # type: ignore[arg-type] - except ValueError: - pass - else: - correct = abs(answer - self.answer) < 1e-4 * self.answer + Args: + task: The user query structured as | + gcs_artifact_path: The path to the GCS artifact – required for evaluation on crow jobs + """ + logger.info("Using the eval_from_task method") + + # Create temporary directory in GCP mounted storage volume + task_hash = hashlib.sha256(task.encode()).hexdigest() + trajectory_path = cfg.DATA_STORAGE_PATH / f"{task_hash}-{time.time()}" + trajectory_path.mkdir(parents=True, exist_ok=True) + logger.info("Trajectory path: %s", trajectory_path) + nb_path = trajectory_path / NBEnvironment.NOTEBOOK_NAME + # Copy task data to trajectory path + for item in (cfg.DATA_STORAGE_PATH / gcs_artifact_path).iterdir(): + if item.is_file(): + shutil.copy2(item, trajectory_path) + elif item.is_dir(): + shutil.copytree(item, trajectory_path / item.name, dirs_exist_ok=True) - elif isinstance(self.answer, str): - correct = bool( - await eval_answer( - proposed=str(answer), - correct=str(self.answer), - question=self.problem, - eval_mode=self.eval_mode, - ) + language = NBLanguage.PYTHON # In future, this should be a hyperparameter + if trajectory_path.exists(): + logger.info( + "Files in directory: %s", [f.name for f in trajectory_path.iterdir()] ) - elif isinstance(self.answer, dict): # This is for mcqs and open questions - # Check if answer is a json string - if isinstance(answer, str): # type: ignore[unreachable] - # Process json into dictionary - try: - processed_answer = json.loads(answer) - except json.JSONDecodeError: - return INCORRECT_MSG - else: - processed_answer = answer if isinstance(answer, dict) else {} - # Loop through each question and answer - for question_id, agent_answer in processed_answer.items(): - try: - ideal_answer = self.answer[question_id] - question = next( - q - for q in self.mcqs - if q.question_id.lower() == question_id.lower() - ) - correct = bool( - await eval_answer( - proposed=str(agent_answer), - correct=str(ideal_answer), - question=question, - eval_mode=self.eval_mode, - ) - ) - self.question_rewards[question_id] = correct - except KeyError: - self.question_rewards[question_id] = 0 - average_reward = sum(self.question_rewards.values()) / len(self.mcqs) - correct = round(average_reward) == 1.0 - - if correct: - self.state.total_reward += self.correct_reward - return CORRECT_MSG - return INCORRECT_MSG + return cls( + problem_id=f"data-analysis-task-{task_hash}", + problem=task, + # Using exact just because I won't ultimately be using env evaluation + eval_mode=EvalAnswerMode.EXACT, + nb_path=nb_path, + work_dir=trajectory_path, + language=language, + system_prompt=prompts.CAPSULE_SYSTEM_PROMPT_OPEN, + use_tmp_work_dir=False, + ) @classmethod def from_task( @@ -163,6 +136,8 @@ def from_task( """ logger.info("User task: %s", task) logger.info("GCS artifact path: %s", gcs_artifact_path) + if cfg.EVAL: + return cls.eval_from_task(task, gcs_artifact_path) # type: ignore if ( gcs_artifact_path @@ -251,6 +226,7 @@ def export_frame(self) -> Frame: "total_reward": self.state.total_reward, "nb_state": self.state.nb, "nb_state_html": nb_to_html(self.state.nb), + "nb_runtime_errors": self.state.notebook_runtime_errors, }, info={ "eval_mode": self.eval_mode, diff --git a/src/fhda/notebook_env.py b/src/fhda/notebook_env.py index 6ec9b8f..7da27a3 100644 --- a/src/fhda/notebook_env.py +++ b/src/fhda/notebook_env.py @@ -53,6 +53,7 @@ def __init__( # Add initial cell with rpy2 extension load nbformat.v4.new_code_cell(source="%load_ext rpy2.ipython") self.nb.metadata.kernelspec = self.language.make_kernelspec() + self.notebook_runtime_errors: list[str] = [] def save_nb(self): """Saves the notebook to disk.""" @@ -248,11 +249,10 @@ def list_workdir(self) -> str: The contents is represented as a nested JSON dictionary. """ - logger.info("Listing working directory: %s", self.state.work_dir) return json.dumps(self._list_dir(self.state.work_dir), indent=2) # allowing int so that agent doesn't try to force to float - def submit_answer(self, answer: str | float | int) -> str: # noqa: PYI041 + def submit_answer(self, answer: str) -> str: # noqa: PYI041 """Submit an answer to the problem. Note that this tool may only be called once and ends the episode. @@ -329,9 +329,11 @@ async def _run_notebook_local(self) -> str: """Run notebook using local kernel.""" client = self.state.kernel_manager.client() client.start_channels() - working_dir_files = list(self.state.work_dir.glob("**/*")) - logger.info(f"Files in working directory: {working_dir_files}") - await utils.nbformat_run_notebook(cells=self.state.cells, client=client) + error_messages = await utils.nbformat_run_notebook( + cells=self.state.cells, client=client + ) + if error_messages: + self.state.notebook_runtime_errors.extend(error_messages) self.state.save_nb() logger.debug("Saved notebook to disk") self.state.reload_nb() diff --git a/src/fhda/prompts.py b/src/fhda/prompts.py index 20ddc41..b480e53 100644 --- a/src/fhda/prompts.py +++ b/src/fhda/prompts.py @@ -61,6 +61,19 @@ - The first cell has already been loaded with %load_ext rpy2.ipython so you can use %%R cells from the second cell onwards """ +GENERAL_NOTEBOOK_GUIDELINES_R = """ +General Guidelines: +- Write small to medium-sized cells for easier debugging. +- Edit existing cells by their index number when fixing bugs, rather than creating new ones. +- Check dataframe shapes before printing. Use head() for large dataframes. +- Ensure each cell executes successfully before moving to the next. +- Assume you already have the packages you need installed and only install new ones if you receive errors. +- If you need to install packages, use mamba or conda. +IMPORTANT: Use R cells for all analysis. +- All cells are by default R cells. +""" + + AVOID_IMAGES = """ AVOID USING PLOTS/IMAGES. USE TABLES AND PRINT OUTPUTS INSTEAD AS MUCH AS POSSIBLE. """ diff --git a/src/fhda/utils.py b/src/fhda/utils.py index 1b1c1a1..8b76338 100644 --- a/src/fhda/utils.py +++ b/src/fhda/utils.py @@ -149,7 +149,7 @@ def encode_image_to_base64(image: str) -> str: async def nbformat_run_notebook( cells: Iterable[nbformat.NotebookNode], client: "AsyncKernelClient" -) -> None: +) -> list[str]: """Execute notebook cells using a kernel client and collect outputs. Args: @@ -158,7 +158,11 @@ async def nbformat_run_notebook( Raises: ValueError: If there is an error executing a cell + + Returns: + List of error messages from cells that raised an error """ + error_messages = [] try: logger.debug("Beginning cell execution") for idx, cell in enumerate(cells): @@ -221,8 +225,11 @@ async def nbformat_run_notebook( f"Value: {content.get('evalue', 'No error message')}\n" f"Traceback: {content.get('traceback', [])}" ) + error_messages.append( + f"Cell {idx}: {content.get('evalue', '')}" + ) logger.error(error_msg) - raise ValueError(error_msg) + # raise ValueError(error_msg) elif ( msg_type == "status" and content["execution_state"] == "idle" @@ -233,6 +240,8 @@ async def nbformat_run_notebook( logger.debug("Stopping kernel channels") client.stop_channels() + return error_messages + async def exec_cmd( container: DockerContainer, exec_command: list[str], timeout: float | None = 300 diff --git a/src/scripts/config.py b/src/scripts/config.py index 531c1f0..79cc4ce 100644 --- a/src/scripts/config.py +++ b/src/scripts/config.py @@ -16,7 +16,7 @@ from pydantic import BaseModel, ConfigDict from pydantic_core import PydanticUndefined -from .logging import configure_logs +from .expt_logging import configure_logs logger = logging.getLogger(__name__) diff --git a/src/scripts/deploy.py b/src/scripts/deploy.py index cf8c3cd..09ac8e1 100644 --- a/src/scripts/deploy.py +++ b/src/scripts/deploy.py @@ -9,6 +9,9 @@ FramePath, AuthType, ) +from crow_client.models.app import TaskQueuesConfig + +EVAL = True ENV_VARS = { "OPENAI_API_KEY": os.environ["OPENAI_API_KEY"], @@ -16,6 +19,7 @@ "USE_R": "false", "USE_DOCKER": "false", "STAGE": "DEV", + "EVAL": "true" if EVAL else "false", } CONTAINER_CONFIG = DockerContainerConfiguration(cpu="2", memory="4Gi") @@ -29,13 +33,18 @@ CrowDeploymentConfig( requirements_path=Path("pyproject.toml"), path=Path("src"), - name="data-analysis-crow", + name="bixbench-crow" if EVAL else "data-analysis-crow", environment="src.fhda.data_analysis_env.DataAnalysisEnv", environment_variables=ENV_VARS, agent="ldp.agent.ReActAgent", container_config=CONTAINER_CONFIG, force=True, frame_paths=frame_paths, + timeout=1200, + task_queues_config=TaskQueuesConfig( + name="bixbench-crow" if EVAL else "data-analysis-crow", + max_running_jobs=300, + ), ), ] diff --git a/src/scripts/platform_eval.py b/src/scripts/platform_eval.py new file mode 100644 index 0000000..179e05f --- /dev/null +++ b/src/scripts/platform_eval.py @@ -0,0 +1,185 @@ +import asyncio +import json +import logging +import os +import uuid +from typing import Any +import ast +import time + +import datasets +from ldp.agent import AgentConfig +from aviary.core import MultipleChoiceQuestion +from crow_client import CrowClient +from crow_client.models import Stage, JobRequest, RuntimeConfig +from crow_client.models.app import AuthType +import src.fhda.prompts as prompts + +logger = logging.getLogger(__name__) + +JOB_NAME = "job-futurehouse-bixbench-crow-dev" +CROW_STAGE = Stage.LOCAL +API_KEY = os.environ.get("CROW_API_KEY") +RUN_UUID = str(uuid.uuid4()) +GCS_ARTIFACT_PATH = "bixbench_data/" +HF_REPO = "futurehouse/bixbench" +MODEL = "claude-3-7-sonnet-20250219" +TEMPERATURE = 1 +NUM_RETRIES = 3 +MAX_STEPS = 30 +AVOID_IMAGES = True +KEEP_NOTEBOOKS = False +NUM_ITERATIONS = 1 +RUN_NAME = "baseline-3.7" +RESULTS_FILE = f"local/bixbench_runs/{RUN_NAME}-{time.strftime('%Y%m%d-%H%M%S')}.json" +RUNTIME_PARAMS = { + "model": MODEL, + "temperature": TEMPERATURE, + "num_retries": NUM_RETRIES, + "max_steps": MAX_STEPS, + "avoid_images": AVOID_IMAGES, + "run_name": RUN_NAME, +} + + +async def prepare_job(capsule: dict[str, Any]) -> JobRequest: + """ + Prepare a job for a capsule. + """ + + formatted_question = "\n-------\n".join( + [i.question_prompt for i in capsule["questions"]] + ) + + task = f"""\ + Here is the user query to address: + + + {formatted_question} + + + {prompts.CHAIN_OF_THOUGHT_AGNOSTIC} + {prompts.SUBMIT_ANSWER_OPEN} + {prompts.GENERAL_NOTEBOOK_GUIDELINES}""" + + if AVOID_IMAGES: + task += prompts.AVOID_IMAGES + agent = AgentConfig( + agent_type="ReActAgent", + agent_kwargs={ + "llm_model": { + "model": MODEL, + "temperature": TEMPERATURE, + "num_retries": NUM_RETRIES, + }, + "hide_old_env_states": True, + "runtime_params": RUNTIME_PARAMS, # type: ignore + }, + ) + job_data = JobRequest( + name=JOB_NAME, + query=task, + runtime_config=RuntimeConfig( + agent=agent, max_steps=MAX_STEPS, upload_id=capsule["data_folder"] + ), + ) + return job_data + + +async def load_bixbench_data( + open_question: bool = True, +) -> list[dict[str, Any]]: + """Load the BixBench dataset.""" + data = datasets.load_dataset(HF_REPO, split="train").to_list() + processed_dataset = [] + for capsule in data: + raw_questions = ast.literal_eval(capsule["questions"]) + processed_questions = [ + MultipleChoiceQuestion( + question=i["question"], + options=[ + i["ideal_answer"], + i["distractor_1"], + i["distractor_2"], + i["distractor_3"], + ], + ideal_answer=i["ideal_answer"], + shuffle_seed=MultipleChoiceQuestion.SEED_USING_QUESTION, + prompt_without_options=open_question, + question_id=i["id"], + ) + for i in raw_questions + ] + processed_dataset.append( + { + "data_folder": GCS_ARTIFACT_PATH + + capsule["data_folder"].replace(".zip", ""), + "short_id": capsule["short_id"], + "uuid": capsule["uuid"], + "questions": processed_questions, + } + ) + return processed_dataset + + +async def submit_jobs( + data: list[dict[str, Any]], +) -> list[dict[str, Any]]: + """ + Submit a question to the Crow service and wait for the answer. + + Args: + client: The CrowJobClient instance + questions: The MultipleChoiceQuestions to submit + timeout: Maximum time to wait for an answer in seconds + + Returns: + The answer string from the agent + """ + + client = CrowClient( + stage=CROW_STAGE, + auth_type=AuthType.API_KEY, + api_key=API_KEY, + ) + + jobs = [] + for iteration in range(1, NUM_ITERATIONS + 1): + for capsule in data[:1]: + job_request = await prepare_job(capsule) + job_id = client.create_job(job_request) + logger.info( + "Submitted job %s with question: %s", job_id, capsule["short_id"] + ) + job_metadata = { + **job_request.model_dump(), + **capsule, + "job_id": job_id, + "iteration": iteration, + } + job_metadata["questions"] = [ + i.model_dump() for i in job_metadata["questions"] + ] + jobs.append(job_metadata) + + return jobs + + +async def save_results(jobs: list[dict[str, Any]], output_file: str): + with open(output_file, "w", encoding="utf-8") as f: + json.dump(jobs, f, indent=4) + + +async def main(): + data = await load_bixbench_data() + jobs = await submit_jobs(data) + await save_results(jobs, RESULTS_FILE) + + +if __name__ == "__main__": + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + handlers=[logging.StreamHandler()], + ) + asyncio.run(main())