Conversation

felipemello1
Contributor

@felipemello1 felipemello1 commented Oct 3, 2025

How to review this PR:

  • I don't think you should review every file; there are too many small changes.
  • Main feedback should be on logging_mode and especially on the API for backend.log_batch and backend.log_stream
    --> I added comments for both in the PR

Summary of changes:

  1. Added timestamp logging system for metrics tracking

This means we now enable zero-reduce logging: metrics are pushed immediately to the backend with a timestamp and step, and it's up to the backend to buffer them if it wants to.

[screenshot]
  2. Changed metric logging configs (see the config sketch after this list)
    Before:
reduce_across_ranks: bool
share_run_id: bool

After:

logging_mode: Enum[GLOBAL_REDUCE, PER_RANK_REDUCE, PER_RANK_NO_REDUCE]
per_rank_share_run: bool

It should be more intuitive what the options mean, and we avoid adding a third flag for no_reduce.


  3. Added unit tests for metrics.py and metric_actors.py

  4. Better actor naming. Now we get this:
[screenshot]
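
For illustration, a minimal sketch of what the new config surface could look like (the mode values follow the PR description; the exact enum class and field layout in forge are assumptions):

from enum import Enum

class LoggingMode(Enum):
    GLOBAL_REDUCE = "global_reduce"
    PER_RANK_REDUCE = "per_rank_reduce"
    PER_RANK_NO_REDUCE = "per_rank_no_reduce"

# Before: two booleans (reduce_across_ranks, share_run_id).
# After: a single mode enum plus one per-rank flag.
logging_config = {
    "logging_mode": LoggingMode.PER_RANK_NO_REDUCE,  # stream metrics, no reduction
    "per_rank_share_run": True,  # all ranks write into one shared run
}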

@meta-cla meta-cla bot added the CLA Signed This label is managed by the Meta Open Source bot. label Oct 3, 2025
@felipemello1 felipemello1 changed the title [Logging] add time stamp logging [DRAFT - DO NOT REVIEW] [Logging] add time stamp logging Oct 3, 2025
@felipemello1 felipemello1 marked this pull request as draft October 3, 2025 18:06
@felipemello1 felipemello1 changed the title [DRAFT - DO NOT REVIEW] [Logging] add time stamp logging [DRAFT - DO NOT REVIEW] [Logging] add time stamp logging + test Oct 3, 2025
@felipemello1 felipemello1 marked this pull request as ready for review October 5, 2025 13:02
@felipemello1 felipemello1 changed the title [DRAFT - DO NOT REVIEW] [Logging] add time stamp logging + test [Logging] add time stamp logging + test Oct 5, 2025
Comment on lines 482 to 508
def push(self, metric: Metric) -> None:
    """Immediately log metrics to backends marked as "no_reduce" and add metrics to accumulators
    for reduction and later logging."""
    if not self._is_initialized:
        raise ValueError(
            "MetricCollector was not initialized. This happens when you try to use `record_metric` "
            "before you have initialized any logging backends. Please call in your main file:\n"
            "`mlogger = await get_or_create_metric_logger(actor_name='Controller')`\n"
            "`await mlogger.init_backends.call_one(logging_config)`\n"
            "or, to disable metric logging globally, set env variable `FORGE_DISABLE_METRICS=True`"
        )

    # Validate metric object
    if not isinstance(metric, Metric):
        raise TypeError(f"Expected {Metric} object, got {metric}")

    # Always accumulate for deferred logging and state return
    key = metric.key
    if key not in self.accumulators:
        self.accumulators[key] = metric.reduction.accumulator_class(metric.reduction)
    self.accumulators[key].append(metric.value)

    # For PER_RANK_NO_REDUCE backends: log immediately
    for backend in self.per_rank_no_reduce_backends:
        backend.log_immediate(metric=metric, step=self.step)
Contributor Author

@felipemello1 felipemello1 Oct 5, 2025

This is the major chunk that needs review, IMO.

  1. We do an async log_batch flush, i.e.
for train_step in range(10):
    do_something()
    await mlogger.flush.call_one()

mlogger.flush calls MetricLogger.flush, which flushes all metrics accumulated during the train step.


  2. However, when logging_mode=PER_RANK_NO_REDUCE, there is no reduce, and MetricCollector flushes immediately on every push, i.e.
record_metric(key, value, reduce_type)  # reduce_type is ignored

record_metric then calls MetricCollector.push, which calls backend.log_stream(key, value). Notice that this is sync.


  3. I am not sure if this is the right API:

  • async batch flush that calls backend.log
  • sync single-metric flush that calls backend.log_stream

What happens when we have backend.log_table? Do we need .log_table_stream too?
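
For reference, a rough sketch of the two-path backend surface being debated here (method names follow the PR summary's log_batch/log_stream; the signatures are assumptions, not the final API):

from typing import List, Protocol

class LoggerBackend(Protocol):
    # Deferred path: awaited from mlogger.flush at the end of a train step,
    # receives everything accumulated (and reduced) for that step.
    async def log_batch(self, metrics: List["Metric"], step: int) -> None:
        ...

    # Streaming path: called synchronously from MetricCollector.push when
    # logging_mode=PER_RANK_NO_REDUCE, one timestamped metric at a time.
    def log_stream(self, metric: "Metric", step: int) -> None:
        ...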

logger = logging.getLogger(__name__)


def detect_actor_name_from_call_stack() -> str:
Contributor Author

@allenwang28, mind taking a look at this file?

I used inspect to get the class name from the call stack. I didn't find a good way to get the ActorName from the context(). I tried it in get_actor_name_with_rank, but it returns the local_fetcher, not the actor.
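
For context, a minimal sketch of the call-stack approach (illustrative only; the helper name is hypothetical, and the real detect_actor_name_from_call_stack may differ):

import inspect

def _actor_name_from_stack(default: str = "UnknownActor") -> str:
    # Walk outward from the caller and return the class name of the first
    # frame that belongs to a bound method (i.e. has a `self` local).
    for frame_info in inspect.stack()[1:]:
        self_obj = frame_info.frame.f_locals.get("self")
        if self_obj is not None:
            return type(self_obj).__name__
    return default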

Contributor Author

I am getting this currently:
[screenshot]

@felipemello1 felipemello1 mentioned this pull request Oct 5, 2025
),
)

# Call after services are initialized
Contributor

Would you maybe explain in the comment why init_backends should be called after services are initialized?

Contributor Author

@felipemello1 felipemello1 Oct 6, 2025

Calling it before works for every mode except when per_rank_share_run=True; then it hangs. wandb says it's experimental, and I didn't investigate more deeply to see if I need to wait for something to finish. But I agree, I will add a note! Edit: done

Contributor

Can we debug this further instead of checking in this workaround?

# Force all timing methods in forge.observability.perf_tracker.py to use the
# CPU timer if False or the GPU timer if True. If unset, defaults to the value assigned in the function.
METRIC_TIMER_USES_CUDA = "METRIC_TIMER_USES_CUDA"
METRIC_TIMER_USES_GPU = "METRIC_TIMER_USES_GPU"
Member

?

Contributor Author

Making it future-proof for when we support other backends besides CUDA.

Member

TPUs here we come

Contributor Author

OH NO

Contributor Author

"METRIC_TIMER_USES_ACCELERATOR"? Is this what torch uses? geez, we refactor it when the time comes

Comment on lines 657 to +796
async def _init_shared_local(self, primary_metadata: Dict[str, Any]):
    import wandb
    from wandb.sdk.lib.service import service_token

    shared_id = primary_metadata.get("shared_run_id")
    if shared_id is None:
        raise ValueError(
            f"Shared ID required but not provided for {self.name} backend init"
        )

    # Clear any stale service tokens that might be pointing to dead processes.
    # In multiprocessing environments, WandB service tokens can become stale and point
    # to dead service processes. This causes wandb.init() to hang indefinitely trying
    # to connect to non-existent services. Clearing forces a fresh service connection.
    service_token.clear_service_in_env()

    settings = wandb.Settings(mode="shared", x_primary=False, x_label=self.name)
    self.run = wandb.init(
        id=shared_id,
        project=self.project,
        group=self.group,
        settings=settings,
    )
Contributor Author

@felipemello1 felipemello1 Oct 7, 2025

@allenwang28 @JenniferWang This fixes the wandb hang. Now we can initialize before or after services, no difference.

tldr: it seems that spawning new processes copies environment variables used by wandb for shared mode, but they are not valid for new runs. When spawning everything AFTER services, we didn't have this issue, because every new proc would have its own fresh vars.

elif role == BackendRole.LOCAL:
    if self.per_rank_share_run:
        await self._init_shared_local(primary_logger_metadata)
Member

Wouldn't this call simply overwrite the self.run that is initialized in self._init_shared_global() and set x_primary to False?

Contributor Author

@felipemello1 felipemello1 Oct 7, 2025

I am not sure what you mean by "overwrite". Are you saying that the backend would be called once with backend.init(role=global, ...) and then again with (role=local)?

If so, then no. This should never happen. A backend is initialized only once per process.

What happens is that each process has a different instance of this class. So the controller has a wandb backend, the TrainActor has another, etc.

Global init happens here: https://github.com/felipemello1/forge/blob/ece12d72d39120392ac679dc951125772157bfa6/src/forge/observability/metric_actors.py#L262

Local init happens here: https://github.com/felipemello1/forge/blob/ece12d72d39120392ac679dc951125772157bfa6/src/forge/observability/metrics.py#L471

Let me know if that's not clear.

Member

@DNXie DNXie Oct 7, 2025

But here in GlobalLoggingActor.init_backends, it would call

await backend.init(role=BackendRole.GLOBAL)

as you pointed out. And also

fetcher.init_backends.call(
    self.metadata_per_primary_backend, self.config
)

https://github.com/felipemello1/forge/blob/ece12d72d39120392ac679dc951125772157bfa6/src/forge/observability/metric_actors.py#L276

In LocalFetcherActor.init_backends it would call MetricCollector.init_backends
https://github.com/felipemello1/forge/blob/ece12d72d39120392ac679dc951125772157bfa6/src/forge/observability/metric_actors.py#L167

Are they not the same backend instances? But with grpo/main, there is only one rank involved (rank0). Could you help me understand?

Contributor Author

@felipemello1 felipemello1 Oct 7, 2025

Sure! We have:

  1. A single GlobalLoggingActor.
  2. Then, when every process is spawned, we spawn a LocalFetcherActor with it in provisioner.py.
    So, if TrainActor has 2 replicas with 2 GPUs each, we have 4 LocalFetcherActors.
  3. Each process also has its own MetricCollector.
  4. Each MetricCollector has its own backend. So in the example above, we have 4 backends, 1 per process, if you set logging_mode="per_rank_reduce"|"per_rank_no_reduce". If you set "global_reduce", then no backend is instantiated in the ranks.
  5. When we call GlobalLoggingActor.init_backends, it calls LocalFetcherActor.init_backends, which calls MetricCollector.init_backends, which calls local_backend.init, tagged as "local".

So where is the "global" backend? This one is instantiated inside the GlobalLoggingActor, and it's the only "global" one: https://github.com/felipemello1/forge/blob/timestamp_logging/src/forge/observability/metric_actors.py#L262

So to recap:

  • Each process has its own backend. N processes == N backend instances
  • And we have one extra in the global actor

Contributor Author

@felipemello1 felipemello1 Oct 7, 2025

If it makes it easier to visualize, imagine that instead of a backend, it was just a .json file that we write to. Every process writes to its own file. Global has its own file too.

global.json
r0.json
r1.json
Etc...
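
To make the analogy concrete, a condensed sketch of that topology (the class bodies here are hypothetical and heavily simplified from the real actors):

class Backend:
    # One instance per process, plus one extra owned by the global actor.
    def __init__(self, role: str):
        self.role = role  # "global" or "local"

class MetricCollector:
    # One per process; owns that process's local backend for per-rank modes.
    def __init__(self, logging_mode: str):
        self.backends = []
        if logging_mode in ("per_rank_reduce", "per_rank_no_reduce"):
            self.backends.append(Backend(role="local"))
        # with "global_reduce", ranks only accumulate; no local backend is created

class GlobalLoggingActor:
    # Single instance; owns the one "global" backend and fans init out to the
    # LocalFetcherActor on each process.
    def __init__(self):
        self.backend = Backend(role="global")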

Member

Thanks, Felipe, for the explanation! It is much clearer now.

@felipemello1
Contributor Author

Will break it down into 4 PRs.
