Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions tests/e2e/offline_inference/test_qwen3_omni_pd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""
E2E offline tests for Qwen3-Omni-MoE with PD (Prefill-Decode) disaggregation.

Tests both text-only and audio output modalities through the 4-stage
PD pipeline: Prefill -> Decode -> Talker -> Code2Wav.
"""

import os

# Set process-environment knobs BEFORE importing the test helpers below:
# tests.conftest may transitively import vLLM, which can read these
# variables at import time.
os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn"
os.environ["VLLM_TEST_CLEAN_GPU_MEMORY"] = "0"

from pathlib import Path

import pytest

from tests.conftest import (
    generate_synthetic_video,
)
from tests.utils import hardware_test

# Model(s) exercised by every test in this module.
models = ["Qwen/Qwen3-Omni-30B-A3B-Instruct"]

# PD disaggregation CI stage config (requires 3x GPUs)
stage_configs = [str(Path(__file__).parent.parent / "stage_configs" / "qwen3_omni_pd_ci.yaml")]

# Create parameter combinations for model and stage config
test_params = [(model, stage_config) for model in models for stage_config in stage_configs]


def get_question(prompt_type="video"):
    """Return the canned user question for *prompt_type*.

    Supported types are "video" and "text"; any unrecognized value
    falls back to the "video" question.
    """
    if prompt_type == "text":
        return "What is the capital of China? Answer in 20 words."
    # Default (including unknown prompt types): the video question.
    return "Describe the video briefly."


@pytest.mark.core_model
@pytest.mark.omni
@hardware_test(res={"cuda": "H100"}, num_cards=3)
@pytest.mark.parametrize("omni_runner", test_params, indirect=True)
def test_pd_text_only(omni_runner, omni_runner_handler) -> None:
    """Test PD disaggregation with text-only output (no talker/code2wav)."""
    # Text modality only: the talker/code2wav stages should not be exercised.
    omni_runner_handler.send_request(
        {
            "prompts": get_question("text"),
            "modalities": ["text"],
        }
    )


@pytest.mark.core_model
@pytest.mark.omni
@hardware_test(res={"cuda": "H100"}, num_cards=3)
@pytest.mark.parametrize("omni_runner", test_params, indirect=True)
def test_pd_video_to_audio(omni_runner, omni_runner_handler) -> None:
    """Test PD disaggregation with video input and audio output
    through the full 4-stage pipeline."""
    # 224x224 synthetic clip with 300 frames; only the raw array form is used.
    synthetic_video = generate_synthetic_video(224, 224, 300)["np_array"]

    omni_runner_handler.send_request(
        {
            "prompts": get_question("video"),
            "videos": synthetic_video,
            "modalities": ["audio"],
        }
    )
122 changes: 122 additions & 0 deletions tests/e2e/online_serving/test_qwen3_omni_pd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
"""
E2E online serving tests for Qwen3-Omni-MoE with PD (Prefill-Decode) disaggregation.

