Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
fe3034d
Add platform integration.
d42me Feb 26, 2026
5844504
Comments.
d42me Feb 26, 2026
4951f73
Merge remote-tracking branch 'origin/main' into feature/add-platform-…
d42me Feb 26, 2026
b731caf
Address feedback.
d42me Feb 26, 2026
2b6b5b3
Merge branch 'main' into feature/add-platform-integration
d42me Feb 26, 2026
ede47e9
Refactor orchestrator integration.
d42me Feb 26, 2026
ca4f10d
Update skill. Add on exit case.
d42me Feb 27, 2026
0be0977
Refactor to use prime monitor.
d42me Feb 27, 2026
9c7da8a
Remove log.
d42me Feb 27, 2026
4ebc17e
Merge branch 'main' into feature/add-platform-integration
d42me Feb 27, 2026
7b3a5e7
Update changelog.
d42me Feb 27, 2026
4312b20
Update comment.
d42me Feb 27, 2026
1f70b2e
Update docs.
d42me Feb 27, 2026
d743a53
Fix base url conflict and add try/catch for platform calls.
d42me Feb 27, 2026
a262003
Fix edge case.
d42me Feb 28, 2026
5412162
Merge branch 'main' into feature/add-platform-integration
d42me Mar 18, 2026
582b492
Fix PrimeMonitor registration auth reuse
d42me Mar 18, 2026
8274a4f
Merge origin/main into feature/add-platform-integration
d42me Mar 18, 2026
83b6686
Handle missing Prime CLI config in PrimeMonitor
d42me Mar 18, 2026
8cf4a24
Remove PrimeMonitor fallback unit test
d42me Mar 18, 2026
68c76a5
Guard PrimeMonitor finalization on close
d42me Mar 18, 2026
72fa18e
Remove PrimeMonitor config fallback try blocks
d42me Mar 19, 2026
9bd7519
Use frontend URL for PrimeMonitor dashboard links
d42me Mar 19, 2026
1b00086
Handle missing PrimeMonitor frontend URL
d42me Mar 20, 2026
f3b0879
Document platform monitoring setup
d42me Mar 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ This directory maintains the documentation for PRIME-RL. It is organized into th
- [**Environments**](environments.md) - Installing and using verifiers environments from the Environments Hub
- [**Async Training**](async.md) - Understanding asynchronous off-policy training and step semantics
- [**Logging**](logging.md) - Logging with loguru, torchrun, and Weights & Biases
- [**Platform Monitoring**](platform-monitoring.md) - Register runs on the Prime Intellect platform and stream training metrics
- [**MultiRunManager**](multi_run_manager.md) - Multi-run training with the MultiRunManager object for concurrent LoRA adapters
- [**Checkpointing**](checkpointing.md) - Saving and resuming training from checkpoints
- [**Benchmarking**](benchmarking.md) - Performance benchmarking and throughput measurement
Expand Down
48 changes: 48 additions & 0 deletions docs/platform-monitoring.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Platform Monitoring

Use `orchestrator.prime_monitor` to register a run on the Prime Intellect platform and stream training metrics, samples, and distributions.

> **Internal-only for now:** external run registration is currently only enabled for internal / allowlisted teams.

## Prerequisites

You need a Prime API key with `rft:write` scope.

Use the CLI:

```bash
prime login
```

Or set an environment variable directly:

```bash
export PRIME_API_KEY=pit_...
```

## Minimal config

```toml
[orchestrator.prime_monitor]
run_name = "my-experiment"
```

You can also override from the CLI:

```bash
uv run rl @ config.toml --orchestrator.prime_monitor.run_name "my-experiment"
```

## Troubleshooting

### `API key not found`

Set the env var from `api_key_var` or run:

```bash
prime login
```

### `External training runs are not enabled for this team`

Your team is not allowlisted yet. This feature is currently internal-only.
4 changes: 4 additions & 0 deletions src/prime_rl/configs/rl.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,10 @@ def auto_setup_wandb(self):

