Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
fe3034d
Add platform integration.
d42me Feb 26, 2026
5844504
Comments.
d42me Feb 26, 2026
4951f73
Merge remote-tracking branch 'origin/main' into feature/add-platform-…
d42me Feb 26, 2026
b731caf
Address feedback.
d42me Feb 26, 2026
2b6b5b3
Merge branch 'main' into feature/add-platform-integration
d42me Feb 26, 2026
ede47e9
Refactor orchestrator integration.
d42me Feb 26, 2026
ca4f10d
Update skill. Add on exit case.
d42me Feb 27, 2026
0be0977
Refactor to use prime monitor.
d42me Feb 27, 2026
9c7da8a
Remove log.
d42me Feb 27, 2026
4ebc17e
Merge branch 'main' into feature/add-platform-integration
d42me Feb 27, 2026
7b3a5e7
Update changelog.
d42me Feb 27, 2026
4312b20
Update comment.
d42me Feb 27, 2026
1f70b2e
Update docs.
d42me Feb 27, 2026
d743a53
Fix base url conflict and add try/catch for platform calls.
d42me Feb 27, 2026
a262003
Fix edge case.
d42me Feb 28, 2026
5412162
Merge branch 'main' into feature/add-platform-integration
d42me Mar 18, 2026
582b492
Fix PrimeMonitor registration auth reuse
d42me Mar 18, 2026
8274a4f
Merge origin/main into feature/add-platform-integration
d42me Mar 18, 2026
83b6686
Handle missing Prime CLI config in PrimeMonitor
d42me Mar 18, 2026
8cf4a24
Remove PrimeMonitor fallback unit test
d42me Mar 18, 2026
68c76a5
Guard PrimeMonitor finalization on close
d42me Mar 18, 2026
72fa18e
Remove PrimeMonitor config fallback try blocks
d42me Mar 19, 2026
9bd7519
Use frontend URL for PrimeMonitor dashboard links
d42me Mar 19, 2026
1b00086
Handle missing PrimeMonitor frontend URL
d42me Mar 20, 2026
f3b0879
Document platform monitoring setup
d42me Mar 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,4 @@ Documenting changes which affect configuration usage patterns (added/moved/remov
- **`dry_run`**: Added to `RLConfig` and `SFTConfig` (default: `False`). When set, validates the config, writes resolved subconfigs to `output_dir/configs/`, and exits without starting any processes. Works the same for both local and SLURM runs (2026-02-26)
- **Config output location**: Resolved subconfigs are now always written to `output_dir/configs/` instead of `.pydantic_config/<uuid>/`. This applies to both local and SLURM entrypoints, and for both single-node and multi-node deployments (2026-02-26)
- **SFT config filename**: The resolved SFT trainer config is now written as `sft.toml` instead of `trainer.toml` (2026-02-26)
- **`orchestrator.prime_monitor`**: Extended `PrimeMonitorConfig` with `run_name` and `team_id` fields, and auto-registration support: when `RUN_ID` is not set, the monitor now registers an external run on the platform automatically and streams live metrics, samples, and distributions. Authentication is read from `PRIME_API_KEY` or `~/.prime/config.json` (`prime login`). (2026-02-27)
54 changes: 54 additions & 0 deletions skills/platform-local-run.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Prime Platform Local Run

Stream live training metrics/samples to the Prime Intellect platform from any machine, without hosted infrastructure.

## Prerequisites

Your `PRIME_API_KEY` must have `rft:write` scope. Log in via the `prime` CLI:
```bash
prime login
```

Or set the environment variable directly:
```bash
export PRIME_API_KEY=pit_...
```

## Usage

Add `[orchestrator.prime_monitor]` to your TOML config:

```toml
[orchestrator.prime_monitor]
run_name = "my-experiment" # optional; defaults to W&B run name if set
```

Or as a CLI override:
```bash
uv run rl @ config.toml --orchestrator.prime_monitor.run_name "my-experiment"
```

If `RUN_ID` is not already set in the environment, prime-rl will automatically:
1. Resolve API key from `PRIME_API_KEY` env var or `~/.prime/config.json` (via `prime login`)
2. Call `POST /api/v1/rft/external-runs` → extract `run.id`, print dashboard URL
3. Set `RUN_ID` in the orchestrator process
4. Stream metrics/samples/distributions to `api/internal/rft` during training
5. On completion (or crash), call `PUT /api/v1/rft/external-runs/{run_id}/status`

If `RUN_ID` is already set (hosted K8s runs inject it directly), registration is skipped and monitoring proceeds normally.

## Optional fields

```toml
[orchestrator.prime_monitor]
run_name = "qwen3-reverse-text"
team_id = "clxxx..." # show run under a team
base_url = "https://api.primeintellect.ai/api/v1/rft" # default
```

## Auth

`PRIME_API_KEY` (from env or `~/.prime/config.json`) is used for both creating the run and streaming data.
The key must have `rft:write` scope — granted by default when you `prime login`.

`team_id` is resolved from `prime_monitor.team_id` in config, then from `~/.prime/config.json`.
4 changes: 4 additions & 0 deletions src/prime_rl/configs/rl.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,10 @@ def auto_setup_wandb(self):

validate_shared_wandb_config(self.trainer, self.orchestrator)

if self.orchestrator.prime_monitor is not None and self.orchestrator.prime_monitor.run_name is None:
if self.wandb and self.wandb.name:
self.orchestrator.prime_monitor.run_name = self.wandb.name

return self

@model_validator(mode="after")
Expand Down
16 changes: 15 additions & 1 deletion src/prime_rl/configs/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ class PrimeMonitorConfig(BaseConfig):
Field(
description="The base URL for Prime Intellect monitoring API.",
),
] = "https://api.primeintellect.ai/api/internal/rft"
] = "https://api.primeintellect.ai/api/v1/rft"

