
Conversation

felipemello1 (Contributor) commented on Oct 9, 2025

1. Changes the logging mode config, so it's clearer:

Before:

reduce_across_ranks: bool
share_run_id: bool

After:

logging_mode: Enum[GLOBAL_REDUCE, PER_RANK_REDUCE, PER_RANK_NO_REDUCE]
per_rank_share_run: bool
2. Adds class:
class LoggingMode(Enum):
    GLOBAL_REDUCE = "global_reduce"
    PER_RANK_REDUCE = "per_rank_reduce"
    PER_RANK_NO_REDUCE = "per_rank_no_reduce"
3. Introduces the "PER_RANK_NO_REDUCE" mode, aka streaming. This means we call backend.log(metric) as soon as we get it, without any reduction.

Before, MetricCollector.push(metric) would just collect the metric. Now, it also logs:

def push(self, metric: Metric) -> None:
    # Stream immediately to backends in "PER_RANK_NO_REDUCE" mode
    for backend in self.per_rank_no_reduce_backends:
        backend.log_stream(metric=metric, global_step=self.global_step)

    # Always accumulate for reduction and state return
    key = metric.key
    if key not in self.accumulators:
        self.accumulators[key] = metric.reduction.accumulator_class(
            metric.reduction
        )
    self.accumulators[key].append(metric.value)

Notice how the x-axis is the timestamp:
[image: chart of streamed metrics with timestamp on the x-axis]

4. Main design change: logger backends now have async def log_batch and def log_stream. It's not totally clear to me whether both should be async or sync, or whether I should try to unify them.
class LoggerBackend(ABC):
    """Abstract logger_backend for metric logging, e.g. wandb, jsonl, etc."""

    def __init__(self, logger_backend_config: dict[str, Any]) -> None:
        self.logger_backend_config = logger_backend_config

    @abstractmethod
    async def init(
        self,
        role: BackendRole,
        primary_logger_metadata: dict[str, Any] | None = None,
        process_name: str | None = None,
    ) -> None:
        """Initializes backend, e.g. wandb.run.init()."""
        pass

    @abstractmethod
    async def log_batch(
        self, metrics: list[Metric], global_step: int, *args, **kwargs
    ) -> None:
        """Log batch of accumulated metrics to backend"""
        pass

    def log_stream(self, metric: Metric, global_step: int, *args, **kwargs) -> None:
        """Stream single metric to backend immediately."""
        pass

    async def finish(self) -> None:
        pass

    def get_metadata_for_secondary_ranks(self) -> dict[str, Any] | None:
        """Return sharable state after primary init (e.g., for shared modes). Called only on globals."""
        return None
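
For illustration, here is a minimal sketch of a concrete backend implementing both methods. JsonlBackend is hypothetical (not part of this PR) and assumes LoggerBackend, Metric, and BackendRole can be imported from forge.observability.metrics:

import json
import time
from typing import Any

from forge.observability.metrics import BackendRole, LoggerBackend, Metric


class JsonlBackend(LoggerBackend):
    """Hypothetical backend that appends metrics to a JSONL file."""

    async def init(
        self,
        role: BackendRole,
        primary_logger_metadata: dict[str, Any] | None = None,
        process_name: str | None = None,
    ) -> None:
        # One file per process; in per-rank modes this yields one file per rank.
        suffix = process_name or "global"
        self._path = self.logger_backend_config.get("path", f"metrics_{suffix}.jsonl")

    async def log_batch(
        self, metrics: list[Metric], global_step: int, *args, **kwargs
    ) -> None:
        # Called on flush with the (possibly reduced) accumulated metrics.
        with open(self._path, "a") as f:
            for m in metrics:
                record = {"step": global_step, "key": m.key, "value": m.value}
                f.write(json.dumps(record) + "\n")

    def log_stream(self, metric: Metric, global_step: int, *args, **kwargs) -> None:
        # Called synchronously per metric in PER_RANK_NO_REDUCE mode;
        # the wall-clock timestamp makes a natural x-axis for streaming.
        record = {
            "ts": time.time(),
            "step": global_step,
            "key": metric.key,
            "value": metric.value,
        }
        with open(self._path, "a") as f:
            f.write(json.dumps(record) + "\n")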

meta-cla bot added the "CLA Signed" label on Oct 9, 2025
felipemello1 changed the title from "Metric Logging updates 5/N" to "[draft] Metric Logging updates 5/N" on Oct 9, 2025
felipemello1 marked this pull request as ready for review on Oct 9, 2025, 20:56
felipemello1 changed the title from "[draft] Metric Logging updates 5/N" to "Metric Logging updates 5/N" on Oct 9, 2025
felipemello1 changed the title from "Metric Logging updates 5/N" to "Metric Logging updates 5/N - enable streaming" on Oct 14, 2025
codecov-commenter commented

Codecov Report

❌ Patch coverage is 42.85714% with 68 lines in your changes missing coverage. Please review.
✅ Project coverage is 65.04%. Comparing base (4c14792) to head (69f9f8c).
⚠️ Report is 8 commits behind head on main.

Files with missing lines                   Patch %   Lines
src/forge/observability/metrics.py         32.81%    43 Missing ⚠️
src/forge/observability/metric_actors.py   28.57%    25 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #363      +/-   ##
==========================================
- Coverage   73.68%   65.04%   -8.65%     
==========================================
  Files          81       82       +1     
  Lines        7729     7901     +172     
==========================================
- Hits         5695     5139     -556     
- Misses       2034     2762     +728     

☔ View full report in Codecov by Sentry.

Comment on lines +21 to +24
logging_mode: global_reduce # global_reduce, per_rank_reduce, per_rank_no_reduce
per_rank_share_run: False
console:
reduce_across_ranks: True
logging_mode: global_reduce
Contributor:

Why do we need to duplicate logging_mode across different configs like this? Feels like clunky UX to me.

felipemello1 (Author), Oct 15, 2025:

This is per backend. You could have Scuba logging in streaming mode, console logging with global_reduce, and wandb logging per rank. If you have a single backend, you define it only once.
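
For illustration, a sketch of a hypothetical multi-backend config expressed as a Python dict (the dict name and overall shape are illustrative; logging_mode and per_rank_share_run are the actual options from this PR):

# Illustrative only: one logging_mode per backend; a single backend defines it once.
metric_logging = {
    "wandb":   {"logging_mode": "per_rank_reduce", "per_rank_share_run": True},
    "console": {"logging_mode": "global_reduce"},
    "scuba":   {"logging_mode": "per_rank_no_reduce"},  # streaming
}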

A logger is represented by a backend, i.e. wandb backend. If reduce_across_ranks=False,
the backend is instantiated per-rank, in the MetricCollector, otherwise it is instantiated once globally,
in the GlobalLoggingActor.
Supports multiple logging backends, each with different logging modes.
Contributor:

(I know this is the design from prior to this PR but) just wondering why we use a singleton across multiple logger backends. In my mind it's more idiomatic to instantiate different classes for different logger backends, and also that usage pattern shouldn't be super common anyways. (Personally I would prefer e.g. a simple console logger + generalized logging backend for wandb etc as two separate entities.) Basically I worry that we are putting extra onus on people who just wanna log to wandb with all these nested dict configs etc for something that is a bit of a niche use case

felipemello1 (Author), Oct 15, 2025:

"In my mind it's more idiomatic to instantiate different classes for different logger backends"

We do: each backend is its own class. But the logic for backend in backends: backend.log(metric) has to live somewhere. MetricCollector is this wrapper with 4 methods:

class MetricCollector:
    def init_backends(): ...
    def push(): ...
    def flush(): ...
    def shutdown(): ...

Each one of them is mostly just doing for backend in backends: backend.do_something().
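
As an illustration, a rough sketch of that fan-out (simplified, with hypothetical class and parameter names; accumulation/reduction details elided):

class MetricCollectorSketch:
    """Simplified stand-in showing only the per-backend fan-out."""

    def __init__(self, backends: list) -> None:
        self.backends = backends

    async def flush(self, reduced_metrics: list, global_step: int) -> None:
        # Hand the reduced batch to every configured backend.
        for backend in self.backends:
            await backend.log_batch(metrics=reduced_metrics, global_step=global_step)

    async def shutdown(self) -> None:
        for backend in self.backends:
            await backend.finish()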

share_run_id (bool, default False): Only used if reduce_across_ranks=False.
True -> shared run across ranks; False -> separate runs per rank.
logging_mode (LoggingMode): Determines logging behavior
per_rank_share_run (bool, default False): For per-rank modes, whether to share run ID across ranks.
Contributor:

I know we chatted about it a bit already, but personally I still don't understand this one. It seems to me like we should always use a single run; if e.g. there are name collisions of metrics within the run we should sort that out ourselves.

felipemello1 (Author), Oct 15, 2025:

"It seems to me like we should always use a single run", then you cannot have this:

[image: example of separate per-rank runs in the logging backend UI]

Imagine that multiple runs == multiple JSON files, e.g. rank_0.json, rank_1.json, etc.


Hopefully the README can clarify this: https://github.com/meta-pytorch/forge/pull/380/files

-# Per-rank modes based on share_run_id bool
-elif role == BackendRole.GLOBAL and self.share_run_id:
+# Per-rank modes based on per_rank_share_run bool
+elif role == BackendRole.GLOBAL and self.per_rank_share_run:
Contributor:

A more general question about extensibility: say I want to use my own logger not provided by forge. Do I now have to implement my own custom logic to handle these different cases? (Basically I want to ensure we are not introducing unnecessary friction for users who want to customize beyond what we've directly provided)

felipemello1 (Author), Oct 15, 2025:

If you have your own backend, you have to implement a LoggerBackend. The logging_mode defines where things get called:

If "GLOBAL_REDUCE", we call log_batch(reduced_metrics) once, from the controller.
If "PER_RANK_REDUCE", we call log_batch(per_rank_reduced_metrics) once per rank.
If "PER_RANK_NO_REDUCE", we call log_stream(metric) on every rank as soon as record_metric is called.

In init, the user can define how to initialize the backend, e.g. create a file, initialize a run, etc. (a short sketch of these call sites follows the snippet below):

class LoggerBackend(ABC):
    @abstractmethod
    async def init(...) -> None:
        pass

    @abstractmethod
    async def log_batch(
        self, metrics: list[Metric], global_step: int, *args, **kwargs
    ) -> None:
        pass
	
    @abstractmethod
    def log_stream(self, metric: Metric, global_step: int, *args, **kwargs) -> None:
        pass
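
And a rough sketch of where each mode invokes the backend (call sites simplified; function and variable names here are illustrative, not the PR's actual code):

from forge.observability.metrics import LoggingMode

def describe_call_site(mode: LoggingMode) -> str:
    if mode == LoggingMode.GLOBAL_REDUCE:
        # Controller (GlobalLoggingActor) flush: await backend.log_batch(globally_reduced, step)
        return "log_batch, once per flush, from the controller"
    if mode == LoggingMode.PER_RANK_REDUCE:
        # Each rank's MetricCollector flush: await backend.log_batch(rank_reduced, step)
        return "log_batch, once per flush, on every rank"
    # PER_RANK_NO_REDUCE: every record_metric triggers backend.log_stream(metric, step) synchronously
    return "log_stream, per metric, on every rank"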

return

# Convert metrics to WandB log format
log_data = {"step": global_step}
Contributor:

Why did we change global_step -> step here?

def log_stream(self, metric: Metric, global_step: int, *args, **kwargs) -> None:
"""Stream single metric to backend immediately.
NOTE: This method is called synchronously.
Contributor:

What's the rationale here? In my mind the difference between this and the log_batch behavior is a bit unintuitive.

felipemello1 (Author), Oct 15, 2025:

We do record_metric(key, value, reduce) -> MetricCollector.push.

If "per_rank_no_reduce", we immediately call MetricCollector.push -> backend.log_stream(single_metric).
Else, we just accumulate the metric until flush:

GlobalLoggingActor.flush -> MetricCollector.flush -> backend.log_batch(reduced_metrics)

I am not 100% convinced that they should be different methods, but I also was not comfortable merging them.

Weights & Biases logging backend for distributed training.
Weights & Biases logging backend.
For logging mode details, see `forge.observability.metrics.LoggingMode` documentation.
Contributor:

I think it would also be helpful to provide some images in the documentation demonstrating the resulting figures from each of the different modes.

felipemello1 (Author):

Documentation as in the README, or in this docstring?

if metadata_per_primary_backend:
primary_metadata = metadata_per_primary_backend.get(backend_name, {})
# Skip local instantiation. Backend will be instantiated in GlobalLoggingActor.
if mode == LoggingMode.GLOBAL_REDUCE:
Contributor:

Kinda similar to my other comment about supporting multiple logging backends... I feel like the MetricCollector class is doing a lot (imo too much). I know I wasn't around to give detailed review of the first few PRs, but I do wonder whether all these different logging modes should really all be handled in the same class. Currently the logic is a bit hard for me to understand.

felipemello1 (Author):

"I feel like the MetricCollector class is doing a lot (imo too much)"

MetricCollector is this:

class MetricCollector:
    def init_backends(): ...
    def push(): ...
    def flush(): ...
    def shutdown(): ...

Are you saying that the methods are doing too much or we have too many methods?

"Currently the logic is a bit hard for me to understand"

That's fair. Maybe I can try to make it more obvious, or maybe it's a documentation issue for a global view, which I address in https://github.com/meta-pytorch/forge/pull/380/files
