Skip to content

Commit 217d30d

Browse files
ahenglj authored and claude committed
[Review] Address PR #1303 feedback round 2: robustness, tests, e2e
- Neutralize stop/stop_token_ids in prefill sampling params to ensure finish_reason='length' (prevents MooncakeConnector KV transfer cancel) - Add _DEFAULT_MOONCAKE_BOOTSTRAP_PORT named constant - Add tensor_parallel_size validation in PD config check - Improve error messages with type info for kv_transfer_config parsing - Add defense-in-depth cleanup of _pd_kv_params_by_req after generation - Upgrade auto-duplication log to WARNING with suppression hint - Downgrade per-request PD routing/trace logs from INFO to DEBUG - Add vLLM version compatibility warning in monkey_patch.py - Use dynamic __qualname__ from original MooncakeConnector - Add padding threshold warning (512 tokens) in model zero-padding - Add clarifying comments on threading model, merge order, save-patch-restore - Add unit tests: stop neutralization, failure/leak cleanup, TP validation - Add PD e2e tests for both text and audio modalities (offline + online) - Add PD CI stage config with load_format: dummy Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent df087f3 commit 217d30d

File tree

11 files changed

+667
-18
lines changed

11 files changed

+667
-18
lines changed
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
"""
E2E offline tests for Qwen3-Omni-MoE with PD (Prefill-Decode) disaggregation.

Tests both text-only and audio output modalities through the 4-stage
PD pipeline: Prefill -> Decode -> Talker -> Code2Wav.
"""

import os

# NOTE(review): these are set deliberately BEFORE importing tests.conftest,
# which presumably imports vLLM and reads them at import time — keep this
# ordering intact.
os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn"
os.environ["VLLM_TEST_CLEAN_GPU_MEMORY"] = "0"

from pathlib import Path

import pytest

from tests.conftest import (
    generate_synthetic_video,
)
from tests.utils import hardware_test

# Model(s) exercised by every test in this module.
models = ["Qwen/Qwen3-Omni-30B-A3B-Instruct"]

# PD disaggregation CI stage config (requires 3x GPUs)
stage_configs = [str(Path(__file__).parent.parent / "stage_configs" / "qwen3_omni_pd_ci.yaml")]

# Create parameter combinations for model and stage config
test_params = [(model, stage_config) for model in models for stage_config in stage_configs]
31+
def get_question(prompt_type="video"):
    """Return the canned user question for *prompt_type*.

    Supported types are "video" and "text"; any unknown type falls back
    to the video question.
    """
    if prompt_type == "text":
        return "What is the capital of China? Answer in 20 words."
    return "Describe the video briefly."
37+
38+
39+
@pytest.mark.core_model
@pytest.mark.omni
@hardware_test(res={"cuda": "H100"}, num_cards=3)
@pytest.mark.parametrize("omni_runner", test_params, indirect=True)
def test_pd_text_only(omni_runner, omni_runner_handler) -> None:
    """Test PD disaggregation with text-only output (no talker/code2wav)."""
    # Text modality only: stages 2/3 (talker, code2wav) stay idle.
    omni_runner_handler.send_request(
        {
            "prompts": get_question("text"),
            "modalities": ["text"],
        }
    )
50+
51+
52+
@pytest.mark.core_model
@pytest.mark.omni
@hardware_test(res={"cuda": "H100"}, num_cards=3)
@pytest.mark.parametrize("omni_runner", test_params, indirect=True)
def test_pd_video_to_audio(omni_runner, omni_runner_handler) -> None:
    """Test PD disaggregation with video input and audio output
    through the full 4-stage pipeline."""
    # 224x224 synthetic clip, 300 frames, as a raw numpy array.
    synthetic_frames = generate_synthetic_video(224, 224, 300)["np_array"]

    omni_runner_handler.send_request(
        {
            "prompts": get_question("video"),
            "videos": synthetic_frames,
            "modalities": ["audio"],
        }
    )
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
"""
E2E online serving tests for Qwen3-Omni-MoE with PD (Prefill-Decode) disaggregation.

Tests both text-only and audio output modalities via the OpenAI-compatible API
through the 4-stage PD pipeline: Prefill -> Decode -> Talker -> Code2Wav.
"""

import os

# BUGFIX(review): these env vars were previously assigned AFTER the
# tests.conftest / tests.utils imports, which presumably import vLLM and read
# them at import time — making the assignments ineffective. The offline PD
# test module sets them before importing; do the same here.
os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn"
os.environ["VLLM_TEST_CLEAN_GPU_MEMORY"] = "0"

from pathlib import Path

import pytest

from tests.conftest import (
    dummy_messages_from_mix_data,
    generate_synthetic_audio,
    generate_synthetic_image,
    generate_synthetic_video,
)
from tests.utils import hardware_test

# Model(s) exercised by every test in this module.
models = ["Qwen/Qwen3-Omni-30B-A3B-Instruct"]