api_key_var: Annotated[
str,
Expand All @@ -262,6 +262,20 @@ class PrimeMonitorConfig(BaseConfig):
),
] = LogExtrasConfig()

run_name: Annotated[
str | None,
Field(
description="Name for the run shown on the platform. Defaults to the W&B run name if set, otherwise auto-generated by the platform.",
),
] = None

team_id: Annotated[
str | None,
Field(
description="Team ID to associate the run with.",
),
] = None


class HeartbeatConfig(BaseConfig):
"""Configures the heartbeat for BetterStack."""
Expand Down
119 changes: 111 additions & 8 deletions src/prime_rl/utils/monitor/prime.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
from pathlib import Path
from threading import Thread
from typing import Any
from urllib.parse import urlparse

import httpx
import pyarrow as pa
import pyarrow.parquet as pq
import verifiers as vf
from prime_cli.core.config import Config as PrimeConfig
from transformers.tokenization_utils import PreTrainedTokenizer

from prime_rl.configs.shared import PrimeMonitorConfig
Expand Down Expand Up @@ -67,6 +69,8 @@ def __init__(
self.logger = get_logger()
self.history: list[dict[str, Any]] = []
self.output_dir = output_dir
self._registered = False
self._finalized = False

rank = int(os.environ.get("RANK", os.environ.get("DP_RANK", "0")))
self.enabled = self.config is not None
Expand All @@ -79,24 +83,27 @@ def __init__(
assert config is not None
self.logger.info(f"Initializing {self.__class__.__name__} ({config})")

# Get API key from environment variable
api_key = os.getenv(config.api_key_var)
api_key = os.getenv(config.api_key_var) or PrimeConfig().api_key or None
if not api_key:
self.logger.warning(
f"API key not found. Set {config.api_key_var} environment variable. PrimeMonitor will not be able to upload data."
f"API key not found. Set {config.api_key_var} environment variable or run `prime login`. "
"PrimeMonitor will not be able to upload data."
)
self.enabled = False
return

self.api_key = api_key
self.base_url = config.base_url
self.base_url = config.base_url.rstrip("/")

# Get run_id from environment variable (check before allocating resources)
run_id = os.getenv("RUN_ID")
if not run_id:
self.logger.warning("RUN_ID environment variable not set. PrimeMonitor will not be able to upload data.")
self.enabled = False
return
run_id = self._register_run(config, run_config)
if run_id:
os.environ["RUN_ID"] = run_id
else:
self.enabled = False
return

self.run_id = run_id

# Set up async HTTP client with background event loop.
Expand All @@ -117,6 +124,97 @@ def __init__(
if config.log_extras.distributions:
self.last_log_distributions_step = -1

def _register_run(self, config: PrimeMonitorConfig, run_config: BaseSettings | None) -> str | None:
"""Register an external run with the platform. Returns run_id on success, None on failure."""
prime_config = PrimeConfig()
registration_api_key = prime_config.api_key or None
if not registration_api_key:
self.logger.warning(
"Prime Intellect API key not found. Either set PRIME_API_KEY or run `prime login`. "
"PrimeMonitor will not be able to register or upload data."
)
return None

team_id = config.team_id or prime_config.team_id

model = getattr(run_config, "model", None) if run_config else None
environments = getattr(run_config, "env", None) if run_config else None
wandb = getattr(run_config, "wandb", None) if run_config else None

payload: dict = {
"base_model": model.name if model else "unknown",
"max_steps": getattr(run_config, "max_steps", None) or 0,
}
if config.run_name:
payload["name"] = config.run_name
if team_id:
payload["team_id"] = team_id
if environments:
payload["environments"] = [{"id": env.id} for env in environments if hasattr(env, "id")]
if wandb and getattr(wandb, "project", None):
payload["wandb_project"] = wandb.project

parsed = urlparse(config.base_url)
api_base = f"{parsed.scheme}://{parsed.netloc}/api/v1/rft"

try:
response = httpx.post(
f"{api_base}/external-runs",
headers={"Authorization": f"Bearer {registration_api_key}"},
json=payload,
timeout=30,
)
except httpx.HTTPError as e:
self.logger.warning(f"Failed to register platform run: {e}. PrimeMonitor will not be able to upload data.")
return None

if response.status_code != 201:
self.logger.warning(
f"Failed to create platform run (HTTP {response.status_code}): {response.text}. "
"PrimeMonitor will not be able to upload data."
)
return None

run_id = response.json()["run"]["id"]
dashboard_url = f"{parsed.scheme}://{parsed.netloc}/dashboard/training/{run_id}"
self.logger.success(f"Monitor run at:\n {dashboard_url}")
self._registered = True
return run_id

def _finalize_run(self, success: bool) -> None:
"""Mark the run as completed or failed on the platform."""
if not getattr(self, "_registered", False):
return
prime_config = PrimeConfig()
registration_api_key = prime_config.api_key or None
if not registration_api_key:
return

payload: dict = {"status": "completed" if success else "failed"}
status_label = "completed" if success else "failed"
self.logger.info(f"Finalizing platform run {self.run_id} as {status_label}")

parsed = urlparse(self.base_url)
finalize_url = f"{parsed.scheme}://{parsed.netloc}/api/v1/rft/external-runs/{self.run_id}/status"

try:
response = httpx.put(
finalize_url,
headers={"Authorization": f"Bearer {registration_api_key}"},
json=payload,
timeout=30,
)
except httpx.HTTPError as e:
self.logger.warning(f"Failed to finalize platform run {self.run_id}: {e}")
return

if response.status_code != 200:
self.logger.warning(
f"Failed to finalize platform run {self.run_id} (HTTP {response.status_code}): {response.text}"
)
return
self.logger.info(f"Platform run {self.run_id} marked as {status_label}")

def log(self, metrics: dict[str, Any], step: int | None = None) -> None:
self.history.append(metrics)
if not self.is_master:
Expand Down Expand Up @@ -378,12 +476,17 @@ def save_final_summary(self, filename: str = "final_summary.json") -> None:
"summary": self.history[-1] if self.history else {},
},
)
self._finalize_run(success=True)
self._finalized = True

def close(self) -> None:
"""Close the HTTP client and stop the background event loop."""
if not hasattr(self, "_client"):
return

if self.is_master and self.enabled and not self._finalized:
self._finalize_run(success=False)

self._flush()

# Close the async client within the event loop
Expand Down