diff --git a/docs/Tutorials/1_RL_and_Forge_Fundamentals.MD b/docs/Tutorials/1_RL_and_Forge_Fundamentals.MD new file mode 100644 index 000000000..39b6d62aa --- /dev/null +++ b/docs/Tutorials/1_RL_and_Forge_Fundamentals.MD @@ -0,0 +1,395 @@ +# Part 1: RL Fundamentals - Using Forge Terminology + +## Core RL Components in Forge + +Let's start with a simple math tutoring example to understand RL concepts with the exact names Forge uses: + +### The Toy Example: Teaching Math + +```mermaid +graph TD + subgraph Example["Math Tutoring RL Example"] + Dataset["Dataset: math problems"] + Policy["Policy: student AI"] + Reward["Reward Model: scores answers"] + Reference["Reference Model: baseline"] + ReplayBuffer["Replay Buffer: stores experiences"] + Trainer["Trainer: improves student"] + end + + Dataset --> Policy + Policy --> Reward + Policy --> Reference + Reward --> ReplayBuffer + Reference --> ReplayBuffer + ReplayBuffer --> Trainer + Trainer --> Policy + + style Policy fill:#4CAF50 + style Reward fill:#FF9800 + style Trainer fill:#E91E63 +``` + +### RL Components Defined (Forge Names) + +1. **Dataset**: Provides questions/prompts (like "What is 2+2?") +2. **Policy**: The AI being trained (generates answers like "The answer is 4") +3. **Reward Model**: Evaluates answer quality (gives scores like 0.95) +4. **Reference Model**: Original policy copy (prevents drift from baseline) +5. **Replay Buffer**: Stores experiences (question + answer + score) +6. **Trainer**: Updates the policy weights based on experiences + +### The RL Learning Flow + +```python +# CONCEPTUAL EXAMPLE - see apps/grpo/main.py for GRPO Code + +def conceptual_rl_step(): + # 1. Get a math problem + question = dataset.sample() # "What is 2+2?" + + # 2. Student generates answer + answer = policy.generate(question) # "The answer is 4" + + # 3. Teacher grades it + score = reward_model.evaluate(question, answer) # 0.95 + + # 4. Compare to original student + baseline = reference_model.compute_logprobs(question, answer) + + # 5. Store the experience + experience = Episode(question, answer, score, baseline) + replay_buffer.add(experience) + + # 6. When enough experiences collected, improve student + batch = replay_buffer.sample(curr_policy_version=0) + if batch is not None: + trainer.train_step(batch) # Student gets better! + +# 🔄 See complete working example below with actual Forge service calls +``` + +## From Concepts to Forge Services + +Here's the key insight: **Each RL component becomes a Forge service**. The toy example above maps directly to Forge: + +```mermaid +graph LR + subgraph Concepts["RL Concepts"] + C1["Dataset"] + C2["Policy"] + C3["Reward Model"] + C4["Reference Model"] + C5["Replay Buffer"] + C6["Trainer"] + end + + subgraph Services["Forge Services (Real Classes)"] + S1["DatasetActor"] + S2["Policy"] + S3["RewardActor"] + S4["ReferenceModel"] + S5["ReplayBuffer"] + S6["RLTrainer"] + end + + C1 --> S1 + C2 --> S2 + C3 --> S3 + C4 --> S4 + C5 --> S5 + C6 --> S6 + + style C2 fill:#4CAF50 + style S2 fill:#4CAF50 + style C3 fill:#FF9800 + style S3 fill:#FF9800 +``` + +### RL Step with Forge Services + +Let's look at the example from above again, but this time we would use the names from Forge: + +```python +# Conceptual Example + +async def conceptual_forge_rl_step(services, step): + # 1. Get a math problem - Using actual DatasetActor API + sample = await services['dataloader'].sample.call_one() + question, target = sample["request"], sample["target"] + + # 2. 
Student generates answer - Using actual Policy API
    responses = await services['policy'].generate.route(prompt=question)
    answer = responses[0].text

    # 3. Teacher grades it - Using actual RewardActor API
    score = await services['reward_actor'].evaluate_response.route(
        prompt=question, response=answer, target=target
    )

    # 4. Compare to baseline - Using actual ReferenceModel API
    # Note: ReferenceModel.forward requires input_ids, max_req_tokens, return_logprobs
    ref_logprobs = await services['ref_model'].forward.route(
        input_ids, max_req_tokens, return_logprobs=True
    )

    # 5. Store experience - Using actual Episode structure from apps/grpo/main.py
    episode = create_episode_from_response(responses[0], score, ref_logprobs, step)
    await services['replay_buffer'].add.call_one(episode)

    # 6. Improve student - Using actual training pattern
    batch = await services['replay_buffer'].sample.call_one(
        curr_policy_version=step
    )
    if batch is not None:
        inputs, targets = batch
        loss = await services['trainer'].train_step.call(inputs, targets)
        return loss
```

**Key difference**: Same RL logic, but each component is now a distributed, fault-tolerant, auto-scaling service.

Notice that we are not writing any infrastructure code here! Forge handles those details behind the scenes, so you can focus on writing your RL algorithms.

## Why This Matters: Traditional ML Infrastructure Fails

### The Infrastructure Challenge

Our simple RL loop above has complex requirements:

### Problem 1: Different Resource Needs

| Component | Resource Needs | Scaling Strategy |
|-----------|----------------|------------------|
| **Policy** (Student AI) | Large GPU memory | Multiple replicas for throughput |
| **Reward Heuristic** (Teacher) | Small compute | CPU or small GPU |
| **Trainer** (Tutor) | Massive GPU compute | Distributed training |
| **Dataset** (Question Bank) | CPU-intensive I/O | High memory bandwidth |

### Problem 2: Complex Interdependencies

```mermaid
graph LR
    A["Policy: Student AI
'What is 2+2?' → 'The answer is 4'"] + B["Reward: Teacher
Scores answer: 0.95"] + C["Reference: Original Student
Provides baseline comparison"] + D["Replay Buffer: Notebook
Stores: question + answer + score"] + E["Trainer: Tutor
Improves student using experiences"] + + A --> B + A --> C + B --> D + C --> D + D --> E + E --> A + + style A fill:#4CAF50 + style B fill:#FF9800 + style C fill:#2196F3 + style D fill:#8BC34A + style E fill:#E91E63 +``` + +Each step has different: +- **Latency requirements**: Policy inference needs low latency (each episode waits), training can batch multiple episodes together +- **Scaling patterns**: Need N policy replicas to keep trainer busy, plus different sharding strategies (tensor parallel for training vs replicated inference) +- **Failure modes**: Any component failure cascades to halt the entire pipeline (Forge prevents this with automatic failover) +- **Resource utilization**: GPUs for inference/training, CPUs for data processing + +### Problem 3: The Coordination Challenge + +Unlike supervised learning where you process independent batches, RL requires coordination: + +```python +# While this does work, it creates bottlenecks and resource waste +def naive_rl_step(): + # Policy waits idle while reward model works + response = policy_model.generate(prompt) # GPU busy + reward = reward_model.evaluate(prompt, response) # Policy GPU idle + + # Training waits for single episode + loss = compute_loss(response, reward) # Batch size = 1, inefficient + + # Everything stops if any component fails + if policy_fails or reward_fails or trainer_fails: + entire_system_stops() +``` + +## Enter Forge: RL-Native Architecture + +Forge solves these problems by treating each RL component as an **independent, distributed unit** - some as fault-tolerant services (like Policy inference where failures are easy to handle), others as actors (like Trainers where recovery semantics differ) + +Let's see how core RL concepts map to Forge components (you'll notice a mix of `.route()` for services and `.call_one()` for actors - we cover when to use each in Part 2): + +**Quick API Reference:** (covered in detail in Part 2: Service Communication Patterns) +- `.route()` - Send request to any healthy replica in a service (load balanced) +- `.call_one()` - Send request to a single actor instance +- `.fanout()` - Send request to ALL replicas in a service + +```python +async def real_rl_training_step(services, step): + """Single RL step using verified Forge APIs""" + + # 1. Environment interaction - Using actual DatasetActor API + sample = await services['dataloader'].sample.call_one() + prompt, target = sample["request"], sample["target"] + + responses = await services['policy'].generate.route(prompt) + + # 2. Reward computation - Using actual RewardActor API + score = await services['reward_actor'].evaluate_response.route( + prompt=prompt, response=responses[0].text, target=target + ) + + # 3. Get reference logprobs - Using actual ReferenceModel API + # Note: ReferenceModel requires full input_ids tensor, not just tokens + input_ids = torch.cat([responses[0].prompt_ids, responses[0].token_ids]) + ref_logprobs = await services['ref_model'].forward.route( + input_ids.unsqueeze(0), max_req_tokens=512, return_logprobs=True + ) + + # 4. Experience storage - Using actual Episode pattern from GRPO + episode = create_episode_from_response(responses[0], score, ref_logprobs, step) + await services['replay_buffer'].add.call_one(episode) + + # 5. 
Learning - Using actual trainer pattern + batch = await services['replay_buffer'].sample.call_one( + curr_policy_version=step + ) + if batch is not None: + inputs, targets = batch # GRPO returns (inputs, targets) tuple + loss = await services['trainer'].train_step.call(inputs, targets) + + # 6. Policy synchronization - Using actual weight update pattern + await services['trainer'].push_weights.call(step + 1) + await services['policy'].update_weights.fanout(step + 1) + + return loss +``` + +**Key insight**: Each line of RL pseudocode becomes a service call. The complexity of distribution, scaling, and fault tolerance is hidden behind these simple interfaces. + +## What Makes This Powerful + +### Automatic Resource Management +```python +responses = await policy.generate.route(prompt=question) +answer = responses[0].text # responses is list[Completion] +``` + +Forge handles behind the scenes: +- Routing to least loaded replica +- GPU memory management +- Batch optimization +- Failure recovery +- Auto-scaling based on demand + +### Independent Scaling +```python + +from forge.actors.policy import Policy +from forge.actors.replay_buffer import ReplayBuffer +from forge.actors.reference_model import ReferenceModel +from forge.actors.trainer import RLTrainer +from apps.grpo.main import DatasetActor, RewardActor, ComputeAdvantages +from forge.data.rewards import MathReward, ThinkingReward +import asyncio +import torch + +model = "Qwen/Qwen3-1.7B" +group_size = 1 + +( + dataloader, + policy, + trainer, + replay_buffer, + compute_advantages, + ref_model, + reward_actor, +) = await asyncio.gather( + # Dataset actor (CPU) + DatasetActor.options(procs=1).as_actor( + path="openai/gsm8k", + revision="main", + data_split="train", + streaming=True, + model=model, + ), + # Policy service with GPU + Policy.options(procs=1, with_gpus=True, num_replicas=1).as_service( + engine_config={ + "model": model, + "tensor_parallel_size": 1, + "pipeline_parallel_size": 1, + "enforce_eager": False + }, + sampling_config={ + "n": group_size, + "max_tokens": 16, + "temperature": 1.0, + "top_p": 1.0 + } + ), + # Trainer actor with GPU + RLTrainer.options(procs=1, with_gpus=True).as_actor( + # Trainer config would come from YAML in real usage + model={"name": "qwen3", "flavor": "1.7B", "hf_assets_path": f"hf://{model}"}, + optimizer={"name": "AdamW", "lr": 1e-5}, + training={"local_batch_size": 2, "seq_len": 2048} + ), + # Replay buffer (CPU) + ReplayBuffer.options(procs=1).as_actor( + batch_size=2, + max_policy_age=1, + dp_size=1 + ), + # Advantage computation (CPU) + ComputeAdvantages.options(procs=1).as_actor(), + # Reference model with GPU + ReferenceModel.options(procs=1, with_gpus=True).as_actor( + model={"name": "qwen3", "flavor": "1.7B", "hf_assets_path": f"hf://{model}"}, + training={"dtype": "bfloat16"} + ), + # Reward actor (CPU) + RewardActor.options(procs=1, num_replicas=1).as_service( + reward_functions=[MathReward(), ThinkingReward()] + ) + ) +``` + +**Forge Components: Services vs Actors** + +Forge has two types of distributed components: +- **Services**: Multiple replicas with automatic load balancing (like Policy, RewardActor) +- **Actors**: Single instances that handle their own internal distribution (like RLTrainer, ReplayBuffer) + +We cover this distinction in detail in Part 2, but for now this explains the scaling patterns: +- Policy service: num_replicas=8 for high inference demand +- RewardActor service: num_replicas=16 for parallel evaluation +- RLTrainer actor: Single instance with internal 
distributed training

### Fault Tolerance
```python
# If a policy replica fails:
responses = await policy.generate.route(prompt=question)
answer = responses[0].text
# -> Forge automatically routes to healthy replica
# -> Failed replica respawns in background
# -> No impact on training loop

# If reward service fails:
score = await reward_actor.evaluate_response.route(
    prompt=question, response=answer, target=target
)
```

- Retries on a different replica automatically
- Graceful degradation if all replicas fail
- System continues (may need application-level handling)

This is fundamentally different from monolithic RL implementations, where any component failure stops everything!

In the next section, we will go a layer deeper and learn how Forge services work. Continue to [Part 2 here](./2_Forge_Internals.MD)
diff --git a/docs/Tutorials/2_Forge_Internals.MD b/docs/Tutorials/2_Forge_Internals.MD
new file mode 100644
index 000000000..1a9421a96
--- /dev/null
+++ b/docs/Tutorials/2_Forge_Internals.MD
@@ -0,0 +1,671 @@
# Part 2: Peeling Back the Abstraction - What Are Services?

We highly recommend reading [Part 1](./1_RL_and_Forge_Fundamentals.MD) before this one; it explains the RL concepts and how they map to Forge.

Now that you have seen the power of the service abstraction, let's understand what is actually happening under the hood. Grab your chai!

## Service Anatomy: Beyond the Interface

When you call `await policy_service.generate.route(question)`, here's what actually happens:

(Don't worry, we will unpack what services are in the next section!)

```mermaid
graph TD
    Call["Your Code:
await policy_service.generate"] + + subgraph ServiceLayer["Service Layer"] + Proxy["Service Proxy: Load balancing, Health checking"] + LB["Load Balancer: Replica selection, Circuit breaker"] + end + + subgraph Replicas["Replica Management"] + R1["Replica 1: GPU 0, Healthy"] + R2["Replica 2: GPU 1, Overloaded"] + R3["Replica 3: GPU 2, Failed"] + R4["Replica 4: GPU 3, Healthy"] + end + + subgraph Compute["Actual Computation"] + Actor["Policy Actor: vLLM engine, Model weights, KV cache"] + end + + Call --> Proxy + Proxy --> LB + LB --> R1 + LB -.-> R2 + LB -.-> R3 + LB --> R4 + R1 --> Actor + R4 --> Actor + + style Call fill:#4CAF50 + style LB fill:#FF9800 + style R3 fill:#F44336 + style Actor fill:#9C27B0 +``` + +## Service Components Deep Dive + +### 1. Real Service Configuration + +Here's the actual ServiceConfig from Forge source code: + +```python +# Configuration pattern from apps/grpo/main.py: +Policy.options( + procs=1, # Processes per replica + num_replicas=4, # Number of replicas + with_gpus=True # Allocate GPUs + # Other available options: + # hosts=None # the number of remote hosts used per replica +) +``` + +### 2. Real Service Creation + +Services are created using the `.options().as_service()` pattern from the actual GRPO implementation: + +The service creation automatically handles: +- Spawning actor replicas across processes/GPUs +- Load balancing with .route() method for services +- Health monitoring and failure recovery +- Message routing and serialization + +```python +from forge.actors.policy import Policy + +model = "Qwen/Qwen3-1.7B" + +policy = await Policy.options( + procs=1, + with_gpus=True, + num_replicas=1 +).as_service( + engine_config={ + "model": model, + "tensor_parallel_size": 1, + "pipeline_parallel_size": 1, + "enforce_eager": False + }, + sampling_config={ + "n": 1, + "max_tokens": 16, + "temperature": 1.0, + "top_p": 1.0 + } +) + +prompt = "What is 3 + 5?" +responses = await policy.generate.route(prompt) +print(f"Response: {responses[0].text}") + +# Cleanup when done +await policy.shutdown() +``` + +### 3. How Services Actually Work + +Forge services are implemented as ServiceActors that manage collections of your ForgeActor replicas: + +When you call `.as_service()`, Forge creates a `ServiceInterface` that manages N replicas of your `ForgeActor` class and gives you methods like `.route()`, `.fanout()`, etc. + +```python +# Your code sees this simple interface: +responses = await policy.generate.route(prompt=prompt) +# But Forge handles all the complexity of replica management, load balancing, and fault tolerance +``` + +## Communication Patterns: Quick Reference + +**API Summary:** +- `.route()` - Send request to any healthy replica in a service (load balanced) +- `.call_one()` - Send request to a single actor instance +- `.fanout()` - Send request to ALL replicas in a service + +```mermaid +graph LR + subgraph Request["Your Request"] + Code["await service.method.ADVERB()"] + end + + subgraph Patterns["Communication Patterns"] + Route[".route()
→ One healthy replica"] + CallOne[".call_one()
→ Single actor"] + Fanout[".fanout()
→ ALL replicas"] + end + + subgraph Replicas["Replicas/Actors"] + R1["Replica 1"] + R2["Replica 2"] + R3["Replica 3"] + A1["Actor"] + end + + Code --> Route + Code --> CallOne + Code --> Fanout + + Route --> R2 + CallOne --> A1 + Fanout --> R1 + Fanout --> R2 + Fanout --> R3 + + style Route fill:#4CAF50 + style CallOne fill:#FF9800 + style Fanout fill:#9C27B0 +``` + +## Deep Dive: Service Communication Patterns + +These communication patterns (\"adverbs\") determine how your service calls are routed to replicas. Understanding when to use each pattern is key to effective Forge usage. + +### 1. `.route()` - Load Balanced Single Replica + +**When to use**: Normal request routing where any replica can handle the request. + +```python +responses = await policy.generate.route(prompt=question) +answer = responses[0].text # Extract text from Completion object +``` + +Behind the scenes: +1. Health check eliminates failed replicas +2. Load balancer picks replica (currently round robin, configurable balancers coming soon) +3. Request routes to that specific replica +4. Automatic retry on different replica if failure + +**Performance characteristics**: +- **Latency**: Lowest (single network hop) +- **Throughput**: Limited by single replica capacity +- **Fault tolerance**: Automatic failover to other replicas + +**Critical insight**: `.route()` is your default choice for stateless operations in Forge services. + +### 2. `.fanout()` - Broadcast with Results Collection + +**When to use**: You need responses from ALL replicas. + +```python +# Get version from all policy replicas +current_versions = await policy.get_version.fanout() +# Returns: [version_replica_1, version_replica_2, ...] + +# Update weights on all replicas +await policy.update_weights.fanout(new_policy_version) +# Broadcasts to all replicas simultaneously +``` + +**Performance characteristics**: +- **Latency**: Slowest replica determines total latency +- **Throughput**: Network bandwidth × number of replicas +- **Fault tolerance**: Fails if ANY replica fails (unless configured otherwise) + +**Critical gotcha**: Don't use `.fanout()` for high-frequency operations - it contacts all replicas. + +### 3. Streaming Operations - Custom Implementation Pattern + +**When to use**: You want to process results as they arrive, not wait for all. + +```python +# CONCEPTUAL - Streaming requires custom implementation in your training loop +# The basic ReplayBuffer doesn't have built-in streaming methods +# Pattern from apps/grpo/main.py continuous training: + +while training: + # This is the real API call pattern + batch = await replay_buffer.sample.call_one(curr_policy_version=step) + if batch is not None: + # Process batch immediately + loss = await trainer.train_step.call_one(batch) + print(f"Training loss: {loss}") + else: + await asyncio.sleep(0.1) # Wait for more data +``` + +**Performance characteristics**: +- **Latency**: Process first result immediately +- **Throughput**: Non-blocking async operations (much higher than waiting for full batches) +- **Fault tolerance**: Continues if some replicas fail + +**Critical insight**: This is essential for high-throughput RL where you can't wait for batches. + +### 3. Service Sessions for Stateful Operations + +**When to use**: When you need multiple calls to hit the same replica (like KV cache preservation). + +**What are sticky sessions?** A session ensures all your service calls within the `async with` block go to the same replica, instead of being load-balanced across different replicas. 
+ +```python +# This Counter example demonstrates the difference between regular routing and sessions + +from forge.controller import ForgeActor +from monarch.actor import endpoint + +class ForgeCounter(ForgeActor): + def __init__(self, initial_value: int): + self.value = initial_value + + @endpoint + def increment(self) -> int: + self.value += 1 + return self.value + + @endpoint + def get_value(self) -> int: + return self.value + + @endpoint + async def reset(self): + self.value = 0 + +counter_service = await ForgeCounter.options( + procs=1, num_replicas=4 +).as_service(initial_value=0) + +# WITHOUT SESSIONS: Each .route() call goes to a different replica +await counter_service.increment.route() # Might go to replica 2 +await counter_service.increment.route() # Might go to replica 1 +await counter_service.increment.route() # Might go to replica 3 + +results = await counter_service.increment.fanout() # Get from all replicas +print(f"All replica values: {results}") +# Output: All replica values: [1, 2, 1, 1] - Each replica has different state! +``` + +The problem: each `.route()` call can go to different replicas, creating inconsistent state. + +```python +# WITH SESSIONS: All calls go to the SAME replica +print("\nUsing sticky sessions:") +async with counter_service.session(): # Creates a session that picks one replica + await counter_service.reset.route() # Uses .route() within session + print(await counter_service.increment.route()) # 1 + print(await counter_service.increment.route()) # 2 + print(await counter_service.increment.route()) # 3 + + final_value = await counter_service.get_value.route() + print(f"Final value on this replica: {final_value}") # 3 + +# Output: +# Using sticky sessions: +# 1 +# 2 +# 3 +# Final value on this replica: 3 + +# Same pattern works with Policy for multi-turn conversations: +# async with policy.session(): +# response1 = await policy.generate.route(turn1) +# full_prompt = turn1 + response1[0].text + turn2 +# response2 = await policy.generate.route(full_prompt) +# # Both calls hit same replica, preserving KV cache + +# Cleanup +await counter_service.shutdown() +``` + +**Performance impact**: Critical for maintaining KV cache in multi-turn conversations. + +## Deep Dive: State Management Reality + +The most complex challenge in distributed RL is maintaining state consistency while maximizing performance. + +### The KV Cache Problem + +**The challenge**: Policy inference is much faster with KV cache, but cache is tied to specific conversation history. + +```python +# This breaks KV cache optimization: +async def naive_multi_turn(): + # Each call might go to different replica = cache miss + response1 = await policy_service.generate.choose(question1) + response2 = await policy_service.generate.choose(question1 + response1) # Cache miss! + response3 = await policy_service.generate.choose(conversation_so_far) # Cache miss! +``` + +**The solution**: Sticky sessions ensure all calls go to same replica. + +```python +async def optimized_multi_turn(): + async with policy.session(): + # All calls guaranteed to hit same replica = cache hits + response1 = await policy.generate.route(prompt=question1) + full_prompt = question1 + response1[0].text + response2 = await policy.generate.route(prompt=full_prompt) # Cache hit! + conversation = full_prompt + response2[0].text + response3 = await policy.generate.route(prompt=conversation) # Cache hit! 
+ + # Session ends, replica can be garbage collected or reused +``` + +**Performance impact**: Maintaining KV cache across turns avoids recomputing previous tokens. + +### Replay Buffer Consistency + +**The challenge**: Multiple trainers and experience collectors reading/writing concurrently. + +**Real Forge approach**: The ReplayBuffer actor handles concurrency internally: + +```python +# Forge ReplayBuffer endpoints (verified from source code) +# Add episodes (thread-safe by actor model) +await replay_buffer.add.call_one(episode) # .choose() would work too, but .call_one() clarifies it's a singleton actor not ActorMesh + +# Sample batches for training +batch = await replay_buffer.sample.call_one( + curr_policy_version=step_number, + batch_size=None # Optional parameter, uses default from config +) + +# Additional methods available: +# await replay_buffer.clear.call_one() # Clear buffer +# await replay_buffer.evict.call_one(curr_policy_version) # Remove old episodes +# state = await replay_buffer.state_dict.call_one() # Get state for checkpointing +``` + +**Critical insight**: The actor model provides natural thread safety - each actor processes messages sequentially. + +### Weight Synchronization Strategy + +**The challenge**: Trainer updates policy weights, but policy service needs those weights. + +```python +# Forge weight synchronization pattern from apps/grpo/main.py +async def real_weight_sync(trainer, policy, step): + # Trainer pushes weights to TorchStore with version number + await trainer.push_weights.call_one(policy_version=step + 1) + + # Policy service updates to new version from TorchStore + # Use .fanout() to update ALL policy replicas + await policy.update_weights.fanout(policy_version=step + 1) + +# Check current policy version +current_version = await policy.get_version.route() +print(f"Current policy version: {current_version}") +``` + +## Deep Dive: Asynchronous Coordination Patterns + +**The real challenge**: Different services run at different speeds, but Forge's service abstraction handles the coordination complexity. 
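To make the speed mismatch concrete, here is a minimal sketch of the decoupled-loop pattern (in the spirit of the continuous rollout and training loops from `apps/grpo/main.py` referenced above): episode generation and training run as separate asyncio tasks and only meet at the replay buffer, so a slow trainer never blocks policy inference. The `generate_episode` helper and the `services` dictionary are placeholders for the pieces shown elsewhere in this tutorial.

```python
import asyncio

async def continuous_rollouts(services):
    """Produce episodes as fast as the policy and reward services allow."""
    while True:
        episode = await generate_episode(services)  # hypothetical helper: sample -> generate -> score
        await services["replay_buffer"].add.call_one(episode)

async def continuous_training(services):
    """Consume episodes whenever enough fresh data is available."""
    step = 0
    while True:
        batch = await services["replay_buffer"].sample.call_one(curr_policy_version=step)
        if batch is None:
            await asyncio.sleep(0.1)  # buffer not ready yet; rollouts keep running meanwhile
            continue
        inputs, targets = batch
        await services["trainer"].train_step.call(inputs, targets)
        await services["trainer"].push_weights.call(step + 1)
        await services["policy"].update_weights.fanout(step + 1)
        step += 1

# Run both loops concurrently; the replay buffer absorbs the speed difference:
# await asyncio.gather(continuous_rollouts(services), continuous_training(services))
```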
+ +### The Forge Approach: Let Services Handle Coordination + +Instead of manual coordination, Forge services handle speed mismatches automatically: + +```python +from apps.grpo.main import Episode, Group + +async def simple_rl_step(): + + # ===== Generate a rollout ===== + sample = await dataloader.sample.call_one() # DatasetActor is an actor, not service + prompt, target = sample["request"], sample["target"] # Correct field names + + print(f"Prompt: {prompt}") + print(f"Target: {target}") + + actions = await policy.generate.route(prompt=prompt) # Policy is a service + print(f"Policy response: {actions[0].text}") + + # Create input tensor for reference model (requires full context) + input_ids = torch.cat([actions[0].prompt_ids, actions[0].token_ids]) + ref_logprobs = await ref_model.forward.route( + input_ids.unsqueeze(0), max_req_tokens=512, return_logprobs=True + ) + reward = await reward_actor.evaluate_response.route( # RewardActor is a service + prompt=prompt, + response=actions[0].text, + target=target + ) + print(f"Reward: {reward}") + + # Create episode using actual GRPO Episode structure + episode = Episode( + episode_id="0", + request=prompt, + policy_version=0, + pad_id=tokenizer.pad_token_id, + request_len=512, + response_len=512, + target=target + ) + + # Add response data + episode.response = actions[0].text + episode.request_tokens = actions[0].prompt_ids.tolist() + episode.response_tokens = actions[0].token_ids.tolist() + episode.ref_logprobs = ref_logprobs[0] # Extract from batch dimension + episode.reward = reward + + # Compute advantages using actual ComputeAdvantages actor + group = Group.new_group(0, 1, prompt, 0, tokenizer.pad_token_id, 512, 512, target) + group.episodes[0] = episode + advantages = await compute_advantages.compute.call_one(group) # ComputeAdvantages is an actor + episode.advantage = advantages[0] + print(f"Advantage: {advantages[0]}") + await replay_buffer.add.call_one(episode) # ReplayBuffer is an actor + print("Episode stored in replay buffer") + + # ===== Train on the batch ===== + batch = await replay_buffer.sample.call_one(curr_policy_version=0) + if batch is not None: + print("Training on batch...") + inputs, targets = batch # GRPO returns (inputs, targets) tuple + loss = await trainer.train_step.call(inputs, targets) # RLTrainer is an actor + print(f"Training loss: {loss}") + return loss + else: + print("Not enough data in buffer yet") + return None + +# Note: This simplified example assumes tokenizer and services are already initialized +for step in range(10): + print(f"\n--- RL Step {step + 1} ---") + loss = await simple_rl_step() + if loss: + print(f"Step {step + 1} complete, loss: {loss:.4f}") + else: + print(f"Step {step + 1} complete, building buffer...") +``` + +### Handling Speed Mismatches with Service Scaling + +**The insight**: Scale services independently based on their bottlenecks. 
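Before settling on replica counts like the ones below, a rough capacity estimate helps. The throughput numbers in this sketch are assumptions for illustration only (profile your own setup); the point is simply to balance episode production against what the trainer can consume.

```python
import math

# Assumed (illustrative) throughput measurements - replace with profiled numbers
trainer_episodes_per_sec = 16.0   # episodes the trainer can absorb per second
policy_episodes_per_sec = 2.0     # rollouts one GPU policy replica generates per second
reward_evals_per_sec = 1.0        # evaluations one CPU reward replica scores per second

# Size each service so it can keep the trainer fed
policy_replicas = math.ceil(trainer_episodes_per_sec / policy_episodes_per_sec)
reward_replicas = math.ceil(trainer_episodes_per_sec / reward_evals_per_sec)

print(f"Policy replicas needed: {policy_replicas}")  # 8
print(f"Reward replicas needed: {reward_replicas}")  # 16
```

With those assumed rates, you end up with the 8 policy replicas and 16 reward replicas used in the service definitions below: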
+ +```python +# Scale fast services with more replicas +policy = await Policy.options( + procs=1, num_replicas=8, with_gpus=True # Many replicas for high throughput +).as_service( + engine_config={"model": model_name, "tensor_parallel_size": 1} +) + +# Reward evaluation might be CPU-bound +reward_actor = await RewardActor.options( + procs=1, num_replicas=16, with_gpus=False # More CPU replicas +).as_service( + reward_functions=[MathReward()] +) + +# Training needs fewer but more powerful replicas +trainer = await RLTrainer.options( + procs=1, with_gpus=True # Fewer but GPU-heavy +).as_actor( # Trainer typically uses .as_actor() not .as_service() + model={"name": "qwen3", "flavor": "1.7B"}, + optimizer={"name": "AdamW", "lr": 1e-5} +) +``` + +## Service Implementation Example + +Let's see how a reward service is actually implemented: + +```python +# Exact RewardActor from apps/grpo/main.py + +from forge.controller import ForgeActor +from monarch.actor import endpoint +from forge.data.rewards import MathReward, ThinkingReward + +# class definition from apps/grpo/main.py +class RewardActor(ForgeActor): + def __init__(self, reward_functions: list): + self.reward_functions = reward_functions + + @endpoint + async def evaluate_response(self, prompt: str, response: str, target: str) -> float: + """Evaluate response quality using multiple reward functions""" + total_reward = 0.0 + + for reward_fn in self.reward_functions: + # Each reward function contributes to total score + reward = reward_fn(prompt, response, target) + total_reward += reward + + # Return average reward across all functions + return total_reward / len(self.reward_functions) if self.reward_functions else 0.0 + +reward_actor = await RewardActor.options( + procs=1, num_replicas=1 +).as_service( + reward_functions=[MathReward(), ThinkingReward()] +) + +prompt = "What is 15% of 240?" 
+response = "15% of 240 is 36" +target = "36" + +score = await reward_actor.evaluate_response.route( + prompt=prompt, + response=response, + target=target +) +print(f"Reward score: {score}") # Usually around 1.0 for correct math answers + +# For production scaling - increase num_replicas for parallel evaluation: +# RewardActor.options(procs=1, num_replicas=16) # 16 parallel evaluators + +# Cleanup when done +await reward_actor.shutdown() +``` + +## Service Orchestration: The Training Loop + +Now let's see how services coordinate in a real training loop: + +```python +# This is the REAL way production RL systems are built with Forge + +import asyncio +import torch +from forge.actors.policy import Policy +from forge.actors.reference_model import ReferenceModel +from forge.actors.replay_buffer import ReplayBuffer +from forge.actors.trainer import RLTrainer +from apps.grpo.main import DatasetActor, RewardActor, ComputeAdvantages +from forge.data.rewards import MathReward, ThinkingReward + +# Service creation pattern from apps/grpo/main.py lines 322-344 +print("Initializing all services...") +( + dataloader, + policy, + trainer, + replay_buffer, + compute_advantages, + ref_model, + reward_actor, +) = await asyncio.gather( + DatasetActor.options(procs=1).as_actor( + path="openai/gsm8k", revision="main", data_split="train", + streaming=True, model="Qwen/Qwen3-1.7B" + ), + Policy.options(procs=1, with_gpus=True, num_replicas=1).as_service( + engine_config={"model": "Qwen/Qwen3-1.7B", "tensor_parallel_size": 1}, + sampling_config={"n": 1, "max_tokens": 512} + ), + RLTrainer.options(procs=1, with_gpus=True).as_actor( + model={"name": "qwen3", "flavor": "1.7B", "hf_assets_path": "hf://Qwen/Qwen3-1.7B"}, + optimizer={"name": "AdamW", "lr": 1e-5}, + training={"local_batch_size": 2, "seq_len": 2048} + ), + ReplayBuffer.options(procs=1).as_actor( + batch_size=2, max_policy_age=1, dp_size=1 + ), + ComputeAdvantages.options(procs=1).as_actor(), + ReferenceModel.options(procs=1, with_gpus=True).as_actor( + model={"name": "qwen3", "flavor": "1.7B", "hf_assets_path": "hf://Qwen/Qwen3-1.7B"} + ), + RewardActor.options(procs=1, num_replicas=1).as_service( + reward_functions=[MathReward(), ThinkingReward()] + ), +) + +print("All services initialized successfully!") + +async def production_training_loop(): + """Real training loop pattern from apps/grpo/main.py""" + step = 0 + + while True: + # Data generation + sample = await dataloader.sample.call_one() + + # Policy generation service call + responses = await policy.generate.route(sample["request"]) # Correct field name + + # Reference computation service call (requires full input tensor) + input_ids = torch.cat([responses[0].prompt_ids, responses[0].token_ids]) + ref_logprobs = await ref_model.forward.route( + input_ids.unsqueeze(0), max_req_tokens=512, return_logprobs=True + ) + + # Reward evaluation service call + reward = await reward_actor.evaluate_response.route( + prompt=sample["question"], + response=responses[0].text, + target=sample["answer"] + ) + + # Experience storage (using actual Episode structure) + episode = create_episode_from_grpo_data(sample, responses[0], reward, ref_logprobs[0], step) + await replay_buffer.add.call_one(episode) + + # Training when ready + batch = await replay_buffer.sample.call_one(curr_policy_version=step) + if batch is not None: + inputs, targets = batch # GRPO returns (inputs, targets) tuple + loss = await trainer.train_step.call(inputs, targets) + + # Weight synchronization pattern + await trainer.push_weights.call(step 
+ 1) + await policy.update_weights.fanout(step + 1) # Fanout to all replicas + + print(f"Step {step}, Loss: {loss:.4f}") + step += 1 + +print("Shutting down services...") +await asyncio.gather( + DatasetActor.shutdown(dataloader), + policy.shutdown(), + RLTrainer.shutdown(trainer), + ReplayBuffer.shutdown(replay_buffer), + ComputeAdvantages.shutdown(compute_advantages), + ReferenceModel.shutdown(ref_model), + reward_actor.shutdown(), +) +print("All services shut down successfully!") +``` + +**Key observations:** +1. **Parallelism**: Independent operations run concurrently +2. **Load balancing**: Each `.route()` call automatically selects optimal replica +3. **Fault tolerance**: Failures automatically retry on different replicas +4. **Resource efficiency**: CPU and GPU services scale independently +5. **Coordination**: Services coordinate through shared state (replay buffer, weight versions) + +This is the power of the service abstraction - complex distributed coordination looks like simple async Python code. + +In the next part we will learn about [Monarch internals](./3_Monarch_101.MD) diff --git a/docs/Tutorials/3_Monarch_101.MD b/docs/Tutorials/3_Monarch_101.MD new file mode 100644 index 000000000..2213e9bb5 --- /dev/null +++ b/docs/Tutorials/3_Monarch_101.MD @@ -0,0 +1,342 @@ +# Part 3: The Forge-Monarch Connection + +This is part 3 of our series, in the previous sections: we learned Part 1: [RL Concepts and how they map to Forge](./1_RL_and_Forge_Fundamentals.MD), Part 2: [Forge Internals](./2_Forge_Internals.MD). + +Now let's peel back the layers. Forge services are built on top of **Monarch**, PyTorch's distributed actor framework. Understanding this connection is crucial for optimization and debugging. + +## The Complete Hierarchy: Service to Silicon + +```mermaid +graph TD + subgraph YourCode["1. Your RL Code"] + Call["await policy_service.generate.route('What is 2+2?')"] + end + + subgraph ForgeServices["2. Forge Service Layer"] + ServiceInterface["ServiceInterface: Routes requests, Load balancing, Health checks"] + ServiceActor["ServiceActor: Manages replicas, Monitors health, Coordinates failures"] + end + + subgraph MonarchLayer["3. Monarch Actor Layer"] + ActorMesh["ActorMesh PolicyActor: 4 instances, Different GPUs, Message passing"] + ProcMesh["ProcMesh: 4 processes, GPU topology 0,1,2,3, Network interconnect"] + end + + subgraph Hardware["4. Physical Hardware"] + GPU0["GPU 0: PolicyActor #1, vLLM Engine, Model Weights"] + GPU1["GPU 1: PolicyActor #2, vLLM Engine, Model Weights"] + GPU2["GPU 2: PolicyActor #3, vLLM Engine, Model Weights"] + GPU3["GPU 3: PolicyActor #4, vLLM Engine, Model Weights"] + end + + Call --> ServiceInterface + ServiceInterface --> ServiceActor + ServiceActor --> ActorMesh + ActorMesh --> ProcMesh + ProcMesh --> GPU0 + ProcMesh --> GPU1 + ProcMesh --> GPU2 + ProcMesh --> GPU3 + + style Call fill:#4CAF50 + style ServiceActor fill:#FF9800 + style ActorMesh fill:#9C27B0 + style ProcMesh fill:#2196F3 +``` + +## Deep Dive: ProcMesh - The Foundation + +**ProcMesh** is Monarch's core abstraction for organizing processes across hardware. Think of it as a multi-dimensional grid that maps directly to your cluster topology. + +### Single Host ProcMesh + +**Key insight**: ProcMesh creates one process per GPU, automatically handling the process-to-hardware mapping. 
+ +```python +# This simple call: +procs = this_host().spawn_procs(per_host={"gpus": 8}) + +# Creates: +# Process 0 → GPU 0 +# Process 1 → GPU 1 +# Process 2 → GPU 2 +# Process 3 → GPU 3 +# Process 4 → GPU 4 +# Process 5 → GPU 5 +# Process 6 → GPU 6 +# Process 7 → GPU 7 +``` + +The beauty: you don't manage individual processes or GPU assignments - ProcMesh handles the topology for you. + +### Multi-Host ProcMesh + +**Key insight**: ProcMesh seamlessly scales across multiple hosts with continuous process numbering. + +```python +# Same simple API works across hosts: +cluster_procs = spawn_cluster_procs( + hosts=["host1", "host2", "host3"], + per_host={"gpus": 4} +) + +# Automatically creates: +# Host 1: Processes 0-3 → GPUs 0-3 +# Host 2: Processes 4-7 → GPUs 0-3 +# Host 3: Processes 8-11 → GPUs 0-3 + +# Your code stays the same whether it's 1 host or 100 hosts +actors = cluster_procs.spawn("my_actor", MyActor) +``` + +**The power**: Scale from single host to cluster without changing your actor code - ProcMesh handles all the complexity. + +```python +# This shows the underlying actor system that powers Forge services +# NOTE: This is for educational purposes - use ForgeActor and .as_service() in real Forge apps! + +from monarch.actor import Actor, endpoint, this_proc, Future +from monarch.actor import ProcMesh, this_host +import asyncio + +# STEP 1: Define a basic actor +class Counter(Actor): + def __init__(self, initial_value: int): + self.value = initial_value + + @endpoint + def increment(self) -> None: + self.value += 1 + + @endpoint + def get_value(self) -> int: + return self.value + +# STEP 2: Single actor in local process +counter: Counter = this_proc().spawn("counter", Counter, initial_value=0) + +# STEP 3: Send messages +fut: Future[int] = counter.get_value.call_one() +value = await fut +print(f"Counter value: {value}") # 0 + +# STEP 4: Multiple actors across processes +procs: ProcMesh = this_host().spawn_procs(per_host={"gpus": 8}) +counters: Counter = procs.spawn("counters", Counter, 0) + +# STEP 5: Broadcast to all actors +await counters.increment.call() + +# STEP 6: Different message patterns +# call_one() - single actor +value = await counters.get_value.call_one() +print(f"One counter: {value}") # Output: One counter: 1 + +# choose() - random single actor (actors only, not services) +value = await counters.get_value.choose() +print(f"Random counter: {value}") # Output: Random counter: 1 + +# call() - all actors, collect results +values = await counters.get_value.call() +print(f"All counters: {values}") # Output: All counters: [1, 1, 1, 1, 1, 1, 1, 1] + +# broadcast() - fire and forget +await counters.increment.broadcast() # No return value - just sends to all actors + +# Cleanup +await procs.stop() + +# Remember: This raw Monarch code is for understanding how Forge works internally. +# In your Forge applications, use ForgeActor, .as_service(), .as_actor() instead! +``` + +## Actor Meshes: Your Code Running Distributed + +**ActorMesh** is created when you spawn actors across a ProcMesh. 
Key points: + +- **One actor instance per process**: `mesh.spawn("policy", PolicyActor)` creates one PolicyActor in each process +- **Same constructor arguments**: All instances get the same initialization parameters +- **Independent state**: Each actor instance maintains its own state and memory +- **Message routing**: You can send messages to one actor or all actors using different methods + +```python +# Simple example: +procs = spawn_procs(per_host={"gpus": 4}) # 4 processes +policy_actors = procs.spawn("policy", PolicyActor, model="Qwen/Qwen3-7B") + +# Now you have 4 PolicyActor instances, one per GPU +# All initialized with the same model parameter +``` + +## How Forge Services Use Monarch + +Now the key insight: **Forge services are ServiceActors that manage ActorMeshes of your ForgeActor replicas**. + +### The Service Creation Process + +```mermaid +graph TD + subgraph ServiceCreation["Service Creation Process"] + Call["await PolicyActor.options(num_replicas=4, procs=1).as_service(model='Qwen')"] + + ServiceActor["ServiceActor: Manages 4 replicas, Health checks, Routes calls"] + + subgraph Replicas["4 Independent Replicas"] + subgraph R0["Replica 0"] + PM0["ProcMesh: 1 process, GPU 0"] + AM0["ActorMesh
1 PolicyActor"] + end + + subgraph R1["Replica 1"] + PM1["ProcMesh: 1 process, GPU 1"] + AM1["ActorMesh
1 PolicyActor"] + end + + subgraph R2["Replica 2"] + PM2["ProcMesh: 1 process, GPU 2"] + AM2["ActorMesh
1 PolicyActor"] + end + + subgraph R3["Replica 3"] + PM3["ProcMesh: 1 process, GPU 3"] + AM3["ActorMesh
1 PolicyActor"] + end + end + + Call --> ServiceActor + ServiceActor --> R0 + ServiceActor --> R1 + ServiceActor --> R2 + ServiceActor --> R3 + PM0 --> AM0 + PM1 --> AM1 + PM2 --> AM2 + PM3 --> AM3 + end + + style ServiceActor fill:#FF9800 + style AM0 fill:#4CAF50 + style AM1 fill:#4CAF50 + style AM2 fill:#4CAF50 + style AM3 fill:#4CAF50 +``` + +### Service Call to Actor Execution + +```mermaid +graph TD + subgraph CallFlow["Complete Call Flow"] + UserCall["await policy_service.generate.route('What is 2+2?')"] + + ServiceInterface["ServiceInterface: Receives .route() call, Routes to ServiceActor"] + + ServiceActor["ServiceActor: Selects healthy replica, Load balancing, Failure handling"] + + SelectedReplica["Selected Replica #2: ProcMesh 1 process, ActorMesh 1 PolicyActor"] + + PolicyActor["PolicyActor Instance: Loads model, Runs vLLM inference"] + + GPU["GPU 2: vLLM engine, Model weights, KV cache, CUDA kernels"] + + UserCall --> ServiceInterface + ServiceInterface --> ServiceActor + ServiceActor --> SelectedReplica + SelectedReplica --> PolicyActor + PolicyActor --> GPU + + GPU -.->|"Response"| PolicyActor + PolicyActor -.->|"Response"| SelectedReplica + SelectedReplica -.->|"Response"| ServiceActor + ServiceActor -.->|"Response"| ServiceInterface + ServiceInterface -.->|"'The answer is 4'"| UserCall + end + + style UserCall fill:#4CAF50 + style ServiceActor fill:#FF9800 + style PolicyActor fill:#9C27B0 + style GPU fill:#FF5722 +``` + +## Multiple Services Sharing Infrastructure + +In real RL systems, you have multiple services that can share or use separate ProcMeshes: + +```mermaid +graph TD + subgraph Cluster["RL Training Cluster"] + subgraph Services["Forge Services"] + PS["Policy Service
4 GPU replicas"] + TS["Trainer Service
2 GPU replicas"] + RS["Reward Service
4 CPU replicas"] + BS["Buffer Service
1 CPU replica"] + end + + subgraph MonarchInfra["Monarch Infrastructure"] + subgraph GPUMesh["GPU ProcMesh (6 processes)"] + G0["Process 0
GPU 0"] + G1["Process 1
GPU 1"] + G2["Process 2
GPU 2"] + G3["Process 3
GPU 3"] + G4["Process 4
GPU 4"] + G5["Process 5
GPU 5"] + end + + subgraph CPUMesh["CPU ProcMesh (5 processes)"] + C0["Process 0
CPU"] + C1["Process 1
CPU"] + C2["Process 2
CPU"] + C3["Process 3
CPU"] + C4["Process 4
CPU"] + end + end + + PS --> G0 + PS --> G1 + PS --> G2 + PS --> G3 + TS --> G4 + TS --> G5 + RS --> C0 + RS --> C1 + RS --> C2 + RS --> C3 + BS --> C4 + end + + style PS fill:#4CAF50 + style TS fill:#E91E63 + style RS fill:#FF9800 + style BS fill:#9C27B0 + style GPUMesh fill:#FFEBEE + style CPUMesh fill:#E3F2FD +``` + +## Key Insights: Why This Architecture Matters + +1. **Process Isolation**: Each actor runs in its own process - failures don't cascade +2. **Location Transparency**: Actors can be local or remote with identical APIs +3. **Structured Distribution**: ProcMesh maps directly to hardware topology +4. **Message Passing**: No shared memory means no race conditions or locks +5. **Service Abstraction**: Forge hides Monarch complexity while preserving power + +Understanding this hierarchy helps you: +- **Debug performance issues**: Is the bottleneck at service, actor, or hardware level? +- **Optimize resource usage**: How many replicas per service? GPU vs CPU processes? +- **Handle failures gracefully**: Which layer failed and how to recover? +- **Scale effectively**: Where to add resources for maximum impact? + +# Conclusion + +## What You've Learned + +1. **RL Fundamentals**: How RL concepts map to Forge services with REAL, working examples +2. **Service Abstraction**: How to use Forge services effectively with verified communication patterns +3. **Monarch Foundation**: How Forge services connect to distributed actors and hardware + +## Key Takeaways + +- **Services hide complexity**: Your RL code looks like simple async functions, but runs on distributed clusters +- **Communication patterns matter**: `.route()`, `.fanout()`, sessions, and `.call_one()` each serve specific purposes +- **Architecture understanding helps**: Knowing the Service → Actor → Process → Hardware hierarchy helps you debug, optimize, and scale +- **Always verify APIs**: This guide is verified, but cross-check with source code for latest changes +- **Real API patterns**: Use `.options().as_service()` not `spawn_service()`, use `.route()` not `.choose()`, etc. diff --git a/docs/Tutorials/ReadMe.MD b/docs/Tutorials/ReadMe.MD new file mode 100644 index 000000000..084710853 --- /dev/null +++ b/docs/Tutorials/ReadMe.MD @@ -0,0 +1,19 @@ +## Zero to Forge: From RL Theory to Production-Scale Implementation + +A comprehensive guide for ML Engineers building distributed RL systems for language models. + +Some of the examples mentioned below will be conceptual in nature for understanding. Please refer to API Docs (Coming Soon!) for more details + +Welcome to the Tutorials section! This section is inspired by the A-Z PyTorch tutorial, shoutout to our PyTorch friends that remember! + +### + +This section currently is structured in 3 detailed parts: + +1. [RL Fundamentals and Understanding Forge Terminology](./1_RL_and_Forge_Fundamentals.MD): This gives a quick refresher of Reinforcement Learning and teaches you Forge Fundamentals +2. [Forge Internals](./2_Forge_Internals.MD): Goes a layer deeper and explains the internals of Forge +3. [Monarch 101](./3_Monarch_101.MD): It's a 101 to Monarch and how Forge Talks to Monarch + +Each part builds upon the next and the entire section can be consumed in roughly an hour-Grab a Chai and Enjoy! + +If you're eager, please checkout our SFT Tutorial too (Coming soon!) as well as [App Examples](../../apps/).