# PD disaggregation CI stage config (requires 3x GPUs)
stage_configs = [str(Path(__file__).parent.parent / "stage_configs" / "qwen3_omni_pd_ci.yaml")]

# Create parameter combinations for model and stage config
test_params = [(model, stage_config) for model in models for stage_config in stage_configs]
32+
33+
def get_system_prompt():
    """Build the standard system message shared by all PD online tests."""
    persona = (
        "You are Qwen, a virtual human developed by the Qwen Team, "
        "Alibaba Group, capable of perceiving auditory and visual inputs, "
        "as well as generating text and speech."
    )
    return {"role": "system", "content": [{"type": "text", "text": persona}]}
47+
48+
49+
def get_prompt(prompt_type="text_only"):
    """Return the user question for *prompt_type* ("text_only" or "mix").

    Unknown types fall back to the text_only question.
    """
    if prompt_type == "mix":
        return "What is recited in the audio? What is in this image? Describe the video briefly."
    return "What is the capital of China? Answer in 20 words."
55+
56+
57+
@pytest.mark.advanced_model
@pytest.mark.core_model
@pytest.mark.omni
@hardware_test(res={"cuda": "H100"}, num_cards=3)
@pytest.mark.parametrize("omni_server", test_params, indirect=True)
def test_pd_text_to_text(omni_server, openai_client) -> None:
    """
    Test PD disaggregation with text-only output via OpenAI API.
    Deploy Setting: PD separation yaml
    Input Modal: text
    Output Modal: text
    Input Setting: stream=False
    Datasets: single request
    """
    chat_messages = dummy_messages_from_mix_data(
        system_prompt=get_system_prompt(),
        content_text=get_prompt("text_only"),
    )

    # Non-streaming, text-only request; response must mention "beijing".
    openai_client.send_request(
        {
            "model": omni_server.model,
            "messages": chat_messages,
            "stream": False,
            "modalities": ["text"],
            "key_words": {"text": ["beijing"]},
        }
    )
85+
86+
87+
@pytest.mark.advanced_model
@pytest.mark.core_model
@pytest.mark.omni
@hardware_test(res={"cuda": "H100"}, num_cards=3)
@pytest.mark.parametrize("omni_server", test_params, indirect=True)
def test_pd_mix_to_text_audio(omni_server, openai_client) -> None:
    """
    Test PD disaggregation with multi-modal input and text+audio output via OpenAI API.
    Deploy Setting: PD separation yaml
    Input Modal: text + audio + video + image
    Output Modal: text + audio
    Input Setting: stream=True
    Datasets: single request
    """
    # Synthetic media, base64-encoded as data URLs for the OpenAI-compatible API.
    video_b64 = generate_synthetic_video(224, 224, 300)["base64"]
    image_b64 = generate_synthetic_image(224, 224)["base64"]
    audio_b64 = generate_synthetic_audio(5, 1)["base64"]

    chat_messages = dummy_messages_from_mix_data(
        system_prompt=get_system_prompt(),
        video_data_url=f"data:video/mp4;base64,{video_b64}",
        image_data_url=f"data:image/jpeg;base64,{image_b64}",
        audio_data_url=f"data:audio/wav;base64,{audio_b64}",
        content_text=get_prompt("mix"),
    )

    # Streaming request; check key words in both audio and image descriptions.
    openai_client.send_request(
        {
            "model": omni_server.model,
            "messages": chat_messages,
            "stream": True,
            "key_words": {
                "audio": ["water", "chirping", "crackling", "rain"],
                "image": ["square", "quadrate"],
            },
        }
    )
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
# Stage config for Qwen3-Omni-MoE with PD (Prefill-Decode) disaggregation
# CI variant: uses load_format: dummy so tests can run without real weights.
#
# Stage 0: Thinker Prefill (prompt processing, KV producer)
# Stage 1: Thinker Decode (token generation, KV consumer)
# Stage 2: Talker (text embeddings -> RVQ codec codes)
# Stage 3: Code2Wav (RVQ codes -> audio waveform)
#
# Requires 3x GPUs: GPU 0 = prefill, GPU 1 = decode, GPU 2 = talker + code2wav
# Both prefill and decode stages MUST use the same tensor_parallel_size.

