Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 112 additions & 0 deletions tests/test_artifacts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
"""Tests for run artifact persistence and path sanitization."""

import json

import pytest

from weco.artifacts import RunArtifacts, _sanitize_artifact_path


@pytest.fixture
def artifacts(tmp_path):
    """Provide a fresh RunArtifacts store rooted in this test's tmp directory."""
    store = RunArtifacts(log_dir=str(tmp_path), run_id="test-run")
    return store


def _read_manifest(path):
return json.loads(path.read_text())


@pytest.mark.parametrize(
    ("raw_path", "expected_parts"),
    [
        ("model.py", ("model.py",)),
        ("src/model.py", ("src", "model.py")),
        ("./src/model.py", ("src", "model.py")),
        ("/absolute/path.py", ("absolute", "path.py")),
        ("src\\utils\\helper.py", ("src", "utils", "helper.py")),
        ("../../etc/passwd", ("etc", "passwd")),
        ("", ("unnamed_file",)),
        ("../../..", ("unnamed_file",)),
    ],
)
def test_sanitize_artifact_path(raw_path, expected_parts):
    """Traversal, absolute prefixes, and Windows separators collapse to safe relative parts."""
    sanitized = _sanitize_artifact_path(raw_path)
    assert sanitized.parts == expected_parts


def test_save_step_code_writes_files_and_manifest(artifacts):
    """A step snapshot lands under steps/<n>/files with a manifest describing it."""
    file_map = {"src/model.py": "class Model: pass", "src/utils.py": "def helper(): pass"}
    bundle = artifacts.save_step_code(step=3, file_map=file_map)

    assert bundle == artifacts.root / "steps" / "3"
    for rel_path, content in file_map.items():
        assert (bundle / "files" / rel_path).read_text() == content

    manifest = _read_manifest(bundle / "manifest.json")
    assert manifest["type"] == "step_code_snapshot"
    assert manifest["step"] == 3
    assert manifest["file_count"] == 2
    listed = [(entry["path"], entry["artifact_path"]) for entry in manifest["files"]]
    assert listed == [("src/model.py", "src/model.py"), ("src/utils.py", "src/utils.py")]


def test_save_step_code_keeps_steps_independent(artifacts):
    """Snapshots taken at different steps never overwrite one another."""
    versions = ((0, "v1"), (1, "v2"))
    for step, content in versions:
        artifacts.save_step_code(step=step, file_map={"f.py": content})

    for step, content in versions:
        assert (artifacts.root / "steps" / str(step) / "files" / "f.py").read_text() == content


def test_save_best_code_writes_manifest_without_step(artifacts):
    """The best snapshot lives under best/ and its manifest omits the step key."""
    bundle = artifacts.save_best_code({"model.py": "optimized = True"})
    snapshot_file = bundle / "files" / "model.py"

    assert bundle == artifacts.root / "best"
    assert snapshot_file.read_text() == "optimized = True"

    manifest = _read_manifest(bundle / "manifest.json")
    assert "step" not in manifest
    assert manifest["file_count"] == 1
    assert manifest["type"] == "best_code_snapshot"


def test_save_execution_output_writes_step_file_and_jsonl_index(artifacts):
    """Each output produces a per-step text file plus an appended JSONL index row."""
    outputs = {0: "first", 1: "second"}
    for step, text in outputs.items():
        artifacts.save_execution_output(step=step, output=text)

    for step, text in outputs.items():
        assert (artifacts.root / "outputs" / f"step_{step}.out.txt").read_text() == text

    raw_lines = (artifacts.root / "exec_output.jsonl").read_text().strip().split("\n")
    assert len(raw_lines) == 2
    for raw, (step, text) in zip(raw_lines, outputs.items()):
        entry = json.loads(raw)
        assert entry["step"] == step
        assert entry["output_file"] == f"outputs/step_{step}.out.txt"
        assert entry["output_length"] == len(text)


def test_root_directory_creation_is_idempotent(tmp_path):
    """Constructing RunArtifacts twice for one run id reuses the same directory."""
    instances = [RunArtifacts(log_dir=str(tmp_path), run_id="abc-123") for _ in range(2)]

    expected_root = tmp_path / "abc-123"
    assert all(instance.root == expected_root for instance in instances)
    assert expected_root.exists()


