Merged
Commits
38 commits
bbfc5f4
Basic implementation of request scheduling
FredyRivera-dev Sep 6, 2025
a308e3e
Basic editing in SD and Flux Pipelines
FredyRivera-dev Sep 7, 2025
4799b8e
Small Fix
FredyRivera-dev Sep 7, 2025
eda5847
Fix
FredyRivera-dev Sep 7, 2025
6b5e6be
Update for more pipelines
FredyRivera-dev Sep 7, 2025
df2933f
Add examples/server-async
FredyRivera-dev Sep 7, 2025
5c7c7c6
Add examples/server-async
FredyRivera-dev Sep 7, 2025
e3cd368
Merge branch 'huggingface:main' into main
FredyRivera-dev Sep 10, 2025
09bf796
Merge branch 'huggingface:main' into main
FredyRivera-dev Sep 10, 2025
bd3e48a
Updated RequestScopedPipeline to handle a single tokenizer lock to av…
FredyRivera-dev Sep 10, 2025
534710c
Fix
FredyRivera-dev Sep 10, 2025
4d7c64f
Fix _TokenizerLockWrapper
FredyRivera-dev Sep 10, 2025
18db9e6
Fix _TokenizerLockWrapper
FredyRivera-dev Sep 10, 2025
8f0efb1
Delete _TokenizerLockWrapper
FredyRivera-dev Sep 10, 2025
b479039
Fix tokenizer
FredyRivera-dev Sep 10, 2025
e676b34
Merge branch 'huggingface:main' into main
FredyRivera-dev Sep 10, 2025
0beab1c
Update examples/server-async
FredyRivera-dev Sep 11, 2025
840f0e4
Fix server-async
FredyRivera-dev Sep 11, 2025
bb41c2b
Merge branch 'huggingface:main' into main
FredyRivera-dev Sep 12, 2025
8a238c3
Merge branch 'huggingface:main' into main
FredyRivera-dev Sep 12, 2025
ed617fe
Optimizations in examples/server-async
FredyRivera-dev Sep 13, 2025
b052d27
We keep the implementation simple in examples/server-async
FredyRivera-dev Sep 14, 2025
0f63f4d
Update examples/server-async/README.md
FredyRivera-dev Sep 14, 2025
a9666b1
Update examples/server-async/README.md for changes to tokenizer locks…
FredyRivera-dev Sep 14, 2025
06bb136
The changes to the diffusers core have been undone and all logic is b…
FredyRivera-dev Sep 15, 2025
a519915
Update examples/server-async/utils/*
FredyRivera-dev Sep 15, 2025
7cfee77
Fix BaseAsyncScheduler
FredyRivera-dev Sep 15, 2025
e574f07
Rollback in the core of the diffusers
FredyRivera-dev Sep 15, 2025
05d7936
Merge branch 'huggingface:main' into main
FredyRivera-dev Sep 15, 2025
1049663
Update examples/server-async/README.md
FredyRivera-dev Sep 15, 2025
5316620
Complete rollback of diffusers core files
FredyRivera-dev Sep 15, 2025
0ecdfc3
Simple implementation of an asynchronous server compatible with SD3-3…
FredyRivera-dev Sep 17, 2025
ac5c9e6
Update examples/server-async/README.md
FredyRivera-dev Sep 17, 2025
72e0215
Fixed import errors in 'examples/server-async/serverasync.py'
FredyRivera-dev Sep 17, 2025
edd550b
Flux Pipeline Discard
FredyRivera-dev Sep 17, 2025
6b69367
Update examples/server-async/README.md
FredyRivera-dev Sep 17, 2025
5598557
Merge branch 'main' into main
sayakpaul Sep 18, 2025
7c4f883
Apply style fixes
github-actions[bot] Sep 18, 2025
79 changes: 79 additions & 0 deletions examples/server-async/Pipelines.py
@@ -0,0 +1,79 @@
from diffusers.pipelines.stable_diffusion_3.pipeline_stable_diffusion_3 import StableDiffusion3Pipeline
import torch
import os
import logging
from pydantic import BaseModel
from dataclasses import dataclass, field
from typing import List

logger = logging.getLogger(__name__)


class TextToImageInput(BaseModel):
    model: str
    prompt: str
    size: str | None = None
    n: int | None = None


@dataclass
class PresetModels:
    SD3: List[str] = field(default_factory=lambda: ['stabilityai/stable-diffusion-3-medium'])
    SD3_5: List[str] = field(default_factory=lambda: ['stabilityai/stable-diffusion-3.5-large', 'stabilityai/stable-diffusion-3.5-large-turbo', 'stabilityai/stable-diffusion-3.5-medium'])


class TextToImagePipelineSD3:
    def __init__(self, model_path: str | None = None):
        self.model_path = model_path or os.getenv("MODEL_PATH")
        self.pipeline: StableDiffusion3Pipeline | None = None
        self.device: str | None = None

    def start(self):
        if torch.cuda.is_available():
            model_path = self.model_path or "stabilityai/stable-diffusion-3.5-large"
            logger.info("Loading CUDA")
            self.device = "cuda"
            self.pipeline = StableDiffusion3Pipeline.from_pretrained(
                model_path,
                torch_dtype=torch.float16,
            ).to(device=self.device)
        elif torch.backends.mps.is_available():
            model_path = self.model_path or "stabilityai/stable-diffusion-3.5-medium"
            logger.info("Loading MPS for Mac M Series")
            self.device = "mps"
            self.pipeline = StableDiffusion3Pipeline.from_pretrained(
                model_path,
                torch_dtype=torch.bfloat16,
            ).to(device=self.device)
        else:
            raise Exception("No CUDA or MPS device available")


class ModelPipelineInitializer:
    def __init__(self, model: str = '', type_models: str = 't2im'):
        self.model = model
        self.type_models = type_models
        self.pipeline = None
        self.device = "cuda" if torch.cuda.is_available() else "mps"
        self.model_type = None

    def initialize_pipeline(self):
        if not self.model:
            raise ValueError("Model name not provided")

        # Check if model exists in PresetModels
        preset_models = PresetModels()

        # Determine which model type we're dealing with
        if self.model in preset_models.SD3:
            self.model_type = "SD3"
        elif self.model in preset_models.SD3_5:
            self.model_type = "SD3_5"

        # Create appropriate pipeline based on model type and type_models
        if self.type_models == 't2im':
            if self.model_type in ["SD3", "SD3_5"]:
                self.pipeline = TextToImagePipelineSD3(self.model)
            else:
                raise ValueError(f"Model type {self.model_type} not supported for text-to-image")
        elif self.type_models == 't2v':
            raise ValueError(f"Unsupported type_models: {self.type_models}")

        return self.pipeline
171 changes: 171 additions & 0 deletions examples/server-async/README.md
@@ -0,0 +1,171 @@
# Asynchronous server and parallel execution of models

> Example/demo server that keeps a single model in memory while safely running parallel inference requests by creating per-request lightweight views and cloning only small, stateful components (schedulers, RNG state, small mutable attrs). Works with StableDiffusion3 pipelines.
> We recommend running 10 to 50 inferences in parallel for optimal throughput; each request typically takes anywhere from 25–30 seconds up to about 1 minute 30 seconds. (This is only advisable on a GPU with 35 GB of VRAM or more; otherwise keep it to one or two parallel inferences to avoid decoding or saving errors caused by running out of memory.)

## ⚠️ IMPORTANT

* The example demonstrates how to run Stable Diffusion 3 / 3.5 pipelines concurrently while keeping a single copy of the heavy model parameters on the GPU.

## Necessary components

All the components needed to create the inference server are in the current directory:

```
server-async/
├── utils/
│   ├── __init__.py
│   ├── scheduler.py              # BaseAsyncScheduler wrapper and async_retrieve_timesteps for safe per-request timesteps
│   ├── requestscopedpipeline.py  # RequestScopedPipeline for inference with a single in-memory model
│   └── utils.py                  # Image/video saving utilities and service configuration
├── Pipelines.py                  # Pipeline loader classes (SD3)
├── serverasync.py                # FastAPI app with lifespan management and async inference endpoints
├── test.py                       # Client test script for inference requests
├── requirements.txt              # Dependencies
└── README.md                     # This documentation
```

## What `diffusers-async` adds / Why we needed it

Core problem: a naive server that calls `pipe.__call__` concurrently can hit **race conditions** (e.g., `scheduler.set_timesteps` mutates shared state) or explode memory by deep-copying the whole pipeline per-request.

`diffusers-async` / this example addresses that by:

* **Request-scoped views**: `RequestScopedPipeline` creates a shallow copy of the pipeline per request so heavy weights (UNet, VAE, text encoder) remain shared and *are not duplicated*.
* **Per-request mutable state**: stateful small objects (scheduler, RNG state, small lists/dicts, callbacks) are cloned per request. The system uses `BaseAsyncScheduler.clone_for_request(...)` for scheduler cloning, with fallback to safe `deepcopy` or other heuristics.
* **Tokenizer concurrency safety**: `RequestScopedPipeline` now manages an internal tokenizer lock with automatic tokenizer detection and wrapping. This ensures that Rust tokenizers are safe to use under concurrency — race condition errors like `Already borrowed` no longer occur.
* **`async_retrieve_timesteps(..., return_scheduler=True)`**: fully backward-compatible helper that returns `(timesteps, num_inference_steps, scheduler)` without mutating the shared scheduler (see the sketch after this list). If you don't pass `return_scheduler=True`, the behavior is identical to the original API.
* **Robust attribute handling**: the wrapper avoids writing to read-only properties (e.g., `components`) and auto-detects small mutable attributes to clone; a configurable tensor size threshold keeps large tensors from being duplicated.
* **Enhanced scheduler wrapping**: `BaseAsyncScheduler` automatically wraps schedulers with improved `__getattr__`, `__setattr__`, and debugging methods (`__repr__`, `__str__`).
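
As a rough sketch of the `return_scheduler=True` path (the exact signature lives in `utils/scheduler.py`; the argument names below follow diffusers' standard `retrieve_timesteps` helper and are assumptions here):

```python
from utils.scheduler import BaseAsyncScheduler, async_retrieve_timesteps


def prepare_request_scheduler(pipe, steps: int, device: str):
    # Wrap the shared scheduler; BaseAsyncScheduler proxies attribute access to it.
    shared = BaseAsyncScheduler(pipe.scheduler)

    # With return_scheduler=True the helper hands back a per-request scheduler
    # instead of calling set_timesteps() on the shared instance.
    # (Await this call instead if the helper is implemented as a coroutine.)
    timesteps, steps, local_scheduler = async_retrieve_timesteps(
        shared, num_inference_steps=steps, device=device, return_scheduler=True
    )
    return timesteps, steps, local_scheduler
```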

## How the server works (high-level flow)

1. **Single model instance** is loaded into memory (GPU/MPS) when the server starts.
2. On each HTTP inference request:

   * The server uses `RequestScopedPipeline.generate(...)`, which:

     * automatically wraps the base scheduler in `BaseAsyncScheduler` (if not already wrapped),
     * obtains a *local scheduler* (via `clone_for_request()` or `deepcopy`),
     * does `local_pipe = copy.copy(base_pipe)` (a shallow copy),
     * sets `local_pipe.scheduler = local_scheduler` (if possible),
     * clones only small mutable attributes (callbacks, RNG, small latents) with auto-detection,
     * wraps tokenizers with thread-safe locks to prevent race conditions,
     * optionally enters a `model_cpu_offload_context()` for memory offload hooks,
     * calls the pipeline on the local view (`local_pipe(...)`).
3. **Result**: inference completes, images are moved to CPU and saved (if requested), and internal buffers are freed (GC + `torch.cuda.empty_cache()`).
4. Multiple requests can run in parallel while sharing heavy weights and isolating mutable state (a simplified sketch of this flow follows).
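
Conceptually, the per-request view boils down to the following simplified sketch (the real `RequestScopedPipeline.generate` in `utils/requestscopedpipeline.py` adds mutable-attribute auto-detection, tokenizer locks, and offload handling):

```python
import copy


def run_locally(base_pipe, **call_kwargs):
    # Shallow copy: module references (transformer/UNet, VAE, text encoders)
    # are shared, so the heavy GPU weights are NOT duplicated.
    local_pipe = copy.copy(base_pipe)

    # Give this request its own scheduler so set_timesteps() cannot race
    # with other requests that use the shared instance.
    local_pipe.scheduler = copy.deepcopy(base_pipe.scheduler)

    # Run inference on the per-request view.
    return local_pipe(**call_kwargs)
```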

## How to set up and run the server

### 1) Install dependencies

Recommended: create a virtualenv / conda environment.

```bash
pip install diffusers
pip install -r requirements.txt
```

### 2) Start the server

Using the `serverasync.py` file that already has everything you need:

```bash
python serverasync.py
```

The server will start on `http://localhost:8500` by default with the following features:
- FastAPI application with async lifespan management
- Automatic model loading and pipeline initialization
- Request counting and active inference tracking
- Memory cleanup after each inference
- CORS middleware for cross-origin requests

### 3) Test the server

Use the included test script:

```bash
python test.py
```

Or send a manual request:

`POST /api/diffusers/inference` with JSON body:

```json
{
  "prompt": "A futuristic cityscape, vibrant colors",
  "num_inference_steps": 30,
  "num_images_per_prompt": 1
}
```
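
For example, with `curl` against the default host and port:

```bash
curl -X POST http://localhost:8500/api/diffusers/inference \
  -H "Content-Type: application/json" \
  -d '{"prompt": "A futuristic cityscape, vibrant colors", "num_inference_steps": 30, "num_images_per_prompt": 1}'
```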

Response example:

```json
{
  "response": ["http://localhost:8500/images/img123.png"]
}
```

### 4) Server endpoints

- `GET /` - Welcome message
- `POST /api/diffusers/inference` - Main inference endpoint
- `GET /images/{filename}` - Serve generated images
- `GET /api/status` - Server status and memory info

## Advanced Configuration

### RequestScopedPipeline Parameters

```python
RequestScopedPipeline(
    pipeline,                          # Base pipeline to wrap
    mutable_attrs=None,                # Custom list of attributes to clone
    auto_detect_mutables=True,         # Enable automatic detection of mutable attributes
    tensor_numel_threshold=1_000_000,  # Tensor size threshold for cloning
    tokenizer_lock=None,               # Custom threading lock for tokenizers
    wrap_scheduler=True,               # Auto-wrap scheduler in BaseAsyncScheduler
)
```
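
A minimal usage sketch (import paths, keyword names, and the return value of `generate(...)` are assumptions here; see `utils/requestscopedpipeline.py` and `serverasync.py` for the real wiring):

```python
from Pipelines import ModelPipelineInitializer
from utils.requestscopedpipeline import RequestScopedPipeline

# Load the heavy model once at startup (see Pipelines.py).
loader = ModelPipelineInitializer(model="stabilityai/stable-diffusion-3.5-medium").initialize_pipeline()
loader.start()

# Wrap it once; every request is then served from its own lightweight view.
request_pipe = RequestScopedPipeline(loader.pipeline)

# Per-request call (keyword names are assumptions; the real generate() may differ).
output = request_pipe.generate(
    prompt="A futuristic cityscape, vibrant colors",
    num_inference_steps=30,
    num_images_per_prompt=1,
)
```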

### BaseAsyncScheduler Features

* Transparent proxy to the original scheduler with `__getattr__` and `__setattr__`
* `clone_for_request()` method for safe per-request scheduler cloning
* Enhanced debugging with `__repr__` and `__str__` methods
* Full compatibility with existing scheduler APIs (a stripped-down sketch follows)
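
A stripped-down sketch of such a wrapper (the real implementation in `utils/scheduler.py` differs in the details):

```python
import copy


class AsyncSchedulerWrapperSketch:
    """Illustrative proxy; not the actual BaseAsyncScheduler implementation."""

    def __init__(self, scheduler):
        object.__setattr__(self, "_scheduler", scheduler)

    def __getattr__(self, name):
        # Anything not defined on the wrapper falls through to the scheduler.
        return getattr(object.__getattribute__(self, "_scheduler"), name)

    def __setattr__(self, name, value):
        setattr(object.__getattribute__(self, "_scheduler"), name, value)

    def clone_for_request(self, **kwargs):
        # Per-request copy so set_timesteps() never mutates the shared instance.
        return type(self)(copy.deepcopy(object.__getattribute__(self, "_scheduler")))

    def __repr__(self):
        return f"AsyncSchedulerWrapperSketch({object.__getattribute__(self, '_scheduler')!r})"
```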

### Server Configuration

The server configuration can be modified in `serverasync.py` through the `ServerConfigModels` dataclass:

```python
@dataclass
class ServerConfigModels:
    model: str = 'stabilityai/stable-diffusion-3.5-medium'
    type_models: str = 't2im'
    host: str = '0.0.0.0'
    port: int = 8500
```
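
For example, the config plausibly drives server startup like this (a sketch; the exact wiring in `serverasync.py` may differ):

```python
import uvicorn

config = ServerConfigModels(port=8600)  # override the default port
# "serverasync:app" assumes the FastAPI instance in serverasync.py is named `app`.
uvicorn.run("serverasync:app", host=config.host, port=config.port)
```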

## Troubleshooting (quick)

* `Already borrowed` — previously a Rust tokenizer concurrency error.
✅ This is now fixed: `RequestScopedPipeline` automatically detects and wraps tokenizers with thread locks, so race conditions no longer happen.

* `can't set attribute 'components'` — pipeline exposes read-only `components`.
✅ The RequestScopedPipeline now detects read-only properties and skips setting them automatically.

* Scheduler issues:
  * If the scheduler doesn't implement `clone_for_request` and `deepcopy` fails, we log the failure and fall back; prefer `async_retrieve_timesteps(..., return_scheduler=True)` to avoid mutating the shared scheduler.
  ✅ Note: `async_retrieve_timesteps` is fully backward-compatible: if you don't pass `return_scheduler=True`, the behavior is unchanged.

* Memory issues with large tensors:
✅ The system now has configurable `tensor_numel_threshold` to prevent cloning of large tensors while still cloning small mutable ones.

* Automatic tokenizer detection:
✅ The system automatically identifies tokenizer components by checking for tokenizer methods, class names, and attributes, then applies thread-safe wrappers (illustrated below).
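
As a rough illustration of the tokenizer-locking idea (not the actual code in `utils/requestscopedpipeline.py`), every tokenizer call can be routed through a shared `threading.Lock`:

```python
import threading


class LockedTokenizer:
    """Illustrative proxy that serializes calls to a non-thread-safe Rust tokenizer."""

    def __init__(self, tokenizer, lock: threading.Lock):
        self._tokenizer = tokenizer
        self._lock = lock

    def __call__(self, *args, **kwargs):
        with self._lock:  # only one request tokenizes at a time
            return self._tokenizer(*args, **kwargs)

    def __getattr__(self, name):
        # Everything else (decode, model_max_length, ...) falls through unchanged.
        return getattr(self._tokenizer, name)
```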
10 changes: 10 additions & 0 deletions examples/server-async/requirements.txt
@@ -0,0 +1,10 @@
torch
torchvision
transformers
sentencepiece
fastapi
uvicorn
ftfy
accelerate
xformers
protobuf