validate_shared_wandb_config(self.trainer, self.orchestrator)

if self.orchestrator.prime_monitor is not None and self.orchestrator.prime_monitor.run_name is None:
if self.wandb and self.wandb.name:
self.orchestrator.prime_monitor.run_name = self.wandb.name

return self

@model_validator(mode="after")
Expand Down
23 changes: 22 additions & 1 deletion src/prime_rl/configs/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ class PrimeMonitorConfig(BaseConfig):
Field(
description="The base URL for Prime Intellect monitoring API.",
),
] = "https://api.primeintellect.ai/api/internal/rft"
] = "https://api.primeintellect.ai/api/v1/rft"

api_key_var: Annotated[
str,
Expand All @@ -333,6 +333,27 @@ class PrimeMonitorConfig(BaseConfig):
),
] = LogExtrasConfig()

run_name: Annotated[
str | None,
Field(
description="Name for the run shown on the platform. Defaults to the W&B run name if set, otherwise auto-generated by the platform.",
),
] = None

team_id: Annotated[
str | None,
Field(
description="Team ID to associate the run with.",
),
] = None

frontend_url: Annotated[
str | None,
Field(
description="Frontend base URL used for the dashboard link shown after registration. Defaults to the Prime CLI frontend URL when unset.",
),
] = None


class HeartbeatConfig(BaseConfig):
"""Configures the heartbeat for BetterStack."""
Expand Down
137 changes: 129 additions & 8 deletions src/prime_rl/utils/monitor/prime.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
from pathlib import Path
from threading import Thread
from typing import Any
from urllib.parse import urlparse

import httpx
import pyarrow as pa
import pyarrow.parquet as pq
import verifiers as vf
from prime_cli.core.config import Config as PrimeConfig
from transformers.tokenization_utils import PreTrainedTokenizer

