diff --git a/src/agentlab/experiments/reproducibility_util.py b/src/agentlab/experiments/reproducibility_util.py
index 01f3fdc9..fd5af408 100644
--- a/src/agentlab/experiments/reproducibility_util.py
+++ b/src/agentlab/experiments/reproducibility_util.py
@@ -324,6 +324,21 @@ def append_to_journal(
     info, report_df: pd.DataFrame, journal_path=None, strict_reproducibility=True
 ):
     """Append the info and results to the reproducibility journal."""
+    journal_path, headers = get_headers_from_journal(journal_path)
+
+    rows = create_journal_entries(info, report_df, strict_reproducibility, headers)
+
+    write_entries_to_journal(journal_path, rows)
+
+
+def write_entries_to_journal(journal_path, rows):
+    with open(journal_path, "a", newline="") as file:
+        writer = csv.writer(file)
+        for row in rows:
+            writer.writerow(row)
+
+
+def get_headers_from_journal(journal_path=None):
     if journal_path is None:
         try:
             _get_repo(agentlab)  # if not based on git clone, this will raise an error
@@ -339,6 +354,13 @@ def append_to_journal(
 
     logging.info(f"Appending to journal {journal_path}")
 
+    headers = None
+    if journal_path.exists():
+        headers = _get_csv_headers(journal_path)
+    return journal_path, headers
+
+
+def create_journal_entries(info, report_df, strict_reproducibility=True, headers=None):
     if len(report_df) != len(info["agent_names"]):
         raise ValueError(
             "Mismatch between the number of agents in reproducibility info and the summary report."
@@ -347,12 +369,8 @@ def append_to_journal(
     report_df = _verify_report(
         report_df, info["agent_names"], strict_reproducibility=strict_reproducibility
     )
-    rows = []
-    headers = None
-    if journal_path.exists():
-        headers = _get_csv_headers(journal_path)
-
+    rows = []
     if headers is None:  # first creation
         headers = list(info.keys())
         headers[headers.index("agent_names")] = "agent_name"
@@ -366,8 +384,4 @@ def append_to_journal(
         _add_result_to_info(info_copy, report_df)
 
         rows.append([str(info_copy[key]) for key in headers])
-
-    with open(journal_path, "a", newline="") as file:
-        writer = csv.writer(file)
-        for row in rows:
-            writer.writerow(row)
+    return rows
diff --git a/src/agentlab/experiments/study.py b/src/agentlab/experiments/study.py
index b93b3ae2..4f3d2cb7 100644
--- a/src/agentlab/experiments/study.py
+++ b/src/agentlab/experiments/study.py
@@ -373,6 +373,13 @@ def _run(self, n_jobs=1, parallel_backend="joblib", strict_reproducibility=False
             avg_step_timeout=self.avg_step_timeout,
         )
 
+    def get_journal_entries(self, strict_reproducibility=True, headers=None):
+        """Get the journal entries for the study."""
+        _, summary_df, _ = self.get_results()
+        return repro.create_journal_entries(
+            self.reproducibility_info, summary_df, strict_reproducibility, headers
+        )
+
     def append_to_journal(self, strict_reproducibility=True):
         """Append the study to the journal.
 
@@ -380,12 +387,11 @@ def append_to_journal(self, strict_reproducibility=True):
             strict_reproducibility: bool
                 If True, incomplete experiments will raise an error.
""" - _, summary_df, _ = self.get_results() - repro.append_to_journal( - self.reproducibility_info, - summary_df, - strict_reproducibility=strict_reproducibility, + journal_path, headers = repro.get_headers_from_journal() + rows = self.get_journal_entries( + strict_reproducibility=strict_reproducibility, headers=headers ) + repro.write_entries_to_journal(journal_path, rows) @property def name(self): diff --git a/src/agentlab/traces/__init__.py b/src/agentlab/traces/__init__.py new file mode 100644 index 00000000..487886d9 --- /dev/null +++ b/src/agentlab/traces/__init__.py @@ -0,0 +1,3 @@ +from .trace_utils import * +from .uploads import * +from .query import * diff --git a/src/agentlab/traces/query.py b/src/agentlab/traces/query.py new file mode 100644 index 00000000..09c3f92c --- /dev/null +++ b/src/agentlab/traces/query.py @@ -0,0 +1,58 @@ +from datasets import load_dataset +from typing import List +import os + +HF_TOKEN = os.getenv('HF_TOKEN') +HF_REPO_NAME = os.getenv('HF_REPO_NAME') + + +def query_traces_by_llm_and_benchmark(llm: str, benchmark: str) -> List[dict]: + """ + Query traces based on the provided LLM and benchmark. + :param llm: The name of the LLM (e.g., 'GPT-4'). + :param benchmark: The benchmark name (e.g., 'benchmark1'). + :return: A list of trace metadata dictionaries. + """ + INDEX_DATASET = f"{HF_REPO_NAME}/agent_traces_index" + + try: + dataset = load_dataset(INDEX_DATASET, use_auth_token=HF_TOKEN, split='train') + results = [ + { + 'exp_id': row['exp_id'], + 'study_id': row['study_id'], + 'llm': row['llm'], + 'benchmark': row['benchmark'], + 'trace_pointer': row['trace_pointer'] + } + for row in dataset + if row['llm'] == llm and row['benchmark'] == benchmark + ] + return results + except Exception as e: + print(f"Error querying traces for LLM '{llm}' and benchmark '{benchmark}': {e}") + return [] + + +def download_trace_by_experiment_id(exp_id: str, output_dir: str) -> None: + """ + Download the trace file based on the experiment ID. + :param exp_id: The ID of the experiment whose trace file needs to be downloaded. + :param output_dir: The directory where the trace file will be saved. + """ + TRACE_DATASET = f"{HF_REPO_NAME}/agent_traces_data" + + try: + dataset = load_dataset(TRACE_DATASET, use_auth_token=HF_TOKEN, split='train') + for row in dataset: + if row['exp_id'] == exp_id: + trace_file = row['zip_file'] + output_path = os.path.join(output_dir, trace_file) + dataset.download_and_prepare() + dataset.to_csv(output_path) + print(f"Trace file for experiment '{exp_id}' downloaded to {output_path}.") + return + print(f"Experiment ID '{exp_id}' not found in the dataset.") + except Exception as e: + print(f"Error downloading trace file for experiment '{exp_id}': {e}") + diff --git a/src/agentlab/traces/trace_utils.py b/src/agentlab/traces/trace_utils.py new file mode 100644 index 00000000..98ebe612 --- /dev/null +++ b/src/agentlab/traces/trace_utils.py @@ -0,0 +1,192 @@ +import os +import zipfile + +from bgym import ExpArgs, ExpResult +from datasets import Dataset, concatenate_datasets, load_dataset + +from agentlab.experiments.study import Study + +# Retrieve environment variables +hf_user_name = os.getenv("HF_REPO_NAME") +hf_token = os.getenv("HF_TOKEN") + + +def upload_study(study: Study): + """ + Uploads study details to the Hugging Face `STUDY_DATASET`. + + Args: + study (Study): The study object containing the experiment details. 
+ """ + + if not hf_user_name or not hf_token: + raise ValueError("HF_REPO_NAME and HF_TOKEN environment variables must be set.") + + study_dataset = f"{hf_user_name}/agent_traces_study" + + # Load existing dataset or create a new one + try: + dataset = load_dataset(study_dataset, split="train", token=hf_token) + existing_data = dataset.to_dict() + headers = dataset.column_names + except Exception as e: + print(f"Could not load existing dataset: {e}. Creating a new dataset.") + existing_data = None + headers = None + + # Create a new dataset with the new study details + entries = study.get_journal_entries( + strict_reproducibility=False, headers=headers + ) # type: list[list] + if headers is None: + headers = entries[0] + + entries = entries[1:] + entries = list(zip(*entries)) + new_data = Dataset.from_dict({header: entries[i] for i, header in enumerate(headers)}) + + # Concatenate with existing data if available + if existing_data: + existing_dataset = Dataset.from_dict(existing_data) + combined_data = concatenate_datasets([existing_dataset, new_data]) + else: + combined_data = new_data + + # Push updated dataset to the Hugging Face Hub + try: + combined_data.push_to_hub(study_dataset, token=hf_token, create_pr=True) + print("Study details uploaded successfully!") + except Exception as e: + print(f"Failed to upload study details: {e}") + + +def upload_trace(exp_args: ExpArgs) -> str: + """ + Compresses a directory into a zip file, uploads it to the TRACE_DATASET on Hugging Face, + and returns the URL of the uploaded file. + + Args: + exp_args (ExpArgs): The experiment arguments. + + Returns: + str: The URL of the uploaded zip file in the dataset. + """ + # # Check if the benchmark is whitelisted + # WHITELISTED_BENCHMARKS = ["benchmark1", "benchmark2"] + # if benchmark not in WHITELISTED_BENCHMARKS: + # raise ValueError("Benchmark not whitelisted") + + if not hf_user_name or not hf_token: + raise ValueError("HF_REPO_NAME and HF_TOKEN environment variables must be set.") + + trace_dataset = f"{hf_user_name}/agent_traces_data" + + # Create a zip file from the directory + zip_filename = f"{exp_args.exp_id}.zip" + with zipfile.ZipFile(zip_filename, "w", zipfile.ZIP_DEFLATED) as zipf: + for root, _, files in os.walk(exp_args.exp_dir): + for file in files: + file_path = os.path.join(root, file) + zipf.write(file_path, os.path.relpath(file_path, exp_args.exp_dir)) + + print(f"Directory '{exp_args}' compressed into '{zip_filename}'.") + + # Load existing dataset or create a new one + try: + dataset = load_dataset(trace_dataset, use_auth_token=hf_token, split="train") + existing_data = {"exp_id": dataset["exp_id"], "zip_file": dataset["zip_file"]} + except Exception as e: + print(f"Could not load existing dataset: {e}. 
Creating a new dataset.") + existing_data = None + + # Create a new dataset with the new experiment trace + new_data = Dataset.from_dict({"exp_id": [exp_args.exp_id], "zip_file": [zip_filename]}) + + # Concatenate with existing data if available + if existing_data: + existing_dataset = Dataset.from_dict(existing_data) + combined_data = concatenate_datasets([existing_dataset, new_data]) + else: + combined_data = new_data + + # Push updated dataset to the Hugging Face Hub + combined_data.push_to_hub(trace_dataset, token=hf_token) + print("Experiment trace uploaded successfully!") + + # Clean up the local zip file + os.remove(zip_filename) + print(f"Temporary zip file '{zip_filename}' removed.") + + # Construct and return the file URL on the Hugging Face Hub + file_url = f"https://huggingface.co/datasets/{trace_dataset}/resolve/main/{zip_filename}" + print(f"File URL: {file_url}") + + return file_url + + +def update_index( + exp_results: list[ExpResult], study_id: str, license: str, trace_pointers: list[str] +): + """ + Adds a record to the INDEX_DATASET on Hugging Face with the given experiment details. + + Args: + exp_args (ExpArgs): The experiment arguments. + study_id (str): The study ID associated with this experiment. + license (str): The license type for the experiment trace. + trace_pointer (str): The URL of the experiment trace file. + """ + + if not hf_user_name or not hf_token: + raise ValueError("HF_REPO_NAME and HF_TOKEN environment variables must be set.") + + index_dataset = f"{hf_user_name}/agent_traces_index" + + # Load existing dataset or create a new one + try: + dataset = load_dataset(index_dataset, use_auth_token=hf_token, split="train") + existing_data = dataset.to_dict() + except Exception as e: + print(f"Could not load existing dataset: {e}. 
Creating a new dataset.") + existing_data = None + + # Create a new dataset with the provided index details + + dataset = [exp_res.get_exp_record() for exp_res in exp_results] + for el in dataset: + el.pop("exp_dir") + + # list[dict] -> dict[list] + new_data = {key: [d[key] for d in dataset] for key in dataset[0].keys()} + new_data["study_id"] = [study_id.hex] * len(exp_results) + new_data["license"] = [license] * len(exp_results) + new_data["trace_pointer"] = trace_pointers + + new_data = Dataset.from_dict(new_data) + + # Concatenate with existing data if available + if existing_data: + existing_dataset = Dataset.from_dict(existing_data) + combined_data = concatenate_datasets([existing_dataset, new_data]) + else: + combined_data = new_data + + # Push updated dataset to the Hugging Face Hub + combined_data.push_to_hub(index_dataset, token=hf_token, create_pr=True) + print("Index dataset updated successfully!") + + +if __name__ == "__main__": + import os + import pathlib + + from agentlab.experiments.study import Study + from agentlab.traces.trace_utils import update_index, upload_study + + path = pathlib.Path("/path/to/study") + + study = Study.load(path) + study.load_exp_args_list() + + upload_study(study) + update_index(study.exp_args_list, study.uuid, "open", ["w/e"] * len(study.exp_args_list)) diff --git a/src/agentlab/traces/uploads.py b/src/agentlab/traces/uploads.py new file mode 100644 index 00000000..276cf17f --- /dev/null +++ b/src/agentlab/traces/uploads.py @@ -0,0 +1,87 @@ +from typing import List, Optional +from datetime import datetime +from typing import Optional, List +from datetime import datetime +from agentlab.traces.trace_utils import update_index +from traces import upload_trace,upload_study + +class Experiment: + """Represents a single experiment with relevant metadata.""" + + def __init__( + self, + exp_id: str, + study_id: str, + name: str, + llm: str, + benchmark: str, + license: str, + dir: str, + ): + + self.exp_id = exp_id + self.study_id = study_id + self.name = name + self.llm = llm + self.benchmark = benchmark + self.license = license + self.dir = dir + self.timestamp = datetime.now().isoformat() + + def __repr__(self): + return ( + f"Experiment(exp_id={self.exp_id}, study_id={self.study_id}, " + f"name={self.name}, llm={self.llm}, benchmark={self.benchmark}, " + + ) + + +class Study: + """Represents a study containing multiple experiments.""" + + + def __init__(self, study_id: str, study_name: str, description: str, experiments: List[Experiment]): + self.study_id = study_id + self.study_name = study_name + self.description = description + self.experiments = experiments + + def add_experiment(self, experiment: Experiment) -> None: + """Add an experiment to the study.""" + self.experiments.append(experiment) + + def remove_experiment(self, experiment_id: str) -> None: + """Remove an experiment from the study by ID.""" + self.experiments = [exp for exp in self.experiments if exp.exp_id != experiment_id] + + def get_experiment(self, experiment_id: str) -> Optional[Experiment]: + """Retrieve an experiment by ID.""" + for experiment in self.experiments: + if experiment.exp_id == experiment_id: + return experiment + return None + + def upload(self) -> None: + """Upload all experiment traces in the study to Hugging Face.""" + self.upload_Study() + for exp in self.experiments: + trace_pointer = upload_trace(exp.exp_id,exp.dir,exp.benchmark) + # Assign a license based on LLM and benchmark + LICENSES = { + ("GPT-4", "benchmark1"): "MIT", + ("Llama2", "benchmark2"): 
"Apache-2.0", + } + license_type = LICENSES.get((exp.exp_llm, exp.exp_benchmark), "Unknown") + update_index(exp.exp_id,self.study_id,exp.llm,exp.benchmark,license_type,trace_pointer) + + def upload_Study(self): + """Upload study to the study dataset""" + study_data = { + "study_id": [self.study_id], + "study_name": [self.study_name], + "description": [self.description], + } + upload_study(study_data) + + def __repr__(self): + return f"Study(id={self.study_id}, name={self.study_name}, experiments={len(self.experiments)})"