def test_step_snapshot_sanitizes_path_traversal(artifacts, tmp_path):
    """A ../ source path in a step snapshot is rewritten to stay inside the bundle."""
    artifacts.save_step_code(step=0, file_map={"../../etc/evil.py": "malicious"})

    escaped_location = tmp_path / "etc" / "evil.py"
    contained_location = artifacts.root / "steps" / "0" / "files" / "etc" / "evil.py"
    assert not escaped_location.exists()
    assert contained_location.exists()


def test_best_snapshot_sanitizes_path_traversal(artifacts, tmp_path):
    """A ../ source path in the best snapshot cannot escape the artifact root."""
    artifacts.save_best_code({"../../../tmp/evil.py": "malicious"})

    escaped_location = tmp_path.parent / "tmp" / "evil.py"
    contained_location = artifacts.root / "best" / "files" / "tmp" / "evil.py"
    assert not escaped_location.exists()
    assert contained_location.exists()
4 changes: 2 additions & 2 deletions weco/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ def _recover_suggest_after_transport_error(

def start_optimization_run(
console: Console,
source_code: str,
source_path: str,
source_code: str | dict[str, str],
source_path: str | None,
evaluation_command: str,
metric_name: str,
maximize: bool,
Expand Down
133 changes: 133 additions & 0 deletions weco/artifacts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
"""On-disk artifact management for optimization runs.

Centralizes the directory layout and write logic for all artifacts
produced during an optimization run under .runs/<run_id>/.
"""

import json
import pathlib
from datetime import datetime


def _sanitize_artifact_path(path_value: str) -> pathlib.Path:
"""Convert a source path into a safe relative artifact path.

Strips traversal components (..), absolute prefixes, and Windows
drive letters so that artifacts are always written under the
intended directory.
"""
normalized = path_value.replace("\\", "/")
parts = pathlib.PurePosixPath(normalized).parts
safe_parts: list[str] = []
for part in parts:
if part in ("", ".", "/"):
continue
if part == "..":
continue
if not safe_parts and ":" in part:
part = part.replace(":", "_")
safe_parts.append(part)

if not safe_parts:
return pathlib.Path("unnamed_file")
return pathlib.Path(*safe_parts)


class RunArtifacts:
    """Owns the on-disk artifact layout for a single optimization run.

    Layout::

        <root>/
            steps/<step>/
                files/<relative_path> # actual code files
                manifest.json         # machine-readable index
            best/
                files/<relative_path>
                manifest.json
            outputs/
                step_<n>.out.txt      # execution stdout/stderr
            exec_output.jsonl         # centralized output index
    """

    def __init__(self, log_dir: str, run_id: str) -> None:
        # mkdir with exist_ok makes re-opening the same run a no-op.
        self.root = pathlib.Path(log_dir) / run_id
        self.root.mkdir(parents=True, exist_ok=True)

    # ------------------------------------------------------------------
    # Code snapshots
    # ------------------------------------------------------------------

    def save_step_code(self, step: int, file_map: dict[str, str]) -> pathlib.Path:
        """Persist a code snapshot plus manifest for *step*.

        Returns the bundle directory path.
        """
        return self._write_code_bundle(file_map, label=("steps", str(step)))

    def save_best_code(self, file_map: dict[str, str]) -> pathlib.Path:
        """Persist the best-result code snapshot plus manifest.

        Returns the bundle directory path.
        """
        return self._write_code_bundle(file_map, label=("best",))

    # ------------------------------------------------------------------
    # Execution output
    # ------------------------------------------------------------------

    def save_execution_output(self, step: int, output: str) -> None:
        """Write *output* to a per-step file and append an index record to the JSONL log."""
        recorded_at = datetime.now().isoformat()

        # Raw execution output is kept per step for easy local inspection.
        outputs_dir = self.root / "outputs"
        outputs_dir.mkdir(parents=True, exist_ok=True)

        # Full stdout/stderr for this exact step.
        per_step_file = outputs_dir / f"step_{step}.out.txt"
        per_step_file.write_text(output, encoding="utf-8")

        # Compact metadata record so tooling can stream/index outputs.
        record = {
            "step": step,
            "timestamp": recorded_at,
            "output_file": per_step_file.relative_to(self.root).as_posix(),
            "output_length": len(output),
        }
        with (self.root / "exec_output.jsonl").open("a", encoding="utf-8") as index:
            index.write(json.dumps(record) + "\n")

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _write_code_bundle(self, file_map: dict[str, str], label: tuple[str, ...]) -> pathlib.Path:
        """Write <root>/<label...>/files/* and a manifest.json; return the bundle dir."""
        bundle_dir = self.root.joinpath(*label)
        files_dir = bundle_dir / "files"
        files_dir.mkdir(parents=True, exist_ok=True)

        entries: list[dict[str, str | int]] = []
        # Sorted iteration keeps the manifest order deterministic.
        for original_path, content in sorted(file_map.items()):
            relative = _sanitize_artifact_path(original_path)
            destination = files_dir / relative
            destination.parent.mkdir(parents=True, exist_ok=True)
            destination.write_text(content, encoding="utf-8")
            entries.append(
                {"path": original_path, "artifact_path": relative.as_posix(), "bytes": len(content.encode("utf-8"))}
            )

        step_snapshot = label[0] == "steps"
        manifest: dict = {
            "type": "step_code_snapshot" if step_snapshot else "best_code_snapshot",
            "created_at": datetime.now().isoformat(),
            "file_count": len(entries),
            "files": entries,
        }
        if step_snapshot:
            manifest["step"] = int(label[1])

        (bundle_dir / "manifest.json").write_text(json.dumps(manifest, indent=2) + "\n", encoding="utf-8")
        return bundle_dir
24 changes: 15 additions & 9 deletions weco/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
RunStartAttemptedEvent,
)
from .utils import check_for_cli_updates, get_default_model, UnrecognizedAPIKeysError, DefaultModelNotFoundError
from .validation import validate_source_file, validate_log_directory, ValidationError, print_validation_error
from .validation import validate_sources, validate_log_directory, ValidationError, print_validation_error


install(show_locals=True)
Expand Down Expand Up @@ -55,12 +55,15 @@ def parse_api_keys(api_key_args: list[str] | None) -> dict[str, str]:
# Function to define and return the run_parser (or configure it on a passed subparser object)
# This helps keep main() cleaner and centralizes run command arg definitions.
def configure_run_parser(run_parser: argparse.ArgumentParser) -> None:
run_parser.add_argument(
"-s",
"--source",
source_group = run_parser.add_mutually_exclusive_group(required=True)
source_group.add_argument(
"-s", "--source", type=str, help="Path to a single source code file to be optimized (e.g., `optimize.py`)"
)
source_group.add_argument(
"--sources",
nargs="+",
type=str,
required=True,
help="Path to the source code file that will be optimized (e.g., `optimize.py`)",
help="Paths to multiple source code files to be optimized together (e.g., `model.py utils.py config.py`)",
)
run_parser.add_argument(
"-c",
Expand Down Expand Up @@ -111,7 +114,7 @@ def configure_run_parser(run_parser: argparse.ArgumentParser) -> None:
run_parser.add_argument(
"--save-logs",
action="store_true",
help="Save execution output to .runs/<run-id>/outputs/step_<n>.out.txt with JSONL index",
help="Save execution output to .runs/<run-id>/outputs/step_<n>.out.txt with JSONL index. Code snapshots are written to .runs/<run-id>/steps/<step>/files and .runs/<run-id>/best/files.",
)
run_parser.add_argument(
"--apply-change",
Expand Down Expand Up @@ -263,9 +266,12 @@ def execute_run_command(args: argparse.Namespace) -> None:

ctx = get_event_context()

# Normalize source input so --source follows the same internal path as --sources
source_arg = args.sources if args.sources is not None else [args.source]

# Early validation — fail fast with helpful errors
try:
validate_source_file(args.source)
validate_sources(source_arg)
validate_log_directory(args.log_dir)
except ValidationError as e:
print_validation_error(e, console)
Expand Down Expand Up @@ -301,7 +307,7 @@ def execute_run_command(args: argparse.Namespace) -> None:
)

success = optimize(
source=args.source,
source=source_arg,
eval_command=args.eval_command,
metric=args.metric,
goal=args.goal,
Expand Down
Loading