from prime_rl.configs.shared import PrimeMonitorConfig
Expand Down Expand Up @@ -68,6 +70,10 @@ def __init__(
self.logger = get_logger()
self.history: list[dict[str, Any]] = []
self.output_dir = output_dir
self._registered = False
self._finalized = False
self._closed = False
self._owner_pid = os.getpid()

rank = int(os.environ.get("RANK", os.environ.get("DP_RANK", "0")))
self.enabled = self.config is not None
Expand All @@ -80,24 +86,31 @@ def __init__(
assert config is not None
self.logger.info(f"Initializing {self.__class__.__name__} ({config})")

# Get API key from environment variable
api_key = os.getenv(config.api_key_var)
if api_key is None:
prime_config = PrimeConfig()
api_key = prime_config.api_key

if not api_key:
self.logger.warning(
f"API key not found. Set {config.api_key_var} environment variable. PrimeMonitor will not be able to upload data."
f"API key not found. Set {config.api_key_var} environment variable or run `prime login`. "
"PrimeMonitor will not be able to upload data."
)
self.enabled = False
return

self.api_key = api_key
self.base_url = config.base_url
self.base_url = config.base_url.rstrip("/")

# Get run_id from environment variable (check before allocating resources)
run_id = os.getenv("RUN_ID")
if not run_id:
self.logger.warning("RUN_ID environment variable not set. PrimeMonitor will not be able to upload data.")
self.enabled = False
return
run_id = self._register_run(config, run_config)
if run_id:
os.environ["RUN_ID"] = run_id
else:
self.enabled = False
return

self.run_id = run_id

# Set up async HTTP client with background event loop.
Expand All @@ -118,6 +131,104 @@ def __init__(
if config.log_extras.distributions:
self.last_log_distributions_step = -1

def _register_run(self, config: PrimeMonitorConfig, run_config: BaseConfig | None) -> str | None:
"""Register an external run with the platform. Returns run_id on success, None on failure."""
registration_api_key = self.api_key
if not registration_api_key:
self.logger.warning(
f"Prime Intellect API key not found. Set {config.api_key_var} environment variable or run `prime login`. "
"PrimeMonitor will not be able to register or upload data."
)
return None

prime_config = None
team_id = config.team_id
frontend_url = config.frontend_url
if team_id is None or frontend_url is None:
prime_config = PrimeConfig()
if team_id is None:
team_id = prime_config.team_id
if frontend_url is None:
frontend_url = prime_config.frontend_url

model = getattr(run_config, "model", None) if run_config else None
environments = getattr(run_config, "env", None) if run_config else None
wandb = getattr(run_config, "wandb", None) if run_config else None

payload: dict = {
"base_model": model.name if model else "unknown",
"max_steps": getattr(run_config, "max_steps", None) or 0,
}
if config.run_name:
payload["name"] = config.run_name
if team_id:
payload["team_id"] = team_id
if environments:
payload["environments"] = [{"id": env.id} for env in environments if hasattr(env, "id")]
if wandb and getattr(wandb, "project", None):
payload["wandb_project"] = wandb.project

parsed = urlparse(config.base_url)
api_base = f"{parsed.scheme}://{parsed.netloc}/api/v1/rft"

try:
response = httpx.post(
f"{api_base}/external-runs",
headers={"Authorization": f"Bearer {registration_api_key}"},
json=payload,
timeout=30,
)
except httpx.HTTPError as e:
self.logger.warning(f"Failed to register platform run: {e}. PrimeMonitor will not be able to upload data.")
return None

if response.status_code != 201:
self.logger.warning(
f"Failed to create platform run (HTTP {response.status_code}): {response.text}. "
"PrimeMonitor will not be able to upload data."
)
return None

run_id = response.json()["run"]["id"]
if frontend_url:
dashboard_url = f"{frontend_url.rstrip('/')}/dashboard/training/{run_id}"
self.logger.success(f"Monitor run at: {dashboard_url}")
else:
self.logger.success(f"Registered platform run {run_id}")
self._registered = True
return run_id

def _finalize_run(self, success: bool) -> None:
"""Mark the run as completed or failed on the platform."""
if not getattr(self, "_registered", False):
return

registration_api_key = self.api_key
payload: dict = {"status": "completed" if success else "failed"}
status_label = "completed" if success else "failed"
self.logger.info(f"Finalizing platform run {self.run_id} as {status_label}")

parsed = urlparse(self.base_url)
finalize_url = f"{parsed.scheme}://{parsed.netloc}/api/v1/rft/external-runs/{self.run_id}/status"

try:
response = httpx.put(
finalize_url,
headers={"Authorization": f"Bearer {registration_api_key}"},
json=payload,
timeout=30,
)
except httpx.HTTPError as e:
self.logger.warning(f"Failed to finalize platform run {self.run_id}: {e}")
return

if response.status_code != 200:
self.logger.warning(
f"Failed to finalize platform run {self.run_id} (HTTP {response.status_code}): {response.text}"
)
return
self.logger.info(f"Platform run {self.run_id} marked as {status_label}")

def log(self, metrics: dict[str, Any], step: int) -> None:
self.history.append(metrics)
if not self.is_master:
Expand Down Expand Up @@ -387,12 +498,22 @@ def save_final_summary(self, filename: str = "final_summary.json") -> None:
"summary": self.history[-1] if self.history else {},
},
)
if os.getpid() == self._owner_pid:
self._finalize_run(success=True)
self._finalized = True

def close(self) -> None:
"""Close the HTTP client and stop the background event loop."""
if not hasattr(self, "_client"):
if self._closed or not hasattr(self, "_client"):
return

self._closed = True

should_finalize = self.is_master and self.enabled and not self._finalized and os.getpid() == self._owner_pid
if should_finalize:
self._finalize_run(success=False)
self._finalized = True

self._flush()

# Close the async client within the event loop
Expand Down
Loading