
Conversation

@garrett4wade
Collaborator

Description

This PR introduces a significant refactoring to support single-controller training, which provides the following benefits:

  • Centralized staleness control: Resolves straggler issues when sequence-length distributions are imbalanced
  • Infrastructure agnostic: The training script can run anywhere with connectivity to the cluster scheduler
  • Optimized data communication: Reduces data communication overhead through under-the-hood optimizations
  • Enhanced scalability: Lays the groundwork for auto-scaling of inference workloads and intelligent request scheduling (planned)

Current Limitations (Proof of Concept)

This implementation is a proof of concept with the following known limitations:

  1. Only local scheduler support is implemented
  2. Dataset and generated rollouts are not yet distributed across processes
  3. Rollout statistics are not exportable
  4. Auto-scaling and request scheduling are not yet implemented

Comparison to Previous Approaches

This PR differs from #410, #415, and #489 primarily in its approach to RPC security and request scheduling during rollout. It provides a cleaner, more efficient implementation with minimal changes to existing APIs.

Roadmap

Over the next few weeks, we will break down these changes into smaller, focused PRs and merge them incrementally into the main branch.

Related Issue

Fixes #260

Type of Change

  • Bug fix (non-breaking change that fixes an issue)
  • New feature (non-breaking change that adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation update
  • Code refactoring (no functional changes)
  • Performance improvement
  • Test coverage improvement

Checklist

  • I have read the Contributing Guide
  • I have run formatting tools (pre-commit or manual)
  • I have run relevant unit tests and they pass
  • I have added tests for new functionality
  • I have updated documentation if needed
  • My branch is up to date with main
  • This PR introduces breaking changes (if yes, fill out details below)
  • If this PR changes documentation, I have built and previewed it locally with jb build docs
  • No critical issues raised by AI reviewers (/gemini review)

Additional Context

Discussion about implementing RolloutController: #469

@garrett4wade garrett4wade marked this pull request as draft November 4, 2025 05:36
@gemini-code-assist
Contributor

Summary of Changes

Hello @garrett4wade, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a significant refactoring to support single-controller training, which provides centralized staleness control, infrastructure-agnostic execution, optimized data communication, and enhanced scalability. It includes new scheduling features, a RolloutController implementation, an abstract Scheduler API, and a LocalScheduler implementation.

Highlights

  • Single-Controller Training: Introduces a refactoring to support single-controller training, centralizing staleness control and optimizing data communication.
  • New Scheduling Features: Adds SchedulingStrategy and SchedulingSpec dataclasses to cli_args.py, enabling more flexible resource allocation and task scheduling.
  • RolloutController Implementation: Implements RolloutController to manage distributed InferenceEngine workers, including request scheduling, load balancing, and staleness control.
  • Abstract Scheduler API: Defines an abstract Scheduler API with methods for worker creation, deletion, and engine management.
  • Local Scheduler Implementation: Provides a LocalScheduler implementation for managing worker subprocesses on a single node. A hypothetical sketch of these interfaces follows this list.
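
To make the new surface area concrete, here is a minimal sketch of what the scheduling dataclasses and the abstract Scheduler API might look like. Every name, field, and signature below is an illustrative assumption, not the PR's actual code.

from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class SchedulingSpec:
    # Assumed per-worker resource requirements.
    cpus: int = 1
    gpus: int = 0
    mem_gb: int = 8


@dataclass
class SchedulingStrategy:
    # Assumed placement policy, e.g. "pack" or "spread".
    policy: str = "pack"


class Scheduler(ABC):
    # Abstract API covering worker creation, deletion, and engine management.

    @abstractmethod
    def create_workers(
        self, strategy: SchedulingStrategy, spec: SchedulingSpec, count: int
    ) -> list[str]:
        """Launch `count` workers and return their worker IDs."""

    @abstractmethod
    def delete_workers(self, worker_ids: list[str]) -> None:
        """Terminate the given workers."""
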
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature              | Command             | Description
Code Review          | /gemini review      | Performs a code review for the current pull request in its current state.
Pull Request Summary | /gemini summary     | Provides a summary of the current pull request in its current state.
Comment              | @gemini-code-assist | Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help                 | /gemini help        | Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by adding files to a .gemini/ folder at the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, and code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

gemini-code-assist bot left a comment

Code Review

This pull request introduces a major and well-executed refactoring to support a single-controller architecture for training. The changes significantly improve modularity, API clarity, and testability by introducing dedicated controllers, a new scheduler API, and a more robust RPC mechanism. The new LocalScheduler and the controller implementations are comprehensive. My feedback focuses on a few areas to enhance robustness and maintainability, such as improving the safety of subprocess creation, making data merging logic less heuristic, and refining the use of magic strings for control flow.

Comment on lines +345 to +348
if "attention_mask" in first_result:
return DistributedBatchMemory.concat(
[DistributedBatchMemory.from_dict(r) for r in results]
)

Severity: medium

The logic to decide how to merge dictionaries by checking for the presence of "attention_mask" is a bit heuristic and could be brittle. If a batch-like dictionary that needs padding doesn't happen to have this key, or a non-batch dictionary does, the merging logic will be incorrect.

A more robust approach would be to have a more explicit way to identify batch data that needs special concatenation. For example, the remote method could wrap batch data in a specific container class, or return metadata indicating the type of the result.

This would make the merging logic more reliable and less dependent on convention.
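
For illustration, here is a minimal sketch of the explicit-container approach. The BatchResult wrapper and merge_results helper are hypothetical names, not code from this PR, and DistributedBatchMemory is assumed to be importable from the existing codebase.

from dataclasses import dataclass
from typing import Any

# DistributedBatchMemory is assumed importable from the project's codebase.


@dataclass
class BatchResult:
    # Hypothetical wrapper that explicitly marks batch data needing concatenation.
    data: dict[str, Any]


def merge_results(results: list[Any]):
    # Dispatch on the explicit wrapper type instead of sniffing for dict keys.
    if results and all(isinstance(r, BatchResult) for r in results):
        return DistributedBatchMemory.concat(
            [DistributedBatchMemory.from_dict(r.data) for r in results]
        )
    return results
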

Comment on lines 299 to 305
def wait_quiet(
    self, count: int, timeout: float | None = None
) -> dict[str, Any] | None:
    try:
        return self._engine.wait(count, timeout=timeout)
    except TimeoutError:
        return "NO_RESULT"

Severity: medium

The wait_quiet method's return type hint dict[str, Any] | None does not match its implementation, which can also return the string "NO_RESULT". This can lead to type checker errors and confusion.

Additionally, using a magic string like "NO_RESULT" to signal a timeout is not ideal. It makes the code harder to understand and maintain.

Consider one of these alternatives:

  1. Update the type hint to dict[str, Any] | None | Literal["NO_RESULT"] to be accurate, as suggested below.
  2. A better approach would be to avoid the magic string. Let this method re-raise the TimeoutError or a custom exception, and have the caller in RolloutController handle it. This would make the control flow clearer.
  3. If you want to avoid exceptions, return a sentinel object instead of a string, e.g., NO_RESULT = object(). This is safer than string comparisons; a brief sketch follows the suggested change below.
Suggested change

  def wait_quiet(
      self, count: int, timeout: float | None = None
- ) -> dict[str, Any] | None:
+ ) -> dict[str, Any] | None | str:
      try:
          return self._engine.wait(count, timeout=timeout)
      except TimeoutError:
          return "NO_RESULT"

Comment on lines +394 to +399
process = subprocess.Popen(
cmd,
shell=isinstance(cmd, str),
stdout=sys.stdout,
stderr=sys.stdout,
)

Severity: medium

The use of shell=True with subprocess.Popen (implicitly set because cmd is a string) can be a security risk if any part of the command string is derived from untrusted input. While the risk seems low in this context, it's best practice to avoid shell=True. The command string is constructed to use a pipe (| tee), which necessitates shell=True.

A safer approach would be to manage the output redirection in Python by reading from the subprocess's stdout/stderr and writing to both the log file and sys.stdout. This would allow you to pass the command as a list of arguments to Popen without shell=True.

For example, this could be done in a separate thread to avoid blocking:

import subprocess
import sys
import threading

# cmd_list should be a list of arguments, not a string; env and log_file are
# assumed to be defined by the surrounding code.

def _tee_output(process, log_path):
    # Mirror the child's output to both stdout and the log file.
    with open(log_path, "ab") as log_f:
        for line in iter(process.stdout.readline, b""):
            sys.stdout.buffer.write(line)
            sys.stdout.flush()
            log_f.write(line)

process = subprocess.Popen(
    cmd_list,
    env=env,
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
)
threading.Thread(target=_tee_output, args=(process, log_file), daemon=True).start()

This is more complex but avoids the risks of shell injection and makes stream handling more explicit.
