Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
e162788
metric logger simple example
Sep 19, 2025
9f13bfb
it works
Sep 22, 2025
4b9aada
delete old files
Sep 22, 2025
2864324
refactoring + docstrings
Sep 23, 2025
c337083
docstring
Sep 23, 2025
16f2267
comments
Sep 23, 2025
40e16c2
update method name
Sep 23, 2025
d7c175d
no circular import
Sep 23, 2025
538e8f2
update command
Sep 23, 2025
64c71f2
Merge branch 'main' of https://github.com/pytorch-labs/forge into met…
Sep 23, 2025
166b5d4
update arg name
Sep 23, 2025
e27d451
move metric actor out of asyncio lock
Sep 23, 2025
11ea544
Merge branch 'main' of https://github.com/pytorch-labs/forge into met…
Sep 23, 2025
4a8db51
Merge branch 'main' into metric_logging
Sep 23, 2025
5cadbee
fix deregister
Sep 23, 2025
cb33d5f
lint
Sep 23, 2025
f28097c
docstring
Sep 23, 2025
3772620
Merge branch 'main' of github.com:meta-pytorch/forge into metric_logging
Sep 24, 2025
06afbb5
fix result extraction and add logger shutdown
Sep 24, 2025
5369939
fix shutdown order
Sep 24, 2025
ffe09b9
simplification + docstrings
Sep 26, 2025
185504d
bug fix + register if respawn
Sep 26, 2025
052e937
it works
Sep 26, 2025
efb639d
use procmesh as key
Sep 26, 2025
781906d
docstring
Sep 26, 2025
f2a9e09
remove protected imports
Sep 26, 2025
8e157bd
create get_metric_logger
Sep 26, 2025
5465080
Merge branch 'main' of github.com:meta-pytorch/forge into metric_logging
Sep 26, 2025
5736c79
call became fanout
Sep 26, 2025
2b0bb05
Merge branch 'main' of github.com:meta-pytorch/forge into metric_logging
Sep 29, 2025
a426cd5
upstream changes
Sep 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/grpo/qwen3_1_7b.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Grouped Relative Policy Optimization (GRPO)
# >>> python -m apps.grpo.qwen3_1_7b --config apps/grpo/qwen3_1_7b.yaml
# >>> python -m apps.grpo.main --config apps/grpo/qwen3_1_7b.yaml

# Global configuration
group_size: 8
Expand Down
91 changes: 91 additions & 0 deletions apps/toy_metrics/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

import asyncio

import logging
import time

from forge.controller.actor import ForgeActor
from forge.controller.provisioner import shutdown
from forge.observability.metric_actors import setup_metric_logger
from forge.observability.metrics import record_metric, ReductionType

from monarch.actor import current_rank, endpoint

# DEBUG level so the example prints the metric pipeline's internal activity.
logging.basicConfig(level=logging.DEBUG)


class TrainActor(ForgeActor):
    """Toy training actor: each step records a synthetic loss metric."""

    @endpoint
    async def train_step(self, step: int):
        # Deterministic fake loss derived from rank and step, so output is
        # reproducible and each rank's contribution is distinguishable.
        rank = current_rank().rank
        loss = 100 * step + rank * 1000
        print(f"[TRAIN] Rank {rank}: Step {step}, loss={loss}")
        # Uses record_metric's default reduction (no explicit ReductionType).
        record_metric("train/loss", loss)


class GeneratorActor(ForgeActor):
    """Toy generation actor: each substep records a token-count metric."""

    @endpoint
    async def generate_step(self, step: int, substep: int):
        # Deterministic fake token count derived from rank, step, and substep.
        rank = current_rank().rank
        tokens = substep * 10 + step * 100 + rank * 1000
        print(f"[GEN] Rank {rank}: Step {step}.{substep}, tokens={tokens}")
        # Token counts are summed (not averaged) when reduced.
        record_metric("generate/tokens", tokens, ReductionType.SUM)


