
feat: Enable P2P Transfers via NIXL #135

Open
KavinKrishnan wants to merge 1 commit into main from kavink/p2p_nixl_transfers

Conversation

@KavinKrishnan
Contributor

@KavinKrishnan KavinKrishnan commented Jan 27, 2026

Summary

This PR introduces high-performance GPU-to-GPU model weight transfers using NIXL (NVIDIA Inference Xfer Library) over RDMA, enabling rapid weight replication between vLLM instances.
Key Features

🚀 P2P RDMA Weight Transfer:

  • Zero-copy GPU transfers: Model weights transfer directly from source GPU memory to target GPU memory over InfiniBand/RoCE without CPU involvement
  • vLLM integration: Custom mx-source and mx-target load formats that integrate with vLLM's weight loading pipeline
  • FP8 support: Handles DeepSeek-V3 style FP8 quantized models with proper weight scale transfer
  • Multi-GPU support: Tensor parallel (TP=8) transfers with per-worker coordination
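
As a rough sketch of how these load formats are wired in (based on the loader registration entry point and the launch flags shown later in this PR; exact import paths and flags should be verified against the shipped package):

```python
# Hedged sketch: register the ModelExpress loaders with vLLM, then start the
# OpenAI-compatible server with the custom load format. Names and flags are
# taken from this PR's description and examples; treat them as illustrative.
from modelexpress import register_modelexpress_loaders

# Makes "mx-source" and "mx-target" available in vLLM's model loader registry.
register_modelexpress_loaders()

# A target instance would then be launched along the lines of:
#   python3 -m vllm.entrypoints.openai.api_server \
#       --model deepseek-ai/DeepSeek-V3 \
#       --load-format mx-target \
#       --tensor-parallel-size 8
```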

Summary by CodeRabbit

  • New Features

    • P2P GPU weight transfer system with NIXL RDMA support for distributed model loading
    • Python client library with gRPC and ZMQ coordination
    • Kubernetes deployment examples for vLLM with ModelExpress integration
    • Redis-based metadata coordination service
  • Documentation

    • Added comprehensive guides for architecture, deployment, and development workflows
  • Chores

    • Updated copyright year range to 2025-2026 across repository
    • Added Redis dependency to workspace configuration


@github-actions github-actions bot added the feat label Jan 27, 2026
@KavinKrishnan KavinKrishnan marked this pull request as draft January 27, 2026 18:44
@KavinKrishnan KavinKrishnan force-pushed the kavink/p2p_nixl_transfers branch from d3262f6 to 4bf04b7 on January 28, 2026 03:01
@KavinKrishnan KavinKrishnan force-pushed the kavink/p2p_nixl_transfers branch from 727b361 to 4bf04b7 on January 29, 2026 22:24
@KavinKrishnan KavinKrishnan force-pushed the kavink/p2p_nixl_transfers branch from 4bf04b7 to 18ba08f on January 29, 2026 22:41
@KavinKrishnan KavinKrishnan force-pushed the kavink/p2p_nixl_transfers branch from 18ba08f to 58f75b7 on January 29, 2026 22:43
@KavinKrishnan KavinKrishnan marked this pull request as ready for review January 29, 2026 22:47
@coderabbitai

coderabbitai bot commented Jan 29, 2026

Walkthrough

This pull request adds comprehensive P2P GPU weight transfer capabilities to ModelExpress via a new metadata service layer, including Rust backend components (gRPC service, Redis state management), Python client libraries (gRPC client, NIXL RDMA integration, vLLM loaders), Kubernetes deployment examples, extensive documentation, and copyright year updates to 2025-2026 across multiple files.

Changes

| Cohort / File(s) | Summary |
|---|---|
| **Copyright Year Updates**<br>.github/*, helm/*, docs/CLI.md, CODE_OF_CONDUCT.md, README.md, SECURITY.md, modelexpress-cli-completion.bash, rust-toolchain.toml | Updated SPDX copyright year range from 2025 to 2025-2026. No functional changes. |
| **Workspace Dependencies**<br>Cargo.toml, modelexpress_server/Cargo.toml | Added redis v0.27 with tokio-comp and connection-manager features for metadata coordination. |
| **Protocol Buffer & Build**<br>modelexpress_common/proto/p2p.proto, modelexpress_common/build.rs, modelexpress_common/src/lib.rs | Defined P2P metadata service with PublishMetadata and GetMetadata RPC endpoints; updated build script to compile proto definitions. |
| **Rust Backend Implementation**<br>modelexpress_server/src/p2p_service.rs, modelexpress_server/src/state.rs, modelexpress_server/src/main.rs | Implemented P2P gRPC service with Redis-backed state manager for storing/retrieving model worker metadata; integrated service into server with optional Redis connection and 100MB message size limits. |
| **Python Metadata Types**<br>modelexpress_client/python/modelexpress/types.py, modelexpress_client/python/modelexpress/p2p_pb2.py, modelexpress_client/python/modelexpress/p2p_pb2_grpc.py | Added TensorDescriptor, WorkerMetadata, and GetMetadataResponse dataclasses; auto-generated protobuf and gRPC stubs. |
| **Python Client Library**<br>modelexpress_client/python/modelexpress/__init__.py, modelexpress_client/python/modelexpress/client.py, modelexpress_client/python/modelexpress/nixl_transfer.py | Implemented ModelExpressClient for gRPC coordination and ZMQ worker connectivity; added NixlTransferManager for RDMA-based tensor registration and transfers with optional contiguous-region coalescing. |
| **Python vLLM Integration**<br>modelexpress_client/python/modelexpress/vllm_loader.py, modelexpress_client/python/modelexpress/vllm_extension.py | Implemented MxSourceModelLoader and MxTargetModelLoader for coordinating model weight loading/receiving with NIXL RDMA; added MxWorkerExtension for ZMQ-based weight serving with FP8 pre/post-processing hooks; includes Redis-based source coordination. |
| **Python Package & Tests**<br>modelexpress_client/python/pyproject.toml, modelexpress_client/python/tests/... | Added pyproject.toml with dependencies (grpcio, nixl, pyzmq, torch, redis) and dev extras; added type validation tests. |
| **Kubernetes Deployment**<br>examples/p2p_transfer_k8s/Dockerfile.client, examples/p2p_transfer_k8s/{model-download,modelexpress-server,vllm-source,vllm-target}.yaml | Introduced multi-stage Dockerfile patching vLLM with ModelExpress loaders; created K8s manifests for model downloading, Redis-integrated server, and source/target vLLM deployments with GPUDirect RDMA and UCX configuration. |
| **Documentation**<br>docs/CLAUDE.md, docs/CONTEXT.md, examples/p2p_transfer_k8s/README.md | Added comprehensive guides covering ModelExpress architecture, P2P transfer mechanics, NIXL integration, Redis coordination, FP8 handling, Kubernetes deployment workflow, and troubleshooting guidance. |
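
To make the metadata path in the table above (Protocol Buffer & Build, Python Metadata Types) concrete, the sketch below queries the P2P service through the generated Python stubs. The stub class, message names, and server address follow standard protoc conventions and the Rust-side validation described in this PR; verify them against the generated p2p_pb2_grpc.py before relying on them.

```python
# Hedged sketch of a GetMetadata call with an explicit deadline. The channel
# target and the exact stub/message names are assumptions based on protoc
# naming conventions and this PR's p2p.proto description.
import grpc

from modelexpress import p2p_pb2, p2p_pb2_grpc

channel = grpc.insecure_channel("modelexpress-server:8001")  # address/port are placeholders
stub = p2p_pb2_grpc.P2pServiceStub(channel)

request = p2p_pb2.GetMetadataRequest(model_name="deepseek-ai/DeepSeek-V3")
try:
    # A timeout keeps the client from hanging when the coordinator is down,
    # which is one of the actionable review comments further below.
    response = stub.GetMetadata(request, timeout=30.0)
    if response.found:
        print(f"Found metadata from {len(response.workers)} workers")
except grpc.RpcError as err:
    print(f"GetMetadata failed: {err.code()}")
```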

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Poem

🐰 Whiskers twitch with pride today,
New pathways help the weights convey,
RDMA's swift, through Redis we dance,
vLLM loaders in perfect stance,
From source to target, GPUs aligned,
A P2P transfer, beautifully designed!

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 79.61% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
|---|---|---|
| Title check | ✅ Passed | The title 'feat: Enable P2P Transfers via NIXL' clearly and specifically describes the main feature being added to the codebase. |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |



Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai

coderabbitai bot commented Jan 29, 2026

Walkthrough

This pull request introduces peer-to-peer GPU weight transfer capabilities to ModelExpress using NIXL/RDMA technology. It includes a Rust gRPC server with Redis-backed metadata coordination, Python client libraries for NIXL transfers, vLLM loader extensions, Kubernetes deployment manifests, and comprehensive documentation. Copyright years are also updated across the codebase.

Changes

| Cohort / File(s) | Summary |
|---|---|
| **Copyright Year Updates**<br>.github/copilot-instructions.md, .github/dco.yml, .github/workflows/ci.yml, .github/workflows/codeql.yml, .github/workflows/common.ps1, CODE_OF_CONDUCT.md, README.md, SECURITY.md, helm/.helmignore, helm/README.md, docs/CLI.md, modelexpress-cli-completion.bash, rust-toolchain.toml | Updated SPDX copyright year range from 2025 to 2025-2026 across header comments. No functional changes. |
| **Dependency Configuration**<br>Cargo.toml, modelexpress_server/Cargo.toml, modelexpress_client/python/pyproject.toml | Added Redis workspace dependency with tokio-comp and connection-manager features, added uuid workspace dependency to server, and created Python pyproject.toml with grpcio, nixl, protobuf, pyzmq, torch, and development dependencies. |
| **Documentation**<br>docs/CLAUDE.md, docs/CONTEXT.md, examples/p2p_transfer_k8s/README.md | Added comprehensive documentation covering ModelExpress architecture, NIXL integration, deployment workflows, environment variables, and troubleshooting guidance for P2P GPU weight transfers. |
| **Kubernetes Deployment**<br>examples/p2p_transfer_k8s/model-download.yaml, examples/p2p_transfer_k8s/modelexpress-server.yaml, examples/p2p_transfer_k8s/vllm-source.yaml, examples/p2p_transfer_k8s/vllm-target.yaml | Added Kubernetes manifests for model downloads, ModelExpress server with Redis sidecar, and source/target vLLM deployments with GPU/RDMA resource configuration, environment variables, and health checks. |
| **Docker Configuration**<br>examples/p2p_transfer_k8s/Dockerfile.client | Added Dockerfile for Python-based ModelExpress client within vLLM image, with gRPC/Protobuf setup, loader registration via environment variable, and vLLM model_loader patching. |
| **Protobuf Definitions**<br>modelexpress_common/proto/p2p.proto, modelexpress_common/build.rs | Added p2p.proto defining P2pService with PublishMetadata and GetMetadata RPCs, TensorDescriptor and WorkerMetadata message types, and updated build.rs to compile the new proto file. |
| **Generated gRPC Code**<br>modelexpress_client/python/modelexpress/p2p_pb2.py, modelexpress_client/python/modelexpress/p2p_pb2_grpc.py | Generated Python protobuf message classes (TensorDescriptor, WorkerMetadata, etc.) and gRPC client/server stubs for the P2P service from the p2p.proto schema. |
| **Python Data Types**<br>modelexpress_client/python/modelexpress/types.py | Added Python dataclasses TensorDescriptor, WorkerMetadata, and GetMetadataResponse to represent core P2P metadata structures. |
| **Python NIXL Integration**<br>modelexpress_client/python/modelexpress/nixl_transfer.py | Added NixlTransferManager class providing NIXL agent initialization, tensor registration with contiguous-region optimization, and RDMA-based receive operations with timeout/coalescing support. |
| **Python Client Implementation**<br>modelexpress_client/python/modelexpress/client.py | Added ModelExpressClient orchestrating P2P transfers: gRPC connection to coordinator, ZMQ communication with vLLM workers, metadata aggregation/publication, and weight receiving with transfer statistics. |
| **vLLM Extension**<br>modelexpress_client/python/modelexpress/vllm_extension.py | Added MxWorkerExtension enabling vLLM workers to expose weights via ZMQ and NIXL, with metadata/descriptor serving and receive request handling for RDMA transfers. |
| **vLLM Loaders**<br>modelexpress_client/python/modelexpress/vllm_loader.py | Added MxSourceModelLoader and MxTargetModelLoader supporting P2P weight synchronization with raw tensor RDMA registration, FP8 post-processing, Redis-backed coordination, and robust transfer retry/restart logic. |
| **Python Public API**<br>modelexpress_client/python/modelexpress/__init__.py | Added register_modelexpress_loaders() function to programmatically register mx-source and mx-target loaders with vLLM's model loader registry. |
| **Python Testing**<br>modelexpress_client/python/tests/__init__.py, modelexpress_client/python/tests/test_types.py | Added test module verifying TensorDescriptor, WorkerMetadata, and GetMetadataResponse creation, field validation, and basic invariants. |
| **Rust P2P Service**<br>modelexpress_server/src/p2p_service.rs | Added P2pServiceImpl implementing gRPC P2pService with publish_metadata and get_metadata RPCs, validating inputs and delegating to the state manager with error handling and logging. |
| **Rust State Management**<br>modelexpress_server/src/state.rs | Added P2pStateManager providing Redis-backed persistence of model metadata with atomic Lua script merging, serializable record types (TensorRecord, WorkerRecord, ModelMetadataRecord), and connection management with auto-reconnect. |
| **Rust Server Integration**<br>modelexpress_server/src/main.rs | Integrated P2P service into gRPC server: initializes P2pStateManager with Redis connection, registers P2pServiceServer with 100 MB message size limit, and maintains graceful shutdown flow. |
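
To make the Python Data Types row above concrete, here is a small sketch of building a TensorDescriptor for a GPU tensor. The addr/size convention mirrors the data_ptr()/numel()*element_size() pattern that appears in the nixl_transfer.py review comments below; the exact field semantics should be confirmed against types.py.

```python
# Hedged sketch: describe a CUDA tensor so its memory region can be registered
# with NIXL and advertised to the metadata service. Field meanings are assumed
# from the dataclass definition added in this PR.
import torch

from modelexpress.types import TensorDescriptor

weight = torch.zeros(4096, 4096, dtype=torch.bfloat16, device="cuda:0")

descriptor = TensorDescriptor(
    name="model.layers.0.mlp.gate_proj.weight",   # illustrative tensor name
    addr=weight.data_ptr(),                        # device pointer of the region
    size=weight.numel() * weight.element_size(),   # region length in bytes
    device_id=0,
    dtype=str(weight.dtype),
)
print(descriptor)
```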

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Poem

🐰 hops excitedly with whiskers twitching

With Redis keys and NIXL dreams so bright,
P2P transfers soar through GPU night,
vLLM workers hop and sync their weights,
Speeding models through computational gates! 🚀

–A CodeRabbit celebrates

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 79.61% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title 'feat: Enable P2P Transfers via NIXL' is fully related to the main changes in the changeset. The PR adds comprehensive P2P (peer-to-peer) GPU weight transfer functionality using NIXL, which is clearly reflected in the title. |




Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 19

🤖 Fix all issues with AI agents
In `@docs/CLAUDE.md`:
- Around line 26-40: Add the missing language identifier "text" to the fenced
code blocks for the architecture diagram, repo tree, and FP8 diagram in
CLAUDE.md (the unlabeled triple-backtick blocks that render the ASCII diagrams)
so markdownlint MD040 is satisfied; update each fence from ``` to ```text for
the three diagram blocks (architecture diagram, repo tree, FP8 diagram) and any
other unlabeled fences in the file.
- Around line 17-20: Reformat the Markdown tables (e.g., the table starting with
the header "| Model | Status | Transfer Time | Notes |") so all pipes and column
cells are consistently spaced and aligned to satisfy markdownlint MD060; update
each affected table (lines referenced in the comment: the one with
"DeepSeek-V3..." plus the others) by padding cells to equal column widths or
running a Markdown table formatter/formatter tool, ensuring header separator
rows (---) match column widths and that there are spaces around pipe characters
for uniform alignment.
- Around line 336-338: Change the compound modifier "NIXL ready coordination
(implemented in vllm-source.yaml)" to hyphenated form "NIXL-ready coordination
(implemented in vllm-source.yaml)" in the Solutions list so the phrase reads as
a single compound adjective; update the exact string occurrence shown in the
diff.

In `@docs/CONTEXT.md`:
- Around line 479-482: Update the two debug commands that reference the
non-existent kubernetes resource "deployment/redis": replace the target with the
actual Redis deployment/service "modelexpress-redis" or run the redis-cli
against the redis container in the "modelexpress-server" pod; reference the
REDIS_URL env var location to confirm the service name. Specifically, change
"kubectl exec deployment/redis -- redis-cli ..." to either "kubectl exec
deployment/modelexpress-redis -- redis-cli ..." or "kubectl exec -it $(kubectl
get pods -l app=modelexpress-server -o jsonpath='{.items[0].metadata.name}') --
redis-cli -h modelexpress-redis ..." so the KEYS and GET commands target the
correct Redis instance.

In `@examples/p2p_transfer_k8s/model-download.yaml`:
- Around line 35-75: The Pod spec for the downloader container is running as
root and allows privilege escalation; update the template.spec ->
containers[name: downloader] to add a securityContext that sets runAsUser (e.g.,
1000), runAsGroup, fsGroup, and sets allowPrivilegeEscalation: false and
readOnlyRootFilesystem as appropriate, and also add pod-level securityContext if
needed to enforce fsGroup for PVC writes; ensure the PVC supports UID 1000 write
permissions and adjust the persistentVolumeClaim mount permissions accordingly
so the non-root user can write to /models.

In `@examples/p2p_transfer_k8s/modelexpress-server.yaml`:
- Around line 39-44: The manifest uses an initContainer named "redis" with
"restartPolicy: Always", which requires Kubernetes 1.29+ or the
SidecarContainers feature gate; fix by either documenting the minimum Kubernetes
version (>=1.29) for this chart, or move the Redis image from initContainers
into the main pod "containers" list (rename or reuse the container instead of
initContainer "redis") and implement a readiness/retry loop in the server
startup to wait for Redis, or deploy Redis as a separate Deployment/Service;
update references to initContainers/name: redis and remove or adjust
restartPolicy accordingly to match the chosen approach.
- Around line 41-86: Add a securityContext to both container specs (containers
named "redis" and "modelexpress-server") to enforce non-root and prevent
privilege escalation: set allowPrivilegeEscalation: false and runAsNonRoot: true
(and add a reasonable runAsUser UID such as 1000 if needed), and consider adding
readOnlyRootFilesystem: true and capabilities.drop: ["ALL"] to further harden
the containers; update the container blocks where ports/env/resources are
defined to include this securityContext for each container.

In `@examples/p2p_transfer_k8s/README.md`:
- Around line 7-32: Add language identifiers to the unlabeled fenced code
blocks: change the ASCII diagram block that begins with "Node A (Source - first
to start)" to use ```text and any shell/snippet blocks to use ```bash; apply the
same change to the other unlabeled fences referenced in the comment so
markdownlint MD040 is satisfied.

In `@examples/p2p_transfer_k8s/vllm-target.yaml`:
- Around line 38-147: The container "vllm" currently sets securityContext with
capabilities.add IPC_LOCK but runs as root; update the pod spec (container name
vllm, securityContext) to harden it by adding allowPrivilegeEscalation: false
and runAsNonRoot: true while keeping IPC_LOCK in capabilities.add, but first
verify the ModelExpress image
(nvcr.io/nvidian/dynamo-dev/modelexpress-p2p-client:v0.1.0-baseline) supports
non-root execution (no root-only NCCL or file paths) before merging; if the
image cannot run non-root, document the reason and leave the current privileges
until an image fix is available.

In `@modelexpress_client/python/modelexpress/client.py`:
- Around line 174-198: In _start_weight_server, validate that
self.engine_address has an allowed URL scheme before calling urlopen: parse
self.engine_address (e.g., via urllib.parse.urlparse), ensure parsed.scheme is
either "http" or "https" (and not empty/other schemes like file/ftp), and raise
a clear ValueError if not; then construct the collective_rpc URL from the
validated engine_address and proceed as before—this prevents urlopen from
accepting dangerous schemes.
- Around line 538-560: The TCP branch in generate_zmq_addresses currently only
does a string replace of ":5555" so bases like tcp://host:6000 produce identical
addresses for all ranks; update the non-IPC path in generate_zmq_addresses to
parse the URL with urllib.parse.urlparse (use base), extract the port, raise a
clear error if no port is present, and generate per-rank addresses by
incrementing the parsed port (e.g., port + rank) while preserving scheme and
hostname; ensure the resulting address is reconstructed (including netloc and
path) and appended into addresses for each rank (a minimal sketch of this parsing appears after this list).
- Around line 563-662: Ensure tp_size is validated and aligned with explicit
addresses: in main(), after computing zmq_addresses (from args.zmq_addresses or
generate_zmq_addresses(zmq_base, args.tp_size)), set a local tp_size =
len(zmq_addresses) and if tp_size == 0 raise/exit with a clear error; when
args.zmq_addresses was provided, override/replace args.tp_size with tp_size (or
pass tp_size into ModelExpressClient.run) so reporting and behavior (and
subsequent use in receive_from_source which creates ThreadPoolExecutor) use the
actual number of addresses rather than the original args.tp_size; add a short
check to log and exit if tp_size <= 0 to avoid creating a ThreadPoolExecutor
with max_workers=0.
- Around line 141-168: The _get_metadata method (and the analogous
PublishMetadata call) must specify a gRPC deadline to avoid indefinite hangs:
modify the calls to use the stub methods with a timeout argument (e.g.,
self._grpc_stub.GetMetadata(request, timeout=some_seconds)) and make the timeout
configurable on the client (add or use a field like self._timeout or
self._grpc_timeout set in the constructor), and wrap the call to catch
grpc.RpcError to handle deadline/exceptions gracefully; update both usages of
GetMetadata/PublishMetadata and their request types (p2p_pb2.GetMetadataRequest
/ PublishMetadataRequest) accordingly so the client fails fast when the server
is unreachable.

In `@modelexpress_client/python/modelexpress/p2p_pb2_grpc.py`:
- Around line 11-28: The generated p2p_pb2_grpc.py enforces
GRPC_GENERATED_VERSION = '1.76.0' against GRPC_VERSION (grpc.__version__), so
update the project dependency constraints to match: change the runtime
requirement grpcio to ">=1.76.0" and the dev tool grpcio-tools to ">=1.76.0" in
modelexpress_client/python/pyproject.toml so installed grpc versions cannot be
older than the generated-code guard (ensure the pyproject entries for grpcio and
grpcio-tools are updated accordingly).

In `@modelexpress_client/python/modelexpress/p2p_pb2.py`:
- Around line 15-22: The protobuf runtime check in modelexpress/p2p_pb2.py calls
_runtime_version.ValidateProtobufRuntimeVersion requiring version 6.31.1+, so
update the Python package dependency in pyproject.toml from protobuf>=4.25.0 (or
any lower bound) to protobuf>=6.31.1 to match this requirement; locate
pyproject.toml in the package and change the protobuf entry accordingly, then
run a quick install or CI to verify imports of p2p_pb2.py succeed.

In `@modelexpress_client/python/modelexpress/vllm_loader.py`:
- Around line 666-670: The log call in MxTargetModelLoader.load_model() uses an
unnecessary f-string for the static message f"[TIMING]
MxTargetModelLoader.load_model() COMPLETE"; remove the f prefix so the call
becomes a regular string and keep the surrounding _log calls using total_time
and _log unchanged to avoid creating a gratuitous formatted string.
- Around line 763-768: Unused unpacked variable: cached_metadata_hash from
SourceReadyCoordinator.wait_for_source_ready is never used; prefix it with an
underscore to mark intentional discard. Locate the call to
SourceReadyCoordinator.wait_for_source_ready (the variables source_ready,
cached_session_id, cached_metadata_hash) and rename cached_metadata_hash to
_cached_metadata_hash (or just _ if preferred) so linters recognize it as
intentionally unused; ensure no other code relies on the original name.
- Around line 176-190: The wait_for_source_ready loop is checking for
non-existent fields ("nixl_ready" and "stability_verified") while
publish_source_ready sets "ready"; update wait_for_source_ready (the function
using client.get loop) to parse the stored JSON and check ready == True (e.g.,
ready_info.get("ready")), then extract session_id and metadata_hash as before;
ensure the log and return behavior (session_id and metadata_hash extraction in
wait_for_source_ready) remain unchanged so the consumer matches the published
payload from publish_source_ready.
- Around line 134-151: publish_source_ready writes the ready flag under the
Redis key prefix "mx:ready:{model_name}:worker:{worker_id}" but
wait_for_source_ready looks for "mx:nixl_ready:{model_name}:worker:{worker_id}",
so the consumer never sees the flag; update one of the functions (either
publish_source_ready or wait_for_source_ready) to use the same key prefix as the
other (e.g., change wait_for_source_ready to check
"mx:ready:{model_name}:worker:{worker_id}" or change publish_source_ready to use
"mx:nixl_ready:...") so both functions reference the identical key string;
ensure the unique key construction (the f-string using model_name and worker_id)
and any tests or comments are updated to reflect the chosen prefix.
🧹 Nitpick comments (16)
examples/p2p_transfer_k8s/Dockerfile.client (1)

38-48: Avoid hard‑coding the Python site‑packages path.
This will break if the base image changes Python version or layout. Resolve the path dynamically.

♻️ Suggested refactor
-RUN LOADER_INIT="/usr/local/lib/python3.12/dist-packages/vllm/model_executor/model_loader/__init__.py" && \
+RUN SITE_PKGS="$(python3 - <<'PY'\nimport site\nprint(next(p for p in site.getsitepackages() if p.endswith('site-packages')))\nPY\n)" && \
+    LOADER_INIT="$SITE_PKGS/vllm/model_executor/model_loader/__init__.py" && \
     echo '' >> $LOADER_INIT && \
modelexpress_client/python/modelexpress/types.py (1)

9-31: Consider lightweight validation for metadata invariants.
A small __post_init__ guard can prevent invalid descriptors (e.g., negative size/addr or empty dtype).

♻️ Suggested refactor
 `@dataclass`
 class TensorDescriptor:
     """Descriptor for a tensor in GPU memory."""
     name: str
     addr: int
     size: int
     device_id: int
     dtype: str
+
+    def __post_init__(self) -> None:
+        if self.addr < 0 or self.size < 0:
+            raise ValueError("addr and size must be non-negative")
+        if not self.dtype:
+            raise ValueError("dtype must be set")
modelexpress_server/src/p2p_service.rs (1)

38-67: Consider returning gRPC status codes for invalid input and internal errors.

Right now, errors are encoded only in the response body. Using Status::invalid_argument (for empty model_name) and Status::internal (for storage errors) makes failures explicit to clients and aligns with gRPC expectations.

Proposed refactor
-        if req.model_name.is_empty() {
-            return Ok(Response::new(PublishMetadataResponse {
-                success: false,
-                message: "model_name is required".to_string(),
-            }));
-        }
+        if req.model_name.is_empty() {
+            return Err(Status::invalid_argument("model_name is required"));
+        }
@@
-            Err(e) => {
-                error!("Failed to publish metadata: {}", e);
-                Ok(Response::new(PublishMetadataResponse {
-                    success: false,
-                    message: format!("Failed to publish metadata: {e}"),
-                }))
-            }
+            Err(e) => {
+                error!("Failed to publish metadata: {}", e);
+                Err(Status::internal(format!("Failed to publish metadata: {e}")))
+            }
@@
-        if req.model_name.is_empty() {
-            return Ok(Response::new(GetMetadataResponse {
-                found: false,
-                workers: Vec::new(),
-            }));
-        }
+        if req.model_name.is_empty() {
+            return Err(Status::invalid_argument("model_name is required"));
+        }
@@
-            Err(e) => {
-                error!("Failed to get metadata: {}", e);
-                Ok(Response::new(GetMetadataResponse {
-                    found: false,
-                    workers: Vec::new(),
-                }))
-            }
+            Err(e) => {
+                error!("Failed to get metadata: {}", e);
+                Err(Status::internal(format!("Failed to get metadata: {e}")))
+            }

Also applies to: 76-117

examples/p2p_transfer_k8s/vllm-source.yaml (1)

69-70: Avoid hard-coded worker count in the readiness publish script.

The Python snippet uses range(8) while MX_EXPECTED_WORKERS is configurable. This can drift if the env changes. Use MX_EXPECTED_WORKERS to keep it consistent.

Proposed fix
-          args: ["set -ex; (sleep 5; while ! curl -sf http://127.0.0.1:8000/health; do sleep 5; done; sleep 30; curl -sf -X POST http://127.0.0.1:8000/v1/completions -H 'Content-Type: application/json' -d '{\"model\":\"'\"$MODEL_NAME\"'\",\"prompt\":\"1+1=\",\"max_tokens\":3}' || true; python3 -c 'import redis,json,uuid,os; h=os.environ.get(\"MX_REDIS_HOST\",\"modelexpress-server\"); m=os.environ.get(\"MODEL_NAME\",\"x\"); s=str(uuid.uuid4()); c=redis.Redis(host=h,port=6379,decode_responses=True); [c.setex(\"mx:nixl_ready:\"+m+\":worker:\"+str(w),14400,json.dumps({\"session_id\":s,\"nixl_ready\":True,\"stability_verified\":True})) or print(\"Published\",w) for w in range(8)]') & exec python3 -m vllm.entrypoints.openai.api_server --model $MODEL_NAME --load-format mx-source --tensor-parallel-size 8 --enable-expert-parallel"]
+          args: ["set -ex; (sleep 5; while ! curl -sf http://127.0.0.1:8000/health; do sleep 5; done; sleep 30; curl -sf -X POST http://127.0.0.1:8000/v1/completions -H 'Content-Type: application/json' -d '{\"model\":\"'\"$MODEL_NAME\"'\",\"prompt\":\"1+1=\",\"max_tokens\":3}' || true; python3 -c 'import redis,json,uuid,os; h=os.environ.get(\"MX_REDIS_HOST\",\"modelexpress-server\"); m=os.environ.get(\"MODEL_NAME\",\"x\"); s=str(uuid.uuid4()); n=int(os.environ.get(\"MX_EXPECTED_WORKERS\",\"8\")); c=redis.Redis(host=h,port=6379,decode_responses=True); [c.setex(\"mx:nixl_ready:\"+m+\":worker:\"+str(w),14400,json.dumps({\"session_id\":s,\"nixl_ready\":True,\"stability_verified\":True})) or print(\"Published\",w) for w in range(n)]') & exec python3 -m vllm.entrypoints.openai.api_server --model $MODEL_NAME --load-format mx-source --tensor-parallel-size 8 --enable-expert-parallel"]

Also applies to: 105-105

modelexpress_server/src/main.rs (1)

125-135: Consider making gRPC max message size configurable; current 100MB default is adequate but inflexible.

While metadata for large models (~0.46MB estimated) remains well within 100MB, making this configurable via environment variable (e.g., MX_GRPC_MAX_MESSAGE_SIZE_MB) would allow deployment-specific tuning without code changes. Consider a reasonable default (e.g., 256MB) to provide more headroom for different deployment scenarios.

Proposed fix (env-configurable size)
-    // Set max message size to 100MB for large models like DeepSeek-V3
-    const MAX_MESSAGE_SIZE: usize = 100 * 1024 * 1024;
+    // Set max message size for P2P metadata; allow env override
+    const DEFAULT_MAX_MESSAGE_SIZE_MB: usize = 256;
+    let max_message_size_mb = std::env::var("MX_GRPC_MAX_MESSAGE_SIZE_MB")
+        .ok()
+        .and_then(|v| v.parse::<usize>().ok())
+        .unwrap_or(DEFAULT_MAX_MESSAGE_SIZE_MB);
+    let max_message_size = max_message_size_mb * 1024 * 1024;
@@
-            P2pServiceServer::new(p2p_service)
-                .max_decoding_message_size(MAX_MESSAGE_SIZE)
-                .max_encoding_message_size(MAX_MESSAGE_SIZE),
+            P2pServiceServer::new(p2p_service)
+                .max_decoding_message_size(max_message_size)
+                .max_encoding_message_size(max_message_size),
modelexpress_client/python/modelexpress/vllm_extension.py (3)

109-123: Consider validating port range for TCP addresses.

The TCP port calculation at line 122 could overflow if worker_rank is large. While unlikely in practice, a bounds check would be defensive.

Proposed defensive fix
         else:
-            zmq_address = zmq_address_base.replace(":5555", f":{5555 + worker_rank}")
+            new_port = 5555 + worker_rank
+            if new_port > 65535:
+                raise ValueError(f"Computed ZMQ port {new_port} exceeds valid range")
+            zmq_address = zmq_address_base.replace(":5555", f":{new_port}")
         return zmq_address

159-171: Consider using logging.exception for better stack traces.

When logging exceptions, logging.exception automatically includes the stack trace, which aids debugging NIXL initialization failures.

Proposed fix
         except Exception as e:
-            logger.error(f"Worker {self._worker_rank}: Failed to initialize NIXL: {e}")
+            logger.exception(f"Worker {self._worker_rank}: Failed to initialize NIXL: {e}")
             self._nixl_manager = None

279-285: Consider logging the exception for transfer failures.

The exception is converted to a string but the stack trace is lost, making debugging difficult for RDMA transfer issues.

Proposed fix
         except Exception as e:
+            logger.exception(f"Worker {self._worker_rank}: receive_from failed")
             self._zmq_socket.send_pyobj({
                 "success": False,
                 "error": str(e),
             })
modelexpress_client/python/modelexpress/vllm_loader.py (3)

52-59: Consider making logging level configurable.

Hardcoding DEBUG level (lines 53, 59) may produce excessive logs in production. Consider using environment variables or config.


977-981: Use exception chaining for better debugging.

When re-raising as RuntimeError, chain the original exception to preserve the stack trace.

Proposed fix
                     else:
                         _log(f"[Worker {device_id}] Transfer failed after {transfer_retries} attempts: {transfer_err}", "ERROR")
-                        raise RuntimeError(
-                            f"Transfer failed after {transfer_retries} attempts: {transfer_err}"
-                        )
+                        raise RuntimeError(
+                            f"Transfer failed after {transfer_retries} attempts"
+                        ) from transfer_err

1027-1040: Global mutable state could cause issues in testing or multi-model scenarios.

The module-level dictionaries _raw_tensor_registry and _nixl_managers accumulate state across model loads. Consider documenting this limitation or providing a reset mechanism.

modelexpress_server/src/state.rs (1)

70-76: Potential precision loss when deserializing f64 to u64.

While this handles the cjson float case, casting f64 directly to u64 can lose precision for very large addresses. Consider validating the conversion.

Proposed defensive fix
         fn visit_f64<E>(self, value: f64) -> Result<Self::Value, E>
         where
             E: de::Error,
         {
             // Handle floats from cjson (the problematic case)
-            Ok(value as u64)
+            // Validate the float can be represented exactly as u64
+            if value < 0.0 || value > u64::MAX as f64 {
+                return Err(E::custom("f64 value out of u64 range"));
+            }
+            let converted = value as u64;
+            // Check for precision loss (round-trip check)
+            if (converted as f64 - value).abs() > 1.0 {
+                tracing::warn!("Potential precision loss converting f64 {} to u64 {}", value, converted);
+            }
+            Ok(converted)
         }
modelexpress_client/python/modelexpress/nixl_transfer.py (4)

148-158: _registered_regions attribute initialized only in one branch.

If use_contiguous is False, line 157 sets self._registered_regions = None, but if an exception occurs before that or in the True branch after line 148, subsequent calls to get_registered_descriptors could fail with AttributeError.

Proposed fix - initialize in __init__
     def __init__(self, agent_name: str, device_id: int):
         self._agent_name = agent_name
         self._device_id = device_id

         self._agent: Any = None
         self._metadata: bytes = b""
         self._tensor_descriptors: list[TensorDescriptor] = []
         self._tensors: dict[str, torch.Tensor] = {}
+        self._registered_regions: list[tuple[int, int]] | None = None

397-411: Busy-wait loop with short sleep may cause high CPU usage.

The 1ms sleep in the transfer wait loop (line 411) is very short. For long transfers, consider adaptive backoff or using a longer initial sleep.

Consider exponential backoff
         # Wait for completion
         start_wait = time.perf_counter()
+        sleep_time = 0.001  # Start at 1ms
+        max_sleep = 0.1     # Cap at 100ms
         while True:
             if timeout_seconds is not None and time.perf_counter() - start_wait >= timeout_seconds:
                 self._agent.release_xfer_handle(handle)
                 raise TimeoutError("Transfer timed out")

             status = self._agent.check_xfer_state(handle)
             if status in ("DONE", "SUCCESS"):
                 self._agent.release_xfer_handle(handle)
                 break
             if status in ("ERR", "ERROR", "FAIL"):
                 self._agent.release_xfer_handle(handle)
                 raise RuntimeError(f"Transfer failed with status {status}")
-            time.sleep(0.001)
+            time.sleep(sleep_time)
+            sleep_time = min(sleep_time * 2, max_sleep)  # Exponential backoff

468-476: Add strict=True to zip() for safety.

If remote_descs and local_tensors have different lengths, silent truncation occurs. Using strict=True would catch this mismatch.

Proposed fix
-        for remote, local in zip(remote_descs, local_tensors):
+        for remote, local in zip(remote_descs, local_tensors, strict=True):
             local_addr = local.data_ptr()
             local_size = local.numel() * local.element_size()
             indexed.append((remote, local_addr, local_size))

536-541: Consider logging before clearing state in shutdown.

If debugging is needed, the current state (agent name, tensor count) would be useful to log before clearing.

Proposed improvement
     def shutdown(self) -> None:
         """Clean up NIXL resources."""
+        if self._agent is not None:
+            logger.debug(f"Shutting down NIXL agent '{self._agent_name}' with {len(self._tensors)} tensors")
         self._agent = None
         self._metadata = b""
         self._tensor_descriptors.clear()
         self._tensors.clear()
         logger.info("NixlTransferManager shutdown complete")

docs/CLAUDE.md Outdated
Comment on lines 17 to 20
| Model | Status | Transfer Time | Notes |
|-------|--------|---------------|-------|
| DeepSeek-V3 (671B, FP8) | Working | 40-80s | 681GB across 8 GPUs |
| Llama 3.3 70B | Working | ~5s | 140GB across 4-8 GPUs |

⚠️ Potential issue | 🟡 Minor

Fix table pipe alignment to satisfy markdownlint MD060.

Several tables trigger MD060. Please reformat the tables with consistent spacing/alignment (or run a markdown formatter) to keep pipes aligned.

Example fix for the first table (apply similarly to the other tables)
-| Model | Status | Transfer Time | Notes |
-|-------|--------|---------------|-------|
-| DeepSeek-V3 (671B, FP8) | Working | 40-80s | 681GB across 8 GPUs |
-| Llama 3.3 70B | Working | ~5s | 140GB across 4-8 GPUs |
+| Model                 | Status  | Transfer Time | Notes                     |
+|-----------------------|---------|---------------|---------------------------|
+| DeepSeek-V3 (671B, FP8) | Working | 40-80s        | 681GB across 8 GPUs       |
+| Llama 3.3 70B         | Working | ~5s           | 140GB across 4-8 GPUs     |

Also applies to: 44-49, 99-105, 268-277, 281-287, 359-363, 379-385, 400-406

🧰 Tools
🪛 markdownlint-cli2 (0.20.0)

[warning] 19-19: Table column style
Table pipe does not align with header for style "aligned"

(MD060, table-column-style)


[warning] 20-20: Table column style
Table pipe does not align with header for style "aligned"

(MD060, table-column-style)


Comment on lines +26 to +40
```
Node A (Source) Node B (Target)
+---------------------------+ +---------------------------+
| vLLM + MxSourceModelLoader| | vLLM + MxTargetModelLoader|
| - Load weights from disk | | - Create dummy weights |
| - Register with NIXL | === RDMA ==>| - Receive via RDMA |
| - Publish to server | | - Run FP8 processing |
+-------------+-------------+ +-------------+-------------+
| |
v v
+---------------------------------------------------------------+
| ModelExpress Server (Rust) |
| Redis: model_name -> worker metadata |
+---------------------------------------------------------------+
```

⚠️ Potential issue | 🟡 Minor

Add language identifiers to fenced code blocks (MD040).

The architecture diagram, repo tree, and FP8 diagram blocks lack a language tag, which triggers markdownlint. Consider using text for these fences and apply the same fix to each block.

Proposed fix (apply similarly to other unlabeled fences)
-```
+```text
   Node A (Source)                           Node B (Target)
   +---------------------------+             +---------------------------+
   | vLLM + MxSourceModelLoader|             | vLLM + MxTargetModelLoader|
   | - Load weights from disk  |             | - Create dummy weights    |
   | - Register with NIXL      | === RDMA ==>| - Receive via RDMA        |
   | - Publish to server       |             | - Run FP8 processing      |
   +-------------+-------------+             +-------------+-------------+
                 |                                         |
                 v                                         v
   +---------------------------------------------------------------+
   |                    ModelExpress Server (Rust)                  |
   |              Redis: model_name -> worker metadata              |
   +---------------------------------------------------------------+
-```
+```

Also applies to: 126-153, 306-321

🧰 Tools
🪛 markdownlint-cli2 (0.20.0)

[warning] 26-26: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


Comment on lines +336 to +337
**Solutions**:
- Use robust NIXL ready coordination (implemented in vllm-source.yaml)
- Check source pod for restarts

⚠️ Potential issue | 🟡 Minor

Hyphenate “NIXL-ready” for clarity.

Compound modifiers read better with a hyphen.

Proposed fix
-- Use robust NIXL ready coordination (implemented in vllm-source.yaml)
+- Use robust NIXL-ready coordination (implemented in vllm-source.yaml)
🧰 Tools
🪛 LanguageTool

[grammar] ~337-~337: Use a hyphen to join words.
Context: ...warmup Solutions: - Use robust NIXL ready coordination (implemented in vllm-...

(QB_NEW_EN_HYPHEN)


docs/CONTEXT.md Outdated
Comment on lines 479 to 482
# Check Redis state
kubectl exec deployment/redis -- redis-cli KEYS "mx:*"
kubectl exec deployment/redis -- redis-cli GET "mx:model:meta-llama/Llama-3.1-70B"


⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find all Redis-related resource definitions in manifests
echo "=== Redis resource definitions in manifests ==="
find . -type f \( -name "*.yaml" -o -name "*.yml" \) -exec grep -l -i "redis" {} \;
echo ""
echo "=== Checking names in YAML files ==="
rg "name:\s*['\"]?.*redis" -t yaml --color=never
echo ""
echo "=== Checking kubectl deployment references in docs ==="
rg "deployment/.*redis|exec.*redis" -t md --color=never
echo ""
echo "=== All Redis mentions in CONTEXT.md ==="
rg -n "redis" docs/CONTEXT.md --color=never -i

Repository: ai-dynamo/modelexpress

Length of output: 1621


Fix Redis deployment reference in debug commands.

The commands at lines 480-481 reference deployment/redis, which does not exist in the Kubernetes manifests. The Redis service is deployed as modelexpress-redis (as shown in the REDIS_URL environment variable on line 385). Either reference the service directly or use the redis container within the modelexpress-server deployment to avoid copy-paste failures.


Comment on lines 35 to 78
  ttlSecondsAfterFinished: 3600 # Clean up 1 hour after completion
  template:
    spec:
      restartPolicy: OnFailure
      containers:
        - name: downloader
          image: python:3.11-slim
          env:
            - name: HF_HUB_CACHE
              value: /models
            - name: MODEL_NAME
              value: "deepseek-ai/DeepSeek-V3"
            - name: HF_TOKEN
              valueFrom:
                secretKeyRef:
                  name: hf-token-secret
                  key: HF_TOKEN
          command: ["bash", "-c"]
          args:
            - |
              set -ex
              pip install huggingface_hub

              # Download model files (weights, config, tokenizer)
              python3 -c "
              from huggingface_hub import snapshot_download
              snapshot_download(
                  '${MODEL_NAME}',
              )
              print('Download complete')
              "
          resources:
            requests:
              memory: "8Gi"
              cpu: "4"
          volumeMounts:
            - name: model-cache
              mountPath: /models
      volumes:
        - name: model-cache
          persistentVolumeClaim:

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

cat -n examples/p2p_transfer_k8s/model-download.yaml | head -100

Repository: ai-dynamo/modelexpress

Length of output: 2727


Harden container security (root + privilege escalation).
This job runs as root and allows privilege escalation, which violates Pod Security Standards' restricted level. Add securityContext to run as non-root and disallow privilege escalation.

🔧 Suggested hardening
 spec:
   template:
     spec:
+      securityContext:
+        runAsNonRoot: true
+        runAsUser: 1000
+        fsGroup: 1000
       restartPolicy: OnFailure
       containers:
         - name: downloader
           image: python:3.11-slim
+          securityContext:
+            allowPrivilegeEscalation: false

This requires verifying PVC permissions support uid 1000 writes at mount time.


Comment on lines +15 to +22
_runtime_version.ValidateProtobufRuntimeVersion(
    _runtime_version.Domain.PUBLIC,
    6,
    31,
    1,
    '',
    'p2p.proto'
)

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check Python dependency constraints for protobuf
rg -n --glob 'requirements*.txt' --glob 'pyproject.toml' --glob 'setup.cfg' --glob 'setup.py' 'protobuf' .

Repository: ai-dynamo/modelexpress

Length of output: 136


Update protobuf dependency constraint in pyproject.toml to >=6.31.1.

The generated p2p_pb2.py module validates that protobuf runtime is version 6.31.1 or higher at import time. However, modelexpress_client/python/pyproject.toml currently specifies protobuf>=4.25.0, which allows installing protobuf versions that will cause imports to fail. Update the constraint to protobuf>=6.31.1 to match the required runtime version.


Comment on lines 134 to 145
session_id = cls.get_session_id()
key = f"mx:ready:{model_name}:worker:{worker_id}"

ready_info = {
    "session_id": session_id,
    "timestamp": time.time(),
    "metadata_hash": metadata_hash,
    "ready": True,
}

try:
    # Set with 2 hour TTL (matches warmup timeout)
    client.setex(key, 7200, json.dumps(ready_info))
    _log(f"[Worker {worker_id}] Published ready flag to Redis: session={session_id[:8]}...", "INFO")
    return True
except Exception as e:
    _log(f"[Worker {worker_id}] Failed to publish ready flag: {e}", "WARNING")
    return False

⚠️ Potential issue | 🔴 Critical

Redis key mismatch between publish_source_ready and wait_for_source_ready.

publish_source_ready uses key mx:ready:{model_name}:worker:{worker_id} (line 135), but wait_for_source_ready checks mx:nixl_ready:{model_name}:worker:{worker_id} (line 172). This will cause the target to never find the ready flag.

Proposed fix - use consistent key prefix
     `@classmethod`
     def publish_source_ready(
         cls,
         model_name: str,
         worker_id: int,
         metadata_hash: str,
     ) -> bool:
         ...
         session_id = cls.get_session_id()
-        key = f"mx:ready:{model_name}:worker:{worker_id}"
+        key = f"mx:nixl_ready:{model_name}:worker:{worker_id}"

Or alternatively, update wait_for_source_ready to use mx:ready: - whichever key format is intended.

🧰 Tools
🪛 Ruff (0.14.14)

[warning] 148-148: Consider moving this statement to an else block

(TRY300)


[warning] 149-149: Do not catch blind exception: Exception

(BLE001)


Comment on lines 176 to 190

while time.time() - start_time < timeout_seconds:
try:
data = client.get(key)
if data:
ready_info = json.loads(data)
if ready_info.get("nixl_ready") and ready_info.get("stability_verified"):
session_id = ready_info.get("session_id")
metadata_hash = ready_info.get("metadata_hash")
_log(
f"[Worker {worker_id}] Source ready! session={session_id[:8] if session_id else 'N/A'}..., "
f"hash={metadata_hash[:8] if metadata_hash else 'N/A'}...",
"INFO"
)
return True, session_id, metadata_hash

⚠️ Potential issue | 🔴 Critical

Waiting for different fields than what's published.

publish_source_ready sets ready: True (line 141), but wait_for_source_ready checks for nixl_ready and stability_verified (line 182). These fields don't exist in the published data.

Proposed fix - align published and expected fields
         ready_info = {
             "session_id": session_id,
             "timestamp": time.time(),
             "metadata_hash": metadata_hash,
-            "ready": True,
+            "nixl_ready": True,
+            "stability_verified": True,
         }

Or update the consumer to check ready instead:

-                    if ready_info.get("nixl_ready") and ready_info.get("stability_verified"):
+                    if ready_info.get("ready"):
🤖 Prompt for AI Agents
In `@modelexpress_client/python/modelexpress/vllm_loader.py` around lines 176 -
190, The wait_for_source_ready loop is checking for non-existent fields
("nixl_ready" and "stability_verified") while publish_source_ready sets "ready";
update wait_for_source_ready (the function using client.get loop) to parse the
stored JSON and check ready == True (e.g., ready_info.get("ready")), then
extract session_id and metadata_hash as before; ensure the log and return
behavior (session_id and metadata_hash extraction in wait_for_source_ready)
remain unchanged so the consumer matches the published payload from
publish_source_ready.
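To keep the two sides from drifting again, the flag's field names could also live in one shared definition that both the publisher and the consumer import. A minimal sketch, assuming a hypothetical ReadyFlag TypedDict (not part of this PR); the field names mirror what publish_source_ready currently writes:

from typing import TypedDict

class ReadyFlag(TypedDict):
    """Shared shape of the ready flag stored in Redis."""
    session_id: str
    timestamp: float
    metadata_hash: str
    ready: bool  # the single readiness field the consumer should check

def is_source_ready(flag: ReadyFlag) -> bool:
    """Consumer-side check that matches exactly what the publisher writes."""
    return bool(flag.get("ready"))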

Comment on lines +666 to +670
total_time = _time.perf_counter() - load_start
_log("=" * 60, "INFO")
_log(f"[TIMING] MxTargetModelLoader.load_model() COMPLETE", "INFO")
_log(f"[TIMING] Total load time: {total_time:.2f}s", "INFO")
_log("=" * 60, "INFO")

⚠️ Potential issue | 🟡 Minor

Remove extraneous f-string prefix.

Line 668 has an f-string without placeholders.

Proposed fix
-        _log(f"[TIMING] MxTargetModelLoader.load_model() COMPLETE", "INFO")
+        _log("[TIMING] MxTargetModelLoader.load_model() COMPLETE", "INFO")
🧰 Tools
🪛 Ruff (0.14.14)

[error] 668-668: f-string without any placeholders

Remove extraneous f prefix

(F541)

🤖 Prompt for AI Agents
In `@modelexpress_client/python/modelexpress/vllm_loader.py` around lines 666 -
670, The log call in MxTargetModelLoader.load_model() uses an unnecessary
f-string for the static message f"[TIMING] MxTargetModelLoader.load_model()
COMPLETE"; remove the f prefix so the call becomes a regular string and keep the
surrounding _log calls using total_time and _log unchanged to avoid creating a
gratuitous formatted string.

Comment on lines +763 to +768
source_ready, cached_session_id, cached_metadata_hash = SourceReadyCoordinator.wait_for_source_ready(
model_name=model_name,
worker_id=device_id,
timeout_seconds=7200, # 2 hour timeout (matches source warmup)
poll_interval=10,
)

⚠️ Potential issue | 🟡 Minor

Prefix unused variable with underscore.

cached_metadata_hash is unpacked but never used. Prefix it with an underscore to mark the discard as intentional.

Proposed fix
-        source_ready, cached_session_id, cached_metadata_hash = SourceReadyCoordinator.wait_for_source_ready(
+        source_ready, cached_session_id, _cached_metadata_hash = SourceReadyCoordinator.wait_for_source_ready(
🧰 Tools
🪛 Ruff (0.14.14)

[warning] 763-763: Unpacked variable cached_metadata_hash is never used

Prefix it with an underscore or any other dummy variable pattern

(RUF059)

🤖 Prompt for AI Agents
In `@modelexpress_client/python/modelexpress/vllm_loader.py` around lines 763 -
768, Unused unpacked variable: cached_metadata_hash from
SourceReadyCoordinator.wait_for_source_ready is never used; prefix it with an
underscore to mark intentional discard. Locate the call to
SourceReadyCoordinator.wait_for_source_ready (the variables source_ready,
cached_session_id, cached_metadata_hash) and rename cached_metadata_hash to
_cached_metadata_hash (or just _ if preferred) so linters recognize it as
intentionally unused; ensure no other code relies on the original name.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 9

🤖 Fix all issues with AI agents
In `@examples/p2p_transfer_k8s/Dockerfile.client`:
- Line 7: Pin the Python package versions in the Dockerfile.client RUN line so
the generated code has compatible runtime packages: change the pip install
invocation that currently installs grpcio, grpcio-tools, protobuf, and redis to
specify at least grpcio>=1.76.0 and protobuf>=6.31.1 (and pin grpcio-tools to
match grpcio, e.g., grpcio-tools>=1.76.0); update the RUN line installing
grpcio, grpcio-tools, protobuf, and redis to use these version specifiers.

In `@examples/p2p_transfer_k8s/README.md`:
- Around line 135-136: The README's Redis connectivity check incorrectly targets
a host named modelexpress-redis; instead exec into the modelexpress-server pod
and run redis-cli against localhost (or omit -h) because Redis is running as a
sidecar inside the same pod. Update the example command that uses kubectl exec
-it deployment/modelexpress-server -- redis-cli -h modelexpress-redis ping to
invoke redis-cli against localhost (or simply redis-cli ping) so the check
connects to the sidecar Redis running in the modelexpress-server pod.
- Around line 73-74: Update the kubectl log command in README.md to use the
actual container name defined in the manifests: replace the referenced container
flag "-c client" with "-c vllm" so it matches the container named "vllm" in
vllm-source.yaml and vllm-target.yaml; ensure the example line reads the same
command semantics (kubectl logs deployment/mx-source -c vllm -f).

In `@modelexpress_client/python/modelexpress/client.py`:
- Around line 517-523: The loop over self._zmq_sockets currently uses an unused
variable named rank and silently swallows all exceptions; change the loop
variable to a conventional unused name like "_" (for example: for _, socket in
self._zmq_sockets.items()) and replace the bare except: pass with a proper
try/except/finally: catch Exception as e and log the error (e.g. via
self.logger.error or self.logger.exception) including the exception details, and
ensure socket.close() is executed in a finally block so sockets are always
closed even on error; update references to socket.send_pyobj and
socket.recv_string accordingly inside the try block.

In `@modelexpress_client/python/modelexpress/nixl_transfer.py`:
- Line 470: The loop using zip(remote_descs, local_tensors) silently truncates
when lengths differ; change the call to zip(remote_descs, local_tensors,
strict=True) in the function where that loop appears (the iterator over
remote_descs and local_tensors) so a ValueError is raised on length mismatch,
and update any surrounding error handling to surface that exception if needed.

In `@modelexpress_client/python/modelexpress/vllm_extension.py`:
- Line 159: The assignment computing device_id using next(iter(self._tensors))
can raise StopIteration if _tensors is empty; modify the code in the method that
calls _build_weight_infos() (where device_id is set) to first check if
self._tensors is truthy (or use next(iter(self._tensors), None)) and default
device_id to 0 when there are no tensors; update the device_id calculation line
that currently references self._tensors[next(iter(self._tensors))].device.index
or 0 to safely handle an empty _tensors dict and avoid a KeyError/StopIteration.
- Around line 179-183: The _get_tensor_descriptors method currently returns the
original per-tensor list (self._nixl_manager.tensor_descriptors) which is wrong
when contiguous registration is used; change it to return the actual registered
descriptors by calling self._nixl_manager.get_registered_descriptors() when a
_nixl_manager exists (mirror the approach used in vllm_loader.py and ensure
behavior matches MX_CONTIGUOUS_REG=1); update references inside the
_get_tensor_descriptors function to use get_registered_descriptors() instead of
tensor_descriptors.

In `@modelexpress_client/python/modelexpress/vllm_loader.py`:
- Around line 979-981: The except block that currently raises RuntimeError with
f"Transfer failed after {transfer_retries} attempts: {transfer_err}" should
preserve the original exception chain; change the re-raise to use exception
chaining by adding "from transfer_err" to the raise (i.e., raise
RuntimeError(...) from transfer_err) within the same context where transfer_err
and transfer_retries are available so the original traceback is retained for
debugging.

In `@modelexpress_client/python/pyproject.toml`:
- Around line 29-37: Update pyproject.toml dependency constraints to match the
runtime checks in the generated protobuf/grpc code: change "protobuf>=4.25.0" to
"protobuf>=6.31.1" and "grpcio>=1.60.0" to "grpcio>=1.76.0" so imports in
p2p_pb2.py and p2p_pb2_grpc.py won't fail; also update any corresponding
dev/test dependency entries that pin protobuf or grpcio to older ranges to the
same minimums to keep builds consistent.
🧹 Nitpick comments (15)
examples/p2p_transfer_k8s/model-download.yaml (1)

56-56: Consider pinning huggingface_hub version.

Using pip install huggingface_hub without a version pin may lead to non-reproducible builds if the library introduces breaking changes.

Suggested fix
-              pip install huggingface_hub
+              pip install huggingface_hub==0.27.0
modelexpress_client/python/tests/test_types.py (2)

25-34: Move pytest import to module level.

The pytest import inside the test method works but is not idiomatic Python. Moving it to the top with other imports improves readability and follows standard conventions.

Suggested refactor
 # SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 # SPDX-License-Identifier: Apache-2.0

 """Tests for ModelExpress type definitions."""

+import pytest
+
 from modelexpress.types import TensorDescriptor, WorkerMetadata, GetMetadataResponse

Then remove the inline import:

     def test_dtype_required(self):
         """Test that dtype is a required field."""
-        import pytest
         with pytest.raises(TypeError):

93-100: Missing trailing newline.

The file should end with a newline character for POSIX compliance.

Suggested fix
     def test_not_found_response(self):
         """Test response when source is not found."""
         response = GetMetadataResponse(
             found=False,
             workers=[],
         )
         assert response.found is False
         assert len(response.workers) == 0
+
examples/p2p_transfer_k8s/modelexpress-server.yaml (1)

72-75: Consider adding resource limits for the main container.

The modelexpress-server container specifies requests but no limits for CPU and memory. This could lead to unbounded resource consumption on the node.

Suggested addition
           resources:
             requests:
               cpu: 4
               memory: 8Gi
+            limits:
+              cpu: 8
+              memory: 16Gi
docs/CONTEXT.md (2)

17-17: Add language identifier to fenced code blocks.

Multiple fenced code blocks (lines 17, 66, 124) lack language identifiers. Use text for ASCII diagrams to satisfy markdownlint MD040.


654-657: Use proper Markdown link syntax for URL.

The NIXL repository URL should use Markdown link syntax rather than a bare URL.

Suggested fix
-- **NIXL**: NVIDIA's low-level transfer library (https://github.com/ai-dynamo/nixl)
+- **NIXL**: NVIDIA's low-level transfer library ([GitHub](https://github.com/ai-dynamo/nixl))
modelexpress_client/python/pyproject.toml (1)

31-31: Consider making CUDA version configurable for nixl dependency.

The nixl[cu12] extra hardcodes CUDA 12 support. Users with CUDA 11 environments will face compatibility issues. Consider documenting this requirement or providing optional extras for different CUDA versions if nixl supports them.

modelexpress_server/src/p2p_service.rs (1)

111-117: Error case returns indistinguishable response from "not found" case.

When get_metadata fails due to an error (e.g., Redis connection failure), the response is identical to "model not found" (found: false, workers: []). The caller cannot distinguish between "model doesn't exist" and "service error occurred."

Consider either returning a gRPC Status error or adding an error field to GetMetadataResponse to surface failures appropriately.

examples/p2p_transfer_k8s/Dockerfile.client (1)

32-34: Hardcoded Python version path is fragile.

The paths /usr/local/lib/python3.12/dist-packages/ assume the base image uses Python 3.12. If the vLLM base image upgrades Python, these paths will silently fail.

Consider using python -c "import site; print(site.getsitepackages()[0])" or similar to dynamically resolve the path:

Proposed fix for dynamic path resolution
-RUN pip uninstall -y modelexpress 2>/dev/null || true && \
-    rm -rf /usr/local/lib/python3.12/dist-packages/modelexpress* && \
+RUN SITE_PACKAGES=$(python3 -c "import site; print(site.getsitepackages()[0])") && \
+    pip uninstall -y modelexpress 2>/dev/null || true && \
+    rm -rf "$SITE_PACKAGES"/modelexpress* && \
     pip install .
 
 # Patch vLLM's model loader __init__.py to include ModelExpress loaders
-RUN LOADER_INIT="/usr/local/lib/python3.12/dist-packages/vllm/model_executor/model_loader/__init__.py" && \
+RUN SITE_PACKAGES=$(python3 -c "import site; print(site.getsitepackages()[0])") && \
+    LOADER_INIT="$SITE_PACKAGES/vllm/model_executor/model_loader/__init__.py" && \

Also applies to: 38-48

examples/p2p_transfer_k8s/vllm-source.yaml (2)

43-46: Consider hardening container security context.

The static analysis flags missing allowPrivilegeEscalation: false and runAsNonRoot: true. While IPC_LOCK is required for GPUDirect RDMA, you can still restrict privilege escalation.

Proposed enhancement
          securityContext:
            capabilities:
              add:
                - IPC_LOCK
+           allowPrivilegeEscalation: false

105-105: Extract the inline startup script to a ConfigMap for maintainability.

The command embeds a complex shell script with inline Python that publishes Redis ready flags. This is difficult to read, test, and maintain. The inline Python also hardcodes the key pattern mx:nixl_ready:{model}:worker:{w}, which must stay aligned with SourceReadyCoordinator.wait_for_source_ready() in vllm_loader.py (line 172).

Consider extracting this to a ConfigMap-mounted script. This would also make the key pattern easier to verify and update consistently.

modelexpress_client/python/modelexpress/vllm_extension.py (1)

169-171: Use logger.exception to capture the stack trace.

When logging errors from caught exceptions, logger.exception automatically includes the traceback, which aids debugging.

Proposed fix
         except Exception as e:
-            logger.error(f"Worker {self._worker_rank}: Failed to initialize NIXL: {e}")
+            logger.exception(f"Worker {self._worker_rank}: Failed to initialize NIXL: {e}")
             self._nixl_manager = None
modelexpress_client/python/modelexpress/nixl_transfer.py (1)

399-411: Consider adding exponential backoff to the polling loop.

The transfer completion polling uses a fixed 1ms sleep. For long-running transfers, this creates unnecessary CPU wake-ups. Consider starting with a short interval and increasing it for longer waits.
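A minimal sketch of a capped exponential backoff loop; the function name, bounds, and the is_done callable are illustrative assumptions rather than this module's actual API:

import time

def poll_until_done(is_done, timeout_s: float = 300.0,
                    initial_s: float = 0.001, max_s: float = 0.1) -> bool:
    """Poll is_done() with capped exponential backoff instead of a fixed 1 ms sleep."""
    deadline = time.monotonic() + timeout_s
    interval = initial_s
    while time.monotonic() < deadline:
        if is_done():
            return True
        time.sleep(interval)
        interval = min(interval * 2, max_s)  # grow the wait, but never beyond the cap
    return False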

modelexpress_server/src/state.rs (1)

70-76: Potential precision loss in visit_f64 for large GPU addresses.

Similar to the Python side, casting f64 to u64 can lose precision for values greater than 2^53. GPU addresses like 139948187451390 are within the safe range, but addresses from high-memory systems could exceed it. Consider validating the value or emitting a warning.

Proposed enhancement
         fn visit_f64<E>(self, value: f64) -> Result<Self::Value, E>
         where
             E: de::Error,
         {
             // Handle floats from cjson (the problematic case)
+            // Note: f64 can only represent integers exactly up to 2^53
+            if value > (1u64 << 53) as f64 {
+                return Err(E::custom("f64 value exceeds safe integer precision"));
+            }
             Ok(value as u64)
         }
modelexpress_client/python/modelexpress/vllm_loader.py (1)

396-412: Consider extracting duplicate _get_worker_rank method.

Both MxSourceModelLoader and MxTargetModelLoader have identical _get_worker_rank implementations. Consider extracting to a shared utility function or mixin.

Proposed refactor
def _get_worker_rank_impl(device: torch.device) -> int:
    """Get the TP rank of this worker."""
    try:
        from vllm.distributed import get_tensor_model_parallel_rank
        rank = get_tensor_model_parallel_rank()
        _log(f"Got TP rank from vllm.distributed: {rank}", "DEBUG")
        return rank
    except (ImportError, RuntimeError) as e:
        _log(f"Could not get TP rank from vllm.distributed: {e}", "DEBUG")
    
    if hasattr(device, "index") and device.index is not None:
        _log(f"Using device.index as rank: {device.index}", "DEBUG")
        return device.index
    
    _log("Defaulting to rank 0", "DEBUG")
    return 0

Then either call _get_worker_rank_impl(device) directly from both loaders or bind it as self._get_worker_rank = _get_worker_rank_impl.

Also applies to: 1003-1019

FROM vllm/vllm-openai:v0.12.0 AS builder

# Install Python dependencies for gRPC and Redis coordination
RUN pip install grpcio grpcio-tools protobuf redis

⚠️ Potential issue | 🟡 Minor

Pin dependency versions to match generated code requirements.

Installing grpcio, grpcio-tools, and protobuf without version constraints may install versions incompatible with the generated code, which requires grpcio>=1.76.0 and protobuf>=6.31.1.

Proposed fix
-RUN pip install grpcio grpcio-tools protobuf redis
+RUN pip install "grpcio>=1.76.0" "grpcio-tools>=1.76.0" "protobuf>=6.31.1" redis
🤖 Prompt for AI Agents
In `@examples/p2p_transfer_k8s/Dockerfile.client` at line 7, Pin the Python
package versions in the Dockerfile.client RUN line so the generated code has
compatible runtime packages: change the pip install invocation that currently
installs grpcio, grpcio-tools, protobuf, and redis to specify at least
grpcio>=1.76.0 and protobuf>=6.31.1 (and pin grpcio-tools to match grpcio, e.g.,
grpcio-tools>=1.76.0); update the RUN line installing grpcio, grpcio-tools,
protobuf, and redis to use these version specifiers.

Comment on lines +73 to +74
kubectl logs deployment/mx-source -c client -f
```

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check container names in the vLLM manifests
echo "=== Containers in vllm-source.yaml ==="
rg "^\s+- name:" examples/p2p_transfer_k8s/vllm-source.yaml -A0

echo ""
echo "=== Containers in vllm-target.yaml ==="
rg "^\s+- name:" examples/p2p_transfer_k8s/vllm-target.yaml -A0

Repository: ai-dynamo/modelexpress

Length of output: 1782


🏁 Script executed:

sed -n '73,74p' examples/p2p_transfer_k8s/README.md

Repository: ai-dynamo/modelexpress

Length of output: 118


Change container reference to match actual manifest.

The kubectl command references -c client, but both vllm-source.yaml and vllm-target.yaml only define a vllm container. Update the command to use -c vllm:

Diff
-kubectl logs deployment/mx-source -c client -f
+kubectl logs deployment/mx-source -c vllm -f
🤖 Prompt for AI Agents
In `@examples/p2p_transfer_k8s/README.md` around lines 73 - 74, Update the kubectl
log command in README.md to use the actual container name defined in the
manifests: replace the referenced container flag "-c client" with "-c vllm" so
it matches the container named "vllm" in vllm-source.yaml and vllm-target.yaml;
ensure the example line reads the same command semantics (kubectl logs
deployment/mx-source -c vllm -f).

Comment on lines +135 to +136
kubectl exec -it deployment/modelexpress-server -- redis-cli -h modelexpress-redis ping
```

⚠️ Potential issue | 🟡 Minor

Fix Redis connectivity check command.

The command targets a modelexpress-redis host, but according to the server manifest, Redis runs as a sidecar within the modelexpress-server pod and is reachable at localhost from inside that pod.

Suggested fix
-kubectl exec -it deployment/modelexpress-server -- redis-cli -h modelexpress-redis ping
+kubectl exec -it deployment/modelexpress-server -c redis -- redis-cli ping
🤖 Prompt for AI Agents
In `@examples/p2p_transfer_k8s/README.md` around lines 135 - 136, The README's
Redis connectivity check incorrectly targets a host named modelexpress-redis;
instead exec into the modelexpress-server pod and run redis-cli against
localhost (or omit -h) because Redis is running as a sidecar inside the same
pod. Update the example command that uses kubectl exec -it
deployment/modelexpress-server -- redis-cli -h modelexpress-redis ping to invoke
redis-cli against localhost (or simply redis-cli ping) so the check connects to
the sidecar Redis running in the modelexpress-server pod.

Comment on lines +517 to +523
for rank, socket in self._zmq_sockets.items():
try:
socket.send_pyobj({"cmd": "done"})
socket.recv_string()
except Exception:
pass
socket.close()

⚠️ Potential issue | 🟡 Minor

Rename unused loop variable and avoid silent exception swallowing.

The loop variable rank is unused, and exceptions are silently swallowed with pass.

Proposed fix
-        for rank, socket in self._zmq_sockets.items():
+        for _rank, socket in self._zmq_sockets.items():
             try:
                 socket.send_pyobj({"cmd": "done"})
                 socket.recv_string()
-            except Exception:
-                pass
+            except Exception as e:
+                logger.debug(f"Error during socket cleanup: {e}")
             socket.close()
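The AI prompt below additionally asks for socket.close() to run in a finally block so sockets are closed even when the handshake raises. A standalone sketch of that pattern (the helper name and logger setup are assumptions):

import logging

logger = logging.getLogger(__name__)

def close_worker_sockets(sockets: dict) -> None:
    """Send the 'done' handshake to each worker socket, log failures, and always close."""
    for _rank, socket in sockets.items():
        try:
            socket.send_pyobj({"cmd": "done"})
            socket.recv_string()
        except Exception as e:
            logger.error(f"Failed to shut down ZMQ socket cleanly: {e}")
        finally:
            socket.close()  # runs even if the handshake raised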
🧰 Tools
🪛 Ruff (0.14.14)

[warning] 517-517: Loop control variable rank not used within loop body

Rename unused rank to _rank

(B007)


[error] 521-522: try-except-pass detected, consider logging the exception

(S110)


[warning] 521-521: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
In `@modelexpress_client/python/modelexpress/client.py` around lines 517 - 523,
The loop over self._zmq_sockets currently uses an unused variable named rank and
silently swallows all exceptions; change the loop variable to a conventional
unused name like "_" (for example: for _, socket in self._zmq_sockets.items())
and replace the bare except: pass with a proper try/except/finally: catch
Exception as e and log the error (e.g. via self.logger.error or
self.logger.exception) including the exception details, and ensure
socket.close() is executed in a finally block so sockets are always closed even
on error; update references to socket.send_pyobj and socket.recv_string
accordingly inside the try block.

# Build indexed list with local tensor info
# (remote_desc, local_addr, local_size)
indexed = []
for remote, local in zip(remote_descs, local_tensors):

⚠️ Potential issue | 🟡 Minor

Add strict=True to zip() to catch length mismatches.

If remote_descs and local_tensors have different lengths, zip() silently truncates. This could mask bugs where tensor matching went wrong.

Proposed fix
-        for remote, local in zip(remote_descs, local_tensors):
+        for remote, local in zip(remote_descs, local_tensors, strict=True):
🧰 Tools
🪛 Ruff (0.14.14)

[warning] 470-470: zip() without an explicit strict= parameter

Add explicit value for parameter strict=

(B905)

🤖 Prompt for AI Agents
In `@modelexpress_client/python/modelexpress/nixl_transfer.py` at line 470, The
loop using zip(remote_descs, local_tensors) silently truncates when lengths
differ; change the call to zip(remote_descs, local_tensors, strict=True) in the
function where that loop appears (the iterator over remote_descs and
local_tensors) so a ValueError is raised on length mismatch, and update any
surrounding error handling to surface that exception if needed.

self._nixl_manager = None
return

device_id = self._tensors[next(iter(self._tensors))].device.index or 0

⚠️ Potential issue | 🟡 Minor

Potential KeyError if _tensors is empty.

If _build_weight_infos() produces an empty _tensors dict (e.g., the model has no CUDA parameters), next(iter(self._tensors)) will raise StopIteration.

Proposed fix
-        device_id = self._tensors[next(iter(self._tensors))].device.index or 0
+        if not self._tensors:
+            logger.warning("No CUDA tensors found, skipping NIXL initialization")
+            self._nixl_manager = None
+            return
+        device_id = next(iter(self._tensors.values())).device.index or 0
🤖 Prompt for AI Agents
In `@modelexpress_client/python/modelexpress/vllm_extension.py` at line 159, The
assignment computing device_id using next(iter(self._tensors)) can raise
StopIteration if _tensors is empty; modify the code in the method that calls
_build_weight_infos() (where device_id is set) to first check if self._tensors
is truthy (or use next(iter(self._tensors), None)) and default device_id to 0
when there are no tensors; update the device_id calculation line that currently
references self._tensors[next(iter(self._tensors))].device.index or 0 to safely
handle an empty _tensors dict and avoid a KeyError/StopIteration.

Comment on lines +179 to +183
def _get_tensor_descriptors(self) -> list[TensorDescriptor]:
"""Get tensor descriptors for this worker."""
if self._nixl_manager is None:
return []
return self._nixl_manager.tensor_descriptors

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check if get_registered_descriptors is used elsewhere for consistency
rg -n "get_registered_descriptors|tensor_descriptors" modelexpress_client/python/modelexpress/

Repository: ai-dynamo/modelexpress

Length of output: 2084


🏁 Script executed:

#!/bin/bash
# Check the implementation of get_registered_descriptors
sed -n '163,190p' modelexpress_client/python/modelexpress/nixl_transfer.py

Repository: ai-dynamo/modelexpress

Length of output: 1176


🏁 Script executed:

#!/bin/bash
# Check how tensor_descriptors are used in vllm_extension.py
sed -n '215,235p' modelexpress_client/python/modelexpress/vllm_extension.py

Repository: ai-dynamo/modelexpress

Length of output: 744


🏁 Script executed:

#!/bin/bash
# Check how get_registered_descriptors is used in vllm_loader.py
sed -n '505,520p' modelexpress_client/python/modelexpress/vllm_loader.py

Repository: ai-dynamo/modelexpress

Length of output: 995


Return registered descriptors to align with what was actually registered to NIXL.

_get_tensor_descriptors() currently returns self._nixl_manager.tensor_descriptors (original per-tensor descriptors), but when MX_CONTIGUOUS_REG=1, the manager registers contiguous regions instead. The get_registered_descriptors() method exists precisely for this purpose—its docstring states: "This is important for publishing to the server - we must publish what was actually registered, not the original tensors."

The vllm_loader.py correctly uses get_registered_descriptors() when MX_CONTIGUOUS_REG=1 (see line 510), so _get_tensor_descriptors() should do the same to ensure consistency across the extension.
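As a small sketch of the intended behavior (the standalone helper and its name are assumptions; get_registered_descriptors() is the existing manager method described above):

def registered_descriptors(nixl_manager) -> list:
    """Return what was actually registered with NIXL (contiguous regions when
    MX_CONTIGUOUS_REG=1), or an empty list when no manager is available."""
    if nixl_manager is None:
        return []
    return nixl_manager.get_registered_descriptors()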

🤖 Prompt for AI Agents
In `@modelexpress_client/python/modelexpress/vllm_extension.py` around lines 179 -
183, The _get_tensor_descriptors method currently returns the original
per-tensor list (self._nixl_manager.tensor_descriptors) which is wrong when
contiguous registration is used; change it to return the actual registered
descriptors by calling self._nixl_manager.get_registered_descriptors() when a
_nixl_manager exists (mirror the approach used in vllm_loader.py and ensure
behavior matches MX_CONTIGUOUS_REG=1); update references inside the
_get_tensor_descriptors function to use get_registered_descriptors() instead of
tensor_descriptors.

Comment on lines +979 to +981
raise RuntimeError(
f"Transfer failed after {transfer_retries} attempts: {transfer_err}"
)

⚠️ Potential issue | 🟡 Minor

Chain the exception for better debugging.

When re-raising exceptions within an except block, use raise ... from err to preserve the exception chain.

Proposed fix
                     else:
                         _log(f"[Worker {device_id}] Transfer failed after {transfer_retries} attempts: {transfer_err}", "ERROR")
-                        raise RuntimeError(
+                        raise RuntimeError(
                             f"Transfer failed after {transfer_retries} attempts: {transfer_err}"
-                        )
+                        ) from transfer_err
🧰 Tools
🪛 Ruff (0.14.14)

[warning] 979-981: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


[warning] 979-981: Avoid specifying long messages outside the exception class

(TRY003)

🤖 Prompt for AI Agents
In `@modelexpress_client/python/modelexpress/vllm_loader.py` around lines 979 -
981, The except block that currently raises RuntimeError with f"Transfer failed
after {transfer_retries} attempts: {transfer_err}" should preserve the original
exception chain; change the re-raise to use exception chaining by adding "from
transfer_err" to the raise (i.e., raise RuntimeError(...) from transfer_err)
within the same context where transfer_err and transfer_retries are available so
the original traceback is retained for debugging.

Comment on lines 29 to 37
dependencies = [
"grpcio>=1.60.0",
"nixl[cu12]",
"numpy>=1.24.0",
"protobuf>=4.25.0",
"pydantic>=2.0.0",
"pyzmq>=25.0.0",
"torch>=2.6.0",
]

⚠️ Potential issue | 🟠 Major

Dependency version constraints are inconsistent with generated code requirements.

The generated protobuf and gRPC code in this PR enforces stricter version requirements at runtime:

  • p2p_pb2.py validates protobuf>=6.31.1 at import time, but the constraint here is >=4.25.0
  • p2p_pb2_grpc.py validates grpcio>=1.76.0 at import time, but the constraint here is >=1.60.0

This mismatch allows pip to install versions that satisfy these constraints but fail at runtime with import errors.

Proposed fix to align dependency versions
 dependencies = [
-    "grpcio>=1.60.0",
+    "grpcio>=1.76.0",
     "nixl[cu12]",
     "numpy>=1.24.0",
-    "protobuf>=4.25.0",
+    "protobuf>=6.31.1",
     "pydantic>=2.0.0",
     "pyzmq>=25.0.0",
     "torch>=2.6.0",
 ]

Also update the dev dependency:

 [project.optional-dependencies]
 dev = [
-    "grpcio-tools>=1.60.0",
+    "grpcio-tools>=1.76.0",
     "pytest>=7.0.0",
     "pytest-asyncio>=0.21.0",
 ]
🤖 Prompt for AI Agents
In `@modelexpress_client/python/pyproject.toml` around lines 29 - 37, Update
pyproject.toml dependency constraints to match the runtime checks in the
generated protobuf/grpc code: change "protobuf>=4.25.0" to "protobuf>=6.31.1"
and "grpcio>=1.60.0" to "grpcio>=1.76.0" so imports in p2p_pb2.py and
p2p_pb2_grpc.py won't fail; also update any corresponding dev/test dependency
entries that pin protobuf or grpcio to older ranges to the same minimums to keep
builds consistent.

Contributor

@AndyDai-nv AndyDai-nv left a comment

Such a huge PR, great work! Left some questions and comments.

Signed-off-by: Kavin Krishnan <kavinkrishnan@gmail.com>
Co-authored-by: Cursor <cursoragent@cursor.com>