
Commit 16be03b (parent 2cc4375)
[Feature] Implement YuanrongConnector based on OmniConnectorBase (vllm-project#716)
Signed-off-by: yangsonglin13 <yangsonglin566@gmail.com>

File tree: 11 files changed, +416 −154 lines
Lines changed: 51 additions & 145 deletions
@@ -1,200 +1,106 @@
# Disaggregated Inference for Omni-Modality Models

This guide explains how to configure and use distributed connectors
(`vllm_omni/distributed/omni_connectors`) in vllm-omni for multi-stage pipelines.

Backend-specific setup lives in separate docs:

- [SharedMemoryConnector](omni_connectors/shared_memory_connector.md)
- [MooncakeConnector](omni_connectors/mooncake_connector.md)
- [YuanrongConnector](omni_connectors/yuanrong_connector.md)
## Overview

Connectors enable data transfer between pipeline stages (e.g., Thinker -> Talker).
Current connectors operate in D2H2D (device-to-host-to-device) mode.

## Connector Choices

| Use Case | Recommended Connector | Notes |
| :--- | :--- | :--- |
| Single node | SharedMemoryConnector | Auto-configured if no connector is specified. |
| Multi node (Mooncake) | MooncakeConnector | Requires Mooncake Master + metadata server. |
| Multi node (Yuanrong) | YuanrongConnector | Requires Yuanrong Datasystem + etcd. |

## Core API

The connector system is built around the `OmniConnectorBase` abstraction, which
decouples data transport from stage logic.
```python
class OmniConnectorBase(ABC):
    @abstractmethod
    def put(self, from_stage: str, to_stage: str, put_key: str, data: Any) -> tuple[bool, int, Optional[dict]]:
        """
        Store data.
        Returns: (success, serialized_size, metadata)
        """
        pass

    @abstractmethod
    def get(self, from_stage: str, to_stage: str, get_key: str, metadata: Optional[dict] = None) -> Optional[tuple[Any, int]]:
        """
        Retrieve data.
        Args: metadata - transport-specific handles returned by put() (e.g., SHM name).
        Returns: (object, serialized_size)
        """
        pass
```
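To make the interface concrete, here is a minimal single-process backend implementing it. `InMemoryConnector`, the pickle serialization, and the key scheme are illustrative assumptions, not part of vllm-omni; the base class is restated so the sketch runs standalone.

```python
import pickle
from abc import ABC, abstractmethod
from typing import Any, Optional

class OmniConnectorBase(ABC):
    # Restated from the interface above so this sketch is self-contained.
    @abstractmethod
    def put(self, from_stage: str, to_stage: str, put_key: str,
            data: Any) -> tuple[bool, int, Optional[dict]]: ...

    @abstractmethod
    def get(self, from_stage: str, to_stage: str, get_key: str,
            metadata: Optional[dict] = None) -> Optional[tuple[Any, int]]: ...

class InMemoryConnector(OmniConnectorBase):
    """Illustrative backend: a plain dict keyed per pipeline edge."""

    def __init__(self) -> None:
        self._store: dict[str, bytes] = {}

    def _key(self, from_stage: str, to_stage: str, key: str) -> str:
        # Deterministic per-edge key, similar in spirit to Mooncake-style
        # request_id/from_stage_to_stage keying.
        return f"{key}/{from_stage}_to_{to_stage}"

    def put(self, from_stage, to_stage, put_key, data):
        blob = pickle.dumps(data)
        self._store[self._key(from_stage, to_stage, put_key)] = blob
        return True, len(blob), None  # no transport-specific metadata needed

    def get(self, from_stage, to_stage, get_key, metadata=None):
        blob = self._store.get(self._key(from_stage, to_stage, get_key))
        if blob is None:
            return None
        return pickle.loads(blob), len(blob)
```

A real backend would replace the dict with its transport (SHM, Mooncake, Yuanrong) and return transport handles in the third element of `put()`'s result.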
### Metadata Passing

Some connectors (e.g., SharedMemoryConnector) generate transient resources
(such as a shared memory block name) during `put()`. This `metadata` must be
passed through the control plane (e.g., HTTP headers, queue messages) so
`get()` can locate the data.
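The flow above can be sketched as follows; the token-based store and the queue standing in for the control plane are illustrative assumptions, not vllm-omni internals.

```python
import pickle
from queue import Queue

# Hypothetical backend state: put() parks the payload under a transient token
# (a stand-in for a SHM block name) that only the producer knows.
_blobs: dict[str, bytes] = {}

def put(request_id: str, data) -> dict:
    token = f"blob-{request_id}"
    _blobs[token] = pickle.dumps(data)
    return {"token": token}  # transient handle: must reach the consumer

def get(metadata: dict):
    return pickle.loads(_blobs.pop(metadata["token"]))

# The control plane (here a simple queue; in practice HTTP headers or queue
# messages) carries the metadata alongside the request id.
control_plane = Queue()
control_plane.put(("req-1", put("req-1", {"hidden_states": [0.1, 0.2]})))

req_id, meta = control_plane.get()
assert get(meta) == {"hidden_states": [0.1, 0.2]}
```

If the metadata is dropped anywhere along the control plane, the consumer has no way to locate the payload, which is why stages fail fast on missing metadata.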
## Configuration Model

Define connectors in runtime:

```yaml
runtime:
  connectors:
    connector_of_shared_memory:
      name: SharedMemoryConnector
      extra:
        shm_threshold_bytes: 65536
```
Wire stages to connectors:

```yaml
stage_args:
  - stage_id: 0
    output_connectors:
      to_stage_1: connector_of_shared_memory

  - stage_id: 1
    input_connectors:
      from_stage_0: connector_of_shared_memory
```
If a pipeline edge has no explicit connector, the system auto-creates a
SharedMemoryConnector for that edge, inferred from the `runtime.edges` list
and the `engine_input_source` dependencies defined in `stage_args`.
## Relationship with vLLM

vLLM provides specialized distributed mechanisms for specific artifacts:

- KV Transfer (`vllm.distributed.kv_transfer`): optimized for transferring KV caches between prefill and decode instances.
- EC Transfer (`vllm.distributed.ec_transfer`): optimized for sharing encoder embeddings.
- Device Communicators (`vllm.distributed.device_communicators`): low-level primitives (NCCL, SHM) for tensor/pipeline parallelism.

vllm-omni complements this with a generalized connector abstraction:

1. Unifies transport via a single `put`/`get` API for any stage artifact.
2. Enables DAG-style pipelines across processes or nodes with per-edge transports.
3. Can wrap vLLM-specific transfers for KV paths while keeping a consistent interface.
## Operational Notes

- Fail-fast config validation: the loader raises if any expected edge is missing a connector. Define `input_connectors`/`output_connectors` or rely on auto-SHM filling; otherwise startup aborts.
- Missing payloads halt stages: workers expect connector payloads, so verify connector wiring and metadata propagation before production.

## Future Roadmap: D2D Transport

Current connectors use D2H2D paths, moving tensors through host memory and
incurring PCIe overhead. Future versions will introduce direct device-to-device
connectors (via NCCL, UCX, or IPC) to reduce latency for large tensor payloads:
`put()` initiates the transfer and returns a lightweight handle via the control
plane, while the heavy payload flows directly between devices.
Lines changed: 74 additions & 0 deletions
@@ -0,0 +1,74 @@
# MooncakeConnector

## When to Use

Best for multi-node distributed inference using
[Mooncake](https://github.com/kvcache-ai/Mooncake).

## Installation

```bash
# For CUDA-enabled systems (recommended)
pip install mooncake-transfer-engine

# For non-CUDA systems
pip install mooncake-transfer-engine-non-cuda
```

## Start Mooncake Master

```bash
# If you use Mooncake SSD storage
mkdir -p ./mc_storage

mooncake_master \
    --rpc_port=50051 \
    --enable_http_metadata_server=true \
    --http_metadata_server_host=0.0.0.0 \
    --http_metadata_server_port=8080 \
    --metrics_port=9003 \
    --root_fs_dir=./mc_storage/ \
    --cluster_id=mc-local-1 &
```

## Configuration

Define the connector in runtime:

```yaml
runtime:
  connectors:
    connector_of_mooncake:
      name: MooncakeConnector
      extra:
        host: "127.0.0.1"
        metadata_server: "http://<MASTER_IP>:8080/metadata"
        master: "<MASTER_IP>:50051"
        segment: 512000000
        localbuf: 64000000
        proto: "tcp"
```

Wire stages to the connector:

```yaml
stage_args:
  - stage_id: 0
    output_connectors:
      to_stage_1: connector_of_mooncake

  - stage_id: 1
    input_connectors:
      from_stage_0: connector_of_mooncake
```

Parameters:

- `host`: local worker hostname or IP, registered in the metadata server so other nodes can find it.
- `metadata_server`: metadata server URL, used for service discovery and connection establishment (e.g., exchanging QP information for RDMA).
- `master`: Mooncake Master address, used for global state management and control-plane operations.
- `segment`: global memory segment size in bytes (e.g., ~512 MB).
- `localbuf`: local buffer size in bytes, used for buffering during transfers (e.g., ~64 MB).
- `proto`: transport protocol, `"tcp"` (easier setup, universal) or `"rdma"` (higher performance, requires RDMA-capable hardware).

For more details, refer to the
[Mooncake repository](https://github.com/kvcache-ai/Mooncake).
Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
# SharedMemoryConnector

## When to Use

Best for single-node deployments where stages run on the same host. It is
auto-configured when no explicit connector is specified for an edge.

## How It Works

- Small payloads (< threshold): serialized and passed inline in metadata,
  avoiding the overhead of creating SHM blocks for tiny messages.
- Large payloads (>= threshold): stored in a named shared memory block; the
  SHM name is returned in metadata.
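The threshold behavior can be sketched with Python's `multiprocessing.shared_memory`; `SHM_THRESHOLD_BYTES` and these `put`/`get` helpers are illustrative assumptions, not the connector's actual implementation.

```python
import pickle
from multiprocessing import shared_memory

# Hypothetical constant; the real connector reads shm_threshold_bytes
# from its config (default: 64 KB).
SHM_THRESHOLD_BYTES = 64 * 1024

def put(data) -> dict:
    """Serialize and choose inline vs. shared-memory transport."""
    blob = pickle.dumps(data)
    if len(blob) < SHM_THRESHOLD_BYTES:
        return {"inline": blob}  # small: rides the control queue directly
    shm = shared_memory.SharedMemory(create=True, size=len(blob))
    shm.buf[: len(blob)] = blob  # large: copy into a named SHM block
    shm.close()
    return {"shm_name": shm.name, "size": len(blob)}

def get(metadata: dict):
    if "inline" in metadata:
        return pickle.loads(metadata["inline"])
    shm = shared_memory.SharedMemory(name=metadata["shm_name"])
    data = pickle.loads(bytes(shm.buf[: metadata["size"]]))
    shm.close()
    shm.unlink()  # consumer frees the block after reading
    return data
```

Raising the threshold trades control-queue message size against SHM block churn; tiny payloads never touch shared memory at all.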
## Configuration

```yaml
runtime:
  connectors:
    connector_of_shared_memory:
      name: SharedMemoryConnector
      extra:
        shm_threshold_bytes: 65536  # default: 64 KB
```

## Notes

- Auto-mode uses SharedMemoryConnector if no connector is declared for an edge.
