Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions pawn/lab/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,105 @@ def read_metrics(
trial.best_accuracy = acc


def read_pretrain_val_summary(trial: Trial) -> dict[str, Any] | None:
    """Summarize pretraining validation progress from a trial's metrics.jsonl.

    Scans every ``type == "val"`` record that carries a
    ``val/game_completion_rate`` and a ``step``, then builds:

    latest: key fields from the newest such record (step, val_loss,
        game_completion_rate, avg_plies_completed, forfeit ply
        min/max/median, legal and late-legal move rates).
    forfeit_fit: ``{slope_per_step, half_life_steps, n_points,
        current_forfeit}`` — a log-linear OLS fit of forfeit rate
        (``1 - game_completion_rate``) over the most recent half of the
        series, mirroring the dashboard chart. Only strictly positive
        forfeit rates enter the regression (``log(0)`` is undefined),
        but ``current_forfeit`` is always the last observed rate of the
        full series, including ``0.0``. The fit is omitted when fewer
        than 4 qualifying val records exist, or fewer than 3 positive
        points fall in the half-window.

    Returns None when the trial has no run dir, the metrics file is
    missing/unreadable, or no qualifying val records are present.
    """
    if trial.run_dir is None:
        return None
    metrics_file = Path(trial.run_dir) / "metrics.jsonl"
    if not metrics_file.exists():
        return None

    # (step, forfeit_rate) pairs in file order, plus the newest raw record.
    series: list[tuple[int, float]] = []
    newest: dict[str, Any] | None = None

    try:
        with open(metrics_file) as fh:
            for raw_line in fh:
                try:
                    rec = json.loads(raw_line)
                except (json.JSONDecodeError, ValueError):
                    # Tolerate partially-written / corrupt lines.
                    continue
                if rec.get("type") != "val":
                    continue
                completion = rec.get("val/game_completion_rate")
                if completion is None:
                    continue
                step = rec.get("step")
                if step is None:
                    continue
                series.append((int(step), 1.0 - float(completion)))
                newest = rec
    except OSError:
        return None

    if not newest:
        return None

    summary: dict[str, Any] = {
        "latest": {
            "step": newest.get("step"),
            "val_loss": newest.get("val/loss"),
            "game_completion_rate": newest.get("val/game_completion_rate"),
            "avg_plies_completed": newest.get("val/avg_plies_completed"),
            "forfeit_ply_min": newest.get("val/min_forfeit_ply"),
            "forfeit_ply_max": newest.get("val/max_forfeit_ply"),
            "forfeit_ply_median": newest.get("val/median_forfeit_ply"),
            "legal_move_rate": newest.get("val/legal_move_rate"),
            "late_legal_move_rate": newest.get("val/late_legal_move_rate"),
        },
    }

    total = len(series)
    if total >= 4:
        # Fit on the most recent half only; drop zero-forfeit points,
        # which have no logarithm.
        window = series[total // 2:]
        positive = [(s, r) for s, r in window if r > 0]
        if len(positive) >= 3:
            pts = [(float(s), math.log(r)) for s, r in positive]
            n_pts = len(pts)
            mean_x = sum(x for x, _ in pts) / n_pts
            mean_y = sum(y for _, y in pts) / n_pts
            sxy = sum((x - mean_x) * (y - mean_y) for x, y in pts)
            sxx = sum((x - mean_x) ** 2 for x, _ in pts)
            if sxx > 0:  # degenerate (all identical steps) → no fit
                slope = sxy / sxx
                half_life = math.log(2) / abs(slope) if slope != 0 else None
                summary["forfeit_fit"] = {
                    "slope_per_step": slope,
                    "half_life_steps": half_life,
                    "n_points": n_pts,
                    # Unfiltered last value — may be 0.0 even though zeros
                    # are excluded from the regression itself.
                    "current_forfeit": series[-1][1],
                }

    return summary


def check_health(trial: Trial) -> str | None:
"""Return a health issue string, or None if healthy."""
loss = trial.last_train_loss
Expand Down
18 changes: 15 additions & 3 deletions pawn/lab/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@
from pathlib import Path
from typing import Any

from pawn.lab.monitor import check_health, is_alive, read_metrics
from pawn.lab.monitor import (
check_health,
is_alive,
read_metrics,
read_pretrain_val_summary,
)
from pawn.lab.state import Trial, _format_duration, _now_iso

log = logging.getLogger("pawn.lab")
Expand Down Expand Up @@ -521,7 +526,7 @@ def status(self) -> dict[str, Any]:
for t in self.trials.values():
if t.status == "running":
cfg = t.config or t.params
running.append({
row: dict[str, Any] = {
"trial": t.trial_id, "strategy": t.strategy,
"step": t.current_step, "total": t.total_steps,
"sps": round(t.steps_per_sec, 2),
Expand All @@ -532,7 +537,14 @@ def status(self) -> dict[str, Any]:
"key_hp": {k: v for k, v in cfg.items()
if k in ("lr", "lora_rank", "bottleneck_dim",
"density", "d_model", "n_layers", "batch_size")},
})
}
# For pretraining runs, surface game-completion metrics and
# the log-linear forfeit-rate fit (matches the dashboard chart).
if cfg.get("run_type") == "pretrain":
pretrain = read_pretrain_val_summary(t)
if pretrain:
row["pretrain"] = pretrain
running.append(row)
elapsed = time.time() - self.start_time
cost = (self.cost_per_hour * elapsed / 3600) if self.cost_per_hour else None
return {
Expand Down
4 changes: 3 additions & 1 deletion pawn/lab/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ def _runner(ctx: Context) -> TrialRunner:

@mcp.tool
async def lab_status(ctx: Context) -> dict[str, Any]:
"""Compact lab status: GPUs, running trials (ID, strategy, key HPs, step/total, ETA, train_loss, train_acc, val_loss, val_acc), counts, elapsed time, cost. Train metrics update every log_interval steps; val metrics update at eval_interval. Use lab_log for real-time stdout."""
"""Compact lab status: GPUs, running trials (ID, strategy, key HPs, step/total, ETA, train_loss, train_acc, val_loss, val_acc), counts, elapsed time, cost. Train metrics update every log_interval steps; val metrics update at eval_interval. Use lab_log for real-time stdout.

For running pretraining trials, each row also carries a `pretrain` block with the latest game-completion metrics (game_completion_rate, avg_plies_completed, forfeit_ply min/max/median, legal/late_legal) and a `forfeit_fit` sub-block with a log-linear fit over the most recent half of the forfeit-rate history (slope_per_step, half_life_steps, current_forfeit, n_points). The fit is the primary late-stage convergence signal — it keeps moving after val_loss plateaus."""
return _runner(ctx).status()


Expand Down
180 changes: 179 additions & 1 deletion tests/lab/test_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@

import pytest

from pawn.lab.monitor import check_health, is_alive, read_metrics
from pawn.lab.monitor import (
check_health,
is_alive,
read_metrics,
read_pretrain_val_summary,
)
from pawn.lab.state import Trial


Expand Down Expand Up @@ -318,3 +323,176 @@ def test_no_total_steps_uses_500_threshold(self):
assert issue is not None
assert isinstance(issue, str) and len(issue) > 0
assert "NaN" in issue or "Inf" in issue


# =====================================================================
# read_pretrain_val_summary
# =====================================================================


def _val_record(step: int, gc: float, **kwargs) -> dict:
"""Build a minimal val record with game_completion_rate."""
rec = {
"type": "val",
"step": step,
"val/loss": 3.0,
"val/game_completion_rate": gc,
"val/avg_plies_completed": 300.0,
"val/min_forfeit_ply": 10.0,
"val/max_forfeit_ply": 400.0,
"val/median_forfeit_ply": 100.0,
"val/legal_move_rate": 0.997,
"val/late_legal_move_rate": 0.993,
}
rec.update(kwargs)
return rec


class TestReadPretrainValSummary:
    def test_no_run_dir_returns_none(self):
        """A trial without a run directory has nothing to scan."""
        assert read_pretrain_val_summary(_make_trial()) is None

    def test_missing_metrics_file_returns_none(self, tmp_path):
        """A run_dir pointing at a nonexistent path yields None."""
        trial = _make_trial()
        trial.run_dir = str(tmp_path / "does_not_exist")
        assert read_pretrain_val_summary(trial) is None

    def test_no_val_records_returns_none(self, tmp_path):
        """Train-only metrics produce no summary."""
        run_dir = tmp_path / "run_x"
        _write_metrics_file(
            run_dir / "metrics.jsonl",
            [{"type": "train", "step": 10, "train/loss": 2.0}],
        )
        trial = _make_trial()
        trial.run_dir = str(run_dir)
        assert read_pretrain_val_summary(trial) is None

    def test_val_records_without_game_completion_return_none(self, tmp_path):
        """Adapter-style runs log val records with no game_completion_rate."""
        run_dir = tmp_path / "run_x"
        _write_metrics_file(
            run_dir / "metrics.jsonl",
            [
                {"type": "val", "step": 100, "val/loss": 2.0, "val/accuracy": 0.5},
                {"type": "val", "step": 200, "val/loss": 1.9, "val/accuracy": 0.55},
            ],
        )
        trial = _make_trial()
        trial.run_dir = str(run_dir)
        assert read_pretrain_val_summary(trial) is None

    def test_returns_latest_without_fit_when_too_few_records(self, tmp_path):
        """With fewer than 4 val records the fit is never attempted."""
        run_dir = tmp_path / "run_x"
        records = [
            _val_record(s, g)
            for s, g in [(1000, 0.5), (2000, 0.4), (3000, 0.3)]
        ]
        _write_metrics_file(run_dir / "metrics.jsonl", records)
        trial = _make_trial()
        trial.run_dir = str(run_dir)

        result = read_pretrain_val_summary(trial)

        assert result is not None
        assert "latest" in result
        assert "forfeit_fit" not in result
        assert result["latest"]["step"] == 3000
        assert result["latest"]["game_completion_rate"] == pytest.approx(0.3)

    def test_returns_fit_on_known_series(self, tmp_path):
        """OLS must recover an exact exponential decay.

        With forfeit_rate(step) = exp(-decay * step + intercept),
        log(forfeit) is exactly linear in step with slope -decay, so the
        fit over the second half of the history recovers it precisely.
        """
        run_dir = tmp_path / "run_x"
        decay = 1e-5  # half-life = ln(2)/decay ~= 69314 steps
        intercept = math.log(0.5)  # forfeit(0) = 0.5
        total = 20

        records = []
        for idx in range(1, total + 1):
            step = idx * 1000
            rate = math.exp(-decay * step + intercept)
            records.append(_val_record(step, 1.0 - rate))

        _write_metrics_file(run_dir / "metrics.jsonl", records)
        trial = _make_trial()
        trial.run_dir = str(run_dir)

        result = read_pretrain_val_summary(trial)

        assert result is not None
        assert "forfeit_fit" in result
        fit = result["forfeit_fit"]
        assert fit["slope_per_step"] == pytest.approx(-decay, rel=1e-6)
        assert fit["half_life_steps"] == pytest.approx(math.log(2) / decay, rel=1e-6)
        # Second half of 20 records → 10 fit points.
        assert fit["n_points"] == 10
        # current_forfeit is the last value of the whole series, not the window's.
        assert fit["current_forfeit"] == pytest.approx(
            math.exp(-decay * (total * 1000) + intercept)
        )

    def test_all_zero_forfeit_omits_fit(self, tmp_path):
        """Perfect completion everywhere → zero positive points → no fit."""
        run_dir = tmp_path / "run_x"
        records = [_val_record(step, 1.0) for step in range(1000, 6000, 1000)]
        _write_metrics_file(run_dir / "metrics.jsonl", records)
        trial = _make_trial()
        trial.run_dir = str(run_dir)

        result = read_pretrain_val_summary(trial)

        assert result is not None
        assert result["latest"]["game_completion_rate"] == pytest.approx(1.0)
        assert "forfeit_fit" not in result

    def test_latest_records_carries_all_fields(self, tmp_path):
        """Every documented `latest` field is surfaced when present."""
        run_dir = tmp_path / "run_x"
        overrides = {
            "val/avg_plies_completed": 321.5,
            "val/min_forfeit_ply": 25.0,
            "val/max_forfeit_ply": 300.0,
            "val/median_forfeit_ply": 120.0,
            "val/loss": 2.9,
            "val/legal_move_rate": 0.996,
            "val/late_legal_move_rate": 0.992,
        }
        _write_metrics_file(
            run_dir / "metrics.jsonl", [_val_record(5000, 0.9, **overrides)]
        )
        trial = _make_trial()
        trial.run_dir = str(run_dir)

        result = read_pretrain_val_summary(trial)

        assert result is not None
        latest = result["latest"]
        assert latest["step"] == 5000
        for field, expected in [
            ("val_loss", 2.9),
            ("game_completion_rate", 0.9),
            ("avg_plies_completed", 321.5),
            ("forfeit_ply_min", 25.0),
            ("forfeit_ply_max", 300.0),
            ("forfeit_ply_median", 120.0),
            ("legal_move_rate", 0.996),
            ("late_legal_move_rate", 0.992),
        ]:
            assert latest[field] == pytest.approx(expected)

    def test_current_forfeit_is_last_of_full_series(self, tmp_path):
        """`current_forfeit` is the unfiltered last value, even a zero that
        was excluded from the OLS window."""
        run_dir = tmp_path / "run_x"
        # Ten decaying records, then a final eval at 100% completion.
        records = [
            _val_record(i * 1000, 1.0 - math.exp(-i * 0.2)) for i in range(1, 11)
        ]
        records.append(_val_record(11_000, 1.0))  # forfeit = 0.0
        _write_metrics_file(run_dir / "metrics.jsonl", records)
        trial = _make_trial()
        trial.run_dir = str(run_dir)

        result = read_pretrain_val_summary(trial)

        assert result is not None
        # When the fit exists, its current_forfeit reflects the final zero.
        if "forfeit_fit" in result:
            assert result["forfeit_fit"]["current_forfeit"] == pytest.approx(0.0)
Loading
Loading