Skip to content

Conversation

@aoshen524
Copy link
Contributor

  • Add configuration parameters for SHM cache in RolloutConfig
  • Implement SHM sender cache (P0/Engine side) and receiver cache (P1/Worker side)
  • Replace print() with structured logging using logger
  • Add conditional resource initialization (only when mm_shm_cache_gb > 0)
  • Add cache statistics methods (get_shm_cache_stats, log_shm_cache_stats)
  • Improve type hints with TYPE_CHECKING
  • Add comprehensive module-level documentation
  • Performance: ~10x speedup for multi-turn multi-modal workloads with large images/videos

What does this PR do?

Add concise overview of what this PR aims to achieve or accomplish. Reference related GitHub issues and PRs that help with the review.

Checklist Before Starting

  • Search for similar PRs. Paste at least one query link here: ...
  • Format the PR title as [{modules}] {type}: {description} (This will be checked by the CI)
    • {modules} include fsdp, megatron, sglang, vllm, rollout, trainer, ci, training_utils, recipe, hardware, deployment, ray, worker, single_controller, misc, perf, model, algo, env, tool, ckpt, doc, data, cfg, reward
    • If this PR involves multiple modules, separate them with , like [megatron, fsdp, doc]
    • {type} is in feat, fix, refactor, chore, test
    • If this PR breaks any API (CLI arguments, config, function signature, etc.), add [BREAKING] to the beginning of the title.
    • Example: [BREAKING][fsdp, megatron] feat: dynamic batching

Test

For changes that can not be tested by CI (e.g., algorithm implementation, new model support), validate by experiment(s) and show results like training curve plots, evaluation results, etc.

API and Usage Example

Demonstrate how the API changes if any, and provide usage example(s) if possible.

# Add code snippet or script demonstrating how to use this

Design & Code Changes

Demonstrate the high-level design if this PR is complex, and list the specific changes.

Checklist Before Submitting

Important

Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review.

- Add configuration parameters for SHM cache in RolloutConfig
- Implement SHM sender cache (P0/Engine side) and receiver cache (P1/Worker side)
- Replace print() with structured logging using logger
- Add conditional resource initialization (only when mm_shm_cache_gb > 0)
- Add cache statistics methods (get_shm_cache_stats, log_shm_cache_stats)
- Improve type hints with TYPE_CHECKING
- Add comprehensive module-level documentation
- Performance: ~10x speedup for multi-turn multi-modal workloads with large images/videos
Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a shared memory (SHM) cache to optimize multi-turn multi-modal workloads by reducing IPC overhead. The implementation includes configuration in RolloutConfig, sender and receiver cache logic, and statistics logging. While the overall approach is sound and a valuable performance enhancement, I've identified several critical issues. There is a file descriptor leak in the FileLock implementation that could lead to resource exhaustion. Additionally, there's a consistent misuse of a configuration parameter (mm_processor_cache_gb instead of the newly introduced mm_shm_cache_gb) across multiple files, which will prevent the SHM cache feature from being enabled and working correctly. These issues must be addressed before merging.

Comment on lines 79 to 120
class FileLock:
"""
A cross-process file-based lock that can be used across Ray Actors.
This is needed because multiprocessing.Lock() cannot be shared across Ray Actors.
The lock file is created in /dev/shm for fast access (same as shared memory).
"""

def __init__(self, lock_file: str):
self.lock_file = lock_file
self._fd = None

def __enter__(self):
self._fd = open(self.lock_file, "w")
fcntl.flock(self._fd, fcntl.LOCK_EX)
return self

def __exit__(self, exc_type, exc_val, exc_tb):
if self._fd:
fcntl.flock(self._fd, fcntl.LOCK_UN)
self._fd.close()
self._fd = None
return False

