diff --git a/docs/source/conf.py b/docs/source/conf.py index 4e3cec1fa..179a32437 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -15,6 +15,7 @@ import sys import pytorch_sphinx_theme2 +from sphinx_gallery.sorting import ExplicitOrder, FileNameSortKey # Add the source directory to Python path so modules can be imported sys.path.insert(0, os.path.abspath("../../src/forge")) @@ -82,7 +83,12 @@ def get_version_path(): "_templates", os.path.join(os.path.dirname(pytorch_sphinx_theme2.__file__), "templates"), ] -exclude_patterns = ["tutorials/index.rst", "tutorials/template_tutorial.rst"] +exclude_patterns = [ + "tutorials/index.rst", + "tutorials/template_tutorial.rst", + "tutorials/**/index.rst", + "tutorial_sources/**/README.md", +] html_static_path = ["_static"] html_css_files = ["custom.css"] @@ -204,7 +210,7 @@ def get_version_path(): sphinx_gallery_conf = { "examples_dirs": "tutorial_sources", # Path to examples directory "gallery_dirs": "tutorials", # Path to generate gallery - "filename_pattern": ".*", # Include all files + "filename_pattern": ".*", # Match all Python files "download_all_examples": False, "first_notebook_cell": "%matplotlib inline", "plot_gallery": "True", @@ -212,6 +218,8 @@ def get_version_path(): "backreferences_dir": None, "show_signature": False, "write_computation_times": False, + "subsection_order": ExplicitOrder(["tutorial_sources/zero-to-forge"]), + "within_subsection_order": FileNameSortKey, } diff --git a/docs/source/tutorial_sources/README.txt b/docs/source/tutorial_sources/README.txt index 1fadb0a08..e69de29bb 100644 --- a/docs/source/tutorial_sources/README.txt +++ b/docs/source/tutorial_sources/README.txt @@ -1,5 +0,0 @@ -Tutorials -========= - -This gallery contains tutorials and examples to help you get started with Forge. -Each tutorial demonstrates specific features and use cases with practical examples. diff --git a/docs/source/tutorial_sources/zero-to-forge/1_RL_and_Forge_Fundamentals.py b/docs/source/tutorial_sources/zero-to-forge/1_RL_and_Forge_Fundamentals.py new file mode 100644 index 000000000..e408cffae --- /dev/null +++ b/docs/source/tutorial_sources/zero-to-forge/1_RL_and_Forge_Fundamentals.py @@ -0,0 +1,506 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. + +""" +Part 1: RL Fundamentals - Using Forge Terminology +================================================== + +**Author:** `Sanyam Bhutani `_ + +.. grid:: 2 + + .. grid-item-card:: :octicon:`mortar-board;1em;` What you will learn + :class-card: card-prerequisites + + * Core RL components in Forge + * How RL concepts map to Forge services + * The RL training loop with Forge APIs + * Forge's distributed architecture benefits + + .. grid-item-card:: :octicon:`list-unordered;1em;` Prerequisites + :class-card: card-prerequisites + + * Understanding of basic RL concepts + * Familiarity with Python async/await + * PyTorch experience recommended +""" + +######################################################################### +# Core RL Components in Forge +# ---------------------------- +# +# Let's start with a simple math tutoring example to understand RL concepts +# with the exact names Forge uses: +# +# The Toy Example: Teaching Math +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# +# .. 
mermaid:: +# +# graph TD +# subgraph Example["Math Tutoring RL Example"] +# Dataset["Dataset: math problems"] +# Policy["Policy: student AI"] +# Reward["Reward Model: scores answers"] +# Reference["Reference Model: baseline"] +# ReplayBuffer["Replay Buffer: stores experiences"] +# Trainer["Trainer: improves student"] +# end +# +# Dataset --> Policy +# Policy --> Reward +# Policy --> Reference +# Reward --> ReplayBuffer +# Reference --> ReplayBuffer +# ReplayBuffer --> Trainer +# Trainer --> Policy +# +# style Policy fill:#4CAF50 +# style Reward fill:#FF9800 +# style Trainer fill:#E91E63 +# +# RL Components Defined (Forge Names) +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# +# 1. **Dataset**: Provides questions/prompts (like "What is 2+2?") +# 2. **Policy**: The AI being trained (generates answers like "The answer is 4") +# 3. **Reward Model**: Evaluates answer quality (gives scores like 0.95) +# 4. **Reference Model**: Original policy copy (prevents drift from baseline) +# 5. **Replay Buffer**: Stores experiences (question + answer + score) +# 6. **Trainer**: Updates the policy weights based on experiences + +###################################################################### +# The RL Learning Flow +# -------------------- +# +# Here's a conceptual example of how an RL step works. +# This is CONCEPTUAL - see apps/grpo/main.py for actual GRPO implementation. + + +def conceptual_rl_step(): + """Conceptual RL training step showing the flow.""" + # 1. Get a math problem + question = dataset.sample() # "What is 2+2?" + + # 2. Student generates answer + answer = policy.generate(question) # "The answer is 4" + + # 3. Teacher grades it + score = reward_model.evaluate(question, answer) # 0.95 + + # 4. Compare to original student + baseline = reference_model.compute_logprobs(question, answer) + + # 5. Store the experience + experience = Episode(question, answer, score, baseline) + replay_buffer.add(experience) + + # 6. When enough experiences collected, improve student + batch = replay_buffer.sample(curr_policy_version=0) + if batch is not None: + trainer.train_step(batch) # Student gets better! + + +###################################################################### +# From Concepts to Forge Services +# -------------------------------- +# +# Here's the key insight: **Each RL component becomes a Forge service**. +# The toy example above maps directly to Forge: +# +# .. mermaid:: +# +# graph LR +# subgraph Concepts["RL Concepts"] +# C1["Dataset"] +# C2["Policy"] +# C3["Reward Model"] +# C4["Reference Model"] +# C5["Replay Buffer"] +# C6["Trainer"] +# end +# +# subgraph Services["Forge Services (Real Classes)"] +# S1["DatasetActor"] +# S2["Policy"] +# S3["RewardActor"] +# S4["ReferenceModel"] +# S5["ReplayBuffer"] +# S6["RLTrainer"] +# end +# +# C1 --> S1 +# C2 --> S2 +# C3 --> S3 +# C4 --> S4 +# C5 --> S5 +# C6 --> S6 +# +# style C2 fill:#4CAF50 +# style S2 fill:#4CAF50 +# style C3 fill:#FF9800 +# style S3 fill:#FF9800 + +###################################################################### +# RL Step with Forge Services +# ---------------------------- +# +# Let's look at the example from above again, but this time we use the +# actual Forge API names: + +import asyncio + + +async def conceptual_forge_rl_step(services, step): + """Single RL step using verified Forge APIs.""" + # 1. Get a math problem - Using actual DatasetActor API + sample = await services["dataloader"].sample.call_one() + question, target = sample["request"], sample["target"] + + # 2. 
Student generates answer - Using actual Policy API
    responses = await services["policy"].generate.route(prompt=question)
    answer = responses[0].text

    # 3. Teacher grades it - Using actual RewardActor API
    score = await services["reward_actor"].evaluate_response.route(
        prompt=question, response=answer, target=target
    )

    # 4. Compare to baseline - Using actual ReferenceModel API
    # Note: ReferenceModel.forward requires input_ids, max_req_tokens, return_logprobs
    # ref_logprobs = await services['ref_model'].forward.route(
    #     input_ids, max_req_tokens, return_logprobs=True
    # )

    # 5. Store experience - Using actual Episode structure from apps/grpo/main.py
    # episode = create_episode_from_response(responses[0], score, ref_logprobs, step)
    # await services['replay_buffer'].add.call_one(episode)

    # 6. Improve student - Using actual training pattern
    batch = await services["replay_buffer"].sample.call_one(curr_policy_version=step)
    if batch is not None:
        inputs, targets = batch  # GRPO returns (inputs, targets) tuple
        loss = await services["trainer"].train_step.call(inputs, targets)

    # 7. Policy synchronization - Using actual weight update pattern
    await services["trainer"].push_weights.call(step + 1)
    await services["policy"].update_weights.fanout(step + 1)

    return loss


######################################################################
# **Key difference**: Same RL logic, but each component is now a distributed,
# fault-tolerant, auto-scaling service.
#
# Did you notice? We are not worrying about any infra code here! Forge
# automagically handles those details behind the scenes so you can focus on
# writing your RL algorithms!


######################################################################
# Why This Matters: Traditional ML Infrastructure Fails
# -----------------------------------------------------
#
# Our simple RL loop above has complex requirements:
#
# Problem 1: Different Resource Needs
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#
# +------------------+-------------------------+---------------------------+
# | Component        | Resource Needs          | Scaling Strategy          |
# +==================+=========================+===========================+
# | **Policy**       | Large GPU memory        | Multiple replicas         |
# +------------------+-------------------------+---------------------------+
# | **Reward**       | Small compute           | CPU or small GPU          |
# +------------------+-------------------------+---------------------------+
# | **Trainer**      | Massive GPU compute     | Distributed training      |
# +------------------+-------------------------+---------------------------+
# | **Dataset**      | CPU intensive I/O       | High memory bandwidth     |
# +------------------+-------------------------+---------------------------+
#
# Problem 2: Complex Interdependencies
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#
# .. mermaid::
#
#    graph LR
#        A["Policy: Student AI<br>'What is 2+2?' → 'The answer is 4'"]
#        B["Reward: Teacher<br>Scores answer: 0.95"]
#        C["Reference: Original Student<br>Provides baseline comparison"]
#        D["Replay Buffer: Notebook<br>Stores: question + answer + score"]
#        E["Trainer: Tutor<br>Improves student using experiences"]
#
#        A --> B
#        A --> C
#        B --> D
#        C --> D
#        D --> E
#        E --> A
#
#    style A fill:#4CAF50
#    style B fill:#FF9800
#    style C fill:#2196F3
#    style D fill:#8BC34A
#    style E fill:#E91E63
#
# Each step has different:
#
# * **Latency requirements**: Policy inference needs low latency
# * **Scaling patterns**: Need N policy replicas to keep trainer busy
# * **Failure modes**: Any component failure cascades to halt pipeline
# * **Resource utilization**: GPUs for inference/training, CPUs for data

######################################################################
# Enter Forge: RL-Native Architecture
# ------------------------------------
#
# Forge solves these problems by treating each RL component as an
# **independent, distributed unit**.
#
# Quick API Reference (covered in detail in Part 2):
#
# * ``.route()`` - Send request to any healthy replica (load balanced)
# * ``.call_one()`` - Send request to a single actor instance
# * ``.fanout()`` - Send request to ALL replicas in a service


async def real_rl_training_step(services, step):
    """Single RL step using verified Forge APIs."""
    # 1. Environment interaction - Using actual DatasetActor API
    sample = await services["dataloader"].sample.call_one()
    prompt, target = sample["request"], sample["target"]

    responses = await services["policy"].generate.route(prompt)

    # 2. Reward computation - Using actual RewardActor API
    score = await services["reward_actor"].evaluate_response.route(
        prompt=prompt, response=responses[0].text, target=target
    )

    # 3. Get reference logprobs - Using actual ReferenceModel API
    input_ids = torch.cat([responses[0].prompt_ids, responses[0].token_ids])
    ref_logprobs = await services["ref_model"].forward.route(
        input_ids.unsqueeze(0), max_req_tokens=512, return_logprobs=True
    )

    # 4. Experience storage - Using actual Episode pattern from GRPO
    # episode = create_episode_from_response(responses[0], score, ref_logprobs, step)
    # await services['replay_buffer'].add.call_one(episode)

    # 5. Learning - Using actual trainer pattern
    batch = await services["replay_buffer"].sample.call_one(curr_policy_version=step)
    if batch is not None:
        inputs, targets = batch
        loss = await services["trainer"].train_step.call(inputs, targets)

    # 6. Policy synchronization
    await services["trainer"].push_weights.call(step + 1)
    await services["policy"].update_weights.fanout(step + 1)

    return loss


#####################################################################
# **Key insight**: Each line of RL pseudocode becomes a service call.
# The complexity of distribution, scaling, and fault tolerance is hidden
# behind these simple interfaces.
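######################################################################
# To make the three call patterns concrete, here is a small sketch that
# uses them side by side. It assumes a ``services`` dict like the one in
# ``real_rl_training_step`` above; the calls mirror ones used elsewhere
# in this tutorial:


async def example_adverb_cheatsheet(services, step):
    """Illustrates .route(), .call_one(), and .fanout() side by side."""
    # .route() - load-balanced call to ONE healthy replica of a service
    responses = await services["policy"].generate.route(prompt="What is 2+2?")

    # .call_one() - call a single actor instance (no replicas involved)
    sample = await services["dataloader"].sample.call_one()

    # .fanout() - broadcast to ALL replicas of a service
    await services["policy"].update_weights.fanout(step)

    return responses, sample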
+ +###################################################################### +# What Makes This Powerful +# ------------------------- +# +# Automatic Resource Management +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + +async def example_automatic_management(policy): + """Forge handles routing, GPU memory, batching, and scaling.""" + responses = await policy.generate.route(prompt="What is 2+2?") + answer = responses[0].text + return answer + + +import torch + +try: + from apps.grpo.main import ComputeAdvantages, DatasetActor, RewardActor +except ImportError: + # Module not available during doc build + ComputeAdvantages = DatasetActor = RewardActor = None + +###################################################################### +# Forge handles behind the scenes: +# +# - Routing to least loaded replica +# - GPU memory management +# - Batch optimization +# - Failure recovery +# - Auto-scaling based on demand + + +###################################################################### +# Independent Scaling +# ~~~~~~~~~~~~~~~~~~~ +# +# Here's how you configure different components with different resources: + +# Note: This is example code showing the Forge API +# For actual imports, see apps/grpo/main.py +from forge.actors.policy import Policy +from forge.actors.reference_model import ReferenceModel +from forge.actors.replay_buffer import ReplayBuffer +from forge.actors.trainer import RLTrainer +from forge.data.rewards import MathReward, ThinkingReward + + +async def example_forge_service_initialization(): + """Example of initializing Forge services for RL training.""" + model = "Qwen/Qwen3-1.7B" + group_size = 1 + + ( + dataloader, + policy, + trainer, + replay_buffer, + compute_advantages, + ref_model, + reward_actor, + ) = await asyncio.gather( + # Dataset actor (CPU) + DatasetActor.options(procs=1).as_actor( + path="openai/gsm8k", + revision="main", + data_split="train", + streaming=True, + model=model, + ), + # Policy service with GPU + Policy.options(procs=1, with_gpus=True, num_replicas=1).as_service( + engine_config={ + "model": model, + "tensor_parallel_size": 1, + "pipeline_parallel_size": 1, + "enforce_eager": False, + }, + sampling_config={ + "n": group_size, + "max_tokens": 16, + "temperature": 1.0, + "top_p": 1.0, + }, + ), + # Trainer actor with GPU + RLTrainer.options(procs=1, with_gpus=True).as_actor( + # Trainer config would come from YAML in real usage + model={ + "name": "qwen3", + "flavor": "1.7B", + "hf_assets_path": f"hf://{model}", + }, + optimizer={"name": "AdamW", "lr": 1e-5}, + training={"local_batch_size": 2, "seq_len": 2048}, + ), + # Replay buffer (CPU) + ReplayBuffer.options(procs=1).as_actor( + batch_size=2, max_policy_age=1, dp_size=1 + ), + # Advantage computation (CPU) + ComputeAdvantages.options(procs=1).as_actor(), + # Reference model with GPU + ReferenceModel.options(procs=1, with_gpus=True).as_actor( + model={ + "name": "qwen3", + "flavor": "1.7B", + "hf_assets_path": f"hf://{model}", + }, + training={"dtype": "bfloat16"}, + ), + # Reward actor (CPU) + RewardActor.options(procs=1, num_replicas=1).as_service( + reward_functions=[MathReward(), ThinkingReward()] + ), + ) + + return ( + dataloader, + policy, + trainer, + replay_buffer, + compute_advantages, + ref_model, + reward_actor, + ) + + +# Run the example (commented out to avoid execution during doc build) +# asyncio.run(example_forge_service_initialization()) + + +###################################################################### +# Forge Components: Services vs Actors +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# 
+# Forge has two types of distributed components: +# +# * **Services**: Multiple replicas with automatic load balancing +# (like Policy, RewardActor) +# * **Actors**: Single instances that handle their own internal +# distribution (like RLTrainer, ReplayBuffer) +# +# We cover this distinction in detail in Part 2, but for now this +# explains the scaling patterns: +# +# * Policy service: ``num_replicas=8`` for high inference demand +# * RewardActor service: ``num_replicas=16`` for parallel evaluation +# * RLTrainer actor: Single instance with internal distributed training + +###################################################################### +# Fault Tolerance +# ~~~~~~~~~~~~~~~ +# +# Forge provides automatic fault tolerance: + + +async def example_fault_tolerance(policy, reward_actor): + """If a replica fails, Forge automatically handles it.""" + # If a policy replica fails: + responses = await policy.generate.route(prompt="What is 2+2?") + answer = responses[0].text + # -> Forge automatically routes to healthy replica + # -> Failed replica respawns in background + # -> No impact on training loop + + # If reward service fails: + score = await reward_actor.evaluate_response.route( + prompt="question", response=answer, target="target" + ) + # -> Retries on different replica automatically + # -> Graceful degradation if all replicas fail + # -> System continues (may need application-level handling) + + +###################################################################### +# Conclusion +# ---------- +# +# This tutorial covered: +# +# * How RL concepts map to Forge components +# * The challenges of traditional RL infrastructure +# * How Forge's architecture solves these challenges +# * Basic Forge API patterns (route, call_one, fanout) +# +# In the next section, we will go a layer deeper and learn how Forge +# services work internally. +# +# Further Reading +# --------------- +# +# * Continue to :doc:`2_Forge_Internals` +# * Check out the full `GRPO implementation `_ +# * Read about the :doc:`../../api_actors` documentation diff --git a/docs/source/tutorial_sources/zero-to-forge/2_Forge_Internals.py b/docs/source/tutorial_sources/zero-to-forge/2_Forge_Internals.py new file mode 100644 index 000000000..001d3c02d --- /dev/null +++ b/docs/source/tutorial_sources/zero-to-forge/2_Forge_Internals.py @@ -0,0 +1,839 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. + +""" +Part 2: Peeling Back the Abstraction - What Are Services? +========================================================== + +**Author:** `Sanyam Bhutani `_ + +.. grid:: 2 + + .. grid-item-card:: :octicon:`mortar-board;1em;` What you will learn + :class-card: card-prerequisites + + * How Forge services work under the hood + * Service communication patterns (route, fanout, call_one) + * State management in distributed systems + * Real-world service orchestration patterns + + .. grid-item-card:: :octicon:`list-unordered;1em;` Prerequisites + :class-card: card-prerequisites + + * Complete :doc:`1_RL_and_Forge_Fundamentals` + * Understanding of Python async/await + * Basic distributed systems knowledge + +We highly recommend completing Part 1 before starting this tutorial. +Part 1 explains RL Concepts and how they land in Forge. + +Now that you see the power of the service abstraction, let's understand +what's actually happening under the hood. Grab your chai! 
+""" + +###################################################################### +# Service Anatomy: Beyond the Interface +# -------------------------------------- +# +# When you call ``await policy_service.generate(question)``, here's what +# actually happens: +# +# (Don't worry, we will understand Services right in the next section!) +# +# .. mermaid:: +# +# graph TD +# Call["Your Code:
await policy_service.generate"] +# +# subgraph ServiceLayer["Service Layer"] +# Proxy["Service Proxy: Load balancing, Health checking"] +# LB["Load Balancer: Replica selection, Circuit breaker"] +# end +# +# subgraph Replicas["Replica Management"] +# R1["Replica 1: GPU 0, Healthy"] +# R2["Replica 2: GPU 1, Overloaded"] +# R3["Replica 3: GPU 2, Failed"] +# R4["Replica 4: GPU 3, Healthy"] +# end +# +# subgraph Compute["Actual Computation"] +# Actor["Policy Actor: vLLM engine, Model weights, KV cache"] +# end +# +# Call --> Proxy +# Proxy --> LB +# LB --> R1 +# LB -.-> R2 +# LB -.-> R3 +# LB --> R4 +# R1 --> Actor +# R4 --> Actor +# +# style Call fill:#4CAF50 +# style LB fill:#FF9800 +# style R3 fill:#F44336 +# style Actor fill:#9C27B0 + +###################################################################### +# Service Components Deep Dive +# ----------------------------- +# +# 1. Real Service Configuration +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# +# Here's the actual ``ServiceConfig`` from Forge source code: + +# Configuration pattern from apps/grpo/main.py: +# +# .. code-block:: python +# +# Policy.options( +# procs=1, # Processes per replica +# num_replicas=4, # Number of replicas +# with_gpus=True # Allocate GPUs +# # Other available options: +# # hosts=None # the number of remote hosts used per replica +# ) + +###################################################################### +# 2. Real Service Creation +# ~~~~~~~~~~~~~~~~~~~~~~~~ +# +# Services are created using the ``.options().as_service()`` pattern +# from the actual GRPO implementation. +# +# The service creation automatically handles: +# +# * Spawning actor replicas across processes/GPUs +# * Load balancing with ``.route()`` method for services +# * Health monitoring and failure recovery +# * Message routing and serialization + +import asyncio + +from forge.actors.policy import Policy + + +async def example_service_creation(): + """Example of creating and using a policy service.""" + model = "Qwen/Qwen3-1.7B" + + policy = await Policy.options(procs=1, with_gpus=True, num_replicas=1).as_service( + engine_config={ + "model": model, + "tensor_parallel_size": 1, + "pipeline_parallel_size": 1, + "enforce_eager": False, + }, + sampling_config={"n": 1, "max_tokens": 16, "temperature": 1.0, "top_p": 1.0}, + ) + + prompt = "What is 3 + 5?" + responses = await policy.generate.route(prompt) + print(f"Response: {responses[0].text}") + + # Cleanup when done + await policy.shutdown() + return policy + + +# Run the example (commented out to avoid execution during doc build) +# asyncio.run(example_service_creation()) + + +###################################################################### +# 3. How Services Actually Work +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# +# Forge services are implemented as ``ServiceActors`` that manage +# collections of your ``ForgeActor`` replicas. +# +# When you call ``.as_service()``, Forge creates a ``ServiceInterface`` +# that manages N replicas of your ``ForgeActor`` class and gives you +# methods like ``.route()``, ``.fanout()``, etc. 
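######################################################################
# As a minimal sketch of that lifecycle (``EchoActor`` below is a
# hypothetical example written for this tutorial, not a Forge built-in):

from forge.controller import ForgeActor
from monarch.actor import endpoint


class EchoActor(ForgeActor):
    """A tiny actor used only to illustrate the service lifecycle."""

    @endpoint
    async def echo(self, text: str) -> str:
        return text


async def example_service_lifecycle():
    # .as_service() spawns the replicas and returns a handle backed
    # by a ServiceInterface
    echo = await EchoActor.options(procs=1, num_replicas=2).as_service()

    # .route() load-balances this call across the two replicas
    reply = await echo.echo.route(text="hello")
    print(reply)

    # Shutdown tears down every replica the service manages
    await echo.shutdown()


# Run the example (commented out to avoid execution during doc build)
# asyncio.run(example_service_lifecycle())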
# Your code sees this simple interface:
# responses = await policy.generate.route(prompt=prompt)
# But Forge handles all the complexity of replica management, load
# balancing, and fault tolerance.


######################################################################
# Communication Patterns: Quick Reference
# ----------------------------------------
#
# **API Summary:**
#
# * ``.route()`` - Send request to any healthy replica in a service (load balanced)
# * ``.call_one()`` - Send request to a single actor instance
# * ``.fanout()`` - Send request to ALL replicas in a service
#
# .. mermaid::
#
#    graph LR
#        subgraph Request["Your Request"]
#            Code["await service.method.ADVERB()"]
#        end
#
#        subgraph Patterns["Communication Patterns"]
#            Route[".route()<br>→ One healthy replica"]
#            CallOne[".call_one()<br>→ Single actor"]
#            Fanout[".fanout()<br>→ ALL replicas"]
#        end
#
#        subgraph Replicas["Replicas/Actors"]
#            R1["Replica 1"]
#            R2["Replica 2"]
#            R3["Replica 3"]
#            A1["Actor"]
#        end
#
#        Code --> Route
#        Code --> CallOne
#        Code --> Fanout
#
#        Route --> R2
#        CallOne --> A1
#        Fanout --> R1
#        Fanout --> R2
#        Fanout --> R3
#
#    style Route fill:#4CAF50
#    style CallOne fill:#FF9800
#    style Fanout fill:#9C27B0

######################################################################
# Deep Dive: Service Communication Patterns
# ------------------------------------------
#
# These communication patterns ("adverbs") determine how your service
# calls are routed to replicas. Understanding when to use each pattern
# is key to effective Forge usage.
#
# 1. ``.route()`` - Load Balanced Single Replica
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#
# **When to use**: Normal request routing where any replica can handle
# the request.


def example_route_pattern():
    """Example showing route pattern for load balanced requests."""
    # responses = await policy.generate.route(prompt=question)
    # answer = responses[0].text  # Extract text from Completion object
    pass


######################################################################
# Behind the scenes:
#
# 1. Health check eliminates failed replicas
# 2. Load balancer picks replica (currently round robin)
# 3. Request routes to that specific replica
# 4. Automatic retry on a different replica on failure
#
# **Performance characteristics**:
#
# * **Latency**: Lowest (single network hop)
# * **Throughput**: Limited by single replica capacity
# * **Fault tolerance**: Automatic failover to other replicas
#
# **Critical insight**: ``.route()`` is your default choice for
# stateless operations in Forge services.

######################################################################
# 2. ``.fanout()`` - Broadcast with Results Collection
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#
# **When to use**: You need responses from ALL replicas.


async def example_fanout_pattern():
    """Example showing fanout pattern for broadcast operations."""
    # Get version from all policy replicas
    # current_versions = await policy.get_version.fanout()
    # Returns: [version_replica_1, version_replica_2, ...]

    # Update weights on all replicas
    # await policy.update_weights.fanout(new_policy_version)
    # Broadcasts to all replicas simultaneously
    pass


# **Performance characteristics**:
#
# * **Latency**: Slowest replica determines total latency
# * **Throughput**: Network bandwidth × number of replicas
# * **Fault tolerance**: Fails if ANY replica fails (unless configured)
#
# **Critical gotcha**: Don't use ``.fanout()`` for high-frequency
# operations - it contacts all replicas.

######################################################################
# 3. Streaming Operations - Custom Implementation Pattern
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#
# **When to use**: You want to process results as they arrive.
#
# Streaming requires custom implementation in your training loop.
# The basic ``ReplayBuffer`` doesn't have built-in streaming methods.
+ + +async def example_streaming_pattern(): + """Pattern from apps/grpo/main.py continuous training.""" + # This is the real API call pattern + # while training: + # batch = await replay_buffer.sample.call_one(curr_policy_version=step) + # if batch is not None: + # # Process batch immediately + # loss = await trainer.train_step.call_one(batch) + # print(f"Training loss: {loss}") + # else: + # await asyncio.sleep(0.1) # Wait for more data + pass + + +###################################################################### +# **Performance characteristics**: +# +# * **Latency**: Process first result immediately +# * **Throughput**: Non-blocking async operations +# * **Fault tolerance**: Continues if some replicas fail +# +# **Critical insight**: Essential for high-throughput RL where +# you can't wait for batches. + +###################################################################### +# Service Sessions for Stateful Operations +# ----------------------------------------- +# +# **When to use**: When you need multiple calls to hit the same replica +# (like KV cache preservation). +# +# **What are sticky sessions?** A session ensures all your service calls +# within the ``async with`` block go to the same replica, instead of +# being load-balanced across different replicas. +# This Counter example demonstrates the difference between regular routing and sessions: + +from forge.controller import ForgeActor +from monarch.actor import endpoint + + +class ForgeCounter(ForgeActor): + def __init__(self, initial_value: int): + self.value = initial_value + + @endpoint + def increment(self) -> int: + self.value += 1 + return self.value + + @endpoint + def get_value(self) -> int: + return self.value + + @endpoint + async def reset(self): + self.value = 0 + + +async def example_session_comparison(): + """Demonstrate the difference between sessions and normal routing.""" + counter_service = await ForgeCounter.options(procs=1, num_replicas=4).as_service( + initial_value=0 + ) + + # WITHOUT SESSIONS: Each .route() call goes to a different replica + await counter_service.increment.route() # Might go to replica 2 + await counter_service.increment.route() # Might go to replica 1 + await counter_service.increment.route() # Might go to replica 3 + + results = await counter_service.increment.fanout() # Get from all replicas + print(f"All replica values: {results}") + # Output: All replica values: [1, 2, 1, 1] - Each replica has different state! 
    # WITH SESSIONS: All calls go to the SAME replica
    print("\nUsing sticky sessions:")
    async with counter_service.session():  # Creates a session that picks one replica
        await counter_service.reset.route()  # Uses .route() within session
        print(await counter_service.increment.route())  # 1
        print(await counter_service.increment.route())  # 2
        print(await counter_service.increment.route())  # 3

        final_value = await counter_service.get_value.route()
        print(f"Final value on this replica: {final_value}")  # 3

    # Cleanup
    await counter_service.shutdown()


async def example_multi_turn_conversation(policy, turn1, turn2):
    """Same pattern works with Policy for multi-turn conversations."""
    async with policy.session():
        response1 = await policy.generate.route(turn1)
        full_prompt = turn1 + response1[0].text + turn2
        response2 = await policy.generate.route(full_prompt)
        # Both calls hit same replica, preserving KV cache
        return response2


######################################################################
# **Performance impact**: Critical for maintaining KV cache in multi-turn conversations.

######################################################################
# Deep Dive: State Management Reality
# ------------------------------------
#
# The most complex challenge in distributed RL is maintaining state
# consistency while maximizing performance.
#
# The KV Cache Problem
# ~~~~~~~~~~~~~~~~~~~~
#
# **The challenge**: Policy inference is much faster with KV cache,
# but cache is tied to specific conversation history.

# This breaks KV cache optimization (an anti-pattern, shown with
# illustrative placeholder arguments):
async def naive_multi_turn(policy, question1, turn2, turn3):
    # Each call might go to a different replica = cache miss
    response1 = await policy.generate.route(prompt=question1)
    response2 = await policy.generate.route(
        prompt=question1 + response1[0].text + turn2
    )  # Cache miss!
    conversation_so_far = question1 + response1[0].text + turn2 + response2[0].text
    response3 = await policy.generate.route(
        prompt=conversation_so_far + turn3
    )  # Cache miss!


######################################################################
# **The solution**: Sticky sessions ensure all calls go to the same replica.


async def optimized_multi_turn(policy):
    """The solution: Sticky sessions ensure same replica."""
    async with policy.session():
        # All calls guaranteed to hit same replica = cache hits
        question1 = "What is 2+2?"
        response1 = await policy.generate.route(prompt=question1)
        full_prompt = question1 + response1[0].text
        response2 = await policy.generate.route(prompt=full_prompt)  # Cache hit!
        conversation = full_prompt + response2[0].text
        response3 = await policy.generate.route(prompt=conversation)  # Cache hit!

    # Session ends, replica can be garbage collected or reused


######################################################################
# **Performance impact**: Maintaining KV cache across turns avoids
# recomputing previous tokens.

######################################################################
# Replay Buffer Consistency
# ~~~~~~~~~~~~~~~~~~~~~~~~~~
#
# **The challenge**: Multiple trainers and experience collectors
# reading/writing concurrently.
+# +# **Real Forge approach**: The ReplayBuffer actor handles concurrency +# internally: + + +async def example_replay_buffer_usage(): + """Forge ReplayBuffer endpoints (verified from source code).""" + # Add episodes (thread-safe by actor model) + # await replay_buffer.add.call_one(episode) + + # Sample batches for training + # batch = await replay_buffer.sample.call_one( + # curr_policy_version=step_number, + # batch_size=None, # Optional parameter, uses default from config + # ) + + # Additional methods available: + # await replay_buffer.clear.call_one() # Clear buffer + # await replay_buffer.evict.call_one(curr_policy_version) # Remove old episodes + # state = await replay_buffer.state_dict.call_one() # Get state + pass + + +# **Critical insight**: The actor model provides natural thread safety - +# each actor processes messages sequentially. + +###################################################################### +# Weight Synchronization Strategy +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# +# **The challenge**: Trainer updates policy weights, but policy service +# needs those weights. + +# Forge weight synchronization pattern from apps/grpo/main.py +async def real_weight_sync(trainer, policy, step): + # Trainer pushes weights to TorchStore with version number + await trainer.push_weights.call_one(policy_version=step + 1) + + # Policy service updates to new version from TorchStore + # Use .fanout() to update ALL policy replicas + await policy.update_weights.fanout(policy_version=step + 1) + + +async def check_policy_version(policy): + """Check current policy version.""" + current_version = await policy.get_version.route() + print(f"Current policy version: {current_version}") + return current_version + + +###################################################################### +# Deep Dive: Asynchronous Coordination Patterns +# ---------------------------------------------- +# +# **The real challenge**: Different services run at different speeds, +# but Forge's service abstraction handles the coordination complexity. 
+# +# The Forge Approach: Let Services Handle Coordination +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# +# Instead of manual coordination, Forge services handle speed mismatches +# automatically: + + +from apps.grpo.main import Episode, Group + + +async def simple_rl_step(): + + # ===== Generate a rollout ===== + sample = await dataloader.sample.call_one() # DatasetActor is an actor, not service + prompt, target = sample["request"], sample["target"] # Correct field names + + print(f"Prompt: {prompt}") + print(f"Target: {target}") + + actions = await policy.generate.route(prompt=prompt) # Policy is a service + print(f"Policy response: {actions[0].text}") + + # Create input tensor for reference model (requires full context) + input_ids = torch.cat([actions[0].prompt_ids, actions[0].token_ids]) + ref_logprobs = await ref_model.forward.route( + input_ids.unsqueeze(0), max_req_tokens=512, return_logprobs=True + ) + reward = await reward_actor.evaluate_response.route( # RewardActor is a service + prompt=prompt, response=actions[0].text, target=target + ) + print(f"Reward: {reward}") + + # Create episode using actual GRPO Episode structure + episode = Episode( + episode_id="0", + request=prompt, + policy_version=0, + pad_id=tokenizer.pad_token_id, + request_len=512, + response_len=512, + target=target, + ) + + # Add response data + episode.response = actions[0].text + episode.request_tokens = actions[0].prompt_ids.tolist() + episode.response_tokens = actions[0].token_ids.tolist() + episode.ref_logprobs = ref_logprobs[0] # Extract from batch dimension + episode.reward = reward + + # Compute advantages using actual ComputeAdvantages actor + group = Group.new_group(0, 1, prompt, 0, tokenizer.pad_token_id, 512, 512, target) + group.episodes[0] = episode + advantages = await compute_advantages.compute.call_one( + group + ) # ComputeAdvantages is an actor + episode.advantage = advantages[0] + print(f"Advantage: {advantages[0]}") + await replay_buffer.add.call_one(episode) # ReplayBuffer is an actor + print("Episode stored in replay buffer") + + # ===== Train on the batch ===== + batch = await replay_buffer.sample.call_one(curr_policy_version=0) + if batch is not None: + print("Training on batch...") + inputs, targets = batch # GRPO returns (inputs, targets) tuple + loss = await trainer.train_step.call(inputs, targets) # RLTrainer is an actor + print(f"Training loss: {loss}") + return loss + else: + print("Not enough data in buffer yet") + return None + + +async def run_training_steps(): + """Run multiple RL training steps.""" + # Note: This simplified example assumes tokenizer and services are already initialized + for step in range(10): + print(f"\n--- RL Step {step + 1} ---") + loss = await simple_rl_step() + if loss: + print(f"Step {step + 1} complete, loss: {loss:.4f}") + else: + print(f"Step {step + 1} complete, building buffer...") + + +###################################################################### +# Handling Speed Mismatches with Service Scaling +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# +# **The insight**: Scale services independently based on their +# bottlenecks. 
+ + +async def example_service_scaling(): + """Example showing how to scale services independently.""" + # Scale fast services with more replicas + policy = await Policy.options( + procs=1, num_replicas=8, with_gpus=True # Many replicas for high throughput + ).as_service(engine_config={"model": "model_name", "tensor_parallel_size": 1}) + + # Reward evaluation might be CPU-bound + reward_actor = await RewardActor.options( + procs=1, num_replicas=16, with_gpus=False # More CPU replicas + ).as_service(reward_functions=[MathReward()]) + + # Training needs fewer but more powerful replicas + trainer = await RLTrainer.options( + procs=1, with_gpus=True # Fewer but GPU-heavy + ).as_actor( # Trainer typically uses .as_actor() not .as_service() + model={"name": "qwen3", "flavor": "1.7B"}, + optimizer={"name": "AdamW", "lr": 1e-5}, + ) + return policy, reward_actor, trainer + + +###################################################################### +# Service Implementation Example +# ------------------------------- +# +# Let's see how a reward service is actually implemented: + +# Exact RewardActor from apps/grpo/main.py + +from forge.controller import ForgeActor +from forge.data.rewards import MathReward, ThinkingReward +from monarch.actor import endpoint + +# class definition from apps/grpo/main.py +class RewardActor(ForgeActor): + def __init__(self, reward_functions: list): + self.reward_functions = reward_functions + + @endpoint + async def evaluate_response(self, prompt: str, response: str, target: str) -> float: + """Evaluate response quality using multiple reward functions""" + total_reward = 0.0 + + for reward_fn in self.reward_functions: + # Each reward function contributes to total score + reward = reward_fn(prompt, response, target) + total_reward += reward + + # Return average reward across all functions + return ( + total_reward / len(self.reward_functions) if self.reward_functions else 0.0 + ) + + +async def example_reward_actor_usage(): + """Example of using the RewardActor service.""" + reward_actor = await RewardActor.options(procs=1, num_replicas=1).as_service( + reward_functions=[MathReward(), ThinkingReward()] + ) + + prompt = "What is 15% of 240?" 
+ response = "15% of 240 is 36" + target = "36" + + score = await reward_actor.evaluate_response.route( + prompt=prompt, response=response, target=target + ) + print(f"Reward score: {score}") # Usually around 1.0 for correct math answers + # For production scaling - increase num_replicas for parallel evaluation: + # RewardActor.options(procs=1, num_replicas=16) # 16 parallel evaluators + + # Cleanup when done + await reward_actor.shutdown() + + +###################################################################### +# Service Orchestration: The Training Loop +# ----------------------------------------- +# +# Now let's see how services coordinate in a real training loop: + + +# This is the REAL way production RL systems are built with Forge + + +async def example_full_service_orchestration(): + """Service creation pattern from apps/grpo/main.py lines 322-344.""" + print("Initializing all services...") + ( + dataloader, + policy, + trainer, + replay_buffer, + compute_advantages, + ref_model, + reward_actor, + ) = await asyncio.gather( + DatasetActor.options(procs=1).as_actor( + path="openai/gsm8k", + revision="main", + data_split="train", + streaming=True, + model="Qwen/Qwen3-1.7B", + ), + Policy.options(procs=1, with_gpus=True, num_replicas=1).as_service( + engine_config={"model": "Qwen/Qwen3-1.7B", "tensor_parallel_size": 1}, + sampling_config={"n": 1, "max_tokens": 512}, + ), + RLTrainer.options(procs=1, with_gpus=True).as_actor( + model={ + "name": "qwen3", + "flavor": "1.7B", + "hf_assets_path": "hf://Qwen/Qwen3-1.7B", + }, + optimizer={"name": "AdamW", "lr": 1e-5}, + training={"local_batch_size": 2, "seq_len": 2048}, + ), + ReplayBuffer.options(procs=1).as_actor( + batch_size=2, max_policy_age=1, dp_size=1 + ), + ComputeAdvantages.options(procs=1).as_actor(), + ReferenceModel.options(procs=1, with_gpus=True).as_actor( + model={ + "name": "qwen3", + "flavor": "1.7B", + "hf_assets_path": "hf://Qwen/Qwen3-1.7B", + } + ), + RewardActor.options(procs=1, num_replicas=1).as_service( + reward_functions=[MathReward(), ThinkingReward()] + ), + ) + + print("All services initialized successfully!") + + # Run training loop + await production_training_loop( + dataloader, + policy, + trainer, + replay_buffer, + compute_advantages, + ref_model, + reward_actor, + ) + + print("Shutting down services...") + await asyncio.gather( + DatasetActor.shutdown(dataloader), + policy.shutdown(), + RLTrainer.shutdown(trainer), + ReplayBuffer.shutdown(replay_buffer), + ComputeAdvantages.shutdown(compute_advantages), + ReferenceModel.shutdown(ref_model), + reward_actor.shutdown(), + ) + print("All services shut down successfully!") + + +async def production_training_loop( + dataloader, + policy, + trainer, + replay_buffer, + compute_advantages, + ref_model, + reward_actor, +): + """Real training loop pattern from apps/grpo/main.py""" + step = 0 + + while True: + # Data generation + sample = await dataloader.sample.call_one() + + # Policy generation service call + responses = await policy.generate.route(sample["request"]) # Correct field name + + # Reference computation service call (requires full input tensor) + input_ids = torch.cat([responses[0].prompt_ids, responses[0].token_ids]) + ref_logprobs = await ref_model.forward.route( + input_ids.unsqueeze(0), max_req_tokens=512, return_logprobs=True + ) + + # Reward evaluation service call + reward = await reward_actor.evaluate_response.route( + prompt=sample["question"], + response=responses[0].text, + target=sample["answer"], + ) + + # Experience storage (using actual 
Episode structure) + episode = create_episode_from_grpo_data( + sample, responses[0], reward, ref_logprobs[0], step + ) + await replay_buffer.add.call_one(episode) + + # Training when ready + batch = await replay_buffer.sample.call_one(curr_policy_version=step) + if batch is not None: + inputs, targets = batch # GRPO returns (inputs, targets) tuple + loss = await trainer.train_step.call(inputs, targets) + + # Weight synchronization pattern + await trainer.push_weights.call(step + 1) + await policy.update_weights.fanout(step + 1) # Fanout to all replicas + + print(f"Step {step}, Loss: {loss:.4f}") + step += 1 + + +# **Key observations:** +# +# 1. **Parallelism**: Independent operations run concurrently +# 2. **Load balancing**: Each ``.route()`` call automatically selects optimal replica +# 3. **Fault tolerance**: Failures automatically retry on different replicas +# 4. **Resource efficiency**: CPU and GPU services scale independently +# 5. **Coordination**: Services coordinate through shared state (replay buffer, weight versions) +# +# This is the power of the service abstraction - complex distributed +# coordination looks like simple async Python code. + +###################################################################### +# Conclusion +# ---------- +# +# This tutorial covered: +# +# * How Forge services work under the hood +# * Communication patterns: ``.route()``, ``.fanout()``, ``.call_one()`` +# * State management with sessions and actors +# * Service scaling and orchestration patterns +# +# **Key takeaways:** +# +# * Use ``.route()`` for stateless load-balanced operations +# * Use ``.fanout()`` for coordinated updates across all replicas +# * Use sessions for stateful operations like multi-turn conversations +# * Scale services independently based on bottlenecks +# * Let Forge handle coordination complexity +# +# In the next part we will learn about Monarch internals. +# +# Further Reading +# --------------- +# +# * Continue to :doc:`3_Monarch_101` (coming soon) +# * Check the `Forge source code `_ +# * Review the :doc:`../../api_actors` documentation +# * Explore the `GRPO application `_ diff --git a/docs/source/tutorial_sources/zero-to-forge/3_Monarch_101.py b/docs/source/tutorial_sources/zero-to-forge/3_Monarch_101.py new file mode 100644 index 000000000..c0c3c1411 --- /dev/null +++ b/docs/source/tutorial_sources/zero-to-forge/3_Monarch_101.py @@ -0,0 +1,496 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. + +""" +Part 3: The Forge-Monarch Connection +===================================== + +**Author:** `Sanyam Bhutani `_ + +.. grid:: 2 + + .. grid-item-card:: :octicon:`mortar-board;1em;` What you will learn + :class-card: card-prerequisites + + * How Forge services are built on Monarch + * Understanding ProcMesh and ActorMesh + * The complete hierarchy from service to silicon + * Message routing patterns in distributed actors + + .. grid-item-card:: :octicon:`list-unordered;1em;` Prerequisites + :class-card: card-prerequisites + + * Complete :doc:`1_RL_and_Forge_Fundamentals` + * Complete :doc:`2_Forge_Internals` + * Understanding of distributed systems + +This is part 3 of our series. In the previous sections we learned: + +* Part 1: [RL Concepts and how they map to Forge](1_RL_and_Forge_Fundamentals) +* Part 2: [Forge Internals](2_Forge_Internals) + +Now let's peel back the layers. 
Forge services are built on top of +**Monarch**, PyTorch's distributed actor framework. Understanding this +connection is crucial for optimization and debugging. +""" + +###################################################################### +# The Complete Hierarchy: Service to Silicon +# ------------------------------------------- +# +# .. mermaid:: +# +# graph TD +# subgraph YourCode["1. Your RL Code"] +# Call["await policy_service.generate.route('What is 2+2?')"] +# end +# +# subgraph ForgeServices["2. Forge Service Layer"] +# ServiceInterface["ServiceInterface: Routes requests, Load balancing, Health checks"] +# ServiceActor["ServiceActor: Manages replicas, Monitors health, Coordinates failures"] +# end +# +# subgraph MonarchLayer["3. Monarch Actor Layer"] +# ActorMesh["ActorMesh PolicyActor: 4 instances, Different GPUs, Message passing"] +# ProcMesh["ProcMesh: 4 processes, GPU topology 0,1,2,3, Network interconnect"] +# end +# +# subgraph Hardware["4. Physical Hardware"] +# GPU0["GPU 0: PolicyActor #1, vLLM Engine, Model Weights"] +# GPU1["GPU 1: PolicyActor #2, vLLM Engine, Model Weights"] +# GPU2["GPU 2: PolicyActor #3, vLLM Engine, Model Weights"] +# GPU3["GPU 3: PolicyActor #4, vLLM Engine, Model Weights"] +# end +# +# Call --> ServiceInterface +# ServiceInterface --> ServiceActor +# ServiceActor --> ActorMesh +# ActorMesh --> ProcMesh +# ProcMesh --> GPU0 +# ProcMesh --> GPU1 +# ProcMesh --> GPU2 +# ProcMesh --> GPU3 +# +# style Call fill:#4CAF50 +# style ServiceActor fill:#FF9800 +# style ActorMesh fill:#9C27B0 +# style ProcMesh fill:#2196F3 + +###################################################################### +# Deep Dive: ProcMesh - The Foundation +# ------------------------------------- +# +# **ProcMesh** is Monarch's core abstraction for organizing processes +# across hardware. Think of it as a multi-dimensional grid that maps +# directly to your cluster topology. +# +# Single Host ProcMesh +# ~~~~~~~~~~~~~~~~~~~~ +# +# **Key insight**: ProcMesh creates one process per GPU, automatically handling the process-to-hardware mapping. +# + +# Example call (commented out since this_host is not defined at module level): +# procs = this_host().spawn_procs(per_host={"gpus": 8}) + +# This creates: +# Process 0 → GPU 0 +# Process 1 → GPU 1 +# Process 2 → GPU 2 +# Process 3 → GPU 3 +# Process 4 → GPU 4 +# Process 5 → GPU 5 +# Process 6 → GPU 6 +# Process 7 → GPU 7 + +###################################################################### +# .. mermaid:: +# +# graph TD +# subgraph Host["Single Host (8 GPUs)"] +# subgraph ProcMesh["ProcMesh: per_host={'gpus': 8}"] +# P0["Process 0
GPU 0"]
#            P1["Process 1<br>GPU 1"]
#            P2["Process 2<br>GPU 2"]
#            P3["Process 3<br>GPU 3"]
#            P4["Process 4<br>GPU 4"]
#            P5["Process 5<br>GPU 5"]
#            P6["Process 6<br>GPU 6"]
#            P7["Process 7<br>GPU 7"]
#        end
#
#        P0 -.->|"Network"| P1
#        P1 -.->|"Network"| P2
#        P2 -.->|"Network"| P3
#        P3 -.->|"Network"| P4
#        P4 -.->|"Network"| P5
#        P5 -.->|"Network"| P6
#        P6 -.->|"Network"| P7
#        P7 -.->|"Network"| P0
#    end
#
#    style P0 fill:#F44336
#    style P1 fill:#F44336
#    style P2 fill:#F44336
#    style P3 fill:#F44336
#    style P4 fill:#F44336
#    style P5 fill:#F44336
#    style P6 fill:#F44336
#    style P7 fill:#F44336

######################################################################
# The beauty: you don't manage individual processes or GPU assignments -
# ProcMesh handles the topology for you.

######################################################################
# Multi-Host ProcMesh
# ~~~~~~~~~~~~~~~~~~~
#
# **Key insight**: ProcMesh seamlessly scales across multiple hosts with continuous process numbering.
#
# .. mermaid::
#
#    graph TD
#        subgraph Cluster["Multi-Host Cluster"]
#            subgraph Host1["Host 1"]
#                subgraph PM1["ProcMesh Segment 1"]
#                    H1P0["Process 0<br>GPU 0"]
#                    H1P1["Process 1<br>GPU 1"]
#                    H1P2["Process 2<br>GPU 2"]
#                    H1P3["Process 3<br>GPU 3"]
#                end
#            end
#
#            subgraph Host2["Host 2"]
#                subgraph PM2["ProcMesh Segment 2"]
#                    H2P0["Process 4<br>GPU 0"]
#                    H2P1["Process 5<br>GPU 1"]
#                    H2P2["Process 6<br>GPU 2"]
#                    H2P3["Process 7<br>GPU 3"]
#                end
#            end
#
#            subgraph Host3["Host 3"]
#                subgraph PM3["ProcMesh Segment 3"]
#                    H3P0["Process 8<br>GPU 0"]
#                    H3P1["Process 9<br>GPU 1"]
#                    H3P2["Process 10<br>GPU 2"]
#                    H3P3["Process 11<br>GPU 3"]
#                end
#            end
#        end
#
#        H1P0 -.->|"InfiniBand"| H2P0
#        H1P1 -.->|"InfiniBand"| H2P1
#        H2P0 -.->|"InfiniBand"| H3P0
#        H2P1 -.->|"InfiniBand"| H3P1
#
#    style PM1 fill:#F44336
#    style PM2 fill:#4CAF50
#    style PM3 fill:#2196F3

# Same simple API works across hosts (illustrative pseudocode -
# ``spawn_cluster_procs`` stands in for your cluster's provisioning API,
# so the snippet is commented out):
#
# cluster_procs = spawn_cluster_procs(
#     hosts=["host1", "host2", "host3"], per_host={"gpus": 4}
# )
#
# Automatically creates:
# Host 1: Processes 0-3 → GPUs 0-3
# Host 2: Processes 4-7 → GPUs 0-3
# Host 3: Processes 8-11 → GPUs 0-3
#
# Your code stays the same whether it's 1 host or 100 hosts:
# actors = cluster_procs.spawn("my_actor", MyActor)

######################################################################
# **The power**: Scale from single host to cluster without changing your
# actor code - ProcMesh handles all the complexity.

# This shows the underlying actor system that powers Forge services
# NOTE: This is for educational purposes - use ForgeActor and .as_service() in real Forge apps!

from monarch.actor import Actor, endpoint, Future, ProcMesh, this_host, this_proc


# STEP 1: Define a basic actor
class Counter(Actor):
    def __init__(self, initial_value: int):
        self.value = initial_value

    @endpoint
    def increment(self) -> None:
        self.value += 1

    @endpoint
    def get_value(self) -> int:
        return self.value


async def example_monarch_counter_usage():
    """Example showing basic Monarch actor usage."""
    # STEP 2: Single actor in local process
    counter: Counter = this_proc().spawn("counter", Counter, initial_value=0)

    # STEP 3: Send messages
    fut: Future[int] = counter.get_value.call_one()
    value = await fut
    print(f"Counter value: {value}")  # 0

    # STEP 4: Multiple actors across processes
    procs: ProcMesh = this_host().spawn_procs(per_host={"gpus": 8})
    counters: Counter = procs.spawn("counters", Counter, 0)

    # STEP 5: Broadcast to all actors
    await counters.increment.call()

    # STEP 6: Different message patterns
    # call_one() - single actor
    value = await counters.get_value.call_one()
    print(f"One counter: {value}")  # Output: One counter: 1

    # choose() - random single actor (actors only, not services)
    value = await counters.get_value.choose()
    print(f"Random counter: {value}")  # Output: Random counter: 1

    # call() - all actors, collect results
    values = await counters.get_value.call()
    print(f"All counters: {values}")  # Output: All counters: [1, 1, 1, 1, 1, 1, 1, 1]

    # broadcast() - fire and forget
    await counters.increment.broadcast()  # No return value - just sends to all actors

    # Cleanup
    await procs.stop()


######################################################################
# Remember: This raw Monarch code is for understanding how Forge works internally.
# In your Forge applications, use ForgeActor, .as_service(), .as_actor() instead!


######################################################################
# Actor Meshes: Your Code Running Distributed
# --------------------------------------------
#
# **ActorMesh** is created when you spawn actors across a ProcMesh.
# Each process in the ProcMesh gets one instance of your actor.
#
# - **One actor instance per process**: ``mesh.spawn("policy", PolicyActor)``
#   creates one PolicyActor in each process (demonstrated just below)
# - **Same constructor arguments**: All instances get the same initialization parameters
# - **Independent state**: Each actor instance maintains its own state and memory
# - **Message routing**: You can send messages to one actor or all actors using different methods

# Simple example (``PolicyActor`` here is illustrative, so the snippet is
# commented out):
#
# procs = this_host().spawn_procs(per_host={"gpus": 4})  # 4 processes
# policy_actors = procs.spawn("policy", PolicyActor, model="Qwen/Qwen3-7B")
#
# Now you have 4 PolicyActor instances, one per GPU,
# all initialized with the same model parameter.
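######################################################################
# To see "one instance per process" directly, here is a small runnable
# sketch in the same raw-Monarch style as the Counter above
# (``os.getpid()`` is just a portable way to show that each instance
# lives in its own process):

import os


class WhereAmI(Actor):
    """Hypothetical actor that reports the process it runs in."""

    @endpoint
    def pid(self) -> int:
        return os.getpid()


async def example_one_instance_per_process():
    procs = this_host().spawn_procs(per_host={"gpus": 4})
    actors = procs.spawn("whereami", WhereAmI)

    # call() collects one result per actor instance
    pids = await actors.pid.call()
    print(f"Actor PIDs: {pids}")  # Expect 4 distinct PIDs, one per process

    await procs.stop()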
+# +# - **One actor instance per process**: `mesh.spawn("policy", PolicyActor)` creates one PolicyActor in each process +# - **Same constructor arguments**: All instances get the same initialization parameters +# - **Independent state**: Each actor instance maintains its own state and memory +# - **Message routing**: You can send messages to one actor or all actors using different methods + +# Simple example: +procs = spawn_procs(per_host={"gpus": 4}) # 4 processes +policy_actors = procs.spawn("policy", PolicyActor, model="Qwen/Qwen3-7B") + +# Now you have 4 PolicyActor instances, one per GPU +# All initialized with the same model parameter + + +###################################################################### +# How Forge Services Use Monarch +# ------------------------------- +# +# Now the key insight: **Forge services are ``ServiceActors`` that manage +# ``ActorMeshes`` of your ``ForgeActor`` replicas**. +# +# The Service Creation Process +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# +# .. mermaid:: +# +# graph TD +# subgraph ServiceCreation["Service Creation Process"] +# Call["await PolicyActor.options(num_replicas=4, procs=1).as_service(model='Qwen')"] +# +# ServiceActor["ServiceActor: Manages 4 replicas, Health checks, Routes calls"] +# +# subgraph Replicas["4 Independent Replicas"] +# subgraph R0["Replica 0"] +# PM0["ProcMesh: 1 process, GPU 0"] +# AM0["ActorMesh
1 PolicyActor"] +# end +# +# subgraph R1["Replica 1"] +# PM1["ProcMesh: 1 process, GPU 1"] +# AM1["ActorMesh
1 PolicyActor"] +# end +# +# subgraph R2["Replica 2"] +# PM2["ProcMesh: 1 process, GPU 2"] +# AM2["ActorMesh
1 PolicyActor"] +# end +# +# subgraph R3["Replica 3"] +# PM3["ProcMesh: 1 process, GPU 3"] +# AM3["ActorMesh
1 PolicyActor"] +# end +# end +# +# Call --> ServiceActor +# ServiceActor --> R0 +# ServiceActor --> R1 +# ServiceActor --> R2 +# ServiceActor --> R3 +# PM0 --> AM0 +# PM1 --> AM1 +# PM2 --> AM2 +# PM3 --> AM3 +# end +# +# style ServiceActor fill:#FF9800 +# style AM0 fill:#4CAF50 +# style AM1 fill:#4CAF50 +# style AM2 fill:#4CAF50 +# style AM3 fill:#4CAF50 + +###################################################################### +# Service Call to Actor Execution +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# +# .. mermaid:: +# +# graph TD +# subgraph CallFlow["Complete Call Flow"] +# UserCall["await policy_service.generate.route('What is 2+2?')"] +# +# ServiceInterface["ServiceInterface: Receives .route() call, Routes to ServiceActor"] +# +# ServiceActor["ServiceActor: Selects healthy replica, Load balancing, Failure handling"] +# +# SelectedReplica["Selected Replica #2: ProcMesh 1 process, ActorMesh 1 PolicyActor"] +# +# PolicyActor["PolicyActor Instance: Loads model, Runs vLLM inference"] +# +# GPU["GPU 2: vLLM engine, Model weights, KV cache, CUDA kernels"] +# +# UserCall --> ServiceInterface +# ServiceInterface --> ServiceActor +# ServiceActor --> SelectedReplica +# SelectedReplica --> PolicyActor +# PolicyActor --> GPU +# +# GPU -.->|"Response"| PolicyActor +# PolicyActor -.->|"Response"| SelectedReplica +# SelectedReplica -.->|"Response"| ServiceActor +# ServiceActor -.->|"Response"| ServiceInterface +# ServiceInterface -.->|"'The answer is 4'"| UserCall +# end +# +# style UserCall fill:#4CAF50 +# style ServiceActor fill:#FF9800 +# style PolicyActor fill:#9C27B0 +# style GPU fill:#FF5722 + +###################################################################### +# Multiple Services Sharing Infrastructure +# ----------------------------------------- +# +# In real RL systems, you have multiple services that can share or use +# separate ``ProcMeshes``: +# +# .. mermaid:: +# +# graph TD +# subgraph Cluster["RL Training Cluster"] +# subgraph Services["Forge Services"] +# PS["Policy Service
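######################################################################
# One small way to observe this flow from the outside: time a few
# ``.route()`` calls against a policy service created as in Part 2. This
# is a hypothetical helper (latencies vary with replica load), but it
# highlights that the call site never names a replica:

import time


async def example_observe_routing(policy):
    """Route several requests; replica selection stays invisible."""
    for i in range(4):
        start = time.perf_counter()
        responses = await policy.generate.route(prompt=f"Request {i}: what is 2+2?")
        elapsed = time.perf_counter() - start
        print(f"request={i} latency={elapsed:.3f}s text={responses[0].text!r}")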

######################################################################
# Multiple Services Sharing Infrastructure
# -----------------------------------------
#
# In real RL systems, you run several services side by side; they can
# share a ``ProcMesh`` or use separate ones:
#
# .. mermaid::
#
#    graph TD
#        subgraph Cluster["RL Training Cluster"]
#            subgraph Services["Forge Services"]
#                PS["Policy Service<br/>4 GPU replicas"]
#                TS["Trainer Service<br/>2 GPU replicas"]
#                RS["Reward Service<br/>4 CPU replicas"]
#                BS["Buffer Service<br/>1 CPU replica"]
#            end
#
#            subgraph MonarchInfra["Monarch Infrastructure"]
#                subgraph GPUMesh["GPU ProcMesh (6 processes)"]
#                    G0["Process 0<br/>GPU 0"]
#                    G1["Process 1<br/>GPU 1"]
#                    G2["Process 2<br/>GPU 2"]
#                    G3["Process 3<br/>GPU 3"]
#                    G4["Process 4<br/>GPU 4"]
#                    G5["Process 5<br/>GPU 5"]
#                end
#
#                subgraph CPUMesh["CPU ProcMesh (5 processes)"]
#                    C0["Process 0<br/>CPU"]
#                    C1["Process 1<br/>CPU"]
#                    C2["Process 2<br/>CPU"]
#                    C3["Process 3<br/>CPU"]
#                    C4["Process 4<br/>CPU"]
#                end
#            end
#
#            PS --> G0
#            PS --> G1
#            PS --> G2
#            PS --> G3
#            TS --> G4
#            TS --> G5
#            RS --> C0
#            RS --> C1
#            RS --> C2
#            RS --> C3
#            BS --> C4
#        end
#
#    style PS fill:#4CAF50
#    style TS fill:#E91E63
#    style RS fill:#FF9800
#    style BS fill:#9C27B0
#    style GPUMesh fill:#FFEBEE
#    style CPUMesh fill:#E3F2FD
CPU"] +# end +# end +# +# PS --> G0 +# PS --> G1 +# PS --> G2 +# PS --> G3 +# TS --> G4 +# TS --> G5 +# RS --> C0 +# RS --> C1 +# RS --> C2 +# RS --> C3 +# BS --> C4 +# end +# +# style PS fill:#4CAF50 +# style TS fill:#E91E63 +# style RS fill:#FF9800 +# style BS fill:#9C27B0 +# style GPUMesh fill:#FFEBEE +# style CPUMesh fill:#E3F2FD + +###################################################################### +# Key Insights: Why This Architecture Matters +# -------------------------------------------- +# +# 1. **Process Isolation**: Each actor runs in its own process - failures don't cascade +# 2. **Location Transparency**: Actors can be local or remote with identical APIs +# 3. **Structured Distribution**: ProcMesh maps directly to hardware topology +# 4. **Message Passing**: No shared memory means no race conditions or locks +# 5. **Service Abstraction**: Forge hides Monarch complexity while preserving power +# +# Understanding this hierarchy helps you: +# +# * **Debug performance issues**: Is the bottleneck at service, actor, or hardware level? +# * **Optimize resource usage**: How many replicas per service? GPU vs CPU processes? +# * **Handle failures gracefully**: Which layer failed and how to recover? +# * **Scale effectively**: Where to add resources for maximum impact? + + +###################################################################### +# Conclusion +# ---------- +# +# What You've Learned +# ~~~~~~~~~~~~~~~~~~~ +# +# 1. **RL Fundamentals**: How RL concepts map to Forge services with real examples +# 2. **Service Abstraction**: How to use Forge services effectively +# 3. **Monarch Foundation**: How Forge services connect to distributed actors and hardware +# +# Key Takeaways +# ~~~~~~~~~~~~~ +# +# * **Services hide complexity**: Your RL code looks like simple async functions, +# but runs on distributed clusters +# * **Communication patterns matter**: ``.route()``, ``.fanout()``, sessions, +# and ``.call_one()`` each serve specific purposes +# * **Architecture understanding helps**: Knowing the Service → Actor → Process → +# Hardware hierarchy helps you debug, optimize, and scale +# * **Always verify APIs**: This guide is verified, but cross-check with source +# code for latest changes +# * **Real API patterns**: Use ``.options().as_service()`` not ``spawn_service()``, +# use ``.route()`` not ``.choose()``, etc. +# +# Further Reading +# --------------- +# +# * Review :doc:`1_RL_and_Forge_Fundamentals` for RL concepts +# * Review :doc:`2_Forge_Internals` for service patterns +# * Check the `Forge source code `_ +# * Explore the `GRPO application `_ +# * Read about `Monarch `_ for deeper understanding diff --git a/docs/source/tutorial_sources/zero-to-forge/README.md b/docs/source/tutorial_sources/zero-to-forge/README.md new file mode 100644 index 000000000..f32b01c20 --- /dev/null +++ b/docs/source/tutorial_sources/zero-to-forge/README.md @@ -0,0 +1,26 @@ +# Zero to Forge: From RL Theory to Production-Scale Implementation + +A comprehensive guide for ML Engineers building distributed RL systems for language models. + +Some of the examples mentioned below will be conceptual in nature for understanding. Please refer to API Docs (Coming Soon!) for more details + +Welcome to the Tutorials section! This section is inspired by the A-Z PyTorch tutorial, shoutout to our PyTorch friends that remember! + +## Tutorial Structure + +This section currently is structured in 3 detailed parts: + +1. 
diff --git a/docs/source/tutorial_sources/zero-to-forge/README.md b/docs/source/tutorial_sources/zero-to-forge/README.md
new file mode 100644
index 000000000..f32b01c20
--- /dev/null
+++ b/docs/source/tutorial_sources/zero-to-forge/README.md
@@ -0,0 +1,26 @@
+# Zero to Forge: From RL Theory to Production-Scale Implementation
+
+A comprehensive guide for ML engineers building distributed RL systems for language models.
+
+Some of the examples below are conceptual in nature, meant to build understanding. Please refer to the API docs (coming soon!) for more details.
+
+Welcome to the Tutorials section! It is inspired by the A-Z PyTorch tutorial; shoutout to our PyTorch friends who remember it!
+
+## Tutorial Structure
+
+This section is currently structured in three detailed parts:
+
+1. [RL Fundamentals and Understanding Forge Terminology](1_RL_and_Forge_Fundamentals): A quick refresher on reinforcement learning and an introduction to the Forge fundamentals
+2. [Forge Internals](2_Forge_Internals): Goes a layer deeper and explains the internals of Forge
+3. [Monarch 101](3_Monarch_101): A 101 on Monarch and how Forge talks to it
+
+Each part builds on the previous one, and the entire section can be consumed in roughly an hour. Grab a chai and enjoy!
+
+If you're eager, please check out our SFT tutorial too (coming soon!).
+
+.. toctree::
+   :maxdepth: 1
+
+   1_RL_and_Forge_Fundamentals
+   2_Forge_Internals
+   3_Monarch_101
diff --git a/docs/source/tutorials.md b/docs/source/tutorials.md
index 6e06c636a..42339dfcb 100644
--- a/docs/source/tutorials.md
+++ b/docs/source/tutorials.md
@@ -1,9 +1,10 @@
 # Tutorials

- This section provides step-by-step guides to help you master TorchForge's capabilities,
- from basic model fine-tuning to advanced distributed training scenarios.
+This section provides step-by-step guides to help you master TorchForge's capabilities,
+from basic model fine-tuning to advanced distributed training scenarios.

 ```{toctree}
 :maxdepth: 1

+zero-to-forge-intro
 ```
diff --git a/docs/source/zero-to-forge-intro.md b/docs/source/zero-to-forge-intro.md
new file mode 100644
index 000000000..c7c31fdf1
--- /dev/null
+++ b/docs/source/zero-to-forge-intro.md
@@ -0,0 +1,28 @@
+# Zero to Forge: From RL Theory to Production-Scale Implementation
+
+A comprehensive guide for ML engineers building distributed RL systems for language models.
+
+Some of the examples below are conceptual in nature, meant to build understanding. Please refer to the API docs (coming soon!) for more details.
+
+Welcome to the Tutorials section! It is inspired by the A-Z PyTorch tutorial; shoutout to our PyTorch friends who remember it!
+
+## Tutorial Structure
+
+This section is currently structured in three detailed parts:
+
+1. [RL Fundamentals and Understanding Forge Terminology](tutorials/zero-to-forge/1_RL_and_Forge_Fundamentals): A quick refresher on reinforcement learning and an introduction to the Forge fundamentals
+2. [Forge Internals](tutorials/zero-to-forge/2_Forge_Internals): Goes a layer deeper and explains the internals of Forge
+3. [Monarch 101](tutorials/zero-to-forge/3_Monarch_101): A 101 on Monarch and how Forge talks to it
+
+Each part builds on the previous one, and the entire section can be consumed in roughly an hour. Grab a chai and enjoy!
+
+If you're eager, please check out our SFT tutorial too (coming soon!).
+
+```{toctree}
+:maxdepth: 1
+:hidden:
+
+tutorials/zero-to-forge/1_RL_and_Forge_Fundamentals
+tutorials/zero-to-forge/2_Forge_Internals
+tutorials/zero-to-forge/3_Monarch_101
+```