Tests both text-only and audio output modalities via the OpenAI-compatible API
through the 4-stage PD pipeline: Prefill -> Decode -> Talker -> Code2Wav.
"""

import os

# Set process-environment knobs BEFORE importing the test helpers below:
# tests.conftest may transitively import vLLM, which can read these variables
# at import time — assigning them after the imports (as this module previously
# did) may be too late to take effect. This also matches the ordering used by
# the offline PD test module (tests/e2e/offline_inference/test_qwen3_omni_pd.py).
os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn"
os.environ["VLLM_TEST_CLEAN_GPU_MEMORY"] = "0"

from pathlib import Path

import pytest

from tests.conftest import (
    dummy_messages_from_mix_data,
    generate_synthetic_audio,
    generate_synthetic_image,
    generate_synthetic_video,
)
from tests.utils import hardware_test

# Model(s) exercised by every test in this module.
models = ["Qwen/Qwen3-Omni-30B-A3B-Instruct"]

# PD disaggregation CI stage config (requires 3x GPUs)
stage_configs = [str(Path(__file__).parent.parent / "stage_configs" / "qwen3_omni_pd_ci.yaml")]

# Create parameter combinations for model and stage config
test_params = [(model, stage_config) for model in models for stage_config in stage_configs]


def get_system_prompt():
    """Return the standard Qwen system message in OpenAI chat format."""
    system_text = (
        "You are Qwen, a virtual human developed by the Qwen Team, "
        "Alibaba Group, capable of perceiving auditory and visual inputs, "
        "as well as generating text and speech."
    )
    return {"role": "system", "content": [{"type": "text", "text": system_text}]}


def get_prompt(prompt_type="text_only"):
    """Return the user prompt for *prompt_type*.

    Supported types are "text_only" and "mix"; any unrecognized value
    falls back to the "text_only" prompt.
    """
    if prompt_type == "mix":
        return "What is recited in the audio? What is in this image? Describe the video briefly."
    # Default (including unknown prompt types): the text-only question.
    return "What is the capital of China? Answer in 20 words."


@pytest.mark.advanced_model
@pytest.mark.core_model
@pytest.mark.omni
@hardware_test(res={"cuda": "H100"}, num_cards=3)
@pytest.mark.parametrize("omni_server", test_params, indirect=True)
def test_pd_text_to_text(omni_server, openai_client) -> None:
    """
    Test PD disaggregation with text-only output via OpenAI API.
    Deploy Setting: PD separation yaml
    Input Modal: text
    Output Modal: text
    Input Setting: stream=False
    Datasets: single request
    """
    openai_client.send_request(
        {
            "model": omni_server.model,
            "messages": dummy_messages_from_mix_data(
                system_prompt=get_system_prompt(),
                content_text=get_prompt("text_only"),
            ),
            "stream": False,
            "modalities": ["text"],
            # The generated answer must mention Beijing.
            "key_words": {"text": ["beijing"]},
        }
    )


@pytest.mark.advanced_model
@pytest.mark.core_model
@pytest.mark.omni
@hardware_test(res={"cuda": "H100"}, num_cards=3)
@pytest.mark.parametrize("omni_server", test_params, indirect=True)
def test_pd_mix_to_text_audio(omni_server, openai_client) -> None:
    """
    Test PD disaggregation with multi-modal input and text+audio output via OpenAI API.
    Deploy Setting: PD separation yaml
    Input Modal: text + audio + video + image
    Output Modal: text + audio
    Input Setting: stream=True
    Datasets: single request
    """
    # Encode each synthetic medium as a base64 data URL for the chat payload.
    video_b64 = generate_synthetic_video(224, 224, 300)["base64"]
    image_b64 = generate_synthetic_image(224, 224)["base64"]
    audio_b64 = generate_synthetic_audio(5, 1)["base64"]

    messages = dummy_messages_from_mix_data(
        system_prompt=get_system_prompt(),
        video_data_url=f"data:video/mp4;base64,{video_b64}",
        image_data_url=f"data:image/jpeg;base64,{image_b64}",
        audio_data_url=f"data:audio/wav;base64,{audio_b64}",
        content_text=get_prompt("mix"),
    )

    request_config = {
        "model": omni_server.model,
        "messages": messages,
        "stream": True,
        "key_words": {
            "audio": ["water", "chirping", "crackling", "rain"],
            "image": ["square", "quadrate"],
        },
    }

    openai_client.send_request(request_config)
184 changes: 184 additions & 0 deletions tests/e2e/stage_configs/qwen3_omni_pd_ci.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
# Stage config for Qwen3-Omni-MoE with PD (Prefill-Decode) disaggregation
# CI variant: uses load_format: dummy so tests can run without real weights.
#
# Stage 0: Thinker Prefill (prompt processing, KV producer)
# Stage 1: Thinker Decode (token generation, KV consumer)
# Stage 2: Talker (text embeddings -> RVQ codec codes)
# Stage 3: Code2Wav (RVQ codes -> audio waveform)
#
# Requires 3x GPUs: GPU 0 = prefill, GPU 1 = decode, GPU 2 = talker + code2wav
# Both prefill and decode stages MUST use the same tensor_parallel_size.

async_chunk: false
stage_args:
- stage_id: 0
stage_type: llm
is_prefill_only: true
runtime:
devices: "0"
max_batch_size: 5
engine_args:
model_stage: thinker
model_arch: Qwen3OmniMoeForConditionalGeneration
worker_type: ar
scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler
gpu_memory_utilization: 0.9
enforce_eager: true
trust_remote_code: true
engine_output_type: latent
distributed_executor_backend: "mp"
enable_prefix_caching: false
max_num_batched_tokens: 32768
max_model_len: 32768
hf_config_name: thinker_config
tensor_parallel_size: 1
load_format: dummy
kv_transfer_config:
kv_connector: "MooncakeConnector"
kv_role: "kv_producer"
kv_rank: 0
kv_parallel_size: 2
engine_id: "omni-thinker-prefill"
kv_connector_extra_config:
mooncake_bootstrap_port: 25201
final_output: false
is_comprehension: true
default_sampling_params:
temperature: 0.4
top_p: 0.9
top_k: 1
max_tokens: 100
seed: 42
detokenize: True
repetition_penalty: 1.05

- stage_id: 1
stage_type: llm
is_decode_only: true
runtime:
devices: "1"
max_batch_size: 5
engine_args:
model_stage: thinker
model_arch: Qwen3OmniMoeForConditionalGeneration
worker_type: ar
scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler
gpu_memory_utilization: 0.9
enforce_eager: true
trust_remote_code: true
engine_output_type: latent
distributed_executor_backend: "mp"
enable_prefix_caching: false
max_num_batched_tokens: 32768
max_model_len: 32768
hf_config_name: thinker_config
tensor_parallel_size: 1
load_format: dummy
kv_transfer_config:
kv_connector: "MooncakeConnector"
kv_role: "kv_consumer"
kv_rank: 1
kv_parallel_size: 2
engine_id: "omni-thinker-decode"
kv_connector_extra_config:
mooncake_bootstrap_port: 25202  # NOTE(review): differs from the producer's 25201 — confirm producer/consumer are expected to use distinct bootstrap ports
engine_input_source: [0]
final_output: true
final_output_type: text
is_comprehension: true
default_sampling_params:
temperature: 0.4
top_p: 0.9
top_k: 1
max_tokens: 100
seed: 42
detokenize: True
repetition_penalty: 1.05

- stage_id: 2
stage_type: llm
runtime:
devices: "2"
max_batch_size: 5
engine_args:
model_stage: talker
model_arch: Qwen3OmniMoeForConditionalGeneration
worker_type: ar
scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler
gpu_memory_utilization: 0.6
enforce_eager: true
trust_remote_code: true
engine_output_type: latent
enable_prefix_caching: false
max_num_batched_tokens: 32768
max_model_len: 32768
distributed_executor_backend: "mp"
hf_config_name: talker_config
load_format: dummy
engine_input_source: [1]
custom_process_input_func: vllm_omni.model_executor.stage_input_processors.qwen3_omni.thinker2talker
default_sampling_params:
temperature: 0.9
top_k: 50
max_tokens: 1000
seed: 42
detokenize: False
repetition_penalty: 1.05
stop_token_ids: [2150]

- stage_id: 3
stage_type: llm
runtime:
devices: "2"
max_batch_size: 1
engine_args:
model_stage: code2wav
model_arch: Qwen3OmniMoeForConditionalGeneration
worker_type: generation
scheduler_cls: vllm_omni.core.sched.omni_generation_scheduler.OmniGenerationScheduler
enforce_eager: true
trust_remote_code: true
enable_prefix_caching: false
engine_output_type: audio
gpu_memory_utilization: 0.1
distributed_executor_backend: "mp"
max_num_batched_tokens: 100000
hf_config_name: thinker_config  # NOTE(review): code2wav stage reuses thinker_config while the talker stage uses talker_config — confirm this is intentional
async_scheduling: false
load_format: dummy
engine_input_source: [2]
custom_process_input_func: vllm_omni.model_executor.stage_input_processors.qwen3_omni.talker2code2wav
final_output: true
final_output_type: audio
default_sampling_params:
temperature: 0.0
top_p: 1.0
top_k: -1
max_tokens: 2000
seed: 42
detokenize: True
repetition_penalty: 1.1

# Runtime edges
runtime:
enabled: true
defaults:
window_size: -1
max_inflight: 1

connectors:
shared_memory_connector:
name: SharedMemoryConnector
extra:
shm_threshold_bytes: 65536

edges:
- from: 0
to: 1
window_size: -1
- from: 1
to: 2
window_size: -1
- from: 2
to: 3
window_size: -1
Loading
Loading