Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/fhda/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,4 @@
else:
DATA_STORAGE_PATH = Path("/storage")

EVAL = bool(os.getenv("EVAL", "false").lower() == "true")

VALID_FROM_TASK_KWARGS = ["run_notebook_on_edit"]
133 changes: 41 additions & 92 deletions src/fhda/data_analysis_env.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import hashlib
import logging
import shutil
import json
from typing import Any, cast
import time
from aviary.core import (
Expand Down Expand Up @@ -100,55 +99,12 @@ def export_frame(self) -> Frame:
},
)

@classmethod
def eval_from_task(
cls, task: str, gcs_artifact_path: str, environment_config: str | None = None
) -> "DataAnalysisEnv":
"""
Used for evaluations via crow jobs.

Args:
task: The user query structured as <data_path> | <query>
gcs_artifact_path: The path to the GCS artifact – required for evaluation on crow jobs
"""
logger.info("Using the eval_from_task method")
# Create temporary directory in GCP mounted storage volume
task_hash = hashlib.sha256(task.encode()).hexdigest()
trajectory_path = cfg.DATA_STORAGE_PATH / f"{task_hash}-{time.time()}"
trajectory_path.mkdir(parents=True, exist_ok=True)
logger.info("Trajectory path: %s", trajectory_path)
nb_path = trajectory_path / NBEnvironment.NOTEBOOK_NAME
# Copy task data to trajectory path
for item in (cfg.DATA_STORAGE_PATH / gcs_artifact_path).iterdir():
if item.is_file():
shutil.copy2(item, trajectory_path)
elif item.is_dir():
shutil.copytree(item, trajectory_path / item.name, dirs_exist_ok=True)

language = NBLanguage.PYTHON # In future, this should be a hyperparameter
if trajectory_path.exists():
logger.info(
"Files in directory: %s", [f.name for f in trajectory_path.iterdir()]
)

return cls(
problem_id=f"data-analysis-task-{task_hash}",
problem=task,
# Using exact just because I won't ultimately be using env evaluation
eval_mode=EvalAnswerMode.EXACT,
nb_path=nb_path,
work_dir=trajectory_path,
language=language,
system_prompt=prompts.CAPSULE_SYSTEM_PROMPT_OPEN,
use_tmp_work_dir=False,
)

@classmethod
def from_task(
cls,
task: str,
gcs_artifact_path: str | None = None,
environment_config: str | None = None,
environment_config: dict[str, Any] | None = None,
) -> "DataAnalysisEnv":
"""
Perform data analysis on a user query.
Expand All @@ -161,74 +117,67 @@ def from_task(
logger.info("User task: %s", task)
logger.info("GCS artifact path: %s", gcs_artifact_path)
logger.info("environment_config: %s", environment_config)
if cfg.EVAL:
return cls.eval_from_task(task, gcs_artifact_path) # type: ignore

if (
not gcs_artifact_path
): # The files are already in the GCS bucket in a job-specific directory
): # Platform jobs should always be associated with data from a GCS bucket
raise NotImplementedError(
"Running crow jobs without gcs_artifact_path is not supported"
)
trajectory_path = cfg.DATA_STORAGE_PATH / gcs_artifact_path
nb_path = trajectory_path / NBEnvironment.NOTEBOOK_NAME
query = task
task_hash = gcs_artifact_path

if environment_config:
kwargs = {
k: v
for k, v in json.loads(environment_config).items()
for k, v in environment_config.items()
if k in cfg.VALID_FROM_TASK_KWARGS
}
else:
kwargs = {}
logger.info("Filtered kwargs: %s", kwargs)

# Augment incoming task with CoT instructions
augmented_task = f"""\
Here is the user query to address:

<query>
{query}
</query>