async_chunk: false
stage_args:
  # ---- Stage 0: Thinker Prefill (KV producer) ----
  - stage_id: 0
    stage_type: llm
    is_prefill_only: true
    runtime:
      devices: "0"
      max_batch_size: 5
    engine_args:
      model_stage: thinker
      model_arch: Qwen3OmniMoeForConditionalGeneration
      worker_type: ar
      scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler
      gpu_memory_utilization: 0.9
      enforce_eager: true
      trust_remote_code: true
      engine_output_type: latent
      distributed_executor_backend: "mp"
      enable_prefix_caching: false
      max_num_batched_tokens: 32768
      max_model_len: 32768
      hf_config_name: thinker_config
      tensor_parallel_size: 1
      load_format: dummy
      # KV cache handoff to the decode stage via Mooncake.
      kv_transfer_config:
        kv_connector: "MooncakeConnector"
        kv_role: "kv_producer"
        kv_rank: 0
        kv_parallel_size: 2
        engine_id: "omni-thinker-prefill"
        kv_connector_extra_config:
          # NOTE(review): must differ from the decode stage's bootstrap port.
          mooncake_bootstrap_port: 25201
    final_output: false
    is_comprehension: true
    default_sampling_params:
      temperature: 0.4
      top_p: 0.9
      top_k: 1
      max_tokens: 100
      seed: 42
      detokenize: True
      repetition_penalty: 1.05

  # ---- Stage 1: Thinker Decode (KV consumer); must mirror stage 0's
  # tensor_parallel_size and model settings. ----
  - stage_id: 1
    stage_type: llm
    is_decode_only: true
    runtime:
      devices: "1"
      max_batch_size: 5
    engine_args:
      model_stage: thinker
      model_arch: Qwen3OmniMoeForConditionalGeneration
      worker_type: ar
      scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler
      gpu_memory_utilization: 0.9
      enforce_eager: true
      trust_remote_code: true
      engine_output_type: latent
      distributed_executor_backend: "mp"
      enable_prefix_caching: false
      max_num_batched_tokens: 32768
      max_model_len: 32768
      hf_config_name: thinker_config
      tensor_parallel_size: 1
      load_format: dummy
      kv_transfer_config:
        kv_connector: "MooncakeConnector"
        kv_role: "kv_consumer"
        kv_rank: 1
        kv_parallel_size: 2
        engine_id: "omni-thinker-decode"
        kv_connector_extra_config:
          mooncake_bootstrap_port: 25202
    # Consumes prompts/KV produced by stage 0.
    engine_input_source: [0]
    final_output: true
    final_output_type: text
    is_comprehension: true
    default_sampling_params:
      temperature: 0.4
      top_p: 0.9
      top_k: 1
      max_tokens: 100
      seed: 42
      detokenize: True
      repetition_penalty: 1.05

  # ---- Stage 2: Talker (text embeddings -> RVQ codec codes); shares GPU 2
  # with code2wav. ----
  - stage_id: 2
    stage_type: llm
    runtime:
      devices: "2"
      max_batch_size: 5
    engine_args:
      model_stage: talker
      model_arch: Qwen3OmniMoeForConditionalGeneration
      worker_type: ar
      scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler
      gpu_memory_utilization: 0.6
      enforce_eager: true
      trust_remote_code: true
      engine_output_type: latent
      enable_prefix_caching: false
      max_num_batched_tokens: 32768
      max_model_len: 32768
      distributed_executor_backend: "mp"
      hf_config_name: talker_config
      load_format: dummy
    # Consumes decode-stage output; converted via thinker2talker processor.
    engine_input_source: [1]
    custom_process_input_func: vllm_omni.model_executor.stage_input_processors.qwen3_omni.thinker2talker
    default_sampling_params:
      temperature: 0.9
      top_k: 50
      max_tokens: 1000
      seed: 42
      detokenize: False
      repetition_penalty: 1.05
      # presumably the talker codec's end-of-speech token id — TODO confirm
      stop_token_ids: [2150]

  # ---- Stage 3: Code2Wav (RVQ codes -> audio waveform); low GPU share,
  # co-located with the talker on GPU 2. ----
  - stage_id: 3
    stage_type: llm
    runtime:
      devices: "2"
      max_batch_size: 1
    engine_args:
      model_stage: code2wav
      model_arch: Qwen3OmniMoeForConditionalGeneration
      worker_type: generation
      scheduler_cls: vllm_omni.core.sched.omni_generation_scheduler.OmniGenerationScheduler
      enforce_eager: true
      trust_remote_code: true
      enable_prefix_caching: false
      engine_output_type: audio
      gpu_memory_utilization: 0.1
      distributed_executor_backend: "mp"
      max_num_batched_tokens: 100000
      hf_config_name: thinker_config
      async_scheduling: false
      load_format: dummy
    # Consumes talker codes; converted via talker2code2wav processor.
    engine_input_source: [2]
    custom_process_input_func: vllm_omni.model_executor.stage_input_processors.qwen3_omni.talker2code2wav
    final_output: true
    final_output_type: audio
    default_sampling_params:
      temperature: 0.0
      top_p: 1.0
      top_k: -1
      max_tokens: 2000
      seed: 42
      detokenize: True
      repetition_penalty: 1.1

# Runtime edges
runtime:
  enabled: true
  defaults:
    window_size: -1
    max_inflight: 1

  connectors:
    shared_memory_connector:
      name: SharedMemoryConnector
      extra:
        shm_threshold_bytes: 65536

  # Linear pipeline: 0 (prefill) -> 1 (decode) -> 2 (talker) -> 3 (code2wav).
  edges:
    - from: 0
      to: 1
      window_size: -1
    - from: 1
      to: 2
      window_size: -1
    - from: 2
      to: 3
      window_size: -1

0 commit comments

Comments
 (0)