# Asynchronous server and parallel execution of models

> Example/demo server that keeps a single model in memory while safely running parallel inference requests by creating per-request lightweight views and cloning only small, stateful components (schedulers, RNG state, small mutable attributes). Works with StableDiffusion3 pipelines.
> We recommend running 10 to 50 inferences in parallel for optimal performance, averaging between 25-30 seconds and 1-1.5 minutes per request. (This is only recommended if you have a GPU with 35 GB of VRAM or more; otherwise, keep it to one or two parallel inferences to avoid decoding or saving errors caused by memory shortages.)

## ⚠️ IMPORTANT

* The example demonstrates how to run pipelines like `StableDiffusion3-3.5` concurrently while keeping a single copy of the heavy model parameters on GPU.

## Necessary components

All the components needed to create the inference server are in the current directory:

```
server-async/
├── utils/
│   ├── __init__.py
│   ├── scheduler.py             # BaseAsyncScheduler wrapper and async_retrieve_timesteps for safe inference
│   ├── requestscopedpipeline.py # RequestScopedPipeline for inference with a single in-memory model
│   └── utils.py                 # Image/video saving utilities and service configuration
├── Pipelines.py                 # Pipeline loader classes (SD3)
├── serverasync.py               # FastAPI app with lifespan management and async inference endpoints
├── test.py                      # Client test script for inference requests
├── requirements.txt             # Dependencies
└── README.md                    # This documentation
```

## What `diffusers-async` adds / Why we needed it

Core problem: a naive server that calls `pipe.__call__` concurrently can hit **race conditions** (e.g., `scheduler.set_timesteps` mutates shared state) or explode memory by deep-copying the whole pipeline per request.

`diffusers-async` / this example addresses that as follows (a minimal sketch of the pattern appears after the list):

* **Request-scoped views**: `RequestScopedPipeline` creates a shallow copy of the pipeline per request so heavy weights (UNet, VAE, text encoder) remain shared and *are not duplicated*.
* **Per-request mutable state**: small stateful objects (scheduler, RNG state, small lists/dicts, callbacks) are cloned per request. The system uses `BaseAsyncScheduler.clone_for_request(...)` for scheduler cloning, with a fallback to a safe `deepcopy` or other heuristics.
* **Tokenizer concurrency safety**: `RequestScopedPipeline` now manages an internal tokenizer lock with automatic tokenizer detection and wrapping. This ensures that Rust tokenizers are safe to use under concurrency; race condition errors like `Already borrowed` no longer occur.
* **`async_retrieve_timesteps(..., return_scheduler=True)`**: a fully backward-compatible helper that returns `(timesteps, num_inference_steps, scheduler)` without mutating the shared scheduler. For users not passing `return_scheduler=True`, the behavior is identical to the original API.
* **Robust attribute handling**: the wrapper avoids writing to read-only properties (e.g., `components`) and auto-detects small mutable attributes to clone while avoiding duplication of large tensors. A configurable tensor size threshold prevents cloning of large tensors.
* **Enhanced scheduler wrapping**: `BaseAsyncScheduler` automatically wraps schedulers with improved `__getattr__`, `__setattr__`, and debugging methods (`__repr__`, `__str__`).

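The core pattern looks roughly like the sketch below. This is an illustrative simplification, not the actual `RequestScopedPipeline` implementation; `base_pipe` is a placeholder for an already-loaded pipeline.

```python
import copy

def run_isolated_inference(base_pipe, prompt: str, num_inference_steps: int = 30):
    """Illustrative sketch: share heavy weights, isolate small mutable state per request."""
    # Shallow copy: module references (transformer/UNet, VAE, text encoders) are
    # shared with base_pipe, so no additional GPU memory is used for the weights.
    local_pipe = copy.copy(base_pipe)

    # Clone only the small, stateful scheduler so that set_timesteps() on this
    # request cannot race with other requests using the shared instance.
    local_pipe.scheduler = copy.deepcopy(base_pipe.scheduler)

    # Run inference against the isolated per-request view.
    return local_pipe(prompt=prompt, num_inference_steps=num_inference_steps)
```
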
## How the server works (high-level flow)

1. A **single model instance** is loaded into memory (GPU/MPS) when the server starts.
2. On each HTTP inference request:

   * The server uses `RequestScopedPipeline.generate(...)`, which:

     * automatically wraps the base scheduler in `BaseAsyncScheduler` (if not already wrapped),
     * obtains a *local scheduler* (via `clone_for_request()` or `deepcopy`),
     * does `local_pipe = copy.copy(base_pipe)` (shallow copy),
     * sets `local_pipe.scheduler = local_scheduler` (if possible),
     * clones only small mutable attributes (callbacks, RNG, small latents) with auto-detection,
     * wraps tokenizers with thread-safe locks to prevent race conditions,
     * optionally enters a `model_cpu_offload_context()` for memory offload hooks,
     * calls the pipeline on the local view (`local_pipe(...)`).
3. **Result**: inference completes, images are moved to the CPU and saved (if requested), and internal buffers are freed (GC + `torch.cuda.empty_cache()`).
4. Multiple requests can run in parallel while sharing heavy weights and isolating mutable state (a hypothetical endpoint sketch follows).

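For illustration, a FastAPI endpoint following this flow might look like the sketch below. The wiring (`request_pipe`, `save_image`) and the exact keyword arguments accepted by `RequestScopedPipeline.generate` are assumptions; see `serverasync.py` for the real implementation.

```python
import asyncio
import gc

import torch
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class InferenceRequest(BaseModel):
    prompt: str
    num_inference_steps: int = 30
    num_images_per_prompt: int = 1

@app.post("/api/diffusers/inference")
async def inference(req: InferenceRequest):
    # `request_pipe` is assumed to be a RequestScopedPipeline built at startup around
    # the single in-memory model; the blocking call runs in a worker thread so the
    # event loop keeps accepting requests.
    images = await asyncio.to_thread(
        request_pipe.generate,
        prompt=req.prompt,
        num_inference_steps=req.num_inference_steps,
        num_images_per_prompt=req.num_images_per_prompt,
    )
    # Free per-request buffers; the shared weights stay resident on the GPU.
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    # save_image() is a hypothetical helper that writes an image and returns its URL.
    return {"response": [save_image(img) for img in images]}
```
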
## How to set up and run the server

### 1) Install dependencies

Recommended: create a virtualenv / conda environment.

```bash
pip install diffusers
pip install -r requirements.txt
```

### 2) Start the server

Use the `serverasync.py` file, which already has everything you need:

```bash
python serverasync.py
```

The server will start on `http://localhost:8500` by default with the following features:
- FastAPI application with async lifespan management
- Automatic model loading and pipeline initialization
- Request counting and active inference tracking
- Memory cleanup after each inference
- CORS middleware for cross-origin requests

### 3) Test the server

Use the included test script:

```bash
python test.py
```

Or send a manual request:

`POST /api/diffusers/inference` with JSON body:

```json
{
  "prompt": "A futuristic cityscape, vibrant colors",
  "num_inference_steps": 30,
  "num_images_per_prompt": 1
}
```

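For example, from Python with the `requests` library (assuming the server is running locally on the default port):

```python
import requests

resp = requests.post(
    "http://localhost:8500/api/diffusers/inference",
    json={
        "prompt": "A futuristic cityscape, vibrant colors",
        "num_inference_steps": 30,
        "num_images_per_prompt": 1,
    },
)
print(resp.json())  # list of image URLs, as in the response example below
```
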
Response example:

```json
{
  "response": ["http://localhost:8500/images/img123.png"]
}
```

### 4) Server endpoints

- `GET /` - Welcome message
- `POST /api/diffusers/inference` - Main inference endpoint
- `GET /images/{filename}` - Serve generated images
- `GET /api/status` - Server status and memory info (example request below)

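A quick status check from Python, assuming the default host and port (the shape of the returned JSON depends on the server implementation):

```python
import requests

# Poll the status endpoint to inspect server state and memory info.
status = requests.get("http://localhost:8500/api/status").json()
print(status)
```
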
## Advanced Configuration

### RequestScopedPipeline Parameters

```python
RequestScopedPipeline(
    pipeline,                          # Base pipeline to wrap
    mutable_attrs=None,                # Custom list of attributes to clone
    auto_detect_mutables=True,         # Enable automatic detection of mutable attributes
    tensor_numel_threshold=1_000_000,  # Tensors larger than this (in elements) are not cloned
    tokenizer_lock=None,               # Custom threading lock for tokenizers
    wrap_scheduler=True                # Auto-wrap the scheduler in BaseAsyncScheduler
)
```

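As a usage sketch, a server might build one request-scoped wrapper at startup and reuse it for every request. The import path below mirrors the directory layout above, `pipeline` stands for an already-loaded StableDiffusion3 pipeline, and the specific values are illustrative:

```python
import threading

from utils.requestscopedpipeline import RequestScopedPipeline

request_pipe = RequestScopedPipeline(
    pipeline,                         # already-loaded pipeline, weights resident on GPU
    auto_detect_mutables=True,        # clone small mutable attrs automatically
    tensor_numel_threshold=500_000,   # be stricter about which tensors get cloned
    tokenizer_lock=threading.Lock(),  # shared lock protecting the Rust tokenizers
    wrap_scheduler=True,              # wrap the scheduler in BaseAsyncScheduler
)
```
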
### BaseAsyncScheduler Features

* Transparent proxy to the original scheduler with `__getattr__` and `__setattr__`
* `clone_for_request()` method for safe per-request scheduler cloning
* Enhanced debugging with `__repr__` and `__str__` methods
* Full compatibility with existing scheduler APIs (a minimal usage sketch follows)

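A minimal sketch, assuming the helpers live in `utils/scheduler.py` as shown in the layout above, that `pipe` is the loaded pipeline, and that `async_retrieve_timesteps` mirrors the signature of diffusers' `retrieve_timesteps` (await it instead if the helper is defined as a coroutine):

```python
from utils.scheduler import BaseAsyncScheduler, async_retrieve_timesteps

# Wrap the shared scheduler once; RequestScopedPipeline does this automatically
# when wrap_scheduler=True.
scheduler = BaseAsyncScheduler(pipe.scheduler)

# Per request: get the timesteps plus a private scheduler copy instead of
# mutating the shared instance via scheduler.set_timesteps(...).
timesteps, num_inference_steps, local_scheduler = async_retrieve_timesteps(
    scheduler,
    num_inference_steps=30,
    device="cuda",
    return_scheduler=True,
)
```
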
### Server Configuration

The server configuration can be modified in `serverasync.py` through the `ServerConfigModels` dataclass:

```python
@dataclass
class ServerConfigModels:
    model: str = 'stabilityai/stable-diffusion-3.5-medium'
    type_models: str = 't2im'
    host: str = '0.0.0.0'
    port: int = 8500
```

## Troubleshooting (quick)

* `Already borrowed`: previously a Rust tokenizer concurrency error.
  ✅ This is now fixed: `RequestScopedPipeline` automatically detects and wraps tokenizers with thread locks, so these race conditions no longer happen.

* `can't set attribute 'components'`: the pipeline exposes a read-only `components` property.
  ✅ `RequestScopedPipeline` now detects read-only properties and skips setting them automatically.

* Scheduler issues:
  * If the scheduler doesn't implement `clone_for_request` and `deepcopy` fails, we log and fall back; prefer `async_retrieve_timesteps(..., return_scheduler=True)` to avoid mutating the shared scheduler.
    ✅ Note: `async_retrieve_timesteps` is fully backward-compatible; if you don't pass `return_scheduler=True`, the behavior is unchanged.

* Memory issues with large tensors:
  ✅ The system now has a configurable `tensor_numel_threshold` to prevent cloning of large tensors while still cloning small mutable ones.

* Automatic tokenizer detection:
  ✅ The system automatically identifies tokenizer components by checking for tokenizer methods, class names, and attributes, then applies thread-safe wrappers.