
Commit 507ef51

felipemello1 (Felipe Mello) authored and committed
[DOCS] metrics readme (meta-pytorch#380)
Co-authored-by: Felipe Mello <[email protected]>
1 parent c99105f commit 507ef51

File tree

3 files changed: +296 -0 lines changed


docs/source/metric_logging.md

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+```{include} ../../src/forge/observability/README.md

docs/source/tutorials.md

Lines changed: 1 addition & 0 deletions
@@ -7,4 +7,5 @@
 :maxdepth: 1

 zero-to-forge-intro
+metric_logging
 ```

src/forge/observability/README.md

Lines changed: 294 additions & 0 deletions
@@ -0,0 +1,294 @@
# Metric Logging in Forge

We aim to make distributed observability effortless. You can call `record_metric(key, value, reduce_type)` from anywhere, and it just works. We also provide memory/performance tracers, plug-and-play logging backends, and reduction types. You can visualize aggregated results globally, per rank, or as a stream. No boilerplate required: just call, flush, and visualize. Disable metrics entirely with `FORGE_DISABLE_METRICS=true`.

## 1. Your Superpowers

### 1.1 Call `record_metric` from Anywhere

Simple to use, with no need to pass dictionaries around. For example, users can simply write:

```python
def my_fn():
    record_metric(key, value, reduce_type)
```

Instead of:

```python
def my_fn(my_metrics):
    my_metrics[key] = value
    return my_metrics
```

Simple example (for a distributed one, check the next section):

```python
import asyncio

from forge.observability import get_or_create_metric_logger, record_metric, Reduce

async def main():
    # Setup logger
    mlogger = await get_or_create_metric_logger(process_name="Controller")
    await mlogger.init_backends.call_one({"console": {"logging_mode": "global_reduce"}})

    # Have this in any process
    def my_fn(number):
        record_metric("my_sum_metric", number, Reduce.SUM)  # sum(1, 2, 3)
        record_metric("my_max_metric", number, Reduce.MAX)  # max(1, 2, 3)
        record_metric("my_mean_metric", number, Reduce.MEAN)  # mean(1, 2, 3)

    # Accumulate metrics
    for number in range(1, 4):  # 1, 2, 3
        my_fn(number)

    # Flush
    await mlogger.flush.call_one(global_step=0)

    # Shutdown when done
    await mlogger.shutdown.call_one()

if __name__ == "__main__":
    asyncio.run(main())
```

Output:

```bash
=== [GlobalReduce] - METRICS STEP 0 ===
my_sum_metric: 6.0
my_max_metric: 3.0
my_mean_metric: 2.0
```

### 1.2 Track Performance: Timing and Memory

Use `Tracer` to track durations and memory usage. Overhead is minimal, and GPU timing is non-blocking. Set `timer="gpu"` for kernel-level precision. `Tracer` leverages `record_metric` under the hood.

```python
import torch

from forge.observability.perf_tracker import Tracer

# ... Initialize logger (as shown in the previous example)

def my_fn():
    a = torch.randn(1000, 1000, device="cuda")

    t = Tracer(prefix="my_cuda_loop", track_memory=True, timer="gpu")
    t.start()
    for _ in range(3):
        torch.mm(a, a)
        t.step("my_metric_mm")
    t.stop()

# Accumulate metrics
for _ in range(2):
    my_fn()

await mlogger.flush.call_one(global_step=0)  # Flush and reset
```

Output:

```bash
=== [GlobalReduce] - METRICS STEP 0 ===
my_cuda_loop/memory_delta_end_start_avg_gb: 0.015
my_cuda_loop/memory_peak_max_gb: 0.042
my_cuda_loop/my_metric_mm/duration_avg_s: 0.031
my_cuda_loop/my_metric_mm/duration_max_s: 0.186
my_cuda_loop/total_duration_avg_s: 0.094
my_cuda_loop/total_duration_max_s: 0.187
```

For convenience, you can also use the `trace` helper as a context manager or decorator:

```python
from forge.observability.perf_tracker import trace

with trace(prefix="train_step", track_memory=True, timer="gpu") as t:
    loss = model(x)
    t.step("fwd")  # marks the end of the "fwd" segment
    loss.backward()
    t.step("bwd")  # marks the end of the "bwd" segment

@trace(prefix="my_reward_fn", track_memory=False, timer="cpu")
async def reward_fn(x):  # Supports both sync and async functions
    return 1.0 if x > 0 else 0.0
```

## 2. Logging Modes

Defined per backend. You have three options:

- **global_reduce**: N ranks = 1 chart. Reduces metrics across all ranks. Ideal for a single aggregated view (e.g., an average loss chart).
- **per_rank_reduce**: N ranks = N charts. Each rank reduces locally and logs to its own logger. Ideal for per-rank performance debugging (e.g., GPU utilization).
- **per_rank_no_reduce**: N ranks = N charts. Each rank streams to its own logger without reduction. Ideal for real-time streams.

Consider an example with an actor running on 2 replicas, each with 2 processes, for a total of 4 ranks. Each rank records its own rank value with a SUM reduction: rank_0 records 0, and rank_1 records 1.

```python
import asyncio

from forge.controller.actor import ForgeActor
from forge.observability import get_or_create_metric_logger, record_metric, Reduce
from monarch.actor import current_rank, endpoint

# Your distributed actor
class MyActor(ForgeActor):
    @endpoint
    async def my_fn(self):
        rank = current_rank().rank  # 0 or 1 per replica
        record_metric("my_sum_rank_metric", rank, Reduce.SUM)  # <--- your metric

async def main():
    # Setup logger
    mlogger = await get_or_create_metric_logger(process_name="Controller")
    await mlogger.init_backends.call_one(
        {"console": {"logging_mode": "global_reduce"}}  # <--- Define logging_mode here
    )

    # Setup actor
    service_config = {"procs": 2, "num_replicas": 2, "with_gpus": False}
    my_actor = await MyActor.options(**service_config).as_service()

    # Accumulate metrics
    for _ in range(2):  # 2 steps
        await my_actor.my_fn.fanout()

    # Flush
    await mlogger.flush.call_one(global_step=0)  # Flush and reset

if __name__ == "__main__":
    asyncio.run(main())
```

Output when `"logging_mode": "global_reduce"`:

```bash
=== [GlobalReduce] - METRICS STEP 0 ===
my_sum_rank_metric: 4.0  # (0 + 1) * 2 steps * 2 replicas
===============
```

Now, let's set `"logging_mode": "per_rank_reduce"`:

```bash
# replica 1
=== [MyActor_661W_r0] - METRICS STEP 0 ===
my_sum_rank_metric: 0.0  # (rank_0) * 2 steps
===============
=== [MyActor_661W_r1] - METRICS STEP 0 ===
my_sum_rank_metric: 2.0  # (rank_1) * 2 steps
===============

# replica 2
=== [MyActor_wQ1g_r0] - METRICS STEP 0 ===
my_sum_rank_metric: 0.0  # (rank_0) * 2 steps
===============
=== [MyActor_wQ1g_r1] - METRICS STEP 0 ===
my_sum_rank_metric: 2.0  # (rank_1) * 2 steps
===============
```

Finally, with `"logging_mode": "per_rank_no_reduce"`, we get a stream with no reduction:

```bash
[0] [MyActor-0/2] 2025-10-10 12:21:09 INFO my_sum_rank_metric: 0
[0] [MyActor-0/2] 2025-10-10 12:21:09 INFO my_sum_rank_metric: 0
[1] [MyActor-1/2] 2025-10-10 12:21:09 INFO my_sum_rank_metric: 1
[1] [MyActor-1/2] 2025-10-10 12:21:09 INFO my_sum_rank_metric: 1
[0] [MyActor-0/2] 2025-10-10 12:21:09 INFO my_sum_rank_metric: 0
[0] [MyActor-0/2] 2025-10-10 12:21:09 INFO my_sum_rank_metric: 0
[1] [MyActor-1/2] 2025-10-10 12:21:09 INFO my_sum_rank_metric: 1
[1] [MyActor-1/2] 2025-10-10 12:21:09 INFO my_sum_rank_metric: 1
```

## 3. Using Multiple Backends

For example, you can use `global_reduce` with Weights & Biases while using `per_rank_no_reduce` for debugging logs on the console.

```python
mlogger = await get_or_create_metric_logger(process_name="Controller")
await mlogger.init_backends.call_one({
    "console": {"logging_mode": "per_rank_no_reduce"},
    "wandb": {"logging_mode": "global_reduce"},
})
```

### 3.1 Adding a New Backend

Extend `LoggerBackend` for custom logging, such as saving data to JSONL files, sending Slack notifications when a metric hits a threshold, or supporting tools like MLFlow or Grafana. After writing your backend, register it with `forge.observability.metrics.get_logger_backend_class`.

```python
# Example backend: a simplified console backend
class ConsoleBackend(LoggerBackend):
    def __init__(self, logger_backend_config: dict[str, Any]) -> None:
        super().__init__(logger_backend_config)

    async def init(self, process_name: str | None = None, *args, **kwargs) -> None:
        self.process_name = process_name

    async def log_batch(self, metrics: list[Metric], global_step: int, *args, **kwargs) -> None:
        # Called on flush
        print(self.process_name, metrics)

    def log_stream(self, metric: Metric, global_step: int, *args, **kwargs) -> None:
        # Called on `record_metric` if "logging_mode": "per_rank_no_reduce"
        print(metric)
```
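
To make the JSONL idea above concrete, here is a minimal sketch. It is hypothetical, not part of Forge: the class name `JsonlBackend`, the `"path"` config key, and the `metric.key`/`metric.value` attributes are assumptions used only to illustrate the `LoggerBackend` interface.

```python
import json
from typing import Any

# Hypothetical sketch: append reduced metrics to a JSONL file on every flush.
# JsonlBackend, "path", and metric.key/metric.value are illustrative
# assumptions, not Forge APIs.
class JsonlBackend(LoggerBackend):
    def __init__(self, logger_backend_config: dict[str, Any]) -> None:
        super().__init__(logger_backend_config)
        self.path = logger_backend_config.get("path", "metrics.jsonl")

    async def init(self, process_name: str | None = None, *args, **kwargs) -> None:
        self.process_name = process_name

    async def log_batch(self, metrics: list[Metric], global_step: int, *args, **kwargs) -> None:
        # Called on flush: write one JSON object per metric, one per line
        with open(self.path, "a") as f:
            for metric in metrics:
                row = {
                    "step": global_step,
                    "process": self.process_name,
                    "key": metric.key,
                    "value": metric.value,
                }
                f.write(json.dumps(row) + "\n")

    def log_stream(self, metric: Metric, global_step: int, *args, **kwargs) -> None:
        # This backend skips streaming writes
        pass
```

Once registered, such a backend would be selected from config like any other, e.g. `{"jsonl": {"logging_mode": "per_rank_reduce", "path": "metrics.jsonl"}}` (the `"jsonl"` name is likewise hypothetical).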

## 4. Adding a New Reduce Type

Metrics are accumulated each time `record_metric` is called. The following example implements the `Reduce.MEAN` accumulator. Users can extend this by adding custom reduce types, such as `WordCounterAccumulator` or `SampleAccumulator`, and registering them with `forge.observability.metrics.Reduce`. For details on how this is used, see `forge.observability.metrics.MetricCollector`.

```python
# Example: the Reduce.MEAN accumulator
class MeanAccumulator(MetricAccumulator):
    def __init__(self, reduction: Reduce) -> None:
        super().__init__(reduction)
        self.sum = 0.0
        self.count = 0
        self.is_reset = True

    def append(self, value: Any) -> None:
        # Called on every record_metric(key, value, Reduce.MEAN)
        v = float(value.item() if hasattr(value, "item") else value)
        self.sum += v
        self.count += 1

    def get_value(self) -> float:
        return self.sum / self.count if self.count > 0 else 0.0

    def get_state(self) -> dict[str, Any]:
        return {"reduction_type": self.reduction_type.value, "sum": self.sum, "count": self.count}

    @classmethod
    def get_reduced_value_from_states(cls, states: list[dict[str, Any]]) -> float:
        # Used for global reduce; called before flush
        total_sum = sum(s["sum"] for s in states)
        total_count = sum(s["count"] for s in states)
        return total_sum / total_count if total_count > 0 else 0.0

    def reset(self) -> None:
        self.sum = 0.0
        self.count = 0
        self.is_reset = True
```
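
Why merge per-rank states instead of averaging per-rank means? Merging keeps the global mean exact when ranks see different numbers of samples. A small worked example, continuing the accumulator above (values chosen for illustration):

```python
# Rank 0 appended [1.0, 2.0] -> state {"sum": 3.0, "count": 2}
# Rank 1 appended [4.0]      -> state {"sum": 4.0, "count": 1}
states = [
    {"reduction_type": "mean", "sum": 3.0, "count": 2},
    {"reduction_type": "mean", "sum": 4.0, "count": 1},
]

# Exact global mean: (3.0 + 4.0) / (2 + 1) = 2.333...
print(MeanAccumulator.get_reduced_value_from_states(states))

# Averaging the per-rank means instead would give (1.5 + 4.0) / 2 = 2.75,
# which is wrong whenever counts differ across ranks.
```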

## 5. Behind the Scenes

We have two main requirements:

1. Metrics must be accumulated somewhere.
2. Metrics must be collected from all ranks.

To address #1, we use a `MetricCollector` per process to store state. For example, with 10 ranks, there are 10 `MetricCollector` instances. Within each rank, `MetricCollector` is a singleton, so every call after the first returns the same object. This is what lets `record_metric` work from anywhere without passing dictionaries between functions; a minimal sketch of the idea follows.
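
The sketch below shows only the singleton shape, not Forge's actual implementation (the real `MetricCollector` also handles accumulators, state collection, and per-rank backends):

```python
from collections import defaultdict

# Minimal sketch of a per-process singleton collector (illustrative only)
class MetricCollector:
    _instance = None  # one instance per process

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.values = defaultdict(list)
        return cls._instance

def record_metric(key, value):
    # Any function in the process reaches the same collector instance
    MetricCollector().values[key].append(value)
```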

To address #2, we automatically spawn a `LocalFetcherActor` for each process mesh and register it with the `GlobalLoggingActor`. This way, the `GlobalLoggingActor` knows which processes to call, and each `LocalFetcherActor` can access its local `MetricCollector`. This spawning and registration happens in `forge.controller.provisioner.py::get_proc_mesh`.

The flow is generally:

`GlobalLoggingActor.method()` -> per-procmesh `LocalFetcherActor.method()` -> per-rank `MetricCollector.method()` -> logger

So you may ask: "What about the logging backends?" They live in two places:

- In each `MetricCollector`, if the backend is marked as per_rank.
- In the `GlobalLoggingActor`, if the backend is marked as global_reduce.

In summary:

1. One `GlobalLoggingActor` serves as the controller.
2. For each process, `forge.controller.provisioner.py::get_proc_mesh` spawns a `LocalFetcherActor`, so N ranks = N `LocalFetcherActor` instances. These are registered with the `GlobalLoggingActor`.
3. Each rank has a singleton `MetricCollector`, holding accumulated metrics and per_rank backends.
4. Calling `record_metric(key, value, reduce_type)` stores metrics locally in the `MetricCollector`.
5. Calling `GlobalLoggingActor.flush()` triggers `LocalFetcherActor.flush()` on every process mesh, which in turn calls `MetricCollector.flush()` on each rank.