def acquire(self, blocking: bool = True) -> bool:
self._fd = open(self.lock_file, "w")
try:
if blocking:
fcntl.flock(self._fd, fcntl.LOCK_EX)
else:
fcntl.flock(self._fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
return True
except BlockingIOError:
self._fd.close()
self._fd = None
return False

def release(self):
if self._fd:
fcntl.flock(self._fd, fcntl.LOCK_UN)
self._fd.close()
self._fd = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The FileLock class has a potential file descriptor leak. Both __enter__ and acquire methods open the lock file via open(self.lock_file, "w") without checking if a file descriptor self._fd is already held. If acquire is called multiple times on the same instance without an intervening release, file descriptors will be leaked. The file should only be opened if self._fd is None.

class FileLock:
    """
    A cross-process file-based lock that can be used across Ray Actors.
    This is needed because multiprocessing.Lock() cannot be shared across Ray Actors.

    The lock file is created in /dev/shm for fast access (same as shared memory).
    """

    def __init__(self, lock_file: str):
        self.lock_file = lock_file
        self._fd = None

    def __enter__(self):
        if self._fd is None:
            self._fd = open(self.lock_file, "w")
        fcntl.flock(self._fd, fcntl.LOCK_EX)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self._fd:
            fcntl.flock(self._fd, fcntl.LOCK_UN)
            self._fd.close()
            self._fd = None
        return False

    def acquire(self, blocking: bool = True) -> bool:
        if self._fd is None:
            self._fd = open(self.lock_file, "w")
        try:
            if blocking:
                fcntl.flock(self._fd, fcntl.LOCK_EX)
            else:
                fcntl.flock(self._fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
            return True
        except BlockingIOError:
            self._fd.close()
            self._fd = None
            return False

    def release(self):
        if self._fd:
            fcntl.flock(self._fd, fcntl.LOCK_UN)
            self._fd.close()
            self._fd = None

Comment on lines 168 to 170
if mm_config is None or mm_config.mm_processor_cache_gb <= 0:
logger.info("[P0 SHM Cache] DISABLED: mm_processor_cache_gb=0 or mm_config=None")
return None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The code checks for mm_config.mm_processor_cache_gb to enable the SHM cache, but the new configuration parameter added in RolloutConfig is mm_shm_cache_gb. This mismatch will prevent the feature from being enabled as intended.

Suggested change
if mm_config is None or mm_config.mm_processor_cache_gb <= 0:
logger.info("[P0 SHM Cache] DISABLED: mm_processor_cache_gb=0 or mm_config=None")
return None
if mm_config is None or getattr(mm_config, "mm_shm_cache_gb", 0) <= 0:
logger.info("[P0 SHM Cache] DISABLED: mm_shm_cache_gb=0 or mm_config=None")
return None

SingleWriterShmRingBuffer,
)

cache_gb = mm_config.mm_processor_cache_gb
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This line uses mm_processor_cache_gb to set the cache size. It should use mm_shm_cache_gb to be consistent with the new configuration parameters.

Suggested change
cache_gb = mm_config.mm_processor_cache_gb
cache_gb = mm_config.mm_shm_cache_gb

Comment on lines +256 to +259
if mm_config.mm_processor_cache_gb <= 0:
raise RuntimeError(
f"[P1 SHM Cache] mm_processor_cache_gb={mm_config.mm_processor_cache_gb} but SHM cache is enabled"
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The code checks for mm_config.mm_processor_cache_gb, which is inconsistent with the newly added mm_shm_cache_gb configuration. This will raise a RuntimeError incorrectly. Please use mm_shm_cache_gb.

Suggested change
if mm_config.mm_processor_cache_gb <= 0:
raise RuntimeError(
f"[P1 SHM Cache] mm_processor_cache_gb={mm_config.mm_processor_cache_gb} but SHM cache is enabled"
)
if getattr(mm_config, "mm_shm_cache_gb", 0) <= 0:
raise RuntimeError(
f"[P1 SHM Cache] mm_shm_cache_gb={getattr(mm_config, 'mm_shm_cache_gb', 0)} but SHM cache is enabled"
)

f"[P1 SHM Cache] mm_processor_cache_gb={mm_config.mm_processor_cache_gb} but SHM cache is enabled"
)

cache_gb = mm_config.mm_processor_cache_gb
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This line uses mm_processor_cache_gb to set the cache size. It should use mm_shm_cache_gb to be consistent with the new configuration parameters.

Suggested change
cache_gb = mm_config.mm_processor_cache_gb
cache_gb = mm_config.mm_shm_cache_gb

mm_config = model_config.get_multimodal_config()

# Only generate SHM names if cache is enabled
if mm_config and getattr(mm_config, "mm_processor_cache_gb", 0) > 0:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The code checks for mm_processor_cache_gb to decide whether to initialize the SHM cache, but the new configuration parameter is mm_shm_cache_gb. This will prevent the SHM cache from being enabled.

Suggested change
if mm_config and getattr(mm_config, "mm_processor_cache_gb", 0) > 0:
if mm_config and getattr(mm_config, "mm_shm_cache_gb", 0) > 0:

- Fix FileLock file descriptor leak in __enter__ and acquire methods
- Change mm_processor_cache_gb to mm_shm_cache_gb for consistency
- Use getattr with default values for safer config access
- Addresses Gemini bot critical review comments
- Change all mm_shm_cache_gb to mm_processor_cache_gb (vLLM native parameter)
- Set mm_processor_cache_gb default to 4.0 (matching vLLM default)
- Set mm_shm_cache_max_object_size_mb default to 128 (matching vLLM default)
- Restore mm_shm_cache_name_prefix and mm_shm_cache_lock_prefix for verl compatibility
- Remove version-specific comments
- Pass vLLM parameters through hf_overrides
- Update example.sh with correct parameter names
@wuxibin89
Copy link
Collaborator

The ExternalZeroMQDistributedExecutor is going to be deprecated and use vllm's own MultiprocExecutor, see: #4280

@wuxibin89 wuxibin89 mentioned this pull request Jan 12, 2026
28 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants