Make chunk_size and left_context_size configurable via YAML for async chunking#1423
LJH-LBJ wants to merge 33 commits into vllm-project:main
Conversation
…-chunk_size-and-left_context_size Signed-off-by: Junhong Liu <98734602+LJH-LBJ@users.noreply.github.com>
Signed-off-by: Junhong Liu <98734602+LJH-LBJ@users.noreply.github.com>
Signed-off-by: Junhong Liu <98734602+LJH-LBJ@users.noreply.github.com>
Signed-off-by: Junhong Liu <98734602+LJH-LBJ@users.noreply.github.com>
…ntext_size Signed-off-by: Junhong Liu <ljh_lbj@163.com>
Signed-off-by: Junhong Liu <98734602+LJH-LBJ@users.noreply.github.com>
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 9fe559c2fa
```python
hf_config_name: str | None = None
custom_process_next_stage_input_func: str | None = None
stage_connector_spec: dict[str, Any] = field(default_factory=dict)
async_chunk: bool = False
```
Should we remove the async_chunk field here? It seems inconsistent with model.py.
I will keep async_chunk as before and take it out of async_chunk_config.
lishunyang12
left a comment
Thanks for the contribution, I have a few comments.
vllm_omni/engine/arg_utils.py
Outdated
```python
"async_chunk": False,
"chunk_size": 25,
"left_context_size": 25,
}
```
The old async_chunk: bool = False field was replaced by this dict, but create_model_config on line 351 still does async_chunk=self.async_chunk. Since async_chunk is no longer a standalone field on AsyncOmniEngineArgs, this will raise AttributeError at runtime. (gcanlin flagged something similar.) It seems like async_chunk should either stay as a separate field or be derived here, e.g. self.async_chunk_config.get("async_chunk", False).
vllm_omni/engine/arg_utils.py
Outdated
```python
stage_connector_spec: dict[str, Any] = field(default_factory=dict)
async_chunk: bool = False
async_chunk_config: dict[str, Any] = field(
    default_factory=lambda: {
```
Nesting async_chunk (a boolean) inside async_chunk_config (a dict) alongside chunk_size/left_context_size (integers) feels like mixing concerns. OmniModelConfig keeps async_chunk: bool and async_chunk_config: dict as separate fields. Would it be cleaner to keep async_chunk as its own field here too and have async_chunk_config only hold chunk_size and left_context_size?
Sure, I will keep async_chunk as before.
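A minimal sketch of the layout agreed on here (the class name `AsyncOmniEngineArgsSketch` is a hypothetical stand-in for illustration, not the real AsyncOmniEngineArgs): `async_chunk` stays a standalone boolean field, while `async_chunk_config` holds only the integer knobs, with missing keys falling back to defaults at the read site.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class AsyncOmniEngineArgsSketch:  # hypothetical stand-in, for illustration only
    # Kept as its own boolean field, consistent with OmniModelConfig.
    async_chunk: bool = False
    # Only the tuning knobs live in the dict.
    async_chunk_config: dict[str, Any] = field(
        default_factory=lambda: {"chunk_size": 25, "left_context_size": 25}
    )


# A user overriding only chunk_size; left_context_size falls back to the
# default at the read site via dict.get.
args = AsyncOmniEngineArgsSketch(async_chunk=True,
                                 async_chunk_config={"chunk_size": 4})
chunk_size = args.async_chunk_config.get("chunk_size", 25)            # 4
left_context = args.async_chunk_config.get("left_context_size", 25)  # 25
```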
```python
else:
    logger.warning("No additional_information provided to code2wav stage.")
audio_tensors = self.generate_audio(codes, voice_type, left_context_size=left_context_size)
```
This warning fires for every non-async-chunk call (or whenever additional_information is None), which could be very noisy in production. Is this intentional for debugging only, or should it be logger.debug instead?
Sure, I will use logger.debug instead
```python
left_context_size = async_chunk_config.get("left_context_size", 25)
logger.warning(
    "Left context size for async chunking is not provided, falling back to config default: %s",
    left_context_size,
```
Same concern here -- this warning fires every time the fallback path is taken, which is the normal path when additional_information does not include left_context_size. In a streaming scenario this could log on every chunk. Would logger.debug or a one-time warning be more appropriate?
```python
wav_chunk = batch_wav[idx, :, context_size * self.total_upsample : code_seq_len * self.total_upsample]
# Remove context from output (left_context_size * total_upsample samples)
wav_chunk = batch_wav[idx, :, left_context_size * self.total_upsample : code_seq_len * self.total_upsample]
wavs.append(wav_chunk)
```
chunk_size is declared as a parameter of chunked_decode_streaming but is no longer used anywhere in the method body after this change. Is that intentional? If it is only there for API compatibility, might be worth a comment or removing it entirely.
OK, I will remove chunk_size entirely.
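For illustration, the context-trimming slice in the diff above can be sketched with plain lists standing in for tensors (the helper name is hypothetical; each code is assumed to map to `total_upsample` audio samples):

```python
def trim_left_context(wav, left_context_size, code_seq_len, total_upsample):
    # Drop the samples that correspond to the left-context codes and keep
    # the remainder of this chunk, mirroring the slice in the diff.
    return wav[left_context_size * total_upsample : code_seq_len * total_upsample]


# 29 codes decoded (25 context + 4 new), 2x upsampling: keep the last 8 samples.
wav = list(range(29 * 2))
out = trim_left_context(wav, left_context_size=25, code_seq_len=29, total_upsample=2)
# len(out) == (29 - 25) * 2 == 8
```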
```python
transfer_manager.code_prompt_token_ids[request_id].append(codec_codes)
length = len(transfer_manager.code_prompt_token_ids[request_id])
chunk_length = length % chunk_size
chunk_length = length % chunk_size_config
```
What happens when length == context_length (i.e., the first chunk)? min(0, left_context_size_config) gives 0, so end_index = 0 + chunk_size_config. That looks correct. But for the very last chunk when chunk_length != 0, context_length = chunk_length which could be small. Then left_context_size = min(length - chunk_length, left_context_size_config). Have you verified this against the table in the PR description for the final chunk case?
When the first chunk reaches length == context_length, end_index = min(context_length, left_context_size + context_length) = min(4, 0 + 4), i.e. context_length = 4.
When the last chunk is smaller than chunk_size_config, left_context_size = min(length - chunk_length, left_context_size_config) = min(37 - 1, 25) = 25, matching the table in the PR description.
I think it is correct now.
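A minimal sketch of this arithmetic (a hypothetical helper reconstructed from the discussion, not the actual vllm-omni code): the trailing chunk size comes from `length % chunk_size_config`, and the left context is whatever lies to its left, capped by the configured value.

```python
def left_context_for_chunk(length: int, chunk_size_config: int,
                           left_context_size_config: int) -> int:
    """Left context for the most recent chunk after `length` codes accumulated."""
    chunk_length = length % chunk_size_config
    # A remainder of 0 means the trailing chunk is a full chunk.
    current_chunk = chunk_length if chunk_length else chunk_size_config
    context_length = length - current_chunk
    return min(context_length, left_context_size_config)


# First full chunk: 4 codes, nothing to the left -> 0 context.
print(left_context_for_chunk(4, 4, 25))   # 0
# Last partial chunk from the walkthrough: 37 codes, chunk_size 4 -> capped at 25.
print(left_context_for_chunk(37, 4, 25))  # 25
```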
Ah got it, that tracks. Thanks for walking through it.
```python
# Pass additional fields (like left_context_size) to the request
req.additional_information = {
    k: v for k, v in payload_data.items() if k not in ("code_predictor_codes", "finished")
}
```
This dict comprehension filters out code_predictor_codes and finished from payload_data. If new keys are added to the payload in the future, they will silently flow into additional_information. Would an explicit allowlist (e.g. only picking left_context_size) be safer here?
OK, I will just pick left_context_size
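The allowlist variant agreed on here can be sketched as follows (the helper name and `ALLOWED_KEYS` constant are assumptions for illustration; payload keys are taken from the diff above):

```python
ALLOWED_KEYS = ("left_context_size",)


def extract_additional_information(payload_data: dict) -> dict:
    # Forward only explicitly allowed keys, so future payload fields cannot
    # silently leak into additional_information.
    return {k: payload_data[k] for k in ALLOWED_KEYS if k in payload_data}


payload = {"code_predictor_codes": [1, 2, 3], "finished": False,
           "left_context_size": 25}
info = extract_additional_information(payload)  # {"left_context_size": 25}
```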
```yaml
distributed_executor_backend: "mp"
hf_config_name: talker_config
custom_process_next_stage_input_func: vllm_omni.model_executor.stage_input_processors.qwen3_omni.talker2code2wav_async_chunk
async_chunk_config:
```
This config is under stage 1 (talker), which is where chunks are produced. But generate_audio in the code2wav stage also reads async_chunk_config from its own model_config (as a fallback when left_context_size is None). Since stage 2 does not have async_chunk_config in its YAML, it will silently use the default (25). Should async_chunk_config also be set under stage 2, or should the fallback in generate_audio be removed to make the additional_information path the only source of truth?
Yes, chunk_size is no longer needed here. I will remove the fallback in generate_audio and use only the left_context_size from additional_information, so async_chunk_config does not need to be set in stage 2.
@vllm-omni-reviewer
🤖 VLLM-Omni PR Review

Code Review: Make chunk_size and left_context_size configurable via YAML for async chunking

1. Overview

This PR makes chunk_size and left_context_size configurable via YAML for async chunking.

Overall Assessment: The PR has a clear purpose and the implementation follows a reasonable pattern. However, there are several concerns around edge-case handling, missing validation, and incomplete test documentation that should be addressed before merging.

2. Code Quality

Positive Aspects

Issues and Concerns

Critical: logic change in chunked_decode_streaming:

```python
# OLD CODE (removed):
if code_seq_len <= chunk_size:
    context_size = 0
else:
    context_size = left_context_size

# NEW CODE:
wav_chunk = batch_wav[idx, :, left_context_size * self.total_upsample : code_seq_len * self.total_upsample]
```

This removes the safety check for short sequences: if code_seq_len <= left_context_size, the slice start can reach or pass the slice end, yielding an empty chunk.

Potential negative left_context_size: consider clamping, e.g. left_context_size = max(0, min(length - context_length, left_context_size_config)).

Overwriting additional_information: this completely replaces any existing dict. Consider merging instead:

```python
if req.additional_information is None:
    req.additional_information = {}
req.additional_information.update({
    k: v for k, v in payload_data.items() if k not in ("code_predictor_codes", "finished")
})
```

Missing input validation: there is no validation that chunk_size and left_context_size are positive integers. For example:

```python
def __post_init__(self):
    if self.async_chunk:
        chunk_size = self.async_chunk_config.get("chunk_size", 25)
        left_context_size = self.async_chunk_config.get("left_context_size", 25)
        if not isinstance(chunk_size, int) or chunk_size <= 0:
            raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")
        if not isinstance(left_context_size, int) or left_context_size <= 0:
            raise ValueError(f"left_context_size must be a positive integer, got {left_context_size}")
```

3. Architecture & Design

Positive Aspects

Concerns

Warning message may be too verbose: the warning fires on non-async-chunk code paths. Consider checking whether async_chunk is enabled first, or using logger.debug.

Type hints could be more specific than async_chunk_config: dict[str, Any]. Consider a TypedDict or dataclass for better type safety:

```python
from typing import TypedDict

class AsyncChunkConfig(TypedDict, total=False):
    chunk_size: int
    left_context_size: int
```

4. Security & Safety

5. Testing & Documentation

The PR states "Wait" for the test plan, which is incomplete, and the checklist items are not filled in. Required test coverage:

Documentation

6. Specific Suggestions
Signed-off-by: Junhong Liu <98734602+LJH-LBJ@users.noreply.github.com>
Thanks for your fixes. I will take a look now; it is closer to a ready state.
LGTM. Previous concerns resolved — async_chunk kept separate, allowlist for payload data, chunk_size param removed, logger.debug for the non-async path. Looks good. @princepride PTAL
Nice work!
```python
# Remove context from output (context_size * total_upsample samples)
wav_chunk = batch_wav[idx, :, context_size * self.total_upsample : code_seq_len * self.total_upsample]
# Remove context from output (left_context_size * total_upsample samples)
wav_chunk = batch_wav[idx, :, left_context_size * self.total_upsample : code_seq_len * self.total_upsample]
```
In concurrent scenarios, the left_context_size of each request in code2wav may not be equal, so it is necessary to obtain the left_context_size of each request separately.
Please provide the accuracy results under high-concurrency scenarios, and also the performance test results for chunk_size=4 and chunk_size=25.
@vllm-omni-reviewer
Signed-off-by: Junhong Liu <98734602+LJH-LBJ@users.noreply.github.com>
…-chunk_size-and-left_context_size Signed-off-by: Junhong Liu <98734602+LJH-LBJ@users.noreply.github.com>
Signed-off-by: Junhong Liu <98734602+LJH-LBJ@users.noreply.github.com>
…ntext_size Signed-off-by: Junhong Liu <ljh_lbj@163.com>
```yaml
distributed_executor_backend: "mp"
hf_config_name: talker_config
custom_process_next_stage_input_func: vllm_omni.model_executor.stage_input_processors.qwen3_omni.talker2code2wav_async_chunk
async_chunk_config:
```
The configurations for chunk_size and left_context_size should be consistent with those for TTS.
```python
transfer_manager: OmniChunkTransferAdapter,
pooling_output: dict[str, Any],
request: OmniEngineCoreRequest,
**kwargs,
```
The chunk_size configuration should be placed within the transfer_manager; do not pass it to the talker2code2wav_async_chunk function. Obtain the parameter from the transfer_manager.
Signed-off-by: Junhong Liu <98734602+LJH-LBJ@users.noreply.github.com>
Signed-off-by: Junhong Liu <98734602+LJH-LBJ@users.noreply.github.com>
Signed-off-by: Junhong Liu <98734602+LJH-LBJ@users.noreply.github.com>
…-chunk_size-and-left_context_size Signed-off-by: Junhong Liu <98734602+LJH-LBJ@users.noreply.github.com>
…ntext_size Signed-off-by: Junhong Liu <ljh_lbj@163.com>
lishunyang12
left a comment
Left a couple comments. Most of the earlier feedback has been addressed, but there are a few remaining nits.
```python
scheduled_spec_decode_tokens: dict[str, list[int]] = {}
scheduled_encoder_inputs: dict[str, list[int]] = {}
cached_prompt_token_ids: dict[str, list[int]] = {}
cached_additional_information: dict[str, list[int]] = {}
```
Wrong type hint -- this stores dict | None values, not list[int]. Should be dict[str, dict | None].
vllm_omni/core/sched/output.py
Outdated
```python
"""

prompt_token_ids: dict[str, list[int]]
additional_information: dict[str, list[int]]
```
Same here -- should be dict[str, dict | None] to match what the scheduler actually puts in.
```python
"""
if not left_context_size:
    logger.warning(
        "left_context_size is None in chunked_decode_streaming;"
```
Nit: missing space before the semicolon causes the log parts to run together.
```python
# cached requests. This is required for stages without preprocess
# (e.g., code2wav) so runtime_additional_information can be refreshed
# from scheduler cached infos on every step.
if hasattr(self.model, "has_preprocess") or hasattr(self.model, "enable_update_additional_information"):
```
Why check has_preprocess here when enable_update_additional_information is the intended gate? Models with has_preprocess=True but no enable_update_additional_information would hit this path unnecessarily.
In the original logic, when has_preprocess=True, update_additional_information can be called. Now I don’t want to change the original logic; I just want the code2wav stage to also be able to update additional_information.
Fair enough, makes sense.
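The gating discussed here, sketched generically (the stub classes and `should_refresh` helper are assumptions for illustration; only the `hasattr` gate itself comes from the diff):

```python
class PreprocessStub:
    # Hypothetical model with a preprocess step: the original refresh path.
    has_preprocess = True


class Code2WavStub:
    # Hypothetical stage model without preprocess that opts into refreshing
    # additional_information via the new attribute.
    enable_update_additional_information = True


def should_refresh(model) -> bool:
    # Mirrors the gate in the diff: the original has_preprocess path is kept,
    # and the new attribute lets stages like code2wav opt in as well.
    return hasattr(model, "has_preprocess") or hasattr(
        model, "enable_update_additional_information"
    )
```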
… of https://github.com/LJH-LBJ/vllm-omni into Supports-configurable-chunk_size-and-left_context_size Signed-off-by: Junhong Liu <98734602+LJH-LBJ@users.noreply.github.com>
Please check whether the changes to TTS meet expectations. @Sy0307 @linyueqian
Checked the TTS-side changes; overall LGTM. The move away from magic numbers to YAML-configurable values is a clear improvement. Two minor things to flag:

Other than that, the changes look good.
LGTM. Please add test results for Qwen3-TTS.

Done
```yaml
chunk_size: 4
left_context_size: 25
```

qwen3_omni_moe_async_chunk.yaml:

```yaml
async_chunk: true
stage_args:
  - stage_id: 0
    stage_type: llm  # Use llm stage type to launch OmniLLM
    runtime:
      devices: "0"
      max_batch_size: 64
    engine_args:
      model_stage: thinker
      model_arch: Qwen3OmniMoeForConditionalGeneration
      worker_type: ar
      scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler
      gpu_memory_utilization: 0.9
      enforce_eager: false
      trust_remote_code: true
      engine_output_type: latent  # Output hidden states for talker
      distributed_executor_backend: "mp"
      enable_prefix_caching: false
      max_num_batched_tokens: 32768
      hf_config_name: thinker_config
      tensor_parallel_size: 1
      custom_process_next_stage_input_func: vllm_omni.model_executor.stage_input_processors.qwen3_omni.thinker2talker_async_chunk
      final_output: true
      final_output_type: text
      is_comprehension: true
    default_sampling_params:
      temperature: 0.4
      top_p: 0.9
      top_k: 1
      max_tokens: 2048
      seed: 42
      detokenize: True
      repetition_penalty: 1.05
  - stage_id: 1
    stage_type: llm  # Use llm stage type to launch OmniLLM
    runtime:
      devices: "1"
      max_batch_size: 64
    engine_args:
      model_stage: talker
      model_arch: Qwen3OmniMoeForConditionalGeneration
      worker_type: ar
      scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler
      gpu_memory_utilization: 0.6
      enforce_eager: false
      trust_remote_code: true
      engine_output_type: latent  # Output codec codes for code2wav
      enable_prefix_caching: false
      max_num_batched_tokens: 32768
      distributed_executor_backend: "mp"
      hf_config_name: talker_config
      custom_process_next_stage_input_func: vllm_omni.model_executor.stage_input_processors.qwen3_omni.talker2code2wav_async_chunk
      async_chunk_config:
        chunk_size: 4  # code2wav decode chunk size
        left_context_size: 25  # code2wav left context size
      engine_input_source: [0]
    default_sampling_params:
      temperature: 0.9
      top_k: 50
      max_tokens: 4096
      seed: 42
      detokenize: False
      repetition_penalty: 1.05
      stop_token_ids: [2150]
  - stage_id: 2
    stage_type: llm  # Use llm stage type to launch OmniLLM
    runtime:
      devices: "1"
      max_batch_size: 64
    engine_args:
      model_stage: code2wav
      model_arch: Qwen3OmniMoeForConditionalGeneration
      worker_type: generation
      scheduler_cls: vllm_omni.core.sched.omni_generation_scheduler.OmniGenerationScheduler
      enforce_eager: true
      trust_remote_code: true
      async_scheduling: false
      enable_prefix_caching: false
      engine_output_type: audio  # Final output: audio waveform
      gpu_memory_utilization: 0.1
      distributed_executor_backend: "mp"
      max_num_batched_tokens: 51200  # [TODO] if max_num_batched_tokens < max_batch_size * 800, there will be precision problems.
      hf_config_name: thinker_config
      engine_input_source: [1]
      final_output: true
      final_output_type: audio
    default_sampling_params:
      temperature: 0.0
      top_p: 1.0
      top_k: -1
      max_tokens: 65536
      seed: 42
      detokenize: True
      repetition_penalty: 1.1
```
Purpose
Resolve: #1239
This PR enables flexible configuration of chunk_size and left_context_size for the Code2Wav pipeline by exposing them in the async_chunk_config section of the stage YAML file. Previously, these values were hardcoded to 25. Now, users can adjust them per stage directly in the YAML, allowing for easier tuning and experimentation. The change ensures that the values from YAML override the dataclass defaults, improving usability and modularity for multi-stage pipelines. No functional logic is altered; only configuration handling is updated.
When chunk_size is not equal to left_context_size, the following demonstrates how left_context_size and code_predictor_codes are calculated.
Test Plan
qwen3-omni accuracy
qwen3-omni benchmark
qwen3-tts
Test Result
qwen3-omni accuracy
qwen3-omni benchmark
Mean AUDIO_TTFP (ms): 2211 -> 1175
main branch
opt branch chunk size = 25
opt branch chunk size = 4
qwen3-tts