Commit 31cc8f3 ("add metrics readme"), author Felipe Mello, 1 parent 3303af5.

1 file changed: src/forge/observability/README.md (+295, -0)
# Observability in Forge

We aim to make distributed observability effortless. You can call `record_metric(key, val, reduce_type)` from anywhere, and it just works. We also provide memory/performance tracers, plug-and-play logging backends, and reduction types. No boilerplate required: just call, flush, and visualize. To disable metrics entirely, set `FORGE_DISABLE_METRICS=true`.
## Your Superpowers

### Call `record_metric` from Anywhere

Simple to use, with no need to pass dictionaries around.

Full example:
```python
import asyncio
from forge.observability import get_or_create_metric_logger, record_metric, Reduce

async def main():
    # Setup logger
    mlogger = await get_or_create_metric_logger(process_name="Controller")
    await mlogger.init_backends.call_one({"console": {"logging_mode": "global_reduce"}})

    # Have this in any process
    def my_fn(number):
        record_metric("my_sum_metric", number, Reduce.SUM)    # sum(1,2,3)
        record_metric("my_max_metric", number, Reduce.MAX)    # max(1,2,3)
        record_metric("my_mean_metric", number, Reduce.MEAN)  # mean(1,2,3)

    # Accumulate metrics
    for number in range(1, 4):  # 1, 2, 3
        my_fn(number)

    # Flush
    await mlogger.flush.call_one(global_step=0)  # Flushes and resets metric accumulators

    # Shutdown when done
    await mlogger.shutdown.call_one()

if __name__ == "__main__":
    asyncio.run(main())
```
Output:
```bash
=== [GlobalReduce] - METRICS STEP 0 ===
my_sum_metric: 6.0
my_max_metric: 3.0
my_mean_metric: 2.0
```
### Track Performance: Timing and Memory

Use `Tracer` to track durations and memory usage. Overhead is minimal, and GPU timing is non-blocking. Set `timer="gpu"` for kernel-level precision. Under the hood, `Tracer` uses `record_metric`.
```python
from forge.observability.perf_tracker import Tracer
import torch

# ... Initialize logger (as shown in the previous example)

def my_fn():
    a, b = torch.randn(1000, 1000, device="cuda"), torch.randn(
        1000, 1000, device="cuda"
    )

    tracer = Tracer(prefix="my_cuda_loop", track_memory=True, timer="gpu")
    tracer.start()
    for _ in range(3):
        torch.mm(a, b)
        tracer.step("my_metric_mm_a_b")
    tracer.stop()

# Accumulate metrics
for _ in range(2):
    my_fn()

# Flush and reset (inside the same async context as the logger setup)
await mlogger.flush.call_one(global_step=0)
```
Output:
```bash
=== [GlobalReduce] - METRICS STEP 0 ===
my_cuda_loop/memory_delta_end_start_avg_gb: 0.015
my_cuda_loop/memory_peak_max_gb: 0.042
my_cuda_loop/my_metric_mm_a_b/duration_avg_s: 0.031
my_cuda_loop/my_metric_mm_a_b/duration_max_s: 0.186
my_cuda_loop/total_duration_avg_s: 0.094
my_cuda_loop/total_duration_max_s: 0.187
```
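The metric names above follow the pattern `prefix/step_name/stat`. What `Tracer` is doing can be sketched as timing segments and recording each as a metric. The `SimpleTracer` below is an illustrative stand-in (CPU timing only), not the real Forge implementation:

```python
import time

class SimpleTracer:
    """Rough sketch of the idea behind Tracer (illustrative, CPU timing only)."""

    def __init__(self, prefix: str):
        self.prefix = prefix
        self.durations: dict[str, float] = {}

    def start(self) -> None:
        self._t0 = self._last = time.perf_counter()

    def step(self, name: str) -> None:
        # Record the time elapsed since the previous step (or start)
        now = time.perf_counter()
        self.durations[f"{self.prefix}/{name}/duration_s"] = now - self._last
        self._last = now

    def stop(self) -> None:
        self.durations[f"{self.prefix}/total_duration_s"] = time.perf_counter() - self._t0

tracer = SimpleTracer(prefix="my_loop")
tracer.start()
sum(range(1000))  # some work to time
tracer.step("work")
tracer.stop()
print(sorted(tracer.durations))  # ['my_loop/total_duration_s', 'my_loop/work/duration_s']
```

The real `Tracer` additionally reduces these durations across calls (hence the `_avg_s` and `_max_s` suffixes) by routing them through `record_metric`.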
For convenience, you can also use `Tracer` as a context manager or decorator:

```python
from forge.observability.perf_tracker import trace

with trace(prefix="train_step", track_memory=True, timer="gpu") as t:
    loss = model(x)
    t.step("fwd")
    loss.backward()
    t.step("bwd")
```
```python
from forge.observability.perf_tracker import trace

@trace(prefix="fwd_pass", track_memory=False, timer="cpu")
async def reward_fn(x):  # Supports both synchronous and asynchronous functions
    return 1.0 if x > 0 else 0.0
```
### Logging Modes

Defined per backend. You have three options:

- **global_reduce**: N ranks = 1 chart. Ranks accumulate → controller reduces → 1 entry per flush. Ideal for a single aggregated view (e.g., average loss chart).
- **per_rank_reduce**: N ranks = N charts. Each rank reduces locally → logs once per rank per flush. Ideal for per-rank performance debugging (e.g., GPU utilization).
- **per_rank_no_reduce**: N ranks = N charts. Values are logged immediately without reduction. Ideal for real-time streams.
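The three modes can be sketched as plain reductions over the values each rank records. This is an illustrative standalone sketch, not the Forge implementation; the helper names are hypothetical:

```python
# Illustrative sketch of the three logging modes as plain reductions (SUM case).
# Not the Forge implementation; these helper names are hypothetical.

def global_reduce(per_rank_values: dict[str, list[float]]) -> float:
    # All ranks' values are merged, then reduced once -> one chart
    return sum(v for values in per_rank_values.values() for v in values)

def per_rank_reduce(per_rank_values: dict[str, list[float]]) -> dict[str, float]:
    # Each rank reduces its own values -> one chart per rank, one point per flush
    return {rank: sum(values) for rank, values in per_rank_values.items()}

def per_rank_no_reduce(per_rank_values: dict[str, list[float]]) -> dict[str, list[float]]:
    # No reduction: every recorded value is emitted as-is, in real time
    return per_rank_values

# 2 replicas x 2 ranks; each rank records its rank index over 2 steps
recorded = {
    "replica0_rank0": [0, 0], "replica0_rank1": [1, 1],
    "replica1_rank0": [0, 0], "replica1_rank1": [1, 1],
}
print(global_reduce(recorded))    # 4
print(per_rank_reduce(recorded))  # {'replica0_rank0': 0, 'replica0_rank1': 2, ...}
```

These are exactly the numbers you will see in the actor example below.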
Consider an example with an actor running on 2 replicas, each with 2 processes, for a total of 4 ranks. We will record the sum of the rank indices: rank_0 records 0, and rank_1 records 1.
```python
import asyncio

from forge.controller.actor import ForgeActor
from forge.observability import get_or_create_metric_logger, record_metric, Reduce
from monarch.actor import current_rank, endpoint

# Your distributed actor
class MyActor(ForgeActor):
    @endpoint
    async def my_fn(self):
        rank = current_rank().rank  # 0 or 1 per replica
        record_metric("my_sum_rank_metric", rank, Reduce.SUM)

async def main():
    # Setup logger
    mlogger = await get_or_create_metric_logger(process_name="Controller")
    await mlogger.init_backends.call_one(
        {"console": {"logging_mode": "global_reduce"}}  # <--- Define logging_mode here
    )

    # Setup actor
    service_config = {"procs": 2, "num_replicas": 2, "with_gpus": False}
    my_actor = await MyActor.options(**service_config).as_service()

    # Accumulate metrics
    for _ in range(2):  # 2 steps
        await my_actor.my_fn.fanout()

    # Flush
    await mlogger.flush.call_one(global_step=0)  # Flush and reset

if __name__ == "__main__":
    asyncio.run(main())
```
Output:
```bash
=== [GlobalReduce] - METRICS STEP 0 ===
my_sum_rank_metric: 4.0 # (rank_0 + rank_1) * 2 steps * 2 replicas
===============
```
Now, let's set `"logging_mode": "per_rank_reduce"`:
```bash
=== [MyActor_661W_r0] - METRICS STEP 0 ===
my_sum_rank_metric: 0.0 # (rank_0) * 2 steps
===============
=== [MyActor_661W_r1] - METRICS STEP 0 ===
my_sum_rank_metric: 2.0 # (rank_1) * 2 steps
===============
=== [MyActor_wQ1g_r0] - METRICS STEP 0 ===
my_sum_rank_metric: 0.0 # (rank_0) * 2 steps
===============
=== [MyActor_wQ1g_r1] - METRICS STEP 0 ===
my_sum_rank_metric: 2.0 # (rank_1) * 2 steps
===============
```
Finally, with `"logging_mode": "per_rank_no_reduce"`:
```bash
[0] [MyActor-0/2] 2025-10-10 12:21:09 INFO my_sum_rank_metric: 0
[0] [MyActor-0/2] 2025-10-10 12:21:09 INFO my_sum_rank_metric: 0
[1] [MyActor-1/2] 2025-10-10 12:21:09 INFO my_sum_rank_metric: 1
[1] [MyActor-1/2] 2025-10-10 12:21:09 INFO my_sum_rank_metric: 1
[0] [MyActor-0/2] 2025-10-10 12:21:09 INFO my_sum_rank_metric: 0
[0] [MyActor-0/2] 2025-10-10 12:21:09 INFO my_sum_rank_metric: 0
[1] [MyActor-1/2] 2025-10-10 12:21:09 INFO my_sum_rank_metric: 1
[1] [MyActor-1/2] 2025-10-10 12:21:09 INFO my_sum_rank_metric: 1
```
### Using Multiple Backends

You can mix modes per backend: for example, log reduced metrics to Weights & Biases while streaming unreduced console logs for debugging. Multiple backends are configured at logger initialization:

```python
mlogger = await get_or_create_metric_logger(process_name="Controller")
await mlogger.init_backends.call_one({
    "console": {"logging_mode": "per_rank_no_reduce"},
    "wandb": {"logging_mode": "global_reduce"}
})
```
### Adding a New Backend

Extend `LoggerBackend` for custom logging, such as saving data to JSONL files, sending Slack notifications when a metric hits a threshold, or supporting tools like MLflow or Grafana. After writing your backend, register it with `forge.observability.metrics.get_logger_backend_class`.

TODO: we need a better solution here that doesn't involve committing to forge, e.g. `register_new_backend_type(my_custom_backend_type)`.
```python
class ConsoleBackend(LoggerBackend):
    def __init__(self, logger_backend_config: dict[str, Any]) -> None:
        super().__init__(logger_backend_config)

    async def init(self, process_name: str | None = None, *args, **kwargs) -> None:
        self.process_name = process_name

    async def log_batch(self, metrics: list[Metric], global_step: int, *args, **kwargs) -> None:
        # Called on flush
        print(self.process_name, metrics)

    def log_stream(self, metric: Metric, global_step: int, *args, **kwargs) -> None:
        # Called on `record_metric` if "logging_mode": "per_rank_no_reduce"
        print(metric)
```
### Adding a New Reduce Type

Metrics are accumulated each time `record_metric` is called. The following example implements the `Reduce.MEAN` accumulator. By tracking `sum` and `count`, it efficiently supports accurate global reduction. Users can extend this by adding custom reduce types, such as `WordCounterAccumulator` or `SampleAccumulator`, and registering them with `forge.observability.metrics.Reduce`. For details on how this is used, see `forge.observability.metrics.MetricCollector`.

TODO: we need a better solution here that doesn't involve committing to forge, e.g. `register_new_reduce_type(my_custom_reduce_type)`.
```python
class MeanAccumulator(MetricAccumulator):
    def __init__(self, reduction: Reduce) -> None:
        super().__init__(reduction)
        self.sum = 0.0
        self.count = 0

    def append(self, value: Any) -> None:
        # Called after record_metric(key, value, reduce.TYPE)
        v = float(value.item() if hasattr(value, "item") else value)
        self.sum += v
        self.count += 1

    def get_value(self) -> float:
        return self.sum / self.count if self.count > 0 else 0.0

    def get_state(self) -> dict[str, Any]:
        return {"reduction_type": self.reduction_type.value, "sum": self.sum, "count": self.count}

    @classmethod
    def get_reduced_value_from_states(cls, states: list[dict[str, Any]]) -> float:
        # Useful for global reduce; called before flush
        total_sum = sum(s["sum"] for s in states)
        total_count = sum(s["count"] for s in states)
        return total_sum / total_count if total_count > 0 else 0.0

    def reset(self) -> None:
        self.sum = 0.0
        self.count = 0
```
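To show what a custom reduce type might look like, here is a sketch of the `SampleAccumulator` idea mentioned above: each rank keeps up to `max_samples` raw values, and global reduction concatenates the samples from all ranks. The `MetricAccumulator` base below is a minimal stand-in so the sketch runs standalone; the real Forge base class differs:

```python
from typing import Any

# Minimal stand-in base so this sketch runs standalone;
# the real forge.observability.metrics.MetricAccumulator differs.
class MetricAccumulator:
    def __init__(self, reduction: Any) -> None:
        self.reduction_type = reduction

class SampleAccumulator(MetricAccumulator):
    """Hypothetical reduce type: keep up to `max_samples` raw values per rank."""

    def __init__(self, reduction: Any, max_samples: int = 10) -> None:
        super().__init__(reduction)
        self.max_samples = max_samples
        self.samples: list[float] = []

    def append(self, value: Any) -> None:
        # Drop values beyond the cap to bound memory
        if len(self.samples) < self.max_samples:
            self.samples.append(float(value))

    def get_value(self) -> list[float]:
        return list(self.samples)

    def get_state(self) -> dict[str, Any]:
        return {"samples": self.samples}

    @classmethod
    def get_reduced_value_from_states(cls, states: list[dict[str, Any]]) -> list[float]:
        # Global reduce: concatenate the samples from all ranks
        return [s for state in states for s in state["samples"]]

    def reset(self) -> None:
        self.samples = []
```

Accumulators like this are handy for logging a few example prompts or rewards per flush rather than a single scalar.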
### Behind the Scenes

We have two main requirements:
1. Metrics must be accumulated somewhere.
2. Metrics must be collected from all ranks.

To address #1, we use a `MetricCollector` per process to store state. For example, with 10 ranks, there are 10 `MetricCollector` instances. Within each rank, `MetricCollector` is a singleton, ensuring the same object is returned after the first call. This eliminates the need to pass dictionaries between functions.

For example, users can simply write:

```python
def my_fn():
    record_metric(key, value, reduce)  # Calls MetricCollector().push(key, value, reduce)
```
This is simpler than:

```python
def my_fn(my_metrics):
    my_metrics[key] = value
    return my_metrics
```
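The per-process singleton behind this can be sketched in a few lines. This is an illustrative stand-in, not Forge's actual `MetricCollector`, and the `record_metric` signature here is simplified (no reduce type):

```python
from typing import Any

class MetricCollector:
    # One instance per process: __new__ returns the same object every time
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.metrics = {}
        return cls._instance

    def push(self, key: str, value: Any) -> None:
        self.metrics.setdefault(key, []).append(value)

def record_metric(key: str, value: Any) -> None:
    # Any function, anywhere in the process, reaches the same collector
    MetricCollector().push(key, value)

record_metric("loss", 0.5)  # from one function
record_metric("loss", 0.3)  # from another
print(MetricCollector().metrics)  # {'loss': [0.5, 0.3]}
```

Because the collector is process-local, no locking across ranks is needed; cross-rank aggregation happens later, at flush time.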
To address #2, we automatically spawn a `LocalFetcherActor` for each process and register it with the `GlobalLoggingActor`. This way, the `GlobalLoggingActor` knows which actors to call, and each `LocalFetcherActor` can access its local `MetricCollector`. This spawning and registration occurs in `forge.controller.provisioner.py::get_proc_mesh`.

In summary:
1. One `GlobalLoggingActor` serves as the controller.
2. For each process, `forge.controller.provisioner.py::get_proc_mesh` spawns a `LocalFetcherActor`, so N ranks = N `LocalFetcherActor` instances. These are registered with the `GlobalLoggingActor`.
3. Each rank has a singleton `MetricCollector`, acting as the local storage for metrics.
4. Calling `record_metric(key, value, reduce_type)` stores metrics locally in the `MetricCollector`.
5. Calling `GlobalLoggingActor.flush()` triggers `LocalFetcherActor.flush()` on every rank, which in turn calls the local `MetricCollector.flush()`.
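The flush chain in step 5 can be sketched as follows. This is illustrative only: the real Forge actors are asynchronous Monarch endpoints, and the merge step uses the registered accumulators rather than a hard-coded SUM:

```python
class MetricCollector:
    def __init__(self):
        self.metrics = {}

    def flush(self):
        # Return the accumulated state and reset local storage
        state, self.metrics = self.metrics, {}
        return state

class LocalFetcherActor:
    def __init__(self, collector):
        self.collector = collector

    def flush(self):
        return self.collector.flush()

class GlobalLoggingActor:
    def __init__(self, fetchers):
        self.fetchers = fetchers

    def flush(self, global_step):
        # Fan out to every rank's fetcher, then merge (hard-coded SUM here)
        states = [f.flush() for f in self.fetchers]
        merged = {}
        for state in states:
            for key, values in state.items():
                merged[key] = merged.get(key, 0) + sum(values)
        return merged

# Two "ranks", each having recorded [1, 2] locally
fetchers = [LocalFetcherActor(MetricCollector()) for _ in range(2)]
for f in fetchers:
    f.collector.metrics = {"my_sum_metric": [1, 2]}
result = GlobalLoggingActor(fetchers).flush(global_step=0)
print(result)  # {'my_sum_metric': 6}
```

Note that flushing both collects and resets, which is why each `METRICS STEP` in the console output above starts from a clean slate.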