{prompts.CHAIN_OF_THOUGHT_AGNOSTIC}
{prompts.GENERAL_NOTEBOOK_GUIDELINES}"""
task_hash = hashlib.sha256(task.encode()).hexdigest()
if kwargs.get("eval", False):
# Create a temporary directory in GCP mounted storage volume
trajectory_path = cfg.DATA_STORAGE_PATH / f"{task_hash}-{time.time()}"
trajectory_path.mkdir(parents=True, exist_ok=True)
for item in (cfg.DATA_STORAGE_PATH / gcs_artifact_path).iterdir():
if item.is_file():
shutil.copy2(item, trajectory_path)
elif item.is_dir():
shutil.copytree(
item, trajectory_path / item.name, dirs_exist_ok=True
)
else:
# Use the GCP folder created when uploading the data via the platform
trajectory_path = cfg.DATA_STORAGE_PATH / gcs_artifact_path
# Augment incoming user query with CoT instructions
task = (
f"Here is the user query to address:\n"
f"<query>\n"
f"{task}\n"
f"</query>\n"
f"{prompts.CHAIN_OF_THOUGHT_AGNOSTIC}\n"
f"{prompts.GENERAL_NOTEBOOK_GUIDELINES}"
)
logger.info("Trajectory path: %s", trajectory_path)
nb_path = trajectory_path / NBEnvironment.NOTEBOOK_NAME

language = NBLanguage.PYTHON # In future, this should be a hyperparameter
if language == NBLanguage.R:
augmented_task += f"\n{prompts.R_OUTPUT_RECOMMENDATION_PROMPT}"

# Log all parameters being passed to constructor
logger.info(
"Creating DataAnalysisEnv with parameters: "
"problem_id=data-analysis-task-%s, "
"problem=%s, "
"eval_mode=%s, "
"nb_path=%s, "
"work_dir=%s, "
"language=%s, "
"system_prompt=%s, "
"use_tmp_work_dir=%s, "
"gcs_artifact_path=%s",
task_hash,
augmented_task,
EvalAnswerMode.LLM,
nb_path,
trajectory_path,
language,
prompts.CAPSULE_SYSTEM_PROMPT_QUERY,
False,
gcs_artifact_path,
)
task += f"\n{prompts.R_OUTPUT_RECOMMENDATION_PROMPT}"

if trajectory_path.exists():
logger.info(
"Files in directory: %s", [f.name for f in trajectory_path.iterdir()]
)
files = list(trajectory_path.iterdir())
logger.info("Files in directory: %s", [f.name for f in files])
if not files:
raise ValueError(
f"No files found in trajectory path: {trajectory_path}"
)
else:
raise ValueError(f"Trajectory path does not exist: {trajectory_path}")

return cls(
problem_id=f"data-analysis-task-{task_hash}",
problem=augmented_task,
problem=task,
eval_mode=EvalAnswerMode.LLM,
nb_path=nb_path,
work_dir=trajectory_path,
Expand Down
7 changes: 3 additions & 4 deletions src/scripts/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
"ANTHROPIC_API_KEY": os.environ["ANTHROPIC_API_KEY"],
"USE_R": "false",
"USE_DOCKER": "false",
"STAGE": "DEV",
"EVAL": "true" if EVAL else "false",
"STAGE": "PROD",
}

CONTAINER_CONFIG = DockerContainerConfiguration(cpu="2", memory="4Gi")
Expand All @@ -33,7 +32,7 @@
CrowDeploymentConfig(
requirements_path=Path("pyproject.toml"),
path=Path("src"),
name="bixbench-crow2" if EVAL else "data-analysis-crow",
name="data-analysis-crow",
environment="src.fhda.data_analysis_env.DataAnalysisEnv",
environment_variables=ENV_VARS,
agent="ldp.agent.ReActAgent",
Expand All @@ -42,7 +41,7 @@
frame_paths=frame_paths,
timeout=3600,
task_queues_config=TaskQueuesConfig(
name="bixbench-crow2" if EVAL else "data-analysis-crow",
name="data-analysis-crow",
max_running_jobs=300,
),
),
Expand Down