
Commit 49f6bad

refactor: Address remaining PR review feedback from nv-hwoo
- Move CLAUDE.md to project root (#14) - Fix transfer time claims and sanitize hardcoded paths (#15, #17) - Update env var references to MODEL_EXPRESS_URL (#17) - Add Lua script explanation for atomic worker merge (#7) - Mark unused Rust functions with #[allow(dead_code)] (#6) - Move MAX_MESSAGE_SIZE to module-level constant (#13) - Remove pyzmq from dependencies (provided by vLLM) (#18) - Update K8s README architecture diagram (#23) - Add __pycache__ and local docs to .gitignore Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 5768661 commit 49f6bad

File tree: 6 files changed (+49, -43 lines)


.gitignore

Lines changed: 2 additions & 0 deletions
````diff
@@ -49,3 +49,5 @@ logs/
 
 # Models database
 models.db
+**/__pycache__/
+docs/FEEDBACK.md
````

docs/CLAUDE.md renamed to CLAUDE.md

Lines changed: 13 additions & 14 deletions
````diff
@@ -16,8 +16,8 @@ This file provides context for AI assistants (Claude, Cursor, Copilot) working o
 
 | Model | Status | Transfer Time | Notes |
 |-------|--------|---------------|-------|
-| DeepSeek-V3 (671B, FP8) | Working | 40-80s | 681GB across 8 GPUs |
-| Llama 3.3 70B | Working | ~5s | 140GB across 4-8 GPUs |
+| DeepSeek-V3 (671B, FP8) | Working | ~40s | 681GB across 8 GPUs @ ~112 Gbps |
+| Llama 3.3 70B | Working | ~5s | 140GB across 8 GPUs @ ~112 Gbps |
 
 ---
 
````
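A quick back-of-envelope check on the updated DeepSeek-V3 row (treating ~112 Gbps as the aggregate effective transfer rate, which is our reading of the table rather than a measured figure):

$$
t \approx \frac{681\,\mathrm{GB} \times 8\,\mathrm{bit/B}}{112\,\mathrm{Gbit/s}} \approx 49\,\mathrm{s}
$$

which lands in the same ballpark as the ~40s figure.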
````diff
@@ -125,7 +125,7 @@ UCX_LOG_LEVEL: "WARN" # DEBUG for troubleshooting
 
 ```
 modelexpress/
-├── CLAUDE.md                 # THIS FILE - AI assistant context
+├── CLAUDE.md                 # THIS FILE (project root) - AI assistant context
 ├── modelexpress_server/      # Rust gRPC server
 │   └── src/
 │       ├── main.rs
@@ -162,7 +162,7 @@ Contains custom vLLM model loaders:
 
 - **`MxSourceModelLoader`**: Loads weights from disk, registers with NIXL, publishes metadata
 - **`MxTargetModelLoader`**: Creates dummy weights, receives via RDMA, applies FP8 processing
-- **`SourceReadyCoordinator`**: Redis-based coordination for source-target synchronization
+- **`SourceReadyCoordinator`**: gRPC-based coordination for source-target synchronization (via MxClient)
 
 ```python
 class MxSourceModelLoader(DefaultModelLoader):
@@ -214,20 +214,20 @@ Rust gRPC service implementation:
 ### Building Docker Image
 
 ```bash
-cd /home/kavink/work/gitlab/modelexpress
+cd path/to/modelexpress
 
 # Build client image
 docker build -f examples/p2p_transfer_k8s/Dockerfile.client \
-  -t nvcr.io/nvidian/dynamo-dev/modelexpress-p2p-client:YOUR_TAG .
+  -t nvcr.io/nvidian/dynamo-dev/IMAGE_NAME:YOUR_TAG .
 
-docker push nvcr.io/nvidian/dynamo-dev/modelexpress-p2p-client:YOUR_TAG
+docker push nvcr.io/nvidian/dynamo-dev/IMAGE_NAME:YOUR_TAG
 ```
 
 ### Deploying to Kubernetes
 
 ```bash
 # Namespace
-NAMESPACE=kavin
+NAMESPACE=<your-namespace>
 
 # 1. Flush Redis (clear stale metadata)
 microk8s kubectl -n $NAMESPACE exec deploy/modelexpress-server -c redis -- redis-cli FLUSHALL
@@ -247,14 +247,14 @@ watch microk8s kubectl -n $NAMESPACE get pods -l 'app in (mx-source, mx-target)'
 
 ```bash
 # Stream logs
-microk8s kubectl -n kavin logs -f deploy/mx-source
-microk8s kubectl -n kavin logs -f deploy/mx-target
+kubectl -n $NAMESPACE logs -f deploy/mx-source
+kubectl -n $NAMESPACE logs -f deploy/mx-target
 
 # Check Redis state
-microk8s kubectl -n kavin exec deploy/modelexpress-server -c redis -- redis-cli KEYS '*'
+kubectl -n $NAMESPACE exec deploy/modelexpress-server -c redis -- redis-cli KEYS '*'
 
 # Test inference
-microk8s kubectl -n kavin exec deploy/mx-target -- curl -s http://localhost:8000/v1/completions \
+kubectl -n $NAMESPACE exec deploy/mx-target -- curl -s http://localhost:8000/v1/completions \
   -H "Content-Type: application/json" \
   -d '{"model": "deepseek-ai/DeepSeek-V3", "prompt": "Hello", "max_tokens": 10}'
 ```
````
````diff
@@ -268,8 +268,7 @@ microk8s kubectl -n kavin exec deploy/mx-target -- curl -s http://localhost:8000
 | Variable | Default | Description |
 |----------|---------|-------------|
 | `MX_REGISTER_LOADERS` | `1` | Auto-register mx-source/mx-target loaders with vLLM |
-| `MX_SERVER_ADDRESS` | `modelexpress-server:8001` | gRPC server address |
-| `MX_REDIS_HOST` | `modelexpress-server` | Redis host for coordination |
+| `MODEL_EXPRESS_URL` | `localhost:8001` | gRPC server address (also reads `MX_SERVER_ADDRESS` for compat) |
 | `MX_CONTIGUOUS_REG` | `0` | Enable contiguous region registration (experimental) |
 | `MX_EXPECTED_WORKERS` | `8` | Number of GPU workers to wait for |
 | `MX_SYNC_PUBLISH` | `1` | Source: wait for all workers before publishing |
````
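For reference, a minimal sketch of the lookup order this row documents — `MODEL_EXPRESS_URL` first, then the legacy `MX_SERVER_ADDRESS`, then the documented default. The function name and placement are illustrative, not the actual client code:

```python
import os

def resolve_server_address() -> str:
    """Resolve the ModelExpress gRPC address from the environment.

    Prefers MODEL_EXPRESS_URL; falls back to the legacy MX_SERVER_ADDRESS,
    then to the documented default.
    """
    return (
        os.environ.get("MODEL_EXPRESS_URL")
        or os.environ.get("MX_SERVER_ADDRESS")  # legacy name, kept for compat
        or "localhost:8001"
    )
```

Either variable can be set in the pod spec; new deployments should prefer `MODEL_EXPRESS_URL`.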

examples/p2p_transfer_k8s/README.md

Lines changed: 14 additions & 22 deletions
````diff
@@ -5,38 +5,30 @@ This example demonstrates how to set up ModelExpress for P2P GPU weight transfer
 ## Architecture
 
 ```
-Node A (Source - first to start)              Node B (Target - starts later)
+Node A (Source)                               Node B (Target)
 +----------------------------------+          +----------------------------------+
-| vLLM Container                   |          | vLLM Container                   |
-| - Loads real model weights       |          | - Starts with dummy weights      |
-| - Exposes weights via ZMQ        |          | - Exposes buffers via ZMQ        |
-| - MX_ZMQ_ADDRESS=ipc:///tmp/mx/  |          | - MX_ZMQ_ADDRESS=ipc:///tmp/mx/  |
-+----------------------------------+          +----------------------------------+
-        | ZMQ (IPC sockets)                           | ZMQ (IPC sockets)
-        v                                             v
-+----------------------------------+          +----------------------------------+
-| Client Container                 |          | Client Container                 |
-| - Creates NIXL agents (1 per GPU)|          | - Creates NIXL agents (1 per GPU)|
-| - Queries server: no source found|   RDMA   | - Queries server: finds source A |
-| - Becomes source, publishes meta |<========>| - Receives weights via NIXL      |
-+----------------------------------+   NIXL   | - Also publishes metadata        |
+| vLLM + MxSourceModelLoader       |          | vLLM + MxTargetModelLoader       |
+| - Loads weights from disk        |          | - Starts with dummy weights      |
+| - Registers tensors with NIXL    |          | - Waits for source ready flag    |
+| - Publishes metadata via MxClient|   RDMA   | - Receives weights via NIXL      |
+| - Publishes ready flag           |=========>| - Runs FP8 processing            |
++----------------------------------+   NIXL   | - Serves inference               |
        |                                      +----------------------------------+
        |                                             |
        v                                             v
 +--------------------------------------------------------------------+
-| ModelExpress Server (CPU)                                          |
-| - Stores model metadata (NIXL metadata + tensor descriptors)       |
-| - Keyed by model name                                              |
-| - Redis backend for persistence                                    |
+| ModelExpress Server (gRPC + Redis)                                 |
+| - PublishMetadata / GetMetadata: tensor metadata coordination      |
+| - PublishReady / GetReady: source readiness coordination           |
 +--------------------------------------------------------------------+
 ```
 
 ### Key Design Points
 
-1. **Client Container**: NIXL transfer logic runs in a separate client container, not in vLLM
-2. **Symmetric Clients**: Both source and target run identical client code; role is determined dynamically
-3. **ZMQ Communication**: vLLM exposes weights via ZMQ IPC sockets (one per TP rank)
-4. **Tensor Parallelism**: Full TP > 1 support with rank-matched transfers
+1. **Custom vLLM Loaders**: NIXL transfer logic runs inside vLLM via `--load-format mx-source` / `--load-format mx-target`
+2. **MxClient**: All gRPC communication goes through `MxClient` (workers never access Redis directly)
+3. **FP8 Support**: Raw tensors (including `weight_scale_inv`) transfer BEFORE FP8 processing
+4. **Tensor Parallelism**: Full TP support with rank-matched transfers (one NIXL agent per GPU)
 
 ## Prerequisites
 
````
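To make the new design points concrete, here is a sketch of the target-side coordination loop. The `MxClient` method names are hypothetical snake_case wrappers for the GetReady/GetMetadata RPCs shown in the diagram, and the import path is assumed:

```python
import time

# Hypothetical sketch of the target-side coordination described above.
# get_ready/get_metadata are assumed wrappers for the GetReady/GetMetadata
# RPCs in the diagram, not a documented API.
from modelexpress_client import MxClient  # assumed import path

def wait_for_source(client: MxClient, model: str, rank: int,
                    timeout_s: float = 600.0):
    """Poll the server until the source publishes its ready flag, then
    fetch the NIXL metadata for this TP rank."""
    deadline = time.monotonic() + timeout_s
    while not client.get_ready(model):
        if time.monotonic() > deadline:
            raise TimeoutError(f"source for {model} never became ready")
        time.sleep(1.0)
    # Rank-matched transfer: each GPU worker fetches the descriptor set
    # published by the source worker with the same rank.
    return client.get_metadata(model, rank=rank)
```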
modelexpress_client/python/pyproject.toml

Lines changed: 0 additions & 1 deletion
````diff
@@ -32,7 +32,6 @@ dependencies = [
     "numpy>=1.24.0",
     "protobuf>=4.25.0",
     "pydantic>=2.0.0",
-    "pyzmq>=25.0.0",
     "torch>=2.6.0",
 ]
 
````
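Since `pyzmq` now arrives transitively through vLLM rather than as a direct dependency, a quick sanity check in the runtime image (illustrative; both helpers are standard pyzmq API):

```python
# pyzmq is no longer a direct dependency of modelexpress_client;
# it is expected to come from the vLLM environment instead.
import zmq

print("libzmq:", zmq.zmq_version())
print("pyzmq:", zmq.pyzmq_version())
```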
modelexpress_server/src/main.rs

Lines changed: 4 additions & 2 deletions
````diff
@@ -19,6 +19,10 @@ use tonic::transport::Server;
 use tracing::{error, info, warn};
 use tracing_subscriber::{EnvFilter, FmtSubscriber};
 
+/// Maximum gRPC message size (100MB) for large models like DeepSeek-V3.
+/// Each worker can have thousands of tensor descriptors with NIXL metadata.
+const MAX_MESSAGE_SIZE: usize = 100 * 1024 * 1024;
+
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
     // Parse command line arguments
@@ -125,8 +129,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
 
     // Start the gRPC server
     info!("Starting gRPC server on: {addr}");
-    // Set max message size to 100MB for large models like DeepSeek-V3
-    const MAX_MESSAGE_SIZE: usize = 100 * 1024 * 1024;
     let server_result = Server::builder()
         .add_service(HealthServiceServer::new(health_service))
         .add_service(ApiServiceServer::new(api_service))
````
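The 100MB cap only helps if the peer raises its limits too. A minimal client-side sketch, assuming the Python client talks to the server over `grpcio` (the channel option names are standard gRPC core options; the address is the documented default):

```python
import grpc

# Must match the server's MAX_MESSAGE_SIZE (100MB); gRPC defaults to a
# 4MB receive limit, far too small for DeepSeek-V3 tensor metadata.
MAX_MESSAGE_SIZE = 100 * 1024 * 1024

channel = grpc.insecure_channel(
    "localhost:8001",  # MODEL_EXPRESS_URL default from the table above
    options=[
        ("grpc.max_send_message_length", MAX_MESSAGE_SIZE),
        ("grpc.max_receive_message_length", MAX_MESSAGE_SIZE),
    ],
)
```

Without matching limits, the client fails with `RESOURCE_EXHAUSTED` once a metadata response exceeds the 4MB default.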

modelexpress_server/src/state.rs

Lines changed: 16 additions & 4 deletions
````diff
@@ -236,8 +236,16 @@ impl P2pStateManager {
         let new_workers_json = serde_json::to_string(&new_workers)?;
         let timestamp = chrono::Utc::now().timestamp();
 
-        // Lua script for atomic read-modify-write merge
-        // This runs atomically in Redis, preventing race conditions
+        // Lua script for atomic read-modify-write merge of worker metadata.
+        //
+        // WHY LUA? In a TP=8 setup, 8 GPU workers publish metadata concurrently.
+        // Without atomicity, two workers could read the same state, each add their
+        // own entry, and one overwrites the other (lost update). The Lua script
+        // runs as a single atomic operation in Redis, so the read-merge-write
+        // sequence is never interleaved with another worker's publish.
+        //
+        // The script: 1) reads existing workers, 2) merges new workers by rank
+        // (update if rank exists, append if new), 3) sorts by rank, 4) writes back.
         let script = redis::Script::new(
             r#"
             local key = KEYS[1]
````
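For readers who want to see the lost-update fix end to end, here is an illustrative re-creation of the merge in Python with redis-py; the key name, JSON layout, and field names are assumptions for the sketch, not the actual script in `state.rs`:

```python
import json
import redis

# Illustrative re-creation of the atomic merge. Assumed data shape:
# the value at the key is a JSON array of {"rank": int, ...} objects.
MERGE_WORKERS = """
local existing = redis.call('GET', KEYS[1])
local workers = existing and cjson.decode(existing) or {}
local new_workers = cjson.decode(ARGV[1])
for _, nw in ipairs(new_workers) do
    local replaced = false
    for i, w in ipairs(workers) do
        if w.rank == nw.rank then workers[i] = nw; replaced = true; break end
    end
    if not replaced then table.insert(workers, nw) end
end
table.sort(workers, function(a, b) return a.rank < b.rank end)
redis.call('SET', KEYS[1], cjson.encode(workers))
return #workers
"""

r = redis.Redis()
merge = r.register_script(MERGE_WORKERS)
# Two concurrent publishers cannot interleave: each EVAL runs atomically.
count = merge(keys=["p2p:deepseek-ai/DeepSeek-V3:workers"],  # assumed key format
              args=[json.dumps([{"rank": 3, "agent": "gpu3"}])])
```

An alternative would be optimistic WATCH/MULTI transactions, but a server-side script avoids client retries under contention.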
````diff
@@ -333,7 +341,9 @@
         }
     }
 
-    /// Remove metadata for a model (cleanup)
+    /// Remove metadata for a model (cleanup).
+    /// Currently unused - reserved for future admin/cleanup endpoints.
+    #[allow(dead_code)]
     pub async fn remove_metadata(
         &self,
         model_name: &str,
@@ -348,7 +358,9 @@
         Ok(())
     }
 
-    /// List all registered model names
+    /// List all registered model names.
+    /// Currently unused - reserved for future admin/list endpoints.
+    #[allow(dead_code)]
     pub async fn list_models(
         &self,
     ) -> Result<Vec<String>, Box<dyn std::error::Error + Send + Sync>> {
````
