
Problems with Clean Shutdown and Actor Management in training scripts #360

@DNXie

Description

πŸ› Describe the bug

We noticed several issues with shutdown in training scripts:

1. No Centralized Shutdown for Actors/Services (Solved in #357)

  • Problem: Each actor/service (e.g., DatasetActor, policy, RLTrainer) must be shut down individually:

await asyncio.gather(
    DatasetActor.shutdown(dataloader),
    policy.shutdown(),
    RLTrainer.shutdown(trainer),
    ReplayBuffer.shutdown(replay_buffer),
    ComputeAdvantages.shutdown(compute_advantages),
    ref_model.shutdown(),
    reward_actor.shutdown(),
)
# TODO - add a global shutdown that implicitly shuts down all services
# and remote allocations
await shutdown()
  • Context: There is no global shutdown mechanism in the provisioner to handle all services at once. This leads to repetitive and error-prone shutdown code.
  • Suggestion: Implement a centralized shutdown in the provisioner that manages all actors/services; a rough sketch of what this could look like follows below.
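
A minimal sketch of such a provisioner-level registry, assuming the provisioner records every handle it spawns. The names register_for_shutdown and _registered are hypothetical, not the actual forge API, and each registered handle is assumed to expose an awaitable shutdown():

import asyncio

_registered = []  # every actor/service handle spawned through the provisioner

def register_for_shutdown(handle):
    """Record a spawned actor/service so the global shutdown can reach it."""
    _registered.append(handle)

async def shutdown():
    """Shut down every registered actor/service, then clear the registry."""
    results = await asyncio.gather(
        *(h.shutdown() for h in _registered),
        return_exceptions=True,  # one failing shutdown should not block the rest
    )
    for r in results:
        if isinstance(r, Exception):
            print(f"shutdown error (ignored): {r!r}")
    _registered.clear()

With something like this in place, the training script's teardown collapses to a single await shutdown().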

2. Ctrl+C Does Not Exit Cleanly (Resolved on its own)

  • Problem: grpo/main.py does not exit on a single Ctrl+C as expected; it requires multiple interrupts.
  • Context: The main async loop is wrapped in a try/except for KeyboardInterrupt, but tasks are not cancelled or awaited cleanly, leading to delayed shutdown; a sketch of a cleaner pattern is shown below.
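
A minimal sketch of one common pattern (not the current grpo/main.py code; the main() body is a placeholder): cancel the top-level task on the first Ctrl+C and await it once so cleanup runs to completion:

import asyncio

async def main():
    ...  # placeholder for the training loop

if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    main_task = loop.create_task(main())
    try:
        loop.run_until_complete(main_task)
    except KeyboardInterrupt:
        # First Ctrl+C: cancel the main task, then let its cleanup finish
        main_task.cancel()
        try:
            loop.run_until_complete(main_task)
        except asyncio.CancelledError:
            pass
    finally:
        loop.close()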

3. Excessive Monarch Noise on Interrupt/Shutdown

  • Problem: Interrupting or finishing a run produces noisy Monarch logs with errors about failed stop messages and closed channels, for example:
[-]E1009 07:20:30.112712 1322189 hyperactor/src/channel/net.rs:796] session unix:@FzPIPHWcrW42EQC1edKBSZcr.7152260238076237936: failed while receiving ack: Connection reset by peer (os error 104)
[1] [1]E1009 07:20:30.112690 1325969 hyperactor/src/proc.rs:1116] _18zcqNVssCLk[1].local_fetcher_actor[1]: failed to send stop message to parent pid 0: ActorError { actor_id: ActorId(Ranked(WorldId("_18zcqNVssCLk"), 1), "local_fetcher_actor", 0), kind: MailboxSender(MailboxSenderError { location: Unbound(ActorId(Ranked(WorldId("_18zcqNVssCLk"), 1), "local_fetcher_actor", 0), "hyperactor::actor::Signal"), kind: Other(channel closed) }) }
[1] [1]E1009 07:20:32.033873 1325967 hyperactor/src/proc.rs:1116] _15CksRCxT6CU[1].TrainActor[1]: failed to send stop message to parent pid 0: ActorError { actor_id: ActorId(Ranked(WorldId("_15CksRCxT6CU"), 1), "TrainActor", 0), kind: MailboxSender(MailboxSenderError { location: Unbound(ActorId(Ranked(WorldId("_15CksRCxT6CU"), 1), "TrainActor", 0), "hyperactor::actor::Signal"), kind: Other(channel closed) }) }
[1] [1]E1009 07:20:32.033895 1325967 hyperactor/src/proc.rs:1116] _15CksRCxT6CU[1].setup[1]: failed to send stop message to parent pid 0: ActorError { actor_id: ActorId(Ranked(WorldId("_15CksRCxT6CU"), 1), "setup", 0), kind: MailboxSender(MailboxSenderError { location: Unbound(ActorId(Ranked(WorldId("_15CksRCxT6CU"), 1), "setup", 0), "hyperactor::actor::Signal"), kind: Other(channel closed) }) }
[0] [0]E1009 07:20:32.033916 1325966 hyperactor/src/proc.rs:1116] _15CksRCxT6CU[0].TrainActor[1]: failed to send stop message to parent pid 0: ActorError { actor_id: ActorId(Ranked(WorldId("_15CksRCxT6CU"), 0), "TrainActor", 0), kind: MailboxSender(MailboxSenderError { location: Unbound(ActorId(Ranked(WorldId("_15CksRCxT6CU"), 0), "TrainActor", 0), "hyperactor::actor::Signal"), kind: Other(channel closed) }) }
[0] [0]E1009 07:20:32.033973 1325966 hyperactor/src/proc.rs:1116] _15CksRCxT6CU[0].local_fetcher_actor[1]: failed to send stop message to parent pid 0: ActorError { actor_id: ActorId(Ranked(WorldId("_15CksRCxT6CU"), 0), "local_fetcher_actor", 0), kind: MailboxSender(MailboxSenderError { location: Unbound(ActorId(Ranked(WorldId("_15CksRCxT6CU"), 0), "local_fetcher_actor", 0), "hyperactor::actor::Signal"), kind: Other(channel closed) }) }
[1] [1]E1009 07:20:32.033982 1325967 hyperactor/src/proc.rs:1116] _15CksRCxT6CU[1].local_fetcher_actor[1]: failed to send stop message to parent pid 0: ActorError { actor_id: ActorId(Ranked(WorldId("_15CksRCxT6CU"), 1), "local_fetcher_actor", 0), kind: MailboxSender(MailboxSenderError { location: Unbound(ActorId(Ranked(WorldId("_15CksRCxT6CU"), 1), "local_fetcher_actor", 0), "hyperactor::actor::Signal"), kind: Other(channel closed) }) }
[0] [0]E1009 07:20:32.034012 1325966 hyperactor/src/proc.rs:1116] _15CksRCxT6CU[0].setup[1]: failed to send stop message to parent pid 0: ActorError { actor_id: ActorId(Ranked(WorldId("_15CksRCxT6CU"), 0), "setup", 0), kind: MailboxSender(MailboxSenderError { location: Unbound(ActorId(Ranked(WorldId("_15CksRCxT6CU"), 0), "setup", 0), "hyperactor::actor::Signal"), kind: Other(channel closed) }) }
  • Context: Likely caused by the GlobalLoggingActor trying to interact with LocalFetcherActors after their process mesh has been cleaned up. However, the repro below does not explicitly instantiate a GlobalLoggingActor.
  • Repro:
import asyncio
import logging

from forge.controller import ForgeActor
from monarch.actor import endpoint

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


class SimpleActor(ForgeActor):
    def __init__(self):
        super().__init__()

    @endpoint
    def add_one(self, x):
        return x + 1


async def main():
    """Minimal reproduction of Monarch Actor logging after shutdown."""
    # Create a simple actor
    actor = await SimpleActor.as_actor()

    # Call a method to ensure the actor is working
    await actor.add_one.call(5)

    # Shutdown the actor
    await SimpleActor.shutdown(actor)


if __name__ == "__main__":
    asyncio.run(main())
  • Potential Root Cause: GlobalLoggingActor is instantiated automatically in provisioner.py and is not always shut down properly. There may also be issues with how shutdown is overridden in GlobalLoggingActor versus the base ForgeActor; a hypothetical illustration of that failure mode follows.
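
The classes below are stand-ins, not the real ForgeActor/GlobalLoggingActor code; they only illustrate how an override that skips super().shutdown() can leave base-class cleanup undone:

class BaseActor:
    async def shutdown(self):
        # base-class cleanup: detach from the proc mesh, stop the mailbox, etc.
        print("base cleanup")


class LoggingActor(BaseActor):
    async def shutdown(self):
        print("flush pending logs")
        # Missing `await super().shutdown()`: the base-class cleanup never runs,
        # so later teardown can hit channels that were never detached.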

TLDR of how this works:

  • a) Whenever an actor is spawned (e.g. SimpleActor.as_actor()), it calls get_proc_mesh in provisioner.py.
  • b) At the end of that function, we call _ = await get_or_create_metric_logger(procs).
  • c) That function spawns a GlobalLoggingActor (if one does not already exist) and a LocalFetcherActor. The GlobalLoggingActor is a singleton: even with 100 processes there is only one. The LocalFetcherActor is per-process: with 100 processes there are 100 LocalFetcherActors.
  • d) When we call actor.shutdown, it calls stop_proc_mesh. A simplified sketch of this flow is given below.
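
A simplified, self-contained sketch of steps a) through d). The names get_proc_mesh, get_or_create_metric_logger, and stop_proc_mesh come from the description above; everything else (the stub classes and the _global_logger variable) is an illustrative placeholder, not the real provisioner.py:

_global_logger = None  # singleton: one GlobalLoggingActor for the whole job


class GlobalLoggerStub:
    """Stand-in for GlobalLoggingActor."""

    def __init__(self):
        self.fetchers = []

    def register(self, fetchers):
        self.fetchers.extend(fetchers)


class LocalFetcherStub:
    """Stand-in for LocalFetcherActor (one per process)."""

    def __init__(self, proc):
        self.proc = proc


async def get_proc_mesh(num_procs):
    procs = list(range(num_procs))                 # (a) processes backing the new actor
    _ = await get_or_create_metric_logger(procs)   # (b) wire up logging for this mesh
    return procs


async def get_or_create_metric_logger(procs):
    global _global_logger
    if _global_logger is None:                     # (c) create the singleton on first use
        _global_logger = GlobalLoggerStub()
    fetchers = [LocalFetcherStub(p) for p in procs]  # (c) one fetcher per process
    _global_logger.register(fetchers)
    return _global_logger


async def stop_proc_mesh(procs):
    # (d) called from actor.shutdown(): the processes go away here, but the
    # singleton logger still holds the fetchers registered for them.
    pass

The suspected gap is in step d): stop_proc_mesh tears down the processes, but nothing tells the singleton GlobalLoggingActor to forget the corresponding LocalFetcherActors, so it can keep messaging actors whose channels are already closed, which matches the noise shown above.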

Versions

No response
