
Commit 5474c3a

Merge branch 'main' into xformers_flux
2 parents: cb0baf8 + 7e7e62c

File tree

11 files changed: +1100 -47 lines changed

examples/server-async/Pipelines.py

Lines changed: 91 additions & 0 deletions
```python
import logging
import os
from dataclasses import dataclass, field
from typing import List

import torch
from pydantic import BaseModel

from diffusers.pipelines.stable_diffusion_3.pipeline_stable_diffusion_3 import StableDiffusion3Pipeline


logger = logging.getLogger(__name__)


class TextToImageInput(BaseModel):
    model: str
    prompt: str
    size: str | None = None
    n: int | None = None


@dataclass
class PresetModels:
    SD3: List[str] = field(default_factory=lambda: ["stabilityai/stable-diffusion-3-medium"])
    SD3_5: List[str] = field(
        default_factory=lambda: [
            "stabilityai/stable-diffusion-3.5-large",
            "stabilityai/stable-diffusion-3.5-large-turbo",
            "stabilityai/stable-diffusion-3.5-medium",
        ]
    )


class TextToImagePipelineSD3:
    def __init__(self, model_path: str | None = None):
        self.model_path = model_path or os.getenv("MODEL_PATH")
        self.pipeline: StableDiffusion3Pipeline | None = None
        self.device: str | None = None

    def start(self):
        if torch.cuda.is_available():
            model_path = self.model_path or "stabilityai/stable-diffusion-3.5-large"
            logger.info("Loading CUDA")
            self.device = "cuda"
            self.pipeline = StableDiffusion3Pipeline.from_pretrained(
                model_path,
                torch_dtype=torch.float16,
            ).to(device=self.device)
        elif torch.backends.mps.is_available():
            model_path = self.model_path or "stabilityai/stable-diffusion-3.5-medium"
            logger.info("Loading MPS for Mac M Series")
            self.device = "mps"
            self.pipeline = StableDiffusion3Pipeline.from_pretrained(
                model_path,
                torch_dtype=torch.bfloat16,
            ).to(device=self.device)
        else:
            raise Exception("No CUDA or MPS device available")


class ModelPipelineInitializer:
    def __init__(self, model: str = "", type_models: str = "t2im"):
        self.model = model
        self.type_models = type_models
        self.pipeline = None
        self.device = "cuda" if torch.cuda.is_available() else "mps"
        self.model_type = None

    def initialize_pipeline(self):
        if not self.model:
            raise ValueError("Model name not provided")

        # Check if model exists in PresetModels
        preset_models = PresetModels()

        # Determine which model type we're dealing with
        if self.model in preset_models.SD3:
            self.model_type = "SD3"
        elif self.model in preset_models.SD3_5:
            self.model_type = "SD3_5"

        # Create appropriate pipeline based on model type and type_models
        if self.type_models == "t2im":
            if self.model_type in ["SD3", "SD3_5"]:
                self.pipeline = TextToImagePipelineSD3(self.model)
            else:
                raise ValueError(f"Model type {self.model_type} not supported for text-to-image")
        elif self.type_models == "t2v":
            raise ValueError(f"Unsupported type_models: {self.type_models}")

        return self.pipeline
```
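
For illustration, a minimal sketch of how the classes above can be wired together (the server does the equivalent inside `serverasync.py`):

```python
# Minimal usage sketch for the classes defined above (illustrative only).
from Pipelines import ModelPipelineInitializer

initializer = ModelPipelineInitializer(model="stabilityai/stable-diffusion-3.5-medium")
holder = initializer.initialize_pipeline()  # returns a TextToImagePipelineSD3
holder.start()                              # loads the weights onto CUDA or MPS

image = holder.pipeline(
    prompt="A futuristic cityscape, vibrant colors",
    num_inference_steps=30,
).images[0]
image.save("out.png")
```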

examples/server-async/README.md

Lines changed: 171 additions & 0 deletions
# Asynchronous server and parallel execution of models

> Example/demo server that keeps a single model in memory while safely running parallel inference requests by creating per-request lightweight views and cloning only small, stateful components (schedulers, RNG state, small mutable attrs). Works with StableDiffusion3 pipelines.
> We recommend running 10 to 50 inferences in parallel for optimal performance; individual requests typically take from roughly 25-30 seconds up to 1 minute 30 seconds. (This is only recommended if you have a GPU with 35 GB of VRAM or more; otherwise, keep it to one or two parallel inferences to avoid decoding or saving errors caused by running out of memory.)

## ⚠️ IMPORTANT

* The example demonstrates how to run pipelines like `StableDiffusion3-3.5` concurrently while keeping a single copy of the heavy model parameters on GPU.

## Necessary components

All the components needed to create the inference server are in the current directory:

```
server-async/
├── utils/
│   ├── __init__.py
│   ├── scheduler.py               # BaseAsyncScheduler wrapper and async_retrieve_timesteps for secure inferences
│   ├── requestscopedpipeline.py   # RequestScoped Pipeline for inference with a single in-memory model
│   └── utils.py                   # Image/video saving utilities and service configuration
├── Pipelines.py                   # pipeline loader classes (SD3)
├── serverasync.py                 # FastAPI app with lifespan management and async inference endpoints
├── test.py                        # Client test script for inference requests
├── requirements.txt               # Dependencies
└── README.md                      # This documentation
```

## What `diffusers-async` adds / Why we needed it

Core problem: a naive server that calls `pipe.__call__` concurrently can hit **race conditions** (e.g., `scheduler.set_timesteps` mutates shared state) or explode memory by deep-copying the whole pipeline per request.

`diffusers-async` / this example addresses that by:

* **Request-scoped views**: `RequestScopedPipeline` creates a shallow copy of the pipeline per request so heavy weights (UNet, VAE, text encoder) remain shared and *are not duplicated*.
* **Per-request mutable state**: small stateful objects (scheduler, RNG state, small lists/dicts, callbacks) are cloned per request. The system uses `BaseAsyncScheduler.clone_for_request(...)` for scheduler cloning, with a fallback to a safe `deepcopy` or other heuristics.
* **Tokenizer concurrency safety**: `RequestScopedPipeline` now manages an internal tokenizer lock with automatic tokenizer detection and wrapping. This ensures that Rust tokenizers are safe to use under concurrency — race condition errors like `Already borrowed` no longer occur.
* **`async_retrieve_timesteps(..., return_scheduler=True)`**: fully backward-compatible helper that returns `(timesteps, num_inference_steps, scheduler)` without mutating the shared scheduler (see the sketch after this list). For users who don't pass `return_scheduler=True`, the behavior is identical to the original API.
* **Robust attribute handling**: the wrapper avoids writing to read-only properties (e.g., `components`) and auto-detects small mutable attributes to clone while avoiding duplication of large tensors. A configurable tensor size threshold prevents cloning of large tensors.
* **Enhanced scheduler wrapping**: `BaseAsyncScheduler` automatically wraps schedulers with improved `__getattr__`, `__setattr__`, and debugging methods (`__repr__`, `__str__`).

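As a rough illustration of the `async_retrieve_timesteps` helper described above (the exact signature lives in `utils/scheduler.py`; this sketch assumes it mirrors the argument order of diffusers' `retrieve_timesteps`):

```python
# Illustrative sketch only; see utils/scheduler.py for the real helper.
from diffusers import FlowMatchEulerDiscreteScheduler

from utils.scheduler import BaseAsyncScheduler, async_retrieve_timesteps

# SD3 pipelines use a flow-match scheduler; any scheduler works for the illustration.
shared_scheduler = BaseAsyncScheduler(FlowMatchEulerDiscreteScheduler())

# With return_scheduler=True a per-request scheduler copy is returned,
# so the shared scheduler's state is left untouched.
# (Await the call instead if the helper is defined as a coroutine.)
timesteps, num_inference_steps, local_scheduler = async_retrieve_timesteps(
    shared_scheduler,
    num_inference_steps=30,
    device="cuda",
    return_scheduler=True,
)
```
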
## How the server works (high-level flow)

1. **Single model instance** is loaded into memory (GPU/MPS) when the server starts.
2. On each HTTP inference request:

   * The server uses `RequestScopedPipeline.generate(...)` which:

     * automatically wraps the base scheduler in `BaseAsyncScheduler` (if not already wrapped),
     * obtains a *local scheduler* (via `clone_for_request()` or `deepcopy`),
     * does `local_pipe = copy.copy(base_pipe)` (shallow copy),
     * sets `local_pipe.scheduler = local_scheduler` (if possible),
     * clones only small mutable attributes (callbacks, rng, small latents) with auto-detection,
     * wraps tokenizers with thread-safe locks to prevent race conditions,
     * optionally enters a `model_cpu_offload_context()` for memory offload hooks,
     * calls the pipeline on the local view (`local_pipe(...)`).
3. **Result**: inference completes, images are moved to CPU & saved (if requested), internal buffers freed (GC + `torch.cuda.empty_cache()`).
4. Multiple requests can run in parallel while sharing heavy weights and isolating mutable state.

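In code, the isolation described in step 2 boils down to something like this simplified sketch (not the actual implementation in `utils/requestscopedpipeline.py`):

```python
import copy


def run_isolated(base_pipe, **call_kwargs):
    """Simplified illustration of the per-request flow described above."""
    # Per-request scheduler: use clone_for_request() when available, deepcopy otherwise.
    clone_fn = getattr(base_pipe.scheduler, "clone_for_request", None)
    local_scheduler = clone_fn() if clone_fn is not None else copy.deepcopy(base_pipe.scheduler)

    local_pipe = copy.copy(base_pipe)       # shallow copy: heavy weights stay shared
    local_pipe.scheduler = local_scheduler  # isolate the mutable scheduler state

    return local_pipe(**call_kwargs)        # run inference on the per-request view
```
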
## How to set up and run the server

### 1) Install dependencies

Recommended: create a virtualenv / conda environment.

```bash
pip install diffusers
pip install -r requirements.txt
```

### 2) Start the server

Using the `serverasync.py` file that already has everything you need:

```bash
python serverasync.py
```

The server will start on `http://localhost:8500` by default with the following features:

- FastAPI application with async lifespan management
- Automatic model loading and pipeline initialization
- Request counting and active inference tracking
- Memory cleanup after each inference
- CORS middleware for cross-origin requests

### 3) Test the server

Use the included test script:

```bash
python test.py
```

Or send a manual request:

`POST /api/diffusers/inference` with JSON body:

```json
{
  "prompt": "A futuristic cityscape, vibrant colors",
  "num_inference_steps": 30,
  "num_images_per_prompt": 1
}
```

Response example:

```json
{
  "response": ["http://localhost:8500/images/img123.png"]
}
```

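Equivalently, a minimal Python client for the endpoint and JSON shapes shown above could look like:

```python
import requests

resp = requests.post(
    "http://localhost:8500/api/diffusers/inference",
    json={
        "prompt": "A futuristic cityscape, vibrant colors",
        "num_inference_steps": 30,
        "num_images_per_prompt": 1,
    },
    timeout=300,  # generation can take tens of seconds per request
)
resp.raise_for_status()
print(resp.json()["response"])  # list of image URLs served via GET /images/{filename}
```
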
### 4) Server endpoints

- `GET /` - Welcome message
- `POST /api/diffusers/inference` - Main inference endpoint
- `GET /images/{filename}` - Serve generated images
- `GET /api/status` - Server status and memory info

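For example, the status endpoint can be polled between requests; the exact response fields are whatever `serverasync.py` reports:

```python
import requests

status = requests.get("http://localhost:8500/api/status", timeout=10)
status.raise_for_status()
print(status.json())  # server status and memory info as reported by serverasync.py
```
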
## Advanced Configuration

### RequestScopedPipeline Parameters

```python
RequestScopedPipeline(
    pipeline,                          # Base pipeline to wrap
    mutable_attrs=None,                # Custom list of attributes to clone
    auto_detect_mutables=True,         # Enable automatic detection of mutable attributes
    tensor_numel_threshold=1_000_000,  # Tensor size threshold for cloning
    tokenizer_lock=None,               # Custom threading lock for tokenizers
    wrap_scheduler=True                # Auto-wrap scheduler in BaseAsyncScheduler
)
```

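For reference, a typical wiring of the wrapper around an already-loaded pipeline might look like the sketch below (the exact keyword arguments accepted by `generate` live in `utils/requestscopedpipeline.py`, so treat them as assumptions):

```python
# Illustrative wiring; see utils/requestscopedpipeline.py for the real API.
import torch
from diffusers import StableDiffusion3Pipeline

from utils.requestscopedpipeline import RequestScopedPipeline

base_pipe = StableDiffusion3Pipeline.from_pretrained(
    "stabilityai/stable-diffusion-3.5-medium", torch_dtype=torch.float16
).to("cuda")

request_pipe = RequestScopedPipeline(
    base_pipe,                          # heavy weights stay shared across requests
    auto_detect_mutables=True,
    tensor_numel_threshold=1_000_000,
)

# Each HTTP request calls generate(), which builds its own shallow per-request view.
result = request_pipe.generate(
    prompt="A futuristic cityscape, vibrant colors",
    num_inference_steps=30,
)
```
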
### BaseAsyncScheduler Features

* Transparent proxy to the original scheduler with `__getattr__` and `__setattr__`
* `clone_for_request()` method for safe per-request scheduler cloning
* Enhanced debugging with `__repr__` and `__str__` methods
* Full compatibility with existing scheduler APIs

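In practice the wrapper can be exercised directly, as in this sketch (the constructor and `clone_for_request` signatures are assumptions; check `utils/scheduler.py`):

```python
from diffusers import FlowMatchEulerDiscreteScheduler

from utils.scheduler import BaseAsyncScheduler

shared = BaseAsyncScheduler(FlowMatchEulerDiscreteScheduler())  # transparent proxy
print(shared)  # __repr__/__str__ describe the wrapped scheduler

local = shared.clone_for_request()  # per-request copy; shared state stays untouched
local.set_timesteps(30)             # safe: only the clone is mutated
```
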
### Server Configuration

The server configuration can be modified in `serverasync.py` through the `ServerConfigModels` dataclass:

```python
@dataclass
class ServerConfigModels:
    model: str = 'stabilityai/stable-diffusion-3.5-medium'
    type_models: str = 't2im'
    host: str = '0.0.0.0'
    port: int = 8500
```

## Troubleshooting (quick)

* `Already borrowed` — previously a Rust tokenizer concurrency error.
  ✅ This is now fixed: `RequestScopedPipeline` automatically detects and wraps tokenizers with thread locks, so race conditions no longer happen.

* `can't set attribute 'components'` — pipeline exposes read-only `components`.
  ✅ The `RequestScopedPipeline` now detects read-only properties and skips setting them automatically.

* Scheduler issues:
  * If the scheduler doesn't implement `clone_for_request` and `deepcopy` fails, we log and fall back — but prefer `async_retrieve_timesteps(..., return_scheduler=True)` to avoid mutating the shared scheduler.
  ✅ Note: `async_retrieve_timesteps` is fully backward-compatible — if you don't pass `return_scheduler=True`, the behavior is unchanged.

* Memory issues with large tensors:
  ✅ The system now has a configurable `tensor_numel_threshold` to prevent cloning of large tensors while still cloning small mutable ones.

* Automatic tokenizer detection:
  ✅ The system automatically identifies tokenizer components by checking for tokenizer methods, class names, and attributes, then applies thread-safe wrappers.
examples/server-async/requirements.txt

Lines changed: 10 additions & 0 deletions

```
torch
torchvision
transformers
sentencepiece
fastapi
uvicorn
ftfy
accelerate
xformers
protobuf
```
