
initialization for core APIs #2

Merged

hsliuustc0106 merged 5 commits into main from hsliu-dev-C, Sep 25, 2025

Conversation

@hsliuustc0106 (Collaborator) commented Sep 16, 2025

  1. Initialize the design document.
  2. Provide the OmniRequest class init for omni-modality models.
  3. Provide the Scheduler class init for the omni request scheduler.

API Design Template: Introduced a comprehensive template for designing APIs across vLLM-omni modules, ensuring consistency and completeness in future module development.

OmniRequest Class: Developed an extended request class (OmniRequest) to support multimodal inputs, non-autoregressive processing (like diffusion), and multi-stage processing pipelines within the vLLM-omni framework.

OmniScheduler Class: Implemented an initial OmniScheduler that extends vLLM's base scheduler to handle OmniRequest objects, laying the groundwork for multimodal and diverse request scheduling.

Core Module API Documentation: Provided detailed API design documentation for the Core Module, illustrating the application of the new API design template with specific examples for scheduler, cache, and resource management components, complete with configurations, error handling, and usage examples.


@gemini-code-assist (bot) left a comment


Summary of Changes

Hello @hsliuustc0106, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request lays the foundational groundwork for vLLM-omni by establishing a standardized API design methodology and introducing core components for handling multimodal and non-autoregressive requests. It includes a comprehensive API design template, detailed documentation for the Core Module, and initial implementations of an extended request class (OmniRequest) and a specialized scheduler (OmniScheduler). These changes aim to enable flexible and robust processing of diverse AI tasks within the vLLM ecosystem.

Highlights

  • API Design Template: Introduced a comprehensive template for designing APIs across vLLM-omni modules, ensuring consistency and completeness in future module development.
  • OmniRequest Class: Developed an extended request class (OmniRequest) to support multimodal inputs, non-autoregressive processing (like diffusion), and multi-stage processing pipelines within the vLLM-omni framework.
  • OmniScheduler Class: Implemented an initial OmniScheduler that extends vLLM's base scheduler to handle OmniRequest objects, laying the groundwork for multimodal and diverse request scheduling.
  • Core Module API Documentation: Provided detailed API design documentation for the Core Module, illustrating the application of the new API design template with specific examples for scheduler, cache, and resource management components, complete with configurations, error handling, and usage examples.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

  • Code Review: /gemini review — Performs a code review for the current pull request in its current state.
  • Pull Request Summary: /gemini summary — Provides a summary of the current pull request in its current state.
  • Comment: @gemini-code-assist — Responds in comments when explicitly tagged, both in pull request comments and review comments.
  • Help: /gemini help — Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by adding files to a .gemini/ folder in the root of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.


@gemini-code-assist (bot) left a comment


Code Review

This pull request introduces the initial API design documentation and foundational classes for vllm-omni. The design documents in docs/api/ are well-structured and provide a solid starting point for the different modules. The new OmniRequest class is comprehensive, and the OmniScheduler class is a good initial implementation.

My review includes suggestions to improve the consistency of the API documentation examples and to enhance the robustness and clarity of the OmniRequest class implementation.

A couple of minor typos were noticed in the pull request title ('initlization' should be 'initialization') and description ('sheduler' should be 'scheduler').

Comment on lines +156 to +580

```python
class OmniScheduler(SchedulerInterface):
    """Main scheduler for vLLM-omni core module."""

    def __init__(self, config: SchedulerConfig, cache_config: CacheConfig):
        """
        Initialize the core scheduler.

        Args:
            config: Scheduler configuration
            cache_config: Cache configuration
        """
        self.config = config
        self.cache_manager = DiTCacheManager(cache_config)
        self.resource_manager = ResourceManager({})
        self.scheduler = self._create_scheduler()
        self._running = False
        self._stats = {
            "total_requests": 0,
            "processed_requests": 0,
            "failed_requests": 0,
            "average_processing_time": 0.0
        }

    def _create_scheduler(self) -> BaseScheduler:
        """Factory method to create appropriate scheduler."""
        if self.config.scheduler_type == SchedulerType.FIFO:
            return FIFOScheduler(self.config)
        elif self.config.scheduler_type == SchedulerType.PRIORITY:
            return PriorityScheduler(self.config)
        elif self.config.scheduler_type == SchedulerType.MULTIMODAL:
            return MultimodalScheduler(self.config)
        elif self.config.scheduler_type == SchedulerType.DIFFUSION:
            return DiffusionScheduler(self.config)
        elif self.config.scheduler_type == SchedulerType.HYBRID:
            return HybridScheduler(self.config)
        else:
            raise ValueError(f"Unknown scheduler type: {self.config.scheduler_type}")
```

#### Core Operations

```python
async def schedule_request(self, request: OmniRequest) -> bool:
    """
    Schedule a request for processing.

    Args:
        request: The request to schedule

    Returns:
        bool: True if successfully scheduled, False otherwise

    Raises:
        SchedulerError: If scheduling fails
        QueueFullError: If queue is at capacity
        ResourceError: If insufficient resources
    """
    try:
        # Validate request
        if not self._validate_request(request):
            return False

        # Check resource availability
        if not await self.resource_manager.can_allocate(request):
            raise ResourceError("Insufficient resources")

        # Schedule the request
        success = await self.scheduler.schedule_request(request)
        if success:
            self._stats["total_requests"] += 1
            await self.resource_manager.allocate_resources(request)

        return success

    except Exception as e:
        self._stats["failed_requests"] += 1
        raise SchedulerError(f"Failed to schedule request: {e}")

async def get_next_request(self) -> Optional[OmniRequest]:
    """
    Get the next request to process.

    Returns:
        OmniRequest or None: Next request to process

    Raises:
        SchedulerError: If retrieval fails
    """
    try:
        request = await self.scheduler.get_next_request()
        if request:
            request.update_processing_stage(ProcessingStage.PREPROCESSING)
        return request
    except Exception as e:
        raise SchedulerError(f"Failed to get next request: {e}")

async def process_request(self, request: OmniRequest) -> Any:
    """
    Process a request through the appropriate pipeline.

    Args:
        request: The request to process

    Returns:
        Any: Processing result

    Raises:
        ProcessingError: If processing fails
    """
    start_time = time.time()
    try:
        # Update processing stage
        request.update_processing_stage(ProcessingStage.AR_GENERATION)

        # Check cache first
        cache_key = request.generate_cache_key()
        cached_result = await self.cache_manager.get_cache(cache_key)
        if cached_result:
            return cached_result

        # Process based on request type
        if request.request_type == RequestType.TEXT:
            result = await self._process_text_request(request)
        elif request.request_type == RequestType.IMAGE:
            result = await self._process_image_request(request)
        elif request.request_type == RequestType.MULTIMODAL:
            result = await self._process_multimodal_request(request)
        elif request.request_type == RequestType.DIFFUSION:
            result = await self._process_diffusion_request(request)
        else:
            raise ProcessingError(f"Unknown request type: {request.request_type}")

        # Cache the result
        await self.cache_manager.set_cache(cache_key, result)

        # Update statistics
        processing_time = time.time() - start_time
        self._update_stats(processing_time)

        request.update_processing_stage(ProcessingStage.COMPLETED)
        return result

    except Exception as e:
        request.add_error(str(e))
        self._stats["failed_requests"] += 1
        raise ProcessingError(f"Failed to process request: {e}")
    finally:
        # Cleanup resources
        await self.resource_manager.deallocate_resources(request.request_id)
```

#### Configuration Methods

```python
def update_config(self, new_config: SchedulerConfig) -> None:
    """
    Update scheduler configuration.

    Args:
        new_config: New configuration to apply

    Raises:
        ConfigError: If configuration is invalid
    """
    try:
        new_config.validate()
        self.config = new_config
        # Recreate scheduler with new config
        self.scheduler = self._create_scheduler()
    except Exception as e:
        raise ConfigError(f"Failed to update config: {e}")

def get_config(self) -> SchedulerConfig:
    """Get current configuration."""
    return self.config

def update_cache_config(self, new_config: CacheConfig) -> None:
    """Update cache configuration."""
    self.cache_manager.config = new_config
```

#### Lifecycle Management

```python
async def start(self) -> None:
    """Start the scheduler."""
    if self._running:
        raise SchedulerError("Scheduler is already running")

    self._running = True
    # Start background tasks
    asyncio.create_task(self._background_cleanup())
    asyncio.create_task(self._monitor_resources())

async def stop(self) -> None:
    """Stop the scheduler gracefully."""
    if not self._running:
        return

    self._running = False
    # Wait for current requests to complete
    await self._wait_for_completion()
    # Cleanup resources
    await self._cleanup_resources()

async def shutdown(self) -> None:
    """Force shutdown the scheduler."""
    self._running = False
    # Immediate cleanup
    await self._force_cleanup()
```

#### Monitoring Methods

```python
def get_status(self) -> Dict[str, Any]:
    """Get current scheduler status."""
    return {
        "running": self._running,
        "queue_size": self.scheduler.get_queue_size(),
        "processed_requests": self.scheduler.get_processed_count(),
        "cache_hit_rate": self.cache_manager.get_hit_rate(),
        "memory_usage": self.cache_manager.get_memory_usage(),
        "resource_status": self.resource_manager.get_resource_status(),
        "stats": self._stats.copy()
    }

def get_metrics(self) -> Dict[str, float]:
    """Get performance metrics."""
    return {
        "throughput": self._calculate_throughput(),
        "average_latency": self._stats["average_processing_time"],
        "cache_hit_rate": self.cache_manager.get_hit_rate(),
        "memory_utilization": self.cache_manager.get_memory_usage() / self.cache_manager.config.max_memory_gb,
        "queue_utilization": self.scheduler.get_queue_size() / self.config.max_queue_size
    }

def get_health_status(self) -> Dict[str, Any]:
    """Get health status of the scheduler."""
    return {
        "healthy": self._is_healthy(),
        "issues": self._get_health_issues(),
        "recommendations": self._get_recommendations()
    }
```

## 4. Configuration

```python
@dataclass
class CoreModuleConfig:
    """Complete configuration for the core module."""

    # Scheduler configuration
    scheduler: SchedulerConfig = field(default_factory=SchedulerConfig)

    # Cache configuration
    cache: CacheConfig = field(default_factory=CacheConfig)

    # Resource limits
    max_memory_gb: float = 16.0
    max_gpu_utilization: float = 0.8
    max_cpu_utilization: float = 0.9

    # Timeouts
    request_timeout: int = 300
    worker_timeout: int = 60
    cleanup_interval: int = 30

    # Monitoring
    enable_metrics: bool = True
    metrics_interval: int = 10
    enable_health_checks: bool = True

    def validate(self) -> None:
        """Validate configuration parameters."""
        if self.max_memory_gb <= 0:
            raise ValueError("max_memory_gb must be positive")
        if not 0 < self.max_gpu_utilization <= 1:
            raise ValueError("max_gpu_utilization must be between 0 and 1")
        if not 0 < self.max_cpu_utilization <= 1:
            raise ValueError("max_cpu_utilization must be between 0 and 1")
        if self.request_timeout <= 0:
            raise ValueError("request_timeout must be positive")
```
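The validate() contract above fails fast on out-of-range values before any scheduler is built. A minimal, self-contained sketch of the call pattern — using a trimmed stand-in dataclass, since SchedulerConfig and CacheConfig are defined elsewhere in the design doc:

```python
from dataclasses import dataclass


@dataclass
class LimitsConfig:
    """Trimmed stand-in for CoreModuleConfig's resource-limit fields."""
    max_memory_gb: float = 16.0
    max_gpu_utilization: float = 0.8

    def validate(self) -> None:
        if self.max_memory_gb <= 0:
            raise ValueError("max_memory_gb must be positive")
        if not 0 < self.max_gpu_utilization <= 1:
            raise ValueError("max_gpu_utilization must be between 0 and 1")


# A valid config passes silently; invalid values raise immediately
LimitsConfig().validate()
try:
    LimitsConfig(max_gpu_utilization=1.5).validate()
except ValueError as e:
    print(e)  # max_gpu_utilization must be between 0 and 1
```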

## 5. Error Handling

```python
class CoreModuleError(Exception):
    """Base exception for core module errors."""
    pass

class SchedulerError(CoreModuleError):
    """Scheduler-related errors."""
    pass

class QueueFullError(SchedulerError):
    """Queue is at capacity."""
    pass

class CacheError(CoreModuleError):
    """Cache-related errors."""
    pass

class ResourceError(CoreModuleError):
    """Resource allocation errors."""
    pass

class ProcessingError(CoreModuleError):
    """Request processing errors."""
    pass

class ConfigError(CoreModuleError):
    """Configuration errors."""
    pass
```
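Because the exceptions above form a hierarchy rooted at CoreModuleError, callers can catch narrowly or broadly. A quick sketch (re-declaring a slice of the hierarchy so it runs standalone; `enqueue` is a hypothetical helper, not part of the documented API):

```python
class CoreModuleError(Exception):
    """Base exception for core module errors."""

class SchedulerError(CoreModuleError):
    """Scheduler-related errors."""

class QueueFullError(SchedulerError):
    """Queue is at capacity."""


def enqueue(queue_len: int, capacity: int) -> None:
    # Hypothetical helper: reject work when the queue is full
    if queue_len >= capacity:
        raise QueueFullError("queue is at capacity")


try:
    enqueue(queue_len=8, capacity=8)
except SchedulerError as e:
    # QueueFullError is caught here because it subclasses SchedulerError
    print(f"scheduler error: {e}")
```

Catching `CoreModuleError` at a system boundary similarly covers every module-specific failure in one handler.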

## 6. Examples

### Basic Usage

```python
from vllm_omni.core import CoreScheduler, CoreModuleConfig
from vllm_omni.request import create_text_request

# Create configuration
config = CoreModuleConfig(
    scheduler=SchedulerConfig(scheduler_type=SchedulerType.FIFO),
    cache=CacheConfig(max_memory_gb=8.0)
)

# Initialize scheduler
scheduler = CoreScheduler(config.scheduler, config.cache)
await scheduler.start()

# Create and schedule a request
request = create_text_request(
    request_id="req_001",
    prompt="Hello, world!",
    sampling_params=sampling_params
)

success = await scheduler.schedule_request(request)
if success:
    result = await scheduler.process_request(request)
    print(f"Result: {result}")

# Get status
status = scheduler.get_status()
print(f"Queue size: {status['queue_size']}")
print(f"Cache hit rate: {status['cache_hit_rate']:.2%}")

await scheduler.stop()
```

### Advanced Usage

```python
# Custom scheduler with priority handling and monitoring
config = CoreModuleConfig(
    scheduler=SchedulerConfig(
        scheduler_type=SchedulerType.PRIORITY,
        priority_weights={"high": 3.0, "normal": 1.0, "low": 0.3},
        enable_preemption=True
    ),
    cache=CacheConfig(
        strategy=CacheStrategy.ADAPTIVE,
        max_memory_gb=16.0,
        enable_compression=True
    ),
    enable_metrics=True,
    enable_health_checks=True
)

scheduler = CoreScheduler(config.scheduler, config.cache)
await scheduler.start()

# Monitor scheduler
async def monitor_scheduler():
    while True:
        status = scheduler.get_status()
        metrics = scheduler.get_metrics()
        health = scheduler.get_health_status()

        print(f"Status: {status}")
        print(f"Metrics: {metrics}")
        print(f"Health: {health}")

        await asyncio.sleep(10)

# Start monitoring
asyncio.create_task(monitor_scheduler())
```

### Integration Example

```python
# Integration with other modules
from vllm_omni.engine import EngineManager
from vllm_omni.executor import ExecutorManager

class OmniSystem:
    def __init__(self, config: CoreModuleConfig):
        self.scheduler = CoreScheduler(config.scheduler, config.cache)
        self.engine_manager = EngineManager(config.engine)
        self.executor_manager = ExecutorManager(config.executor)

    async def start(self):
        await self.scheduler.start()
        await self.engine_manager.start()
        await self.executor_manager.start()

    async def process_request(self, request: OmniRequest):
        # Schedule request
        await self.scheduler.schedule_request(request)

        # Get next request
        next_request = await self.scheduler.get_next_request()
        if next_request:
            # Route to appropriate executor
            executor = self.executor_manager.get_executor(next_request)
            result = await executor.execute(next_request)
            return result
```


Severity: medium

The example code snippets in this design document have a few inconsistencies that could be confusing for developers:

  • Class Name Mismatch: The class is defined as OmniScheduler (line 156) but instantiated as CoreScheduler in the usage examples (e.g., lines 489 and 531).
  • Incomplete Code: The process_request example (lines 252-305) uses the time module without importing it, and calls several helper methods (e.g., _process_text_request) that are not defined in the class.
  • Configuration Errors: The "Integration Example" (lines 558-580) references config.engine and config.executor, but these fields are not defined in CoreModuleConfig.

Making these examples more consistent and complete would improve the clarity and usefulness of this API design document.

```python
# Error handling
self.errors = []
self.retry_count = 0
self.max_retries = 3
```


Severity: medium

The max_retries is hardcoded to 3. It would be more flexible to make this configurable, for instance by passing it in the __init__ method or making it a class-level constant. This would allow different retry strategies without changing the code.
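One way to act on this suggestion, sketched with a class-level default plus a constructor override (the class and field names are illustrative stand-ins, not the actual OmniRequest API):

```python
from typing import Optional


class RetryPolicyMixin:
    """Illustrative: retry bookkeeping with a configurable cap."""

    DEFAULT_MAX_RETRIES = 3  # class-level constant; subclasses or callers can override

    def __init__(self, max_retries: Optional[int] = None):
        self.errors: list = []
        self.retry_count = 0
        # Fall back to the class default when no per-request value is given
        self.max_retries = max_retries if max_retries is not None else self.DEFAULT_MAX_RETRIES

    def can_retry(self) -> bool:
        return self.retry_count < self.max_retries


default_policy = RetryPolicyMixin()            # uses the default of 3
aggressive = RetryPolicyMixin(max_retries=10)  # per-request override
print(default_policy.max_retries, aggressive.max_retries)  # 3 10
```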

```python
self.request_id,
str(self.request_type.value),
str(self.prompt_token_ids) if self.prompt_token_ids else str(self.prompt),
str(self.diffusion_params.num_inference_steps) if self.diffusion_params else "0",
```


Severity: medium

In the __init__ method (line 104), self.diffusion_params is initialized with diffusion_params or DiffusionParams(), which ensures it's never None. Therefore, the if self.diffusion_params check here is redundant and can be removed to simplify the code.

Suggested change:

```diff
- str(self.diffusion_params.num_inference_steps) if self.diffusion_params else "0",
+ str(self.diffusion_params.num_inference_steps),
```


@hsliuustc0106 changed the title from "initlization for apis" to "initialization for apis" on Sep 16, 2025
@hsliuustc0106 changed the title from "initialization for apis" to "initialization for core APIs" on Sep 16, 2025

- Add comprehensive PRD, architecture design, and test design documents
- Implement core modules: OmniLLM, AsyncOmniLLM, stage configurations
- Add DiT scheduler and cache manager for diffusion models
- Implement CLI integration with --omni flag support
- Add API server and plugin system for vLLM integration
- Create comprehensive test suite with fixtures
- Update dependencies to vLLM 0.10.2 and PyTorch 2.8.0
- Add conda environment setup and package installation
- Implement stage-based processing architecture
- Add multimodal output processing capabilities

This commit establishes the foundation for multi-modality models
inference and serving with non-autoregressive structures.
@hsliuustc0106 hsliuustc0106 merged commit 8dbcdd5 into main Sep 25, 2025
1 check passed
```python
    blocks_to_copy={},
    ignored_seq_groups=[],
    num_ignored_seq_groups=0,
)
```

Bug: Duplicate Argument Causes SchedulerOutput Error

The SchedulerOutput constructor is called with a duplicate ignored_seq_groups keyword argument in both the _create_empty_scheduler_output and _create_scheduler_output methods. This causes a TypeError when these methods are invoked.

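The failure mode described above can be reproduced in isolation. `SchedulerOutput` below is a stand-in dataclass for illustration, not vLLM's actual class; passing the same keyword twice — once explicitly and once through `**kwargs` — raises TypeError at call time:

```python
from dataclasses import dataclass, field


@dataclass
class SchedulerOutput:
    """Stand-in for illustration; not vLLM's actual SchedulerOutput."""
    blocks_to_copy: dict = field(default_factory=dict)
    ignored_seq_groups: list = field(default_factory=list)
    num_ignored_seq_groups: int = 0


common = {"ignored_seq_groups": []}
try:
    # The same keyword arrives both explicitly and via **common
    SchedulerOutput(ignored_seq_groups=[], **common)
except TypeError as e:
    print(type(e).__name__)  # TypeError
```

The fix is simply to pass `ignored_seq_groups` from exactly one place in each constructor call.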

```python
    access_count=0,
    size_bytes=size * 4  # Assuming float32
)
```


Bug: Cache Manager Instantiation and Size Calculation Errors

The DiTCacheManager attempts to instantiate DiTCacheTensor with fields (tensor, timestamp, access_count, size_bytes) not defined in its dataclass, causing runtime errors. The size_bytes calculation also incorrectly assumes float32 for all tensors, leading to inaccurate cache size tracking.

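A dtype-aware alternative to the hardcoded `* 4`, sketched here with NumPy for portability (torch tensors expose the analogous `element_size()` and `nelement()` methods):

```python
import numpy as np


def tensor_size_bytes(arr: np.ndarray) -> int:
    # itemsize reflects the actual dtype, so float16, float32, and int8
    # entries are all sized correctly instead of assuming 4 bytes each
    return arr.itemsize * arr.size


print(tensor_size_bytes(np.zeros((4, 4), dtype=np.float16)))  # 32
print(tensor_size_bytes(np.zeros((4, 4), dtype=np.float32)))  # 64
```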

natureofnature pushed a commit to natureofnature/vllm-omni that referenced this pull request Dec 15, 2025
dongbo910220 pushed a commit to dongbo910220/vllm-omni that referenced this pull request Jan 19, 2026
Peft lora wrapper

Signed-off-by: Andy Zhou <jzhoubc@connect.ust.hk>
futurenitian added a commit to futurenitian/vllm-omni that referenced this pull request Feb 2, 2026