33 changes: 23 additions & 10 deletions src/agentlab/experiments/reproducibility_util.py
@@ -324,6 +324,21 @@ def append_to_journal(
    info, report_df: pd.DataFrame, journal_path=None, strict_reproducibility=True
):
    """Append the info and results to the reproducibility journal."""
    journal_path, headers = get_headers_from_journal(journal_path)

    rows = create_journal_entries(info, report_df, strict_reproducibility, headers)

    write_entries_to_journal(journal_path, rows)


def write_entries_to_journal(journal_path, rows):
    with open(journal_path, "a", newline="") as file:
        writer = csv.writer(file)
        for row in rows:
            writer.writerow(row)


def get_headers_from_journal(journal_path=None):
    if journal_path is None:
        try:
            _get_repo(agentlab)  # if not based on git clone, this will raise an error
@@ -339,6 +354,13 @@ def append_to_journal(

    logging.info(f"Appending to journal {journal_path}")

    headers = None
    if journal_path.exists():
        headers = _get_csv_headers(journal_path)
    return journal_path, headers


def create_journal_entries(info, report_df, strict_reproducibility=True, headers=None):
    if len(report_df) != len(info["agent_names"]):
        raise ValueError(
            "Mismatch between the number of agents in reproducibility info and the summary report."
@@ -347,12 +369,7 @@ def append_to_journal(
    report_df = _verify_report(
        report_df, info["agent_names"], strict_reproducibility=strict_reproducibility
    )

    rows = []
    headers = None
    if journal_path.exists():
        headers = _get_csv_headers(journal_path)

    if headers is None:  # first creation
        headers = list(info.keys())
        headers[headers.index("agent_names")] = "agent_name"
@@ -366,8 +383,4 @@ def append_to_journal(
        _add_result_to_info(info_copy, report_df)

        rows.append([str(info_copy[key]) for key in headers])

    with open(journal_path, "a", newline="") as file:
        writer = csv.writer(file)
        for row in rows:
            writer.writerow(row)
    return rows
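For reference, a minimal sketch of how the three refactored helpers are intended to compose; it mirrors the new append_to_journal body, with `info` and `report_df` standing in for a study's reproducibility info and summary report (both assumed to exist already):

# Sketch only: `info` and `report_df` come from an existing study.
journal_path, headers = get_headers_from_journal()
rows = create_journal_entries(info, report_df, strict_reproducibility=True, headers=headers)
write_entries_to_journal(journal_path, rows)

Splitting the old append_to_journal this way lets callers (study.py and the new upload code) reuse the row-building step without touching the CSV journal on disk.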
16 changes: 11 additions & 5 deletions src/agentlab/experiments/study.py
@@ -373,19 +373,25 @@ def _run(self, n_jobs=1, parallel_backend="joblib", strict_reproducibility=False
            avg_step_timeout=self.avg_step_timeout,
        )

    def get_journal_entries(self, strict_reproducibility=True, headers=None):
        """Get the journal entries for the study."""
        _, summary_df, _ = self.get_results()
        return repro.create_journal_entries(
            self.reproducibility_info, summary_df, strict_reproducibility, headers
        )

    def append_to_journal(self, strict_reproducibility=True):
        """Append the study to the journal.

        Args:
            strict_reproducibility: bool
                If True, incomplete experiments will raise an error.
        """
        _, summary_df, _ = self.get_results()
        repro.append_to_journal(
            self.reproducibility_info,
            summary_df,
            strict_reproducibility=strict_reproducibility,
        journal_path, headers = repro.get_headers_from_journal()
        rows = self.get_journal_entries(
            strict_reproducibility=strict_reproducibility, headers=headers
        )
        repro.write_entries_to_journal(journal_path, rows)

    @property
    def name(self):
3 changes: 3 additions & 0 deletions src/agentlab/traces/__init__.py
@@ -0,0 +1,3 @@
from .trace_utils import *
from .uploads import *
from .query import *
58 changes: 58 additions & 0 deletions src/agentlab/traces/query.py
Reviewer comment: I'll update this to recent changes in the upload methods.

@@ -0,0 +1,58 @@
from datasets import load_dataset
from huggingface_hub import hf_hub_download
from typing import List
import os

HF_TOKEN = os.getenv('HF_TOKEN')
HF_REPO_NAME = os.getenv('HF_REPO_NAME')


def query_traces_by_llm_and_benchmark(llm: str, benchmark: str) -> List[dict]:
    """
    Query traces based on the provided LLM and benchmark.
    :param llm: The name of the LLM (e.g., 'GPT-4').
    :param benchmark: The benchmark name (e.g., 'benchmark1').
    :return: A list of trace metadata dictionaries.
    """
    INDEX_DATASET = f"{HF_REPO_NAME}/agent_traces_index"

    try:
        dataset = load_dataset(INDEX_DATASET, use_auth_token=HF_TOKEN, split='train')
        results = [
            {
                'exp_id': row['exp_id'],
                'study_id': row['study_id'],
                'llm': row['llm'],
                'benchmark': row['benchmark'],
                'trace_pointer': row['trace_pointer']
            }
            for row in dataset
            if row['llm'] == llm and row['benchmark'] == benchmark
        ]
        return results
    except Exception as e:
        print(f"Error querying traces for LLM '{llm}' and benchmark '{benchmark}': {e}")
        return []


def download_trace_by_experiment_id(exp_id: str, output_dir: str) -> None:
    """
    Download the trace file based on the experiment ID.
    :param exp_id: The ID of the experiment whose trace file needs to be downloaded.
    :param output_dir: The directory where the trace file will be saved.
    """
    TRACE_DATASET = f"{HF_REPO_NAME}/agent_traces_data"

    try:
        dataset = load_dataset(TRACE_DATASET, use_auth_token=HF_TOKEN, split='train')
        for row in dataset:
            if row['exp_id'] == exp_id:
                trace_file = row['zip_file']
                # The dataset row only stores the zip file name; fetch the archive itself
                # from the dataset repo (assumed to sit at the repo root, matching the
                # URL built in upload_trace).
                output_path = hf_hub_download(
                    repo_id=TRACE_DATASET,
                    filename=trace_file,
                    repo_type='dataset',
                    local_dir=output_dir,
                    token=HF_TOKEN,
                )
                print(f"Trace file for experiment '{exp_id}' downloaded to {output_path}.")
                return
        print(f"Experiment ID '{exp_id}' not found in the dataset.")
    except Exception as e:
        print(f"Error downloading trace file for experiment '{exp_id}': {e}")
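A minimal usage sketch of the two query helpers above; the LLM and benchmark names and the output directory are placeholders, and HF_REPO_NAME/HF_TOKEN are assumed to be set in the environment:

# Hypothetical values, for illustration only.
traces = query_traces_by_llm_and_benchmark('GPT-4', 'benchmark1')
for trace in traces:
    print(trace['exp_id'], trace['trace_pointer'])

if traces:
    download_trace_by_experiment_id(traces[0]['exp_id'], output_dir='./downloaded_traces')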

192 changes: 192 additions & 0 deletions src/agentlab/traces/trace_utils.py
@@ -0,0 +1,192 @@
import os
import zipfile

from bgym import ExpArgs, ExpResult
from datasets import Dataset, concatenate_datasets, load_dataset

from agentlab.experiments.study import Study

# Retrieve environment variables
hf_user_name = os.getenv("HF_REPO_NAME")
hf_token = os.getenv("HF_TOKEN")


def upload_study(study: Study):
    """
    Uploads study details to the Hugging Face `STUDY_DATASET`.

    Args:
        study (Study): The study object containing the experiment details.
    """

    if not hf_user_name or not hf_token:
        raise ValueError("HF_REPO_NAME and HF_TOKEN environment variables must be set.")

    study_dataset = f"{hf_user_name}/agent_traces_study"

    # Load existing dataset or create a new one
    try:
        dataset = load_dataset(study_dataset, split="train", token=hf_token)
        existing_data = dataset.to_dict()
        headers = dataset.column_names
    except Exception as e:
        print(f"Could not load existing dataset: {e}. Creating a new dataset.")
        existing_data = None
        headers = None

    # Create a new dataset with the new study details
    entries = study.get_journal_entries(
        strict_reproducibility=False, headers=headers
    )  # type: list[list]
    if headers is None:
        headers = entries[0]
        entries = entries[1:]

    entries = list(zip(*entries))
    new_data = Dataset.from_dict({header: entries[i] for i, header in enumerate(headers)})

    # Concatenate with existing data if available
    if existing_data:
        existing_dataset = Dataset.from_dict(existing_data)
        combined_data = concatenate_datasets([existing_dataset, new_data])
    else:
        combined_data = new_data

    # Push updated dataset to the Hugging Face Hub
    try:
        combined_data.push_to_hub(study_dataset, token=hf_token, create_pr=True)
        print("Study details uploaded successfully!")
    except Exception as e:
        print(f"Failed to upload study details: {e}")

def upload_trace(exp_args: ExpArgs) -> str:
    """
    Compresses a directory into a zip file, uploads it to the TRACE_DATASET on Hugging Face,
    and returns the URL of the uploaded file.

    Args:
        exp_args (ExpArgs): The experiment arguments.

    Returns:
        str: The URL of the uploaded zip file in the dataset.
    """
    # # Check if the benchmark is whitelisted
    # WHITELISTED_BENCHMARKS = ["benchmark1", "benchmark2"]
    # if benchmark not in WHITELISTED_BENCHMARKS:
    #     raise ValueError("Benchmark not whitelisted")

    if not hf_user_name or not hf_token:
        raise ValueError("HF_REPO_NAME and HF_TOKEN environment variables must be set.")

    trace_dataset = f"{hf_user_name}/agent_traces_data"

    # Create a zip file from the directory
    zip_filename = f"{exp_args.exp_id}.zip"
    with zipfile.ZipFile(zip_filename, "w", zipfile.ZIP_DEFLATED) as zipf:
        for root, _, files in os.walk(exp_args.exp_dir):
            for file in files:
                file_path = os.path.join(root, file)
                zipf.write(file_path, os.path.relpath(file_path, exp_args.exp_dir))

    print(f"Directory '{exp_args.exp_dir}' compressed into '{zip_filename}'.")

    # Load existing dataset or create a new one
    try:
        dataset = load_dataset(trace_dataset, use_auth_token=hf_token, split="train")
        existing_data = {"exp_id": dataset["exp_id"], "zip_file": dataset["zip_file"]}
    except Exception as e:
        print(f"Could not load existing dataset: {e}. Creating a new dataset.")
        existing_data = None
Reviewer comment on lines +95 to +100:
Loading the traces dataset is going to be a problem, as the traces are really heavy (200GB for our TMLR paper).
Ideally we'd have something more similar to your original version:

def upload_trace(trace_file: str, exp_id: str):
    api.upload_file(
        path_or_fileobj=trace_file,
        path_in_repo=f"{exp_id}.zip",
        repo_id=TRACE_DATASET,
        repo_type="dataset",
    )

We would trust the index dataset to avoid duplicates, and use the trace dataset as a container in which we'd dump the zipfiles.
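A self-contained sketch of that direction, for illustration only; it assumes the `huggingface_hub` client plus the `hf_user_name`/`hf_token` module variables, and the function name is hypothetical rather than part of this PR:

from huggingface_hub import HfApi

def upload_trace_zip(trace_file: str, exp_id: str) -> str:
    # Dump the zip file straight into the trace dataset repo; no dataset loading involved.
    # Deduplication is left to the index dataset, as suggested above.
    api = HfApi(token=hf_token)
    trace_dataset = f"{hf_user_name}/agent_traces_data"
    api.upload_file(
        path_or_fileobj=trace_file,
        path_in_repo=f"{exp_id}.zip",
        repo_id=trace_dataset,
        repo_type="dataset",
    )
    # Same pointer format already used elsewhere in this module.
    return f"https://huggingface.co/datasets/{trace_dataset}/resolve/main/{exp_id}.zip"

This keeps each upload proportional to the size of one archive, regardless of how many traces already live in the repo.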


    # Create a new dataset with the new experiment trace
    new_data = Dataset.from_dict({"exp_id": [exp_args.exp_id], "zip_file": [zip_filename]})

    # Concatenate with existing data if available
    if existing_data:
        existing_dataset = Dataset.from_dict(existing_data)
        combined_data = concatenate_datasets([existing_dataset, new_data])
    else:
        combined_data = new_data

    # Push updated dataset to the Hugging Face Hub
    combined_data.push_to_hub(trace_dataset, token=hf_token)
    print("Experiment trace uploaded successfully!")

    # Clean up the local zip file
    os.remove(zip_filename)
    print(f"Temporary zip file '{zip_filename}' removed.")

    # Construct and return the file URL on the Hugging Face Hub
    file_url = f"https://huggingface.co/datasets/{trace_dataset}/resolve/main/{zip_filename}"
    print(f"File URL: {file_url}")

    return file_url


def update_index(
    exp_results: list[ExpResult], study_id: str, license: str, trace_pointers: list[str]
):
    """
    Adds a record to the INDEX_DATASET on Hugging Face with the given experiment details.

    Args:
        exp_results (list[ExpResult]): The experiment results to index.
        study_id: The study ID (UUID) associated with these experiments.
        license (str): The license type for the experiment traces.
        trace_pointers (list[str]): The URLs of the experiment trace files.
    """

    if not hf_user_name or not hf_token:
        raise ValueError("HF_REPO_NAME and HF_TOKEN environment variables must be set.")

    index_dataset = f"{hf_user_name}/agent_traces_index"

    # Load existing dataset or create a new one
    try:
        dataset = load_dataset(index_dataset, use_auth_token=hf_token, split="train")
        existing_data = dataset.to_dict()
    except Exception as e:
        print(f"Could not load existing dataset: {e}. Creating a new dataset.")
        existing_data = None

    # Create a new dataset with the provided index details

    dataset = [exp_res.get_exp_record() for exp_res in exp_results]
    for el in dataset:
        el.pop("exp_dir")

    # list[dict] -> dict[list]
    new_data = {key: [d[key] for d in dataset] for key in dataset[0].keys()}
    new_data["study_id"] = [study_id.hex] * len(exp_results)
    new_data["license"] = [license] * len(exp_results)
    new_data["trace_pointer"] = trace_pointers

    new_data = Dataset.from_dict(new_data)

    # Concatenate with existing data if available
    if existing_data:
        existing_dataset = Dataset.from_dict(existing_data)
        combined_data = concatenate_datasets([existing_dataset, new_data])
    else:
        combined_data = new_data

    # Push updated dataset to the Hugging Face Hub
    combined_data.push_to_hub(index_dataset, token=hf_token, create_pr=True)
    print("Index dataset updated successfully!")


if __name__ == "__main__":
    import os
    import pathlib

    from agentlab.experiments.study import Study
    from agentlab.traces.trace_utils import update_index, upload_study

    path = pathlib.Path("/path/to/study")

    study = Study.load(path)
    study.load_exp_args_list()

    upload_study(study)
    update_index(study.exp_args_list, study.uuid, "open", ["w/e"] * len(study.exp_args_list))