# Main
async def main():
    """Example demonstrating distributed metric logging with different backends."""
    # Unique group name per run, keyed by wall-clock time.
    group = f"grpo_exp_{int(time.time())}"

    # Backend config: {backend_name: backend_config_dict}.
    # Each backend may set reduce_across_ranks to control whether metrics are
    # reduced before logging or logged per rank.
    config = {
        "console": {"reduce_across_ranks": True},
        "wandb": {
            "project": "my_project",
            "group": group,
            "reduce_across_ranks": True,
            # Only useful if NOT reduce_across_ranks.
            "share_run_id": False,  # Share run ID across ranks -- Not recommended.
        },
    }

    # Global metric logger must exist before services spawn, since spawning
    # triggers per-process registration via the provisioner hook.
    mlogger = await setup_metric_logger()

    opts = {"procs": 2, "num_replicas": 2, "with_gpus": False}
    trainer = await TrainActor.options(**opts).as_service()
    generator = await GeneratorActor.options(**opts).as_service()

    # Initialize backends eagerly across all registered fetchers.
    await mlogger.init_backends.call_one(config)

    for step in range(3):
        print(f"\n=== Global Step {step} ===")
        await trainer.train_step.fanout(step)
        for substep in range(3):
            await generator.generate_step.fanout(step, substep)
        # Flush once per global step so each step's metrics land together.
        await mlogger.flush.call_one(step)

    # Tear down: logger first, then both services concurrently, then provisioner.
    await mlogger.shutdown.call_one()
    await asyncio.gather(trainer.shutdown(), generator.shutdown())
    await shutdown()


# Script entry point: run the async example to completion.
if __name__ == "__main__":
    asyncio.run(main())
8 changes: 1 addition & 7 deletions src/forge/controller/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

from .actor import ForgeActor
from .proc_mesh import get_proc_mesh, stop_proc_mesh

Expand All @@ -24,9 +23,4 @@ async def spawn_actors(
return actors


__all__ = [
"spawn_actors",
"stop_proc_mesh",
"get_proc_mesh",
"ForgeActor",
]
__all__ = ["spawn_actors", "stop_proc_mesh", "get_proc_mesh", "ForgeActor"]
10 changes: 10 additions & 0 deletions src/forge/controller/provisioner.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
from monarch.tools.components import hyperactor
from monarch.tools.config import Config

from forge.observability.metric_actors import setup_metric_logger

from forge.types import ProcessConfig

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -215,11 +217,19 @@ def bootstrap(gpu_ids: list[str]):
self._server_names.append(server_name)
self._proc_server_map[procs] = server_name

# Spawn local logging actor on each process and register with global logger
_ = await setup_metric_logger(procs)

return procs

async def stop_proc_mesh(self, proc_mesh: ProcMesh):
"""Stops a proc mesh."""
async with self._lock:
# Deregister local logger from global logger
if hasattr(proc_mesh, "_local_fetcher"):
global_logger = await setup_metric_logger(proc_mesh)
await global_logger.deregister_fetcher.call_one(proc_mesh)

if hasattr(proc_mesh, "_gpu_ids"):
gpu_manager = self._host_gpu_map[proc_mesh._host._host_id]
gpu_manager.release_gpus(proc_mesh._gpu_ids)
Expand Down
54 changes: 54 additions & 0 deletions src/forge/observability/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

from .metric_actors import GlobalLoggingActor, LocalFetcherActor, setup_metric_logger
from .metrics import (
ConsoleBackend,
# Utility functions
get_actor_name_with_rank,
get_logger_backend_class,
# Backend classes
LoggerBackend,
MaxAccumulator,
MeanAccumulator,
# Accumulator classes
MetricAccumulator,
MetricCollector,
MinAccumulator,
record_metric,
reduce_metrics_states,
ReductionType,
StdAccumulator,
SumAccumulator,
WandbBackend,
)

__all__ = [
# Main API functions
"record_metric",
"reduce_metrics_states",
"get_actor_name_with_rank",
"get_logger_backend_class",
"setup_metric_logger",
# Enums
"ReductionType",
# Actor classes
"GlobalLoggingActor",
"LocalFetcherActor",
# Collector
"MetricCollector",
# Backend classes
"LoggerBackend",
"ConsoleBackend",
"WandbBackend",
# Accumulator classes
"MetricAccumulator",
"MeanAccumulator",
"SumAccumulator",
"MaxAccumulator",
"MinAccumulator",
"StdAccumulator",
]
Loading
Loading