[Config Refactor][1/N] Model Pipeline Configuration System#1115
lishunyang12 wants to merge 6 commits into vllm-project:main
Conversation
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 5f718c777c
vllm_omni/entrypoints/omni.py
Outdated
# Filter stages if --stage-id is specified (for independent launch)
if stage_id_filter is not None:
    filtered_configs = [cfg for cfg in self.stage_configs if getattr(cfg, "stage_id", None) == stage_id_filter]
Preserve stage IDs when filtering configs
Filtering down to a single stage_id here reindexes self.stage_configs from 0, but downstream _start_stages still uses enumerate(self.stage_list) to pick the stage_id passed into get_stage_connector_config/resolve_omni_kv_config_for_stage. In an independent launch like --stage-id 1, the only stage will be treated as stage 0 for connector wiring, so it will subscribe to the wrong edges and never receive upstream payloads. Consider keeping original stage IDs or introducing an index→stage_id mapping for connector setup and scheduling in independent stage mode.
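The index→stage_id mapping suggested here could look like the following sketch; the helper name and dict-based configs are illustrative, not actual vllm_omni API:

```python
# Hypothetical helper: filter stage configs for an independent launch while
# preserving each stage's original ID for connector wiring.
def build_stage_id_map(stage_configs, stage_id_filter=None):
    """Return (filtered_configs, index_to_stage_id)."""
    if stage_id_filter is not None:
        stage_configs = [c for c in stage_configs if c["stage_id"] == stage_id_filter]
    # Downstream code that enumerates the list can look up the real stage_id
    # instead of treating the list index as the ID.
    index_to_stage_id = {i: cfg["stage_id"] for i, cfg in enumerate(stage_configs)}
    return stage_configs, index_to_stage_id
```

With `--stage-id 1`, the single remaining config keeps its original ID for `get_stage_connector_config` instead of being renumbered to 0.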
Added a NOTE comment explaining this is intentional — in independent
launch mode the stage runs in isolation without cross-stage connectors, so
list index 0 is correct.
xiedeyantu
left a comment
Thank you for your excellent work. I have a few questions.
vllm_omni/entrypoints/omni.py
Outdated
batch_timeout = kwargs.get("batch_timeout", 10)
stage_configs_path = kwargs.get("stage_configs_path", None)
log_stats = kwargs.get("log_stats", False)
stage_id_filter = kwargs.get("stage_id", None)  # For independent stage launch
Maybe keeping stage_id is better?
Agreed, renamed to stage_id
vllm_omni/config/stage_config.py
Outdated
final_output: Whether this stage produces final output.
final_output_type: Type of final output ("text", "audio", "image").
worker_type: Worker type ("ar" or "generation").
scheduler_cls: Full module path to scheduler class.
I have a small question: will there be multiple cls values? Which one will be used by default if I don't specify it? Could you add a comment to explain this?
Added a comment. When None (default), the engine uses its built-in scheduler. Model developers can specify a custom class (e.g. vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler) to override scheduling for a particular stage.
Can we remove scheduler_cls, or make the default shorter (OmniARScheduler/OmniGenerationScheduler/...)? I do not remember why we need to keep vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler.
The full path is needed because scheduler_cls gets dynamically imported at runtime (importlib.import_module in omni_stage.py), so it has to be a resolvable module path. We could add a short-name registry that maps e.g. OmniARScheduler -> vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler, but that adds another lookup table to maintain. I would lean toward keeping the full path for now since it is explicit and only lives in the topology YAML (not user-facing). But happy to add the registry if you prefer — would be a small change.
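The dynamic-import mechanism described here can be sketched as follows; `resolve_class` mirrors the usual importlib pattern, and `SCHEDULER_REGISTRY` shows what the optional short-name table would look like (the registry is hypothetical, not current vllm_omni API):

```python
import importlib

def resolve_class(path: str):
    """Import 'pkg.module.ClassName' dynamically and return the class."""
    module_path, _, class_name = path.rpartition(".")
    return getattr(importlib.import_module(module_path), class_name)

# What the optional short-name registry would look like (illustrative table,
# not an actual vllm_omni API):
SCHEDULER_REGISTRY = {
    "OmniARScheduler": "vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler",
}

def resolve_scheduler(name_or_path: str):
    # Fall back to treating the string as a full module path.
    return resolve_class(SCHEDULER_REGISTRY.get(name_or_path, name_or_path))
```

Since the YAML string is fed straight to `importlib.import_module`, it must be a resolvable module path — which is why the short name alone cannot work without such a table.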
vllm_omni/config/stage_config.py
Outdated
# Build full config dict
config_dict: dict[str, Any] = {
    "stage_id": self.stage_id,
    "stage_type": self.stage_type.value if isinstance(self.stage_type, StageType) else self.stage_type,
Is it better to use an enumeration here or not? If we use an enumeration, should the else block throw an error?
Good catch. Changed to StageType(self.stage_type).value which enforces the
enum and raises ValueError on invalid values
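A minimal sketch of why `StageType(value)` is the stricter choice — it accepts a member or its string value and raises ValueError for anything else (the enum here is a reduced stand-in for the one in stage_config.py):

```python
from enum import Enum

# Reduced stand-in for the StageType enum in stage_config.py.
class StageType(Enum):
    LLM = "llm"
    DIFFUSION = "diffusion"

def normalize_stage_type(value) -> str:
    # StageType(value) accepts an existing member or its string value and
    # raises ValueError for anything else, which is what the fix relies on.
    return StageType(value).value
```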
| "stage_type": self.stage_type.value if isinstance(self.stage_type, StageType) else self.stage_type, | ||
| "engine_args": OmegaConf.create(engine_args), | ||
| "runtime": OmegaConf.create(runtime), | ||
| "engine_input_source": self.input_sources, # Legacy field name |
Will input_sources use the values from edges subsequently?
Added a comment. For now input_sources is the simple upstream ID list. In
the future it may be derived from StageTopology.edges for richer edge
metadata (data format, buffering policy).
@@ -0,0 +1,416 @@
# SPDX-License-Identifier: Apache-2.0
Could we also add tests for stage yaml parsing?
Makes sense, I will add it.
Added TestTopologyYamlParsing with 5 tests: full qwen3_omni_moe parsing,
legacy engine_input_source backward compat, connectors/edges, validation
pass-through, and diffusion stage type
vllm_omni/config/stage_config.py
Outdated
connectors: dict[str, Any] | None = None
edges: list[dict[str, Any]] | None = None

def validate_dag(self) -> list[str]:
I don't get why we need to check for cyclic dependencies. It looks like it adds extra cost with almost no benefit. Am I missing anything? Once we support a model, its topology is valid and static, so we wouldn't need to check or update it anymore.
You're right. This would indeed add unnecessary workload. Now that you've pointed it out, I understand the issue. As someone new to testing, I'll revert the change and will be more mindful of this in future PRs.
Removed _detect_cycles(). Topology is static and validated at integration
time, so cycle detection adds cost with no benefit.
f8b0ba0 → f7e7639 (force-push)
wuhang2014
left a comment
Big step forward for the refactor. I have a suggestion: we could use dataclasses for configuration in omni.py and omni_stage.py instead of dicts.
vllm_omni/config/stage_config.py
Outdated
Two-Tier Stage Configuration System for vLLM-Omni.

Design Principles:
- Tier-1 (DAG Topology): INTERNAL ONLY - set by model developers at integration time
We may just name it "pipeline configuration" or "model pipeline configuration". DAG is one implementation rather than an abstraction.
Good point. Renamed all "DAG Topology" references to "Pipeline Topology" throughout docstrings and comments. Also
renamed validate_dag() → validate_topology() for consistency
vllm_omni/config/stage_config.py
Outdated
topology_path = PROJECT_ROOT / "vllm_omni" / "model_executor" / "stage_topologies" / topology_file

if not topology_path.exists():
    logger.debug(f"Topology file not found: {topology_path}")
    return None
Is there a more elegant way to implement this? Maybe we can create a Python module in the topology dir and import it to locate the config file. Or just move class StageConfigFactory to the topology dir.
Agreed — the hardcoded PROJECT_ROOT path was fragile. Created stage_topologies/__init__.py as a proper Python package that exposes get_topology_path(filename). Now stage_config.py just does from vllm_omni.model_executor.stage_topologies import get_topology_path and calls get_topology_path(topology_file) — no more PROJECT_ROOT computation.
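A package-relative lookup of this kind can be sketched with `importlib.resources`; unlike the PR's `__init__.py`, this sketch takes the package name as a parameter so it can run against any importable package:

```python
from importlib import resources

# Sketch of a package-relative data-file lookup; the PR's __init__.py
# hardcodes vllm_omni.model_executor.stage_topologies as the package,
# while this version takes it as a parameter.
def get_topology_path(package: str, filename: str):
    """Resolve a bundled data file relative to an importable package."""
    return resources.files(package) / filename
```

`resources.files` resolves relative to the installed package, so it works for both source checkouts and wheels — which is what makes it sturdier than a PROJECT_ROOT computation.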
vllm_omni/entrypoints/utils.py
Outdated
runtime_params = {
    "devices",
    "gpu_memory_utilization",
    "tensor_parallel_size",
    "enforce_eager",
    "max_num_batched_tokens",
    "trust_remote_code",
    "max_batch_size",
    "distributed_executor_backend",
    "enable_prefix_caching",
}
Can other CLI args be overridden? In my opinion, all CLI parameters registered by engines can be used to construct EngineArgs.
You're right. Changed from a hardcoded allowlist to a blocklist approach: _INTERNAL_KEYS lists the orchestrator-only
keys that should not be forwarded (model, stage_configs_path, batch_timeout, etc.). Everything else is passed through,
so any engine-registered CLI arg works out of the box. The old RUNTIME_PARAMS set is kept as documentation but no
longer gates forwarding
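A minimal sketch of this blocklist approach, assuming an illustrative `_INTERNAL_KEYS` set (the real set lives in vllm_omni/entrypoints/utils.py and may contain more keys):

```python
# Illustrative internal-key blocklist; the real set in the PR may differ.
_INTERNAL_KEYS = {"model", "stage_configs_path", "batch_timeout", "stage_id", "log_stats"}

def extract_runtime_overrides(kwargs: dict) -> dict:
    """Forward every non-None, non-internal kwarg as an engine override."""
    return {k: v for k, v in kwargs.items() if k not in _INTERNAL_KEYS and v is not None}
```

Unknown engine-registered args pass straight through, so no allowlist needs to be kept in sync with the engines.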
# TODO: hack, convert dtype to string to avoid non-primitive omegaconf create error.
if "dtype" in kwargs:
    kwargs["dtype"] = str(kwargs["dtype"])
def _create_default_diffusion_stage_cfg(self, kwargs: dict[str, Any]) -> list[dict[str, Any]]:
I'm wondering whether we can define the stage config as a Python dataclass. A dict is not easy to read, and checks cannot be applied to a dict either.
Agreed. create_default_diffusion() now builds a typed StageConfig dataclass first (with validation), then converts to
the legacy OmegaConf dict at the boundary for backward compatibility with OmniStage. This gives us type safety at
creation time while keeping the existing downstream contract intact. Full migration of OmniStage to consume
StageConfig directly is planned for a later phase.
71008fc → 4c39c90 (force-push)
Can we make it into the v0.16.0 stable release? This is going to be essential for improving user experience.

@tzhouam @Gaohan123 @fake0fan please review this PR in detail.
c615016 → 35f8a59 (force-push)
Yes, targeting v0.16.0. Main dependency is #939 — once that merges I will rebase to hook into load_and_resolve_stage_configs() and remove the [WIP] tag.
76a7185 → 1aa4640 (force-push)
This PR is ready for review.
vllm_omni/config/stage_config.py
Outdated
# Mapping of model types to architecture classes
ARCH_MAPPING: dict[str, str] = {
    "qwen3_omni_moe": "Qwen3OmniMoeForConditionalGeneration",
Please consider whether it's necessary to create the model_type mapping here. From the current Qwen3-TTS implementation, we need to add model_arch: Qwen3TTSCode2Wav in stage 1. Not sure whether it's a bug of Qwen3-TTS. Or should we directly reuse vLLM's parameter model_arch and make the selection of each stage less restrictive.
Agreed — a model-level arch mapping can't handle per-stage differences (e.g., Qwen3-TTS code2wav needs Qwen3TTSCode2Wav, distinct from stage 0). Removed both ARCH_MAPPING and _auto_detect_model_arch since they're dead code in [1/N] anyway. When this gets wired up in [2/N], it should be per-stage model_arch in the topology YAML.
The model_type mapping (now PIPELINE_MODELS) is how we map HF config model_type → pipeline YAML directory. For model_arch in code2wav — that should go in default_args.yaml as a per-stage engine arg, not in the pipeline topology. Will check if the Qwen3-TTS impl needs a fix for this.
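The model_type → pipeline-directory lookup amounts to a small table like this sketch; directory names are taken from the bundled pipelines in this PR, but the helper function itself is illustrative:

```python
# Illustrative lookup table mapping HF config model_type to a bundled
# pipeline YAML directory; not the actual factory API.
PIPELINE_MODELS = {
    "qwen3_omni_moe": "qwen3_omni_moe",
    "qwen2_5_omni": "qwen2_5_omni",
    "bagel": "bagel",
    "qwen3_tts": "qwen3_tts",
}

def pipeline_dir_for(model_type: str):
    """Return the pipeline directory for a model_type, or None if unknown."""
    return PIPELINE_MODELS.get(model_type)
```

Per-stage details like model_arch stay out of this table and go in each model's default_args.yaml, as described above.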
fake0fan
left a comment
left some comments. TBH I'm still considering whether to include Independent Service Launch in this PR.
tests/test_config_factory.py
Outdated
]
topology = StageTopology(model_type="test", stages=stages)
errors = topology.validate_topology()
assert any("entry point" in e.lower() for e in errors)
test_mutual_dependency_detected_as_missing_entry and test_missing_entry_point are duplicates.
Yep, same setup and assertion — removed it.
Good catch, removed the duplicate test.
parser.add_argument(
    "--gpu-memory-utilization",
    type=float,
    default=None,
    help="GPU memory utilization for all stages (Tier-2 override). Example: 0.9",
)
parser.add_argument(
    "--tensor-parallel-size",
    type=int,
    default=None,
    help="Tensor parallel size for all stages (Tier-2 override). Example: 2",
)
So if these parameters are set individually now, will they apply to all stages?
Right — global CLI params like --gpu-memory-utilization apply to every stage by default. Per-stage overrides (--stage-N-*) take precedence when specified. Added a comment at the kwargs block to clarify this.
Yes, global args like --gpu-memory-utilization apply to all stages. Per-stage overrides (--stage-N-*) take precedence when specified — this will be wired up in [2/N].
def _merge_cli_overrides(
    cls,
    stage: StageConfig,
    cli_overrides: dict[str, Any],
) -> dict[str, Any]:
    """Merge CLI overrides into stage runtime config.

    All CLI arguments registered by engine config classes (e.g.
    EngineArgs / OmniDiffusionConfig) are accepted as overrides,
    not just the well-known ``RUNTIME_PARAMS`` set.

    Handles:
    - Global overrides (apply to all stages)
    - Per-stage overrides (--stage-N-* format, take precedence)

    Args:
        stage: The stage to merge overrides into.
        cli_overrides: CLI arguments from VllmConfig/OmniDiffusionConfig.

    Returns:
        Dict of runtime overrides for this stage.
    """
    result: dict[str, Any] = {}

    # Apply global overrides – any key not in the internal blocklist
    # is forwarded so that engine-registered params work out of the box.
    for key, value in cli_overrides.items():
        if key in cls._INTERNAL_KEYS:
            continue
        if key.startswith("stage_"):
            # Per-stage keys handled below
            continue
        if value is not None:
            result[key] = value

    # Apply per-stage overrides (--stage-N-* format, take precedence)
    stage_prefix = f"stage_{stage.stage_id}_"
    for key, value in cli_overrides.items():
        if key.startswith(stage_prefix) and value is not None:
            param_name = key[len(stage_prefix):]
            result[param_name] = value

    return result
RUNTIME_PARAMS is not used here.
Good catch, removed. Only _INTERNAL_KEYS is actually used by _merge_cli_overrides.
Removed in latest push. The blocklist approach in _INTERNAL_KEYS replaced it.
tests/test_config_factory.py
Outdated
def test_extract_basic_params(self):
    """Test extracting basic runtime parameters."""
    from vllm_omni.entrypoints.utils import extract_runtime_overrides

    kwargs = {
        "gpu_memory_utilization": 0.9,
        "tensor_parallel_size": 2,
        "devices": "0,1",
        "custom_engine_param": "forwarded",
        "none_param": None,  # Should be excluded
        "model": "some_model",  # Internal key, should be excluded
    }

    result = extract_runtime_overrides(kwargs)

    assert result["gpu_memory_utilization"] == 0.9
    assert result["tensor_parallel_size"] == 2
    assert result["devices"] == "0,1"
    # Non-internal, non-None params are forwarded (extensible overrides)
    assert result["custom_engine_param"] == "forwarded"
    assert "none_param" not in result
    # Internal keys are excluded
    assert "model" not in result
Since the check below does not use RUNTIME_PARAMS, I think this test will fail.
The test itself passes (it's testing the blocklist behavior, which works), but the comment was misleading. Fixed it to reference _INTERNAL_KEYS instead.
# TODO: hack, calculate devices based on parallel config.
devices = "0"
if "parallel_config" in kwargs:
    num_devices = kwargs["parallel_config"].world_size
    for i in range(1, num_devices):
        devices += f",{i}"
default_stage_cfg = [
    {
        "stage_id": 0,
        "stage_type": "diffusion",
        "runtime": {
Can we also remove the _create_default_diffusion_stage_cfg function from async omni?
Looked into it — async_omni's version has extra diffusion-parallel logic (ulysses_degree, ring_degree, DiffusionParallelConfig) that isn't in the base version. Not a straightforward dedup. Deferring to [2/N] where we can unify both properly.
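For reference, a shared default-diffusion builder that both entry points could eventually call might look like this sketch — field values mirror the snippet quoted above, while the function name and world_size-based device string are assumptions:

```python
# Hypothetical shared builder for the default single-stage diffusion config;
# the devices string is the same hack noted in the TODO above (comma-joined
# device indices derived from the parallel world size).
def default_diffusion_stage_cfg(world_size: int = 1, max_batch_size: int = 1) -> list:
    devices = ",".join(str(i) for i in range(world_size))
    return [
        {
            "stage_id": 0,
            "stage_type": "diffusion",
            "final_output": True,
            "final_output_type": "image",
            "runtime": {"devices": devices, "max_batch_size": max_batch_size},
        }
    ]
```

The async variant would layer its diffusion-parallel fields (ulysses_degree, ring_degree, etc.) on top of this shared base once the dedup happens in [2/N].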
Good call on the independent service launch — I agree it doesn't belong here. #939 already adds … Also addressed the inline comments — thanks for the careful review.
7176385 → 2836f93 (force-push)
2bc21e0 → fd9369a (force-push)
princepride
left a comment
I have a question:
Some fields in the old files have been discarded in the new topology. However, default_sampling_params includes temperature, top_k, stop_token_ids, etc. These appear to be neither pure topology nor pure runtime parameters, but rather default values bound to the model. async_chunk and engine_output_type are also inherent model attributes.
How are these fields planned to be handled during [2/N] integration? Will they be added back to the Tier-1 topology YAML, or are there other solutions?
tests/test_config_factory.py
Outdated
]
topology = StageTopology(model_type="test", stages=stages)

stage = topology.get_stage(1)
Seems topology doesn't have a get_stage method.
Good catch, the method was missing. Added it.
tests/test_config_factory.py
Outdated
assert stage is not None
assert stage.model_stage == "talker"

missing = topology.get_stage(99)
Fixed, same commit.
for key, value in cli_overrides.items():
    if key in cls._INTERNAL_KEYS:
        continue
    if key.startswith("stage_"):
        # Per-stage keys handled below
        continue
    if value is not None:
        result[key] = value

# Apply per-stage overrides (--stage-N-* format, take precedence)
stage_prefix = f"stage_{stage.stage_id}_"
for key, value in cli_overrides.items():
    if key.startswith(stage_prefix) and value is not None:
        param_name = key[len(stage_prefix):]
        result[param_name] = value
Seems the filter logics are not the same — why doesn't the second part skip cls._INTERNAL_KEYS?
You're right, that's inconsistent — a per-stage key like stage_0_model would bypass the blocklist. Fixed, added a test for it too.
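The fixed merge logic — blocklist applied to per-stage keys as well, and a regex match on `stage_<N>_` prefixes so keys like stage_init_timeout are not silently dropped — can be sketched as:

```python
import re

# Illustrative blocklist; the real _INTERNAL_KEYS lives in the PR.
_INTERNAL_KEYS = {"model", "stage_configs_path", "batch_timeout"}
_STAGE_PREFIX = re.compile(r"stage_\d+_")

def merge_cli_overrides(stage_id: int, cli_overrides: dict) -> dict:
    """Global overrides first; per-stage overrides win; both honor the blocklist."""
    result = {}
    for key, value in cli_overrides.items():
        if value is None or key in _INTERNAL_KEYS or _STAGE_PREFIX.match(key):
            continue
        result[key] = value
    prefix = f"stage_{stage_id}_"
    for key, value in cli_overrides.items():
        if key.startswith(prefix) and value is not None:
            param = key[len(prefix):]
            if param not in _INTERNAL_KEYS:  # blocklist now applies here too
                result[param] = value
    return result
```

With this version, stage_0_model is rejected just like a bare model override would be.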
Review of [Config Refactor][1/N] Two-Tier Stage Configuration System

Overview

Foundation PR (1/N) that introduces a two-tier config system separating pipeline topology (Tier-1, developer-owned YAML) from runtime knobs (Tier-2, user CLI args). Adds the data model, topology files, OmegaConf isolation, and tests — but does not wire the new factory into the engine startup path yet. +1307 / -52 across 14 files. Well-structured PR description with clear motivation.

Issues

1. Stale/misleading comment in …
Good question. Fields like …
LGTM, looking forward to your following PR.
@hsliuustc0106 @wuhang2014 Unit tests: … All bundled topologies: …
New CLI override tests (Qwen3-Omni): …
No regressions across any model type.
resolve conflicts
type=str,
default=None,
help="Path to a stage configs file.",
help="Path to a stage configs file. If not specified, uses auto-detected Tier-1 topology.",
Do we need users to understand the definition of Tier-1/2?
vllm_omni/config/yaml_util.py
Outdated
from omegaconf import DictConfig, OmegaConf


def load_yaml_raw(path: str | Any) -> DictConfig:
def load_yaml_raw(path: str) -> DictConfig:
    cfg = OmegaConf.load(path)
    if not isinstance(cfg, DictConfig):
        raise TypeError(f"Expected a DictConfig from {path}, but got {type(cfg)}")
    return cfg

load_yaml_raw: the function name is very strange to users — maybe change it to load_yaml_to_config?
@@ -0,0 +1,44 @@
# Tier-1 Stage Topology for Qwen3-Omni-MoE
Change stage_topo back to stage_config?
final_output: true
final_output_type: image

connectors:
Should the connector be added via CLI or YAML?
shared_memory_connector:
  name: SharedMemoryConnector

edges:
Do we still need this definition of "edge"?
stages:
  - stage_id: 0
    model_stage: thinker
    stage_type: llm
What's the difference between stage_type and worker_type? Are they redundant?
stage_type: llm
input_sources: []  # Entry point - no upstream stages
worker_type: ar
scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler
Is scheduler_cls still necessary if we have worker_type or stage_type?
scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler
final_output: true
final_output_type: text
is_comprehension: true
wuhang2014
left a comment
I think we should try to introduce as few concepts as we can.
Here is my proposal for the concepts we introduce here.
Stage Topology -> Model Pipeline: The concept of a model pipeline is used to define the computation pipeline of a specific model, which is composed of several stages.
Tier-1 Config -> Model Pipeline Config: We should not introduce "Tier" to users or developers; it causes confusion between concepts that are not orthogonal. Currently we treat the Model Pipeline Config as not configurable by users.
Tier-2 Config -> vLLM-Omni Config: We should bring all configs into the scope of the vLLM-Omni Config. It includes configs inherited from vLLM, e.g. Parallel Config for parallelism, and also configs new to vLLM-Omni-specific scenarios, e.g. Diffusion Config for diffusion models.
Done. Renamed StageTopology -> ModelPipeline, validate_topology() -> validate_pipeline(), dropped all Tier-1/Tier-2 terminology throughout. Latest commit: 15ee3d3.
Here is my proposal for how the configs cooperate with the start-up CLI to maintain compatibility with both the single-stage CLI and the all-stages-in-one CLI:
- There is a Path for placing config files after refactoring; it may take one of the following two values:
  - Path=vllm_omni/model_executor/model_pipelines
  - Path=vllm_omni/model_pipelines — I prefer this one because it is shorter, and the configs in it have no strong correlation with model_executor
- There is a file for the Model Pipeline Config in each dir named by model:
  {Path}/<model_name>/pipeline.yaml
- For compatibility with the all-stages-in-one CLI, we introduce a new file under each dir to store default startup args of the CLI and device info:
  {Path}/<model_name>/default_args.yaml
An example of bagel:
# In {Path}/bagel/pipeline.yaml
model_type: bagel
stages:
- stage_id: 0
model_stage: thinker
stage_type: llm
input_sources: [] # Entry point - no upstream stages
worker_type: ar
scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler
final_output: true
final_output_type: text
is_comprehension: true
- stage_id: 1
model_stage: dit
stage_type: diffusion
input_sources: [0] # Receives from thinker
final_output: true
final_output_type: image
# In {Path}/bagel/default_args.yaml
stage_args:
- stage_id: 0
runtime:
devices: "0"
max_batch_size: 1
engine_args:
model_stage: thinker
model_arch: BagelForConditionalGeneration
worker_type: ar
scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler
gpu_memory_utilization: 0.35
enforce_eager: true
trust_remote_code: true
engine_output_type: text
distributed_executor_backend: "mp"
enable_prefix_caching: false
max_num_batched_tokens: 32768
tensor_parallel_size: 1
omni_kv_config:
need_send_cache: true
kv_transfer_criteria:
type: prefill_finished #or special token generated
default_sampling_params:
temperature: 0.4
top_p: 0.9
top_k: 1
max_tokens: 2048
seed: 52
detokenize: True
repetition_penalty: 1.05
- stage_id: 1
runtime:
devices: "0"
max_batch_size: 1
engine_args:
model_stage: dit
gpu_memory_utilization: 0.55
enforce_eager: true
trust_remote_code: true
engine_output_type: image
distributed_executor_backend: "mp"
enable_prefix_caching: false
max_num_batched_tokens: 32768
tensor_parallel_size: 1
omni_kv_config:
need_recv_cache: true
default_sampling_params:
seed: 52
# Runtime edges
runtime:
# Distributed connectors configuration (optional)
# More connectors will be supported in the future.
connectors:
shared_memory_connector:
name: SharedMemoryConnector
extra:
shm_threshold_bytes: 65536 # 64KB threshold
Adopted your proposal exactly. Went with Path=vllm_omni/model_pipelines (the shorter path). Each model now has pipeline.yaml + default_args.yaml co-located in its own directory. The old stage_topologies/ directory is deleted; … is kept as a legacy fallback for platform-specific configs.
Implemented exactly this layout for all 4 models. Thanks for the detailed example -- it made the migration straightforward.
do we use stages/stage_args for different yaml?
do we use stages/stage_args for different yaml?
I am not quite sure of your question. Can you elaborate on that? An example would help me understand better.
# Default runtime/engine args for Bagel
# Users can override these via CLI flags or a custom stage_configs file.

stage_args:
In my opinion, the arg names in pipeline.yaml should be a subset of those in default_args — please be careful with the arg names.
I think that is already the arrangement in the current state. What is your concern?
Signed-off-by: lishunyang <lishunyang12@163.com>
Signed-off-by: Jensen <czjourney@163.com>
Signed-off-by: Jiangyun Zhu <zhu.jiangyun@foxmail.com>
Signed-off-by: Canlin Guo <canlinguosdu@gmail.com>
Signed-off-by: wuhang <wuhang6@huawei.com>
Signed-off-by: Hongsheng Liu <liuhongsheng4@huawei.com>
Signed-off-by: Chenguang Zheng <fake0fan@users.noreply.github.com>
Signed-off-by: lishunyang <lishunyang12@163.com>
Signed-off-by: lishunyang <lishunyang12@163.com>
15ee3d3 → fe1ecfe (force-push)
- Rename StageTopology → ModelPipeline
- Move stage_topologies/ → model_pipelines/<model>/{pipeline,default_args}.yaml
- Rename load_yaml_raw → load_yaml_config
- Drop Tier-1/Tier-2 terminology from code and docs
- Fix bare OmegaConf.create() → create_config()
Signed-off-by: lishunyang12 <lishunyang12@gmail.com>
Signed-off-by: lishunyang <lishunyang12@163.com>
fe1ecfe → 3b3d610 (force-push)
Signed-off-by: lishunyang12 <lishunyang12@gmail.com>
Signed-off-by: lishunyang <lishunyang12@163.com>
…te_code
- Use re.match(r"stage_\d+_") instead of startswith("stage_") to avoid
silently dropping future CLI args like stage_init_timeout
- Return None instead of [] from create_from_model when no pipeline found
- Thread trust_remote_code through _auto_detect_model_type instead of
hardcoding True
Signed-off-by: lishunyang12 <lishunyang12@gmail.com>
Signed-off-by: lishunyang <lishunyang12@163.com>
@hsliuustc0106 Thanks for the thorough review. Addressed 1-6, pushed in …
Signed-off-by: lishunyang <lishunyang12@163.com>
Summary
Multi-stage omni models (Qwen3-Omni, Qwen2.5-Omni, Bagel, Qwen3-TTS) currently define their pipeline structure and runtime knobs in a single flat YAML. This makes it unclear which params are structural (stage ordering, worker types, input edges) vs. user-tunable (gpu_memory_utilization, tp_size, devices), and forces users to understand internal details just to change a runtime knob.
This PR introduces a model pipeline configuration system that cleanly separates the two concerns:
model_pipelines/<model>/pipeline.yaml (bundled)

This is the foundation PR ([1/N]) -- it adds the data model, pipeline files, OmegaConf isolation, and unit tests. The factory (StageConfigFactory.create_from_model()) is defined but not yet wired into the engine startup path; that wiring is [2/N].

Directory structure

Each model type gets its own directory under vllm_omni/model_pipelines/.

How different model types are configured
create_default_diffusion() fallback

For single-stage diffusion models, when no pipeline YAML is found, the orchestrator automatically falls back to StageConfigFactory.create_default_diffusion(), which programmatically creates a minimal config (stage_type=diffusion, final_output=image). No YAML authoring needed.

Key classes

| Class | File | Notes |
| --- | --- | --- |
| StageType | config/stage_config.py | Enum: LLM or DIFFUSION |
| StageConfig | config/stage_config.py | Per-stage dataclass |
| ModelPipeline | config/stage_config.py | validate_pipeline() for integration-time validation |
| StageConfigFactory | config/stage_config.py | Builds stage configs from bundled pipelines |
| yaml_util wrappers | config/yaml_util.py | create_config, load_yaml_raw, merge_configs, to_dict -- isolates all OmegaConf usage to one file |

Models with bundled pipelines

- qwen3_omni_moe/
- qwen2_5_omni/
- bagel/
- qwen3_tts/

Benefits
For users: no more YAML editing
Before -- to change gpu_memory_utilization, users had to find and edit the legacy YAML, which mixes topology with runtime params in 100+ lines.

After -- users pass CLI args. Pipeline is auto-detected and auto-loaded:
For model developers: adding a new model is minimal
- Add model_pipelines/new_model/pipeline.yaml
- Add model_pipelines/new_model/default_args.yaml
- Register the model in StageConfigFactory.PIPELINE_DIRS
- CLI overrides work automatically via _merge_cli_overrides()

For maintainers: OmegaConf is now isolated

Before: OmegaConf.load() scattered across omni.py, utils.py, stage_utils.py. After: isolated in yaml_util.py.

Architecture
Pipeline structures supported
Override precedence
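Based on the behavior described in the review threads (bundled defaults, then global CLI args, then --stage-N-* per-stage args), the precedence chain can be sketched as — function name illustrative, not the actual factory API:

```python
# Sketch of the override chain: later sources win.
# None means "not set on the CLI" and is skipped.
def resolve_stage_args(defaults: dict, global_cli: dict, per_stage_cli: dict) -> dict:
    merged = dict(defaults)                                            # default_args.yaml
    merged.update({k: v for k, v in global_cli.items() if v is not None})    # global CLI
    merged.update({k: v for k, v in per_stage_cli.items() if v is not None}) # --stage-N-*
    return merged
```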
Migration path
Both paths coexist during the migration. Once [2/N] wires StageConfigFactory into engine startup, the legacy path can be removed.

Testing
Environment: 2x NVIDIA A100-SXM4-80GB, PyTorch 2.10.0+cu128, CUDA 12.8, Python 3.12.3, vLLM 0.16.0
Unit tests
Covers: StageType, StageConfig, ModelPipeline, StageConfigFactory, CLI override merging, YAML pipeline parsing, validation, and edge cases.

End-to-end inference (all bundled pipelines)

- end2end.py --query-type CustomVoice
- end2end.py --query-type VoiceDesign
- end2end.py --modality text2img
- end2end.py
- end2end.py (default)

CLI override tests (Qwen3-Omni)
- --gpu-memory-utilization 0.85 --enforce-eager
- --output-dir /tmp/pr1115_test_output